RxJS 是什么?用一句话类概括就是:RxJS 是用于 JavaScript 的ReactiveX 库,它是一个可以通过观察序列变化,掌握非同步事件处理的利器。
在学习 RxJS 之前我们首先需要了解以下的前提知识:
Reactive Extensions(Rx)
是一个使用可观察序列和 LINQ 样式查询运算符组成异步和基于事件的程序的库。
ReactiveX
是 observer 模式、lterator 模式和函数式编程的最佳思想的组合。
Reactive Programming(反应式程序设计)
是用异步数据流编程。
Streams
流——便宜又无处不在,什么都可以是一股流。比如变量、用户输入、属性、缓存、数据结构都可以被称为流。
举一个程序化的例子来说,假如现在有一个观察者订阅了 click 事件,当有用户在页面上点击鼠标时,每一次点击都会触发一次 click 事件,并发送 click 事件资料(Mouse Event)给观察者,当用户连续点击多次,观察者就会收到一系列的事件资料,我们把这一系列的事件资料就叫做一个流(Stream)。
再举一个非同步事件的例子,我们通过某非同步事件,向服务端发送 Ajax 请求,然后从服务端得到响应数据,在这个过程中,可能会持续地发出多个 Ajax 请求,并且响应数据会不同时间、不同顺序地返回客户端。类似这样的过程,就是串流的概念。
在非同步事件中,由于事件响应的时间不确定、顺序不确定,JavaScript 在处理非同步事件时,会有一定的困难,很容易出现 bug,而 RxJS 可以轻松帮我们处理这样的非同步事件。
总结来说,使用 RxJS 的目的就是,有效管理非同步环境下的事件资料。
我们知道 RxJS 是一组可用来处理非同步或事件的 JavaScript 函式库,那么非同步或事件的种类有哪些呢?
非同步
Ajax/XHR/fetch API
Service Worker/Node Stream
setTimeout/setInterval
Promise
事件
各种 Dom 事件(click/dbclick/keyup/mousemove……)
CSS 动画事件(CSS3 transitionEnd event)
HTML5 Geolocation/WebSockets……
Observable 可观察的物件
代表一组未来即将产生的事件资料(被观察的物件)。从上面举的例子中来说,click 事件就是一个 Observable,也就是说当 click 事件被触发的时候,它会发出一些事件资料(Stream)给观察者。
Observer 观察者
代表一个用来接收观察结果的物件(Observer 收到的就是事件资料,也就是 Stream)。
Subscription 订阅
表示 Observable 的执行,主要用于取消订阅。
Operator 运算符
用来处理一系列的事件资料集合,可以理解成 Operator 会通过一系列的过滤、筛选等操作,对 Observable 进行处理,最终将事件资料送给观察者。
常见的运算符包括:map/filter/concat/flatMap……
Subject 主体物件
主要用来广播收到的事件资料给多位 Observer。
Scheduler 排程控制器
用来集中管理和调度多重事件之间的资料,以控制事件并发情况。
上面这段代码的意思是,通过 RxJS 去使用建立运算符(Creation)的 API(interval) 建立一个 Observable,这个 Observable 会每 0.5s 发出一个数字(从 0 开始)。然后通过 pipe(中文是水管的意思,可以结合流的概念来理解)中的过滤运算符 take ,只发送前四笔数据给观察者。观察者(console.log)通过 subscribe 去订阅到 Observable 发送的资料。
上面的代码运行结果如下:
在上面提到的核心概念中,有提到 Subscription 这一概念,在这里可以举个例子帮助我们加深理解。现在我们将上面代码中通过 take 方法只取前 4 笔数据,改为取前 40 笔数据,并且将观察结果存在 subs 中,这里的 subs 也就是我们所说的 Subscription。最后在通过 subs 调用 unsubscribe 方法取消观察者和可观察物件的订阅关系,也即不再接受可观察物件发来的订阅资料。
上边那段代码还可以利用 ES5 解构赋值来精简一下写法:
也可以通过 ES Module 来精简代码:
上面是 RxJS 基本的运作过程,那代码怎么写呢?
建立可观察的 Observable 物件
var clicks$ = rxjs.fromEvent(document,'click'); //在 RxJS 的命名方式中,通常以 $ 结尾的变量都代表这个变量是可观察物件
建立观察者 Observer 物件
var observer = { next:(x) => console.log(x) }; //观察者可以放函数进去,也可以放一个物件进去,这个物件有三种属性,其中一种属性是 next,next 会取出下一笔资料
建立订阅物件(订阅 Observable 物件,并传入 Observer 观察者物件)
var subs$ = clicks$.subscribe(observer);
取消订阅 Subscription 物件
subs$.unsubscribe();
上面的代码运行结果就是,每当在页面 document 范围内点击鼠标时,控制台就会打印一次 MouseEvent 事件。这里需要注意一点的是,必须有观察者订阅 Observable,Observable 发生的事件才会被触发,没有订阅的观察者,Observable 就不会产生任何事件资料。
上面是语义清晰的一种写法,下面是一种简易的写法:
建立可观察的 Observable 物件
var clicks$ = rxjs.fromEvent(document,'click');
建立订阅物件(订阅 Observable 物件,并自动建立观察者物件)
var subs$ = clicks$.subscribe((x) => console.log(x));
取消订阅 Subscription 物件
subs$.unsubscribe();
现在我想要订阅 click 事件,并且返回 ClientX 小于 1000 的所有事件,代码如下:
建立可观察的 Observable 物件
var clicks$ = rxjs.fromEvent(document,'click');
套用 filter 运算符
const { filter } = rxjs.operators;
clicks$ = clicks$.pipe(filter(x => x.clientX <1000));
建立订阅物件(订阅 Observable 物件,并自动建立观察者物件)
var subs$ = clicks$.subscribe((x) => console.log(x));
取消订阅 Subscription 物件
subs$.unsubscribe();
假如现在我只需要返回 ClientX 小于 1000 的所有事件中的前四笔数据,我们需要对运算符部分的操作进行更改:
var subs = clicks$.pipe(filter(x => x.clientX <1000),take(4)
).subscribe((x) => console.log(x));
这里需要注意的是,只有符合条件的数据流能够被打印出来,并且一但 take 运算符一旦收集够四个事件之后,就会自动执行取消订阅的操作。
建立主体物件(Subject,之后要靠这个主体物件进行广播)
var subject = new rxjs.Subject();
建立可观察的 Observable 物件
var clicks$ = rxjs.fromEvent(document,'click');
设定最多取得两个事件资料就将 Observable 物件设为完成
clicks$ = clicks$.pipe(take(2));
设定将 clicks$ 全部交由 subject 主体物件进行广播
clicks$.subscribe(subject);
最后再由 subject 去建立 Observer 观察者物件
var subs1$ = subject.subscribe((x) => console.log(x.clientX));
var subs2$ = subject.subscribe((x) => console.log(x.clientX));
取消订阅 Subscription 物件
subs1$.unsubscribe();
subs2$.unsubscribe();
通过 Subscription 事件资料只需要发送一次给 subject 就完成了,而不需要分别发送两次给 subs1$ 和 subs2$。
弹珠图可以很好的描述 RxJS 中操作运算符的运算过程。
take
map
concat
在这个网站可以看到更多弹珠图。
在这个网站还可以看到以动画呈现的弹珠图
RxJS 的运算符众多,有时候不知道选择哪一个更有效,这时候可以在这个网站通过互动式的问答,选择出合适的运算符。
运算符的分类:
本篇文章整理自b站视频一小时 RxJS 入门