Spring观察监听器-ApplicationEventPublisher的publishEvent实现异步事件解耦业务
ApplicationEventPublisher的publishEvent默认是同步操作,但是业务逻辑处理时候需要异步执行其他子业务,异步可能马上联想到线程池和MQ,但是每次手动使用需要单独引用线程池或者mq发布等对象,原本本质只需要 发布事件 → 处理事件 ,其余作为装饰
§ 1.实践
§ 1.1 发布事件工具 - EventUtil
避免每次都需要注入事件发布对象,封装基础的事件调用工具类 EventUtil ,实现统一管理事件的发布和多余的对象注入,缺点是IDEA无法跳转到事件处理的方法
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;/*** 事件工具** @author 北辰微雨*/
@Slf4j
@Component
public class EventUtil {/*** 应用事件发布*/public static ApplicationEventPublisher publisher;@Autowiredpublic void setPublisher(ApplicationEventPublisher publisher) {EventUtil.publisher = publisher;}/*** 同步推送** @param event 触发事件*/public static void pushSynEvent(Object event) {try {log.debug("同步发布事件 Class:{}", event.getClass().getName());publisher.publishEvent(event);} catch (Exception e) {log.error("同步发布事件错误!==> {}", JsonUtil.toJSONString(event), e);}}
}
§ 1.2 事件处理
ApplicationEventPublisher的publishEvent默认是同步操作,可以由同步发布 → 异步执行的策略改造,对于异步操作可以封装一个基础异步组件,即基础事件服务类 BaseEventService
PS:JsonUtil.toJSONString可以替换为自己的json格式化工具
import com.marcus.common.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import javax.annotation.Resource;
import java.util.function.Consumer;/*** 基础事件服务** @author 北辰微雨*/
@Slf4j
public class BaseEventService {/*** 公共线程池*/@Resourceprotected ThreadPoolTaskExecutor commonExecutor;/*** 异步执行某个方法** @param data 调用数据* @param consumer 消费函数* @param 泛型*/protected void asynDo(T data, Consumer consumer) {commonExecutor.submit(() -> {try {log.debug("异步执行事件 Class:{}", data.getClass().getName());consumer.accept(data);} catch (Exception e) {log.error("异步执行事件错误!==> {}", JsonUtil.toJSONString(data), e);}});}
}
业务的事件触发服务 TestEventService,没有事务要求可以使用 @EventListener,这里使用 @TransactionalEventListener 的原因是如果是事务方法提交过来的请求,如果do something操作数据库,会存在两个事务,该枚举可以解决此问题,默认为phase是AFTER_COMMIT(在提交后生效)
支持的事务如下,可以看TransactionPhase枚举
🔹 BEFORE_COMMIT 事务提交前
🔹 AFTER_COMMIT 事务提交后
🔹 AFTER_ROLLBACK 事务回滚后
🔹 AFTER_COMPLETION 事务完成后
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionalEventListener;/*** 测试事件服务** @author 北辰微雨*/
@Component
public class TestEventService extends BaseEventService {/*** 调用基础事件能力** @param event*/@TransactionalEventListener(classes = TestEvent.class)public void event(TestEvent event) {asynDo(event, data -> {// do something});}}
§ 2.原理
§ 2.1 事件发布 - ApplicationEventPublisher → publishEvent
ApplicationEventPublisher实现类的publishEvent方法发布事件,接口 ApplicationEventPublisher默认是推送ApplicationEvent包装的类,重载一个推送对象的类,
@FunctionalInterface
public interface ApplicationEventPublisher {default void publishEvent(ApplicationEvent event) {this.publishEvent((Object)event);}void publishEvent(Object event);
}
接口ApplicationContext继承ApplicationEventPublisher接口,Spring默认实现AbstractApplicationContext,主要校验事件并解析事件类型统一封装应用事件发布
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {//校验event不能为空Assert.notNull(event, "Event must not be null");//接受ApplicationEvent的封装对象Object applicationEvent;//实参event是否是ApplicationEvent实例的多态if (event instanceof ApplicationEvent) {applicationEvent = (ApplicationEvent)event;} else {//如果不是,则会使用ApplicationEvent的PayloadApplicationEvent封装event类applicationEvent = new PayloadApplicationEvent(this, event);if (eventType == null) {eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType();}}//earlyApplicationEvents是早期应用事件模式if (this.earlyApplicationEvents != null) {this.earlyApplicationEvents.add(applicationEvent);} else {//获取默认实现的事件群播器,并调用群播事件,SpringEvent的发布会走此方法this.getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType);}//父类是否为空if (this.parent != null) {//父类如果也是AbstractApplicationContext调用自身if (this.parent instanceof AbstractApplicationContext) {((AbstractApplicationContext)this.parent).publishEvent(event, eventType);} else {this.parent.publishEvent(event);}}}
§ 2.3 事件群播发布 - SimpleApplicationEventMulticaster → multicastEvent
该springEvent的群播模式,获取ApplicationEventMulticaster对象,Spring默认实现SimpleApplicationEventMulticaster,也就是调用该类的multicastEvent方法,获取当前event的全部监听类,通过是否有线程池选择同步和异步调用监听器
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {//如果eventType不为空选择自己,反之为空如果是ResolvableTypeProvider实现类返回获取不为空的ResolvableType,当前对象不为空封装ResolvableType返回,兜底为ResolvableType的NONEResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event);//获取线程池,如果没有自定义线程池,默认为nullExecutor executor = this.getTaskExecutor();//获取event的监听类进行便利Iterator var5 = this.getApplicationListeners(event, type).iterator();//遍历event的监听类while(var5.hasNext()) {//获取当前遍历的event监听类ApplicationListener> listener = (ApplicationListener)var5.next();//如果线程池为空同步调用监听类,如果不为null,则使用异步调用监听类if (executor != null) {//异步调用监听器executor.execute(() -> {this.invokeListener(listener, event);});} else {//同步调用监听器this.invokeListener(listener, event);}}}
§ 2.4 获取全部监听器 - SimpleApplicationEventMulticaster → getApplicationListeners
this.getApplicationListeners(event, type) 先从缓存中获取监听器,如果缓存没有,实时手动获取监听器
protected Collection> getApplicationListeners(ApplicationEvent event, ResolvableType eventType) {//获取事件对象和事件对象类型Object source = event.getSource();Class> sourceType = source != null ? source.getClass() : null;//组装缓存的key,从缓存中获取存在的监听回收器和新的回收器AbstractApplicationEventMulticaster.ListenerCacheKey cacheKey = new AbstractApplicationEventMulticaster.ListenerCacheKey(eventType, sourceType);AbstractApplicationEventMulticaster.CachedListenerRetriever newRetriever = null;AbstractApplicationEventMulticaster.CachedListenerRetriever existingRetriever = (AbstractApplicationEventMulticaster.CachedListenerRetriever)this.retrieverCache.get(cacheKey);//如果缓存存在回收器为null,则初始化newRetriever和existingRetrieverif (existingRetriever == null && (this.beanClassLoader == null || ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) && (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {//实例化被缓存的监听回收器newRetriever = new AbstractApplicationEventMulticaster.CachedListenerRetriever();//返回当前key对应的指,如果不存在填充监听回收器,但是返回的还是nullexistingRetriever = (AbstractApplicationEventMulticaster.CachedListenerRetriever)this.retrieverCache.putIfAbsent(cacheKey, newRetriever);//如果existingRetriever不为null,新建监听回收器置为nullif (existingRetriever != null) {newRetriever = null;}}//如果缓存监听器存在,返回事件监听器if (existingRetriever != null) {Collection> result = existingRetriever.getApplicationListeners();if (result != null) {return result;}}//实时获取应用监听器(无缓存)return this.retrieveApplicationListeners(eventType, sourceType, newRetriever);}
§ 2.5 手动实时获取监听器 - SimpleApplicationEventMulticaster → retrieveApplicationListeners
this.retrieveApplicationListeners(eventType, sourceType, newRetriever);
获取全部监听器过滤事件监听器然后排序和缓存返回
private Collection> retrieveApplicationListeners(ResolvableType eventType, @Nullable Class> sourceType, @Nullable AbstractApplicationEventMulticaster.CachedListenerRetriever retriever) {//所有的监听器,作为结果返回List> allListeners = new ArrayList();//filteredListeners和filteredListenerBeans用来缓存被过滤监听器的数据//如果retriever不为null会新创建集合存放Set> filteredListeners = retriever != null ? new LinkedHashSet() : null;Set filteredListenerBeans = retriever != null ? new LinkedHashSet() : null;//存储监听类LinkedHashSet listeners;//存储String类型的Bean名称LinkedHashSet listenerBeans;//当前回收器加锁synchronized(this.defaultRetriever) {//将全部监听类放入listeners中listeners = new LinkedHashSet(this.defaultRetriever.applicationListeners);//自定义监听类放入listenerBeanslistenerBeans = new LinkedHashSet(this.defaultRetriever.applicationListenerBeans);}//获取全部监听器的遍历器Iterator var9 = listeners.iterator();//遍历全部监听器while(var9.hasNext()) {//当前遍历监听器ApplicationListener> listener = (ApplicationListener)var9.next();//筛选是否支持事件的监听类if (this.supportsEvent(listener, eventType, sourceType)) {//如果缓存参数retriever存在,则往缓存集合存入数据if (retriever != null) {filteredListeners.add(listener);}//加入结果中allListeners.add(listener);}}//自定义监听器不为nullif (!listenerBeans.isEmpty()) {//获取bean工程和自定义监听器的遍历器ConfigurableBeanFactory beanFactory = this.getBeanFactory();Iterator var16 = listenerBeans.iterator();//自定义监听器遍历while(var16.hasNext()) {//当前遍历的监听器bean名称String listenerBeanName = (String)var16.next();try {//筛选是否是事件对应的监听类if (this.supportsEvent(beanFactory, listenerBeanName, eventType)) {//以监听器名称通过beanFactory获取监听器ApplicationListener> listener = (ApplicationListener)beanFactory.getBean(listenerBeanName, ApplicationListener.class);//如果结果集合allListeners中不存在该监听类并且是事件对应的监听类if (!allListeners.contains(listener) && this.supportsEvent(listener, eventType, sourceType)) {//如果缓存参数retriever存在,则忘缓存集合填充数据,区分单例if (retriever != null) {if (beanFactory.isSingleton(listenerBeanName)) {filteredListeners.add(listener);} else {filteredListenerBeans.add(listenerBeanName);}}//结果集合填充监听器allListeners.add(listener);}} else {//不是事件监听器//以监听器名称通过beanFactory获取当前单例对象Object listener = beanFactory.getSingleton(listenerBeanName);//如果缓存retriever存在,缓存集合移除数据if (retriever != null) {filteredListeners.remove(listener);}//结果集合移除监听器allListeners.remove(listener);}} catch (NoSuchBeanDefinitionException var13) {}}}//通过order或者实例排序AnnotationAwareOrderComparator.sort(allListeners);//如果缓存retriever存在if (retriever != null) {//如果过滤监听bena是空,全部监听器填充为应用监听器,否则指向被过滤监听器if (filteredListenerBeans.isEmpty()) {retriever.applicationListeners = new LinkedHashSet(allListeners);retriever.applicationListenerBeans = filteredListenerBeans;} else {retriever.applicationListeners = filteredListeners;retriever.applicationListenerBeans = filteredListenerBeans;}}//返回全部监听器return allListeners;}
§ 2.6 事件监听处理准备 - @EventListener / @TransactionalEventListener
AnnotationConfigUtils的 registerAnnotationConfigProcessors ,如果不存在EventListenerFactory和EventListenerMethodProcessor注入其中,其中工厂EventListenerFactory是把 @EventListener 标注的方法变成ApplicationListener
public static Set registerAnnotationConfigProcessors(BeanDefinitionRegistry registry, @Nullable Object source) {//获取对象DefaultListableBeanFactory beanFactory = unwrapDefaultListableBeanFactory(registry);//...//@EventListener注解处理器,如果没有处理器,新建一个放入容器中if (!registry.containsBeanDefinition("org.springframework.context.event.internalEventListenerProcessor")) {def = new RootBeanDefinition(EventListenerMethodProcessor.class);def.setSource(source);beanDefs.add(registerPostProcessor(registry, def, "org.springframework.context.event.internalEventListenerProcessor"));}//内部管理的EventListenerFactory的bean名称,如果没有事件处理工程,新建一个放入容器中if (!registry.containsBeanDefinition("org.springframework.context.event.internalEventListenerFactory")) {def = new RootBeanDefinition(DefaultEventListenerFactory.class);def.setSource(source);beanDefs.add(registerPostProcessor(registry, def, "org.springframework.context.event.internalEventListenerFactory"));}return beanDefs;}
EventListenerMethodProcessor与EventListenerFactory建立关系
public class EventListenerMethodProcessor implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {//...//存放事件监听工厂@Nullableprivate List eventListenerFactories;//...//初始化事件监听工厂public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {//获取bean工厂this.beanFactory = beanFactory;//获取容器中所有的EventListenerFactory,并实例化Map beans = beanFactory.getBeansOfType(EventListenerFactory.class, false, false);//拿到所有的EventListenerFactory集合List factories = new ArrayList(beans.values());//对集合排序AnnotationAwareOrderComparator.sort(factories);//存放到eventListenerFactories中this.eventListenerFactories = factories;}// ...
}
§ 2.7 事件监听处理解析 - @EventListener
EventListenerMethodProcessor在实现了SmartInitializingSingleton的
afterSingletonsInstantiated() ,在单例bean实例化之后执行该方法
public void afterSingletonsInstantiated() {//获取bean工厂和基础校验ConfigurableListableBeanFactory beanFactory = this.beanFactory;Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");//以Object类型拿到所有的bean名称String[] beanNames = beanFactory.getBeanNamesForType(Object.class);String[] var3 = beanNames;int var4 = beanNames.length;//遍历bean名称for(int var5 = 0; var5 < var4; ++var5) {当前遍历的bean名称String beanName = var3[var5];//当前bean是null或者不是以scopedTarget.开头if (!ScopedProxyUtils.isScopedTarget(beanName)) {Class type = null;try {//拿到代理的真实类type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);} catch (Throwable var10) {if (this.logger.isDebugEnabled()) {this.logger.debug("Could not resolve target class for bean with name '" + beanName + "'", var10);}}//如果拿到真实的类存在if (type != null) {//判断ScopedObject和type相同或者其超类或超接口相同if (ScopedObject.class.isAssignableFrom(type)) {try {//获取创建了ScopedProxy代理类的bean的类型Class> targetClass = AutoProxyUtils.determineTargetClass(beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));if (targetClass != null) {type = targetClass;}} catch (Throwable var11) {if (this.logger.isDebugEnabled()) {this.logger.debug("Could not resolve target bean for scoped proxy '" + beanName + "'", var11);}}}try {//处理bean里面的方法this.processBean(beanName, type);} catch (Throwable var9) {throw new BeanInitializationException("Failed to process @EventListener annotation on bean with name '" + beanName + "'", var9);}}}}}
§ 2.8 事件监听处理处理 - @EventListener
this.processBean(beanName, type); 所有的 @EventListener 方法用EventListenerFactory解析成一个ApplicationListener, @EventListener 方法只要有到一个可以解析他的EventListenerFactory
private void processBean(final String beanName, final Class> targetType) {//没有注解不包含这个类斌,并且这个注解名称以java.开头或者这个类不是java.开头//并且不是spring容器的类if (!this.nonAnnotatedClasses.contains(targetType) && AnnotationUtils.isCandidateClass(targetType, EventListener.class) && !isSpringContainerClass(targetType)) {Map annotatedMethods = null;try {//找到类中用@EventListener标注或者内嵌标注的方法annotatedMethods = MethodIntrospector.selectMethods(targetType, (methodx) -> {return (EventListener)AnnotatedElementUtils.findMergedAnnotation(methodx, EventListener.class);});} catch (Throwable var12) {if (this.logger.isDebugEnabled()) {this.logger.debug("Could not resolve methods for bean with name '" + beanName + "'", var12);}}//被标注的annotatedMethods是否为空集合if (CollectionUtils.isEmpty(annotatedMethods)) {//添加该类型到没有注解的集合类中this.nonAnnotatedClasses.add(targetType);if (this.logger.isTraceEnabled()) {this.logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());}} else {//获取应用上下文ConfigurableApplicationContext context = this.applicationContext;Assert.state(context != null, "No ApplicationContext set");//获取容器中事件的监听工程List factories = this.eventListenerFactories;Assert.state(factories != null, "EventListenerFactory List not initialized");//遍历被@EventListener标注或者内嵌标注的方法Iterator var6 = annotatedMethods.keySet().iterator();while(true) {while(var6.hasNext()) {//当前遍历的方法Method method = (Method)var6.next();遍历所有所有的时间监听的工厂Iterator var8 = factories.iterator();while(var8.hasNext()) {//当前便利的工厂EventListenerFactory factory = (EventListenerFactory)var8.next();//这里默认是true,但是如果是事务监听工程会看是否有注解@TransactionalEventListenerif (factory.supportsMethod(method)) {//aop获取可以执行的方法//ps: 若是JDK的代理类,请不要在实现类里书写@EventListener注解的监听器,否则会报错的(CGLIB代理没有关系)Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));//利用EventListenerFactory创建ApplicationListenerApplicationListener> applicationListener = factory.createApplicationListener(beanName, targetType, methodToUse);//如果ApplicationListener是ApplicationListenerMethodAdapter类,那么执行其init方法if (applicationListener instanceof ApplicationListenerMethodAdapter) {((ApplicationListenerMethodAdapter)applicationListener).init(context, this.evaluator);}//放到容器中
context.addApplicationListener(applicationListener);//解析完成跳出break;}}}if (this.logger.isDebugEnabled()) {this.logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" + beanName + "': " + annotatedMethods);}//解析完成跳出break;}}}}
§ 2.9 事件监听适配初始化 - ApplicationListenerMethodAdapter → init
public class ApplicationListenerMethodAdapter implements GenericApplicationListener {//@EventListener标注的方法private final String beanName;//@EventListener标注的方法private final Method method;//@EventListener标注的真实方法对象,防止其是代理方法private final Method targetMethod;//方法申明,如public void demo.Ball.applicationContextEvent(demo.OrderEvent)private final AnnotatedElementKey methodKey;存储方法的参数private final List declaredEventTypes;//@EventListener的condition@Nullableprivate final String condition;//顺序private final int order;//监听器Id@Nullableprivate volatile String listenerId;//应用上下文@Nullableprivate ApplicationContext applicationContext;//@EventListener的EventExpressionEvaluator@Nullableprivate EventExpressionEvaluator evaluator;public ApplicationListenerMethodAdapter(String beanName, Class> targetClass, Method method) {this.beanName = beanName;this.method = BridgeMethodResolver.findBridgedMethod(method);this.targetMethod = !Proxy.isProxyClass(targetClass) ? AopUtils.getMostSpecificMethod(method, targetClass) : this.method;this.methodKey = new AnnotatedElementKey(this.targetMethod, targetClass);//获取方法上的@EventListener注解对象EventListener ann = (EventListener)AnnotatedElementUtils.findMergedAnnotation(this.targetMethod, EventListener.class);this.declaredEventTypes = resolveDeclaredEventTypes(method, ann);this.condition = ann != null ? ann.condition() : null;this.order = resolveOrder(this.targetMethod);String id = ann != null ? ann.id() : "";this.listenerId = !id.isEmpty() ? id : null;}//...public void onApplicationEvent(ApplicationEvent event) {//处理事件this.processEvent(event);}//...public void processEvent(ApplicationEvent event) {Object[] args = this.resolveArguments(event);//根据@EventListener的condition,判断是否要处理if (this.shouldHandle(event, args)) {//调用方法Object result = this.doInvoke(args);if (result != null) {//如果有监听器可以监听这个结果,那么可以触发那个监听器this.handleResult(result);} else {this.logger.trace("No result object given - no result to handle");}}}}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
