当前位置: 代码网 > it编程>编程语言>C# > 详解C#如何实现一个事件总线

详解C#如何实现一个事件总线

2024年11月25日 C# 我要评论
使用 c# 实现一个 event busevent bus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片

使用 c# 实现一个 event bus

event bus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。

它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用c#实现的event bus。它定义了一些接口和类来实现事件的发布和订阅。

首先,我们有两个基本的约束接口:ieventiasynceventhandler<tevent>

ievent是一个空接口,用于约束事件的类型。iasynceventhandler<tevent>是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法handleasync和处理异常的方法handleexception。接下来,我们有一个ieventbus接口,它定义了一些操作方法用于发布和订阅事件。

其中,publish<tevent>publishasync<tevent>方法用于发布事件,而onsubscribe<tevent>方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类localeventbusmanager<tevent>。它实现了ilocaleventbusmanager<tevent>接口,用于在单一管道内处理本地事件。它使用了一个channel<tevent>来存储事件,并提供了发布事件的方法publishpublishasync。此外,它还提供了一个自动处理事件的方法autohandle

总的来说event bus提供了一种方便的方式来实现组件之间的松耦合通信。

通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。

这种机制可以提高代码的可维护性和可扩展性。

github仓库地址:https://github.com/donpangpang/soda-event-bus

实现一些基本约束

先实现一些约束,实现ievent约束事件,实现iasyncevnethandler<tevent> where tevent:ievent来约束事件的处理程序。

public interface ievent
{

}

public interface iasynceventhandler<in tevent> where tevent : ievent
{
    task handleasync(ievent @event);

    void handleexception(ievent @event, exception ex);
}

接下来规定一下咱们的ieventbus,会有哪些操作方法。基本就是发布和订阅。

public interface ieventbus
{
    void publish<tevent>(tevent @event) where tevent : ievent;
    task publishasync<tevent>(tevent @event) where tevent : ievent;

    void onsubscribe<tevent>() where tevent : ievent;
}

实现一个本地事件总线

本地事件处理

本地事件的处理我打算采用两种方式实现,一种是localeventbusmanager即本地事件管理,第二种是localeventbuspool池化本地事件。

localevnetbusmanager

localeventbusmanager主要在单一管道内进行处理,集中进行消费。

public interface ilocaleventbusmanager<in tevent>where tevent : ievent
{
    void publish(tevent @event);
    task publishasync(tevent @event) ;
    
    void autohandle();
}

public class localeventbusmanager<tevent>(iserviceprovider serviceprovider):ilocaleventbusmanager<tevent>
    where tevent: ievent
{
    readonly iserviceprovider _servicesprovider = serviceprovider;

    private readonly channel<tevent> _eventchannel = channel.createunbounded<tevent>();

    public void publish(tevent @event)
    {
        debug.assert(_eventchannel != null, nameof(_eventchannel) + " != null");
        _eventchannel.writer.writeasync(@event);
    }

    private cancellationtokensource cts { get; } = new();

    public void cancel()
    {
        cts.cancel();
    }
    
    public async task publishasync(tevent @event)
    {
        await _eventchannel.writer.writeasync(@event);
    }

    public void autohandle()
    {
        // 确保只启动一次
        if (!cts.iscancellationrequested) return;

        task.run(async () =>
        {
            while (!cts.iscancellationrequested)
            {
                var reader = await _eventchannel.reader.readasync();
                await handleasync(reader);
            }
        }, cts.token);
    }

    async task handleasync(tevent @event)
    {
        var handler = _servicesprovider.getservice<iasynceventhandler<tevent>>();

        if (handler is null)
        {
            throw new nullreferenceexception($"no handler for event {@event.gettype().name}");
        }
        try
        {
            await handler.handleasync(@event);
        }
        catch (exception ex)
        {
            handler.handleexception( @event, ex);
        }
    }
}

localeventbuspool

localeventbuspool即所有的event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。

public sealed class localeventbuspool(iserviceprovider serviceprovider)
{
    private readonly iserviceprovider _serviceprovider = serviceprovider;

    private class channelkey
    {
        public required string key { get; init; }
        public int subscribers { get; set; }

        public override bool equals(object? obj)
        {
            if (obj is channelkey key)
            {
                return string.equals(key.key, key, stringcomparison.ordinalignorecase);
            }

            return false;
        }

        public override int gethashcode()
        {
            return 0;
        }
    }

    private channel<ievent> rent(string channel)
    {
        _channels.trygetvalue(new channelkey() { key = channel }, out var value);

        if (value != null) return value;
        value = channel.createunbounded<ievent>();
        _channels.tryadd(new channelkey() { key = channel }, value);
        return value;
    }

    private channel<ievent> rent(channelkey channelkey)
    {
        _channels.trygetvalue(channelkey, out var value);
        if (value != null) return value;
        value = channel.createunbounded<ievent>();
        _channels.tryadd(channelkey, value);
        return value;
    }

    private readonly concurrentdictionary<channelkey, channel<ievent>> _channels = new();

    private cancellationtokensource cts { get; } = new();

    public void cancel()
    {
        cts.cancel();
        _channels.clear();
        cts.tryreset();
    }

    public async task publishasync<tevent>(tevent @event) where tevent : ievent
    {
        await rent(typeof(tevent).name).writer.writeasync(@event);
    }

    public void publish<tevent>(tevent @event) where tevent : ievent
    {
        rent(typeof(tevent).name).writer.trywrite(@event);
    }

    public void onsubscribe<tevent>() where tevent : ievent
    {
        var channelkey = _channels.firstordefault(x => x.key.key == typeof(tevent).name).key ??
                         new channelkey() { key = typeof(tevent).name };
        channelkey.subscribers++;

        task.run(async () =>
        {
            try
            {
                while (!cts.iscancellationrequested)
                {
                    var @event = await readasync(channelkey);

                    var handler = _serviceprovider.getservice<iasynceventhandler<tevent>>();
                    if (handler == null) throw new nullreferenceexception($"no handler for event {typeof(tevent).name}");
                    try
                    {
                        await handler.handleasync((tevent)@event);
                    }
                    catch (exception ex)
                    {
                        handler.handleexception((tevent)@event, ex);
                    }
                }
            }
            catch (exception e)
            {
                throw new invalidoperationexception("error on onsubscribe handler", e);
            }
        }, cts.token);
    }

    private async task<ievent> readasync(string channel)
    {
        return await rent(channel).reader.readasync(cts.token);
    }

    private async task<ievent> readasync(channelkey channel)
    {
        return await rent(channel).reader.readasync(cts.token);
    }
}

localeventbus

实现localeventbus继承自ieventbus即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。

public interface ilocaleventbus: ieventbus
{

}
public class localeventbus(iserviceprovider serviceprovider, localeventbusoptions options) : ilocaleventbus
{
    private  localeventbuspool? eventbuspool => serviceprovider.getservice<localeventbuspool>();
    
    
    public void publish<tevent>(tevent @event) where tevent : ievent
    {
        if (options.pool)
        {
            debug.assert(eventbuspool != null, nameof(eventbuspool) + " != null");
            eventbuspool.publish(@event);
        }
        else
        {
            var manager = serviceprovider.getservice<localeventbusmanager<tevent>>();
            if (manager is null) throw new nullreferenceexception($"no manager for event {typeof(tevent).name}, please add singleton service it.");
            manager.publish(@event);
        }
    }

    public async task publishasync<tevent>(tevent @event) where tevent : ievent
    {
        if (options.pool)
        {
            debug.assert(eventbuspool != null, nameof(eventbuspool) + " != null");
            await eventbuspool.publishasync(@event);
        }
        else
        {
            var manager = serviceprovider.getservice<localeventbusmanager<tevent>>();
            if (manager is null) throw new nullreferenceexception($"no manager for event {typeof(tevent).name}, please add singleton service it.");
            await manager.publishasync(@event);
        }
    }

    public void onsubscribe<tevent>() where tevent : ievent
    {
        if (options.pool)
        {
            debug.assert(eventbuspool != null, nameof(eventbuspool) + " != null");
            eventbuspool.onsubscribe<tevent>();
        }
        else
        {
            var manager = serviceprovider.getservice<localeventbusmanager<tevent>>();
            if (manager is null) throw new nullreferenceexception($"no manager for event {typeof(tevent).name}, please add singleton service it.");
            manager.autohandle();
        }
    }
}

分布式事件总线

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

最后

到此这篇关于详解c#如何实现一个事件总线的文章就介绍到这了,更多相关c#实现事件总线内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com