SpringCloudRPC调用核心原理:RxJava响应式编程框架,观察者模式

愿天堂没有BUG

共 10182字,需浏览 21分钟

 ·

2022-03-06 13:05


RxJava响应式编程框架

在Spring Cloud框架中涉及的Ribbon和Hystrix两个重要的组件都使用了RxJava响应式编程框架,其作为重要的编程基础知识,特开辟一章对RxJava的使用进行详细的介绍。

Hystrix和Ribbon的代码中大量运用了RxJava的API,对于有RxJava基础的同学,学习Hystrix和Ribbon并不是一件难事。如果不懂RxJava,对于Hystrix和Ribbon的学习就会令人头疼不已。


从基础原理讲起:观察者模式

本文的重要特色,从基础原理讲起。只有了解了基础原理,大家对新的知识,特别是复杂的知识才能更加容易地理解和掌握。

RxJava是基于观察者模式实现的,这里先带领大家复习一下观察者模式的基础原理和经典实现。当然,这也是Java工程师面试必备的一个重要知识点。

观察者模式的基础原理

观察者模式是常用的设计模式之一,是所有Java工程师必须掌握的设计模式。观察者模式也叫发布订阅模式。

此模式的角色中有一个可观察的主题对象Subject,有多个观察者Observer去关注它。当Subject的状态发生变化时,会自动通知这些Observer订阅者,令Observer做出响应。

在整个观察者模式中一共有4个角色:Subject(抽象主题、抽象被观察者)、Concrete Subject(具体主题、具体被观察者)、Observer(抽象观察者)以及ConcreteObserver(具体观察者)。

观察者模式的4个角色以及它们之间的关系如图4-1所示。

图4-1 观察者模式的4个角色以及它们之间的关系

观察者模式中4个角色的介绍如下:

(1)Subject(抽象主题):Subject抽象主题的主要职责之一为维护Observer观察者对象的集合,集合里的所有观察者都订阅过该主题。Subject抽象主题负责提供一些接口,可以增加、删除和更新观察者对象。

(2)ConcreteSubject(具体主题):ConcreteSubject用于保持主题的状态,并且在主题的状态发生变化时给所有注册过的观察者发出通知。具体来说,ConcreteSubject需要调用Subject(抽象主题)基类的通知方法给所有注册过的观察者发出通知。

(3)Observer(抽象观察者):观察者的抽象类定义更新接口,使得被观察者可以在收到主题通知的时候更新自己的状态。

(4)ConcreteObserver(具体观察者):实现抽象观察者Observer所定义的更新接口,以便在收到主题的通知时完成自己状态的真正更新。

观察者模式的经典实现

首先来看Subject主题类的代码实现:它将所有订阅过自己的Observer观察者对象保存在一个集合中,然后提供一组方法完成Observer观察者的新增、删除和通知。

Subject主题类的参考代码实现如下:

package com.crazymaker.demo.observerPattern;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class Subject {
//保存订阅过自己的观察者对象
private List observers = new ArrayList<>();
//观察者对象订阅
public void add(Observer observer) {
observers.add(observer);
log.info( "add an observer");
}
//观察者对象注销
public void remove(Observer observer) {
observers.remove(observer);
log.info( "remove an observer");
}
//通知所有注册的观察者对象
public void notifyObservers(String newState) {
for (Observer observer : observers) {
observer.update(newState);
}
}
}

接着来看ConcreteSubject具体主题类:它首先拥有一个成员用于保持主题的状态,并且在主题的状态变化时调用基类Subject(抽象主题)的通知方法给所有注册过的观察者发出通知。

package com.crazymaker.demo.observerPattern;
import lombok.extern.slf4j.Slf4j;
@Data
@Slf4jpublic class ConcreteSubject extends Subject {
private String state; //保持主题的状态
public void change(String newState) {
state = newState;
log.info( "change state :" + newState);
//状态发生改变,通知观察者
notifyObservers(newState);
}
}

然后来看一下观察者Observer接口,它抽象出了一个观察者自身的状态更新方法。

package com.crazymaker.demo.observerPattern;
public interface Observer {
void update(String newState); //状态更新的方法
}

接着来看ConcreteObserver具体观察者类:它首先接收主题的通知,实现抽象观察者Observer所定义的update接口,以便在收到主题的状态发生变化时完成自己的状态更新。

package com.crazymaker.demo.observerPattern;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ObserverA implements Observer {
//观察者状态
private String observerState;
@Override
public void update(String newState) {
//更新观察者状态,让它与主题的状态一致
observerState = newState;
log.info( "观察者的当前状态为:"+observerState);
}
}

4个角色的实现代码已经介绍完了。如何使用观察者模式呢?步骤如下:

package com.crazymaker.demo.observerPattern;
public class ObserverPatternDemo {
public static void main(String[] args) {
//第一步:创建主题
ConcreteSubject mConcreteSubject = new ConcreteSubject();
//第二步:创建观察者
Observer observerA = new ObserverA();
Observer ObserverB = new ObserverA();
//第三步:主题订阅
mConcreteSubject.add(observerA);
mConcreteSubject.add(ObserverB);
//第四步:主题状态变更
mConcreteSubject.change("倒计时结束,开始秒杀");
}
}

运行示例程序,结果如下:

22:46:03.548 [main] INFO c.c.d.o.ConcreteSubject - change state:倒计时结束,开始秒杀
22:46:03.548 [main] INFO c.c.d.o.ObserverA -观察者的当前状态为:倒计时结束,开始秒杀
22:46:03.548 [main] INFO c.c.d.o.ObserverA - 观察者的当前状态为:倒计时结束,开始秒杀

RxJava中的观察者模式

RxJava是基于观察者模式设计的。RxJava中的Observable类和Subscriber类分别对应观察者模式中的Subject(抽象主题)和Observer(抽象观察者)两个角色。

在RxJava中,Observable和Subscriber通过subscribe()方法实现订阅关系,如图4-2所示。

图4-2 RxJava通过subscribe()方法实现订阅关系

在RxJava中,Observable和Subscriber之间通过emitter.onNext(...)弹射的方式实现主题的消息发布,如图4-3所示。

图4-3 RxJava通过emitter.onNext()弹射主题消息

RxJava中主题的消息发布方式之一是通过内部的弹射器Emitter完成。Emitter除了使用onNext()方法弹射消息之外,还定义了两个特殊的通知方法:onCompleted()和onError()。

(1)onCompleted():表示消息序列弹射完结。

RxJava主题(可观察者)中的Emitter可以不只发布(弹射)一个消息,可以重复使用其onNext()方法弹射一系列消息(或事件),这一系列消息组成一个序列。在绝大部分场景下,Observable内部有一个专门的Queue(队列)来负责缓存消息序列。当Emitter明确不会再有新的消息弹射出来时,需要触发onCompleted()方法,作为消息序列的结束标志。

RxJava主题(可观察者)的Emitter弹射器所弹出的消息序列也可以称为消息流。

(2)onError():表示主题的消息序列异常终止。

如果Observable在事件处理过程中出现异常,Emitter的onError()就会被触发,同时消息序列自动终止,不允许再有消息弹射出来。

RxJava的一个简单使用示例代码如下:

package com.crazymaker.demo.observerPattern;
//省略import
@Slf4j
public class RxJavaObserverDemo {
/**
*演示RxJava中的Observer模式
*/

@Test
public void rxJavaBaseUse() {
//被观察者(主题)
Observable observable = Observable.create(
new Action1>() {
@Override
public void call(Emitter emitter) {
emitter.onNext("apple");
emitter.onNext("banana");
emitter.onNext("pear"); emitter.onCompleted();
}
},Emitter.BackpressureMode.NONE);
//订阅者(观察者)
Subscriber subscriber = new Subscriber() {
@Override
public void onNext(String s) {
log.info("onNext: {}", s);
}
@Override
public void onCompleted() {
log.info("onCompleted");
}
@Override
public void onError(Throwable e) {
log.info("onError");
}
};
//订阅:Observable与Subscriber之间依然通过subscribe()进行关联
observable.subscribe(subscriber);
}
}

运行这个示例程序,结果如下:

11:29:07.555 [main] INFO c.c.d.o.RxJavaObserverDemo - onNext: apple
11:29:07.564 [main] INFO c.c.d.o.RxJavaObserverDemo - onNext: banana
11:29:07.564 [main] INFO c.c.d.o.RxJavaObserverDemo - onNext: pear
11:29:07.564 [main] INFO c.c.d.o.RxJavaObserverDemo - onCompleted

通过代码和运行接口可以看出:被观察者Observable与观察者Subscriber产生关联通过subscribe()方法完成。当订阅开始时,Observable主题便开始发送事件。

通过代码还可以看出:Subscriber有3个回调方法,其中onNext(String s)回调方法用于响应Observable主题正常的弹射消息,onCompleted()回调方法用于响应Observable主题的结束消息,onError(Throwable e)回调方法用于响应Observable主题的异常消息。在一个消息序列中,Emitter弹射器的onCompleted()正常结束和onError()异常终止只能调用一个,并且必须是消息序列中最后一个被发送的消息。换句话说,Emitter的onCompleted()和onError()两个方法是互斥的,在消息序列中调用了其中一个,就不可以再调用另一个。

通过示例可以看出,RxJava与经典的观察者模式不同。在RxJava中,主题内部有一个弹射器的角色,而经典的观察者模式中,主题所发送的是单个消息,并不是一个消息序列。

在RxJava中,Observable主题还会负责消息序列缓存,这一点像经典的生产者/消费者模式。在经典的生产者/消费者模式中,生产者生产数据后放入缓存队列,自己不进行处理,而消费者从缓存队列里拿到所要处理的数据,完成逻辑处理。从这一点来说,RxJava借鉴了生产者消费者模式的思想。

 RxJava的不完整回调

Java 8引入函数式编程方式大大地提高了编码效率。但是,Java8的函数式编程有一个非常重要的要求:需要函数式接口作为支撑。什么是函数式接口呢?指的是有且只有一个抽象方法的接口,比如Java中内置的Runnable接口。

RxJava的一大特色是支持函数式的编程。由于标准的Subscriber观察者接口有3个抽象方法,当然就不是一个函数式接口,因此直接使用Subscriber观察者接口是不支持函数式编程的。

RxJava为了支持函数式编程,另外定义了几个函数式接口,比较重要的有Action0和Action1。

1.Action0回调接口

这是一个无参数、无返回值的函数式接口,源码如下:

package rx.functions;
/**
*A zero-argument action.
*/

public interface Action0 extends Action {
void call();
}

Action0接口的call()方法无参数、无返回值,它的具体使用场景对应于Subscriber观察者中的onCompleted()回调方法的使用场景,因为Subscriber的onCompleted()回调方法也是无参数、无返回值的。

2.Action1回调接口

这是一个有一个参数、泛型、无返回值的函数式接口,源码如下:

package rx.functions;
/**
*A one-argument action.
*@param the first argument type
*/

public interface Action1<T> extends Action {
void call(T t);
}

Action1回调接口主要有以下两种用途:

(1)作为函数式编程替代使用Subscriber的onNext()方法的传统编程,前提是Action1回调接口的泛型类型与Subscriber的onNext()回调方法的参数类型保持一致。

(2)作为函数式编程替代使用Subscriber的onErrorAction(Throwable e)方法的传统编程,前提是Action1回调接口的泛型类型与Subscriber的onErrorAction()回调方法的参数类型保持一致。

Action1接口承担的主要是观察者(订阅者)角色,所以RxJava为主题类提供了重载的subscribe(Action1 action)订阅方法,可以接收一个Action1回调接口的实现对象作为弹射消息序列的订阅者。

下面使用不完整回调实现4.1.3节的例子,大家可以对比一下。具体的源码如下:

package com.crazymaker.demo.observerPattern;
//省略import
@Slf4j
public class RxJavaObserverDemo {
/**
演示
中的不完整观察者 *演示RxJava中的不完整观察者
*/

@Test
public void rxJavaActionDemo() {
//被观察者(主题)
Observable observable = Observable.create(
new Action1>() {
@Override
public void call(Emitter emitter) {
emitter.onNext("apple");
emitter.onNext("banana");
emitter.onNext("pear");
emitter.onCompleted();
}
},Emitter.BackpressureMode.NONE);
Action1 onNextAction = new Action1() {
@Override
public void call(String s) {
log.info(s);
}
};
Action1 onErrorAction = new Action1() {
@Override
public void call(Throwable throwable) {
log.info("onError,Error Info is:" + throwable.getMessage());
}
};
Action0 onCompletedAction = new Action0() {
@Override
public void call() {
log.info("onCompleted");
}
};
log.info("第1次订阅:");
//根据onNextAction来定义onNext()
observable.subscribe(onNextAction);
log.info("第2次订阅:");
//根据onNextAction来定义onNext(),根据onErrorAction来定义onError()
observable.subscribe(onNextAction, onErrorAction);
log.info("第3次订阅:");
//根据onNextAction来定义onNext(),根据onErrorAction来定义onError()
//根据onCompletedAction来定义onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
}
}

运行这个示例程序,结果如下:

11:06:22.015 [main] INFO c.c.d.o.RxJavaObserverDemo - 第1次订阅:
11:06:22.015 [main] INFO c.c.d.o.RxJavaObserverDemo - apple
11:06:22.015 [main] INFO c.c.d.o.RxJavaObserverDemo - banana
11:06:22.015 [main] INFO c.c.d.o.RxJavaObserverDemo - pear
11:06:22.015 [main] INFO c.c.d.o.RxJavaObserverDemo - 第2次订阅:
11:06:22.015 [main] INFO c.c.d.o.RxJavaObserverDemo - apple
11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo - banana
11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo - pear
11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo - 第3次订阅:
11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo - apple
11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo - banana
11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo - pear
11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemoonCompleted

在上面的代码中,observable被订阅了3次,由于没有异常消息,因此从输出中只能看到正常消息和结束消息。

总之,RxJava提供的Action0回调接口和Action1回调接口可以看作Subscriber观察者接口的阉割版本和函数式编程版本。使用RxJava的不完整回调观察者接口并结合Java 8的函数式编程,能够编写出更为简洁和灵动的代码。

RxJava的函数式编程

有了Action0和Action1这两个函数式接口,就可以使用RxJava进行函数式编程了。下面使用函数式编程的风格实现上节的例子,大家对比一下。

public class RxJavaObserverDemo {
...
/**
*演示RxJava中的Lamda表达式实现
*/

@Test
public void rxJavaActionLamda()
{
Observable observable =
Observable.just("apple", "banana", "pear");
log.info("第1次订阅:");
//使用Action1 函数式接口来实现onNext回调
observable.subscribe(s -> log.info(s));
log.info("第2次订阅:");
//使用Action1 函数式接口来实现onNext回调
//使用Action1 函数式接口来实现onError回调
observable.subscribe(
s -> log.info(s),
e -> log.info("Error Info is:" + e.getMessage()));
log.info("第3次订阅:");
//使用Action1 函数式接口来实现onNext回调
//使用Action1 函数式接口来实现onError回调
//使用Action0 函数式接口来实现onCompleted回调
observable.subscribe(
s -> log.info(s),
e -> log.info("Error Info is:" + e.getMessage()),
() -> log.info("onCompleted弹射结束"));
}
}

运行这个示例程序,输出的结果和4.1.4节的示例程序的输出结果是一致的,所以这里不再赘述。对比4.1.4节的程序可以看出,RxJava的函数式编程比普通的Java编程简洁很多。

实际上,在RxJava源码中,Observable类的subscribe()订阅方法的重载版本中使用的是一个ActionSubscriber包装类实例,对3个函数式接口实例进行包装。所以,最终的消息订阅者还是一个Subscriber类型的实例。下面是Observable类的一个重载的subscribe(...)订阅方法的源码,具体如下:

public final Subscription subscribe(final Action1super T> onNext,
final Action1 onError, final Action0 onCompleted)

{
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
if (onCompleted == null) {
throw new IllegalArgumentException("onComplete can not be null");
}
//通过包装类进行包装
return subscribe(new ActionSubscriber(onNext, onError, onCompleted));
}

上面的源码中用到的ActionSubscriber类是Subscriber接口的一个实现类,主要用于包装3个函数式接口的实现。

RxJava的操作符

RxJava的操作符实质上是为了方便数据流的操作,是RxJava为Observable主题所定义的一系列函数。

RxJava的操作符按照其作用具体可以分为以下几类:

(1)创建型操作符:创建一个可观察对象Observable主题对象,并根据输入参数弹射数据。

(2)过滤型操作符:从Observable弹射的消息流中过滤出满足条件的消息。

(3)转换型操作符:对Observable弹射的消息执行转换操作。

(4)聚合型操作符:对Observable弹射的消息流进行聚合操作,比如统计数量等。

本文给大家讲解的内容是SpringCloudRPC远程调用核心原理:RxJava响应式编程框架,从基础原理讲起:观察者模式

  1. 下篇文章给大家讲解的是SpringCloudRPC远程调用核心原理:RxJava响应式编程框架,创建型操作符;

  2. 觉得文章不错的朋友可以转发此文关注小编;

  3. 感谢大家的支持!


在腾讯课堂每晚八点都有公开课的 您也可以看看 要是有感兴趣的您再联系我 我再给您找对应的资料

浏览 38
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报