当前位置: 代码网 > it编程>编程语言>C/C++ > C++实现生产者与消费者模式方式

C++实现生产者与消费者模式方式

2025年12月28日 C/C++ 我要评论
多线程工作池创建workercount个工作线程(示例中为 3 个),每个线程执行相同的workerloop逻辑。线程通过condition_variable竞争任务队列中的任务,确保任务被均匀分发。

多线程工作池

  • 创建workercount个工作线程(示例中为 3 个),每个线程执行相同的workerloop逻辑。
  • 线程通过condition_variable竞争任务队列中的任务,确保任务被均匀分发。

线程安全保障

  • 任务队列的读写仍通过std::mutex保护,避免多线程竞争导致的数据错乱。
  • cv.notify_one()每次唤醒一个线程处理任务,cv.notify_all()在停止时唤醒所有线程退出。

任务分发逻辑

  • 生产者(主线程)提交任务时,通过notify_one()唤醒空闲线程,避免线程忙等。
  • 多个工作线程同时消费任务,提升任务处理效率(尤其适合 cpu/io 密集型任务)。
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <chrono>
#include <thread>
#include <vector>

// 任务队列类型
using task = std::function<void()>;

int main() {
    std::mutex mtx;
    std::condition_variable cv;
    std::queue<task> taskqueue;
    bool stop = false; // 退出标志
    const size_t workercount = 3; // 工作线程数量
    std::vector<std::thread> workers; // 工作线程列表

    // ========== 工作线程循环:多线程消费任务 ==========
    auto workerloop = [&](int workerid) {
        while (true) {
            task task;

            // 加锁,获取任务或检测退出信号
            {
                std::unique_lock<std::mutex> lock(mtx);
                
                // 等待条件:有任务 或 需要停止
                cv.wait(lock, [&]() {
                    return !taskqueue.empty() || stop;
                });

                // 若停止且任务队列为空,退出循环
                if (stop && taskqueue.empty()) {
                    std::cout << "[线程" << workerid << "] 退出工作循环..." << std::endl;
                    break;
                }

                // 取出队列头部任务(多线程竞争,确保线程安全)
                task = std::move(taskqueue.front());
                taskqueue.pop();
                std::cout << "[线程" << workerid << "] 取出任务,准备执行..." << std::endl;
            } // 解锁,避免执行任务时持有锁

            // 执行任务
            if (task) {
                task();
            }
        }
    };

    // ========== 创建多个工作线程 ==========
    for (int i = 0; i < workercount; ++i) {
        workers.emplace_back(workerloop, i);
    }

    // ========== 模拟提交任务(生产者逻辑) ==========
    auto submittask = [&](task task) {
        std::lock_guard<std::mutex> lock(mtx);
        taskqueue.push(std::move(task));
        std::cout << "提交任务,当前队列大小:" << taskqueue.size() << std::endl;
        cv.notify_one(); // 唤醒一个等待的工作线程
    };

    // 批量提交10个任务
    for (int i = 0; i < 10; ++i) {
        submittask([i]() {
            std::cout << "执行任务" << i << ":线程id=" << std::this_thread::get_id() << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟任务耗时
        });
    }

    // 等待所有任务执行(可选,也可通过队列状态判断)
    std::this_thread::sleep_for(std::chrono::seconds(3));

    // ========== 停止所有工作线程 ==========
    {
        std::lock_guard<std::mutex> lock(mtx);
        stop = true;
        cv.notify_all(); // 唤醒所有等待的线程,确保全部退出
        std::cout << "\n通知所有线程停止..." << std::endl;
    }

    // 等待所有工作线程结束
    for (auto& worker : workers) {
        if (worker.joinable()) {
            worker.join();
        }
    }

    std::cout << "程序结束" << std::endl;
    return 0;
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com