RjxavaSample练习

Rxjava sample 项目介绍#

引入依赖#

基础依赖#

依赖后可以使用rxjava操作符

1
2
implementation 'io.reactivex.rxjava2:rxjava:2.2.2'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

扩展依赖#

网络库,可以返回Observable对象,方便后续操作符操作

1
implementation 'com.amitshekhar.android:rx2-android-networking:1.0.2'

生命周期库,使用后可以将rxjava与Adnroid组件生命周期关联

1
implementation 'com.trello.rxlifecycle4:rxlifecycle:4.0.2'

UI布局组件,使用后按钮可以自适应布局,自动换行

1
implementation 'com.google.android:flexbox:2.0.1'

项目结构#

此项目按目录分为如下多个package,每一个package表示rxjava一类用法,接下来我们一级一级目录来研究这些用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
├─java
│ └─com
│ └─shaunsheep
│ └─rxjavasample
│ ├─observable // 被观察者
│ ├─operators
│ │ ├─arithmetic // 数学运算符
│ │ ├─combine // 结合
│ │ ├─condition // 条件运算符
│ │ ├─connect // 链接
│ │ ├─create // 创建
│ │ ├─filter // 过滤
│ │ ├─model // 模型
│ │ └─transform // 变换
│ ├─scene // 典型场景
│ │ ├─bus // 事件总线
│ │ └─cache // 三级缓存
│ │ ├─controller
│ │ └─model
│ └─schedulers // 调度器

observable#

此部分文件结构

1
2
3
4
5
6
7
8
|-- AsyncSubjectActivity.java // 四种Subject之一
|-- BehaviorSubjectActivity.java // 四种Subject之一
|-- PublishSubjectActivity.java // 四种Subject之一
|-- ReplaySubjectActivity.java // 四种Subject之一
|-- CompletableActivity.java // Completable消息
|-- FlowableActivity.java // Flowable背压
|-- ObservableActivity.java // Observable基础用法
-- SingleActivity.java // Single消息

Observable#

观察者和被观察者:Observable指被观察者,Observer指观察者或订阅者

冷和热的概念:热是指Observable创建完就发射数据,观察者在之后才有可能订阅数据,有可能会错过一些数据;冷是指Observable会一直等待,等待有观察者订阅,才会发射数据,确保观察者能收到整个数据流。

Subject#

既是Observable、也是Observer。

AsyncSubject PublishSubject:只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者 PublishSubject BehaviorSubject:观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据 PublishSubject ReplaySubject:ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。 PublishSubject
Subject类型 会损失已发的数据 损失的特点 先发射数据,后订阅
能否收到之前发射的所有数据
必须onComplete
如果不执行onComplete
就收不到消息
onComplete之后能否收到消息
AsyncSubject 只收到onComplete之前发射的最后一条数据; × ×
BehaviorSubject 只收到订阅之前发射的最近一条数据,和订阅之后的所有数据;无法收到订阅之前所有的数据 × × ×
PublishSubject 只能收到订阅之后发射的数据,无法收到订阅之前的数据 × × ×
ReplaySubject × - × ×

Completab#

不会发射任何消息,只发射完成或失败的结果。

Flowable#

一种特殊的Observable,支持5种背压策略

用途:处理上层线程发射数据流的速度远大于下层线程处理数据流的速度的情况

策略:

1
2
3
4
5
6
7
public enum BackpressureStrategy{
MISSING, //通过create创建的Flowable,需要下游指定背压策略
ERROR, //放入Flowable异步缓存池的数据超限,将抛出MissingBackpressureException
BUFFER, //异步缓存池无大小限制,可无限添加数据,不会抛出异常,但会OOM
DROP, //如果Flowable异步缓存池满了,将抛弃将要添加的数据
LATEST //如果Flowable异步缓存池满了,将抛弃将要添加的数据,但强制添加最后一条数据
}

其他参考本项目其他笔记-rxjava使用总结

Single#

一个特殊的Observable,只能调用onSuccess 或onError ,任何一个消息进入这俩方法之一,订阅关系就终止了

operators#

Observer和Observable只是Rxjava的开始,它们只是观察者模式的基础扩展,目的是为了更好处理数据流。Rxjava强大的地方在于操作符,它拥有回调所有的效率优势,也解决了异步任务中嵌套回调的问题。

此部分文件结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|-- CompositeDisposableActivity.java // CompositeDisposable
|-- OperatorsLauncherActivity.java // 操作符入口
|-- SimpleOperationActivity.java // 入门示例
|-- arithmetic // 算数操作符
| |-- ReduceActivity.java
|-- combine // 组合操作符
| |-- ConcatActivity.java
| |-- MergeActivity.java
| |-- ZipActivity.java
|-- condition // 条件运算操作符
| |-- TakeUtilActivity.java
| |-- TakeWhileActivity.java
|-- connect // 链接操作符
| `-- ReplayActivity.java
|-- create // 创建操作符
| |-- CreateActivity.java
| |-- DeferActivity.java
| |-- IntervalActivity.java
| |-- IntervalRangeActivity.java
| |-- JustActivity.java
| `-- TimerActivity.java
|-- filter // 过滤操作符
| |-- DebounceActivity.java
| |-- DelayActivity.java
| |-- DistinctActivity.java
| |-- FilterActivity.java
| |-- LastActivity.java
| |-- ScanActivity.java
| |-- SkipActivity.java
| |-- TakActivity.java
| `-- ThrottleActivity.java
|-- model // 示例所需的实体类
| |-- NetworkUser.java
| `-- UserBean.java
`-- transform // 转换操作符
|-- BufferActivity.java
|-- FlatMapActivity.java
|-- MapOperationActivity.java
`-- SwitchMapActivity.java

本项目介绍了哪些操作符?

  1. combine 组合类:Concat,Merge,Zip
  2. arithmetic 算数类:Reduce
  3. condition 布尔运算类:TakeUtil、TakeWhile
  4. filter 过滤类:Debounce,Throttle,Delay,Distinct,Filter,Last,Scan,Skip,Take
  5. transform 转换类:Buffer,Map,SwithMap,FlatMap,Scan,GroupBy
  6. creat 创建类:Create,Defer,Interval,Timer,Just,From

下面是Rxjava常用的操作符列表:

加粗的是此项目已实现的。

  1. 创建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  2. 变换操作 Buffer, FlatMap, GroupBy, Map,SwitchMap, Scan和Window
  3. 过滤操作 Debounce,Throttle, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
  4. 组合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  5. 错误处理 Catch和Retry
  6. 辅助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
  7. 条件和布尔操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
  8. 算术和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
  9. 转换操作 To
  10. 连接操作 Connect, Publish, RefCount, Replay
  11. 反压操作,用于增加特殊的流程控制策略的操作符

易混淆对比#

时间类#

入参 出参 用途
Timer long Observable 创建一个延时发射的Observable
Delay long Observable 将emitter发射来的数据延迟指定时间再发射
Defer Callable Observable 延迟Observable的创建,被订阅才会进入Callable方法创建Observable
IntervalRange start,count,initialDelay,period Observable 创建一个按固定时间间隔、固定延时时间发射数据的Observable,它发射数据的规律是整数递增,有限个数
Interval initialDelay,period Observable 创建一个固定时间间隔、固定延时时间发射数据的Observable,它发射的规律是整数递增,无限个数

组合1类#

入参 出参 异步发射数据流
Observer是否按发射顺序接收
Observables长度不同
对Observer接收到的数量有影响
影响是什么
Zip 多个Observable Observable 收到数据流数量等于数量最少的Observable的数量
Merge 多个Observable Observable × × 交错接收不同Observable发射来的数据
Concat 多个Observable Observable × -

组合2类#

入参 子函数出参 函数出参 特性
reduce BiFunction<T1, T2, R> R Maybe reduce只会收到所有迭代项的最终处理结果
支持的观察者是MybeObserver,只有onSuccess\onError
scan BiFunction<T1, T2, R> R Observable scan会收到迭代器每两项的处理结果
支持的观察者是Observer,有onNext\onError

组合1类与组合2类的区别

  • 1类是针对Observable或者数据流的
  • 2类只能针对数据流进行合并

转换类#

入参泛型 出参 特性
Map ? super T ? extends R 把对象A变为对象B
FlatMap ? super T ? extends ObservableSource<? extends R> 返回值为Observable
SwithMap ? super T, ? extends ObservableSource<? extends R> 当原始Observable发射新数据,
会停止在这个时间点之前正在处理的Observable

重名类#

入参 入参的出参 出参 特性
Take long - Observable 发射前N项
Last long - Observable 返回最后一项
TakeLast long - Observable 发射最后n项数据
TakeWhile Predicate boolean test方法return 为true的时候,将停止发射后面的数据
TakeUntil ObservableSource - Observable 当第二个Observable发射了一项数据或者终止时,丢弃原始Observable

过滤类#

入参 子函数出参 特性
Debounce long - 消息A之候X秒内无新消息,则发射消息A
Distinct - - 去重,判等依据是hashcode、equals
Filter Predicate boolean 只发射test返回结果为true的数据流
Skip long - 跳过Observable发射的前N项数据
Last long - 返回最后一项
Tak long - 只发射前面的N项数据
TakLast long - 返回后n项数据

异常类#

入参 出参 特性
onErrorReturn Function Observable 中断数据流发射
onErrorResumeNext 备用Observable Observable 中断数据流发射,使用备用Observable发射数据,emitter.onError传入的是Exceptio或Throwable对象,都会执行此方法
onExceptionResumeNext 备用Observable Observable 中断数据流发射,如果onError传入的Exception,则使用备用Observable;
如果传入的是Throwable,则完全停止发射数据流,不使用备用Observable

易错对比#

join

任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。

难以理解的点:如何结合两个Observable发射的数据

解答:排列组合的方式结合,比如Observable1 发射 1 2 3,Observable2 发射A,那么组合结果是 1A, 2A, 3A

难以理解的点:另一个Observable发射的数据定义的时间窗口内

解答:Observable2发射A并指定了A即将与Observable1 哪一个时间段内发射的数据进行排列组合

参考

http://www.introtorx.com/uat/content/v1.0.10621.0/17_SequencesOfCoincidence.html#Join

When left produces a value, a window is opened. That value is also then passed to the leftDurationSelector function. The result of this function is an IObservable. When that sequence produces a value or completes then the window for that value is closed. Note that it is irrelevant what the type of TLeftDuration is. This initially left me with the feeling that IObservable was all a bit over kill as you effectively just need some sort of event to say ‘Closed’. However by allowing you to use IObservable you can do some clever stuff as we will see later.

So let us first imagine a scenario where we have the left sequence producing values twice as fast as the right sequence. Imagine that we also never close the windows. We could do this by always returning Observable.Never() from the leftDurationSelector function. This would result in the following pairs being produced.

Left Sequence

L 0-1-2-3-4-5-
Right Sequence

R –A—B—C-
0, A
1, A
0, B
1, B
2, B
3, B
0, C
1, C
2, C
3, C
4, C
5, C
As you can see the left values are cached and replayed each time the right produces a value.
说白了join操作我们不能把两个Observable等价看待,join的效果类似于排列组合,把第一个数据源A作为基座窗口,他根据自己的节奏不断发射数据元素,第二个数据源B,每发射一个数据,我们都把它和第一个数据源A中已经发射的数据进行一对一匹配;

举例来说,如果某一时刻B发射了一个数据“B”,此时A已经发射了0,1,2,3共四个数据,那么我们的合并操作就会把“B”依次与0,1,2,3配对,得到四组数据: 0, B 1, B 2, B 3, B

是不是和简单?可惜我在在花费大量时间研究出join本质前,一直无法理解,希望本文能帮助到遇到同样问题的人;

小结
所以join得本质就是这么简单,但是很遗憾在RxJava的各种介绍和Github上面对join都是一笔带过,并没有把他的概念讲清楚,如果读者在细致的学习RxJava时就会遇到很多问题

do

问:同一doXX是否可以调用多次

答:可以多次,不会覆盖

问:多个doXX方法谁先谁后?

答:同步在一个线程的时候,提交顺序等于执行顺序;异步在多个线程的时候,取决于谁的Scheduler先执行,先执行的先进入对应的do方法;

问:所有的doXX一定会被执行?

答:不一定,downstream 进入某一个方法后,遇到Exception,下面的doXX是不会进行的

问题:如何记住某些doXX的顺序?

答:以subscribe的时间点位分割线

在subscribe 前后提交的doOnTerminate 和 doAfterTerminate

在subscribe 前后提交的doOnNext和 doAfterNext
doFinally 总是最后提交的,并且是一定提交的,无论是data,error,或者dispose

to

操作符 函数 备注
toMap Function<T, R>:T是入参,R是返回值
toMap的结果将R作为Map的key,将T作为Map的value
取值方式是在观察者里的Map<R,T>,注意R是返回值key
toList 取值方式直接在观察者里onSuccess里获取
toFuture 取值方式用future.get

自定义#

自定义操作符接口类:ObservableOperator

自定义变换操作符接口类:ObservableTransformer

参考文献#

RreactiveX中文版:介绍了Reactive的基本概念,rxjava对应的实现

Rxjava官网:wiki官网

点击查看
-------------------本文结束 感谢您的阅读-------------------