对Observable.of方法的探究
public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable {return ObservableSequence(elements: elements, scheduler: scheduler)}
Observable.of方法接受一个不定参数,其返回一个ObservableSequence类型的Observable,该Observable默认是持有一个CurrentThreadScheduler,该类是一个当前线程任务调度器,调度器内部有一个队列类型,用来存放已提交的还没执行的任务,只要调度器执行完成当前任务,他将执行其队列内部的任务。
该方法整体的逻辑是,创建一个匿名的观察者AnonymousObserver,其持有相应事件的代码
#if DEBUGsynchronizationTracker.register(synchronizationErrorMessage: .default)defer { synchronizationTracker.unregister() }#endifswitch event {case .next(let value):onNext?(value)case .error(let error):if let onError = onError {onError(error)}else {Hooks.defaultErrorHandler(callStack, error)}disposable.dispose()case .completed:onCompleted?()disposable.dispose()}
然后调用Observable的subscribe方法,将自己(Observable与Observer)关联,此处的Observable是ObservableSequence类型,ObservableSequence是Producer的子类,ObservablerSequence的subscribe方法是直接继承Producer的。我们看一下Producer的Subscribe代码:
override func subscribe(_ observer: O) -> Disposable where O.E == Element {if !CurrentThreadScheduler.isScheduleRequired {// The returned disposable needs to release all references once it was disposed.let disposer = SinkDisposer()let sinkAndSubscription = self.run(observer, cancel: disposer)disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)return disposer}else {//开启一个当前线程任务调度器调度了一个任务,该任务启动了observable,sink,observer三个对象的关联return CurrentThreadScheduler.instance.schedule(()) { _ inlet disposer = SinkDisposer()let sinkAndSubscription = self.run(observer, cancel: disposer)disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)return disposer//这个disposer}}}
其实subscribe方法做的事情就是调用自身的run方法实现observer与observable的绑定,至于if else,CurrentThreadScheduler是多线程相关的代码,不是我们关心的重点。接下来我们看一下ObservableSequence的run方法
override func run(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)let subscription = sink.run()return (sink: sink, subscription: subscription)}
这里我们看到,代码里用到了ObservableSequenceSink的类型,其持有了ObserverSequence与观察者还有上面代码传过来的SinkDisposer。ObservableSequenceSink是专门为ObserverSequence而服务的Sink,Sink我们可以理解为管道,将Observable的信号传递给Observer的桥梁。接下来我们看一下ObservableSequenceSink的run方法是如何执行的。
func run() -> Disposable {//启动管道,让调度器开启一个递归调度代码块,这个代码块是发送信号的核心(为什么要递归地执行代码?)return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse invar mutableIterator = iteratorprint("state=\(iterator)")if let next = mutableIterator.next() {print(next)self.forwardOn(.next(next))//发送了一次信号recurse(mutableIterator)}else {self.forwardOn(.completed)self.dispose()}}}
这里run方法做的事情就是在CurrentThreadScheduler里递归地投递任务,如何实现递归地投递?我们看一下scheduleRecursive的代码:
public func scheduleRecursive(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)//此处action就是发送信号的actionrecursiveScheduler.schedule(state)//提交了一个任务到CurrentSchedulerreturn Disposables.create(with: recursiveScheduler.dispose)}
这里是通过RecursiveImmediateScheduler来实现递归的投递任务,从该类的名字我们很容易知道该类的作用:递归地投递立即执行的任务,他的schedule方法是投递任务的关键,我们来看一下schedule方法的实现:
func schedule(_ state: State) {var scheduleState: ScheduleState = .initial//这里的又启动了一个任务,但是这个任务并不能马上执行,所以该任务被封装成ScheduleItem放入Scheduler的执行队列里了let d = self._scheduler.schedule(state) { state -> Disposable in// best effortif self._group.isDisposed {return Disposables.create()}//这里抛开线程安全的代码不看,做的事情其实就是1,将任务从自身group中移除,将并将当前任务状态更改为完成,并将自身所持有的action返回let action = self._lock.calculateLocked { () -> Action? inswitch scheduleState {case let .added(removeKey):self._group.remove(for: removeKey)case .initial:breakcase .done:break}scheduleState = .donereturn self._action}//执行actionif let action = action {print("state=\(state)")action(state, self.schedule)//这里的action其实是发送信号的action,这里的self.schedule是递归的所在}return Disposables.create()}//这里将任务的执行状态设置为加入队列,任务状态一共有三种,intial added doneself._lock.performLocked {switch scheduleState {case .added:rxFatalError("Invalid state")case .initial:if let removeKey = self._group.insert(d) {scheduleState = .added(removeKey)}else {scheduleState = .done}case .done:break}}}
抛开线程安全的代码不看,他就是在CurrentThreadScheduler里投递了一个任务,这个任务当前并没有执行,而是等到当前任务执行完成后才会执行,我们所执行的代码就是当前执行的任务.
public func schedule(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {if CurrentThreadScheduler.isScheduleRequired {CurrentThreadScheduler.isScheduleRequired = falselet disposable = action(state)defer {CurrentThreadScheduler.isScheduleRequired = trueCurrentThreadScheduler.queue = nil}guard let queue = CurrentThreadScheduler.queue else {return disposable}while let latest = queue.value.dequeue() {if latest.isDisposed {continue}latest.invoke()//ScheduledItem要执行了}return disposable}let existingQueue = CurrentThreadScheduler.queuelet queue: RxMutableBox>if let existingQueue = existingQueue {queue = existingQueue}else {queue = RxMutableBox(Queue(capacity: 1))CurrentThreadScheduler.queue = queue}let scheduledItem = ScheduledItem(action: action, state: state)queue.value.enqueue(scheduledItem)return scheduledItem}
上面代码是CurrentThreadScheduler的启动任务方法,以上所讲的代码其实都是在let disposable=action(state)里,都是在执行action().当RecursiveImmediateScheduler提交完递归任务后,代码就会逐级返回,最终会回到CurrentThreadScheduler的schedule方法里,继续向下执行,(defer是一个语法糖,其保证defer的block会在函数将要返回之前执行,可以在defer block里执行一些清理工作),接下来函数将保存好的ScheduleItem取出,执行,这里的ScheduleItem所持有的block是RecursiveImmediateScheduler在他的schedule方法里提交的,代码如下:
let d = self._scheduler.schedule(state) { state -> Disposable in// best effortif self._group.isDisposed {return Disposables.create()}//这里抛开线程安全的代码不看,做的事情其实就是1,将任务从自身group中移除,将并将当前任务状态更改为完成,并将自身所持有的action返回let action = self._lock.calculateLocked { () -> Action? inswitch scheduleState {case let .added(removeKey):self._group.remove(for: removeKey)case .initial:breakcase .done:break}scheduleState = .donereturn self._action}//执行actionif let action = action {print("state=\(state)")action(state, self.schedule)//这里的action其实是发送信号的action,这里的self.schedule是递归的所在}return Disposables.create()}
其实就是执行RecursiveScheduler所持有的代码:
var mutableIterator = iteratorprint("state=\(iterator)")if let next = mutableIterator.next() {print(next)self.forwardOn(.next(next))//发送了一次信号recurse(mutableIterator)}else {self.forwardOn(.completed)self.dispose()}
该action的两个参数,一个是由最开始调用Observable.of传的sequence参数生成的迭代器,另一个参数是RecursiveScheduler的schedule函数。我们注意观察这个action,这个action的大意是取出迭代器下一个元素,发送信号,然后再次执行RecursiveScheduler的schedule方法,其实就是再次提交一个任务到CurrentThreadScheduler,提交这个任务时,迭代器还是最开始的那个迭代器,只是他的next已经指向了下一个元素,也就能发送下一个信号了,随着迭代器持续的next(),最后迭代器的next方法将返回nil,那么action将发送完成信号,然后开始清理工作。至此,Observable.of().suscribe方法已经完成了信号发送,响应。
我们再看一下forwardOn代码:
final func forwardOn(_ event: Event) {#if DEBUGself._synchronizationTracker.register(synchronizationErrorMessage: .default)defer { self._synchronizationTracker.unregister() }#endifif isFlagSet(&self._disposed, 1) {return}self._observer.on(event)}
这段代码就是调用了其持有的观察者的on block,也就是匿名观察者持有的信号响应block,这个block判断信号,如果是.next的,就会调用最开始你写的响应方法,也就完成了信号的相应。
RxSwift是一个优秀的框架,本人水平有限,如有错误,请批评指正,如果转载,请附上原链接
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
