介绍
cap 是一个基于 .net standard 的 c# 库,它是一种处理分布式事务的解决方案,同样具有 eventbus 的功能,它具有轻量级、易使用、高性能等特点。
新建项目
新建.net7web项目

安装依赖包

安装软件
安装redis和sql server
修改代码
新建redisconfigmodel
namespace capstu01.models;
public class redisconfigmodel
{
/// <summary>
/// 服务器地址
/// </summary>
public string host { get; set; }
/// <summary>
/// 端口号
/// </summary>
public int port { get; set; }
/// <summary>
/// 密码
/// </summary>
public string pwd { get; set; }
}
修改appsettings.json
{
"logging": {
"loglevel": {
"default": "information",
"microsoft.aspnetcore": "warning"
}
},
"connectionstrings": {
"sqlserver": "server=127.0.0.1;user id=sa;password=xxxx;database=capstu;encrypt=true;trustservercertificate=true;connection timeout=600;"
},
"redisconfig": {
"host": "127.0.0.1",
"port": 6379,
"pwd": ""
}
}
修改program.cs
using capstu01.models;
var builder = webapplication.createbuilder(args);
builder.services.addcontrollers();
var redisconfig = builder.configuration.getsection("redisconfig").get<redisconfigmodel>();
var connectionstr = builder.configuration.getconnectionstring("sqlserver") ?? "";
builder.services.addcap(x =>
{
x.useredis(options =>
{
if (options.configuration != null && redisconfig != null)
{
options.configuration.endpoints.add(redisconfig.host, redisconfig.port);
options.configuration.defaultdatabase = 0;
options.configuration.password = redisconfig?.pwd ?? "";
}
});
x.usesqlserver(sqlserveroptions =>
{
sqlserveroptions.schema = "dbo";
sqlserveroptions.connectionstring = connectionstr;
});
//开启面板
x.usedashboard(d =>
{
//允许匿名访问
d.allowanonymousexplicit = true;
});
});
var app = builder.build();
app.userouting();
app.mapcontrollers();
app.run();
新建homecontroller
using dotnetcore.cap;
using microsoft.aspnetcore.mvc;
namespace capstu01.controllers;
[apicontroller]
public class homecontroller:controllerbase
{
public homecontroller()
{
}
/// <summary>
/// 发送消息
/// </summary>
/// <returns></returns>
[httpget("/")]
public iactionresult index([fromservices]icappublisher capbus)
{
capbus.publish("test.show.time","你好,cap");
return content("发送消息成功");
}
/// <summary>
/// 接受消息
/// </summary>
/// <param name="data"></param>
[nonaction]
[capsubscribe("test.show.time")]
public void receivemessage(string data)
{
console.writeline("message data is:" + data);
}
}
结果



如果使用redis需要定期清理streams内容
安装freeredis,修改program.cs
builder.services.addsingleton<iredisclient>(new redisclient($"{redisconfig.host}:{redisconfig.port},password={redisconfig.pwd},defaultdatabase=0"));
新增清除方法
private readonly iredisclient _redisclient;
public homecontroller(iredisclient redisclient)
{
_redisclient = redisclient;
}
/// <summary>
/// 清除已处理的redis数据
/// </summary>
/// <returns></returns>
[httpget("/clear")]
public iactionresult clearackstream()
{
var groups = _redisclient.xinfogroups("test.show.time");
var unreandmsgs = new list<string>();
//获取所有的未读消息
foreach (var group in groups)
{
if (group.pending > 0)
{
//有未读消息
var unreadlist = _redisclient.xpending("test.show.time", group.name);
if (unreadlist.count > 0)
{
var groupinfo = _redisclient.xpending("test.show.time", group.name);
var unreandlist = _redisclient.xpending("test.show.time", group.name, groupinfo.minid, groupinfo.maxid,
groupinfo.count);
foreach (var unre in unreandlist)
{
unreandmsgs.add(unre.id);
}
}
}
}
//获取全部的消息
var allmsgs = _redisclient.xrange("test.show.time", "-", "+");
foreach (var msg in allmsgs)
{
if (unreandmsgs.contains(msg.id))
{
//这个消息未读则跳过
continue;
}
//删除已处理的消息
_redisclient.xdel("test.show.time", msg.id);
}
return content($"共处理未读消息:{unreandmsgs.count}个,已读消息{allmsgs.length}个");
}

以上就是.net使用cap实现消息异步处理的详细内容,更多关于.net cap消息处理的资料请关注代码网其它相关文章!
发表评论