多线程工作池
- 创建
workercount个工作线程(示例中为 3 个),每个线程执行相同的workerloop逻辑。 - 线程通过
condition_variable竞争任务队列中的任务,确保任务被均匀分发。
线程安全保障
- 任务队列的读写仍通过
std::mutex保护,避免多线程竞争导致的数据错乱。 cv.notify_one()每次唤醒一个线程处理任务,cv.notify_all()在停止时唤醒所有线程退出。
任务分发逻辑
- 生产者(主线程)提交任务时,通过
notify_one()唤醒空闲线程,避免线程忙等。 - 多个工作线程同时消费任务,提升任务处理效率(尤其适合 cpu/io 密集型任务)。
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <chrono>
#include <thread>
#include <vector>
// 任务队列类型
using task = std::function<void()>;
int main() {
std::mutex mtx;
std::condition_variable cv;
std::queue<task> taskqueue;
bool stop = false; // 退出标志
const size_t workercount = 3; // 工作线程数量
std::vector<std::thread> workers; // 工作线程列表
// ========== 工作线程循环:多线程消费任务 ==========
auto workerloop = [&](int workerid) {
while (true) {
task task;
// 加锁,获取任务或检测退出信号
{
std::unique_lock<std::mutex> lock(mtx);
// 等待条件:有任务 或 需要停止
cv.wait(lock, [&]() {
return !taskqueue.empty() || stop;
});
// 若停止且任务队列为空,退出循环
if (stop && taskqueue.empty()) {
std::cout << "[线程" << workerid << "] 退出工作循环..." << std::endl;
break;
}
// 取出队列头部任务(多线程竞争,确保线程安全)
task = std::move(taskqueue.front());
taskqueue.pop();
std::cout << "[线程" << workerid << "] 取出任务,准备执行..." << std::endl;
} // 解锁,避免执行任务时持有锁
// 执行任务
if (task) {
task();
}
}
};
// ========== 创建多个工作线程 ==========
for (int i = 0; i < workercount; ++i) {
workers.emplace_back(workerloop, i);
}
// ========== 模拟提交任务(生产者逻辑) ==========
auto submittask = [&](task task) {
std::lock_guard<std::mutex> lock(mtx);
taskqueue.push(std::move(task));
std::cout << "提交任务,当前队列大小:" << taskqueue.size() << std::endl;
cv.notify_one(); // 唤醒一个等待的工作线程
};
// 批量提交10个任务
for (int i = 0; i < 10; ++i) {
submittask([i]() {
std::cout << "执行任务" << i << ":线程id=" << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟任务耗时
});
}
// 等待所有任务执行(可选,也可通过队列状态判断)
std::this_thread::sleep_for(std::chrono::seconds(3));
// ========== 停止所有工作线程 ==========
{
std::lock_guard<std::mutex> lock(mtx);
stop = true;
cv.notify_all(); // 唤醒所有等待的线程,确保全部退出
std::cout << "\n通知所有线程停止..." << std::endl;
}
// 等待所有工作线程结束
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
std::cout << "程序结束" << std::endl;
return 0;
}总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论