EventBus原理深度解析
一、问题描述
在工作中,经常会遇见使用异步的方式来发送事件,或者触发另外一个动作:经常用到的框架是MQ(分布式方式通知)。如果是同一个jvm里面通知的话,就可以使用EventBus。由于EventBus使用起来简单、便捷,因此,工作中会经常用到。深入理解该框架的原理就很有必要。
二、框架解析
2.1、组织结构
eventbus的组织结构如下:
eventbus主要有以下几部分组成:
1、eventbus、asyncEventBus:事件发送器。
2、event:事件承载单元。
3、SubscriberRegistry:订阅者注册器,将订阅者注册到event上,即将有注解Subscribe的方法和event绑定起来。
4、Dispatcher:事件分发器,将事件的订阅者调用来执行。
5、Subscriber、SynchronizedSubscriber:订阅者,并发订阅还是同步订阅。
2.2、运行原理
1、eventbus是基于注册监听的方式来运行的,因此,首先需要将eventbus,然后才会有事件及监听者。新建eventbus或者AsyncEventBus的方式如下:
EventBus eventBus = new EventBus();
或者
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20); ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, workQueue); AsyncEventBus asyncEventBus = new AsyncEventBus(executor);
2、注册监听者。
eventBus.register(eventListener);
底层就是将类eventListener中所有注解有Subscribe的方法与其Event对放在一个map中(一个event可以对应多个Subscribe的方法)。实现如下:
void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
3、事件发送:执行指定事件类型的订阅者(包含了method),从订阅者中获取指定事件的订阅者,然后按照规则(同步、异步)执行指定的方法。
public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
上述代码说明,如果事件没有监听者,就当作死亡事件来对待。
/** Dispatches {@code event} to this subscriber using the proper executor. */ final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); } void invokeSubscriberMethod(Object event) throws InvocationTargetException { try { method.invoke(target, checkNotNull(event)); } catch (IllegalArgumentException e) { throw new Error("Method rejected target/argument: " + event, e); } catch (IllegalAccessException e) { throw new Error("Method became inaccessible: " + event, e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } }
这里就说明,最后就是被订阅的方法被调用。
4、EventBus与AsyncEventBus的区别
从字面上看,AsyncEventBus是异步的EventBus,那么EventBus应该就是同步的了。EventBus的executor为MoreExecutors.directExecutor(),其实现如下:
public static Executor directExecutor() { return DirectExecutor.INSTANCE; } /** See {@link #directExecutor} for behavioral notes. */ private enum DirectExecutor implements Executor { INSTANCE; @Override public void execute(Runnable command) { command.run(); } @Override public String toString() { return "MoreExecutors.directExecutor()"; } }
其execute方法直接执行线程的run方法,即同步调用run方法执行。EventBus的dispatcher为PerThreadQueuedDispatcher。其dispatch方法如下:
@Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers)); if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } }
dispatchEvent的实现如下:
final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
因此,整个执行过程如下:
整个过程都是同步方式执行,因此,EventBus是同步的。
AsyncEventBus的dispatcher为LegacyAsyncDispatcher,executor为自己指定的线程池。运行流程如下:
虚线为线程池异步调度,因此,AsyncEventBus为异步方式。
5、AllowConcurrentEvents的作用
它所在的代码为:
static Subscriber create(EventBus bus, Object listener, Method method) { return isDeclaredThreadSafe(method) ? new Subscriber(bus, listener, method) : new SynchronizedSubscriber(bus, listener, method); } private static boolean isDeclaredThreadSafe(Method method) { return method.getAnnotation(AllowConcurrentEvents.class) != null; }
即如果订阅者方法上有注解AllowConcurrentEvents,则返回Subscriber,否则,返回SynchronizedSubscriber。SynchronizedSubscriber的字面意思为同步订阅者,它的实现代码为:
@Override void invokeSubscriberMethod(Object event) throws InvocationTargetException { synchronized (this) { super.invokeSubscriberMethod(event); } }
即没有使用注解AllowConcurrentEvents的订阅者,在并发环境中,都是串行执行。这在高并发环境中,会严重影响性能。
三、使用案例
3.1、eventbus定义
@Configuration public class ConfigBean { @Bean public EventBus executorService() { BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20); ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, workQueue); return new AsyncEventBus(executor); } }
3.2、注册与事件发送
@Service public class TestService implements InitializingBean { @Autowired private EventListener eventListener ; @Autowired private EventBus eventBus ; public void postEvent(){ eventBus.post(new LoginEvent("iwill","123456")); } @Override public void afterPropertiesSet() throws Exception { eventBus.register(eventListener); } }
3.3、订阅者定义
package com.iwill.eventBus.listener; import com.google.common.eventbus.Subscribe; import com.iwill.eventBus.event.LoginEvent; import com.iwill.eventBus.event.RegisterEvent; import org.springframework.stereotype.Component; @Component public class EventListener { @Subscribe public void subscribeLoginEvent1(LoginEvent event){ System.out.println("method 1 : receive login event "); } @Subscribe public void subscribeLoginEvent2(LoginEvent event){ System.out.println("method 2 : receive login event "); } @Subscribe public void subscribeRegisterEvent(RegisterEvent event){ try{ Thread.sleep(10000L); }catch (Exception exp){ exp.printStackTrace(); } System.out.println("method : receive register event "); } }
四、注意事项
1、在高并发的环境下使用AsyncEventBus时,发送事件可能会出现异常,因为它使用的线程池,当线程池的线程不够用时,会拒绝接收任务,就会执行线程池的拒绝策略,如果需要关注是否提交事件成功,就需要将线程池的拒绝策略设为抛出异常,并且try-catch来捕获异常。如下:
try { eventBus.post(new LoginEvent("iwill", "123456")); }catch (Exception exp){ //TODO 落表或者其他处理 }
2、本文用到的guava版本如下:
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>26.0-jre</version> </dependency>
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
小游戏入门到精通OR放弃?
这里说的小游戏是QQ玩一玩,后面会写微信小游戏... 0、体验QQ轻游戏 需要使用Android手机 登录手Q开启厘米秀 侧滑点击人物形象或者选择任意一好友点击**「+」滑拔一下找到「厘米秀」** 搜索厘米秀 申请体验资格 开启厘米秀 侧滑 好友点「+」或者直接点击人物 游戏入口页 1、平台申请账号 注册很简单,使用已有Q号登录「厘米游戏」开放平台按照流程提交资料审核即可 。开发者接入官方说明文档 「厘米游戏」 开放平台注册提交资料的同时会注册一个相关联的**「QQ服务号」**。游戏中显示的用户信息是通过后台静默授权「QQ服务号」后再通过用户相关的接口获得,这点与微信公众号以及微信小游戏类似。 一句话概括:目前暂未对个人开放,现阶段为邀请码模式。但如果你有好的IP资源或者优秀开发团队是比较好申请的。 与 「微信小游戏」 做比较目前来看最大的优势就是 现阶段游戏中集成广告所得广告费用平台不分成 游戏评级高官方可以让游戏上中心化首页推荐位 上线游戏都需要 「游戏自审自查报告」、「计算机软件著作权登记证书」,如需内购需要提供 「广电总局版号批文」 以及 「文化部备案信息」 2、环境搭建 QQ...
- 下一篇
Java JDK 11:现在可以使用所有新功能
删除了CORBA,Java EE和JavaFX支持,但添加了十几个主要新功能 目录 哪里可以下载JDK 11 Java 11 JDK中的新功能 从Java JDK 11中删除了什么 Java Development Kit(JDK)11现已普遍可用,可供生产使用,提高了工作效率,并提供了实现HTTP / 2的HTTP客户端API。 Java Standard Edition(SE)版本11有16个主要功能更改。Java 11还通过删除CORBA和Java EE(最近更名为Jakarta EE)模块以及删除JavaFX而失去了一些功能,JavaFX现在可作为独立技术使用。 在Java 11中,Oracle已将主线存储库jdk / jdk指定到jdk / jdk11稳定存储库。推送到jdk / jdk或jdk / client的更改将被标记为JDK 12。稳定存储库可以接受选定的错误修复,如果获得批准,则可以做为JDK发布过程中的后期增强功能。 Oracle标准Java实施的最新版本是长期支持(LTS)版本,该版本将获得Oracle的商业支持至少八年。错误修复和安全更新将在2026年之前...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- 设置Eclipse缩进为4个空格,增强代码规范
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- 2048小游戏-低调大师作品