RxJS学习笔记(二)创建类操作符

操作符

简单来讲,操作符就是解决某个具体应用问题的模式,在针对不同问题时,需要使用不同的操作符,如合并、过滤、异常捕获等

我的理解就是,如果把一个Observalbe比做一个数组,那么操作符就是数组的方法,可以使用mapfilterforEach来对这个数组进行操作

操作符大致分为以下几类

  1. 创建类
  2. 转化类
  3. 过滤类
  4. 合并类
  5. 多播类
  6. 错误处理类
  7. 辅助工具类
  8. 条件分支类
  9. 数学和合计类

创建类

如果使用new Observable这样的方式来创建,定制性很高,如果只是使用一些简单的数据来创建Observable,那么RxJS中提供了专门用来创建Observable的操作符

部分操作符接受一个可选参数`scheduler`,不过我暂时没有学习,所以先忽略掉

of

of操作符可以指定固定的数据来创建Observable对象

import { of } from "rxjs";

const source$ = of(1, 2, 3);
source$.subscribe(value => console.log(value));
// 1	2	 3

range

of是指定数据,range则是指定一个范围

import { range } from "rxjs";

const source$ = range(5, 10);
source$.subscribe(value => console.log(value));
// 5	6  7  8  ...  14

range函数接受 2 个可选参数,第一个参数为起始值,默认值为 0,第二个参数为个数,默认值为undefined

后续的每一个数都会加 1,向上述例子,从 5 开始,每次加 1,知道个数达到 10 个,也就是到达数字 14 时结束

generate

range可以连续的创建数字,如果想要每次输出相隔两个数字呢?

generate就像是RxJS中的for循环

import { generate } from "rxjs";

const source$ = generate(0, x => x < 10, x => x + 1);
source$.subscribe(value => console.log(value));
// 0 1 2 3 4 5 6 7 8 9

generate可接受 4 个参数,第一个参数为初始值,第二个参数为判断条件,第三个参数为值的递增,第四个参数可以是结果的处理函数

const source$ = generate(0, x => x < 10, x => x + 1, x => x * 10);
source$.subscribe(value => console.log(value));
// 0 10 20 30 40 50 60 70 80 90

增加第四个参数后,输入会根据处理函数的返回值

相较于for循环来看,RxJS中的generate更纯了,函数式,比如for循环,每次都会对条件变量做处理,所以不能用const

repeat

repeat可以对上游的数据进行重复的输出,如果想要一个1,2,3,1,2,3,1,2,3的数据,用of当然是可以做,但是用repeat更加清晰

import { of } from "rxjs";
import { repeat } from "rxjs/operators";

const source$ = of(1, 2, 3).pipe(repeat(3));
source$.subscribe(value => console.log(value));
// 1 2 3 1 2 3 1 2 3

重复 3 次of(1,2,3)的数据,这里调用了一个函数叫pipe,在RxJS v5中,这里的调用方式应该是

of(1, 2, 3).repeat(3)

就是链式操作,但在RxJS v6中,需要使用pipe操作符来,pipe就是一个管道,数据从管道进入,进过处理,再从管道出去

EMPTY、NEVER、throwError

EMPTY是直接返回一个完成的Observable,会直接调用观察者的completed函数

NEVER则是返回一个永不完结的Observable,不会调用观察者的任何函数

throwError是抛出一个错误,如果观察者有error函数则调用,否则抛出一个错误

import { EMPTY, NEVER, throwError } from "rxjs";

const source1$ = EMPTY;
const source2$ = NEVER;
const source3$ = throwError("error");
source1$.subscribe({
    complete() {
        console.log("completed");
    },
});
source2$.subscribe(value => console.log(value));
source3$.subscribe({
    error(err) {
        console.log(err);
    },
});
// completed
// 什么都没有
// error

interval 和 timer

上面的操作符,都是产生同步数据的,最简单的产生异步数据的方法就是intervaltimer,从名字可以看出,它们的功能和setIntervalsetTimeout类似

interval

import { interval } from "rxjs";

const source$ = interval(1000);
source$.subscribe(value => console.log(value));
// 0 1 2 3 4 ...

interval可接受1个参数,如上每隔1000ms`输出依次,从 0 开始,每次加 1

1 秒后输出 0,再一秒后输出 1,再一秒后输出 2...永不完结

timer

import { timer } from "rxjs";

const source$ = timer(1000);
source$.subscribe(value => console.log(value));
// 0

timer可接受 2 个参数,不同的是第一个参数可以为数字,也可以为Date对象,如果是数字,则代表毫秒,等待相应时间后输出 0

上例中,一秒后输出 0,完结

第二个参数则是周期,如果有第二个参数,timer也能用不完结

const source$ = timer(1000, 1000);
source$.subscribe(value => console.log(value));
// 0 1 2 3 4 ...

这样的效果就和interval一样了

from

from可以把一个像Observable的参数,转为真正的Observable对象

一个数组像Observable,一个伪数组对象也像,字符串也像,Promise也,都可以通过from转为真正的Observable对象

数组

import { from } from "rxjs";

const source$ = from([1, 2, 3]);
source$.subscribe(value => console.log(value));
// 1 2 3

伪数组

const source$ = from({
    0: 0,
    1: 1,
    2: 2,
    length: 3,
});
source$.subscribe(value => console.log(value));
// 1 2 3

字符串

const source$ = from("hello RxJS");
source$.subscribe(value => console.log(value));
// h e l l o   R x J S

Promise

const source$ = from(Promise.resolve("resolve"));
source$.subscribe(value => console.log(value));
// 输出resolve

这里的Promiseresolve的,如果是reject,则会触发抛出错误或调用观察者的error函数

const source$ = from(Promise.reject("reject"));
source$.subscribe(value => console.log(value));
// 抛出错误reject

fromEvent

fromEvent可以把网页中的DOM事件转化为Observalbe对象

import { fromEvent } from "rxjs";

const source$ = fromEvent(document, "click");
source$.subscribe(value => console.log(value));
// MouseEvent

fromEvent可以接受 3 个参数,第一个参数为事件的DOM对象,第二个参数为事件的名称,第三个参数是EventListenerOptions

const source$ = fromEvent(document, "click", { once: true });
source$.subscribe(value => console.log(value));

如这个事件因为传入了第三个参数,只会触发一次

repeatWhen

repeat能够重复订阅上游的Observable,但是不能控制订阅时间,如等待 2 秒再重新订阅

repeatWhen就可以满足上面的需求

import { of } from "rxjs";
import { delay, repeatWhen } from "rxjs/operators";

const source$ = of(1, 2, 3).pipe(
    repeatWhen(notifications => notifications.pipe(delay(2000)))
);
source$.subscribe(value => console.log(value));
// 1 2 3 等待2秒 1 2 3

当上游Observable completed的时候,会传给repeatWhen一个notifications参数,这个参数也是Observable对象

defer

defer可以只在观察中订阅的时候才创建Observable对象,通常用于Observable工厂函数

import { defer, fromEvent, interval } from "rxjs";

const clicksOrInterval = defer(function() {
    return Math.random() > 0.5 ? fromEvent(document, "click") : interval(1000);
});
clicksOrInterval.subscribe(x => console.log(x));

刷新页面后,可能输出MouseEvent,也可能输出interval的数字