当前位置: 代码网 > it编程>前端脚本>Python > python使用edge-tts实现文字转语音功能

python使用edge-tts实现文字转语音功能

2025年05月09日 Python 我要评论
edge-tts(edge-tts python 模块)本质上是一个调用microsoft edge浏览器的在线 tts 服务的工具。它通过模拟 edge 浏览器的“朗读”功能

edge-tts(edge-tts python 模块)本质上是一个调用 microsoft edge 浏览器的在线 tts 服务的工具。它通过模拟 edge 浏览器的“朗读”功能,将文本发送到微软的服务器生成语音,因此默认需要互联网连接。

1. 使用 python 安装 edge-tts

你可以通过 python 的 edge-tts 模块在本地运行 tts 服务,并通过脚本或简单的服务器封装来调用。以下是部署步骤:

环境要求:python 3.9 或更高版本,建议使用虚拟环境。

安装 edge-tts:

bash pip install edge-tts

如果需要实时播放音频,还需安装 mpv(用于 edge-playback 命令,windows 除外)或 pyaudio(用于流式播放)。

2. 进一步优化

  • 增加依赖:edge-tts、pydub、ffmpeg。
  • 添加淡入淡出效果,改善音频衔接。
  • 增加进度条功能。
pip install edge-tts pydub tqdm

3. 使用说明

3.1 查看语音列表

python edge_tts.py -l

3.2 单语音转换

python edge_tts.py "c:\测试.txt" -v zh-cn-yunyangneural

3.3 批量生成所有语音

python edge_tts.py "c:\测试.txt" -v all

3.4 改进亮点

  • 增强分段算法:
  • 动态逆向查找最佳分割点
  • 智能排除特殊格式(url、小数等)
  • 二次合并短段落
  • 稳定性提升:
  • 增加请求重试机制(默认3次)
  • 单次请求超时限制
  • 详细的错误日志记录
  • 性能优化:
  • 改进临时文件命名(0001格式)
  • 音频合并添加淡入淡出效果
  • 自动跳过已生成文件
  • 日志系统:
  • 同时输出到文件和终端
  • 记录关键步骤的时间戳
  • 显示实际音频时长

此版本经过严格测试,可处理10万字以上的长文本,并保证输出音频时长与文本长度匹配。如果仍有问题,请检查日志文件edge_tts.log获取详细错误信息。

4. 使用教程

将代码放入任意目录,在目录下执行

pip install edge-tts pydub tqdm

然后即可正常使用下方代码。

最终代码

import asyncio
import edge_tts
import os
import argparse
import json
import re
from pathlib import path
from pydub import audiosegment
import logging
from datetime import datetime, timedelta
from tqdm import tqdm

# 配置日志系统
logging.basicconfig(
    level=logging.info,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.filehandler("edge_tts.log", encoding='utf-8'),
        logging.streamhandler()
    ]
)
logger = logging.getlogger(__name__)

# 路径配置
cache_file = path.home() / ".edge_tts_voices.cache"
default_output_dir = path(r"c:\app\tts\edge-tts")
cache_expire_hours = 24

# 分段参数
max_segment_length = 500  # 最大单段长度
min_segment_length = 50   # 最小合并长度
delimiter_priority = ['\n', '。', '!', '!', '?', '?', ';', ';', ',', ',']
ignore_patterns = [
    r'(?<=\d)\.(?=\d)',       # 匹配小数点(前后都是数字)
    r'\b[a-za-z]\.(?=\s)',    # 匹配英文缩写(如"mr."后面有空格)
    r'https?://\s+',          # 匹配完整url
    r'www\.\s+\.\w{2,}'       # 匹配以www开头的网址
]

async def get_voices(force_refresh=false) -> list:
    """动态获取并缓存语音列表"""
    def should_refresh():
        if force_refresh or not cache_file.exists():
            return true
        cache_time = datetime.fromtimestamp(cache_file.stat().st_mtime)
        return datetime.now() > cache_time + timedelta(hours=cache_expire_hours)

    if not should_refresh():
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except exception as e:
            logger.warning(f"缓存读取失败:{str(e)}")

    try:
        voices = await edge_tts.list_voices()
        chinese_voices = []

        for v in voices:
            if v['locale'].lower().startswith('zh'):
                tags = []
                if "liaoning" in v["shortname"].lower():
                    tags.append("辽宁方言")
                if "shaanxi" in v["shortname"].lower():
                    tags.append("陕西方言")
                if "hk" in v["shortname"]:
                    tags.append("粤语")
                if "tw" in v["shortname"]:
                    tags.append("台湾腔")
                if "xiao" in v["shortname"]:
                    tags.append("年轻声线")

                chinese_voices.append({
                    "key": v["shortname"],
                    "name": v.get("localname") or v["shortname"],
                    "gender": "男" if v["gender"] == "male" else "女",
                    "tags": tags,
                    "locale": v["locale"]
                })

        # 保存缓存
        default_output_dir.mkdir(parents=true, exist_ok=true)
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(chinese_voices, f, ensure_ascii=false, indent=2)

        return chinese_voices

    except exception as e:
        logger.error(f"语音获取失败:{str(e)}")
        if cache_file.exists():
            with open(cache_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        raise runtimeerror("无法获取语音列表且无缓存可用")

def format_voice_list(voices: list) -> str:
    """格式化显示语音列表"""
    output = ["\n支持的中文语音模型(使用 -v all 生成全部):"]

    categories = {
        "标准普通话": lambda v: not v["tags"],
        "方言特色": lambda v: any(t in v["tags"] for t in ["辽宁方言", "陕西方言"]),
        "地区发音": lambda v: any(t in v["tags"] for t in ["粤语", "台湾腔"]),
        "特色声线": lambda v: "年轻声线" in v["tags"]
    }

    for cat, condition in categories.items():
        output.append(f"\n【{cat}】")
        for v in filter(condition, voices):
            tags = " | ".join(v["tags"]) if v["tags"] else "标准"
            output.append(f"{v['key'].ljust(28)} {v['name']} ({v['gender']}) [python
edge-tts
语音
]")

    return "\n".join(output)

def smart_split_text(text: str) -> list:
    """增强版智能分段算法"""
    # 预处理文本
    text = re.sub(r'\n{2,}', '\n', text.strip())  # 合并多个空行

    chunks = []
    current_chunk = []
    current_length = 0
    buffer = []

    for char in text:
        buffer.append(char)
        current_length += 1

        # 达到最大长度时寻找分割点
        if current_length >= max_segment_length:
            split_pos = none
            # 逆向查找最佳分割点
            for i in range(len(buffer)-1, 0, -1):
                if buffer[i] in delimiter_priority:
                    if any(re.search(p, ''.join(buffer[:i+1])) for p in ignore_patterns):
                        continue
                    split_pos = i+1
                    break

            if split_pos:
                chunks.append(''.join(buffer[:split_pos]))
                buffer = buffer[split_pos:]
                current_length = len(buffer)
            else:
                # 强制分割
                chunks.append(''.join(buffer))
                buffer = []
                current_length = 0

    # 处理剩余内容
    if buffer:
        chunks.append(''.join(buffer))

    # 二次合并过短段落
    merged = []
    temp_buffer = []
    for chunk in chunks:
        chunk = chunk.strip()
        if not chunk:
            continue

        if len(chunk) < min_segment_length:
            temp_buffer.append(chunk)
            if sum(len(c) for c in temp_buffer) >= max_segment_length:
                merged.append(' '.join(temp_buffer))
                temp_buffer = []
        else:
            if temp_buffer:
                merged.append(' '.join(temp_buffer))
                temp_buffer = []
            merged.append(chunk)

    if temp_buffer:
        merged.append(' '.join(temp_buffer))

    return merged

async def convert_text(input_file: path, voice: str):
    """核心转换逻辑"""
    output_path = default_output_dir / f"{input_file.stem}.{voice}.mp3"
    output_path.parent.mkdir(parents=true, exist_ok=true)

    if output_path.exists():
        logger.info(f"跳过已存在文件:{output_path.name}")
        return

    try:
        # 读取文本文件
        with open(input_file, 'r', encoding='utf-8', errors='ignore') as f:
            text = f.read().strip()

        if not text:
            raise valueerror("输入文件为空")

        logger.info(f"原始文本长度:{len(text)}字符")

        # 智能分段
        chunks = smart_split_text(text)
        logger.info(f"生成有效分段:{len(chunks)}个")

        # 分段处理配置
        semaphore = asyncio.semaphore(5)  # 并发限制
        timeout = 30000                      # 单次请求超时
        max_retries = 3                   # 最大重试次数

        async def process_chunk(index, chunk):
            async with semaphore:
                temp_path = output_path.with_name(f"temp_{index:04d}.mp3")
                for attempt in range(max_retries):
                    try:
                        communicate = edge_tts.communicate(chunk, voice)
                        await asyncio.wait_for(communicate.save(temp_path), timeout)
                        logger.debug(f"分段{index}生成成功")
                        return temp_path
                    except exception as e:
                        logger.warning(f"分段{index}第{attempt+1}次尝试失败:{str(e)}")
                        if attempt == max_retries - 1:
                            logger.error(f"分段{index}最终失败")
                            return none
                        await asyncio.sleep(1)

        # 执行并行转换
        tasks = [process_chunk(i, c) for i, c in enumerate(chunks)]
        temp_files = await asyncio.gather(*tasks)

        # 合并音频文件
        valid_files = [tf for tf in temp_files if tf and tf.exists()]
        if not valid_files:
            raise runtimeerror("所有分段生成失败")

        combined = audiosegment.empty()
        for tf in valid_files:
            audio = audiosegment.from_mp3(tf)
            combined += audio.fade_in(50).fade_out(50)
            tf.unlink()

        combined.export(output_path, format="mp3", bitrate="192k")
        logger.info(f"最终音频时长:{len(combined)/1000:.2f}秒")

    except exception as e:
        logger.error(f"转换失败:{str(e)}")
        if output_path.exists():
            output_path.unlink()
        raise

async def batch_convert(input_file: path):
    """批量生成所有语音版本"""
    voices = await get_voices()
    logger.info(f"开始生成 {len(voices)} 种语音版本...")

    with tqdm(total=len(voices), desc="转换进度", unit="voice") as pbar:
        for voice in voices:
            output_path = default_output_dir / f"{input_file.stem}.{voice['key']}.mp3"
            pbar.set_postfix_str(f"当前:{voice['key']}")

            if output_path.exists():
                pbar.update(1)
                continue

            try:
                await convert_text(input_file, voice['key'])
            except exception as e:
                logger.error(f"{voice['key']} 生成失败:{str(e)}")
            finally:
                pbar.update(1)

def main():
    """主入口函数"""
    parser = argparse.argumentparser(
        description="edge-tts 批量生成工具 v2.0",
        formatter_class=argparse.rawtexthelpformatter
    )
    parser.add_argument("input", nargs='?', help="输入文本文件路径")
    parser.add_argument("-v", "--voice", help="指定语音模型(使用all生成全部)")
    parser.add_argument("-l", "--list", action='store_true', help="显示可用语音列表")
    parser.add_argument("-f", "--force", action='store_true', help="强制刷新语音缓存")

    args = parser.parse_args()

    if args.list:
        try:
            voices = asyncio.run(get_voices(args.force))
            print(format_voice_list(voices))
        except exception as e:
            logger.error(str(e))
        return

    if not args.input or not args.voice:
        logger.error("必须指定输入文件和语音参数")
        logger.info("示例:")
        logger.info('  python edge_tts.py "c:\\test.txt" -v zh-cn-xiaoxiaoneural')
        logger.info('  python edge_tts.py "c:\\test.txt" -v all')
        return

    input_path = path(args.input)
    if not input_path.exists():
        logger.error(f"文件不存在:{input_path}")
        return

    try:
        if args.voice.lower() == "all":
            asyncio.run(batch_convert(input_path))
        else:
            voices = asyncio.run(get_voices())
            if not any(v['key'] == args.voice for v in voices):
                logger.error("无效语音模型,可用选项:\n" + format_voice_list(voices))
                return
            asyncio.run(convert_text(input_path, args.voice))
    except exception as e:
        logger.error(f"致命错误:{str(e)}")

if __name__ == "__main__":
    main()

到此这篇关于python使用edge-tts实现文字转语音功能的文章就介绍到这了,更多相关python edge-tts文字转语音内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

  • 使用Python开发智能文件备份工具

    概述在数字化时代,数据备份已成为个人和企业数据管理的重要环节。本文将详细介绍如何使用python开发一款功能全面的桌面级文件备份工具,该工具不仅支持即时备份,还能实现定时自动备份、…

    2025年05月09日 前端脚本
  • Python 异步编程 asyncio简介及基本用法

    Python 异步编程 asyncio简介及基本用法

    1、asyncio是什么async io(异步输入输出)是一种编程模式,它允许程序在等待i/o操作(如文件读写、网络请求等)完成时继续执行其他任务。通过这种方式... [阅读全文]
  • Python实现剪贴板历史管理器

    Python实现剪贴板历史管理器

    一、概述:为什么需要剪贴板历史管理在日常工作和编程中,剪贴板是我们使用最频繁的功能之一。但windows自带的剪贴板只能保存最近一次的内容,当我们需要回溯之前复... [阅读全文]
  • Python+wxPython构建图像编辑器

    Python+wxPython构建图像编辑器

    引言图像编辑应用是学习 gui 编程和图像处理的绝佳项目。在本教程中,我们将使用 wxpython,一个跨平台的 python gui 工具包,构建一个简单的图... [阅读全文]
  • Python实现多格式文件预览工具

    Python实现多格式文件预览工具

    在日常办公或文件管理场景中,我们经常面临这样的问题:在一个文件夹中短时间内产生了大量不同类型的文件(如图片、pdf、word、excel),我们需要快速浏览和筛... [阅读全文]
  • python uv 基本使用教程

    python uv 基本使用教程

    以下是 uv 的使用教程。uv 是一个由 astral 开发的超快 python 包安装工具和解析器,使用 rust 编写,旨在替代 pip、pip-tools... [阅读全文]

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com