使用RxJava怎么实现消息发送和线程切换
本篇文章给大家分享的是有关使用RxJava怎么实现消息发送和线程切换,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
创新互联建站服务项目包括宝安网站建设、宝安网站制作、宝安网页制作以及宝安网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,宝安网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到宝安省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!
消息订阅发送
首先让我们看看消息订阅发送最基本的代码组成:
Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext("Jack1"); emitter.onNext("Jack2"); emitter.onNext("Jack3"); emitter.onComplete(); } }); Observer observer = new Observer () { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String s) { Log.d(TAG, "onNext : " + s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError : " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; observable.subscribe(observer);
代码很简单,observable为被观察者,observer为观察者,然后通过observable.subscribe(observer),把观察者和被观察者关联起来。被观察者发送消息(emitter.onNext("内容")),观察者就可以在onNext()方法里回调出来。
我们先来看Observable,创建是用Observable.create()方法进行创建,源码如下:
public staticObservable create(ObservableOnSubscribe source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate (source)); } public static T requireNonNull(T object, String message) { if (object == null) { throw new NullPointerException(message); } return object; } public static Observable onAssembly(@NonNull Observable source) { Function super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
可以看出,create()方法里最主要的还是创建用ObservableOnSubscribe传入创建了一个ObservableCreate对象并且保存而已。
public final class ObservableCreateextends Observable { final ObservableOnSubscribe source; public ObservableCreate(ObservableOnSubscribe source) { this.source = source; } }
接着是创建Observer,这比较简单只是单纯创建一个接口对象而已
public interface Observer{ void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete(); }
订阅发送消息
observable.subscribe(observer)的subscribe方法如下:
public final void subscribe(Observer super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } //ObjectHelper.requireNonNull()方法 public staticT requireNonNull(T object, String message) { if (object == null) { throw new NullPointerException(message); } return object; } //RxJavaPlugins.onSubscribe()方法 public static Observer super T> onSubscribe(@NonNull Observable source, @NonNull Observer super T> observer) { BiFunction super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe; if (f != null) { return apply(f, source, observer); } return observer; }
从上面源码可以看出requireNonNull()只是做非空判断而已,而RxJavaPlugins.onSubscribe()也只是返回最终的观察者而已。所以关键代码是抽象方法subscribeActual(observer);那么subscribeActual对应哪个代码段呢?
还记得Observable.create()创建的ObservableCreate类吗,这就是subscribeActual()具体实现类,源码如下:
protected void subscribeActual(Observer super T> observer) { CreateEmitterparent = new CreateEmitter (observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
从上面的代码可以看出,首先创建了一个CreateEmitter对象并传入observer,然后回到observer的onSubscribe()方法,而source就是我们之前创建ObservableCreate传入的ObservableOnSubscribe对象。
class CreateEmitterextends AtomicReference implements ObservableEmitter , Disposable { }
而CreateEmitter又继承ObservableEmitter接口,又回调ObservableOnSubscribe的subscribe方法,对应着我们的:
Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext("Jack1"); emitter.onNext("Jack2"); emitter.onNext("Jack3"); emitter.onComplete(); } });
当它发送消息既调用emitter.onNext()方法时,既调用了CreateEmitter的onNext()方法:
public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } }
可以看到最终又回调了观察者的onNext()方法,把被观察者的数据传输给了观察者。有人会问
isDisposed()是什么意思,是判断要不要终止传递的,我们看emitter.onComplete()源码:
public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } public static boolean dispose(AtomicReferencefield) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; } public static boolean isDisposed(Disposable d) { return d == DISPOSED; }
dispose()方法是终止消息传递,也就付了个DISPOSED常量,而isDisposed()方法就是判断这个常量而已。这就是整个消息订阅发送的过程,用的是观察者模式。
线程切换
在上面模板代码的基础上,线程切换只是改变了如下代码:
observable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer);
下面我们对线程切换的源码进行一下分析,分为两部分:subscribeOn()和observeOn()
subscribeOn()
首先是subscribeOn()源码如下:
public final ObservablesubscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn (this, scheduler)); }
我们传进去了一个Scheduler类,Scheduler是一个调度类,能够延时或周期性地去执行一个任务。
Scheduler有如下类型:
类型 | 使用方式 | 含义 | 使用场景 |
---|---|---|---|
IoScheduler | Schedulers.io() | io操作线程 | 读写SD卡文件,查询数据库,访问网络等IO密集型操作 |
NewThreadScheduler | Schedulers.newThread() | 创建新线程 | 耗时操作等 |
SingleScheduler | Schedulers.single() | 单例线程 | 只需一个单例线程时 |
ComputationScheduler | Schedulers.computation() | CPU计算操作线程 | 图片压缩取样、xml,json解析等CPU密集型计算 |
TrampolineScheduler | Schedulers.trampoline() | 当前线程 | 需要在当前线程立即执行任务时 |
HandlerScheduler | AndroidSchedulers.mainThread() | Android主线程 | 更新UI等 |
接着就没什么了,只是返回一个ObservableSubscribeOn对象而已。
observeOn()
首先看源码如下:
public final ObservableobserveOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn (this, scheduler, delayError, bufferSize)); }
这里也是没什么,只是最终返回一个ObservableObserveOn对象而已。
接着还是像原来那样调用subscribe()方法进行订阅,看起来好像整体变化不大,就是封装了一些对象而已,不过着恰恰是RxJava源码的精华,当他再次调用subscribeActual()方法时,已经不是之前的ObservableCreate()里subscribeActual方法了,而是最先调用ObservableObserveOn的subscribeActual()方法,对应源码如下:
protected void subscribeActual(Observer super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize)); } }
在这里有两点要讲,一点是ObserveOnObserver是执行观察者的线程,后面还会详解,然后就是source.subscribe,这个source.subscribe调的是ObservableSubscribeOn的subscribe方法,而subscribe方法因为继承的也是Observable,是Observable里的方法,所以和上面的ObservableCreate一样的方法,所以会调用ObservableSubscribeOn里的subscribeActual()方法,对应的代码如下:
public void subscribeActual(final Observer super T> s) { final SubscribeOnObserverparent = new SubscribeOnObserver (s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
上面代码中,首先把ObserveOnObserver返回给来的用SubscribeOnObserver“包装”起来,然后在回调Observer的onSubscribe(),就是对应模板代码的onSubscribe()方法。
接着看SubscribeTask类的源码:
final class SubscribeTask implements Runnable { private final SubscribeOnObserverparent; SubscribeTask(SubscribeOnObserver parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } }
其中的source.subscribe(parent),就是我们执行子线程的回调方法,对应我们模板代码里的被观察者的subscribe()方法。它放在run()方法里,并且继承Runnable,说明这个类主要是线程运行。接着看scheduler.scheduleDirect()方法对应的源码如下:
public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
在这里,createWorker()也是一个抽象方法,调用的是我们的调度类对应的Schedulers类里面的方法,这里是IoScheduler类,
public final class IoScheduler extends Scheduler{ final AtomicReferencepool; //省略.... public Worker createWorker() { return new EventLoopWorker(pool.get()); } static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); this.threadWorker = pool.get(); } //省略.... @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } } static final class CachedWorkerPool implements Runnable { //省略.... ThreadWorker get() { if (allWorkers.isDisposed()) { return SHUTDOWN_THREAD_WORKER; } while (!expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { return threadWorker; } } ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; } //省略.... }
这就是IoScheduler的createWorker()的方法,其实最主要的意思就是获取线程池,以便于生成子线程,让SubscribeTask()可以运行。然后直接调用 w.schedule(task, delay, unit)方法让它在线程池里执行。上面中那ThreadWorker的源码如下:
static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { super(threadFactory); this.expirationTime = 0L; } //省略代码.... } public class NewThreadWorker extends Scheduler.Worker implements Disposable { private final ScheduledExecutorService executor; public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future> f; try { if (delayTime <= 0) { f = executor.submit((Callable
可以看到,这就调了原始的javaAPI来进行线程池操作。
然后最后一环在子线程调用source.subscribe(parent)方法,然后回调刚开始创建的ObservableCreate的subscribeActual(),既:
protected void subscribeActual(Observer super T> observer) { CreateEmitterparent = new CreateEmitter (observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
进行消息的订阅绑定。
当我们在调用 emitter.onNext(内容)时,是在io线程里的,那回调的onNext()又是什么时候切换的?那就是前面为了整个流程流畅性没讲的在observeOn()里的ObserveOnObserver是执行观察者的线程的过程。
class ObserveOnObserverextends BasicIntQueueDisposable implements Observer , Runnable { //省略代码.... ObserveOnObserver(Observer super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } @Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; if (s instanceof QueueDisposable) { @SuppressWarnings("unchecked") QueueDisposable qd = (QueueDisposable ) s; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { sourceMode = m; queue = qd; done = true; actual.onSubscribe(this); schedule(); return; } if (m == QueueDisposable.ASYNC) { sourceMode = m; queue = qd; actual.onSubscribe(this); return; } } queue = new SpscLinkedArrayQueue (bufferSize); actual.onSubscribe(this); } } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } } //省略代码.... }
当调用emitter.onNext(内容)方法,会调用上面的onNext()方法,然后在这个方法里会把数据压入一个队列,然后执行worker.schedule(this)方法,work是什么呢,还记得AndroidSchedulers.mainThread()吗,这个对应这个HandlerScheduler这个类,所以createWorker()对应着:
private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper())); } public Worker createWorker() { return new HandlerWorker(handler); } private static final class HandlerWorker extends Worker { private final Handler handler; private volatile boolean disposed; HandlerWorker(Handler handler) { this.handler = handler; } @Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); if (disposed) { return Disposables.disposed(); } run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. handler.sendMessageDelayed(message, unit.toMillis(delay)); if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; } }
在next()方法里,运用android自带的Handler消息机制,通过把方法包裹在Message里,同通过handler.sendMessageDelayed()发送消息,就会在ui线程里回调Next()方法,从而实现从子线程切换到android主线程的操作。我们在主线程拿到数据就可以进行各种在主线程的操作了。
总结一下:
ObservableCreate 一> ObservableSubscribeOn 一> ObservableObserveOn为初始化顺序
当调用observable.subscribe(observer)时的执行顺序
ObservableObserveOn 一> ObservableSubscribeOn 一> ObservableCreate
当发送消息的执行顺序
ObservableCreate 一> ObservableSubscribeOn 一> ObservableObserveOn
以上就是使用RxJava怎么实现消息发送和线程切换,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。
名称栏目:使用RxJava怎么实现消息发送和线程切换
新闻来源:http://azwzsj.com/article/jghddc.html