一、spring实现自定义事件的发布订阅
1、事件定义
/**
* 定义事件类型
*
* @author lihongmin
* @date 2019/11/3 20:30
*/
public class orderevent extends applicationevent {
public orderevent(object source) {
super(source);
}
}2、事件监听(泛型)
/**
* 订单事件监听
* @author lihongmin
* @date 2019/11/3 20:33
*/
@component
public class ordereventlistener implements applicationlistener<orderevent> {
@override
public void onapplicationevent(orderevent orderevent) {
try {
thread.sleep(3000);
} catch (interruptedexception e) {
e.printstacktrace();
}
system.out.println("我受到了一个事件:" + orderevent.getsource());
}
}3、模拟事件发送
/**
* 事件触发模拟
*
* 我受到了一个事件:我发布了事件!!!
* 我执行完毕了!!!
*
* @author lihongmin
* @date 2019/11/3 20:35
*/
@controller
public class ordereventcontroller implements applicationcontextaware {
private applicationcontext applicationcontext;
@override
public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {
this.applicationcontext = applicationcontext;
}
@getmapping("publishorderevent")
public string publishorderevent() {
applicationcontext.publishevent(new orderevent("我发布了事件!!!"));
system.out.println("我执行完毕了!!!");
return "发送事件了!";
}
}4、启动项目,调用 127.0.0.1:8080/publishorderevent
我受到了一个事件:我发布了事件!!!
我执行完毕了!!!
总结:事件发送非常的简单,一个事件类型,一个监听,一个触发机制。并且该事件为同步机制(后续在spring boot中可以方便切换为异步)。
二、spring事件驱动原理分析(spring版本为5.1.7)
1、applicationcontext委派applicationeventpublisher发送事件
我们调用的是 applicationcontext的
publishevent(new orderevent("我发布了事件!!!")); 查看applicationcontext 结构,发现调用的是父类 applicationeventpublisher的接口
如下:
public interface applicationcontext extends environmentcapable, listablebeanfactory,
hierarchicalbeanfactory, messagesource, applicationeventpublisher, resourcepatternresolver {
@nullable
string getid();
string getapplicationname();
string getdisplayname();
long getstartupdate();
@nullable
applicationcontext getparent();
autowirecapablebeanfactory getautowirecapablebeanfactory() throws illegalstateexception;
}public interface applicationeventpublisher {
default void publishevent(applicationevent event) {
this.publishevent((object)event);
}
void publishevent(object var1);
}那么就是其子类 abstractapplicationcontext 实现的发送操作
public void publishevent(object event) {
this.publishevent(event, (resolvabletype)null);
}
protected void publishevent(object event, @nullable resolvabletype eventtype) {
assert.notnull(event, "event must not be null");
object applicationevent;
if (event instanceof applicationevent) {
applicationevent = (applicationevent)event;
} else {
applicationevent = new payloadapplicationevent(this, event);
if (eventtype == null) {
eventtype = ((payloadapplicationevent)applicationevent).getresolvabletype();
}
}
if (this.earlyapplicationevents != null) {
this.earlyapplicationevents.add(applicationevent);
} else {
this.getapplicationeventmulticaster().multicastevent((applicationevent)applicationevent, eventtype);
}
if (this.parent != null) {
if (this.parent instanceof abstractapplicationcontext) {
((abstractapplicationcontext)this.parent).publishevent(event, eventtype);
} else {
this.parent.publishevent(event);
}
}
}发现执行到
getapplicationeventmulticaster().multicastevent((applicationevent)applicationevent, eventtype);
那么其实这里算是一个委派模式了(个人认为),spring容器将发送事件委派给 abstractapplicationcontext的applicationeventmulticaster applicationeventmulticaster对象。
2、applicationeventmutulcaster类型的确认和初始化
不难发现(或者对spring applicationcontext比较熟悉的话)是项目启动时,不同类型的applicationcontext(如:classpathxmlapplicationcontext)
在调用父类 abstractapplicationcontext的refresh方法(之前分析过是一个模板方法)时, initapplicationeventmulticaster()
如下:
protected void initapplicationeventmulticaster() {
configurablelistablebeanfactory beanfactory = this.getbeanfactory();
if (beanfactory.containslocalbean("applicationeventmulticaster")) {
this.applicationeventmulticaster = (applicationeventmulticaster)beanfactory.getbean("applicationeventmulticaster", applicationeventmulticaster.class);
if (this.logger.istraceenabled()) {
this.logger.trace("using applicationeventmulticaster [" + this.applicationeventmulticaster + "]");
}
} else {
this.applicationeventmulticaster = new simpleapplicationeventmulticaster(beanfactory);
beanfactory.registersingleton("applicationeventmulticaster", this.applicationeventmulticaster);
if (this.logger.istraceenabled()) {
this.logger.trace("no 'applicationeventmulticaster' bean, using [" + this.applicationeventmulticaster.getclass().getsimplename() + "]");
}
}
}逻辑比较简单,在beanfactory中获取名称为 applicationeventmulticaster的bean,当然如果我们没有自定义并且注册为该名称的bean,肯定是获取不到的。
那么会new一个 simpleapplicationeventmulticaster类型的bean注册到容器中。
也就是说上面的getapplicationeventmulticaster()获取到的就是simpleapplicationeventmulticaster。
但是还需要注意使用的是有参数构造进行初始化,如下:
public simpleapplicationeventmulticaster(beanfactory beanfactory) {
this.setbeanfactory(beanfactory);
}在父类中实现:
public void setbeanfactory(beanfactory beanfactory) {
this.beanfactory = beanfactory;
if (beanfactory instanceof configurablebeanfactory) {
configurablebeanfactory cbf = (configurablebeanfactory)beanfactory;
if (this.beanclassloader == null) {
this.beanclassloader = cbf.getbeanclassloader();
}
this.retrievalmutex = cbf.getsingletonmutex();
}
}获取bean工厂中所以的所以单例对象放入属性retrievalmutex 中,将类加载器也进行赋值,后续会用到。
3、simpleapplicationeventmulticaster的发送事件方法
public void multicastevent(applicationevent event, @nullable resolvabletype eventtype) {
resolvabletype type = eventtype != null ? eventtype : this.resolvedefaulteventtype(event);
iterator var4 = this.getapplicationlisteners(event, type).iterator();
while(var4.hasnext()) {
applicationlistener<?> listener = (applicationlistener)var4.next();
executor executor = this.gettaskexecutor();
if (executor != null) {
executor.execute(() -> {
this.invokelistener(listener, event);
});
} else {
this.invokelistener(listener, event);
}
}
}分析一下这个方法:
- 1)、获取或确认 resolvabletype 类型
- 2)、根据事件对象和resolvabletype 类型,获取订阅者列表
- 3)、发现如果 simpleapplicationeventmulticaster对象的线程池属性 executor taskexecutor不为null则异步执行监听方法。但是我们看到的是自己new了一个对象,所以如果想 事件监听使用线程池异步执行的话(自己想到应该可以这样玩,自己比较喜欢自定义线程参数,心里有数,当前一般还会设置线程池前缀名称):
@component
public class designpatternapplication implements beanfactoryaware {
private beanfactory beanfactory;
@override
public void setbeanfactory(beanfactory beanfactory) throws beansexception {
this.beanfactory = beanfactory;
}
@bean("application_event_multicaster_bean_name")
public simpleapplicationeventmulticaster init() {
threadpoolexecutor multicasterexecutor = new threadpoolexecutor(5, 5, 60, timeunit.seconds,
new linkedblockingdeque<>(), executors.defaultthreadfactory(), new threadpoolexecutor.discardpolicy());
simpleapplicationeventmulticaster multicaster = new simpleapplicationeventmulticaster();
multicaster.settaskexecutor(multicasterexecutor);
multicaster.setbeanfactory(beanfactory);
return multicaster;
}
}- 4)、最后肯定是invokelistener(listener, event);
4、resolvabletype类型确认
首先我们传入的eventtype是null,所以先根据我们传入的对象调用resolvedefaulteventtype方法
如下:
private resolvabletype resolvedefaulteventtype(applicationevent event) {
return resolvabletype.forinstance(event);
}再调用,肯定orderevent肯定没有实现resolvabletypeprovider接口:
public static resolvabletype forinstance(object instance) {
assert.notnull(instance, "instance must not be null");
if (instance instanceof resolvabletypeprovider) {
resolvabletype type = ((resolvabletypeprovider) instance).getresolvabletype();
if (type != null) {
return type;
}
}
return resolvabletype.forclass(instance.getclass());
}再调用:
public static resolvabletype forclass(@nullable class<?> clazz) {
return new resolvabletype(clazz);
}所以我们或者到了一个新创建的 resolvabletype 对象,对象的clazz字段为我们的 orderevent。
为什么追这么深,是因为下面就是根据类型来获取监听器的。
5、获取所有的监听列表,并且看看是怎么做到监听泛型类型
protected collection<applicationlistener<?>> getapplicationlisteners(applicationevent event, resolvabletype eventtype) {
object source = event.getsource();
class<?> sourcetype = source != null ? source.getclass() : null;
abstractapplicationeventmulticaster.listenercachekey cachekey = new abstractapplicationeventmulticaster.listenercachekey(eventtype, sourcetype);
abstractapplicationeventmulticaster.listenerretriever retriever = (abstractapplicationeventmulticaster.listenerretriever)this.retrievercache.get(cachekey);
if (retriever != null) {
return retriever.getapplicationlisteners();
} else if (this.beanclassloader == null || classutils.iscachesafe(event.getclass(), this.beanclassloader) && (sourcetype == null || classutils.iscachesafe(sourcetype, this.beanclassloader))) {
object var7 = this.retrievalmutex;
synchronized(this.retrievalmutex) {
retriever = (abstractapplicationeventmulticaster.listenerretriever)this.retrievercache.get(cachekey);
if (retriever != null) {
return retriever.getapplicationlisteners();
} else {
retriever = new abstractapplicationeventmulticaster.listenerretriever(true);
collection<applicationlistener<?>> listeners = this.retrieveapplicationlisteners(eventtype, sourcetype, retriever);
this.retrievercache.put(cachekey, retriever);
return listeners;
}
}
} else {
return this.retrieveapplicationlisteners(eventtype, sourcetype, (abstractapplicationeventmulticaster.listenerretriever)null);
}
}在自己的 concurrenthashmap类型的retrievercache缓存中获取,key是根据 orderevent类型和我发送的数据源(当前为string类型)如下:
- map的key:
private static final class listenercachekey implements
comparable<abstractapplicationeventmulticaster.listenercachekey> {
private final resolvabletype eventtype;
@nullable
private final class<?> sourcetype;
// .....
}- map的value类型:
private class listenerretriever {
public final set<applicationlistener<?>> applicationlisteners =
new linkedhashset();
public final set<string> applicationlistenerbeans = new linkedhashset();
private final boolean prefiltered;
}很清楚的结构,两个linkedhashset, 就是为了保证两个set个数相同,并且顺序一一对应。用于存放当前的监听对象和监听的类型。
当前的缓存是在abstractapplicationcontext的refresh的registerbeanpostprocessors(注册所有的beanpostprocess),的最后一步,注册了applicationlistenerdetector类型。
并且在refresh的最后会将所有懒加载的bean都初始化,则会将所有的实现了该接口的bean放入容器中。
则重点是 retrieveapplicationlisteners方法,比较长:
private collection<applicationlistener<?>> retrieveapplicationlisteners(resolvabletype eventtype, @nullable class<?> sourcetype, @nullable abstractapplicationeventmulticaster.listenerretriever retriever) {
list<applicationlistener<?>> alllisteners = new arraylist();
object var7 = this.retrievalmutex;
linkedhashset listeners;
linkedhashset listenerbeans;
synchronized(this.retrievalmutex) {
listeners = new linkedhashset(this.defaultretriever.applicationlisteners);
listenerbeans = new linkedhashset(this.defaultretriever.applicationlistenerbeans);
}
iterator var14 = listeners.iterator();
while(var14.hasnext()) {
applicationlistener<?> listener = (applicationlistener)var14.next();
if (this.supportsevent(listener, eventtype, sourcetype)) {
if (retriever != null) {
retriever.applicationlisteners.add(listener);
}
alllisteners.add(listener);
}
}
if (!listenerbeans.isempty()) {
beanfactory beanfactory = this.getbeanfactory();
iterator var16 = listenerbeans.iterator();
while(var16.hasnext()) {
string listenerbeanname = (string)var16.next();
try {
class<?> listenertype = beanfactory.gettype(listenerbeanname);
if (listenertype == null || this.supportsevent(listenertype, eventtype)) {
applicationlistener<?> listener = (applicationlistener)beanfactory.getbean(listenerbeanname, applicationlistener.class);
if (!alllisteners.contains(listener) && this.supportsevent(listener, eventtype, sourcetype)) {
if (retriever != null) {
if (beanfactory.issingleton(listenerbeanname)) {
retriever.applicationlisteners.add(listener);
} else {
retriever.applicationlistenerbeans.add(listenerbeanname);
}
}
alllisteners.add(listener);
}
}
} catch (nosuchbeandefinitionexception var13) {
;
}
}
}
annotationawareordercomparator.sort(alllisteners);
if (retriever != null && retriever.applicationlistenerbeans.isempty()) {
retriever.applicationlisteners.clear();
retriever.applicationlisteners.addall(alllisteners);
}
return alllisteners;
}分析该方法,上面锁住的是 retrievalmutex对象,现在又是同步锁该对象。
为了保证linkedhashset中的值不会乱(monitor enter两次exit两次),去缓存中的每个查看每个监听器是否是对象的类型,检查了监听器的泛型对象和事件源类型。
6、根据监听列表,循环调用(同步或异步)
我们实现的 onapplicationevent(orderevent orderevent)方法
protected void invokelistener(applicationlistener<?> listener, applicationevent event) {
errorhandler errorhandler = this.geterrorhandler();
if (errorhandler != null) {
try {
this.doinvokelistener(listener, event);
} catch (throwable var5) {
errorhandler.handleerror(var5);
}
} else {
this.doinvokelistener(listener, event);
}
}所以 errorhandler想在这里处理,则需要在该对象中创建该异常处理器(可以有很多中方式处理,利用bean的生命周期,这是一个很好的扩展点,后续可以去实现),继续 doinvokelistener方法
private void doinvokelistener(applicationlistener listener, applicationevent event) {
try {
listener.onapplicationevent(event);
} catch (classcastexception var6) {
string msg = var6.getmessage();
if (msg != null && !this.matchesclasscastmessage(msg, event.getclass())) {
throw var6;
}
log logger = logfactory.getlog(this.getclass());
if (logger.istraceenabled()) {
logger.trace("non-matching event type for listener: " + listener, var6);
}
}
}最后看见 listener.onapplicationevent(event);
it is over!!!
总结
1、applicationcontext发送事件是委托给了一个 spring容器在refresh时初始化的simpleapplicationeventmulticaster bean(由于没有初始化内部线程池对象,所以事件是同步发送的)。
2、发送前先获取事件的resolvabletype类型(当前为orderevent clazz)和事件源类型(当前为string)
3、获取监听者列表。 先去自己bean内部先查询缓存,否则从beanfactory中获取所有单利bean进行匹配(再放入缓存conturrenthashmap)。
4、监听者列表循环(同步或异步)地调用我们自己写的监听方法onapplicationevent。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论