// Observer of Int values that logs every callback, prefixed with the name of the
// thread the callback runs on.
val observer = object : Observer<Int> {
    override fun onSubscribe(d: Disposable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
    }

    override fun onNext(t: Int) {
        LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
    }

    override fun onError(e: Throwable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
    }

    override fun onComplete() {
        LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
    }
}
// Click de-bouncing: throttleFirst lets through only the first click of each
// 2-second window.
// NOTE(review): the widget type is assumed to be Button from the view id — confirm against the layout.
val btn1 = findViewById<Button>(R.id.btn_1)
btn1.clicks()
    .throttleFirst(2, TimeUnit.SECONDS)
    .subscribeOn(AndroidSchedulers.mainThread())
    .subscribe { LjyLogUtil.d("点击按钮") }
3. EditText输入监听
// Can also be used to optimise search-as-you-type suggestions.
// NOTE(review): the widget type is assumed to be EditText from the view id — confirm against the layout.
val et1 = findViewById<EditText>(R.id.et_1)
et1.textChanges()
    .debounce(1, TimeUnit.SECONDS)
    // Skip the first emission: it is the initial (empty) content of the input field.
    .skip(1)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it") }
4. 联合/表单判断
// Form validation: combine the latest text of both inputs into a single Boolean.
// NOTE(review): widget types are assumed to be EditText from the view ids — confirm against the layout.
val etName = findViewById<EditText>(R.id.et_name)
val etPwd = findViewById<EditText>(R.id.et_pwd)
val obName = etName.textChanges()
val obPwd = etPwd.textChanges()
Observable.combineLatest(obName, obPwd, { name, pwd ->
    // textChanges() emits CharSequence, not String. Comparing a CharSequence to a
    // String literal with == uses equals(), which does not compare content across
    // those types, so convert to String first.
    name.toString() == "ljy" && pwd.toString() == "123"
})
    // Skip the first emission: it reflects the initial (empty) content of the input fields.
    .skip(1)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { isLogin -> LjyLogUtil.d(if (isLogin) "登录成功" else "登录失败") }
5. 定时器任务
// Countdown button: after a click the button is disabled, shows the remaining
// seconds once per second, and is re-enabled with its original label when the
// countdown completes.
val time = 10L
// NOTE(review): the widget type is assumed to be Button from the view id — confirm against the layout.
val btnLogin = findViewById<Button>(R.id.btn_login)
// NOTE(review): neither the click subscription nor the countdown subscription is
// disposed here — confirm that the caller ties them to a lifecycle.
btnLogin.clicks()
    .throttleFirst(time, TimeUnit.SECONDS)
    .subscribeOn(AndroidSchedulers.mainThread())
    .doOnNext { btnLogin.isEnabled = false }
    .subscribe {
        LjyLogUtil.d("点击登录")
        // Emits 0 until time - 1, one value per second, starting immediately, on the main thread.
        Observable.intervalRange(0, time, 0, 1, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
            .subscribe(
                { elapsed -> btnLogin.text = "剩余${time - elapsed}秒" },
                { error -> LjyLogUtil.e(error.message) },
                {
                    btnLogin.text = "获取验证码"
                    btnLogin.isEnabled = true
                }
            )
    }
利用RxLifecycle解决内存泄漏问题
1. 添加依赖
// RxLifecycle
implementation 'com.trello.rxlifecycle4:rxlifecycle:4.0.2'
// If you want to bind to Android-specific lifecycles
implementation 'com.trello.rxlifecycle4:rxlifecycle-android:4.0.2'
// If you want pre-written Activities and Fragments you can subclass as providers
implementation 'com.trello.rxlifecycle4:rxlifecycle-components:4.0.2'
// If you want pre-written support preference Fragments you can subclass as providers
implementation 'com.trello.rxlifecycle4:rxlifecycle-components-preference:4.0.2'
// If you want to use Android Lifecycle for providers
implementation 'com.trello.rxlifecycle4:rxlifecycle-android-lifecycle:4.0.2'
// If you want to use Kotlin syntax
implementation 'com.trello.rxlifecycle4:rxlifecycle-kotlin:4.0.2'
// If you want to use Kotlin syntax with Android Lifecycle
implementation 'com.trello.rxlifecycle4:rxlifecycle-android-lifecycle-kotlin:4.0.2'
var result = "数据来自:"
val net = Observable.just("网络")
val disk = Observable.just("磁盘")
// Use merge to collect the data from the network and the local disk sources,
// appending each item to the result string and logging it on completion.
Observable.merge(net, disk)
    .subscribe(
        { source -> result += "$source, " },
        { /* errors are ignored, as in the original snippet */ },
        { LjyLogUtil.d("result: $result") }
    )
// Use zip to combine the results of two network requests and display them together.
val repo1 = apiService.getItem(1001).subscribeOn(Schedulers.io())
val repo2 = apiService.getItem(1002).subscribeOn(Schedulers.io())
// listOf lets the element type be inferred from the two responses; a raw
// ArrayList() call gives Kotlin no type to infer.
Observable.zip(repo1, repo2, { data1, data2 -> listOf(data1, data2) })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { repoList ->
            for (repoDetail in repoList) {
                LjyLogUtil.d("result: ${repoDetail.name}")
            }
        },
        // Network requests can fail; with no error consumer RxJava reports the
        // failure as an unhandled OnErrorNotImplementedException.
        { error -> LjyLogUtil.e(error.message) }
    )
源码解析
入口
以下面代码为源码阅读入口,Single是最简单的被观察者;
// Single.just(1) emits an Int, so the observer must be a SingleObserver<Int>
// and onSuccess must accept an Int.
Single.just(1).subscribe(object : SingleObserver<Int> {
    override fun onSubscribe(d: Disposable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
    }

    override fun onSuccess(t: Int) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
    }

    override fun onError(e: Throwable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
    }
})
被观察者的创建
先看上面代码中被观察者的创建方法:Single.just();
/**
 * Wraps {@code item} in a {@code SingleJust} and passes it through the
 * {@code RxJavaPlugins.onAssembly} hook.
 *
 * @param item the value to wrap; must not be {@code null}
 * @return the value returned by {@code RxJavaPlugins.onAssembly}
 * @throws NullPointerException if {@code item} is {@code null}
 */
public static <@NonNull T> Single<T> just(T item) {
    // Null check.
    Objects.requireNonNull(item, "item is null");
    return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}
@Nullable
static volatile Function super Single, ? extends Single> onSingleAssembly;public static Single onAssembly(@NonNull Single source) {Function super Single, ? extends Single> f = onSingleAssembly;if (f != null) {return apply(f, source);}return source;
}
public final class SingleJust extends Single {final T value;public SingleJust(T value) {this.value = value;}@Overrideprotected void subscribeActual(SingleObserver super T> observer) {observer.onSubscribe(Disposable.disposed());observer.onSuccess(value);}
}
public abstract class Single<@NonNull T> implements SingleSource {...protected abstract void subscribeActual(@NonNull SingleObserver super T> observer);...public final void subscribe(@NonNull SingleObserver super T> observer) {...}
}
Single实现了SingleSource接口,并实现subscribe方法;
public interface SingleSource<@NonNull T> {/*** Subscribes the given {@link SingleObserver} to this {@link SingleSource} instance.* @param observer the {@code SingleObserver}, not {@code null}* @throws NullPointerException if {@code observer} is {@code null}*/void subscribe(@NonNull SingleObserver super T> observer);
}
订阅
下面来看看开头例子中的第二行的subscribe方法:Single.subscribe();
public final void subscribe(@NonNull SingleObserver super T> observer) {//判空Objects.requireNonNull(observer, "observer is null");//钩子方法,默认还是入参的SingleObserverobserver = RxJavaPlugins.onSubscribe(this, observer);//判空Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");try {subscribeActual(observer);} catch (NullPointerException ex) {throw ex;} catch (Throwable ex) {Exceptions.throwIfFatal(ex);NullPointerException npe = new NullPointerException("subscribeActual failed");npe.initCause(ex);throw npe;}
}
// Map the emitted number to a "num_"-prefixed string, then log it.
Single.just(1)
    .map { number -> "num_$number" }
    .subscribe { text -> LjyLogUtil.d(text) }
那么看一下map的实现:
public final <@NonNull R> Single map(@NonNull Function super T, ? extends R> mapper) {Objects.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
public final class SingleMap extends Single {final SingleSource extends T> source;final Function super T, ? extends R> mapper;public SingleMap(SingleSource extends T> source, Function super T, ? extends R> mapper) {this.source = source;this.mapper = mapper;}@Overrideprotected void subscribeActual(final SingleObserver super R> t) {source.subscribe(new MapSingleObserver(t, mapper));}...//省略内部类MapSingleObserver
}
// Same chain as before, now subscribing on the IO scheduler and observing on
// the Android main-thread scheduler.
Single.just(1)
    .map { number -> "num_$number" }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { text -> LjyLogUtil.d(text) }
subscribeOn
先来看看subscribeOn的入参Schedulers.io():
/** Returns the IO scheduler after passing it through the {@code onIoScheduler} hook. */
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

// The scheduler constants are initialised once in the static initialiser, each
// (except TRAMPOLINE) through its RxJavaPlugins init hook.
static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    IO = RxJavaPlugins.initIoScheduler(new IOTask());
    TRAMPOLINE = TrampolineScheduler.instance();
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}

/** Supplier handed to {@code initIoScheduler}; returns {@code IoHolder.DEFAULT}. */
static final class IOTask implements Supplier<Scheduler> {
    @Override
    public Scheduler get() {
        return IoHolder.DEFAULT;
    }
}

/** Holder class whose static field is the {@code IoScheduler} instance. */
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

public final class IoScheduler extends Scheduler {
    // ...
}
可以看到最后返回的是Scheduler的实现类IoScheduler实例;
再来看看subscribeOn方法的实现:
/**
 * Wraps this {@code Single} and {@code scheduler} in a {@code SingleSubscribeOn}
 * and passes it through the {@code RxJavaPlugins.onAssembly} hook.
 *
 * @param scheduler the scheduler handed to the new {@code SingleSubscribeOn}; must not be {@code null}
 * @return the value returned by {@code RxJavaPlugins.onAssembly}
 * @throws NullPointerException if {@code scheduler} is {@code null}
 */
public final Single<T> subscribeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
}
//通过静态内部类提供HandlerScheduler的单例
public final class AndroidSchedulers {//开放的入口方法public static Scheduler mainThread() {//钩子return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);}//私有的静态变量private static final Scheduler MAIN_THREAD =RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);//私有的静态内部类private static final class MainHolder {//本质还说离不开Handler,通过Looper.getMainLooper()切换到UI线程static final Scheduler DEFAULT= new HandlerScheduler(new Handler(Looper.getMainLooper()), true);}//私有的构造方法,并抛出异常,因为这里是创建HandlerScheduler的单例,而不是AndroidSchedulers本身private AndroidSchedulers() {throw new AssertionError("No instances.");}
}final class HandlerScheduler extends Scheduler {private final Handler handler;private final boolean async;HandlerScheduler(Handler handler, boolean async) {this.handler = handler;this.async = async;}public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {...//省略判空和钩子方法ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);Message message = Message.obtain(handler, scheduled);if (async) {message.setAsynchronous(true);}//通过主线程的handler发送消息handler.sendMessageDelayed(message, unit.toMillis(delay));return scheduled;}@Overridepublic Worker createWorker() {return new HandlerWorker(handler, async);}private static final class HandlerWorker extends Worker {private final Handler handler;private final boolean async;private volatile boolean disposed;HandlerWorker(Handler handler, boolean async) {this.handler = handler;this.async = async;}public Disposable schedule(Runnable run, long delay, TimeUnit unit) {...if (disposed) {return Disposable.disposed();}...ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);Message message = Message.obtain(handler, scheduled);message.obj = this;...handler.sendMessageDelayed(message, unit.toMillis(delay));if (disposed) {handler.removeCallbacks(scheduled);return Disposable.disposed();}return scheduled;}@Overridepublic void dispose() {disposed = true;handler.removeCallbacksAndMessages(this /* token */);}@Overridepublic boolean isDisposed() {return disposed;}}private static final class ScheduledRunnable implements Runnable, Disposable {private final Handler handler;private final Runnable delegate;private volatile boolean disposed;ScheduledRunnable(Handler handler, Runnable delegate) {this.handler = handler;this.delegate = delegate;}@Overridepublic void run() {try {delegate.run();} catch (Throwable t) {RxJavaPlugins.onError(t);}}@Overridepublic void dispose() {handler.removeCallbacks(this);disposed = true;}@Overridepublic boolean isDisposed() {return disposed;}}
}
然后来看observeOn方法:
/**
 * Wraps this {@code Single} and {@code scheduler} in a {@code SingleObserveOn}
 * and passes it through the {@code RxJavaPlugins.onAssembly} hook.
 *
 * @param scheduler the scheduler handed to the new {@code SingleObserveOn}; must not be {@code null}
 * @return the value returned by {@code RxJavaPlugins.onAssembly}
 * @throws NullPointerException if {@code scheduler} is {@code null}
 */
public final Single<T> observeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
}
本质还是创建了一个被观察者:
public final class SingleObserveOn extends Single {final SingleSource source;final Scheduler scheduler;public SingleObserveOn(SingleSource source, Scheduler scheduler) {this.source = source;this.scheduler = scheduler;}@Overrideprotected void subscribeActual(final SingleObserver super T> observer) {//调用上游被观察者source的订阅方法subscribe,传入一个观察者ObserveOnSingleObserver,其入参是下游的观察者observersource.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));}...//省略内部类ObserveOnSingleObserver
}
/**
 * Wraps {@code source} in a {@code SingleCreate} and passes it through the
 * {@code RxJavaPlugins.onAssembly} hook.
 *
 * @param source the callback handed to the new {@code SingleCreate}; must not be {@code null}
 * @return the value returned by {@code RxJavaPlugins.onAssembly}
 * @throws NullPointerException if {@code source} is {@code null}
 */
public static <@NonNull T> Single<T> create(@NonNull SingleOnSubscribe<T> source) {
    Objects.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new SingleCreate<>(source));
}
同样的create方法中创建了一个被观察者SingleCreate并返回:
public final class SingleCreate extends Single {final SingleOnSubscribe source;public SingleCreate(SingleOnSubscribe source) {this.source = source;}@Overrideprotected void subscribeActual(SingleObserver super T> observer) {Emitter parent = new Emitter<>(observer);observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}...//省略了内部类Emitter
}