当前位置: 代码网 > it编程>前端脚本>Python > python操作麦克风方式

python操作麦克风方式

2026年01月17日 Python 我要评论
本文介绍如何在 Python 中使用 sounddevice 查询、测试和读取麦克风,并通过 WebSocket 将采集到的音频实时发送到服务端。

查询麦克风

import sounddevice as sd

# 1. Enumerate every host API known to the audio backend
print("=== all host apis ===")
for api_index, api in enumerate(sd.query_hostapis()):
    print(f"hostapi {api_index}: {api['name']}")

# 2. Enumerate every device and report which directions it supports
print("\n=== all devices ===")
for dev_index, dev in enumerate(sd.query_devices()):
    owner_api = sd.query_hostapis(dev['hostapi'])
    n_in = dev['max_input_channels']
    n_out = dev['max_output_channels']
    # A device is listed as "input" and/or "output" when it has channels in that direction
    directions = [
        label
        for label, count in (("input", n_in), ("output", n_out))
        if count > 0
    ]

    print(f"device {dev_index}: {dev['name']}")
    print(f"  hostapi: {owner_api['name']}")
    print(f"  type: {', '.join(directions)}")
    print(f"  input channels: {n_in}")
    print(f"  output channels: {n_out}")
    print()

# 3. Report the default devices (index 0 is printed as input, index 1 as output)
print("\n=== testing default devices ===")
default_in, default_out = sd.default.device[0], sd.default.device[1]
print(f"default input device: {default_in}")
print(f"default output device: {default_out}")

# 4. Open a stream on the default input device
def audio_callback(indata, frames, time, status):
    """Report the stream status (if any) and the shape of each audio block.

    Args:
        indata: Block of recorded samples handed over by the input stream.
        frames: Number of frames in the block (unused).
        time: Timing information supplied by the stream (unused).
        status: Status flags; printed only when truthy.
    """
    if status:
        print(f"status: {status}")
    print(f"audio shape: {indata.shape}")


try:
    # No device argument is passed, so the library's default input is used.
    with sd.InputStream(samplerate=16000, channels=1, dtype='int16', blocksize=320, callback=audio_callback):
        input("🎤 recording with default device... press enter to stop\n")
except Exception as e:
    # Any failure opening or running the stream is reported instead of
    # aborting the rest of the script.
    print(f"error: {e}")

# 5. Alternatively, list every device that can record
print("\n=== all input devices ===")
input_devices = []  # (device index, device info, host API name) per input-capable device
for i, d in enumerate(sd.query_devices()):
    if d['max_input_channels'] > 0:
        hostapi = sd.query_hostapis(d['hostapi'])
        input_devices.append((i, d, hostapi['name']))
        print(f"device {i}: {d['name']}")
        print(f"  hostapi: {hostapi['name']}")
        print(f"  channels: {d['max_input_channels']}")

# 6. Try recording from the first input-capable device
if input_devices:
    print("\n=== try using first available input device ===")
    mic_index = input_devices[0][0]
    device_info = input_devices[0][1]

    # Mono is the most widely supported layout
    channels = min(1, device_info['max_input_channels'])

    def _print_shape(indata, frames, time, status):
        """Print the shape of each recorded block."""
        print(f"audio shape: {indata.shape}")

    try:
        with sd.InputStream(device=mic_index, samplerate=16000, channels=channels, dtype='int16', blocksize=320, callback=_print_shape):
            input(f"🎤 recording with {device_info['name']}... press enter to stop\n")
    except Exception as e:
        print(f"error with device {mic_index}: {e}")
else:
    print("no input devices found at all!")

测试麦克风

import sounddevice as sd
import numpy as np

# Use input device 10 directly.
# NOTE(review): the index is machine-specific — presumably taken from a device
# listing on the author's machine; confirm it before running elsewhere.
mic_index = 10

print("testing microphone...")

def callback(indata, frames, time, status):
    """Print a one-line loudness reading for the incoming audio block.

    The reading is the norm of the whole block scaled by 10. The line ends
    with a carriage return so successive readings overwrite each other.
    ``frames``, ``time`` and ``status`` are accepted but not used.
    """
    level = 10 * np.linalg.norm(indata)
    print(f"microphone level: {level:.2f}", end='\r')

# Try several parameter combinations until one opens successfully
settings_to_try = [
    {'samplerate': 16000, 'channels': 1, 'dtype': 'int16'},
    {'samplerate': 44100, 'channels': 1, 'dtype': 'float32'},
    {'samplerate': 48000, 'channels': 1, 'dtype': 'int16'},
]

for i, settings in enumerate(settings_to_try):
    print(f"\ntry {i+1}: {settings}")
    try:
        with sd.InputStream(
            device=mic_index,
            callback=callback,
            **settings
        ):
            input(f"settings {i+1} working! press enter to stop...\n")
            # The first combination that opens is enough; skip the rest
            break
    except Exception as e:
        # This combination failed; report it and move on to the next
        print(f"failed: {e}")

读取麦克风

import sounddevice as sd
import numpy as np
from scipy import signal

mic_index = 10
target_samplerate = 16000  # target sample rate after resampling
original_samplerate = 44100  # sample rate supported by the device

print(f"recording at {original_samplerate}hz, resampling to {target_samplerate}hz")


def callback(indata, frames, time, status):
    """Downmix an incoming block to mono and resample it to the target rate.

    The output length is the input length scaled by
    ``target_samplerate / original_samplerate`` and truncated to an int.
    Only the sample counts are printed; the resampled data is not stored.
    """
    if status:
        print(status)

    # Average across channels when there is more than one; otherwise just
    # collapse the single column into a 1-D array
    is_multichannel = indata.shape[1] > 1
    mono = np.mean(indata, axis=1) if is_multichannel else indata.flatten()

    n_in = len(mono)
    n_out = int(n_in * target_samplerate / original_samplerate)
    converted = signal.resample(mono, n_out)

    print(f"original: {n_in} samples, resampled: {len(converted)} samples", end='\r')


# Record at the device's rate; the callback does the resampling per block
with sd.InputStream(device=mic_index, samplerate=original_samplerate, channels=1, dtype='float32', callback=callback):
    input("recording and resampling... press enter to stop\n")

WebSocket 服务端 server.py

import asyncio
import websockets
import json
import numpy as np
from datetime import datetime
import time

def _level_meter(audio_data, width=20):
    """Return ``(rms, db, bar)`` for a non-empty block of float samples.

    ``db`` is ``20 * log10(rms)``, or -100 when ``rms`` is not positive.
    ``bar`` is a ``width``-character meter with one filled cell per 3 dB
    above -60 dB.
    """
    rms = np.sqrt(np.mean(audio_data ** 2))
    db = 20 * np.log10(rms) if rms > 0 else -100
    bars = max(0, min(int((db + 60) / 3), width))
    return rms, db, "█" * bars + "░" * (width - bars)


async def handle_audio_client(websocket, path=None):
    """Handle one audio client connection.

    Waits up to 5 seconds for a JSON text message describing the audio
    format and acknowledges it, then receives binary messages, interprets
    them as float32 samples and prints a live level meter. Connection
    statistics are printed when the connection ends.

    Args:
        websocket: The client connection.
        path: Request path. Optional so the handler works whether the
            library calls it with one argument or two; it is not used.
    """
    client_id = id(websocket)
    client_ip = websocket.remote_address[0]

    print(f"\n✅ client {client_id} connected from {client_ip}")

    # Initialised before the try block so the statistics in ``finally`` are
    # always defined, even if the client drops during the handshake.
    packet_count = 0
    total_bytes = 0
    start_time = time.time()
    last_print_time = start_time

    try:
        # 1. Receive the audio format description
        try:
            format_msg = await asyncio.wait_for(websocket.recv(), timeout=5.0)
            if isinstance(format_msg, str):
                format_data = json.loads(format_msg)
                print(f"📋 audio format: {format_data}")

                # Acknowledge so the client starts streaming
                await websocket.send(json.dumps({'status': 'ready', 'message': 'start sending audio!'}))
        except asyncio.TimeoutError:
            print("⚠️ no format received, assuming default settings")

        # 2. Receive audio data in real time
        print("👂 listening for audio data...")
        print("-" * 60)

        # Measure rates from the start of streaming, not from the handshake
        start_time = time.time()
        last_print_time = start_time

        while True:
            try:
                message = await asyncio.wait_for(websocket.recv(), timeout=2.0)
            except asyncio.TimeoutError:
                # Nothing arrived within the timeout: probe the connection
                # and stop if the ping cannot be sent
                try:
                    await websocket.ping()
                except websockets.exceptions.ConnectionClosed:
                    break
                continue

            packet_count += 1

            if isinstance(message, bytes):
                total_bytes += len(message)

                try:
                    audio_data = np.frombuffer(message, dtype=np.float32)

                    if len(audio_data) > 0:
                        rms, db, volume_bar = _level_meter(audio_data)

                        # Refresh the display at most every 0.1 seconds
                        current_time = time.time()
                        if current_time - last_print_time >= 0.1:
                            elapsed = current_time - start_time
                            data_rate = total_bytes / elapsed / 1024  # kb/s

                            print(f"\r🎤 packets: {packet_count:4d} | "
                                  f"rate: {data_rate:5.1f} kb/s | "
                                  f"rms: {rms:6.4f} | "
                                  f"db: {db:6.1f} | "
                                  f"volume: [{volume_bar}]", end="", flush=True)

                            last_print_time = current_time

                except Exception as e:
                    # A malformed packet must not end the whole connection
                    print(f"\n⚠️ audio processing error: {e}")

            elif isinstance(message, str):
                print(f"\n📨 message: {message}")

    except websockets.exceptions.ConnectionClosed:
        print(f"\n🔌 client disconnected normally")

    except Exception as e:
        print(f"\n❌ error: {e}")

    finally:
        # Connection finished: print the statistics
        elapsed = time.time() - start_time
        if elapsed > 0:
            print(f"\n" + "=" * 60)
            print(f"📊 connection statistics:")
            print(f"   client id: {client_id}")
            print(f"   duration: {elapsed:.1f} seconds")
            print(f"   packets received: {packet_count}")
            print(f"   total data: {total_bytes / 1024:.1f} kb")
            print(f"   average rate: {total_bytes / elapsed / 1024:.1f} kb/s")
            print(f"   packets/sec: {packet_count / elapsed:.1f}")
            print("=" * 60)


async def main():
    """Start the audio WebSocket server on 0.0.0.0:8765 and wait until it closes."""
    server = await websockets.serve(
        handle_audio_client,
        "0.0.0.0",
        8765,
        ping_interval=10,
        ping_timeout=20,
        max_size=10 * 1024 * 1024,
    )

    # Startup banner, one line per print call
    for banner_line in (
        "✅ server running on ws://0.0.0.0:8765",
        "📡 ready to receive audio streams",
        "💡 press ctrl+c to stop\n",
    ):
        print(banner_line)

    await server.wait_closed()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the advertised way to stop the server; exit quietly
        print("\n👋 server stopped")

audio_client.py

import asyncio
import websockets
import sounddevice as sd
import numpy as np
import json
import threading
import queue
import time

# Startup banner, printed as soon as the script is loaded
print("🚀 websocket audio streaming client")
print("=" * 60)


class workingclient:
    """Capture microphone audio and stream it to a WebSocket server.

    Audio blocks are delivered to ``_on_audio`` by the input stream, buffered
    in a bounded queue, and drained by the send loop in
    ``connect_and_stream``.

    NOTE: the lowercase class name is kept as-is because the rest of the
    script instantiates it under this name.
    """

    def __init__(self):
        self.server_url = "ws://localhost:8765"
        self.device_index = 1
        self.sample_rate = 44100
        self.channels = 1
        self.dtype = 'float32'
        self.blocksize = 1024

        # Blocks whose norm does not exceed this value are treated as silence
        self.volume_threshold = 0.52

        # Bounded hand-off buffer between the audio callback and the send loop
        self.audio_queue = queue.Queue(maxsize=50)

        # Control flags
        self.running = True
        self.is_recording = False

        # Set by start_audio_capture(); None until a stream has been created
        self.stream = None
        # Number of blocks successfully queued, used to throttle level output
        self._queued_blocks = 0

    def _on_audio(self, indata, frames, time_info, status):
        """Stream callback: queue blocks that are louder than the threshold.

        Never blocks. When the queue is full the oldest block is discarded
        so the newest audio is kept.
        """
        if status:
            print(f"⚠️ audio status: {status}")

        # Skip blocks that are entirely zero
        if not np.any(indata):
            return

        volume = np.linalg.norm(indata)
        # Written as "not >" so a NaN level is skipped as well
        if not volume > self.volume_threshold:
            return

        # Copy, because the caller's buffer is only borrowed for this call
        audio_copy = indata.copy()
        try:
            self.audio_queue.put_nowait(audio_copy)
        except queue.Full:
            # Make room by dropping the oldest block, then retry once
            try:
                self.audio_queue.get_nowait()
            except queue.Empty:
                pass
            try:
                self.audio_queue.put_nowait(audio_copy)
            except queue.Full:
                return

        # Show the level on the first queued block and every 20th after it
        if self._queued_blocks % 20 == 0:
            print(f"\r🎤 mic level: {volume:.4f} | queue: {self.audio_queue.qsize()}", end="")
        self._queued_blocks += 1

    def start_audio_capture(self):
        """Open and start the microphone input stream.

        Returns:
            True when the stream was started, False when opening failed.
        """
        print(f"🎤 opening microphone (device {self.device_index})...")

        try:
            self.stream = sd.InputStream(device=self.device_index, samplerate=self.sample_rate, channels=self.channels, dtype=self.dtype, blocksize=self.blocksize, callback=self._on_audio)

            self.stream.start()
            self.is_recording = True
            print("✅ microphone is ready! speak now...")
            return True

        except Exception as e:
            print(f"❌ failed to open microphone: {e}")
            return False

    async def connect_and_stream(self):
        """Connect to the server, announce the audio format, and send queued audio."""
        print(f"\n🔗 connecting to server...")

        try:
            # Keepalive pings are handled by the library via ping_interval,
            # so the send loop does not ping manually.
            async with websockets.connect(self.server_url, ping_interval=10, ping_timeout=20) as websocket:

                print("✅ connected to server!")

                # Announce the audio format
                await websocket.send(json.dumps({'type': 'audio_format', 'samplerate': self.sample_rate, 'channels': self.channels, 'dtype': self.dtype, 'blocksize': self.blocksize}))

                # Wait for the server's reply
                response = await websocket.recv()
                print(f"📨 server: {response}")

                print("\n" + "=" * 50)
                print("📤 streaming audio to server...")
                print("💡 speak into your microphone!")
                print("🛑 press ctrl+c to stop")
                print("=" * 50 + "\n")

                packet_count = 0
                last_display_time = time.time()

                # Main send loop
                while self.running:
                    try:
                        audio_data = self.audio_queue.get_nowait()
                    except queue.Empty:
                        # Nothing to send yet; yield to the event loop briefly
                        await asyncio.sleep(0.01)
                        continue

                    try:
                        await websocket.send(audio_data.tobytes())
                    except Exception as e:
                        print(f"\n⚠️ streaming error: {e}")
                        break
                    packet_count += 1

                    # Show statistics once per second
                    current_time = time.time()
                    if current_time - last_display_time >= 1.0:
                        queue_size = self.audio_queue.qsize()
                        print(f"\r📦 packets: {packet_count:4d} | "
                              f"queue: {queue_size:2d} | "
                              f"sample: {audio_data[0, 0]:7.4f}...", end="")
                        last_display_time = current_time

                print(f"\n🛑 stopped. total packets sent: {packet_count}")

        except Exception as e:
            print(f"❌ connection error: {e}")

    async def run(self):
        """Start capture, stream until stopped, and always release the microphone."""
        try:
            # Start capturing; give up if the microphone cannot be opened
            if not self.start_audio_capture():
                return

            await self.connect_and_stream()

        except KeyboardInterrupt:
            print("\n👋 stopped by user")
        except Exception as e:
            print(f"\n❌ error: {e}")
        finally:
            # Cleanup
            self.running = False
            self.is_recording = False
            if self.stream is not None:
                self.stream.stop()
                self.stream.close()
                print("🔇 microphone closed")


# Run the client
async def main():
    """Create a client and run it until it finishes."""
    await workingclient().run()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # The client tells the user to press Ctrl+C to stop, so exit
        # without a traceback when they do
        print("\n👋 stopped by user")

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com