RxJS 如何助力业务开发?

共 12351字,需浏览 25分钟

 ·

2021-04-29 09:13


这次又给大家带来了 RxJS 的相关文章,本文将结合实际项目需求和示例来给大家讲解 RxJS 的优缺点和最佳实践。

本文作者:阳羡

⚠️注意啦:字节跳动春招开始了,大家ready了吗?欢迎大家找我内推,最快方式进入到部门筛选,筛选方式可以在公众号后台回复关键字,详情见本文末❤️

背景/目的

rxjs 在我们实际项目中被大量使用,并证明了收益。
本文与大家分享收益与沉淀的 best practice。

优势 - 副作用管理

个人认为 rxjs 最强的优势就是副作用管理。什么是副作用管理呢?

你未看此花时,此花与汝心同归于寂。你来看此花时,则此花颜色一时明白起来 --王阳明

一旦一个 Subscribable 未被 subscribe ,其逻辑会在执行完当前 operator 后停止。

为了最大限度的利用这个特性,个人建议把逻辑拆成更小的函数,并用更多的 operator 连接。

如果不需要这个特性,就没有使用 rxjs 的必要性,仅仅徒增复杂度而已,如果使用了 rxjs,但是却没有利用这个特性,我建议还不如直接使用 Promise 。

举一个实际的例子来说明 rxjs 是如何进行副作用管理的,比如说常见的 polling 的需求,大家一般会选择如下的写法:

const resultId = await getResultId(dsl);history.push(`?resultId=${resultId}`);const result = await polling(resultId);setState(result);

以上写法有问题吗?

其实几乎每句都有问题😂

第二句话:如果用户发起了请求后立刻切走了路由,那么第二句话会把路由切回去第三句话:如果用户发起了新的查询请求,虽然不需要上次的请求了,但是并不会取消上次的查询,浪费服务端资源与前端请求数。第四句话:如果组件被卸载了,此时再去 setState 会报一个 warning。但这不是关键问题,关键问题是 竞速问题[2]。如果前一个轮询较慢,而最新的轮询较快,用户很可能看见的结果是前一个轮询的结果。

这时候有同学说,的确,那我给每一行加上检测是否 unmount 或者 AbortController 不就行了。

你以为你在写 go 啊

这样会有两个问题:

代码变得异常臃肿,降低了可维护性、可读性其次把这种机械性的逻辑交给人来写,一方面浪费人力与时间,另一方面也不可靠,万一忘了呢

而 rxjs 就零成本的解决了这些问题:

useEffect(() => {  const subscription = dslInput$.pipe(    switchMap(dsl => getResultId(dsl)),    tap(resultId => history.push(`?resultId=${resultId}`)),    switchMap(resultId => polling(resultId)),  ).subscribe(result => setState(result));  return () => subscription.unsubscribe()}, [])

当组件卸载,调用 subscription.unsubscribe(),副作用则会立刻停止,不再执行之后的逻辑。当然还有其他优点,不过不是 rxjs 特有的优点,这里就不展开了。

缺点 - 函数式/声明式

个人认为 rxjs 没有什么特别大的缺点,它最大的缺点就是函数式/声明式的通病,就是不方便处理生命周期长的变量。

举个例子,比如说你要给上面的代码加上埋点,看看这些请求一共耗时多少,于是代码就膨胀成了这样:

dslInput$.pipe(  map(dsl => ({dsl, startTime: new Date()}))  switchMap(async ({dsl, startTime}) => ({resultId: await getResultId(dsl), startTime})),  tap(({resultId}) => history.push(`?resultId=${resultId}`)),  switchMap(async ({resultId, startTime}) => ({result: await polling(resultId), startTime})),  tap(({startTime}) => Tracker.collect(Date.now() - startTime.getTime())),).subscribe(({result}) => setState(result));

啊这,好乱啊,真心看不下去。

如果有大佬有解决方案,希望能带带我。

其实 Promise 本身也算是属于函数式,所以它也有这个问题,那么它是怎么解决的呢?

升级成了 async/await 的语法糖🍬。

基本概念

我在 Rxjs TENET 问题解析 中已经写过一个最简单的 Subject 了,没有什么 magic:

class Subject<T> {  private fns: Array<(value: T) => void> = []  public next(value: T) {    this.fns.forEach(fn => fn(value))  }  public subscribe(fn: (value: T) => void) {    this.fns.push(fn)    return () => {      this.fns = this.fns.filter(x => x !== fn)    }  }}

副作用管理也没有什么 magic,它依赖于第4行,当没有任何 subscription 时,便不会调用任何函数,即逻辑不会继续往下执行。

Observable 就是没有 next 的 Subject。通常是由单一的数据源进行控制,一般来说,不需要开发者自己去调用构造函数,最常见的是来自于 pipe 的返回值。

ReplaySubject 内部有一个保存 value 的数组,在第4行前会记录一下当前的 value,每当有一个新的 Subscription,会把最近 n 项的 value 重播给订阅方。

BehaviorSubject 相当于 n 为 1 的 ReplaySubject。因为比 ReplaySubject 常用,所以单独成为了一个类。我还真没怎么用过 ReplaySubject

pipe 用于连接 Observable,一般主要逻辑都会放在其中执行。一般来说,其中逻辑是否执行有两个必要条件:

输出流在 pipe 之后产生值BehaviorSubject 和 ReplaySubject 在有新的 subscription 的时候会发出当前值,所以作为输入时必产生值有订阅(subscribe)方例外:shareReplay

pipe 的原理是链式(reduce)调用 innerSubscribe 函数,和 subscribe 的区别在于,不会触发副作用的执行。

对于输入流的每次 next 调用,pipe 中逻辑的默认执行次数为订阅方的数量。在绝大多数情况下,不希望逻辑执行多遍,于是需要使用多播。很多文档把多播说的非常复杂,个人觉得没有必要,懂这两个 operator 就行:shareshareReplay。把这两个 operator 放在 pipe 的最后一个位置,即可保证前面的逻辑执行次数与订阅方数量无关。不同的是,shareReplay 不管有没有订阅方,都会执行逻辑,而 share 必须存在第一个订阅方之后才开始执行逻辑。

使用 shareReplay 的又叫做热流,其他叫做冷流。在有订阅时,冷流开始执行逻辑,在无订阅时,冷流终止逻辑,冷流具有 rxjs 的核心优势:副作用管理。热流与是否订阅无关,在没有订阅时,也不会终止逻辑。因此,个人很少把 shareReplay 与异步结合使用。

参考资料:可以深入了解[1]

引用链接

[1] 可以深入了解: https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html#hot-vs-cold-observables

综上,使用 rxjs 包装逻辑的基本结构如下:

const input$ = new Subject();const output$ = new BehaviorSubject(initValue);const subscription = input$.pipe(  operator1(),  operator2(),).subscribe(output => output$.next(output));

约定:在 UI 层消费数据时,input$ 结尾的流仅可使用 next;output$ 结尾的流仅可使用 subscribe一般消费 output$ 使用 hook:const output = useSubscribable(output$)记得把 subscription 传出去,在组件卸载时 unscribable

有同学问了,为什么不像这么写:

const input$ = new Subject();const output$ = input$.pipe(  operator1(),  operator2(),);

第二种写法需要考虑多播。当冷流、热流混搭,最后到底什么执行了,什么没执行;或者什么副作用能被清理,什么无法清理,就乱掉了。而第一种写法根本不用考虑复杂的多播。You may not need broadcast.md。在第二种写法中,output$ 的类型是 Observable当组件在数据发出后订阅的话,会丢失当前的数据。没有初始值,调用时需要 useSubscribable(output$, initValue),初始值散落在各个 UI 组件里,降低了可维护性。startsWith 可以部分解决这个问题。没有 next 方法,当需求变得复杂时,通常需要对 output$ 进行处理,这时候又会改造成第一种写法。

一句话:第一种写法就不用考虑乱七八糟的概念问题

值得注意的是,一个流 error 或者 complete 后,无论如何订阅方都不会再收到新值了,并且不再执行任何逻辑。

input$.pipe(tap(() => throw new Error())).subscribe(x => {  // never reach here})

所以需要自己注意错误处理,在我们项目中,封装了一些自定义 operator 将异步返回值包装为:

loading: booleanerror?: Errordata?: Tinput?: any

从而防止因为出现错误,而导致流异常终止,无法继续执行的问题。

Rxjs 中有个特殊的 ObservableNEVER。它从不发出任何值,常用于提前终止逻辑,类似于普通函数中的 return

input$.pipe(  switchMap(x => {    if(!x) {      return NEVER    }    return x  }),  operator1()).subscribe(x => {  // x will never be falsy})

Operator

按好用度排序:

switchMap: 超好用的 op。在 subscribe 下个流前,自动 unscbscribe 上个流,从而完美解决竞速问题。和异步有关的用它就对了。debounce:一般个人喜欢与 pair 之类的 op 结合使用,通过判断哪个值改变从而决定 debounce 多少时间。distinctUntilChanged:浅比较,如果两个值相等则不发出新值。一般传入深比较函数来使用。withLatestFrom:获得目标流的最新值而不触发副作用。和 combineLatest 一样的是,如果某个流没有值,则会卡住,这个挺坑的。不同的是,combineLatest 参数中的流发出新值不会执行 pipe 中的逻辑。exhaustMap。在当前流 complete 前,忽视其他流。一般用于和下载功能结合。值得注意的是,需要记得把上一个流 complete,否则下一次逻辑永远不会执行。编写自定义 operator 时,一定要思考如何取消副作用。个人建议组合已有 operator 来编写。

赋能业务 - 以 form-result 为例

接下来看看 rxjs 是如何帮助业务快速迭代的。在我们的业务中,最常见的是查询分析页,由查询条件与结果展示两块组成,这样的结构我称之为 form-result。

但是不管是怎样的界面,都可以划分为如下几层:

一个业务组件通常结构如下:

const { input$, output$ } = useXxx(); // 从 context 中取数据const handleClick = useCallback(() => input$.next(someValue)); // 使用 input$const output = useSubscrible(output$); // 使用output$,大部分情况下为 BehaviorSubject,此时不需要初始值return <div onClick={handleClick}>{output}</div> // 渲染界面

所有业务组件最外层是一个 Provider

const value = useXxxProvider();return <XxxContext.Provider value={value}>{children}</XxxContext.Provider>

几乎所有的业务逻辑都在 useXxxProvider 里,useXxxProvider 会调用 createXxx 去生成各个子业务的逻辑,最后把所有逻辑进行 merge 并返回。

function useXxxProvider() {    const { observables, subscriptions } = useMemo(() => {        // 创建 form 有关的流        const {             observables: formObservables,             subscriptions: formSubscriptions        } = createForm();        // 创建 result 有关的流        const {            observables: resultObservables,            subscriptions: resultSubscriptions        } = createResult(formObservables.formOutput$);        // 把流和订阅返回        return {            observables: {                ...formObservables,                ...resultObservables            },            subscriptions: [...formSubscriptions, ...resultSubscriptions]        }    }, [])     // 一般没有任何依赖项,如有,需保证不变,或以 ref 形式传入    // 在组件卸载时销毁订阅    useEffect(() => {        return () => subscriptions.forEach(sub => sub.unsubscribe())    }, []);    // 返回生成的流    return observables;}

其中,每一小块业务逻辑都被包装在 createXxx 里,其基本结构如下:

1. 创建有关的流,一般分为 input$ 和 output$2. 使用 pipe 连接两条流,包装主要业务逻辑3. 把流与订阅返回

function createXxx() {    // 创建有关的 Subject    const someSubject$ = new Subject();    // 连接流与业务逻辑    const subscription = someSubject$.pipe(...).subscribe()    // 返回流与订阅    return {        observables: { someSubject$ }        subscriptions: [subscription]    }}

具体到我们的业务上,其中表单逻辑的代码如下:

1. 输入流的类型为 Partial<T>,因为在表单 onChange 的时候,通常不会关心所有表单的值,大部分情况下只要把自己的值调用 formInput$.next() 就行了2. 把输入流中的值与原值进行 merge,并根据实际业务添加参数校验与表单联动的逻辑后,就形成了输出流

function createForm() {    const formInput$ = new Subject<Pratial<Form>>();    const formOutput$ = new BehaviorSubject<Form>(INIT_FORM);    const subscription = formInput$.pipe(        withLatestFrom(formOutput$),        map(([input, output]) => {            // 如有其他的参数校验、表单联动等逻辑,可以加在这里            const extra = {}            if('someThing' in input) {                extra.otherThing = someValue            }            return {...output, ...input, ...extra}        })    ).subscribe(output => formOutput$.next(output));    return {        observables: {            formInput$,            formOutput$        },        subscriptions: [subscription]    }}

结果业务的代码如下:

1. 根据实际业务场景生成请求流,如遇请求参数与 UI state 差别很大的,可以在这里使用 map 进行转换2. 当产生请求流时,发送请求,生成结果流3. 把结果流和有关订阅返回

function createResult(formOutput$){    // 情况一:无查询按钮,修改表单则自动发请求    const request$ = formOutput$.pipe(        pair(),// 自定义op,第一次返回[undefined, T],其他和pairwise一致        debounce(([prev])=> timer(prev?0:800)),// 首次立刻发请求        distinctUntilChanged(equals) // 根据业务需要,可以增加这个,注意的是不传入深比较函数则为浅比较    );    // 情况二:点击查询按钮再发请求    const requestButtonInput$ = new Subject();    const request$ = requestButtonInput$.pipe(        withLatestFrom(formOutput$),        map(([,form]) => form)    );    // 情况三:两者兼具    const requestButtonInput$ = new Subject();    const request$ = merge(formOutput$.pipe(        pair(),// 自定义op,第一次返回[undefined, T],其他和pairwise一致        debounce(([prev])=> timer(prev?0:800))// 首次立刻发请求    ), requestButtonInput$.pipe(        withLatestForm(formOutput$),        map(([, form]) => form)    ));    // 准备完毕请求流,开始发请求    const resultOutput$ = new BehaviorSubject(INIT_RESULT)    const subscription = request$.pipe(        switchMap(request => getResult(request))    ).subscribe(output => resultOutput$.next(output));    return {        observables: {            requestButtonInput$,            resultOutput$        },        subscriptions: [subscription]    }}

useSubscribable

附上 useSubscribable代码:

import useRefWrapper from 'hooks/useRefWrapper'import { useState, useEffect } from 'react'import { Subscribable, BehaviorSubject } from 'rxjs'function getInitValue<T, R>(  subscribable: Subscribable<T>,  selector: (value: T) => R,  initValue?: R) {  if (initValue !== undefined) {    return initValue  }  if (subscribable instanceof BehaviorSubject) {    return selector((subscribable as any)._value)  }  return undefined}export default function useSubscribable<T>(subscribable: BehaviorSubject<T>): Texport default function useSubscribable<T, R>(  subscribable: BehaviorSubject<T>,  selector: (value: T) => R): Rexport default function useSubscribable<T>(  subscribable: Subscribable<T>,  initValue: T): Texport default function useSubscribable<T, R>(  subscribable: Subscribable<T>,  selector: (value: T) => R,  initValue: R): Rexport default function useSubscribable<T>(  subscribable: Subscribable<T>): T | undefinedexport default function useSubscribable<T, R>(  subscribable: Subscribable<T>,  selector: (value: T) => R): R | undefinedexport default function useSubscribable<T, R = T>(  subscribable: Subscribable<T>,  selectorOrInitValue?: (value: T) => R,  initValue?: R): R | undefined {  const innerInitValue =    typeof selectorOrInitValue === 'function' ? initValue : selectorOrInitValue  const innerSelector =    typeof selectorOrInitValue === 'function'      ? selectorOrInitValue      : (x: T) => (x as unknown) as R  const innerSelectorRef = useRefWrapper(innerSelector)  const [state, setState] = useState(() =>    getInitValue(subscribable, innerSelector, innerInitValue)  )  useEffect(() => {    const subscription = subscribable.subscribe((x) =>      setState(innerSelectorRef.current(x))    )    return () => subscription.unsubscribe()  }, [innerSelectorRef, subscribable])  return state}

总结

rxjs 的优势是副作用管理如果使用 input$、output$,然后再使用 pipe 连接的形式,就不需要考虑多播、冷热等复杂概念也可轻松上手注意错误处理

引用链接

[1] 竞速问题: https://en.wikipedia.org/wiki/Race_condition

[2] 可以深入了解: https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html#hot-vs-cold-observables

浏览 39
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报