您现在的位置是:首页 > 文章详情

spring-boot项目整合Disruptor的初步使用

日期:2019-06-17点击:766

1.在项目的pom文件中配置

 <!-- LMAX Disruptor: the ring-buffer based queue used by the classes below. -->
 <dependency>
     <groupId>com.lmax</groupId>
     <artifactId>disruptor</artifactId>
     <version>3.4.2</version>
 </dependency>
 <!-- Google Guava utility library. -->
 <dependency>
     <groupId>com.google.guava</groupId>
     <artifactId>guava</artifactId>
     <version>r09</version>
 </dependency>

2.创建BaseQueueHelper

/** * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布。 * * 调用init()时才真正启动线程开始处理 系统退出自动清理资源. */ public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> { /** * 记录所有的队列,系统退出时统一清理资源 */ private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>(); /** * Disruptor 对象 */ private Disruptor<E> disruptor; /** * RingBuffer */ private RingBuffer<E> ringBuffer; /** * initQueue */ private List<D> initQueue = new ArrayList<D>(); /** * 队列大小 * * @return 队列长度,必须是2的幂 */ protected abstract int getQueueSize(); /** * 事件工厂 * * @return EventFactory */ protected abstract EventFactory<E> eventFactory(); /** * 事件消费者 * * @return WorkHandler[] */ protected abstract WorkHandler[] getHandler(); /** * 初始化 */ public void init() { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build(); disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy()); disruptor.setDefaultExceptionHandler(new MyHandlerException()); disruptor.handleEventsWithWorkerPool(getHandler()); ringBuffer = disruptor.start(); //初始化数据发布 for (D data : initQueue) { ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() { @Override public void translateTo(E event, long sequence, D data) { event.setValue(data); } }, data); } //加入资源清理钩子 synchronized (queueHelperList) { if (queueHelperList.isEmpty()) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { for (BaseQueueHelper baseQueueHelper : queueHelperList) { baseQueueHelper.shutdown(); } } }); } queueHelperList.add(this); } } /** * 如果要改变线程执行优先级,override此策略. YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU, * 慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景. * * @return WaitStrategy */ protected abstract WaitStrategy getStrategy(); /** * 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理. 
*/ public synchronized void publishEvent(D data) { if (ringBuffer == null) { initQueue.add(data); return; } ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() { @Override public void translateTo(E event, long sequence, D data) { event.setValue(data); } }, data); } /** * 关闭队列 */ public void shutdown() { disruptor.shutdown(); } }

3.创建MyHandlerException

 /**
  * Disruptor exception handler that logs failures instead of letting them
  * propagate.
  *
  * <p>Each callback logs the exception message in the original format and also
  * passes the throwable itself to the logger, so the stack trace ends up in
  * the log rather than on stderr.
  */
 public class MyHandlerException implements ExceptionHandler<Object> {

     private static final Logger logger = LoggerFactory.getLogger(MyHandlerException.class);

     /**
      * Called when a handler throws while processing an event.
      *
      * @param ex       the exception thrown by the handler
      * @param sequence sequence number of the event being processed
      * @param event    the event being processed; logged as-is, so {@code null} is tolerated
      */
     @Override
     public void handleEventException(Throwable ex, long sequence, Object event) {
         // The trailing throwable has no placeholder, so SLF4J logs it as the exception.
         logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]",
                 sequence, event, ex.getMessage(), ex);
     }

     /**
      * Called when an exception occurs during start-up.
      *
      * @param ex the exception raised during start-up
      */
     @Override
     public void handleOnStartException(Throwable ex) {
         logger.error("start disruptor error ==[{}]!", ex.getMessage(), ex);
     }

     /**
      * Called when an exception occurs during shutdown.
      *
      * @param ex the exception raised during shutdown
      */
     @Override
     public void handleOnShutdownException(Throwable ex) {
         logger.error("shutdown disruptor error ==[{}]!", ex.getMessage(), ex);
     }
 }

4.创建ValueWrapper

/**
 * Base class for objects that carry a single mutable value.
 *
 * @param <T> type of the wrapped value
 */
public abstract class ValueWrapper<T> {

    /** The wrapped value; {@code null} until one is supplied. */
    private T value;

    /** Creates a wrapper that holds no value yet. */
    public ValueWrapper() {
        // Nothing to do: the value stays null until setValue is called.
    }

    /**
     * Creates a wrapper around the given value.
     *
     * @param value the value to wrap
     */
    public ValueWrapper(T value) {
        this.value = value;
    }

    /**
     * Replaces the wrapped value.
     *
     * @param value the new value
     */
    public void setValue(T value) {
        this.value = value;
    }

    /**
     * Returns the wrapped value.
     *
     * @return the current value, possibly {@code null}
     */
    public T getValue() {
        return this.value;
    }
}

5.创建EventFactory

/**
 * Disruptor event factory that creates empty {@link SeriesDataEvent} instances.
 */
public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {

    /**
     * Creates a new, empty event.
     *
     * @return a fresh {@link SeriesDataEvent}
     */
    @Override
    public SeriesDataEvent newInstance() {
        SeriesDataEvent emptyEvent = new SeriesDataEvent();
        return emptyEvent;
    }
}
6.创建DisruptorConfig
/**
 * Spring configuration that scans {@code com.demo.disruptor} and declares six
 * separate {@link SeriesDataEventHandler} beans.
 */
@Configuration
@ComponentScan(value = {"com.demo.disruptor"})
public class DisruptorConfig {

    /**
     * First handler bean.
     *
     * @return a new SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler1() {
        return newHandler();
    }

    /**
     * Second handler bean.
     *
     * @return a new SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler2() {
        return newHandler();
    }

    /**
     * Third handler bean.
     *
     * @return a new SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler3() {
        return newHandler();
    }

    /**
     * Fourth handler bean.
     *
     * @return a new SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler4() {
        return newHandler();
    }

    /**
     * Fifth handler bean.
     *
     * @return a new SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler5() {
        return newHandler();
    }

    /**
     * Sixth handler bean.
     *
     * @return a new SeriesDataEventHandler
     */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler6() {
        return newHandler();
    }

    /** Builds one handler instance; every bean method gets its own. */
    private static SeriesDataEventHandler newHandler() {
        return new SeriesDataEventHandler();
    }
}

7.创建SeriesData

/**
 * Payload carrying a device-info string.
 */
public class SeriesData {

    /** Device information in string form; may be {@code null}. */
    private String deviceInfoStr;

    /** Creates a payload with no device information. */
    public SeriesData() {
        // Nothing to do: deviceInfoStr stays null.
    }

    /**
     * Creates a payload with the given device information.
     *
     * @param deviceInfoStr the device information string
     */
    public SeriesData(String deviceInfoStr) {
        this.deviceInfoStr = deviceInfoStr;
    }

    /**
     * Replaces the device information.
     *
     * @param deviceInfoStr the new device information string
     */
    public void setDeviceInfoStr(String deviceInfoStr) {
        this.deviceInfoStr = deviceInfoStr;
    }

    /**
     * Returns the device information.
     *
     * @return the device information string, possibly {@code null}
     */
    public String getDeviceInfoStr() {
        return this.deviceInfoStr;
    }

    /**
     * Renders the payload as {@code SeriesData{deviceInfoStr='...'}}.
     *
     * @return the textual form of this payload
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("SeriesData{");
        text.append("deviceInfoStr='").append(deviceInfoStr).append('\'');
        text.append('}');
        return text.toString();
    }
}

8.创建SeriesDataEvent

/**
 * Ring-buffer event wrapping a {@link SeriesData} payload; adds nothing to
 * {@link ValueWrapper} beyond fixing the value type.
 */
public class SeriesDataEvent extends ValueWrapper<SeriesData> {
}

9.创建SeriesDataEventQueueHelper

@Component public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean { private static final int QUEUE_SIZE = 1024; @Autowired private List<SeriesDataEventHandler> seriesDataEventHandler; @Override protected int getQueueSize() { return QUEUE_SIZE; } @Override protected com.lmax.disruptor.EventFactory eventFactory() { return new EventFactory(); } @Override protected WorkHandler[] getHandler() { int size = seriesDataEventHandler.size(); SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]); return paramEventHandlers; } @Override protected WaitStrategy getStrategy() { return new BlockingWaitStrategy(); //return new YieldingWaitStrategy(); } @Override public void afterPropertiesSet() throws Exception { this.init(); } }

10.创建SeriesDataEventHandler

/**
 * Work handler that consumes {@link SeriesDataEvent}s from the queue.
 */
public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {

    private static final Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);

    /** Injected by Spring; not used by the code visible in this class. */
    @Autowired
    private SocketService socketService;

    /**
     * Handles one event. Events without a payload or with an empty device-info
     * string are logged as a warning and skipped.
     *
     * @param event the event taken from the ring buffer
     */
    @Override
    public void onEvent(SeriesDataEvent event) {
        if (event == null
                || event.getValue() == null
                || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
            logger.warn("receiver series data is empty!");
            // Stop here: there is nothing to process for an empty event.
            return;
        }
        // NOTE(review): looks like placeholder output for the real processing, and is
        // logged at error level — confirm before reuse.
        logger.error("hello word!");
    }
}

11.使用操作

/** Queue helper under test, injected by Spring. */
@Autowired
private SeriesDataEventQueueHelper seriesDataEventQueueHelper;

/** Publishes a single sample payload to the queue. */
@Test
public void demo() {
    SeriesData sample = new SeriesData("hello word");
    seriesDataEventQueueHelper.publishEvent(sample);
}

12.后续将陆续介绍Disruptor的各种操作

原文链接:https://yq.aliyun.com/articles/705811
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章