
Faster-Whisper: Real-Time Transcription of Computer Audio to Text

July 31, 2024

Preface

The smart dialogue app I built earlier called the Baidu API. I wanted to switch to a local model, so I set up Faster-Whisper.
Below is a screenshot of it transcribing a Bilibili video in real time.
(Screenshot: transcription results)

Project

Setting Up the Environment

The required cuDNN is already installed on my machine; if you installed CUDA 12.2, it should already include cuBLAS.
If not, you can download the libraries from the link below; the reference video at the end of this article also walks through it.
https://github.com/purfview/whisper-standalone-win/releases/tag/libs

(Screenshot: configured cuDNN libraries)

For the Anaconda runtime environment, clone the environment configured earlier; the one from the previous bertvits setup works fine.
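
A minimal sketch of that clone step, assuming the existing environment is named bertvits and the clone fasterwhisper (both names are illustrative; substitute your own):

conda create --name fasterwhisper --clone bertvits
conda activate fasterwhisper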

Installing faster-whisper

Install it with a single command:

pip install faster-whisper
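
As a quick sanity check (my addition, not a step from the original article), confirm the package imports and that PyTorch sees your GPU:

# sanity check: the import fails if the installation is broken
import torch
from faster_whisper import WhisperModel
print("CUDA available:", torch.cuda.is_available())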

Downloading the Model

Download the model from https://huggingface.co/systran/faster-whisper-large-v3 and place it next to the code.
(Screenshot: model folder alongside the code)
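
If you would rather script the download, a sketch using huggingface_hub (an assumption on my part; the article downloads manually, and the local_dir value is just an example):

from huggingface_hub import snapshot_download

# download the model repository into a "large-v3" folder next to the code
snapshot_download("Systran/faster-whisper-large-v3", local_dir="large-v3")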

Writing Test Code

(Screenshot: the model goes here)

# local_files_only=True  load the model from local files only
# model_size_or_path=path  path of the model to load
# device="cuda"  run on CUDA
# compute_type="int8_float16"  quantize to 8-bit
# language="zh"  language of the audio
# vad_filter=True  enable VAD
# vad_parameters=dict(min_silence_duration_ms=1000)  VAD parameters
from faster_whisper import WhisperModel

model_size = "large-v3"
path = r"d:\project\python_project\fasterwhisper\large-v3"

# Run on GPU with FP16
model = WhisperModel(model_size_or_path=path, device="cuda", local_files_only=True)

# or run on GPU with INT8
# model = WhisperModel(model_size, device="cuda", compute_type="int8_float16")
# or run on CPU with INT8
# model = WhisperModel(model_size, device="cpu", compute_type="int8")

segments, info = model.transcribe("audio.wav", beam_size=5, language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000))

print("Detected language '%s' with probability %f" % (info.language, info.language_probability))

for segment in segments:
    print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))


Running the Test Code

Drop an audio file into the folder and run python main.py.
You can see it recognized, (more or less) correctly, what the audio says.
(Screenshot: run output)

Real-Time Transcription Script

Create a new script named transper.py and run it:

import os
import sys
import time
import wave
import tempfile
import threading

import torch
import pyaudiowpatch as pyaudio
from faster_whisper import WhisperModel as Whisper

# A bigger audio buffer gives better accuracy
# but also increases latency in response.
# Audio buffer length in seconds.
AUDIO_BUFFER = 5

# This function records audio with the PyAudio library and saves it to a temporary WAV file.
# A pyaudio.PyAudio instance opens an audio stream whose callback writes the incoming
# audio data to the WAV file in real time.
# time.sleep(AUDIO_BUFFER) blocks execution to ensure enough audio is recorded.
# Finally, the function returns the file name of the saved WAV file.
def record_audio(p, device):
    """Record audio from output device and save to temporary WAV file."""
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        filename = f.name
        wave_file = wave.open(filename, "wb")
        wave_file.setnchannels(device["maxInputChannels"])
        wave_file.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
        wave_file.setframerate(int(device["defaultSampleRate"]))

        def callback(in_data, frame_count, time_info, status):
            """Write frames and return PA flag."""
            wave_file.writeframes(in_data)
            return (in_data, pyaudio.paContinue)

        stream = p.open(
            format=pyaudio.paInt16,
            channels=device["maxInputChannels"],
            rate=int(device["defaultSampleRate"]),
            frames_per_buffer=pyaudio.get_sample_size(pyaudio.paInt16),
            input=True,
            input_device_index=device["index"],
            stream_callback=callback,
        )

        try:
            time.sleep(AUDIO_BUFFER)  # blocking execution while playing
        finally:
            stream.stop_stream()
            stream.close()
            wave_file.close()
            # print(f"{filename} saved.")
    return filename

# This function transcribes the recorded audio with the Whisper model and prints the result.
def whisper_audio(filename, model):
    """Transcribe audio buffer and display."""
    # segments, info = model.transcribe(filename, beam_size=5, task="translate", language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000))
    segments, info = model.transcribe(filename, beam_size=5, language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000))
    os.remove(filename)
    # print(f"{filename} removed.")
    for segment in segments:
        # print(f"[{segment.start:.2f} -> {segment.end:.2f}] {segment.text.strip()}")
        print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))

# main() drives the whole script:
# load the Whisper model and pick a compute device (GPU or CPU),
# look up the default WASAPI output device (the default speakers),
# then record audio with PyAudio and run whisper_audio on worker threads to transcribe it.
def main():
    """Load model, record audio and transcribe from default output device."""
    print("Loading model...")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using {device} device.")
    # model = Whisper("large-v3", device=device, compute_type="float16")
    model = Whisper("large-v3", device=device, local_files_only=True)

    print("Model loaded.")

    with pyaudio.PyAudio() as pya:
        # Create PyAudio instance via context manager.
        try:
            # Get default WASAPI info
            wasapi_info = pya.get_host_api_info_by_type(pyaudio.paWASAPI)
        except OSError:
            print("Looks like WASAPI is not available on the system. Exiting...")
            sys.exit()

        # Get default WASAPI speakers
        default_speakers = pya.get_device_info_by_index(
            wasapi_info["defaultOutputDevice"]
        )

        if not default_speakers["isLoopbackDevice"]:
            for loopback in pya.get_loopback_device_info_generator():
                # Try to find loopback device with same name (and [Loopback suffix]).
                # Unfortunately, this is the most adequate way at the moment.
                if default_speakers["name"] in loopback["name"]:
                    default_speakers = loopback
                    break
            else:
                print(
                    """
                    Default loopback output device not found.
                    Run `python -m pyaudiowpatch` to check available devices.
                    Exiting...
                    """
                )
                sys.exit()

        print(
            f"Recording from: {default_speakers['name']} ({default_speakers['index']})\n"
        )

        while True:
            filename = record_audio(pya, default_speakers)
            thread = threading.Thread(target=whisper_audio, args=(filename, model))
            thread.start()

main()

Real-Time Transcription in WebSocket Server Mode

With help from the latest Google Bard, the synchronous, multi-threaded standalone version became an asynchronous WebSocket server version, and Unity can now connect and listen for the real-time transcription data. (This article was written in winter, and in my tests ChatGPT had already gone into hibernation.)

import asyncio
import os
import wave
import tempfile
import torch
import pyaudiowpatch as pyaudio
from faster_whisper import WhisperModel as Whisper
import websockets
import json

# Audio buffer time in seconds
AUDIO_BUFFER = 5

# Dictionary to store WebSocket connections
clients = {}

# Handle a client connection
async def handle_client(websocket):
    client_id = id(websocket)  # use the websocket object's id as a unique identifier
    print(f"Client connected from {websocket.remote_address} with id {client_id}")
    clients[client_id] = websocket
    try:
        # Wait for messages from the client
        async for message in websocket:
            print(f"Received message from client {client_id}: {message}")
            # Process the message (you can replace this with your logic)
            response = f"Server received: {message}"
            # Send a response back to the client
            await websocket.send(response)
            print(f"Sent response to client {client_id}: {response}")

    except websockets.exceptions.ConnectionClosedError:
        print(f"Connection with {websocket.remote_address} closed.")
    finally:
        # Remove the websocket connection when the client disconnects
        del clients[client_id]

# Send a message to all connected clients
async def send_all_clients(message):
    if not clients:
        print("No clients connected.")
        return
    for client_id, websocket in clients.items():
        try:
            await websocket.send(message)
            print(f"Sent message to client {client_id}: {message}")
        except Exception as e:
            print(f"Error sending message to client {client_id}: {e}")

# Send a message to a specific client identified by client_id
async def send_message(client_id, message):
    if client_id in clients:
        websocket = clients[client_id]
        await websocket.send(message)
        print(f"Sent message to client {client_id}: {message}")
    else:
        print(f"Client with id {client_id} not found.")


# Start the server
async def main_server():
    server = await websockets.serve(handle_client, "localhost", 8765)
    print("WebSocket server started. Listening on ws://localhost:8765")

    await server.wait_closed()


# This function records audio using the PyAudio library and saves it as a temporary WAV file.
# A pyaudio.PyAudio instance opens an audio stream whose callback writes audio data
# to the WAV file in real time.
# Because the script now uses asyncio, time.sleep() is no longer needed to block
# execution; asyncio.sleep() waits asynchronously instead.
# Finally, the function returns the file name of the saved WAV file.
async def record_audio(p, device):
    """Record audio from output device and save to temporary WAV file."""
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        filename = f.name
        wave_file = wave.open(filename, "wb")
        wave_file.setnchannels(device["maxInputChannels"])
        wave_file.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
        wave_file.setframerate(int(device["defaultSampleRate"]))

        def callback(in_data, frame_count, time_info, status):
            """Write frames and return PA flag."""
            wave_file.writeframes(in_data)
            return (in_data, pyaudio.paContinue)

        stream = p.open(
            format=pyaudio.paInt16,
            channels=device["maxInputChannels"],
            rate=int(device["defaultSampleRate"]),
            frames_per_buffer=pyaudio.get_sample_size(pyaudio.paInt16),
            input=True,
            input_device_index=device["index"],
            stream_callback=callback,
        )

        await asyncio.sleep(AUDIO_BUFFER)

        stream.stop_stream()
        stream.close()
        wave_file.close()
        # print(f"{filename} saved.")
    return filename

# SegmentData: a plain container for one transcription segment
# (a helper for mapping segments to Unity-side objects).
class SegmentData:
    def __init__(self, start, end, text):
        # Instance attributes
        self.start = start
        self.end = end
        self.text = text

    def to_dict(self):
        # Note: this was originally defined as __dict__, which would shadow the
        # built-in instance dictionary; a regular method name avoids that.
        return {"start": self.start, "end": self.end, "text": self.text}

def convert_to_unity_data(data):  # data is a list of dicts
    unity_data = []
    for item in data:
        segment_data = SegmentData(item["start"], item["end"], item["text"])
        unity_data.append(segment_data)
    return unity_data


# This function transcribes the recorded audio with the Whisper model and
# broadcasts the result to all connected clients as JSON.
async def whisper_audio(filename, model):
    """Transcribe audio buffer and display."""
    segments, info = model.transcribe(filename, beam_size=5, language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000))
    os.remove(filename)
    # print(f"{filename} removed.")
    if segments:
        segments_dict_list = [{"start": segment.start, "end": segment.end, "text": segment.text.strip()} for segment in segments]
        json_transcriptions = json.dumps(segments_dict_list)
        print(f"Transcription: {json_transcriptions}")
        try:
            await send_all_clients(json_transcriptions)
        except Exception as e:
            print(f"Error sending message: {e}")


# Start recording audio with PyAudio and run the whisper_audio coroutine for
# transcription via asyncio.gather.
async def main():
    """Load model, record audio and transcribe from default output device."""
    print("Loading model...")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using {device} device.")
    model = Whisper("large-v3", device=device, local_files_only=True, compute_type="int8_float16")

    print("Model loaded.")

    with pyaudio.PyAudio() as pya:
        # Get microphone device information (assuming you want the first microphone device)
        microphone_index = 0
        microphone_info = pya.get_device_info_by_index(microphone_index)
        while True:
            filename = await record_audio(pya, microphone_info)
            await asyncio.gather(whisper_audio(filename, model))


async def appmain():
    await asyncio.gather(main(), main_server())  # gather coroutines here

if __name__ == "__main__":
    asyncio.run(appmain())  # pass the main coroutine to asyncio.run()
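
To verify the stream without Unity, here is a minimal listener sketch using the same websockets library (my addition; it assumes the server above is running on localhost:8765 and simply prints every JSON payload it broadcasts):

import asyncio
import websockets

async def listen():
    # connect to the transcription server and print each broadcast message
    async with websockets.connect("ws://localhost:8765") as ws:
        async for message in ws:
            print("transcription:", message)

asyncio.run(listen())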

References

faster-whisper
mylobishop/transper
Google Bard
Real-time speech recognition based on faster_whisper (基于faster_whisper的实时语音识别)
Implementing a real-time speech recognition (speech-to-text) project in Python with Faster Whisper (基于faster whisper实现实时语音识别项目语音转文本python编程实现)
