查询麦克风
import sounddevice as sd
# 1. List every host API reported by sounddevice.
print("=== all host apis ===")
for i, h in enumerate(sd.query_hostapis()):
    print(f"hostapi {i}: {h['name']}")

print("\n=== all devices ===")
# 2. List every device with its host API, direction and channel counts.
for i, d in enumerate(sd.query_devices()):
    hostapi = sd.query_hostapis(d['hostapi'])
    # A device may be input-only, output-only or both.
    device_type = []
    if d['max_input_channels'] > 0:
        device_type.append("input")
    if d['max_output_channels'] > 0:
        device_type.append("output")
    print(f"device {i}: {d['name']}")
    print(f" hostapi: {hostapi['name']}")
    print(f" type: {', '.join(device_type)}")
    print(f" input channels: {d['max_input_channels']}")
    print(f" output channels: {d['max_output_channels']}")
    print()

# 3. Show which devices are currently configured as defaults.
print("\n=== testing default devices ===")
print(f"default input device: {sd.default.device[0]}")
print(f"default output device: {sd.default.device[1]}")
# 4. Record from the default input device.
def audio_callback(indata, frames, time, status):
    """Print any stream status flags and the shape of the captured block.

    The parameters follow the callback signature that the stream below is
    given; only ``indata`` and ``status`` are used here.
    """
    if status:
        print(f"status: {status}")
    print(f"audio shape: {indata.shape}")


try:
    # The stream is open for the lifetime of the ``with`` block; ``input``
    # simply keeps the script alive until the user presses enter.
    with sd.InputStream(samplerate=16000, channels=1, dtype='int16',
                        blocksize=320, callback=audio_callback):
        input("🎤 recording with default device... press enter to stop\n")
except Exception as e:
    # Diagnostic script: report why the device could not be opened and move on.
    print(f"error: {e}")
# 5. Alternatively, list only the devices that have input channels.
print("\n=== all input devices ===")
input_devices = []
for i, d in enumerate(sd.query_devices()):
    if d['max_input_channels'] > 0:
        hostapi = sd.query_hostapis(d['hostapi'])
        # Keep (index, device info, host API name) for the selection step below.
        input_devices.append((i, d, hostapi['name']))
        print(f"device {i}: {d['name']}")
        print(f" hostapi: {hostapi['name']}")
        print(f" channels: {d['max_input_channels']}")

# 6. Try to open the first available input device.
if input_devices:
    print("\n=== try using first available input device ===")
    mic_index = input_devices[0][0]
    device_info = input_devices[0][1]
    # Mono is the most widely supported layout.
    channels = min(1, device_info['max_input_channels'])
    try:
        with sd.InputStream(
            device=mic_index,
            samplerate=16000,
            channels=channels,
            dtype='int16',
            blocksize=320,
            callback=lambda indata, frames, time, status: print(f"audio shape: {indata.shape}"),
        ):
            input(f"🎤 recording with {device_info['name']}... press enter to stop\n")
    except Exception as e:
        print(f"error with device {mic_index}: {e}")
else:
    print("no input devices found at all!")

# 测试麦克风 (test the microphone)
import sounddevice as sd
import numpy as np
# Use device 10 directly (hard-coded index; adjust it for your machine).
mic_index = 10
print("testing microphone...")


def callback(indata, frames, time, status):
    """Print a rough input level for the captured block.

    The level is the Euclidean norm of ``indata`` multiplied by 10. It is
    written with a carriage return so that successive readings overwrite
    each other on the same terminal line.
    """
    volume = np.linalg.norm(indata) * 10
    print(f"microphone level: {volume:.2f}", end='\r')
# Try several parameter combinations and stop at the first one that opens.
settings_to_try = [
    {'samplerate': 16000, 'channels': 1, 'dtype': 'int16'},
    {'samplerate': 44100, 'channels': 1, 'dtype': 'float32'},
    {'samplerate': 48000, 'channels': 1, 'dtype': 'int16'},
]
for i, settings in enumerate(settings_to_try):
    print(f"\ntry {i+1}: {settings}")
    try:
        with sd.InputStream(
            device=mic_index,
            callback=callback,
            **settings
        ):
            input(f"settings {i+1} working! press enter to stop...\n")
            # This combination worked, so the remaining ones are not tried.
            break
    except Exception as e:
        # Opening failed with these settings; report it and try the next set.
        print(f"failed: {e}")

# 读取麦克风 (read from the microphone)
import sounddevice as sd
import numpy as np
from scipy import signal
mic_index = 10
target_samplerate = 16000  # rate the audio is resampled to
original_samplerate = 44100  # rate the stream is opened with
print(f"recording at {original_samplerate}hz, resampling to {target_samplerate}hz")


def callback(indata, frames, time, status):
    """Down-mix the captured block to mono and resample it to the target rate.

    The block arrives at ``original_samplerate`` and is resampled to
    ``target_samplerate`` with ``scipy.signal.resample``. Only the sample
    counts before and after resampling are printed; the resampled audio is
    not stored or returned.
    """
    if status:
        print(status)
    # Multi-channel input is averaged into a single channel.
    if indata.shape[1] > 1:
        audio = np.mean(indata, axis=1)
    else:
        audio = indata.flatten()
    # Output length scales with the ratio of the two sample rates.
    num_samples = int(len(audio) * target_samplerate / original_samplerate)
    resampled = signal.resample(audio, num_samples)
    print(f"original: {len(audio)} samples, resampled: {len(resampled)} samples", end='\r')
# Capture at the device rate; resampling happens inside the callback.
with sd.InputStream(device=mic_index, samplerate=original_samplerate,
                    channels=1, dtype='float32', callback=callback):
    input("recording and resampling... press enter to stop\n")

# web socker server.py (WebSocket server script)
import asyncio
import websockets
import json
import numpy as np
from datetime import datetime
import time
def _volume_stats(audio_data):
    """Return ``(rms, db, volume_bar)`` for one block of samples.

    ``db`` is ``20 * log10(rms)``, or ``-100`` when the RMS is not positive.
    ``volume_bar`` is a 20-character bar in which -60 dB maps to empty and
    0 dB to full.
    """
    rms = np.sqrt(np.mean(audio_data ** 2))
    db = 20 * np.log10(rms) if rms > 0 else -100
    bars = max(0, min(int((db + 60) / 3), 20))
    return rms, db, "█" * bars + "░" * (20 - bars)


async def handle_audio_client(websocket, path=None):
    """Handle one audio client connection.

    The handler waits up to 5 seconds for a text message describing the
    audio format and acknowledges it. It then receives messages until the
    connection ends, printing a live level meter for binary messages
    (interpreted as float32 samples) and echoing text messages. Connection
    statistics are printed when the handler exits.

    Args:
        websocket: The connection object. It must provide ``remote_address``
            and awaitable ``recv()``, ``send()`` and ``ping()``.
        path: Unused. Optional so the handler works whether or not the
            library passes a request path as a second argument.
    """
    client_id = id(websocket)
    client_ip = websocket.remote_address[0]
    print(f"\n✅ client {client_id} connected from {client_ip}")

    # Defined before the ``try`` so the statistics in ``finally`` are always
    # available, even when the handshake itself fails.
    packet_count = 0
    total_bytes = 0
    start_time = time.monotonic()
    last_print_time = start_time

    try:
        # 1. Receive the audio format description.
        try:
            format_msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
        except asyncio.TimeoutError:
            print("⚠️ no format received, assuming default settings")
        else:
            if isinstance(format_msg, str):
                try:
                    format_data = json.loads(format_msg)
                except json.JSONDecodeError as e:
                    # A malformed description should not end the connection.
                    print(f"⚠️ invalid format message ignored: {e}")
                else:
                    print(f"📋 audio format: {format_data}")
                # Send the acknowledgement the client is waiting for.
                await websocket.send(json.dumps({'status': 'ready', 'message': 'start sending audio!'}))

        # 2. Receive audio data in real time.
        print("👂 listening for audio data...")
        print("-" * 60)
        # Measure throughput from the start of streaming, not of the handshake.
        start_time = last_print_time = time.monotonic()

        while True:
            try:
                message = await asyncio.wait_for(websocket.recv(), timeout=2.0)
            except asyncio.TimeoutError:
                # Nothing arrived for 2 seconds: check the connection is alive.
                try:
                    await websocket.ping()
                except Exception:
                    break
                continue

            packet_count += 1
            if isinstance(message, bytes):
                total_bytes += len(message)
                try:
                    audio_data = np.frombuffer(message, dtype=np.float32)
                    current_time = time.monotonic()
                    # Refresh the status line at most every 0.1 seconds.
                    if len(audio_data) > 0 and current_time - last_print_time >= 0.1:
                        rms, db, volume_bar = _volume_stats(audio_data)
                        elapsed = current_time - start_time
                        data_rate = total_bytes / elapsed / 1024  # kb/s
                        print(f"\r🎤 packets: {packet_count:4d} | "
                              f"rate: {data_rate:5.1f} kb/s | "
                              f"rms: {rms:6.4f} | "
                              f"db: {db:6.1f} | "
                              f"volume: [{volume_bar}]", end="", flush=True)
                        last_print_time = current_time
                except Exception as e:
                    # One bad packet must not end the session.
                    print(f"\n⚠️ audio processing error: {e}")
            elif isinstance(message, str):
                print(f"\n📨 message: {message}")
    except websockets.exceptions.ConnectionClosed:
        print("\n🔌 client disconnected normally")
    except Exception as e:
        print(f"\n❌ error: {e}")
    finally:
        # Connection finished: print the statistics.
        elapsed = time.monotonic() - start_time
        print("\n" + "=" * 60)
        print("📊 connection statistics:")
        print(f" client id: {client_id}")
        print(f" duration: {elapsed:.1f} seconds")
        print(f" packets received: {packet_count}")
        print(f" total data: {total_bytes / 1024:.1f} kb")
        if elapsed > 0:
            # Rates are undefined for a zero-length connection.
            print(f" average rate: {total_bytes / elapsed / 1024:.1f} kb/s")
            print(f" packets/sec: {packet_count / elapsed:.1f}")
        print("=" * 60)
async def main():
    """Start the WebSocket audio server on port 8765 and wait until it closes."""
    server = await websockets.serve(
        handle_audio_client,
        "0.0.0.0",
        8765,
        ping_interval=10,
        ping_timeout=20,
        max_size=10 * 1024 * 1024,  # accept messages up to 10 MiB
    )
    print("✅ server running on ws://0.0.0.0:8765")
    print("📡 ready to receive audio streams")
    print("💡 press ctrl+c to stop\n")
    await server.wait_closed()
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to stop the server; exit quietly.
        print("\n👋 server stopped")

# audio_client.py (client script)
import asyncio
import websockets
import sounddevice as sd
import numpy as np
import json
import threading
import queue
import time
# Startup banner: a title line followed by a 60-character rule.
for banner_line in ("🚀 websocket audio streaming client", "=" * 60):
    print(banner_line)
class workingclient:
    """Capture microphone audio and stream it to a WebSocket server.

    Audio blocks are produced by the input stream callback, passed through
    a bounded queue, and sent by the asyncio coroutine
    ``connect_and_stream``.
    """

    def __init__(self, server_url="ws://localhost:8765", device_index=1,
                 sample_rate=44100, channels=1, dtype='float32',
                 blocksize=1024, volume_threshold=0.52, queue_size=50):
        """Store the configuration and create the audio queue.

        Args:
            server_url: WebSocket URL of the server.
            device_index: Index of the input device to open.
            sample_rate: Capture sample rate in Hz.
            channels: Number of input channels.
            dtype: Sample format passed to the input stream.
            blocksize: Frames per captured block.
            volume_threshold: Blocks whose norm does not exceed this value
                are treated as silence and not queued.
            queue_size: Maximum number of blocks held in the queue.
        """
        self.server_url = server_url
        self.device_index = device_index
        self.sample_rate = sample_rate
        self.channels = channels
        self.dtype = dtype
        self.blocksize = blocksize
        self.volume_threshold = volume_threshold
        # Queue between the capture callback and the sender coroutine.
        self.audio_queue = queue.Queue(maxsize=queue_size)
        # Control flags.
        self.running = True
        self.is_recording = False
        # Input stream; stays None until start_audio_capture() succeeds.
        self.stream = None
        # Number of blocks queued so far; throttles the level display.
        self._queued_blocks = 0

    def _on_audio(self, indata, frames, time_info, status):
        """Queue one captured block if it is louder than the threshold.

        This is the stream callback, so it never blocks: when the queue is
        full the oldest block is discarded to make room for the newest.
        """
        if status:
            print(f"⚠️ audio status: {status}")
        if not np.any(indata):
            return
        volume = np.linalg.norm(indata)
        # ``not >`` rather than ``<=`` so a NaN volume is rejected as well.
        if not volume > self.volume_threshold:
            return

        # Copy the block because the buffer handed to the callback should
        # not be kept after it returns (see the sounddevice callback docs).
        block = indata.copy()
        try:
            self.audio_queue.put_nowait(block)
        except queue.Full:
            # Drop the oldest block and keep the newest one.
            try:
                self.audio_queue.get_nowait()
            except queue.Empty:
                pass
            try:
                self.audio_queue.put_nowait(block)
            except queue.Full:
                pass
            return

        # Show the level once every 20 queued blocks.
        if self._queued_blocks % 20 == 0:
            print(f"\r🎤 mic level: {volume:.4f} | queue: {self.audio_queue.qsize()}", end="")
        self._queued_blocks += 1

    def start_audio_capture(self):
        """Open and start the input stream.

        Returns:
            True if the stream was started, False if opening it failed.
        """
        print(f"🎤 opening microphone (device {self.device_index})...")
        try:
            self.stream = sd.InputStream(
                device=self.device_index,
                samplerate=self.sample_rate,
                channels=self.channels,
                dtype=self.dtype,
                blocksize=self.blocksize,
                callback=self._on_audio,
            )
            self.stream.start()
        except Exception as e:
            print(f"❌ failed to open microphone: {e}")
            return False
        self.is_recording = True
        print("✅ microphone is ready! speak now...")
        return True

    async def connect_and_stream(self):
        """Connect to the server and send queued audio until stopped."""
        print("\n🔗 connecting to server...")
        try:
            async with websockets.connect(self.server_url, ping_interval=10, ping_timeout=20) as websocket:
                print("✅ connected to server!")
                # Describe the audio format to the server.
                await websocket.send(json.dumps({
                    'type': 'audio_format',
                    'samplerate': self.sample_rate,
                    'channels': self.channels,
                    'dtype': self.dtype,
                    'blocksize': self.blocksize,
                }))
                # Wait for the server's response.
                response = await websocket.recv()
                print(f"📨 server: {response}")

                print("\n" + "=" * 50)
                print("📤 streaming audio to server...")
                print("💡 speak into your microphone!")
                print("🛑 press ctrl+c to stop")
                print("=" * 50 + "\n")

                packet_count = 0
                last_display_time = time.time()
                # Main send loop.
                while self.running:
                    try:
                        audio_data = self.audio_queue.get_nowait()
                    except queue.Empty:
                        # Nothing to send yet; yield to the event loop briefly.
                        await asyncio.sleep(0.01)
                        continue
                    try:
                        await websocket.send(audio_data.tobytes())
                        packet_count += 1
                        # Print statistics about once per second.
                        current_time = time.time()
                        if current_time - last_display_time >= 1.0:
                            queue_size = self.audio_queue.qsize()
                            print(f"\r📦 packets: {packet_count:4d} | "
                                  f"queue: {queue_size:2d} | "
                                  f"sample: {audio_data[0, 0]:7.4f}...", end="")
                            last_display_time = current_time
                        # Extra keep-alive ping every 100 packets. It is
                        # checked only after a send so that an idle loop does
                        # not ping on every iteration.
                        if packet_count % 100 == 0:
                            await websocket.ping()
                    except Exception as e:
                        print(f"\n⚠️ streaming error: {e}")
                        break
                print(f"\n🛑 stopped. total packets sent: {packet_count}")
        except Exception as e:
            print(f"❌ connection error: {e}")

    async def run(self):
        """Start capture, stream to the server, and always release the microphone."""
        try:
            if not self.start_audio_capture():
                return
            await self.connect_and_stream()
        except KeyboardInterrupt:
            print("\n👋 stopped by user")
        except Exception as e:
            print(f"\n❌ error: {e}")
        finally:
            # Cleanup.
            self.running = False
            self.is_recording = False
            if self.stream is not None:
                self.stream.stop()
                self.stream.close()
                print("🔇 microphone closed")
# Run the client.
async def main():
    """Create a client with the default settings and run it to completion."""
    client = workingclient()
    await client.run()
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to stop the client; exit without a traceback.
        print("\n👋 stopped by user")

# 总结 (summary)
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论