faster-whisper 实时识别电脑语音转文本
前言
以前做的智能对话软件接的baidu api,想换成本地的,就搭一套faster-whisper吧。
下面是b站视频实时转写的截图
项目
搭建环境
所需要的cudann已经装好了,如果装的是12.2应该是包含cublas了
没装的,可以从下面链接下载装一下,文末的参考视频中也有讲解
https://github.com/purfview/whisper-standalone-win/releases/tag/libs
ancanda
的运行环境去clone
一下之前配好的环境,用之前bertvits的即可
安装faster-whisper
输入即可安装
pip install faster-whisper
下载模型
https://huggingface.co/systran/faster-whisper-large-v3
下载完放到代码旁边就可以了
编写测试代码
# local_files_only=true 表示加载本地模型
# model_size_or_path=path 指定加载模型路径
# device="cuda" 指定使用cuda
# compute_type="int8_float16" 量化为8位
# language="zh" 指定音频语言
# vad_filter=true 开启vad
# vad_parameters=dict(min_silence_duration_ms=1000) 设置vad参数
from faster_whisper import whispermodel
model_size = "large-v3"
path = r"d:\project\python_project\fasterwhisper\large-v3"
# run on gpu with fp16
model = whispermodel(model_size_or_path=path, device="cuda", local_files_only=true)
# or run on gpu with int8
# model = whispermodel(model_size, device="cuda", compute_type="int8_float16")
# or run on cpu with int8
# model = whispermodel(model_size, device="cpu", compute_type="int8")
segments, info = model.transcribe("audio.wav", beam_size=5, language="zh", vad_filter=true, vad_parameters=dict(min_silence_duration_ms=1000))
print("detected language '%s' with probability %f" % (info.language, info.language_probability))
for segment in segments:
print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
运行测试代码
找个音频放入文件夹内,输入python main.py
即可运行!
可以看到正确(不太正确)的识别出了音频说了什么。
实时转写脚本
新建一个脚本transper.py
运行即可
import os
import sys
import time
import wave
import tempfile
import threading
import torch
import pyaudiowpatch as pyaudio
from faster_whisper import whispermodel as whisper
# a bigger audio buffer gives better accuracy
# but also increases latency in response.
# 表示音频缓冲时间的常量
audio_buffer = 5
# 此函数使用 pyaudio 库录制音频,并将其保存为一个临时的 wav 文件。
# 使用 pyaudio.pyaudio 实例创建一个音频流,通过指定回调函数 callback 来实时写入音频数据到 wav 文件。
# time.sleep(audio_buffer) 会阻塞执行,确保录制足够的音频时间。
# 最后,函数返回保存的 wav 文件的文件名。
def record_audio(p, device):
"""record audio from output device and save to temporary wav file."""
with tempfile.namedtemporaryfile(suffix=".wav", delete=false) as f:
filename = f.name
wave_file = wave.open(filename, "wb")
wave_file.setnchannels(device["maxinputchannels"])
wave_file.setsampwidth(pyaudio.get_sample_size(pyaudio.paint16))
wave_file.setframerate(int(device["defaultsamplerate"]))
def callback(in_data, frame_count, time_info, status):
"""write frames and return pa flag"""
wave_file.writeframes(in_data)
return (in_data, pyaudio.pacontinue)
stream = p.open(
format=pyaudio.paint16,
channels=device["maxinputchannels"],
rate=int(device["defaultsamplerate"]),
frames_per_buffer=pyaudio.get_sample_size(pyaudio.paint16),
input=true,
input_device_index=device["index"],
stream_callback=callback,
)
try:
time.sleep(audio_buffer) # blocking execution while playing
finally:
stream.stop_stream()
stream.close()
wave_file.close()
# print(f"{filename} saved.")
return filename
# 此函数使用 whisper 模型对录制的音频进行转录,并输出转录结果。
def whisper_audio(filename, model):
"""transcribe audio buffer and display."""
# segments, info = model.transcribe(filename, beam_size=5, task="translate", language="zh", vad_filter=true, vad_parameters=dict(min_silence_duration_ms=1000))
segments, info = model.transcribe(filename, beam_size=5, language="zh", vad_filter=true, vad_parameters=dict(min_silence_duration_ms=1000))
os.remove(filename)
# print(f"{filename} removed.")
for segment in segments:
# print(f"[{segment.start:.2f} -> {segment.end:.2f}] {segment.text.strip()}")
print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
# main 函数是整个脚本的主控制函数。
# 加载 whisper 模型,选择合适的计算设备(gpu 或 cpu)。
# 获取默认的 wasapi 输出设备信息,并选择默认的扬声器(输出设备)。
# 使用 pyaudio 开始录制音频,并通过多线程运行 whisper_audio 函数进行音频转录。
def main():
"""load model record audio and transcribe from default output device."""
print("loading model...")
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"using {device} device.")
# model = whisper("large-v3", device=device, compute_type="float16")
model = whisper("large-v3", device=device, local_files_only=true)
print("model loaded.")
with pyaudio.pyaudio() as pya:
# create pyaudio instance via context manager.
try:
# get default wasapi info
wasapi_info = pya.get_host_api_info_by_type(pyaudio.pawasapi)
except oserror:
print("looks like wasapi is not available on the system. exiting...")
sys.exit()
# get default wasapi speakers
default_speakers = pya.get_device_info_by_index(
wasapi_info["defaultoutputdevice"]
)
if not default_speakers["isloopbackdevice"]:
for loopback in pya.get_loopback_device_info_generator():
# try to find loopback device with same name(and [loopback suffix]).
# unfortunately, this is the most adequate way at the moment.
if default_speakers["name"] in loopback["name"]:
default_speakers = loopback
break
else:
print(
"""
default loopback output device not found.
run `python -m pyaudiowpatch` to check available devices.
exiting...
"""
)
sys.exit()
print(
f"recording from: {default_speakers['name']} ({default_speakers['index']})\n"
)
while true:
filename = record_audio(pya, default_speakers)
thread = threading.thread(target=whisper_audio, args=(filename, model))
thread.start()
main()
实时转写websocket服务器模式
在最新google bard的帮助下,从同步多线程单机版变成了异步websocket服务器版本,unity可以链接并监听实时转写的数据了(写这篇文章时是冬季,chatgpt实测已经开始休眠状态了)
import asyncio
import os
import wave
import tempfile
import torch
import pyaudiowpatch as pyaudio
from faster_whisper import whispermodel as whisper
import websockets
import json
# audio buffer time
audio_buffer = 5
# dictionary to store websocket connections
clients = {}
# handle client
async def handle_client(websocket):
client_id = id(websocket) # using the websocket object's id as a unique identifier
print(f"client connected from {websocket.remote_address} with id {client_id}")
clients[client_id] = websocket
try:
# print(f"client connected from {websocket.remote_address}")
# wait for messages from the client
async for message in websocket:
print(f"received message from client {client_id}: {message}")
# process the message (you can replace this with your logic)
response = f"server received: {message}"
# send a response back to the client
await websocket.send(response)
print(f"sent response to client {client_id}: {response}")
except websockets.exceptions.connectionclosederror:
print(f"connection with {websocket.remote_address} closed.")
finally:
# remove the websocket connection when the client disconnects
del clients[client_id]
# send a message to all connected clients
async def send_all_clients(message):
if clients==none or clients=={}:
print("no clients connected.")
return
for client_id, websocket in clients.items():
try:
await websocket.send(message)
print(f"sent message to client {client_id}: {message}")
except exception as e:
print(f"error sending message to client {client_id}: {e}")
# send a message to a specific client identified by client_id
async def send_message(client_id, message):
if client_id in clients:
websocket = clients[client_id]
await websocket.send(message)
print(f"sent message to client {client_id}: {message}")
else:
print(f"client with id {client_id} not found.")
# start the server
async def main_server():
server = await websockets.serve(handle_client, "localhost", 8765)
print("websocket server started. listening on ws://localhost:8765")
await server.wait_closed()
#this function records audio using the pyaudio library and saves it as a temporary wav file.
#use pyaudio pyaudio instance creates an audio stream and writes audio data in real-time to a wav file by specifying the callback function callback.
#due to the use of the asyncio library, it is no longer necessary to use time. sleep() to block execution, but instead to use asyncio. sleep() to wait asynchronously.
#finally, the function returns the file name of the saved wav file.
async def record_audio(p, device):
"""record audio from output device and save to temporary wav file."""
with tempfile.namedtemporaryfile(suffix=".wav", delete=false) as f:
filename = f.name
wave_file = wave.open(filename, "wb")
wave_file.setnchannels(device["maxinputchannels"])
wave_file.setsampwidth(pyaudio.get_sample_size(pyaudio.paint16))
wave_file.setframerate(int(device["defaultsamplerate"]))
def callback(in_data, frame_count, time_info, status):
"""write frames and return pa flag"""
wave_file.writeframes(in_data)
return (in_data, pyaudio.pacontinue)
stream = p.open(
format=pyaudio.paint16,
channels=device["maxinputchannels"],
rate=int(device["defaultsamplerate"]),
frames_per_buffer=pyaudio.get_sample_size(pyaudio.paint16),
input=true,
input_device_index=device["index"],
stream_callback=callback,
)
await asyncio.sleep(audio_buffer)
stream.stop_stream()
stream.close()
wave_file.close()
# print(f"{filename} saved.")
return filename
# segmentdata class
class segmentdata:
def __init__(self, start, end,text):
# 实例属性
self.start = start
self.end = end
self.text = text
def __dict__(self):
return {"start": self.start, "end": self.end, "text": self.text}
def convert_to_unity_data(data): # 参数 data 为字典列表
unity_data = []
for item in data:
segment_data = segmentdata(item["start"], item["end"], item["text"])
unity_data.append(segment_data)
return unity_data
# this function transcribes the recorded audio using the whisper model and outputs the transcription result.
async def whisper_audio(filename, model):
"""transcribe audio buffer and display."""
segments, info = model.transcribe(filename, beam_size=5, language="zh", vad_filter=true, vad_parameters=dict(min_silence_duration_ms=1000))
os.remove(filename)
# print(f"{filename} removed.")
if segments:
segments_dict_list = [{"start": segment.start, "end": segment.end, "text": segment.text.strip()} for segment in segments]
json_transcriptions=json.dumps(segments_dict_list)
print(f"transcription: {json_transcriptions}")
try:
await send_all_clients(json_transcriptions)
except exception as e:
print(f"error sending message: {e}")
# start recording audio using pyaudio and concurrently run the whisper_audio function for audio transcription using asyncio.gather.
async def main():
"""load model record audio and transcribe from default output device."""
print("loading model...")
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"using {device} device.")
model = whisper("large-v3", device=device, local_files_only=true,compute_type="int8_float16")
print("model loaded.")
with pyaudio.pyaudio() as pya:
# get microphone device information (assuming you want to select the first microphone device)
microphone_index = 0
microphone_info = pya.get_device_info_by_index(microphone_index)
while true:
filename = await record_audio(pya, microphone_info)
await asyncio.gather(whisper_audio(filename, model))
async def appmain():
await asyncio.gather(main(), main_server()) # gather coroutines here
if __name__ == "__main__":
asyncio.run(appmain()) # pass the main coroutine to asyncio.run()
参考
faster-whisper
mylobishop/transper
google bard
基于faster_whisper的实时语音识别
基于faster whisper实现实时语音识别项目语音转文本python编程实现
发表评论