야매 iOS

[iOS/Swift] RxSwift subscribe flow

the Cosmos 2023. 8. 7. 20:48

방출된 이벤트를 구독하는 일련의 과정 알아보기

사전 지식

ObservableConvertibleType

public protocol ObservableConvertibleType {
    /// Type of elements in sequence.
    associatedtype Element

    /// Converts `self` to `Observable` sequence.
    ///
    /// - returns: Observable sequence that represents `self`.
    func asObservable() -> Observable<Element>
}

ObservableType

public protocol ObservableType: ObservableConvertibleType {
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}

extension ObservableType {
    
    /// Default implementation of converting `ObservableType` to `Observable`.
    public func asObservable() -> Observable<Element> {
        Observable.create { o in self.subscribe(o) }
    }
}

Observable

Observable은 ObservableType를 준수하는 구체클래스(실체가 있다)

  • subscribe의 내부가 비어있는데, 이는 Observable를 직접 생성해 subscribe 할 수 없기 때문
public class Observable<Element> : ObservableType {
    init() {
				#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
				#endif
    }
    
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<Element> { self }
    
    deinit {
				#if TRACE_RESOURCES
        _ = Resources.decrementTotal()
				#endif
    }
}

Producer

Observable을 상속받는 클래스

filter, map을 사용하면 Filter 인스턴스와 Map 인스턴스가 생성되는데 모두 Producer를 준수한다.

run 메서드가 빈 메서드로 추가되어 있음

  • Producer를 상속받는 클래스는 run의 구현부를 작성해야 하며 이는 Producer의 subscribe 메서드와 밀접한 관계를 가진다.
class Producer<Element>: Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer() // Observable를 subscribe 할 때 받는 그 disposable
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
}

final private class Filter<Element>: Producer<Element> {
		...

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
				// Sink를 생성함 Sink를 통해 observer에게 값을 보내고 dispose 할 수 있음
        let sink = FilterSink(predicate: self.predicate, observer: observer, cancel: cancel)
 				/* 
				source는 이전 Observable로 Observable.just(1).filter의 경우 just(1)이 _source에 해당함
				sink는 ObserverType으로 source의 observe 한다.
				따라서 생성은 순차적으로, 구독은 역순으로 진행되게 된다.
				*/
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

탐구예시로 확인하기

Observable.just(3)
	.filter { $0 % 4 != 0 }
	.map { $0 - 4 }
	.subscribe()

구독 전

Observable.just 부터 map까지 subscribe를 하지 않으면 내부적으로 진행되는 일은 딱 한 가지밖에 없다.

Filter과 map은 Producer이기 때문에 Filter 또는 Map을 source와 필요한 매개변수를 사용해 초기화 한다.

Map(Filter(Just))가 되어 있는 상태라고 할 수 있다.

구독 시작

subscribe() → Map

subscribe()

subscribe 메서드를 사용해 Map과의 연결을 시도한다.

Producer에 있는 subscribe 메서드와 다르게 observer를 인자로 받지 않는다.

subscribe()를 하면 내부적으로 AnonymousObserver를 생성해서 subscribe(observer: ) 메서드의 인자로 넣어준다.

 

AnonymousObserver

subscribe 메서드를 보통 사용하면 매개변수로 (onNext, onComplete)와 같은 클로저를 전달할 때가 있다.

이 클로저들은 AnonymousObserver 내부에서 사용되고 있다.

 

subscribe(observer: )

subscribe(observer:)를 호출하면 SinkDisposer가 생성된다.

  • SinkDisposer는 SinkDiposer의 sink와 subscription의 dispose를 관리한다

생성한 SinkDisposer와 Observer(Anonymous Observer) 를 인자로 run 메서드를 호출한다.

run

SinkDisposer와 Observer를 사용해 MapSink를 생성한다.

MapSink는 Observer를 준수하고 있으므로 _source(Filter)의 subscribe의 인자로 전달할 수 있다.

Map → Filter

subscribe(observer: )

subscribe(observer:)를 호출하면 SinkDisposer가 생성한다

  • SinkDisposer는 SinkDiposer의 sink와 subscription의 dispose를 관리한다

생성한 SinkDisposer와 Observer를 매개변수로 run 메서드를 호출한다.

run

SinkDisposer와 Observer를 사용해 FilterSink를 생성한다.

FilterSink는 Observer를 준수하고 있으므로 _source(Observable.just)의 subscribe의 인자로 전달할 수 있다.

Filter → Observable.just

Observable.just에 subscribe 한다

Observable.just

class Just<Element> : Producer<Element> {
  private let _element: Element
  
  init(element: Element) {
    _element = element
  }
  
  override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    observer.on(.next(_element))
    observer.on(.completed)
    return Disposables.create()
  }
}

값 & completed 이벤트를 옵저버에게 전달한다.

Disposables를 반환한다.

Observable.just → Filter

run 메서드에서 Observable.just().subscribe의 반환값인 Disposable과 생성한 FilterSink를 반환한다.

Filter의 subscribe(observer: )은 위에서 반환 받은 값을 모두 이전에 생성한 SinkDisposer 인스턴스에 할당한다.

그리고 SinkDisposer를 반환한다

Filter → Map

run 메서드에서 Filter.subscribe의 반환값이 SinkDisposer와 생성한 MapSink를 반환한다.

Map의 subscribe(observer:)은 위에서 받은 값을 모두 이전에 생성한 SinkDisposer 인스턴스에 할당한다.

그리고 SinkDisposer를 반환한다.

Map → Closure

SinkDisposer 반환

구독 끝(Dispose)

Observable.just → Filter

onComplete 이벤트 방출

Filter

onComplete이벤트를 FilterSink가 받으면 FilterSink의 dispose 메서드를 호출한다.

FilterSink의 dispose 메서드를 호출하면 FilterSink를 생성할 때 넣어줬던 SinkDisposer(Filter)의 dispose를 호출한다.

  • SinkDisposer는 가지고 있는 두 가지에게 dispose 메서드를 호출한다
    1. FilterSink
    2. Disposable(Observable.just)

Filter → Map

observer를 통해 onComplete 이벤트 방출

Map

onComplete 이벤트를 MapSink가 받으면 MapSink의 dispose 메서드를 호출한다

MapSink의 dispose 메서드를 호출하면 MapSink를 생성할 때 넣어줬던 SinkDisposer(Map)의 dispose를 호출한다.

  • SinkDisposer는 가지고 있는 두 가지에게 dispose 메서드를 호출한다
    1. MapSink
    2. Filter(SinkDisposer)

Map → Closure

observer를 통해 onComplete 이벤트 방출

Closure

onCompleted 클로저 호출 후 disposable.dispose()로 dispose 처리