EventBus原理深度解析
2019独角兽企业重金招聘Python工程师标准>>> 
一、问题描述
在工作中,经常会遇见使用异步的方式来发送事件,或者触发另外一个动作:经常用到的框架是MQ(分布式方式通知)。如果是同一个jvm里面通知的话,就可以使用EventBus。由于EventBus使用起来简单、便捷,因此,工作中会经常用到。深入理解该框架的原理就很有必要。
二、框架解析
2.1、组织结构
eventbus的组织结构如下:

eventbus主要有以下几部分组成:
1、eventbus、asyncEventBus:事件发送器。
2、event:事件承载单元。
3、SubscriberRegistry:订阅者注册器,将订阅者注册到event上,即将有注解Subscribe的方法和event绑定起来。
4、Dispatcher:事件分发器,将事件的订阅者调用来执行。
5、Subscriber、SynchronizedSubscriber:订阅者,并发订阅还是同步订阅。
2.2、运行原理
1、eventbus是基于注册监听的方式来运行的,因此,首先需要将eventbus,然后才会有事件及监听者。新建eventbus或者AsyncEventBus的方式如下:
EventBus eventBus = new EventBus(); 或者
BlockingQueue workQueue = new LinkedBlockingQueue<>(20);ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20,30, TimeUnit.SECONDS, workQueue);AsyncEventBus asyncEventBus = new AsyncEventBus(executor); 2、注册监听者。
eventBus.register(eventListener); 底层就是将类eventListener中所有注解有Subscribe的方法与其Event对放在一个map中(一个event可以对应多个Subscribe的方法)。实现如下:
void register(Object listener) {Multimap, Subscriber> listenerMethods = findAllSubscribers(listener);for (Entry, Collection> entry : listenerMethods.asMap().entrySet()) {Class> eventType = entry.getKey();Collection eventMethodsInListener = entry.getValue();CopyOnWriteArraySet eventSubscribers = subscribers.get(eventType);if (eventSubscribers == null) {CopyOnWriteArraySet newSet = new CopyOnWriteArraySet<>();eventSubscribers =MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);}eventSubscribers.addAll(eventMethodsInListener);}} 3、事件发送:执行指定事件类型的订阅者(包含了method),从订阅者中获取指定事件的订阅者,然后按照规则(同步、异步)执行指定的方法。
public void post(Object event) {Iterator eventSubscribers = subscribers.getSubscribers(event);if (eventSubscribers.hasNext()) {dispatcher.dispatch(event, eventSubscribers);} else if (!(event instanceof DeadEvent)) {// the event had no subscribers and was not itself a DeadEventpost(new DeadEvent(this, event));}} 上述代码说明,如果事件没有监听者,就当作死亡事件来对待。
/** Dispatches {@code event} to this subscriber using the proper executor. */final void dispatchEvent(final Object event) {executor.execute(new Runnable() {@Overridepublic void run() {try {invokeSubscriberMethod(event);} catch (InvocationTargetException e) {bus.handleSubscriberException(e.getCause(), context(event));}}});}void invokeSubscriberMethod(Object event) throws InvocationTargetException {try {method.invoke(target, checkNotNull(event));} catch (IllegalArgumentException e) {throw new Error("Method rejected target/argument: " + event, e);} catch (IllegalAccessException e) {throw new Error("Method became inaccessible: " + event, e);} catch (InvocationTargetException e) {if (e.getCause() instanceof Error) {throw (Error) e.getCause();}throw e;}} 这里就说明,最后就是被订阅的方法被调用。
4、EventBus与AsyncEventBus的区别
从字面上看,AsyncEventBus是异步的EventBus,那么EventBus应该就是同步的了。EventBus的executor为MoreExecutors.directExecutor(),其实现如下:
public static Executor directExecutor() {return DirectExecutor.INSTANCE;}/** See {@link #directExecutor} for behavioral notes. */private enum DirectExecutor implements Executor {INSTANCE;@Overridepublic void execute(Runnable command) {command.run();}@Overridepublic String toString() {return "MoreExecutors.directExecutor()";}} 其execute方法直接执行线程的run方法,即同步调用run方法执行。EventBus的dispatcher为PerThreadQueuedDispatcher。其dispatch方法如下:
@Overridevoid dispatch(Object event, Iterator subscribers) {checkNotNull(event);checkNotNull(subscribers);Queue queueForThread = queue.get();queueForThread.offer(new Event(event, subscribers));if (!dispatching.get()) {dispatching.set(true);try {Event nextEvent;while ((nextEvent = queueForThread.poll()) != null) {while (nextEvent.subscribers.hasNext()) {nextEvent.subscribers.next().dispatchEvent(nextEvent.event);}}} finally {dispatching.remove();queue.remove();}}} dispatchEvent的实现如下:
final void dispatchEvent(final Object event) {executor.execute(new Runnable() {@Overridepublic void run() {try {invokeSubscriberMethod(event);} catch (InvocationTargetException e) {bus.handleSubscriberException(e.getCause(), context(event));}}});} 因此,整个执行过程如下:

整个过程都是同步方式执行,因此,EventBus是同步的。
AsyncEventBus的dispatcher为LegacyAsyncDispatcher,executor为自己指定的线程池。运行流程如下:

虚线为线程池异步调度,因此,AsyncEventBus为异步方式。
5、AllowConcurrentEvents的作用
它所在的代码为:
static Subscriber create(EventBus bus, Object listener, Method method) {return isDeclaredThreadSafe(method)? new Subscriber(bus, listener, method): new SynchronizedSubscriber(bus, listener, method);}private static boolean isDeclaredThreadSafe(Method method) {return method.getAnnotation(AllowConcurrentEvents.class) != null;} 即如果订阅者方法上有注解AllowConcurrentEvents,则返回Subscriber,否则,返回SynchronizedSubscriber。SynchronizedSubscriber的字面意思为同步订阅者,它的实现代码为:
@Overridevoid invokeSubscriberMethod(Object event) throws InvocationTargetException {synchronized (this) {super.invokeSubscriberMethod(event);}} 即没有使用注解AllowConcurrentEvents的订阅者,在并发环境中,都是串行执行。这在高并发环境中,会严重影响性能。
三、使用案例
3.1、eventbus定义
@Configuration
public class ConfigBean {@Beanpublic EventBus executorService() {BlockingQueue workQueue = new LinkedBlockingQueue<>(20);ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20,30, TimeUnit.SECONDS, workQueue);return new AsyncEventBus(executor);}
} 3.2、注册与事件发送
@Service
public class TestService implements InitializingBean {@Autowiredprivate EventListener eventListener ;@Autowiredprivate EventBus eventBus ;public void postEvent(){eventBus.post(new LoginEvent("iwill","123456"));}@Overridepublic void afterPropertiesSet() throws Exception {eventBus.register(eventListener);}
} 3.3、订阅者定义
package com.iwill.eventBus.listener;import com.google.common.eventbus.Subscribe;
import com.iwill.eventBus.event.LoginEvent;
import com.iwill.eventBus.event.RegisterEvent;
import org.springframework.stereotype.Component;@Component
public class EventListener {@Subscribepublic void subscribeLoginEvent1(LoginEvent event){System.out.println("method 1 : receive login event ");}@Subscribepublic void subscribeLoginEvent2(LoginEvent event){System.out.println("method 2 : receive login event ");}@Subscribepublic void subscribeRegisterEvent(RegisterEvent event){try{Thread.sleep(10000L);}catch (Exception exp){exp.printStackTrace();}System.out.println("method : receive register event ");}
}
四、注意事项
1、在高并发的环境下使用AsyncEventBus时,发送事件可能会出现异常,因为它使用的线程池,当线程池的线程不够用时,会拒绝接收任务,就会执行线程池的拒绝策略,如果需要关注是否提交事件成功,就需要将线程池的拒绝策略设为抛出异常,并且try-catch来捕获异常。如下:
try {eventBus.post(new LoginEvent("iwill", "123456"));}catch (Exception exp){//TODO 落表或者其他处理} 2、本文用到的guava版本如下:
com.google.guava guava 26.0-jre
转载于:https://my.oschina.net/yangjianzhou/blog/2208677
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
