一、spring实现自定义事件的发布订阅
1、事件定义
/** * 定义事件类型 * * @author lihongmin * @date 2019/11/3 20:30 */ public class orderevent extends applicationevent { public orderevent(object source) { super(source); } }
2、事件监听(泛型)
/** * 订单事件监听 * @author lihongmin * @date 2019/11/3 20:33 */ @component public class ordereventlistener implements applicationlistener<orderevent> { @override public void onapplicationevent(orderevent orderevent) { try { thread.sleep(3000); } catch (interruptedexception e) { e.printstacktrace(); } system.out.println("我受到了一个事件:" + orderevent.getsource()); } }
3、模拟事件发送
/** * 事件触发模拟 * * 我受到了一个事件:我发布了事件!!! * 我执行完毕了!!! * * @author lihongmin * @date 2019/11/3 20:35 */ @controller public class ordereventcontroller implements applicationcontextaware { private applicationcontext applicationcontext; @override public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception { this.applicationcontext = applicationcontext; } @getmapping("publishorderevent") public string publishorderevent() { applicationcontext.publishevent(new orderevent("我发布了事件!!!")); system.out.println("我执行完毕了!!!"); return "发送事件了!"; } }
4、启动项目,调用 127.0.0.1:8080/publishorderevent
我受到了一个事件:我发布了事件!!!
我执行完毕了!!!
总结:事件发送非常的简单,一个事件类型,一个监听,一个触发机制。并且该事件为同步机制(后续在spring boot中可以方便切换为异步)。
二、spring事件驱动原理分析(spring版本为5.1.7)
1、applicationcontext委派applicationeventpublisher发送事件
我们调用的是 applicationcontext的
publishevent(new orderevent("我发布了事件!!!"));
查看applicationcontext 结构,发现调用的是父类 applicationeventpublisher的接口
如下:
public interface applicationcontext extends environmentcapable, listablebeanfactory, hierarchicalbeanfactory, messagesource, applicationeventpublisher, resourcepatternresolver { @nullable string getid(); string getapplicationname(); string getdisplayname(); long getstartupdate(); @nullable applicationcontext getparent(); autowirecapablebeanfactory getautowirecapablebeanfactory() throws illegalstateexception; }
public interface applicationeventpublisher { default void publishevent(applicationevent event) { this.publishevent((object)event); } void publishevent(object var1); }
那么就是其子类 abstractapplicationcontext 实现的发送操作
public void publishevent(object event) { this.publishevent(event, (resolvabletype)null); } protected void publishevent(object event, @nullable resolvabletype eventtype) { assert.notnull(event, "event must not be null"); object applicationevent; if (event instanceof applicationevent) { applicationevent = (applicationevent)event; } else { applicationevent = new payloadapplicationevent(this, event); if (eventtype == null) { eventtype = ((payloadapplicationevent)applicationevent).getresolvabletype(); } } if (this.earlyapplicationevents != null) { this.earlyapplicationevents.add(applicationevent); } else { this.getapplicationeventmulticaster().multicastevent((applicationevent)applicationevent, eventtype); } if (this.parent != null) { if (this.parent instanceof abstractapplicationcontext) { ((abstractapplicationcontext)this.parent).publishevent(event, eventtype); } else { this.parent.publishevent(event); } } }
发现执行到
getapplicationeventmulticaster().multicastevent((applicationevent)applicationevent, eventtype);
那么其实这里算是一个委派模式了(个人认为),spring容器将发送事件委派给 abstractapplicationcontext的applicationeventmulticaster applicationeventmulticaster对象。
2、applicationeventmutulcaster类型的确认和初始化
不难发现(或者对spring applicationcontext比较熟悉的话)是项目启动时,不同类型的applicationcontext(如:classpathxmlapplicationcontext)
在调用父类 abstractapplicationcontext的refresh方法(之前分析过是一个模板方法)时, initapplicationeventmulticaster()
如下:
protected void initapplicationeventmulticaster() { configurablelistablebeanfactory beanfactory = this.getbeanfactory(); if (beanfactory.containslocalbean("applicationeventmulticaster")) { this.applicationeventmulticaster = (applicationeventmulticaster)beanfactory.getbean("applicationeventmulticaster", applicationeventmulticaster.class); if (this.logger.istraceenabled()) { this.logger.trace("using applicationeventmulticaster [" + this.applicationeventmulticaster + "]"); } } else { this.applicationeventmulticaster = new simpleapplicationeventmulticaster(beanfactory); beanfactory.registersingleton("applicationeventmulticaster", this.applicationeventmulticaster); if (this.logger.istraceenabled()) { this.logger.trace("no 'applicationeventmulticaster' bean, using [" + this.applicationeventmulticaster.getclass().getsimplename() + "]"); } } }
逻辑比较简单,在beanfactory中获取名称为 applicationeventmulticaster的bean,当然如果我们没有自定义并且注册为该名称的bean,肯定是获取不到的。
那么会new一个 simpleapplicationeventmulticaster类型的bean注册到容器中。
也就是说上面的getapplicationeventmulticaster()获取到的就是simpleapplicationeventmulticaster。
但是还需要注意使用的是有参数构造进行初始化,如下:
public simpleapplicationeventmulticaster(beanfactory beanfactory) { this.setbeanfactory(beanfactory); }
在父类中实现:
public void setbeanfactory(beanfactory beanfactory) { this.beanfactory = beanfactory; if (beanfactory instanceof configurablebeanfactory) { configurablebeanfactory cbf = (configurablebeanfactory)beanfactory; if (this.beanclassloader == null) { this.beanclassloader = cbf.getbeanclassloader(); } this.retrievalmutex = cbf.getsingletonmutex(); } }
获取bean工厂中所以的所以单例对象放入属性retrievalmutex 中,将类加载器也进行赋值,后续会用到。
3、simpleapplicationeventmulticaster的发送事件方法
public void multicastevent(applicationevent event, @nullable resolvabletype eventtype) { resolvabletype type = eventtype != null ? eventtype : this.resolvedefaulteventtype(event); iterator var4 = this.getapplicationlisteners(event, type).iterator(); while(var4.hasnext()) { applicationlistener<?> listener = (applicationlistener)var4.next(); executor executor = this.gettaskexecutor(); if (executor != null) { executor.execute(() -> { this.invokelistener(listener, event); }); } else { this.invokelistener(listener, event); } } }
分析一下这个方法:
- 1)、获取或确认 resolvabletype 类型
- 2)、根据事件对象和resolvabletype 类型,获取订阅者列表
- 3)、发现如果 simpleapplicationeventmulticaster对象的线程池属性 executor taskexecutor不为null则异步执行监听方法。但是我们看到的是自己new了一个对象,所以如果想 事件监听使用线程池异步执行的话(自己想到应该可以这样玩,自己比较喜欢自定义线程参数,心里有数,当前一般还会设置线程池前缀名称):
@component public class designpatternapplication implements beanfactoryaware { private beanfactory beanfactory; @override public void setbeanfactory(beanfactory beanfactory) throws beansexception { this.beanfactory = beanfactory; } @bean("application_event_multicaster_bean_name") public simpleapplicationeventmulticaster init() { threadpoolexecutor multicasterexecutor = new threadpoolexecutor(5, 5, 60, timeunit.seconds, new linkedblockingdeque<>(), executors.defaultthreadfactory(), new threadpoolexecutor.discardpolicy()); simpleapplicationeventmulticaster multicaster = new simpleapplicationeventmulticaster(); multicaster.settaskexecutor(multicasterexecutor); multicaster.setbeanfactory(beanfactory); return multicaster; } }
- 4)、最后肯定是invokelistener(listener, event);
4、resolvabletype类型确认
首先我们传入的eventtype是null,所以先根据我们传入的对象调用resolvedefaulteventtype方法
如下:
private resolvabletype resolvedefaulteventtype(applicationevent event) { return resolvabletype.forinstance(event); }
再调用,肯定orderevent肯定没有实现resolvabletypeprovider接口:
public static resolvabletype forinstance(object instance) { assert.notnull(instance, "instance must not be null"); if (instance instanceof resolvabletypeprovider) { resolvabletype type = ((resolvabletypeprovider) instance).getresolvabletype(); if (type != null) { return type; } } return resolvabletype.forclass(instance.getclass()); }
再调用:
public static resolvabletype forclass(@nullable class<?> clazz) { return new resolvabletype(clazz); }
所以我们或者到了一个新创建的 resolvabletype 对象,对象的clazz字段为我们的 orderevent。
为什么追这么深,是因为下面就是根据类型来获取监听器的。
5、获取所有的监听列表,并且看看是怎么做到监听泛型类型
protected collection<applicationlistener<?>> getapplicationlisteners(applicationevent event, resolvabletype eventtype) { object source = event.getsource(); class<?> sourcetype = source != null ? source.getclass() : null; abstractapplicationeventmulticaster.listenercachekey cachekey = new abstractapplicationeventmulticaster.listenercachekey(eventtype, sourcetype); abstractapplicationeventmulticaster.listenerretriever retriever = (abstractapplicationeventmulticaster.listenerretriever)this.retrievercache.get(cachekey); if (retriever != null) { return retriever.getapplicationlisteners(); } else if (this.beanclassloader == null || classutils.iscachesafe(event.getclass(), this.beanclassloader) && (sourcetype == null || classutils.iscachesafe(sourcetype, this.beanclassloader))) { object var7 = this.retrievalmutex; synchronized(this.retrievalmutex) { retriever = (abstractapplicationeventmulticaster.listenerretriever)this.retrievercache.get(cachekey); if (retriever != null) { return retriever.getapplicationlisteners(); } else { retriever = new abstractapplicationeventmulticaster.listenerretriever(true); collection<applicationlistener<?>> listeners = this.retrieveapplicationlisteners(eventtype, sourcetype, retriever); this.retrievercache.put(cachekey, retriever); return listeners; } } } else { return this.retrieveapplicationlisteners(eventtype, sourcetype, (abstractapplicationeventmulticaster.listenerretriever)null); } }
在自己的 concurrenthashmap类型的retrievercache缓存中获取,key是根据 orderevent类型和我发送的数据源(当前为string类型)如下:
- map的key:
private static final class listenercachekey implements comparable<abstractapplicationeventmulticaster.listenercachekey> { private final resolvabletype eventtype; @nullable private final class<?> sourcetype; // ..... }
- map的value类型:
private class listenerretriever { public final set<applicationlistener<?>> applicationlisteners = new linkedhashset(); public final set<string> applicationlistenerbeans = new linkedhashset(); private final boolean prefiltered; }
很清楚的结构,两个linkedhashset, 就是为了保证两个set个数相同,并且顺序一一对应。用于存放当前的监听对象和监听的类型。
当前的缓存是在abstractapplicationcontext的refresh的registerbeanpostprocessors(注册所有的beanpostprocess),的最后一步,注册了applicationlistenerdetector类型。
并且在refresh的最后会将所有懒加载的bean都初始化,则会将所有的实现了该接口的bean放入容器中。
则重点是 retrieveapplicationlisteners方法,比较长:
private collection<applicationlistener<?>> retrieveapplicationlisteners(resolvabletype eventtype, @nullable class<?> sourcetype, @nullable abstractapplicationeventmulticaster.listenerretriever retriever) { list<applicationlistener<?>> alllisteners = new arraylist(); object var7 = this.retrievalmutex; linkedhashset listeners; linkedhashset listenerbeans; synchronized(this.retrievalmutex) { listeners = new linkedhashset(this.defaultretriever.applicationlisteners); listenerbeans = new linkedhashset(this.defaultretriever.applicationlistenerbeans); } iterator var14 = listeners.iterator(); while(var14.hasnext()) { applicationlistener<?> listener = (applicationlistener)var14.next(); if (this.supportsevent(listener, eventtype, sourcetype)) { if (retriever != null) { retriever.applicationlisteners.add(listener); } alllisteners.add(listener); } } if (!listenerbeans.isempty()) { beanfactory beanfactory = this.getbeanfactory(); iterator var16 = listenerbeans.iterator(); while(var16.hasnext()) { string listenerbeanname = (string)var16.next(); try { class<?> listenertype = beanfactory.gettype(listenerbeanname); if (listenertype == null || this.supportsevent(listenertype, eventtype)) { applicationlistener<?> listener = (applicationlistener)beanfactory.getbean(listenerbeanname, applicationlistener.class); if (!alllisteners.contains(listener) && this.supportsevent(listener, eventtype, sourcetype)) { if (retriever != null) { if (beanfactory.issingleton(listenerbeanname)) { retriever.applicationlisteners.add(listener); } else { retriever.applicationlistenerbeans.add(listenerbeanname); } } alllisteners.add(listener); } } } catch (nosuchbeandefinitionexception var13) { ; } } } annotationawareordercomparator.sort(alllisteners); if (retriever != null && retriever.applicationlistenerbeans.isempty()) { retriever.applicationlisteners.clear(); retriever.applicationlisteners.addall(alllisteners); } return alllisteners; }
分析该方法,上面锁住的是 retrievalmutex对象,现在又是同步锁该对象。
为了保证linkedhashset中的值不会乱(monitor enter两次exit两次),去缓存中的每个查看每个监听器是否是对象的类型,检查了监听器的泛型对象和事件源类型。
6、根据监听列表,循环调用(同步或异步)
我们实现的 onapplicationevent(orderevent orderevent)方法
protected void invokelistener(applicationlistener<?> listener, applicationevent event) { errorhandler errorhandler = this.geterrorhandler(); if (errorhandler != null) { try { this.doinvokelistener(listener, event); } catch (throwable var5) { errorhandler.handleerror(var5); } } else { this.doinvokelistener(listener, event); } }
所以 errorhandler想在这里处理,则需要在该对象中创建该异常处理器(可以有很多中方式处理,利用bean的生命周期,这是一个很好的扩展点,后续可以去实现),继续 doinvokelistener方法
private void doinvokelistener(applicationlistener listener, applicationevent event) { try { listener.onapplicationevent(event); } catch (classcastexception var6) { string msg = var6.getmessage(); if (msg != null && !this.matchesclasscastmessage(msg, event.getclass())) { throw var6; } log logger = logfactory.getlog(this.getclass()); if (logger.istraceenabled()) { logger.trace("non-matching event type for listener: " + listener, var6); } } }
最后看见 listener.onapplicationevent(event);
it is over!!!
总结
1、applicationcontext发送事件是委托给了一个 spring容器在refresh时初始化的simpleapplicationeventmulticaster bean(由于没有初始化内部线程池对象,所以事件是同步发送的)。
2、发送前先获取事件的resolvabletype类型(当前为orderevent clazz)和事件源类型(当前为string)
3、获取监听者列表。 先去自己bean内部先查询缓存,否则从beanfactory中获取所有单利bean进行匹配(再放入缓存conturrenthashmap)。
4、监听者列表循环(同步或异步)地调用我们自己写的监听方法onapplicationevent。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论