Hadoop Yarn事件处理框架源码分析

由于想在项目中使用类似yarn的事件处理机制,就看了实现。主要是由Dispatcher.java,EventHandler.java,Service.java这3个类撑起来的。

 在事件处理之前,先注册相应的事件处理handler,收到事件event后,由派发事件的Dispatcher进行派发,默认采用异步事件处理方式将事件放到事件队列(LinkedBlockingQueue)中,消费者会循环从队列中取出事件进行处理。

要使用事件处理,首先需要创建Dispatcher,示例代码如下:

 dispatcher = new AsyncDispatcher();//创建
  addIfService(dispatcher);// 由于继承AbstractService,可以方便完成服务统一管理,比如初始化和资源释放等操作
  dispatcher.register(EventType.class,new EventHandler());//注册对应的事件处理方法

然后通过AsyncDispatcher调用getEventHandler()返回的EventHandler的处理对应事件,AsyncDispatcher类的getEventHandler()方法如下:

@Override
  public EventHandler getEventHandler() {
    if (handlerInstance == null) {
      handlerInstance = new GenericEventHandler();//如果没有注册生产事件处理,就走通用事件处理
    }
    return handlerInstance;
  }
class GenericEventHandler implements EventHandler<Event> {
    public void handle(Event event) {
      if (blockNewEvents) {
        return;
      }


      /* all this method does is enqueue all the events onto the queue */
      int qSize = eventQueue.size();
      if (qSize !=0 && qSize %1000 == 0) {
        LOG.info("Size of event-queue is " + qSize);
      }
      int remCapacity = eventQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue: "
            + remCapacity);
      }
      try {
        eventQueue.put(event);//放进队列
        drained = false;
      } catch (InterruptedException e) {
        if (!stopped) {
          LOG.warn("AsyncDispatcher thread interrupted", e);
        }
        throw new RuntimeException(e);
      }
    };
  }

上述完成生产,再看消费如下实现的:

@Override
protected void serviceStart() throws Exception {
  //start all the components
  super.serviceStart();
  eventHandlingThread = new Thread(createThread()); // 调用创建消费eventQueue队列中事件的线程
  eventHandlingThread.setName("AsyncDispatcher event handler");
  eventHandlingThread.start();
}

查看createThread()方法,如下所示:

Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          drained = eventQueue.isEmpty();
          // blockNewEvents is only set when dispatcher is draining to stop,
          // adding this check is to avoid the overhead of acquiring the lock
          // and calling notify every time in the normal run of the loop.
          if (blockNewEvents) {
            synchronized (waitForDrained) {
              if (drained) {
                waitForDrained.notify();
              }
            }
          }
          Event event;
          try {
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);//分发事件
          }
        }
      }
    };
  }

从eventQueue队列中取出Event,然后调用dispatch(event);来处理事件,看dispatch(event)方法,如下所示:

protected void dispatch(Event event) {
  //all events go thru this loop
  if (LOG.isDebugEnabled()) {
    LOG.debug("Dispatching the event " + event.getClass().getName() + "."
        + event.toString());
  }

  Class<? extends Enum> type = event.getType().getDeclaringClass();

  try{
    EventHandler handler = eventDispatchers.get(type); //通过event获取事件类型,根据事件类型得到注册的EventHandler
    if(handler != null) {
      handler.handle(event); //EventHandler处理事件event
    } else {
      throw new Exception("No handler for registered for " + type);
    }
  } catch (Throwable t) {
    //TODO Maybe log the state of the queue
    LOG.fatal("Error in dispatcher thread", t);
    // If serviceStop is called, we should exit this thread gracefully.
    if (exitOnDispatchException
        && (ShutdownHookManager.get().isShutdownInProgress()) == false
        && stopped == false) {
      LOG.info("Exiting, bbye..");
      System.exit(-1);
    }
  }
}
整个过程使用生产--消费者模型,异步事件处理,整体实现起来还是很简单的!


优秀的个人博客,低调大师

微信关注我们

原文链接:https://yq.aliyun.com/articles/238551

转载内容版权归作者及来源网站所有!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
Mario,低调大师唯一一个Java游戏作品

Mario,低调大师唯一一个Java游戏作品

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上流行的关系数据库管理系统,系统可移植性好、使用方便、功能强,适用于各类大、中、小、微机环境。它是一种高效率、可靠性好的、适应高吞吐量的数据库方案。

Apache Tomcat7、8、9(Java Web服务器)

Apache Tomcat7、8、9(Java Web服务器)

Tomcat是Apache 软件基金会(Apache Software Foundation)的Jakarta 项目中的一个核心项目,由Apache、Sun 和其他一些公司及个人共同开发而成。因为Tomcat 技术先进、性能稳定,而且免费,因而深受Java 爱好者的喜爱并得到了部分软件开发商的认可,成为目前比较流行的Web 应用服务器。

Sublime Text 一个代码编辑器

Sublime Text 一个代码编辑器

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。