ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [iOS/Swift] RxSwift subscribe flow
    야매 iOS 2023. 8. 7. 20:48

    방출된 이벤트를 구독하는 일련의 과정 알아보기

    사전 지식

    ObservableConvertibleType

    public protocol ObservableConvertibleType {
        /// Type of elements in sequence.
        associatedtype Element
    
        /// Converts `self` to `Observable` sequence.
        ///
        /// - returns: Observable sequence that represents `self`.
        func asObservable() -> Observable<Element>
    }
    

    ObservableType

    public protocol ObservableType: ObservableConvertibleType {
        func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
    }
    
    extension ObservableType {
        
        /// Default implementation of converting `ObservableType` to `Observable`.
        public func asObservable() -> Observable<Element> {
            Observable.create { o in self.subscribe(o) }
        }
    }
    

    Observable

    Observable은 ObservableType를 준수하는 구체클래스(실체가 있다)

    • subscribe의 내부가 비어있는데, 이는 Observable를 직접 생성해 subscribe 할 수 없기 때문
    public class Observable<Element> : ObservableType {
        init() {
    				#if TRACE_RESOURCES
            _ = Resources.incrementTotal()
    				#endif
        }
        
        public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            rxAbstractMethod()
        }
        
        public func asObservable() -> Observable<Element> { self }
        
        deinit {
    				#if TRACE_RESOURCES
            _ = Resources.decrementTotal()
    				#endif
        }
    }
    

    Producer

    Observable을 상속받는 클래스

    filter, map을 사용하면 Filter 인스턴스와 Map 인스턴스가 생성되는데 모두 Producer를 준수한다.

    run 메서드가 빈 메서드로 추가되어 있음

    • Producer를 상속받는 클래스는 run의 구현부를 작성해야 하며 이는 Producer의 subscribe 메서드와 밀접한 관계를 가진다.
    class Producer<Element>: Observable<Element> {
        override init() {
            super.init()
        }
    
        override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // The returned disposable needs to release all references once it was disposed.
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer() // Observable를 subscribe 할 때 받는 그 disposable
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
    
        func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            rxAbstractMethod()
        }
    }
    
    final private class Filter<Element>: Producer<Element> {
    		...
    
        override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    				// Sink를 생성함 Sink를 통해 observer에게 값을 보내고 dispose 할 수 있음
            let sink = FilterSink(predicate: self.predicate, observer: observer, cancel: cancel)
     				/* 
    				source는 이전 Observable로 Observable.just(1).filter의 경우 just(1)이 _source에 해당함
    				sink는 ObserverType으로 source의 observe 한다.
    				따라서 생성은 순차적으로, 구독은 역순으로 진행되게 된다.
    				*/
            let subscription = self.source.subscribe(sink)
            return (sink: sink, subscription: subscription)
        }
    }
    

    탐구예시로 확인하기

    Observable.just(3)
    	.filter { $0 % 4 != 0 }
    	.map { $0 - 4 }
    	.subscribe()
    

    구독 전

    Observable.just 부터 map까지 subscribe를 하지 않으면 내부적으로 진행되는 일은 딱 한 가지밖에 없다.

    Filter과 map은 Producer이기 때문에 Filter 또는 Map을 source와 필요한 매개변수를 사용해 초기화 한다.

    Map(Filter(Just))가 되어 있는 상태라고 할 수 있다.
    

    구독 시작

    subscribe() → Map

    subscribe()

    subscribe 메서드를 사용해 Map과의 연결을 시도한다.

    Producer에 있는 subscribe 메서드와 다르게 observer를 인자로 받지 않는다.

    subscribe()를 하면 내부적으로 AnonymousObserver를 생성해서 subscribe(observer: ) 메서드의 인자로 넣어준다.

     

    AnonymousObserver

    subscribe 메서드를 보통 사용하면 매개변수로 (onNext, onComplete)와 같은 클로저를 전달할 때가 있다.

    이 클로저들은 AnonymousObserver 내부에서 사용되고 있다.

     

    subscribe(observer: )

    subscribe(observer:)를 호출하면 SinkDisposer가 생성된다.

    • SinkDisposer는 SinkDiposer의 sink와 subscription의 dispose를 관리한다

    생성한 SinkDisposer와 Observer(Anonymous Observer) 를 인자로 run 메서드를 호출한다.

    run

    SinkDisposer와 Observer를 사용해 MapSink를 생성한다.

    MapSink는 Observer를 준수하고 있으므로 _source(Filter)의 subscribe의 인자로 전달할 수 있다.

    Map → Filter

    subscribe(observer: )

    subscribe(observer:)를 호출하면 SinkDisposer가 생성한다

    • SinkDisposer는 SinkDiposer의 sink와 subscription의 dispose를 관리한다

    생성한 SinkDisposer와 Observer를 매개변수로 run 메서드를 호출한다.

    run

    SinkDisposer와 Observer를 사용해 FilterSink를 생성한다.

    FilterSink는 Observer를 준수하고 있으므로 _source(Observable.just)의 subscribe의 인자로 전달할 수 있다.

    Filter → Observable.just

    Observable.just에 subscribe 한다

    Observable.just

    class Just<Element> : Producer<Element> {
      private let _element: Element
      
      init(element: Element) {
        _element = element
      }
      
      override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        observer.on(.next(_element))
        observer.on(.completed)
        return Disposables.create()
      }
    }
    

    값 & completed 이벤트를 옵저버에게 전달한다.

    Disposables를 반환한다.

    Observable.just → Filter

    run 메서드에서 Observable.just().subscribe의 반환값인 Disposable과 생성한 FilterSink를 반환한다.

    Filter의 subscribe(observer: )은 위에서 반환 받은 값을 모두 이전에 생성한 SinkDisposer 인스턴스에 할당한다.

    그리고 SinkDisposer를 반환한다

    Filter → Map

    run 메서드에서 Filter.subscribe의 반환값이 SinkDisposer와 생성한 MapSink를 반환한다.

    Map의 subscribe(observer:)은 위에서 받은 값을 모두 이전에 생성한 SinkDisposer 인스턴스에 할당한다.

    그리고 SinkDisposer를 반환한다.

    Map → Closure

    SinkDisposer 반환

    구독 끝(Dispose)

    Observable.just → Filter

    onComplete 이벤트 방출

    Filter

    onComplete이벤트를 FilterSink가 받으면 FilterSink의 dispose 메서드를 호출한다.

    FilterSink의 dispose 메서드를 호출하면 FilterSink를 생성할 때 넣어줬던 SinkDisposer(Filter)의 dispose를 호출한다.

    • SinkDisposer는 가지고 있는 두 가지에게 dispose 메서드를 호출한다
      1. FilterSink
      2. Disposable(Observable.just)

    Filter → Map

    observer를 통해 onComplete 이벤트 방출

    Map

    onComplete 이벤트를 MapSink가 받으면 MapSink의 dispose 메서드를 호출한다

    MapSink의 dispose 메서드를 호출하면 MapSink를 생성할 때 넣어줬던 SinkDisposer(Map)의 dispose를 호출한다.

    • SinkDisposer는 가지고 있는 두 가지에게 dispose 메서드를 호출한다
      1. MapSink
      2. Filter(SinkDisposer)

    Map → Closure

    observer를 통해 onComplete 이벤트 방출

    Closure

    onCompleted 클로저 호출 후 disposable.dispose()로 dispose 처리

    '야매 iOS' 카테고리의 다른 글

    [iOS/Swift] RxGesture - Gesture가 동시에 인식되는 문제  (0) 2023.08.07
    [Swift] Dispatch  (0) 2023.08.07
    [Swift] Copy on Write  (0) 2023.08.07
    [iOS/Swift] SwiftUI 계산기  (0) 2023.08.07
    [iOS] Certificate & Provisioning  (0) 2023.08.07
Designed by Tistory.