SpringCloudRPC核心原理:RxJava响应式编程框架Scheduler调度器
RxJava的Scheduler调度器
顾名思义,Scheduler是一种用来对RxJava流操作进行调度的类,从Scheduler的工厂方法可以获取现有调度器的实现,如下:
(1)Schedulers.io():用于获取内部的ioScheduler调度器实例。
(2)Schedulers.newThread():用于获取内部的newThreadScheduler调度器实例,该调度器为RxJava流操作创建一个新线程。
(3)Schedulers.computation():用于获取内部的computationScheduler调度器实例。
(4)Schedulers.trampoline():使用当前线程立即执行RxJava流操作。
(5)Schedulers.single():使用RxJava内置的单例线程执行RxJava流操作。
关于以上5个获取调度器的方法具体介绍如下:
(1)Schedulers.io():获取内部的ioScheduler调度器实例主要用于IO密集型的流操作,例如读写SD卡文件、查询数据库、访问网络等。此调度器具有线程缓存机制,在接收到任务后,先检查线程缓存池中是否有空闲的线程,如果有就复用,如果没有就创建新的线程,并加入IO专用线程池中,如果专用线程池每次都没有空闲线程可用,就可以无上限地创建新线程。
(2)Schedulers.newThread():每执行一个RxJava流操作创建一个新的线程,不具有线程缓存机制,因为创建一个新的线程比复用一个线程更耗时耗力,Schedulers.newThread()的效率没有Schedulers.io()的效率高。
(3)Schedulers.computation():获取内部的具有固定线程池的内部computationScheduler调度器实例,用于执行CPU密集型的流操作,线程数大小为CPU的核数。不可以用于I/O操作,例如不能用于XML/JSON文件的解析、Bitmap图片的压缩取样等,因为I/O操作会浪费CPU时间。
(4)Schedulers.trampoline():如果要在当前线程执行流操作,而当前线程有任务在执行,就会等当前任务执行完之后再接着执行流操作。
(5)Schedulers.single():RxJava拥有一个专用的线程单例,此调度器负责的所有流操作都在这个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次排队。
一个简单的调度器使用实例的代码如下:
package com.crazymaker.demo.rxJava.basic;import lombok.extern.slf4j.Slf4j;import org.junit.Test;import rx.Observable;import rx.Subscriber;import rx.schedulers.Schedulers;@Slf4jpublic class SchedulerDemo { /** *演示Schedulers的基本使用 */ @Test public void testScheduler() throws InterruptedException { //被观察者 Observable observable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { for (int i = 0; i < 5; i++) { log.info("produce ->" + i); subscriber.onNext(String.valueOf(i)); } subscriber.onCompleted(); } }); //订阅Observable与Subscriber之间依然通过subscribe()进行关联 observable //使用具有线程缓存机制的可复用线程 .subscribeOn(Schedulers.io()) //每执行一个任务创建一个新的线程 .observeOn(Schedulers.newThread()) .subscribe(s -> { log.info("consumer ->" + s); }); Thread.sleep(Integer.MAX_VALUE); }}
运行这个演示程序,输出的部分结果如下:
17:04:17.922 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->017:04:17.932 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->117:04:17.932 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->017:04:17.933 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->217:04:17.933 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->117:04:17.933 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->317:04:17.933 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->217:04:17.933 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->417:04:17.933 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->317:04:17.933 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->4
通过上面的代码可以看出,RxJava提供了两个方法来改变流操作的调度器:
(1)subscribeOn():主要改变的是弹射的线程。
(2)observeOn():主要改变的是订阅的线程。
在RxJava中,创建操作符创建的Observable主题的弹射任务,将由其后最近的subscribeOn()所设置的调度器负责执行。
在RxJava中,Observable主题的下游消费型操作(如流转换等)的线程调度,将由其前面最近的observeOn()所设置的调度器负责。observeOn()可以多次设置,每一次设置都对下一次observeOn()设置之前的流操作产生作用。
本文给大家讲解的内容是SpringCloudRPC远程调用核心原理:RxJava响应式编程框架,RxJava的Scheduler调度器
下篇文章给大家讲解的是SpringCloudRPC远程调用核心原理:RxJava响应式编程框架,背压问题的几种应对模式;
觉得文章不错的朋友可以转发此文关注小编;
感谢大家的支持!
本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。