一、rabbitmq是什么?
rabbitmq是一个开源的消息代理软件,实现了amqp(高级消息队列协议),用来实现应用程序之间的异步通信。简单说,它就像一个"消息邮局",生产者把消息投递到"邮局",消费者从"邮局"取走消息。
rabbitmq:企业级消息中间件
rabbitmq是一个开源的消息队列中间件,作用是:
✅ 系统解耦:不同应用间通过消息传递通信,无需直接依赖 ✅ 异步处理:将耗时操作放入队列,提升系统响应速度 ✅ 流量削峰:在高并发场景下缓冲请求,避免系统崩溃 ✅ 高可用:支持镜像队列和仲裁队列,保证服务可用性
典型应用:电商平台用户注册后,通过rabbitmq异步发送注册邮件和短信;电商大促时,用消息队列缓冲订单,避免下游系统被压垮。
rabbitmq是用erlang语言写的,但c#有很好的客户端库支持,特别适合企业级应用。
二、环境准备
2.1. 安装rabbitmq服务器
- windows:下载安装rabbitmq(官网下载),安装时记得同时安装erlang
- linux:使用yum或apt安装,或者用docker
重要提示:安装完成后,启用web管理插件(rabbitmq-plugins enable rabbitmq_management),然后访问http://localhost:15672/,默认账号guest/guest
2.2. 安装c#客户端库
# 使用nuget安装rabbitmq.client(官方客户端) install-package rabbitmq.client # 如果喜欢更简单的封装,可以安装easynetq(基于rabbitmq.client) install-package easynetq
三、基础使用示例
3.1. 简单发布/订阅模式(使用rabbitmq.client)
发布者(publisher):
using rabbitmq.client;
using system.text;
class program
{
static void main(string[] args)
{
var factory = new connectionfactory() { hostname = "localhost" };
using (var connection = factory.createconnection())
using (var channel = connection.createmodel())
{
// 声明交换机(使用fanout类型,适合广播)
channel.exchangedeclare(exchange: "logs", type: "fanout");
// 发送消息
string message = "hello rabbitmq from c#!";
var body = encoding.utf8.getbytes(message);
channel.basicpublish(exchange: "logs",
routingkey: "",
basicproperties: null,
body: body);
console.writeline($" [x] sent '{message}'");
}
}
}订阅者(consumer):
using rabbitmq.client;
using rabbitmq.client.events;
using system;
using system.text;
class program
{
static void main(string[] args)
{
var factory = new connectionfactory() { hostname = "localhost" };
using (var connection = factory.createconnection())
using (var channel = connection.createmodel())
{
// 声明相同交换机
channel.exchangedeclare(exchange: "logs", type: "fanout");
// 创建临时队列(会自动删除)
var queuename = channel.queuedeclare().queuename;
// 绑定队列到交换机
channel.queuebind(queue: queuename,
exchange: "logs",
routingkey: "");
// 创建消费者
var consumer = new eventingbasicconsumer(channel);
consumer.received += (model, ea) =>
{
var body = ea.body.toarray();
var message = encoding.utf8.getstring(body);
console.writeline($" [x] received '{message}'");
};
channel.basicconsume(queue: queuename,
autoack: true,
consumer: consumer);
console.writeline(" [*] waiting for messages. to exit press ctrl+c");
console.readline();
}
}
}四、高级特性
4.1. 持久化消息(确保消息不丢失)
重要:必须同时设置交换机、队列、消息为持久化,才能保证消息不丢失
// 发布者
channel.exchangedeclare(exchange: "persistent", type: "direct", durable: true);
channel.queuedeclare(queue: "persistent_queue", durable: true);
channel.queuebind(queue: "persistent_queue", exchange: "persistent", routingkey: "key");
var message = "persistent message";
var body = encoding.utf8.getbytes(message);
var properties = channel.createbasicproperties();
properties.persistent = true; // 关键!设置为持久化
channel.basicpublish(exchange: "persistent",
routingkey: "key",
basicproperties: properties,
body: body);4.2. 使用easynetq简化代码
easynetq是rabbitmq.client的封装,让代码更简洁:
// 安装easynetq
// install-package easynetq
using easynetq;
using system;
class program
{
static void main()
{
// 创建连接
var bus = rabbithutch.createbus("host=localhost");
// 发布消息
bus.publish(new message { text = "hello easynetq!" });
// 订阅消息
bus.subscribe<message>("my-queue", message =>
{
console.writeline($"received: {message.text}");
});
console.writeline("waiting for messages...");
console.readline();
}
}五、实际应用场景
5.1.电商订单处理(解耦系统)
// 订单服务(生产者)
public void createorder(order order)
{
// 处理订单逻辑...
// 发布订单消息到rabbitmq
var bus = rabbithutch.createbus("host=localhost");
bus.publish(new ordercreatedevent { orderid = order.id });
}
// 邮件服务(消费者)
bus.subscribe<ordercreatedevent>("order-events", order =>
{
// 发送确认邮件
emailservice.sendorderconfirmation(order.orderid);
});5.2.日志收集系统
// 日志服务(生产者)
public void log(string message)
{
var bus = rabbithutch.createbus("host=localhost");
bus.publish(new logmessage { message = message, timestamp = datetime.utcnow });
}
// 日志处理服务(消费者)
bus.subscribe<logmessage>("log-queue", log =>
{
// 保存到数据库或文件
logrepository.save(log);
});六、专业建议
连接管理:使用连接池,避免频繁创建连接
// 使用连接工厂创建连接
var factory = new connectionfactory { hostname = "localhost" };
using (var connection = factory.createconnection())
{
// 用同一个连接创建多个channel
using (var channel = connection.createmodel())
{
// ...
}
}- 错误处理:不要只检查连接状态,要处理所有可能的异常
try
{
// 消息处理逻辑
}
catch (exception ex)
{
// 记录错误并重试
console.writeline($"处理消息失败: {ex.message}");
}- 队列持久化:生产环境中,几乎所有的队列都应该是持久化的
channel.queuedeclare(queue: "my-queue",
durable: true,
exclusive: false,
autodelete: false);- 消息确认:使用手动确认(manual acknowledgment)确保消息被正确处理
channel.basicconsume(queue: "my-queue",
autoack: false, // 关键!设置为false
consumer: consumer);七、常见问题
q: rabbitmq和mqtt有什么区别? a: rabbitmq是企业级消息中间件,适合应用间通信;mqtt是物联网专用轻量级协议,适合设备间通信。两者可以结合使用,例如用mqtt收集物联网设备数据,用rabbitmq处理业务逻辑。
q: 为什么我的消息不见了? a: 可能原因:
- 队列没有持久化
- 没有正确绑定交换机和队列
- 消费者没有正确订阅
- 交换机类型不匹配(如用direct交换机但用fanout方式发送)
q: 如何监控rabbitmq? a: 用web管理界面(http://localhost:15672),或者用rabbitmq的api进行监控。
八、其他类似中间件对比总结表

九、选择建议
- 如果你需要低延迟、简单路由 → 选rabbitmq
- 如果你处理海量数据、日志收集 → 选kafka
- 如果你需要顺序消息、电商场景 → 选rocketmq
- 如果你需要多协议支持、复杂路由 → 选activemq
- 如果你追求极致速度、不关心数据丢失 → 选zeromq
- 如果你做云原生应用、需要长期存储 → 选pulsar
到此这篇关于c#中rabbitmq的使用小结的文章就介绍到这了,更多相关c# rabbitmq使用内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论