介绍
cap 是一个基于 .net standard 的 c# 库,它是一种处理分布式事务的解决方案,同样具有 eventbus 的功能,它具有轻量级、易使用、高性能等特点。
新建项目
新建.net7web项目
安装依赖包
安装软件
安装redis和sql server
修改代码
新建redisconfigmodel
namespace capstu01.models; public class redisconfigmodel { /// <summary> /// 服务器地址 /// </summary> public string host { get; set; } /// <summary> /// 端口号 /// </summary> public int port { get; set; } /// <summary> /// 密码 /// </summary> public string pwd { get; set; } }
修改appsettings.json
{ "logging": { "loglevel": { "default": "information", "microsoft.aspnetcore": "warning" } }, "connectionstrings": { "sqlserver": "server=127.0.0.1;user id=sa;password=xxxx;database=capstu;encrypt=true;trustservercertificate=true;connection timeout=600;" }, "redisconfig": { "host": "127.0.0.1", "port": 6379, "pwd": "" } }
修改program.cs
using capstu01.models; var builder = webapplication.createbuilder(args); builder.services.addcontrollers(); var redisconfig = builder.configuration.getsection("redisconfig").get<redisconfigmodel>(); var connectionstr = builder.configuration.getconnectionstring("sqlserver") ?? ""; builder.services.addcap(x => { x.useredis(options => { if (options.configuration != null && redisconfig != null) { options.configuration.endpoints.add(redisconfig.host, redisconfig.port); options.configuration.defaultdatabase = 0; options.configuration.password = redisconfig?.pwd ?? ""; } }); x.usesqlserver(sqlserveroptions => { sqlserveroptions.schema = "dbo"; sqlserveroptions.connectionstring = connectionstr; }); //开启面板 x.usedashboard(d => { //允许匿名访问 d.allowanonymousexplicit = true; }); }); var app = builder.build(); app.userouting(); app.mapcontrollers(); app.run();
新建homecontroller
using dotnetcore.cap; using microsoft.aspnetcore.mvc; namespace capstu01.controllers; [apicontroller] public class homecontroller:controllerbase { public homecontroller() { } /// <summary> /// 发送消息 /// </summary> /// <returns></returns> [httpget("/")] public iactionresult index([fromservices]icappublisher capbus) { capbus.publish("test.show.time","你好,cap"); return content("发送消息成功"); } /// <summary> /// 接受消息 /// </summary> /// <param name="data"></param> [nonaction] [capsubscribe("test.show.time")] public void receivemessage(string data) { console.writeline("message data is:" + data); } }
结果
如果使用redis需要定期清理streams内容
安装freeredis,修改program.cs
builder.services.addsingleton<iredisclient>(new redisclient($"{redisconfig.host}:{redisconfig.port},password={redisconfig.pwd},defaultdatabase=0"));
新增清除方法
private readonly iredisclient _redisclient; public homecontroller(iredisclient redisclient) { _redisclient = redisclient; } /// <summary> /// 清除已处理的redis数据 /// </summary> /// <returns></returns> [httpget("/clear")] public iactionresult clearackstream() { var groups = _redisclient.xinfogroups("test.show.time"); var unreandmsgs = new list<string>(); //获取所有的未读消息 foreach (var group in groups) { if (group.pending > 0) { //有未读消息 var unreadlist = _redisclient.xpending("test.show.time", group.name); if (unreadlist.count > 0) { var groupinfo = _redisclient.xpending("test.show.time", group.name); var unreandlist = _redisclient.xpending("test.show.time", group.name, groupinfo.minid, groupinfo.maxid, groupinfo.count); foreach (var unre in unreandlist) { unreandmsgs.add(unre.id); } } } } //获取全部的消息 var allmsgs = _redisclient.xrange("test.show.time", "-", "+"); foreach (var msg in allmsgs) { if (unreandmsgs.contains(msg.id)) { //这个消息未读则跳过 continue; } //删除已处理的消息 _redisclient.xdel("test.show.time", msg.id); } return content($"共处理未读消息:{unreandmsgs.count}个,已读消息{allmsgs.length}个"); }
以上就是.net使用cap实现消息异步处理的详细内容,更多关于.net cap消息处理的资料请关注代码网其它相关文章!
发表评论