[iOS/Swift] RxSwift subscribe flow
방출된 이벤트를 구독하는 일련의 과정 알아보기
사전 지식
ObservableConvertibleType
public protocol ObservableConvertibleType {
/// Type of elements in sequence.
associatedtype Element
/// Converts `self` to `Observable` sequence.
///
/// - returns: Observable sequence that represents `self`.
func asObservable() -> Observable<Element>
}
ObservableType
public protocol ObservableType: ObservableConvertibleType {
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}
extension ObservableType {
/// Default implementation of converting `ObservableType` to `Observable`.
public func asObservable() -> Observable<Element> {
Observable.create { o in self.subscribe(o) }
}
}
Observable
Observable은 ObservableType를 준수하는 구체클래스(실체가 있다)
- subscribe의 내부가 비어있는데, 이는 Observable를 직접 생성해 subscribe 할 수 없기 때문
public class Observable<Element> : ObservableType {
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
rxAbstractMethod()
}
public func asObservable() -> Observable<Element> { self }
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
}
Producer
Observable을 상속받는 클래스
filter, map을 사용하면 Filter 인스턴스와 Map 인스턴스가 생성되는데 모두 Producer를 준수한다.
run 메서드가 빈 메서드로 추가되어 있음
- Producer를 상속받는 클래스는 run의 구현부를 작성해야 하며 이는 Producer의 subscribe 메서드와 밀접한 관계를 가진다.
class Producer<Element>: Observable<Element> {
override init() {
super.init()
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer() // Observable를 subscribe 할 때 받는 그 disposable
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
}
final private class Filter<Element>: Producer<Element> {
...
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
// Sink를 생성함 Sink를 통해 observer에게 값을 보내고 dispose 할 수 있음
let sink = FilterSink(predicate: self.predicate, observer: observer, cancel: cancel)
/*
source는 이전 Observable로 Observable.just(1).filter의 경우 just(1)이 _source에 해당함
sink는 ObserverType으로 source의 observe 한다.
따라서 생성은 순차적으로, 구독은 역순으로 진행되게 된다.
*/
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
탐구예시로 확인하기
Observable.just(3)
.filter { $0 % 4 != 0 }
.map { $0 - 4 }
.subscribe()
구독 전
Observable.just 부터 map까지 subscribe를 하지 않으면 내부적으로 진행되는 일은 딱 한 가지밖에 없다.
Filter과 map은 Producer이기 때문에 Filter 또는 Map을 source와 필요한 매개변수를 사용해 초기화 한다.
Map(Filter(Just))가 되어 있는 상태라고 할 수 있다.
구독 시작
subscribe() → Map
subscribe()
subscribe 메서드를 사용해 Map과의 연결을 시도한다.
Producer에 있는 subscribe 메서드와 다르게 observer를 인자로 받지 않는다.
subscribe()를 하면 내부적으로 AnonymousObserver를 생성해서 subscribe(observer: ) 메서드의 인자로 넣어준다.
AnonymousObserver
subscribe 메서드를 보통 사용하면 매개변수로 (onNext, onComplete)와 같은 클로저를 전달할 때가 있다.
이 클로저들은 AnonymousObserver 내부에서 사용되고 있다.
subscribe(observer: )
subscribe(observer:)를 호출하면 SinkDisposer가 생성된다.
- SinkDisposer는 SinkDiposer의 sink와 subscription의 dispose를 관리한다
생성한 SinkDisposer와 Observer(Anonymous Observer) 를 인자로 run 메서드를 호출한다.
run
SinkDisposer와 Observer를 사용해 MapSink를 생성한다.
MapSink는 Observer를 준수하고 있으므로 _source(Filter)의 subscribe의 인자로 전달할 수 있다.
Map → Filter
subscribe(observer: )
subscribe(observer:)를 호출하면 SinkDisposer가 생성한다
- SinkDisposer는 SinkDiposer의 sink와 subscription의 dispose를 관리한다
생성한 SinkDisposer와 Observer를 매개변수로 run 메서드를 호출한다.
run
SinkDisposer와 Observer를 사용해 FilterSink를 생성한다.
FilterSink는 Observer를 준수하고 있으므로 _source(Observable.just)의 subscribe의 인자로 전달할 수 있다.
Filter → Observable.just
Observable.just에 subscribe 한다
Observable.just
class Just<Element> : Producer<Element> {
private let _element: Element
init(element: Element) {
_element = element
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
observer.on(.next(_element))
observer.on(.completed)
return Disposables.create()
}
}
값 & completed 이벤트를 옵저버에게 전달한다.
Disposables를 반환한다.
Observable.just → Filter
run 메서드에서 Observable.just().subscribe의 반환값인 Disposable과 생성한 FilterSink를 반환한다.
Filter의 subscribe(observer: )은 위에서 반환 받은 값을 모두 이전에 생성한 SinkDisposer 인스턴스에 할당한다.
그리고 SinkDisposer를 반환한다
Filter → Map
run 메서드에서 Filter.subscribe의 반환값이 SinkDisposer와 생성한 MapSink를 반환한다.
Map의 subscribe(observer:)은 위에서 받은 값을 모두 이전에 생성한 SinkDisposer 인스턴스에 할당한다.
그리고 SinkDisposer를 반환한다.
Map → Closure
SinkDisposer 반환
구독 끝(Dispose)
Observable.just → Filter
onComplete 이벤트 방출
Filter
onComplete이벤트를 FilterSink가 받으면 FilterSink의 dispose 메서드를 호출한다.
FilterSink의 dispose 메서드를 호출하면 FilterSink를 생성할 때 넣어줬던 SinkDisposer(Filter)의 dispose를 호출한다.
- SinkDisposer는 가지고 있는 두 가지에게 dispose 메서드를 호출한다
- FilterSink
- Disposable(Observable.just)
Filter → Map
observer를 통해 onComplete 이벤트 방출
Map
onComplete 이벤트를 MapSink가 받으면 MapSink의 dispose 메서드를 호출한다
MapSink의 dispose 메서드를 호출하면 MapSink를 생성할 때 넣어줬던 SinkDisposer(Map)의 dispose를 호출한다.
- SinkDisposer는 가지고 있는 두 가지에게 dispose 메서드를 호출한다
- MapSink
- Filter(SinkDisposer)
Map → Closure
observer를 통해 onComplete 이벤트 방출
Closure
onCompleted 클로저 호출 후 disposable.dispose()로 dispose 처리