当前位置: 代码网 > it编程>编程语言>Java > Spring源码之事件监听机制(实现EventListener接口方式)

Spring源码之事件监听机制(实现EventListener接口方式)

2024年08月13日 Java 我要评论
一、spring实现自定义事件的发布订阅github地址为:https://github.com/kevin-lihongmin/designpattern/tree/master/src/main/

一、spring实现自定义事件的发布订阅

github地址为:https://github.com/kevin-lihongmin/designpattern/tree/master/src/main/java/com/kevin/designpattern/headfirst/observer/spring

1、事件定义

/**
 *  定义事件类型
 *
 * @author lihongmin
 * @date 2019/11/3 20:30
 */
public class orderevent extends applicationevent {

    public orderevent(object source) {
        super(source);
    }
}

2、事件监听(泛型)

/**
 *  订单事件监听
 * @author lihongmin
 * @date 2019/11/3 20:33
 */
@component
public class ordereventlistener implements applicationlistener<orderevent> {

    @override
    public void onapplicationevent(orderevent orderevent) {
        try {
            thread.sleep(3000);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
        system.out.println("我受到了一个事件:" + orderevent.getsource());
    }
}

3、模拟事件发送

/**
 *  事件触发模拟
 *
 *  我受到了一个事件:我发布了事件!!!
 *  我执行完毕了!!!
 *
 * @author lihongmin
 * @date 2019/11/3 20:35
 */
@controller
public class ordereventcontroller implements applicationcontextaware {

    private applicationcontext applicationcontext;

    @override
    public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {
        this.applicationcontext = applicationcontext;
    }

    @getmapping("publishorderevent")
    public string publishorderevent() {
        applicationcontext.publishevent(new orderevent("我发布了事件!!!"));
        system.out.println("我执行完毕了!!!");
        return "发送事件了!";
    }
}

4、启动项目,调用 127.0.0.1:8080/publishorderevent

我受到了一个事件:我发布了事件!!!

我执行完毕了!!!

总结:事件发送非常的简单,一个事件类型,一个监听,一个触发机制。并且该事件为同步机制(后续在spring boot中可以方便切换为异步)。

二、spring事件驱动原理分析(spring版本为5.1.7)

1、applicationcontext委派applicationeventpublisher发送事件

我们调用的是 applicationcontext的

publishevent(new orderevent("我发布了事件!!!")); 

查看applicationcontext 结构,发现调用的是父类 applicationeventpublisher的接口

如下:

public interface applicationcontext extends environmentcapable, listablebeanfactory,
 hierarchicalbeanfactory, messagesource, applicationeventpublisher, resourcepatternresolver {
    @nullable
    string getid();

    string getapplicationname();

    string getdisplayname();

    long getstartupdate();

    @nullable
    applicationcontext getparent();

    autowirecapablebeanfactory getautowirecapablebeanfactory() throws illegalstateexception;
}
public interface applicationeventpublisher {
    default void publishevent(applicationevent event) {
        this.publishevent((object)event);
    }

    void publishevent(object var1);
}

那么就是其子类 abstractapplicationcontext 实现的发送操作

public void publishevent(object event) {
        this.publishevent(event, (resolvabletype)null);
    }

    protected void publishevent(object event, @nullable resolvabletype eventtype) {
        assert.notnull(event, "event must not be null");
        object applicationevent;
        if (event instanceof applicationevent) {
            applicationevent = (applicationevent)event;
        } else {
            applicationevent = new payloadapplicationevent(this, event);
            if (eventtype == null) {
                eventtype = ((payloadapplicationevent)applicationevent).getresolvabletype();
            }
        }

        if (this.earlyapplicationevents != null) {
            this.earlyapplicationevents.add(applicationevent);
        } else {
            this.getapplicationeventmulticaster().multicastevent((applicationevent)applicationevent, eventtype);
        }

        if (this.parent != null) {
            if (this.parent instanceof abstractapplicationcontext) {
                ((abstractapplicationcontext)this.parent).publishevent(event, eventtype);
            } else {
                this.parent.publishevent(event);
            }
        }

    }

发现执行到

getapplicationeventmulticaster().multicastevent((applicationevent)applicationevent, eventtype); 

那么其实这里算是一个委派模式了(个人认为),spring容器将发送事件委派给 abstractapplicationcontext的applicationeventmulticaster applicationeventmulticaster对象。

2、applicationeventmutulcaster类型的确认和初始化

不难发现(或者对spring applicationcontext比较熟悉的话)是项目启动时,不同类型的applicationcontext(如:classpathxmlapplicationcontext)

在调用父类 abstractapplicationcontext的refresh方法(之前分析过是一个模板方法)时, initapplicationeventmulticaster()

如下:

protected void initapplicationeventmulticaster() {
        configurablelistablebeanfactory beanfactory = this.getbeanfactory();
        if (beanfactory.containslocalbean("applicationeventmulticaster")) {
            this.applicationeventmulticaster = (applicationeventmulticaster)beanfactory.getbean("applicationeventmulticaster", applicationeventmulticaster.class);
            if (this.logger.istraceenabled()) {
                this.logger.trace("using applicationeventmulticaster [" + this.applicationeventmulticaster + "]");
            }
        } else {
            this.applicationeventmulticaster = new simpleapplicationeventmulticaster(beanfactory);
            beanfactory.registersingleton("applicationeventmulticaster", this.applicationeventmulticaster);
            if (this.logger.istraceenabled()) {
                this.logger.trace("no 'applicationeventmulticaster' bean, using [" + this.applicationeventmulticaster.getclass().getsimplename() + "]");
            }
        }

    }

逻辑比较简单,在beanfactory中获取名称为 applicationeventmulticaster的bean,当然如果我们没有自定义并且注册为该名称的bean,肯定是获取不到的。

那么会new一个 simpleapplicationeventmulticaster类型的bean注册到容器中。

也就是说上面的getapplicationeventmulticaster()获取到的就是simpleapplicationeventmulticaster

但是还需要注意使用的是有参数构造进行初始化,如下:

public simpleapplicationeventmulticaster(beanfactory beanfactory) {
    this.setbeanfactory(beanfactory);
}

在父类中实现:

public void setbeanfactory(beanfactory beanfactory) {
	this.beanfactory = beanfactory;
	if (beanfactory instanceof configurablebeanfactory) {
		configurablebeanfactory cbf = (configurablebeanfactory)beanfactory;
		if (this.beanclassloader == null) {
			this.beanclassloader = cbf.getbeanclassloader();
		}

		this.retrievalmutex = cbf.getsingletonmutex();
	}

}

获取bean工厂中所以的所以单例对象放入属性retrievalmutex 中,将类加载器也进行赋值,后续会用到。

3、simpleapplicationeventmulticaster的发送事件方法

public void multicastevent(applicationevent event, @nullable resolvabletype eventtype) {
        resolvabletype type = eventtype != null ? eventtype : this.resolvedefaulteventtype(event);
        iterator var4 = this.getapplicationlisteners(event, type).iterator();

        while(var4.hasnext()) {
            applicationlistener<?> listener = (applicationlistener)var4.next();
            executor executor = this.gettaskexecutor();
            if (executor != null) {
                executor.execute(() -> {
                    this.invokelistener(listener, event);
                });
            } else {
                this.invokelistener(listener, event);
            }
        }

    }

分析一下这个方法:

  • 1)、获取或确认 resolvabletype 类型
  • 2)、根据事件对象和resolvabletype 类型,获取订阅者列表
  • 3)、发现如果 simpleapplicationeventmulticaster对象的线程池属性 executor taskexecutor不为null则异步执行监听方法。但是我们看到的是自己new了一个对象,所以如果想 事件监听使用线程池异步执行的话(自己想到应该可以这样玩,自己比较喜欢自定义线程参数,心里有数,当前一般还会设置线程池前缀名称):
@component
public class designpatternapplication implements beanfactoryaware {

	private beanfactory beanfactory;
	
	@override
	public void setbeanfactory(beanfactory beanfactory) throws beansexception {
		this.beanfactory = beanfactory;
	}
	
	@bean("application_event_multicaster_bean_name")
	public simpleapplicationeventmulticaster init() {
		threadpoolexecutor multicasterexecutor = new threadpoolexecutor(5, 5, 60, timeunit.seconds,
				new linkedblockingdeque<>(), executors.defaultthreadfactory(), new threadpoolexecutor.discardpolicy());
		simpleapplicationeventmulticaster multicaster = new simpleapplicationeventmulticaster();
		multicaster.settaskexecutor(multicasterexecutor);
		multicaster.setbeanfactory(beanfactory);
		return multicaster;
	}
}
  • 4)、最后肯定是invokelistener(listener, event);

4、resolvabletype类型确认

首先我们传入的eventtype是null,所以先根据我们传入的对象调用resolvedefaulteventtype方法

如下:

private resolvabletype resolvedefaulteventtype(applicationevent event) {
    return resolvabletype.forinstance(event);
}

再调用,肯定orderevent肯定没有实现resolvabletypeprovider接口:

public static resolvabletype forinstance(object instance) {
	assert.notnull(instance, "instance must not be null");
	if (instance instanceof resolvabletypeprovider) {
		resolvabletype type = ((resolvabletypeprovider) instance).getresolvabletype();
		if (type != null) {
			return type;
		}
	}
	return resolvabletype.forclass(instance.getclass());
}

再调用:

public static resolvabletype forclass(@nullable class<?> clazz) {
	return new resolvabletype(clazz);
}

所以我们或者到了一个新创建的 resolvabletype 对象,对象的clazz字段为我们的 orderevent。

为什么追这么深,是因为下面就是根据类型来获取监听器的。

5、获取所有的监听列表,并且看看是怎么做到监听泛型类型

protected collection<applicationlistener<?>> getapplicationlisteners(applicationevent event, resolvabletype eventtype) {
        object source = event.getsource();
        class<?> sourcetype = source != null ? source.getclass() : null;
        abstractapplicationeventmulticaster.listenercachekey cachekey = new abstractapplicationeventmulticaster.listenercachekey(eventtype, sourcetype);
        abstractapplicationeventmulticaster.listenerretriever retriever = (abstractapplicationeventmulticaster.listenerretriever)this.retrievercache.get(cachekey);
        if (retriever != null) {
            return retriever.getapplicationlisteners();
        } else if (this.beanclassloader == null || classutils.iscachesafe(event.getclass(), this.beanclassloader) && (sourcetype == null || classutils.iscachesafe(sourcetype, this.beanclassloader))) {
            object var7 = this.retrievalmutex;
            synchronized(this.retrievalmutex) {
                retriever = (abstractapplicationeventmulticaster.listenerretriever)this.retrievercache.get(cachekey);
                if (retriever != null) {
                    return retriever.getapplicationlisteners();
                } else {
                    retriever = new abstractapplicationeventmulticaster.listenerretriever(true);
                    collection<applicationlistener<?>> listeners = this.retrieveapplicationlisteners(eventtype, sourcetype, retriever);
                    this.retrievercache.put(cachekey, retriever);
                    return listeners;
                }
            }
        } else {
            return this.retrieveapplicationlisteners(eventtype, sourcetype, (abstractapplicationeventmulticaster.listenerretriever)null);
        }
    }

在自己的 concurrenthashmap类型的retrievercache缓存中获取,key是根据 orderevent类型和我发送的数据源(当前为string类型)如下:

  • map的key:
private static final class listenercachekey implements 
comparable<abstractapplicationeventmulticaster.listenercachekey> {
    private final resolvabletype eventtype;
    @nullable
    private final class<?> sourcetype;

    // .....
}
  • map的value类型:
private class listenerretriever {
    public final set<applicationlistener<?>> applicationlisteners = 
        new linkedhashset();
    public final set<string> applicationlistenerbeans = new linkedhashset();
    private final boolean prefiltered;
}

很清楚的结构,两个linkedhashset, 就是为了保证两个set个数相同,并且顺序一一对应。用于存放当前的监听对象和监听的类型。

当前的缓存是在abstractapplicationcontext的refresh的registerbeanpostprocessors(注册所有的beanpostprocess),的最后一步,注册了applicationlistenerdetector类型。

并且在refresh的最后会将所有懒加载的bean都初始化,则会将所有的实现了该接口的bean放入容器中。

则重点是 retrieveapplicationlisteners方法,比较长:

private collection<applicationlistener<?>> retrieveapplicationlisteners(resolvabletype eventtype, @nullable class<?> sourcetype, @nullable abstractapplicationeventmulticaster.listenerretriever retriever) {
	list<applicationlistener<?>> alllisteners = new arraylist();
	object var7 = this.retrievalmutex;
	linkedhashset listeners;
	linkedhashset listenerbeans;
	synchronized(this.retrievalmutex) {
		listeners = new linkedhashset(this.defaultretriever.applicationlisteners);
		listenerbeans = new linkedhashset(this.defaultretriever.applicationlistenerbeans);
	}

	iterator var14 = listeners.iterator();

	while(var14.hasnext()) {
		applicationlistener<?> listener = (applicationlistener)var14.next();
		if (this.supportsevent(listener, eventtype, sourcetype)) {
			if (retriever != null) {
				retriever.applicationlisteners.add(listener);
			}

			alllisteners.add(listener);
		}
	}

	if (!listenerbeans.isempty()) {
		beanfactory beanfactory = this.getbeanfactory();
		iterator var16 = listenerbeans.iterator();

		while(var16.hasnext()) {
			string listenerbeanname = (string)var16.next();

			try {
				class<?> listenertype = beanfactory.gettype(listenerbeanname);
				if (listenertype == null || this.supportsevent(listenertype, eventtype)) {
					applicationlistener<?> listener = (applicationlistener)beanfactory.getbean(listenerbeanname, applicationlistener.class);
					if (!alllisteners.contains(listener) && this.supportsevent(listener, eventtype, sourcetype)) {
						if (retriever != null) {
							if (beanfactory.issingleton(listenerbeanname)) {
								retriever.applicationlisteners.add(listener);
							} else {
								retriever.applicationlistenerbeans.add(listenerbeanname);
							}
						}

						alllisteners.add(listener);
					}
				}
			} catch (nosuchbeandefinitionexception var13) {
				;
			}
		}
	}

	annotationawareordercomparator.sort(alllisteners);
	if (retriever != null && retriever.applicationlistenerbeans.isempty()) {
		retriever.applicationlisteners.clear();
		retriever.applicationlisteners.addall(alllisteners);
	}

	return alllisteners;
}

分析该方法,上面锁住的是 retrievalmutex对象,现在又是同步锁该对象。

为了保证linkedhashset中的值不会乱(monitor enter两次exit两次),去缓存中的每个查看每个监听器是否是对象的类型,检查了监听器的泛型对象和事件源类型。

6、根据监听列表,循环调用(同步或异步)

我们实现的 onapplicationevent(orderevent orderevent)方法

protected void invokelistener(applicationlistener<?> listener, applicationevent event) {
	errorhandler errorhandler = this.geterrorhandler();
	if (errorhandler != null) {
		try {
			this.doinvokelistener(listener, event);
		} catch (throwable var5) {
			errorhandler.handleerror(var5);
		}
	} else {
		this.doinvokelistener(listener, event);
	}

}

所以 errorhandler想在这里处理,则需要在该对象中创建该异常处理器(可以有很多中方式处理,利用bean的生命周期,这是一个很好的扩展点,后续可以去实现),继续 doinvokelistener方法

private void doinvokelistener(applicationlistener listener, applicationevent event) {
	try {
		listener.onapplicationevent(event);
	} catch (classcastexception var6) {
		string msg = var6.getmessage();
		if (msg != null && !this.matchesclasscastmessage(msg, event.getclass())) {
			throw var6;
		}

		log logger = logfactory.getlog(this.getclass());
		if (logger.istraceenabled()) {
			logger.trace("non-matching event type for listener: " + listener, var6);
		}
	}

}

最后看见 listener.onapplicationevent(event);

it is over!!!

总结

1、applicationcontext发送事件是委托给了一个 spring容器在refresh时初始化的simpleapplicationeventmulticaster bean(由于没有初始化内部线程池对象,所以事件是同步发送的)。

2、发送前先获取事件的resolvabletype类型(当前为orderevent clazz)和事件源类型(当前为string)

3、获取监听者列表。 先去自己bean内部先查询缓存,否则从beanfactory中获取所有单利bean进行匹配(再放入缓存conturrenthashmap)。

4、监听者列表循环(同步或异步)地调用我们自己写的监听方法onapplicationevent。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com