Reactor中的Thread和Scheduler
简介
今天我们要介绍的是Reactor中的多线程模型和定时器模型,Reactor之前我们已经介绍过了,它实际上是观察者模式的延伸。
所以从本质上来说,Reactor是和多线程无关的。你可以把它用在多线程或者不用在多线程。
今天将会给大家介绍一下如何在Reactor中使用多线程和定时器模型。
Thread多线程
先看一下之前举的Flux的创建的例子:
Flux<String> flux = Flux.generate( () -> 0, (state, sink) -> { sink.next("3 x " + state + " = " + 3*state); if (state == 10) sink.complete(); return state + 1; }); flux.subscribe(System.out::println);
可以看到,不管是Flux generator还是subscriber,他们实际上都是运行在同一个线程中的。
如果我们想让subscribe发生在一个新的线程中,我们需要新启动一个线程,然后在线程内部进行subscribe操作。
Mono<String> mono = Mono.just("hello "); Thread t = new Thread(() -> mono .map(msg -> msg + "thread ") .subscribe(v -> System.out.println(v + Thread.currentThread().getName()) ) ); t.start(); t.join();
上面的例子中,Mono在主线程中创建,而subscribe发生在新启动的Thread中。
Schedule定时器
很多情况下,我们的publisher是需要定时去调用一些方法,来产生元素的。Reactor提供了一个新的Schedule类来负责定时任务的生成和管理。
Scheduler是一个接口:
public interface Scheduler extends Disposable
它定义了一些定时器中必须要实现的方法:
比如立即执行的:
Disposable schedule(Runnable task);
延时执行的:
default Disposable schedule(Runnable task, long delay, TimeUnit unit)
和定期执行的:
default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)
Schedule有一个工具类叫做Schedules,它提供了多个创建Scheduler的方法,它的本质就是对ExecutorService和ScheduledExecutorService进行封装,将其做为Supplier来创建Schedule。
简单点看Schedule就是对ExecutorService的封装。
Schedulers工具类
Schedulers工具类提供了很多个有用的工具类,我们来详细介绍一下:
Schedulers.immediate():
提交的Runnable将会立马在当前线程执行。
Schedulers.single():
使用同一个线程来执行所有的任务。
Schedulers.boundedElastic():
创建一个可重用的线程池,如果线程池中的线程在长时间内都没有被使用,那么将会被回收。boundedElastic会有一个最大的线程个数,一般来说是CPU cores x 10。 如果目前没有可用的worker线程,提交的任务将会被放入队列等待。
Schedulers.parallel():
创建固定个数的工作线程,个数和CPU的核数相关。
Schedulers.fromExecutorService(ExecutorService):
从一个现有的线程池创建Scheduler。
Schedulers.newXXX:
Schedulers提供了很多new开头的方法,来创建各种各样的Scheduler。
我们看一个Schedulers的具体应用,我们可以指定特定的Scheduler来产生元素:
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
publishOn 和 subscribeOn
publishOn和subscribeOn主要用来进行切换Scheduler的执行上下文。
先讲一个结论,就是在链式调用中,publishOn可以切换Scheduler,但是subscribeOn并不会起作用。
这是因为真正的publish-subscribe关系只有在subscriber开始subscribe的时候才建立。
下面我们来具体看一下这两个方法的使用情况:
publishOn
publishOn可以在链式调用的过程中,进行publish的切换:
[@Test](https://my.oschina.net/azibug) public void usePublishOn() throws InterruptedException { Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); final Flux<String> flux = Flux .range(1, 2) .map(i -> 10 + i + ":"+ Thread.currentThread()) .publishOn(s) .map(i -> "value " + i+":"+ Thread.currentThread()); new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start(); System.out.println(Thread.currentThread()); Thread.sleep(5000); }
上面我们创建了一个名字为parallel-scheduler的scheduler。
然后创建了一个Flux,Flux先做了一个map操作,然后切换执行上下文到parallel-scheduler,最后右执行了一次map操作。
最后,我们采用一个新的线程来进行subscribe的输出。
先看下输出结果:
Thread[main,5,main] value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main] value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
可以看到,主线程的名字是Thread。Subscriber线程的名字是ThreadA。
那么在publishOn之前,map使用的线程就是ThreadA。 而在publishOn之后,map使用的线程就切换到了parallel-scheduler线程池。
subscribeOn
subscribeOn是用来切换Subscriber的执行上下文,不管subscribeOn出现在调用链的哪个部分,最终都会应用到整个调用链上。
我们看一个例子:
[@Test](https://my.oschina.net/azibug) public void useSubscribeOn() throws InterruptedException { Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); final Flux<String> flux = Flux .range(1, 2) .map(i -> 10 + i + ":" + Thread.currentThread()) .subscribeOn(s) .map(i -> "value " + i + ":"+ Thread.currentThread()); new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start(); Thread.sleep(5000); }
同样的,上面的例子中,我们使用了两个map,然后在两个map中使用了一个subscribeOn用来切换subscribe执行上下文。
看下输出结果:
value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main] value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
可以看到,不管哪个map,都是用的是切换过的parallel-scheduler。
本文的例子learn-reactive
本文作者:flydean程序那些事
本文链接:http://www.flydean.com/reactor-thread-scheduler/
本文来源:flydean的博客
欢迎关注我的公众号:「程序那些事」最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spring Security 实战干货:OAuth2授权请求是如何构建并执行的
在Spring Security 实战干货:客户端OAuth2授权请求的入口中我们找到了拦截OAuth2授权请求入口/oauth2/authorization的过滤器OAuth2AuthorizationRequestRedirectFilter,并找到了真正发起OAuth2授权请求的方法sendRedirectForAuthorization。但是这个方法并没有细说,所以今天接着上一篇把这个坑给补上。 2. sendRedirectForAuthorization 这个sendRedirectForAuthorization方法没多少代码,它的主要作用就是向第三方平台进行授权重定向访问。它所有的逻辑都和OAuth2AuthorizationRequest有关,因此我们对OAuth2AuthorizationRequest进行轻描淡写是不行的,我们必须掌握OAuth2AuthorizationRequest是怎么来的,干嘛用的。 OAuth2AuthorizationRequestResolver 这就需要去分析解析类OAuth2AuthorizationRequestResolver...
- 下一篇
解Bug之路-NAT引发的性能瓶颈
解Bug之路-NAT引发的性能瓶颈 笔者最近解决了一个非常曲折的问题,从抓包开始一路排查到不同内核版本间的细微差异,最后才完美解释了所有的现象。在这里将整个过程写成博文记录下来,希望能够对读者有所帮助。(篇幅可能会有点长,耐心看完,绝对物有所值~) 环境介绍 先来介绍一下出问题的环境吧,调用拓扑如下图所示: 调用拓扑图 合作方的多台机器用NAT将多个源ip映射成同一个出口ip 20.1.1.1,而我们内网将多个Nginx映射成同一个目的ip 30.1.1.1。这样,在防火墙和LVS之间,所有的请求始终是通过(20.1.1.1,30.1.1.1)这样一个ip地址对进行访问。 同时还固定了一个参数,那就是目的端口号始终是443。 短连接-HTTP1.0 由于对方是采用短连接和Nginx进行交互的,而且采用的协议是HTTP-1.0。所以我们的Nginx在每个请求完成后,会主动关闭连接,从而造成有大量的TIME_WAIT。 值得注意的是,TIME_WAIT取决于Server端和Client端谁先关闭这个Socket。所以Nginx作为Server端先关闭的话,也必然会产生TIME_WAIT。 ...
相关文章
文章评论
共有0条评论来说两句吧...