当前位置: 代码网 > it编程>编程语言>C# > .net使用cap实现消息异步处理

.net使用cap实现消息异步处理

2024年06月14日 C# 我要评论
介绍cap 是一个基于 .net standard 的 c# 库,它是一种处理分布式事务的解决方案,同样具有 eventbus 的功能,它具有轻量级、易使用、高性能等特点。新建项目新建.net7web

介绍

cap 是一个基于 .net standard 的 c# 库,它是一种处理分布式事务的解决方案,同样具有 eventbus 的功能,它具有轻量级、易使用、高性能等特点。

新建项目

新建.net7web项目

安装依赖包

安装软件

安装redis和sql server

修改代码

新建redisconfigmodel

namespace capstu01.models;

public class redisconfigmodel
{
    /// <summary>
    /// 服务器地址
    /// </summary>
    public string host { get; set; }

    /// <summary>
    /// 端口号
    /// </summary>
    public int port { get; set; }

    /// <summary>
    /// 密码
    /// </summary>
    public string pwd { get; set; }
}

修改appsettings.json

{
  "logging": {
    "loglevel": {
      "default": "information",
      "microsoft.aspnetcore": "warning"
    }
  },
  "connectionstrings": {
    "sqlserver": "server=127.0.0.1;user id=sa;password=xxxx;database=capstu;encrypt=true;trustservercertificate=true;connection timeout=600;"
  },
  "redisconfig": {
    "host": "127.0.0.1",
    "port": 6379,
    "pwd": ""
  }
}

修改program.cs

using capstu01.models;

var builder = webapplication.createbuilder(args);
builder.services.addcontrollers();
var redisconfig = builder.configuration.getsection("redisconfig").get<redisconfigmodel>();
var connectionstr = builder.configuration.getconnectionstring("sqlserver") ?? "";
builder.services.addcap(x =>
{
    x.useredis(options =>
    {
        if (options.configuration != null && redisconfig != null)
        {
            options.configuration.endpoints.add(redisconfig.host, redisconfig.port);
            options.configuration.defaultdatabase = 0;
            options.configuration.password = redisconfig?.pwd ?? "";
        }
    });
    x.usesqlserver(sqlserveroptions =>
    {
        sqlserveroptions.schema = "dbo";
        sqlserveroptions.connectionstring = connectionstr;
    });
    //开启面板
    x.usedashboard(d =>
    {
        //允许匿名访问
        d.allowanonymousexplicit = true;
    });
});
var app = builder.build();

app.userouting();
app.mapcontrollers();
app.run();

新建homecontroller

using dotnetcore.cap;
using microsoft.aspnetcore.mvc;

namespace capstu01.controllers;

[apicontroller]
public class homecontroller:controllerbase
{
    public homecontroller()
    {
        
    }

    /// <summary>
    /// 发送消息
    /// </summary>
    /// <returns></returns>
    [httpget("/")]
    public iactionresult index([fromservices]icappublisher capbus)
    {
        capbus.publish("test.show.time","你好,cap");
        return content("发送消息成功");
    }
    
    /// <summary>
    /// 接受消息
    /// </summary>
    /// <param name="data"></param>
    [nonaction]
    [capsubscribe("test.show.time")]
    public void receivemessage(string data)
    {
        console.writeline("message data is:" + data);
    }
}

结果

如果使用redis需要定期清理streams内容

安装freeredis,修改program.cs

builder.services.addsingleton<iredisclient>(new redisclient($"{redisconfig.host}:{redisconfig.port},password={redisconfig.pwd},defaultdatabase=0"));

新增清除方法

private readonly iredisclient _redisclient;

public homecontroller(iredisclient redisclient)
{
    _redisclient = redisclient;
}

/// <summary>
/// 清除已处理的redis数据
/// </summary>
/// <returns></returns>
[httpget("/clear")]
public iactionresult clearackstream()
{
    var groups = _redisclient.xinfogroups("test.show.time");
    var unreandmsgs = new list<string>();
    //获取所有的未读消息
    foreach (var group in groups)
    {
        if (group.pending > 0)
        {
            //有未读消息
            var unreadlist = _redisclient.xpending("test.show.time", group.name);
            if (unreadlist.count > 0)
            {
                var groupinfo = _redisclient.xpending("test.show.time", group.name);
                var unreandlist = _redisclient.xpending("test.show.time", group.name, groupinfo.minid, groupinfo.maxid,
                    groupinfo.count);
                foreach (var unre in unreandlist)
                {
                    unreandmsgs.add(unre.id);
                }
            }
        }
    }
    //获取全部的消息
    var allmsgs = _redisclient.xrange("test.show.time", "-", "+");
    foreach (var msg in allmsgs)
    {
        if (unreandmsgs.contains(msg.id))
        {
            //这个消息未读则跳过
            continue;
        }
        //删除已处理的消息
        _redisclient.xdel("test.show.time", msg.id);
    }

    return content($"共处理未读消息:{unreandmsgs.count}个,已读消息{allmsgs.length}个");
}

以上就是.net使用cap实现消息异步处理的详细内容,更多关于.net cap消息处理的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com