基于 brpc+etcd + 百度 ai sdk 的分布式语音识别服务实践:从代码架构到踩坑复盘
一、项目背景与核心功能
最近基于 c++ 实现了一个分布式语音识别子服务,核心目标是提供高可用的 rpc 接口,支持客户端上传 pcm 音频文件并返回识别结果。技术栈选型如下:
- rpc 框架:brpc(百度开源高性能 rpc 框架,支持多种协议);
- 数据序列化:protobuf(定义 rpc 接口和数据结构);
- 服务注册与发现:etcd(分布式键值存储,实现服务上下线感知);
- 语音识别能力:百度 ai 语音 sdk(提供成熟的 pcm 音频转文字能力);
- 日志与配置:spdlog(高性能日志库)、gflags(命令行参数解析)。
项目分为服务端和客户端两部分:
- 服务端:实现 rpc 服务、注册到 etcd、封装百度 ai sdk 调用;
- 客户端:通过 etcd 发现服务、读取音频文件、发起 rpc 请求。
二、核心代码架构解析
为了保证代码的可扩展性和可维护性,采用 “模块化 + builder 模式” 设计,各组件职责单一,解耦清晰。
1. 整体架构概览
语音识别服务 ├─ 服务端(speech\_server) │ ├─ rpc服务实现(speechserviceimpl):处理语音识别请求 │ ├─ 服务构建器(speechserverbuilder):组装各模块(asr、注册、rpc) │ ├─ 语音识别封装(asrclient):调用百度ai sdk │ ├─ 服务注册(registry):将服务节点注册到etcd │ └─ 日志配置:初始化spdlog日志 └─ 客户端(speech\_client) ├─ 服务发现(discovery):从etcd获取服务节点 ├─ 信道管理(servicemanager):rr轮询负载均衡 └─ 音频读取:调用百度ai sdk工具函数读取pcm文件
2. 关键模块代码解析
(1)rpc 接口定义(protobuf)
首先通过speech.proto定义 rpc 服务和数据结构,明确请求(音频数据)和响应(识别结果)格式:
syntax = "proto3";
package zrt; // 命名空间,避免类名冲突
option cc_generic_services = true; // 生成c++ rpc服务代码
// 语音识别请求
message speechrecognitionreq {
string request_id = 1; // 请求id(用于追踪)
bytes speech_content = 2; // 核心:pcm音频数据(二进制)
optional string user_id = 3; // 可选:用户id
optional string session_id = 4; // 可选:会话id(鉴权用)
}
// 语音识别响应
message speechrecognitionrsp {
string request_id = 1; // 对应请求的id
bool success = 2; // 识别是否成功
optional string errmsg = 3; // 失败原因(success=false时必选)
optional string recognition_result = 4; // 识别结果(success=true时必选)
}
// rpc服务定义
service speechservice {
rpc speechrecognition(speechrecognitionreq) returns (speechrecognitionrsp);
}通过protoc编译生成speech.pb.cc和speech.pb.h,为 rpc 服务提供基础代码。
(2)语音识别封装(asrclient)
封装百度 ai sdk 的调用逻辑,对外提供简洁的recognize接口,隐藏 sdk 细节:
#pragma once
#include "../third/include/aip-cpp-sdk/speech.h"
#include "logger.hpp"
namespace zrt {
class asrclient {
public:
using ptr = std::shared_ptr<asrclient>;
// 初始化:传入百度ai的appid、apikey、secretkey
asrclient(const std::string &app_id, const std::string &api_key, const std::string &secret_key)
: _client(app_id, api_key, secret_key) {}
// 核心接口:输入pcm音频数据,输出识别结果
std::string recognize(const std::string &speech_data, std::string &err) {
// 调用百度sdk:pcm格式(16k采样率)
json::value result = _client.recognize(speech_data, "pcm", 16000, aip::null);
// 处理sdk返回:err_no=0表示成功
if (result["err_no"].asint() != 0) {
log_error("语音识别失败:{}", result["err_msg"].asstring());
err = result["err_msg"].asstring(); // 传出错误信息
return "";
}
return result["result"][0].asstring(); // 返回第一个识别结果
}
private:
aip::speech _client; // 百度ai sdk的speech客户端
};
}(3)rpc 服务实现(speechserviceimpl)
继承 protobuf 生成的服务基类,实现speechrecognition接口,处理客户端请求:
class speechserviceimpl : public zrt::speechservice {
public:
// 注入asrclient实例(依赖注入,解耦服务与asr实现)
speechserviceimpl(const asrclient::ptr &asr_client) : _asr_client(asr_client) {}
void speechrecognition(google::protobuf::rpccontroller* controller,
const ::zrt::speechrecognitionreq* request,
::zrt::speechrecognitionrsp* response,
::google::protobuf::closure* done) {
log_debug("收到语音转文字请求!request_id: {}", request->request_id());
brpc::closureguard rpc_guard(done); // 自动释放closure,避免内存泄漏
// 1. 调用asrclient识别音频
std::string err;
std::string res = _asr_client->recognize(request->speech_content(), err);
// 2. 组装响应
response->set_request_id(request->request_id());
if (res.empty()) {
// 识别失败:设置错误信息
response->set_success(false);
response->set_errmsg("语音识别失败:" + err);
return;
}
// 识别成功:返回结果
response->set_success(true);
response->set_recognition_result(res);
}
private:
asrclient::ptr _asr_client; // 语音识别客户端
};(4)服务注册与发现(etcd 集成)
- 服务注册(registry):将服务节点注册到 etcd,并通过 lease(租约)维持节点存活,服务下线时自动删除;
- 服务发现(discovery):监听 etcd 的服务目录,感知服务上下线,并回调更新信道。
以服务发现为例,核心代码:
class discovery {
public:
using ptr = std::shared_ptr<discovery>;
// 回调类型:服务上下线时通知外部(如更新信道)
using notifycallback = std::function<void(std::string, std::string)>;
// 初始化:连接etcd、拉取现有服务、监听变化
discovery(const std::string &host, const std::string &basedir,
const notifycallback &put_cb, const notifycallback &del_cb)
: _client(std::make_shared<etcd::client>(host)), _put_cb(put_cb), _del_cb(del_cb) {
// 1. 拉取现有服务节点(服务启动时初始化)
auto resp = _client->ls(basedir).get();
if (!resp.is_ok()) {
log_error("获取服务列表失败:{}", resp.error_message());
return;
}
// 遍历现有节点,调用上线回调
for (int i = 0; i < resp.keys().size(); ++i) {
if (_put_cb) _put_cb(resp.key(i), resp.value(i).as_string());
}
// 2. 监听etcd目录变化(实时感知上下线)
_watcher = std::make_shared<etcd::watcher>(
*_client.get(), basedir,
std::bind(&discovery::callback, this, std::placeholders::_1),
true // 递归监听子目录
);
}
private:
// etcd事件回调:处理put(上线)和delete(下线)事件
void callback(const etcd::response &resp) {
if (!resp.is_ok()) {
log_error("etcd事件错误:{}", resp.error_message());
return;
}
for (auto &ev : resp.events()) {
if (ev.event_type() == etcd::event::put) {
log_debug("服务上线:{}-{}", ev.kv().key(), ev.kv().as_string());
if (_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string());
} else if (ev.event_type() == etcd::event::delete_) {
log_debug("服务下线:{}-{}", ev.prev_kv().key(), ev.prev_kv().as_string());
if (_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());
}
}
}
private:
notifycallback _put_cb; // 服务上线回调
notifycallback _del_cb; // 服务下线回调
std::shared_ptr<etcd::client> _client; // etcd客户端
std::shared_ptr<etcd::watcher> _watcher; // etcd监听器
};(5)信道管理与负载均衡(servicemanager)
客户端通过servicemanager管理 rpc 信道,采用 rr(round-robin)轮询策略实现负载均衡,避免单节点压力过大:
class servicemanager {
public:
using ptr = std::shared_ptr<servicemanager>;
// 声明需要关注的服务(只处理声明过的服务)
void declared(const std::string &service_name) {
std::unique_lock<std::mutex> lock(_mutex);
_follow_services.insert(service_name);
}
// 服务上线回调:添加信道
void onserviceonline(const std::string &service_instance, const std::string &host) {
std::string service_name = getservicename(service_instance);
// 只处理关注的服务
if (_follow_services.count(service_name) == 0) {
log_debug("{}服务上线,无需关注", service_name);
return;
}
// 获取或创建服务的信道管理对象
auto service = getorcreateservicechannel(service_name);
service->append(host); // 添加新节点的信道
}
// 选择一个信道(rr轮询)
servicechannel::channelptr choose(const std::string &service_name) {
std::unique_lock<std::mutex> lock(_mutex);
auto it = _services.find(service_name);
if (it == _services.end()) {
log_error("无{}服务的可用节点", service_name);
return nullptr;
}
return it->second->choose(); // 调用servicechannel的rr逻辑
}
private:
// 从实例名中提取服务名(如/service/speech_service/instance → /service/speech_service)
std::string getservicename(const std::string &service_instance) {
auto pos = service_instance.find_last_of('/');
return pos == std::string::npos ? service_instance : service_instance.substr(0, pos);
}
private:
std::mutex _mutex; // 线程安全锁
std::unordered_set<std::string> _follow_services; // 关注的服务列表
// 服务名 → 信道管理对象的映射
std::unordered_map<std::string, servicechannel::ptr> _services;
};三、核心问题与解决方案(踩坑复盘)
在项目开发过程中,遇到了多个编译期和运行期问题,以下是关键问题的排查过程和解决方案,均为 c++ 分布式服务开发中的常见坑。
1. 编译期:百度 ai sdk 的toupper重载歧义
问题现象
编译客户端时,报std::transform调用toupper的重载歧义错误:
error: no matching function for call to ‘transform(..., <unresolved overloaded function type>)' note: couldn't deduce template parameter ‘_unaryoperation'
原因分析
c++ 中有两个toupper版本,编译器无法确定使用哪个:
<cctype>中的int toupper(int c):处理单个字符,参数为int(兼容 eof);<locale>中的template <class chart> chart toupper(chart c, const locale& loc):带本地化参数的模板函数。
百度 ai sdk 的utils.h中直接调用std::transform(..., toupper),未明确版本,导致歧义。
解决方案
用lambda 表达式显式指定toupper版本,消除歧义,并处理char类型转换(避免负数问题):
// 修改前(sdk原代码,错误)
std::transform(src.begin(), src.end(), src.begin(), toupper);
// 修改后(正确)
std::transform(src.begin(), src.end(), src.begin(),
[](unsigned char c) { // 转unsigned char,避免char负数(如中文乱码)
return static_cast<char>(std::toupper(c)); // 显式调用<cctype>版本
}
);关键思路:lambda 作为 “中间层”,明确参数类型和函数版本,让编译器无需猜测。
2. 编译期:函数漏写return语句的警告
问题现象
修改utils.h后,编译报 “无返回语句” 警告:
warning: no return statement in function returning non-void [-wreturn-type]
原因分析
to_upper/to_lower函数声明返回std::string,但修改时不小心删除了return src;语句,导致函数无返回值(c++ 中属于未定义行为,编译器宽容处理为警告,但运行时可能返回随机值)。
解决方案
补全return语句,确保函数返回处理后的字符串:
std::string aip::to_upper(std::string src) {
std::transform(...); // 处理逻辑
return src; // 补全返回语句
}
3. 运行期:音频文件读取失败(invalid audio length)
问题现象
客户端运行时,输出file_content.size() = 0,百度 ai sdk 返回 “invalid audio length”:
0 语音识别失败:invalid audio length
原因分析
- 路径错误:客户端用相对路径
"16k.pcm",但运行目录(如build/)下无此文件; - 文件权限:文件存在但无读权限;
- 格式错误:文件不是百度 sdk 要求的 “16khz 采样率、16 位深度、单声道”pcm。
解决方案
- 使用绝对路径:明确指定文件位置,避免相对路径陷阱:
// 修改前
aip::get_file_content("16k.pcm", &file_content);
// 修改后(替换为实际路径)
aip::get_file_content("/home/zrt/workspace/16k.pcm", &file_content);
- 验证文件权限:
# 查看权限,确保有r(读)权限 ls -l 16k.pcm # 无权限则添加 chmod +r 16k.pcm
- 验证 pcm 格式:用
ffmpeg转换为标准格式:
# 将任意音频转为16k、16位、单声道pcm ffmpeg -i test.wav -ar 16000 -ac 1 -sample_fmt s16le 16k.pcm
4. 运行期:etcdwatcher警告(watcher doesn't exit normally)
问题现象
程序退出时,报watcher doesn't exit normally警告。
原因分析
discovery的watcher线程未正常停止,程序退出时强制终止线程导致警告。
解决方案
在discovery析构函数中主动取消watcher:
~discovery() {
_watcher->cancel(); // 主动停止监听器
}四、项目运行流程
- 服务端启动:
# 运行客户端(发现服务并发起请求) ./speech_client --etcd_host=http://127.0.0.1:2379 --speech_service=/service/speech_service
- 客户端调用:
# 运行客户端(发现服务并发起请求) ./speech_client --etcd_host=http://127.0.0.1:2379 --speech_service=/service/speech_service
- 成功输出:
12345 # file_content.size(),非0表示读取成功 收到响应: 111111 收到响应: 你好,世界
五、总结与经验
- 第三方 sdk 踩坑:第三方库代码可能不严谨(如百度 sdk 的
toupper歧义),需针对性修改,修改时注意保留原功能; - 分布式服务核心:服务注册发现(etcd)和负载均衡(rr)是分布式服务的基石,需保证高可用和线程安全;
- c++ 编译问题:编译错误需重点看
error:前的具体代码行,尤其是模板推导失败(如重载歧义),可通过显式类型或 lambda 解决; - 路径与权限:文件操作尽量用绝对路径,避免运行目录依赖;权限问题在 linux 下容易被忽略,需提前验证。
这个项目不仅实现了语音识别的核心功能,更重要的是梳理了 c++ 分布式服务的开发流程和问题排查思路,后续可扩展多节点部署、熔断降级等高级特性。
到此这篇关于c++分布式语音识别服务实践的文章就介绍到这了,更多相关c++语音识别内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论