博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RxJS速成 (下)
阅读量:5943 次
发布时间:2019-06-19

本文共 6963 字,大约阅读时间需要 23 分钟。

上一部分: 

Subject

Subject比较特殊, 它即是Observable又是Observer.

作为Observable, Subject是比较特殊的, 它可以对多个Observer进行广播, 而普通的Observable只能单播, 它有点像EventEmitters(事件发射器), 维护着多个注册的Listeners.

作为Observable, 你可以去订阅它, 提供一个Observer就会正常的收到推送的值. 从Observer的角度是无法分辨出这个Observable是单播的还是一个Subject.

从Subject内部来讲, subscribe动作并没有调用一个新的执行来传递值, 它只是把Observer注册到一个列表里, 就像其他库的AddListener一样.

作为Observer, 它是一个拥有next(), error(), complete()方法的对象, 调用next(value)就会为Subject提供一个新的值, 然后就会多播到注册到这个Subject的Observers.

 

例子 subject.ts:

import { Subject } from "rxjs/Subject";const subject = new Subject();const subscriber1 = subject.subscribe({    next: (v) => console.log(`observer1: ${v}`)});const subscriber2 = subject.subscribe({    next: (v) => console.log(`observer2: ${v}`)});subject.next(1);subscriber2.unsubscribe();subject.next(2);const subscriber3 = subject.subscribe({    next: (v) => console.log(`observer3: ${v}`)});subject.next(3);

 

订阅者1,2从开始就订阅了subject. 然后subject推送值1的时候, 它们都收到了. 

然后订阅者2, 取消了订阅, 随后subject推送值2, 只有订阅者1收到了.

后来订阅者3也订阅了subject, 然后subject推送了3, 订阅者1,3都收到了这个值.

 

下面是一个angular 5的例子:

app.component.html:

从Subject共享Observable到多个Subscribers

Subscriber to input events got {
{inputValue}}
Subscriber to keyup events got {
{keyValue}}

 

app.component.ts:

import { Component } from '@angular/core';import { Subject } from 'rxjs/Subject';import 'rxjs/add/operator/filter';import 'rxjs/add/operator/map';@Component({  selector: 'app-root',  templateUrl: './app.component.html',  styleUrls: ['./app.component.css']})export class AppComponent {  title = 'app';  keyValue: string;  inputValue: string;  mySubject: Subject
= new Subject(); constructor() { // subscriber 1 this.mySubject.filter(({ type }) => type === 'keyup') .map(e => (
e).key) .subscribe(value => this.keyValue = value); // subscriber 2 this.mySubject.filter(({ type }) => type === 'input') .map(e => (
e.target).value) .subscribe(value => this.inputValue = value); }}

input和keyup动作都把event推送到mySubject, 然后mySubject把值推送给订阅者, 订阅者1通过过滤和映射它只处理keyup类型的事件, 而订阅者2只处理input事件.

效果:

 

BehaviorSubject

BehaviorSubject 是Subject的一个变种, 它有一个当前值的概念, 它会把它上一次发送给订阅者值保存起来, 一旦有新的Observer进行了订阅, 那这个Observer马上就会从BehaviorSubject收到这个当前值.

也可以这样理解BehaviorSubject的特点:

  • 它代表一个随时间变化的值, 例如, 生日的流就是Subject, 而一个人的年龄流就是BehaviorSubject.
  • 每个订阅者都会从BehaviorSubject那里得到它推送出来的初始值和最新的值.
  • 用例: 共享app状态.

例子 behavior-subject.ts:

import { BehaviorSubject } from "rxjs/BehaviorSubject";const subject = new BehaviorSubject(0);subject.subscribe({    next: v => console.log(`Observer1: ${v}`)});subject.next(1);subject.next(2);subject.subscribe({    next: v => console.log(`Observer2: ${v}`)});subject.next(3);

 

效果:

 

常用Operators:

concat 

concat: 按顺序合并observables. 只会在前一个observable结束之后才会订阅下一个observable.

它适合用于顺序处理, 例如http请求.

例子: 

import { Observable } from "rxjs/Observable";import 'rxjs/add/observable/timer';import 'rxjs/add/operator/mapTo';import 'rxjs/add/observable/concat';let firstReq = Observable.timer(3000).mapTo('First Response');let secondReq = Observable.timer(1000).mapTo('Second Response');Observable.concat(firstReq, secondReq)    .subscribe(res => console.log(res));

 

效果:

 

merge

把多个输入的observable交错的混合成一个observable, 不按顺序.

merge实际上是订阅了每个输入的observable, 它只是把输入的observable的值不带任何转换的发送给输出的Observable. 只有当所有输入的observable都结束了, 输出的observable才会结束. 任何在输入observable传递来的错误都会立即发射到输出的observable, 也就是把整个流都杀死了 .

例子:

import { Observable } from "rxjs/Observable";import 'rxjs/add/observable/timer';import 'rxjs/add/operator/mapTo';import 'rxjs/add/observable/merge';let firstReq = Observable.timer(3000).mapTo('First Response');let secondReq = Observable.timer(1000).mapTo('Second Response');Observable.merge(firstReq, secondReq)    .subscribe(res => console.log(res));

 

效果:

 

mergeMap (原来叫flatMap)

mergeMap把每个输入的Observable的值映射成Observable, 然后把它们混合成一个Observable.

mergeMap可以把嵌套的observables拼合成非嵌套的observable.

它有这些好处:

  • 不必编写嵌套的subscribe()
  • 把每个observable发出来的值转换成另一个observable
  • 自动订阅内部的observable并且把它们(可能)交错的合成一排.

这个还是通过例子来理解比较好:

import { Observable } from "rxjs/Observable";import 'rxjs/add/observable/from';import 'rxjs/add/operator/mergeMap';function getData() {    const students = Observable.from([        { name: 'Dave', age: 17 },        { name: 'Nick', age: 18 },        { name: 'Lee', age: 15 }    ]);    const teachers = Observable.from([        { name: 'Miss Wan', age: 28 },        { name: 'Mrs Wang', age: 31 },    ]);    return Observable.create(        observer => {            observer.next(students);            observer.next(teachers);        }    );}getData()    .mergeMap(persons => persons)    .subscribe(        p => console.log(`Subscriber got ${p.name} - ${p.age}`)    );

 

效果:

 

switchMap

switchMap把每个值都映射成Observable, 然后使用switch把这些内部的Observables合并成一个.

switchMap有一部分很想mergeMap, 但也仅仅是一部分像而已.

因为它还具有取消的效果, 每次发射的时候, 前一个内部的observable会被取消, 下一个observable会被订阅. 可以把这个理解为切换到一个新的observable上了.

 

这个还是看marble图比较好理解:

 

例子: 

// 立即发出值, 然后每5秒发出值const source = Rx.Observable.timer(0, 5000);// 当 source 发出值时切换到新的内部 observable,发出新的内部 observable 所发出的值const example = source.switchMap(() => Rx.Observable.interval(500));// 输出: 0,1,2,3,4,5,6,7,8,9...0,1,2,3,4,5,6,7,8const subscribe = example.subscribe(val => console.log(val));

更好的例子是: 网速比较慢的时候, 客户端发送了多次重复的请求, 如果前一次请求在2秒内没有返回的话, 那么就取消前一次请求, 不再需要前一次请求的结果了, 这里就应该使用debounceTime配合switchMap.

 

mergeMap vs switchMap的例子

mergeMap:

import { Observable } from "rxjs/Observable";import 'rxjs/add/observable/interval';import 'rxjs/add/operator/take';import 'rxjs/add/operator/map';import 'rxjs/add/operator/mergeMap';import 'rxjs/add/operator/switchMap';const outer = Observable.interval(1000).take(2);const combined = outer.mergeMap(x => {    return Observable.interval(400)        .take(3)        .map(y => `outer ${x}: inner ${y}`);});combined.subscribe(res => console.log(`result ${res}`));

 

效果:

switchMap:

import { Observable } from "rxjs/Observable";import 'rxjs/add/observable/interval';import 'rxjs/add/operator/take';import 'rxjs/add/operator/map';import 'rxjs/add/operator/mergeMap';import 'rxjs/add/operator/switchMap';const outer = Observable.interval(1000).take(2);const combined = outer.switchMap(x => {    return Observable.interval(400)        .take(3)        .map(y => `outer ${x}: inner ${y}`);});combined.subscribe(res => console.log(`result ${res}`));

 

 

zip

zip操作符也会合并多个输入的observables成为一个observable. 多个输入的observable的值, 按顺序, 按索引进行合并, 如果某一个observable在该索引上的值还没有发射值, 那么会等它, 直到所有的输入observables在该索引位置上的值都发射出来, 输出的observable才会发射该索引的值.

例子:

import { Observable } from "rxjs/Observable";import 'rxjs/add/observable/of';import 'rxjs/add/observable/zip';let age$ = Observable.of
(27, 25, 29);let name$ = Observable.of
('Foo', 'Bar', 'Beer');let isDev$ = Observable.of
(true, true, false);Observable .zip(age$, name$, isDev$, (age: number, name: string, isDev: boolean) => ({ age, name, isDev })) .subscribe(x => console.log(x));

 

效果:

 

就不往下写了, 其实看文档就行, 最重要的还是上一部分.

下面是我的关于ASP.NET Core Web API相关技术的公众号--草根专栏:

转载地址:http://dnzxx.baihongyu.com/

你可能感兴趣的文章
sublime Text 2 配置以及 Python环境搭建
查看>>
[2778]小明的花费预算 (二分查找)SDUT
查看>>
Runnable接口介绍(中文文档)
查看>>
URAL 1353 Milliard Vasya's Function DP
查看>>
速读《构建之法:现代软件工程》提问
查看>>
在iOS微信浏览器中自动播放HTML5 audio(音乐)的2种正确方式
查看>>
Android onclicklistener中使用外部类变量时为什么需要final修饰【转】
查看>>
Matlab2012a下配置LibSVM—3.18
查看>>
SpringBoot 搭建
查看>>
中信国健临床通讯2011年7月期目录
查看>>
iOS开发之设计模式篇
查看>>
电信运营商 IT 系统介绍
查看>>
简单粗暴的“Debug模式”
查看>>
汇编实验一
查看>>
Python 面试总结
查看>>
ansible register when: result | succeeded when: item.rc != 0
查看>>
有赞MySQL自动化运维之路—ZanDB
查看>>
ssm中shiro的使用
查看>>
10个操作数的随机四则运算(二)
查看>>
django中聚合aggregate和annotate GROUP BY的使用方法
查看>>