SpringCloudRPC核心原理:RxJava响应式编程框架Scheduler调度器

共 3206字,需浏览 7分钟

 ·

2022-08-25 18:18

RxJava的Scheduler调度器

顾名思义,Scheduler是一种用来对RxJava流操作进行调度的类,从Scheduler的工厂方法可以获取现有调度器的实现,如下:

(1)Schedulers.io():用于获取内部的ioScheduler调度器实例。

(2)Schedulers.newThread():用于获取内部的newThreadScheduler调度器实例,该调度器为RxJava流操作创建一个新线程。

(3)Schedulers.computation():用于获取内部的computationScheduler调度器实例。

(4)Schedulers.trampoline():使用当前线程立即执行RxJava流操作。

(5)Schedulers.single():使用RxJava内置的单例线程执行RxJava流操作。

关于以上5个获取调度器的方法具体介绍如下:

(1)Schedulers.io():获取内部的ioScheduler调度器实例主要用于IO密集型的流操作,例如读写SD卡文件、查询数据库、访问网络等。此调度器具有线程缓存机制,在接收到任务后,先检查线程缓存池中是否有空闲的线程,如果有就复用,如果没有就创建新的线程,并加入IO专用线程池中,如果专用线程池每次都没有空闲线程可用,就可以无上限地创建新线程。

(2)Schedulers.newThread():每执行一个RxJava流操作创建一个新的线程,不具有线程缓存机制,因为创建一个新的线程比复用一个线程更耗时耗力,Schedulers.newThread()的效率没有Schedulers.io()的效率高。

(3)Schedulers.computation():获取内部的具有固定线程池的内部computationScheduler调度器实例,用于执行CPU密集型的流操作,线程数大小为CPU的核数。不可以用于I/O操作,例如不能用于XML/JSON文件的解析、Bitmap图片的压缩取样等,因为I/O操作会浪费CPU时间。

(4)Schedulers.trampoline():如果要在当前线程执行流操作,而当前线程有任务在执行,就会等当前任务执行完之后再接着执行流操作。

(5)Schedulers.single():RxJava拥有一个专用的线程单例,此调度器负责的所有流操作都在这个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次排队。


一个简单的调度器使用实例的代码如下:

package com.crazymaker.demo.rxJava.basic;import lombok.extern.slf4j.Slf4j;import org.junit.Test;import rx.Observable;import rx.Subscriber;import rx.schedulers.Schedulers;@Slf4jpublic class SchedulerDemo { /** *演示Schedulers的基本使用 */ @Test public void testScheduler() throws InterruptedException { //被观察者 Observable observable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { for (int i = 0; i < 5; i++) { log.info("produce ->" + i); subscriber.onNext(String.valueOf(i)); } subscriber.onCompleted(); } }); //订阅Observable与Subscriber之间依然通过subscribe()进行关联 observable //使用具有线程缓存机制的可复用线程 .subscribeOn(Schedulers.io()) //每执行一个任务创建一个新的线程 .observeOn(Schedulers.newThread()) .subscribe(s -> { log.info("consumer ->" + s); }); Thread.sleep(Integer.MAX_VALUE); }}

运行这个演示程序,输出的部分结果如下:

17:04:17.922 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->017:04:17.932 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->117:04:17.932 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->017:04:17.933 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->217:04:17.933 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->117:04:17.933 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->317:04:17.933 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->217:04:17.933 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->417:04:17.933 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->317:04:17.933 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->4

通过上面的代码可以看出,RxJava提供了两个方法来改变流操作的调度器:

(1)subscribeOn():主要改变的是弹射的线程。

(2)observeOn():主要改变的是订阅的线程。

在RxJava中,创建操作符创建的Observable主题的弹射任务,将由其后最近的subscribeOn()所设置的调度器负责执行。

在RxJava中,Observable主题的下游消费型操作(如流转换等)的线程调度,将由其前面最近的observeOn()所设置的调度器负责。observeOn()可以多次设置,每一次设置都对下一次observeOn()设置之前的流操作产生作用。


本文给大家讲解的内容是SpringCloudRPC远程调用核心原理:RxJava响应式编程框架,RxJava的Scheduler调度器

  1. 下篇文章给大家讲解的是SpringCloudRPC远程调用核心原理:RxJava响应式编程框架,背压问题的几种应对模式

  2. 觉得文章不错的朋友可以转发此文关注小编;

  3. 感谢大家的支持!

本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。

浏览 15
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报