edge-tts (the edge-tts Python module) is essentially a tool that calls the online TTS service behind the Microsoft Edge browser. It mimics Edge's "Read Aloud" feature and sends your text to Microsoft's servers to generate the speech, so by default it requires an internet connection.
1. Installing edge-tts with Python
You can run TTS locally with Python's edge-tts module and call it from a script or a simple server wrapper. Deployment steps:
Requirements: Python 3.9 or later; a virtual environment is recommended.
Install edge-tts:
```bash
pip install edge-tts
```
If you need real-time audio playback, also install mpv (used by the edge-playback command; not needed on Windows) or pyaudio (for streaming playback).
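Once installed, a minimal call to the library looks like the sketch below. The voice name and output file are just examples, and the full script later in this article builds on the same Communicate/save API:

```python
import asyncio
import edge_tts

async def demo() -> None:
    # Send the text to Microsoft's online TTS service and save the result as an MP3.
    # zh-CN-XiaoxiaoNeural is only an example voice; list all voices with `edge-tts --list-voices`.
    communicate = edge_tts.Communicate("你好，世界", "zh-CN-XiaoxiaoNeural")
    await communicate.save("hello.mp3")

asyncio.run(demo())
```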
2. Further improvements
- Additional dependencies: edge-tts, pydub, ffmpeg.
- Add fade-in/fade-out so audio segments join smoothly (illustrated in the sketch below).
- Add a progress bar.
```bash
pip install edge-tts pydub tqdm
```
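As a rough illustration of the fade-in/fade-out idea, here is a minimal pydub sketch. The file names are hypothetical, and pydub needs ffmpeg available on the PATH to decode and encode MP3:

```python
from pydub import AudioSegment

# Hypothetical segment files, e.g. produced by separate edge-tts calls.
part1 = AudioSegment.from_mp3("temp_0001.mp3")
part2 = AudioSegment.from_mp3("temp_0002.mp3")

# A short 50 ms fade on each side softens the seam between segments.
combined = part1.fade_in(50).fade_out(50) + part2.fade_in(50).fade_out(50)
combined.export("combined.mp3", format="mp3", bitrate="192k")
```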
3. Usage

The commands below assume the full script shown at the end of this article has been saved as edge_tts_tool.py (any name works except edge_tts.py itself, which would shadow the edge_tts package the script imports).
3.1 Listing the available voices
```bash
python edge_tts_tool.py -l
```
3.2 Converting with a single voice
```bash
python edge_tts_tool.py "C:\测试.txt" -v zh-CN-YunyangNeural
```
3.3 Generating every voice in batch
```bash
python edge_tts_tool.py "C:\测试.txt" -v all
```
3.4 Highlights of this version
- Enhanced segmentation algorithm:
  - Dynamically searches backwards for the best split point
  - Skips special formats such as URLs and decimals (a short sketch of this check follows below)
  - Re-merges segments that came out too short in a second pass
- Improved stability:
  - Request retry mechanism (3 attempts by default)
  - Per-request timeout limit
  - Detailed error logging
- Performance optimizations:
  - Sequentially numbered temporary files (0001 format)
  - Fade-in/fade-out applied when merging the audio
  - Files that have already been generated are skipped automatically
- Logging:
  - Output goes to both a log file and the terminal
  - Timestamps are recorded for each key step
  - The actual audio duration is reported
This version has been tested extensively; it handles texts of more than 100,000 characters and keeps the output audio duration consistent with the text length. If you still run into problems, check the log file edge_tts.log for detailed error information.
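As an aside on the segmentation logic above: before accepting a candidate delimiter, the splitter checks the text against its ignore patterns so that, for example, the dot in a decimal or in a URL is not mistaken for end-of-sentence punctuation. A simplified, self-contained sketch of that check (the function name and reduced pattern list are illustrative, not part of the full script):

```python
import re

# Reduced versions of the patterns used in the full script below.
IGNORE_PATTERNS = [
    r'(?<=\d)\.(?=\d)',  # decimal point with digits on both sides
    r'https?://\S+',     # URL
]

def is_safe_split(fragment: str) -> bool:
    """Return True if the fragment contains none of the protected patterns."""
    return not any(re.search(p, fragment) for p in IGNORE_PATTERNS)

print(is_safe_split("The price is 3"))     # True: plain text, safe to split here
print(is_safe_split("The price is 3.14"))  # False: the dot belongs to a decimal, not a sentence end
```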
4. Tutorial
Save the script into any directory, for example as edge_tts_tool.py (as noted above, avoid naming it edge_tts.py), then run the following in that directory:

```bash
pip install edge-tts pydub tqdm
```

After that the code below can be used as-is.
Final code
```python
import asyncio
import edge_tts
import os
import argparse
import json
import re
from pathlib import Path
from pydub import AudioSegment
import logging
from datetime import datetime, timedelta
from tqdm import tqdm

# Logging setup: write to both a log file and the terminal
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("edge_tts.log", encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Path configuration
CACHE_FILE = Path.home() / ".edge_tts_voices.cache"
DEFAULT_OUTPUT_DIR = Path(r"C:\app\tts\edge-tts")
CACHE_EXPIRE_HOURS = 24

# Segmentation parameters
MAX_SEGMENT_LENGTH = 500  # maximum length of a single segment
MIN_SEGMENT_LENGTH = 50   # segments shorter than this get merged
DELIMITER_PRIORITY = ['\n', '。', '!', '!', '?', '?', ';', ';', ',', ',']
IGNORE_PATTERNS = [
    r'(?<=\d)\.(?=\d)',     # decimal point (digits on both sides)
    r'\b[A-Za-z]\.(?=\s)',  # English abbreviation (e.g. "Mr." followed by a space)
    r'https?://\S+',        # full URL
    r'www\.\S+\.\w{2,}'     # address starting with www
]


async def get_voices(force_refresh=False) -> list:
    """Fetch the voice list dynamically and cache it."""
    def should_refresh():
        if force_refresh or not CACHE_FILE.exists():
            return True
        cache_time = datetime.fromtimestamp(CACHE_FILE.stat().st_mtime)
        return datetime.now() > cache_time + timedelta(hours=CACHE_EXPIRE_HOURS)

    if not should_refresh():
        try:
            with open(CACHE_FILE, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.warning(f"Failed to read voice cache: {str(e)}")

    try:
        voices = await edge_tts.list_voices()
        chinese_voices = []
        for v in voices:
            if v['Locale'].lower().startswith('zh'):
                tags = []
                if "liaoning" in v["ShortName"].lower():
                    tags.append("Liaoning dialect")
                if "shaanxi" in v["ShortName"].lower():
                    tags.append("Shaanxi dialect")
                if "HK" in v["ShortName"]:
                    tags.append("Cantonese")
                if "TW" in v["ShortName"]:
                    tags.append("Taiwanese accent")
                if "Xiao" in v["ShortName"]:
                    tags.append("young voice")
                chinese_voices.append({
                    "key": v["ShortName"],
                    "name": v.get("LocalName") or v["ShortName"],
                    "gender": "male" if v["Gender"] == "Male" else "female",
                    "tags": tags,
                    "locale": v["Locale"]
                })
        # Save the cache
        DEFAULT_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        with open(CACHE_FILE, 'w', encoding='utf-8') as f:
            json.dump(chinese_voices, f, ensure_ascii=False, indent=2)
        return chinese_voices
    except Exception as e:
        logger.error(f"Failed to fetch voices: {str(e)}")
        if CACHE_FILE.exists():
            with open(CACHE_FILE, 'r', encoding='utf-8') as f:
                return json.load(f)
        raise RuntimeError("Unable to fetch the voice list and no cache is available")


def format_voice_list(voices: list) -> str:
    """Format the voice list for display."""
    output = ["\nSupported Chinese voice models (use -v all to generate every one):"]
    categories = {
        "Standard Mandarin": lambda v: not v["tags"],
        "Dialects": lambda v: any(t in v["tags"] for t in ["Liaoning dialect", "Shaanxi dialect"]),
        "Regional accents": lambda v: any(t in v["tags"] for t in ["Cantonese", "Taiwanese accent"]),
        "Special voice styles": lambda v: "young voice" in v["tags"]
    }
    for cat, condition in categories.items():
        output.append(f"\n[{cat}]")
        for v in filter(condition, voices):
            tags = " | ".join(v["tags"]) if v["tags"] else "standard"
            output.append(f"{v['key'].ljust(28)} {v['name']} ({v['gender']}) [{tags}]")
    return "\n".join(output)


def smart_split_text(text: str) -> list:
    """Enhanced smart segmentation algorithm."""
    # Pre-process the text: collapse multiple blank lines
    text = re.sub(r'\n{2,}', '\n', text.strip())

    chunks = []
    current_length = 0
    buffer = []

    for char in text:
        buffer.append(char)
        current_length += 1

        # Once the maximum length is reached, look for a split point
        if current_length >= MAX_SEGMENT_LENGTH:
            split_pos = None
            # Search backwards for the best split point
            for i in range(len(buffer) - 1, 0, -1):
                if buffer[i] in DELIMITER_PRIORITY:
                    if any(re.search(p, ''.join(buffer[:i + 1])) for p in IGNORE_PATTERNS):
                        continue
                    split_pos = i + 1
                    break
            if split_pos:
                chunks.append(''.join(buffer[:split_pos]))
                buffer = buffer[split_pos:]
                current_length = len(buffer)
            else:
                # Forced split
                chunks.append(''.join(buffer))
                buffer = []
                current_length = 0

    # Handle the remaining content
    if buffer:
        chunks.append(''.join(buffer))

    # Second pass: merge segments that are too short
    merged = []
    temp_buffer = []
    for chunk in chunks:
        chunk = chunk.strip()
        if not chunk:
            continue
        if len(chunk) < MIN_SEGMENT_LENGTH:
            temp_buffer.append(chunk)
            if sum(len(c) for c in temp_buffer) >= MAX_SEGMENT_LENGTH:
                merged.append(' '.join(temp_buffer))
                temp_buffer = []
        else:
            if temp_buffer:
                merged.append(' '.join(temp_buffer))
                temp_buffer = []
            merged.append(chunk)
    if temp_buffer:
        merged.append(' '.join(temp_buffer))
    return merged


async def convert_text(input_file: Path, voice: str):
    """Core conversion logic."""
    output_path = DEFAULT_OUTPUT_DIR / f"{input_file.stem}.{voice}.mp3"
    output_path.parent.mkdir(parents=True, exist_ok=True)

    if output_path.exists():
        logger.info(f"Skipping existing file: {output_path.name}")
        return

    try:
        # Read the text file
        with open(input_file, 'r', encoding='utf-8', errors='ignore') as f:
            text = f.read().strip()
        if not text:
            raise ValueError("The input file is empty")
        logger.info(f"Original text length: {len(text)} characters")

        # Smart segmentation
        chunks = smart_split_text(text)
        logger.info(f"Generated {len(chunks)} segments")

        # Per-segment processing configuration
        semaphore = asyncio.Semaphore(5)  # concurrency limit
        timeout = 30000                   # per-request timeout
        max_retries = 3                   # maximum number of retries

        async def process_chunk(index, chunk):
            async with semaphore:
                temp_path = output_path.with_name(f"temp_{index:04d}.mp3")
                for attempt in range(max_retries):
                    try:
                        communicate = edge_tts.Communicate(chunk, voice)
                        await asyncio.wait_for(communicate.save(temp_path), timeout)
                        logger.debug(f"Segment {index} generated successfully")
                        return temp_path
                    except Exception as e:
                        logger.warning(f"Segment {index}, attempt {attempt + 1} failed: {str(e)}")
                        if attempt == max_retries - 1:
                            logger.error(f"Segment {index} failed for good")
                            return None
                        await asyncio.sleep(1)

        # Run the conversions concurrently
        tasks = [process_chunk(i, c) for i, c in enumerate(chunks)]
        temp_files = await asyncio.gather(*tasks)

        # Merge the audio files
        valid_files = [tf for tf in temp_files if tf and tf.exists()]
        if not valid_files:
            raise RuntimeError("All segments failed to generate")

        combined = AudioSegment.empty()
        for tf in valid_files:
            audio = AudioSegment.from_mp3(tf)
            combined += audio.fade_in(50).fade_out(50)
            tf.unlink()

        combined.export(output_path, format="mp3", bitrate="192k")
        logger.info(f"Final audio duration: {len(combined) / 1000:.2f} s")
    except Exception as e:
        logger.error(f"Conversion failed: {str(e)}")
        if output_path.exists():
            output_path.unlink()
        raise


async def batch_convert(input_file: Path):
    """Generate every voice version in batch."""
    voices = await get_voices()
    logger.info(f"Starting generation of {len(voices)} voice versions...")

    with tqdm(total=len(voices), desc="Converting", unit="voice") as pbar:
        for voice in voices:
            output_path = DEFAULT_OUTPUT_DIR / f"{input_file.stem}.{voice['key']}.mp3"
            pbar.set_postfix_str(f"current: {voice['key']}")
            if output_path.exists():
                pbar.update(1)
                continue
            try:
                await convert_text(input_file, voice['key'])
            except Exception as e:
                logger.error(f"{voice['key']} failed: {str(e)}")
            finally:
                pbar.update(1)


def main():
    """Entry point."""
    parser = argparse.ArgumentParser(
        description="edge-tts batch generation tool v2.0",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument("input", nargs='?', help="path to the input text file")
    parser.add_argument("-v", "--voice", help="voice model to use (all = generate every voice)")
    parser.add_argument("-l", "--list", action='store_true', help="list the available voices")
    parser.add_argument("-f", "--force", action='store_true', help="force a refresh of the voice cache")
    args = parser.parse_args()

    if args.list:
        try:
            voices = asyncio.run(get_voices(args.force))
            print(format_voice_list(voices))
        except Exception as e:
            logger.error(str(e))
        return

    if not args.input or not args.voice:
        logger.error("An input file and a voice must be specified")
        logger.info("Examples:")
        logger.info('  python edge_tts_tool.py "C:\\test.txt" -v zh-CN-XiaoxiaoNeural')
        logger.info('  python edge_tts_tool.py "C:\\test.txt" -v all')
        return

    input_path = Path(args.input)
    if not input_path.exists():
        logger.error(f"File not found: {input_path}")
        return

    try:
        if args.voice.lower() == "all":
            asyncio.run(batch_convert(input_path))
        else:
            voices = asyncio.run(get_voices())
            if not any(v['key'] == args.voice for v in voices):
                logger.error("Invalid voice model, available options:\n" + format_voice_list(voices))
                return
            asyncio.run(convert_text(input_path, args.voice))
    except Exception as e:
        logger.error(f"Fatal error: {str(e)}")


if __name__ == "__main__":
    main()
```
This concludes the article on using edge-tts in Python to convert text to speech. For more on Python and edge-tts text-to-speech, search 代码网 for earlier articles or browse the related articles below, and we hope you will continue to support 代码网!