ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [iOS/Swift] Combine 알아보기 ix
    야매 iOS 2023. 3. 26. 17:12

    Publisher에 subscribe 하면 어떤 일이 일어나는지 이해하기

    Publisher가 값을 뱅출하면 sink의 receiveValue 또는 receiveCompletion 클로저가 호출된다.

    sink

    sink는 Publisher 프로토콜에 정의된 메서드

    • 어느 publisher에서도 sink를 호출할 수 있는 이유

    Combine의 Subscriber’s enum을 보면 Sink 객체가 있는 것을 확인할 수 있다.

    → 위 정보로 sink 메서드 내부에서 Sink subscriber를 생성하고 그 subscriber가 publisher를 subscribe 한다는 것을 유추할 수 있다

    extension Publisher {
        func customSink(
            receiveCompletion: @escaping (Subscribers.Completion<Self.Failure>) -> Void,
            receiveValue: @escaping (Self.Output) -> Void
        ) -> AnyCancellable {
            
            let sink = Subscribers.Sink(receiveCompletion: receiveCompletion, receiveValue: receiveValue)
            self.subscribe(sink)
            
            return AnyCancellable(sink)
        }
    }
    

    위의 코드는 실제로 sink의 구현부가 아니라는 것을 유의해야 한다.

    이해를 돕기 위한 코드다.

    subscirbe

    customSink 내부에서 subscribe 메서드가 사용되는 것을 확인할 수 있다.

    subscribe 메서드는 publisher와 subscriber를 이어 구독 스트림을 설정한다.

    Publisher은 subscribe에 대해 여러 구현이 존재한다.

    subscribe(Subject)

    publisher에 subject를 부착한다.

    • publisher로부터 방출된 이벤트를 구독하고 있는 subject에게 넘겨준다.\
    var cancellables = Set<AnyCancellable>()
    
    let publisher = [1,2,3].publisher
    let subject = PassthroughSubject<Int, Never>()
    
    subject
        .sink(receiveValue: { receivedInt in
            print("subject", receivedInt)
        })
        .store(in: &cancellables)
    
    publisher
        .subscribe(subject)
        .store(in: &cancellables)
    
    /*
    subject 1
    subject 2
    subject 3
    */
    

    subscribe(Subscriber)

    customSink 내부에서 사용된 메서드

    subscriber와 publisher의 사이의 구독 스트림을 생성한다.

    애플 공식 문서를 살펴보면 Publisher 프로토콜에 있는 receive(subscriber:) 메서드에 대한 언급이 있다.

    • Publisher의 어느 subscribe 메서드들을 호출하면 그 내부에서 receive(subscriber:)가 호출된다.

    receive(subscriber)

    receive(subscriber) 메서드는 새로운 subscriber를 받아 아래의 메서드로 subscriber와 상호작용 한다.

    • receive(subscription)
      • subscriber에게 subscription을 전달하기 위해 사용하는 메서드
      • subscription을 이용해 새로운 값을 요구할 수도 있고 구독을 종료할 수 있다.
    • receive(_:)
      • subscriber에게 새로운 값을 전달할 때 호출된다.
    • receive(completion:)
      • subscirber에게 종료 이벤트를 전달할 때 호출

    이 메서드들은 Subscriber 프로토콜을 준수하는 인스턴스가 꼭 구현해야 한다.

    Custom Subscriber

    아래는 커스텀 Subscriber를 구현한 코드다.

    extension Subscribers {
        class CustomSink<Input, Failure: Error>: Subscriber {
            let receiveValue: (Input) -> Void
            let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
            
            var subscription: Subscription?
            
            init(
                receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
                 receiveValue: @escaping (Input) -> Void
            ) {
                self.receiveCompletion = receiveCompletion
                self.receiveValue = receiveValue
            }
            
            func receive(subscription: Subscription) {
                self.subscription = subscription
                subscription.request(.unlimited)
            }
            
            func receive(_ input: Input) -> Subscribers.Demand {
                receiveValue(input)
                return .none
            }
            
            func receive(completion: Subscribers.Completion<Failure>) {
                receiveCompletion(completion)
            }
        }
    }
    

    Subscibers를 확장해서 새로운 클래스인 CustomSink를 추가했다.

    커스텀 Subscriber의 Input과 Failure 타입은 구독하려는 Publisher의 그것과 동일해야 한다.

    보통의 Subsciber.Sink처럼 커스텀 버전도 receive(_:)와 receive(completion:) 클로저를 가진다.

    receive(subscription: Subscription)

    인자로 받은 subscription을 저장한다.

    • 원할 때 구독을 취소하기 위해서

    Subscriber.Sink와 같이 publisher로부터 방출되는 모든 값을 받기 위해 subscription.request(.unlimited)를 사용한다.

    Subscribers.Demand

    이는 Combine의 backpressure 또는 backpressure 관리라고 불린다.

    Subscribers.CustomSink 적용

    extension Publisher {
        func customSink(
            receiveCompletion: @escaping (Subscribers.Completion<Self.Failure>) -> Void,
            receiveValue: @escaping (Self.Output) -> Void
        ) -> AnyCancellable {
            
            let sink = Subscribers.CustomSink(receiveCompletion: receiveCompletion, receiveValue: receiveValue)
            self.subscribe(sink)
            
            return AnyCancellable(sink)
        }
    }
    

    Publisher 내부에서 Subscriber 프로토콜을 준수하는 인스턴스와 연결을 맺고 난 후 그 Subscriber를 사용해 AnyCancellable을 wrapping 해야 한다.

    이를 위해선 CustomSink가 Cancellable 프로토콜을 준수해야 한다.

    extension Subscribers.CustomSink: Cancellable {
        func cancel() {
            subscription?.cancel()
            subscription = nil
        }
    }
    /*
    구독을 취소하고 subscription에 nil을 할당해 memeory leak으로부터 방지한다. 
    */
    

    backpressure 이해하기

    backpressure는 Combine에서 가장 중요한 기능이다.

    • 하지만 이 기능은 프레임워크의 사용자에게 잘 가려져있다.

    Combine을 사용할 때 backpressure를 잘 몰라도 된다

    • 하지만 커스텀 Subscriber 또는 Publisher를 만들기 위해선 backpressure의 이해가 필요하다.

    Combine에서 받을 아이템 수를 지정하는 것은 subscriber

    • subscription 객체를 통해 값을 요구한다.

    subscription 객체를 통해 subscriber가 새로운 값을 받을 준비가 되었는지 소통한다.

    • subscription이 subscriber가 처리할 수 있는 것보다 많은 값을 건네주는 것을 방지할 수 있다.

    subscriber는 receive(subscription:) 메서드를 사용해 초기에 몇 개의 값을 받을 것인지 지정할 수 있다.

    • Subscribers.Sink는 구독이 끊길 때까지 무제한으로 아이템을 받는다.
    • 이는 Subscribers.Demand.unlimited 객체를 통해 표현된다.

    Subscribers.Demand

    unlimited

    • publisher가 방출하는 모든 값을 받는다.

    max(Int)

    • 최대 Int로 지정된 수만 받는다.

    none

    • 하나도 받지 않는다.

    Demand와 Subscriber

    // 하나의 값만 방출된다.
    func receive(subscription: Subscription) {
    		self.subscription = subscription
    		subscription.request(.max(1))
    }
    

    위 코드를 demand의 초기값은 1이다.

    subscriber가 새로운 subscription.request(_:)의 매개변수로 새로운 demand를 건네준다면 새로운 값을 추가로 받을 수 있다.

    subscriber의 receive(_:)의 메서드로 .none 보다 큰 값을 가지는 demand를 반환하면 새로운 값을 추가로 받을 수 있다.

    → 초기값에서 새로운 demand 값을 더해 demand를 증가시킬 수 있다.

    CustomSink

    CustomSink의 receive(_:) 메서드는 아래와 같다

    func receive(_ input: Input) -> Subscribers.Demand {
    		receiveValue(input)
    		return .none
    }
    

    위 코드에서 새로운 값을 받을 때마다 .none를 반환하지만 계속 값을 받을 수 있는 것을 알 수 있다.

    receive(_:)에서 .none을 반환하는 것은 초기에 설정한 demand에 부정적으로 영향을 미치지 않는다.

    • Combine의 demand는 상승할 수만 있다.
    • .max(-1)같이 demand를 낮출 수 없다.

    그러므로 demand의 초기값이 .unlimit이고 추가되는 demand가 .none이기 때문에 계속해서 값을 받을 수 있는 것

    Demand와 Completion

    Subscription은 demand가 높을 때만 값을 subscriber에게 보낼 수 있다.

    • 초기값이 .max(10)이고 receive(_:)에서 .none을 반환할 때 subscription은 10번째 아이템 이후로 subscriber에게 값을 전달하지 않을 것이다.
    • demand의 제한에 도달하면 publisher가 방출하는 completion 이벤트를 받지 못할 수도 있다.

    Custom Publisher

    Apple의 custom publisher를 생성하는 것을 지양한다

    • 적합한 backpressure management를 custom publisher에 적용하는 것은 쉽지 않은 일이기 때문

    custom publisher 생성하기

    Custom Publisher는 Publisher 프로토콜을 준수해야 하고 receive(_:) 메서드를 구현해야 한다.

    Multiple Subscription

    URLSession.DataTaskPublisher를 사용할 때 multiple subscription이 생기면 그만큼의 네트워크 요청이 생성되는 것을 본 적이 있다.

    어떻게 본다면 value stream을 생성하는 것은 publisher가 아닐 수도 있다.

    var cancellables = Set<AnyCancellables>()
    
    let ints = [1,2,3].publisher
    
    ints
    	.sink(receiveValue : { print($0) })
    	.store(in: &cancellables)
    
    ints
    	.sink(receiveValue: { print($0) })
    	.store(in &cancellables)
    

    ints로부터 동일하게 1,2,3이 방출되는 것을 볼 수 있다.

    하지만 방출되는 그 시점은 모두 다르다.

    → 방출하는 행위는 publisher의 밀접하게 묶여있지 않음을 의미한다.

    궁극적으로 publisher가 생성한 subscription이 값을 방출과 시작을 책임진다.

    Future와 Publisher의 차이점

    Publisher는 subscription이 값의 방출을 담당한다.

    → subscriber가 있어야 한다.

    future는 publisher와 달리 바로 한 번만 실행된다.

    Custom Publisher

    extension Publishers {
    		struct IntPublisher: Publiser {
    				typealias Output = Int
    				typealias Failure = Never
    
    				let numberOfValues: Int
    
    				func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
    						// subscriber를 사용해 subscription 생성
    						let subscription = Subscriptions.IntSubscription(numberOfValues: numberOfValues, subscriber: subscriber)
    						subscriber.receive(subscription: subscription)
    				}
    		}
    }
    

    Custom Subscription

    extension Subscriptions {
        class IntSubscription<S: Subscriber>: Subscription where S.Input == Int, S.Failure == Never {
            let numberOfValues: Int
            var currentValue = 0
    
            var subscriber: S?
    
            var openDemand = Subscribers.Demand.none
    
            init(numberOfValues: Int, subscriber: S) {
                self.numberOfValues = numberOfValues
                self.subscriber = subscriber
            }
    
            func request(_ demand: Subscribers.Demand) {
    						// subscriber로부터 초기값 가져옴
                openDemand += demand
    
    						
                while openDemand > 0 && currentValue < numberOfValues {
    								// receive로 반환된 demand 값 더해줌
                    if let newDemand = subscriber?.receive(currentValue) {
                        openDemand += newDemand
                    }
    								// 방출한 값의 수보다 demand가 적으면 방출을 하지 않아야 하므로 openDemand -= 1
                    currentValue += 1
                    openDemand -= 1
                }
    
                if currentValue == numberOfValues {
                    subscriber?.receive(completion: .finished)
                    cancel()
                }
            }
    
            func cancel() {
                subscriber = nil
            }
        }
    }
    

    위의 예시들은 대략 어떻게 동작하는지에 대한 이해를 돕기 위함이다.

    실제로 위와 같이 구현되어 있는지는 정확하지 않다.

    let customPublisher = Publishers.IntPublisher(numberOfValues: 10)
    
    customPublisher
    		.customSink(receiveCompletion: { completion in
    				print(completion)
    		}, receiveValue: { int in
    				print(int)
    		})
    		.store(in: &cancellables)
    		
    

    위 코드를 실행시켜 보면 이때까지 사용했던 애플이 제공하는 publisher와 subscriber를 사용했을 때와 동일한 결과가 나오는 것을 알 수 있다.

    UIControl을 custom publisher를 사용해 확장하기

    UIControl을 custom publisher를 사용해 확장해서 사용할 수 있다.

    아래는 특정 이벤트를 tracking 하기 위한 코드다.

    slider
    		.publisher(for: .valueChanged)
    		.sink(receiveValue: { control in
    				guard let slider = control as? UISlider else { return }
    				print(slider.value)
    		}).store(in: &cancellables)
    

    Publisher

    extension UIControl {
        struct EventPublisher: Publisher {
            typealias Output = UIControl
            typealias Failure = Never
    
            let control: UIControl
            let controlEvent: UIControl.Event
    
            func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, Output == S.Input {
                let subscription = EventSubscription(control: control, event: controlEvent, subscriber: subscriber)
                subscriber.receive(subscription: subscription)
            }
        }
    }
    

    Subscription

    extension UIControl {
        class EventSubscription<S: Subscriber>: Subscription where S.Input == UIControl, S.Failure == Never {
    
            let control: UIControl
            let event: UIControl.Event
            var subscriber: S?
    
            var currentDemand = Subscribers.Demand.none
    
            init(control: UIControl, event: UIControl.Event, subscriber: S) {
                self.control = control
                self.event = event
                self.subscriber = subscriber
                /*
                생성되지마자 subscriber가 없어도 이벤트를 받는다
                 */
                control.addTarget(self, action: #selector(eventOccured), for: event)
            }
    
            func request(_ demand: Subscribers.Demand) {
                currentDemand += demand
            }
    
            func cancel() {
                subscriber = nil
    
                control.removeTarget(self, action: #selector(eventOccured), for: event)
            }
    
            @objc
            func eventOccured() {
                if currentDemand > 0 {
                    currentDemand += subscriber?.receive(control) ?? .none
                    // 값을 방출했으므로 1을 줄인다.
                    currentDemand -= 1
                }
            }
        }
    }
    

    UIControl Extension

    extension UIControl {
    		func publisher(for event: UIControl.Event) -> UIControl.EventPublisher {
    				return UIControl.EventPublisher(control: self, controlEvent: event)
    		}
    }
    

    사용법

    slider에 값이 변경되면 sink내의 클로저가 호출된다.

    slider
    		.publisher(for: .valueChanged)
    		.debounce(for: 0.2, scheduler: DispatchQueue.main)
    		.sink(receiveValue: { control in
    				guard let slider = control as? UISlider else { return }
    				print(slider.value)
    		})
    		.store(in: &cancellables)
    
Designed by Tistory.