Rxjava介绍<1>

2019-09-18 21:49 来源:未知

Rxjava github地址
给初学者的RxJava2.0教程------水管系列
手把手教你使用 RxJava 2.0
这可能是最好的RxJava 2.x 入门教程
RxJava API文档
RxJava 2.x 使用详解(一) 快速入门
实战CSDN作者余志强的RxJava2操作符系列
推荐:Rxjava2 教程大集合

篇幅较长,请参阅给 Android 开发者的 RxJava 详解。

2.概念

由于RxJava是利用观察者模式来实现一些列的操作,所以对于观察者模式中的观察者,被观察者,以及订阅、事件需要有一个了解。

  • Observable:在观察者模式中称为“被观察者”;
  • Observer:观察者模式中的“观察者”,可接收-
    Observable发送的数据;
  • subscribe:订阅,观察者与被观察者,通过subscribe()方法进行订阅;
  • Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用,该部分内容是2.0新增的。Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable

在新版本中,出现了两种观察者模式:

Observable ( 被观察者 ) / Observer ( 观察者 )
Flowable (被观察者)/ Subscriber (观察者)

图片 1

image.png

Observable发送消息,而Subscriber则用于消费消息。与观察者不同的是,Observable一般只有等到有Subscriber通过subscribe方法订阅它,才会开始发送消息。

4.基本使用

Observable ( 被观察者 ) / Observer ( 观察者 ):
Observable的创建:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //执行一些其他操作
                //.............
                //执行完毕,触发回调,通知观察者
                e.onNext("我来发射数据");
            }
        });

Observer的创建:

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            //观察者接收到通知,进行相关操作
            public void onNext(String aLong) {
                System.out.println("我接收到数据了");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

订阅:

 observable.subscribe(observer);

Flowable (被观察者)/ Subscriber (观察者):

  Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "First requested = "   emitter.requested());
                        boolean flag;
                        for (int i = 0; ; i  ) {
                            flag = false;
                            while (emitter.requested() == 0) {
                                if (!flag) {
                                    Log.d(TAG, "Oh no! I can't emit value!");
                                    flag = true;
                                }
                            }
                            emitter.onNext(i);
                            Log.d(TAG, "emit "   i   " , requested = "   emitter.requested());
                        }
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        s.request(96);
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: "   integer);
//                        try {
//                            Thread.sleep(1000);
//                        } catch (InterruptedException e) {
//                            e.printStackTrace();
//                        }
                        mSubscription.request(1);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

create()方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列。例如:

3.工程引用

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex.rxjava2:rxjava:2.1.8'

参阅给 Android 开发者的 RxJava 详解什么是函数式编程RxJava 2.0 全新来袭基于RxJava 1.x,结合RxJava 2.0整理学习笔记。

RxJava2 源码解析(一)
RxJava2 源码解析(二)

1.作用

RxJava的目的就是异步。
RxJava的特点就是可以非常简便的实现异步调用,可以在逻辑复杂的代码逻辑中以比较轻易的方式实现异步调用。随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性,在深入的使用过程中一定对这点深有体会。

6.线程切换

在RxJava中, 已经内置了4种线程选项供我们选择:

  • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
  • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
  • Schedulers.newThread() 代表一个常规的新线程
  • AndroidSchedulers.mainThread() 代表Android的主线程
  1. 简单地说,subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。
  2. 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。
  3. 但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。
 /** * 创建一个Observable对象,并定义事件处理规则。当它被订阅的时候,事件会按顺序依次触发。 */Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext; emitter.onNext; emitter.onNext; emitter.onComplete;

5.基本使用

RxJava 有四个基本概念:Observable (被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在完成某些操作,获得一些结果后,回调触发事件,即发出事件来通知 Observer。

关于回调,如果理解则可以跳过这一段,如果不理解,在RxJava中可以简单的理解为:为了方便Observable和Observer交互,在Observable中,将Observer对象传入,在完成某些操作后调用Observer对象的方法,此时将触发Observer中具体实现的对应方法。
注意:Observer是个接口,Observable是个类。

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() 之外,还定义了三个特殊的事件:onComplete() 和 onError(),onSubscribe()。

onComplete(): 事件队列完结时调用该方法。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
onSubscribe():RxJava 2.0 中新增的,传递参数为Disposable ,Disposable 相当于RxJava1.x中的Subscription,用于解除订阅。
注意:onComplete() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。前者用于指定被观察者执行的线程,或者叫事件产生的线程。后者用于指定观察者执行的线程,或者叫事件消费的线程。

  • Observable ,抽象类它决定什么时候触发事件以及触发怎样的事件

1.实现了异步操作的库;2.通过扩展观察者模式来实现异步;

图片 2订阅方法

显然,subscribe方法支持不完整定义的回调,可以根据需求单独处理只需要的回调,而无需每次都处理Observer中的4个回调。Consumer可以定义Observer的每一个部分,Observable.subscribe()函数能够处理一个,两个、三个或者4个参数,分别表示onNext(),onError(),onComplete()和onSubscribe函数。响应顺序是onSubscribe->onNext->onComplete或者onError。

/** * 创建一个观察者 */Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext { } @Override public void onError(Throwable e) { } @Override public void onComplete() { }};
Observable.create(new ObservableOnSubscribe<Drawable>() { @Override public void subscribe(ObservableEmitter<Drawable> e) throws Exception { //根据id获取Drawable对象,回调到观察者中。 Drawable drawable = getResources().getDrawable(R.drawable.ic_action_name); e.onNext; e.onComplete.subscribe(new Consumer<Drawable>() { @Override public void accept(@NonNull Drawable drawable) throws Exception { ImageView imageView = (ImageView) findViewById(R.id.image); imageView.setImageDrawable; }}, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { System.out.println(throwable.getMessage;
  • Consumer, 一个接口。用于接受单个值。
  • just

    图片 3just方法

  • fromArray(T... items)

    Observable observable1 = Observable.just("Hello", "Hi", "Aloha");Observable observable2 = Observable.fromArray("Hello", "Hi", "Aloha");
    
  • Flowable,抽象类。等价于Observable。RxJava 2.x引入。

  • ** Observer**,接口。它决定事件触发的时候将有怎样的行为。定义了4个行为/方法: onSubscribe(), onNext(), onError(), onComplete(),
observable.subscribe;//RxJava 2.x中如下方法编译报错,没有提供与Subscriber对象关联的方法//observable1.subscribe(subscriber);

简单的说就是在发送者Observable和消息消费者Subscriber之间对消息进行各种你所需要的加工处理。RxJava基础知识RxJava Operator

RxJava1.x中,Observeable用于订阅Observer和Subscriber。

RxJava2.x中, Observeable用于订阅Observer ,是不支持背压的,而 Flowable用于订阅Subscriber ,是支持背压(Backpressure)的。

背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略 ,在1.0中,关于背压最大的遗憾,就是集中在Observable这个类中,导致有的Observable支持背压,有的不支持。为了解决这种缺憾,新版本把支持背压和不支持背压的Observable区分开来。

关键方法

  • map()
  • flatMap()

当然,除了上面这两种观察者,还有一类观察者Single/SingleObserverCompletable/CompletableObserverMaybe/MaybeObserver

Rxjava介绍<1>。更多请参阅RxJava 2.0 全新来袭

/** * 创建一个订阅者 */Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { } @Override public void onNext { } @Override public void onError(Throwable t) { } @Override public void onComplete() { }};
  • subscribe,方法。
  1. 由 id 取得图片并显示
String[] names = {"Jason", "Bob", "Coco"};Observable.fromArray.subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println("name:"   s); }});
  • Scheduler,抽象类
  • Scheduler的子类有ComputationScheduler、ExecutorScheduler、ImmediateThinScheduler、NewThreadScheduler、SingleScheduler、TrampolineScheduler。
  • Schedulers 一个可以返回标准Scheduler实例的静态工厂。

    图片 4方法

    • Schedulers.newThread(): 为每个工作单元创建一个新的线程。
    • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。与newThread()的区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程,计算工作可以使用computation()方法。
    • Schedulers.computation(): 用于计算型工作例如事件循环和回调处理。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    • Schedulers.single(): 单线程,共享的Scheduler。
  • Schedulers.trampoline():在当前线程上工作,但不立即执行的Scheduler。在当前线程中的工作放入队列中排队,并依次操作。

  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

可以从上图的订阅方法中发现Consumer类。

在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler 。

  • SubscriberRxjava介绍<1>。,接口。等价于观察者。与观察者不同之处在于:onSubscribe方法的参数不同,而且两者位于不同的jar包下。Subscriber位于reactive-streams.jar文件下,包名:org.reactivestreams.SubscriberObservable位于rxjava.jar文件下,包名:io.reactivex.Observable
Observable.create(new ObservableOnSubscribe<Drawable>() { @Override public void subscribe(ObservableEmitter<Drawable> e) throws Exception { //根据id获取Drawable对象,回调到观察者中。 Drawable drawable = getResources().getDrawable(R.drawable.ic_action_name); e.onNext; e.onComplete.subscribeOn(Schedulers.io//用于指定被观察者执行的线程.observeOn(AndroidSchedulers.mainThread//用于执行观察者执行的线程.subscribe(new Consumer<Drawable>() { @Override public void accept(@NonNull Drawable drawable) throws Exception { ImageView imageView = (ImageView) findViewById(R.id.image); imageView.setImageDrawable; }}, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { System.out.println(throwable.getMessage;
Consumer onNextConsumer = new Consumer<String>() { @Override public void accept(@NonNull String o) throws Exception { }};Consumer onErrorConsumer = new Consumer<String>() { @Override public void accept(@NonNull String o) throws Exception { }};observable.subscribe(onNextConsumer, onErrorConsumer);
  1. 将字符串数组 names 中的所有字符串依次打印出来:

所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

TAG标签: 学习笔记 RxJava
版权声明:本文由彩民之家高手论坛发布于编程技术,转载请注明出处:Rxjava介绍<1>