之前的文章已经介绍过Publisher
和Subscriber
,对于概念类的东西这里就不多介绍了,在介绍Publisher
和Subscriber
的交互流程之前,先补充一下前面没有提到过的Subscription
。
Subscription
Subscription
是一个协议,实现该协议的对象负责将订阅者链接到发布者。只要它在内存中,订阅者就会继续接收值。它只包含一个方法:
public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
/// Tells a publisher that it may send more values to the subscriber.
func request(_ demand: Subscribers.Demand)
}
当订阅者在Publisher
中接收到subscription
对象后,便开始调用request
方法,demand
参数决定了订阅者要从发布者那里获取多少个值。
demand
参数有几个可选的参数值:
none
:表示订阅者一个值都不会收到。max(value)
: 表示订阅者要接收value个值。unlimited
:表示订阅者要接受无限个值。
Subscription
的实例对象中包含了一个Subscriber
的引用,以使其保持最新状态。
Subscription
协议有继承了Cancellable
协议,所以有了cancel
方法,而在自定义Subscription
的时候,request
和cancel
方法都是必须实现的。
Publisher和Subscriber的交互流程
介绍完了Subscription
协议,现在看看Publisher
和Subscriber
是如何建立的联系。
- 由
Publisher
调用subscribe(_:)
方法开启链接申请,同时参数传入Subscriber
实例对象。 - 在第一步调用
subscribe(_:)
方法后,即触发Publisher
内部调用receive(subscriber:)
方法,在该方法中创建一个连接Publisher
和Subscriber
的Subscription
对象,然后调用Subscriber
的receive(subscription:)
方法,将Subscription
对象传给Subscriber
。 - 在
Subscriber
的receive(subscription:)
方法中,使用传进来的subscription
对象调用request
方法,并设置Subscriber
的请求次数。 - 在
Subscription
的request
方法中,知道了Subscriber
的请求次数,经过相关的逻辑处理后,在此方法中给Subscriber
发送数据。 - 通过
Subscriber
的receive(_:)
方法向Subscriber
发送数据。 - 通过
Subscriber
的receive(completion:)
方法向Subscriber
发送结束或者失败信息。
因为Subscription
是起了一个桥梁的作用,属于幕后,所以上面第5条、第6条从语义上来说相当于Publisher
通过receive(_:)
方法或receive(completion:)
方法向Subscriber
发送数据或者结束信息。实际上Subscription
替Publisher
做了向下游发送数据的事情。
自定义Subscriber
首先看一下Subscriber
协议的定义:
public protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible {
associatedtype Input
associatedtype Failure : Error
func receive(subscription: any Subscription)
func receive(_ input: Self.Input) -> Subscribers.Demand
func receive(completion: Subscribers.Completion<Self.Failure>)
}
协议中有两个类型,三个方法。自定义的Subscriber
需要使用class
定义,而非struct
,否则会报错,另外struct
是值类型,Subscription
没有持有最初的那个Subscriber
对象。
// 自定义Subscriber
class CustomSubscriber: Subscriber {
// 确定输入类型,需要和Publisher的输出类型一致。
typealias Input = Int
// 确定失败类型,需要和Publisher的失败类型一致,永远不会失败就定义为Never。
typealias Failure = Never
/** 交互流程中第3步
* 接收subscription对象的方法。
* 方法内subscription对象调用request方法,设置请求次数。
*/
func receive(subscription: any Subscription) {
debugPrint("CustomSubscriber subscription.request")
subscription.request(.max(5))
}
/** 交互流程中第5步
* 接收Publisher发送数据的方法。
* 该方法返回`Subscribers.Demand`,用于在request方法中计算请求次数。
*/
func receive(_ input: Int) -> Subscribers.Demand {
print("New value \(input)")
return .none
}
/** 交互流程中第6步
* 接收Publisher发送结束的方法,或者正常结束,或者失败。
*/
func receive(completion: Subscribers.Completion<Never>) {
print("Completion: \(completion)")
}
}
自定义Publisher
在自定义Publisher
前,再看一下Publisher
协议的定义:
public protocol Publisher<Output, Failure> {
associatedtype Output
associatedtype Failure : Error
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}
自定义的Publisher
需要继承这个协议,比如:
// 自定义Publisher
class CustomPublisher: Publisher {
// 确定输出类型,需要和Subscriber的输入类型一致。
typealias Output = Int
// 确定失败类型,需要和Subscriber的失败类型一致,永远不会失败就定义为Never。
typealias Failure = Never
/** 交互流程中第2步
* 接收subscriber对象的方法。方法传入Subscriber实例对象,开始建立联系。
* 方法内创建Subscription对象,然后调用Subscriber的receive(subscription:)方法,将Subscription对象传给Subscriber。
*/
func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
// 创建Subscription对象
let subscription = CustomSubscription(subscriber: subscriber)
debugPrint("CustomPublisher subscriber.receive")
// 将Subscription对象传给Subscriber
subscriber.receive(subscription: subscription)
}
}
自定义Subscription
先看一下Subscription
协议:
public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
/// Tells a publisher that it may send more values to the subscriber.
func request(_ demand: Subscribers.Demand)
}
该协议中规定了要实现request
方法,因为继承了Cancellable
,所以还需要实现一个cancel
方法。
public protocol Cancellable {
func cancel()
}
同Subscriber
一样,自定义的Subscription
需要使用class
定义,而非struct
,否则会报错,另外创建的Subscription
实例对象需要在内存中保持,否则订阅就失效了。
下面是自定义Subscription
:
// 自定义Subscription
class CustomSubscription<S: Subscriber>: Subscription where S.Input == Int, S.Failure == Never {
// 持有传入进来的Subscriber对象。
private var subscriber: S
private var counter = 0
private var isCompleted = false
// 初始化的时候将Subscriber对象传入进来,并持有,待后续发送数据使用。
init(subscriber: S) {
self.subscriber = subscriber
}
/** 交互流程中第4步
* 该方法传入请求数据的次数,并给Subscriber发送数据。
*/
func request(_ demand: Subscribers.Demand) {
debugPrint("CustomSubscription request")
guard !isCompleted else { return }
for _ in 0..<(demand.max ?? 10) {
_ = subscriber.receive(counter) // 给Subscriber发送数据
counter += 1
}
if counter >= 5 {
subscriber.receive(completion: .finished) // 通知Subscriber结束。
isCompleted = true
}
}
// 该方法中执行一些取消订阅的操作。
func cancel() {
isCompleted = true
}
}
如何使用
定义完了上面的,现在看看怎么使用吧。还是依托SwiftUI
的界面,我们在对应的ViewModel
中添加方法,使用上面自定义的类。
首先定义一个ViewModel
。
class CustomCombineViewModel: ObservableObject {
var subscription: AnyCancellable?
func testMethod1() {
// 创建自定义的Publisher
let publisher = CustomPublisher()
// 创建自定义的Subscriber
let subscriber = CustomSubscriber()
debugPrint("Begin subscribe")
/** 交互流程中第1步,申请订阅。
* 由Publisher对象调用subscribe方法,传入Subscriber对象开始。
*/
publisher.subscribe(subscriber)
}
func testMethod2() {
// 创建自定义的Publisher
let publisher = CustomPublisher()
// 通过sink方法申请订阅,并将创建的subscription持有,否则订阅失败,sink方法返回的时AnyCancellable,这里做了类型抹除。
subscription = publisher
.sink { completion in
print("sink completion: \(completion)")
} receiveValue: { value in
print("sink new value \(value)")
}
}
}
在上面代码中的testMethod1
方法中,分别创建了Publisher
和Subscriber
,并用Publisher
对象调用subscribe
方法开启订阅,这也是订阅的开启入口。
当执行testMethod1
时候,输出打印:
"Begin subscribe"
"CustomPublisher subscriber.receive"
"CustomSubscriber subscription.request"
"CustomSubscription request"
New value 0
New value 1
New value 2
New value 3
New value 4
Completion: finished
上面的输出也反应了从开始订阅到发送数据结束的过程。打印了5个数据是应为我们在Subscriber
类中调用request
方法的时候传入了.max(5)
,最多发送5个数据。
再看一下第二个方法testMethod2()
,这个方法中没有明确的Publisher
调用subscribe
方法呢?
Subscribers
有两个内置的Subscriber
,分别为Subscribers.Sink
和Subscribers.Assign
。当调用sink
或者assign
方法的时候,就开启了订阅流程。
当执行testMethod2
时候,输出打印:
"CustomPublisher subscriber.receive"
"CustomSubscription request"
sink new value 0
sink new value 1
sink new value 2
sink new value 3
sink new value 4
sink new value 5
sink new value 6
sink new value 7
sink new value 8
sink new value 9
sink completion: finished
因为sink
请求的是无限次数数据,所以将我们在Subscription
中的数据都打印出来了。
Subscribers.Sink
Sink
创建的时候会立即调用 Subscription
对象的 request(.unlimited)
。
Publisher
有两个 sink
扩展方法:
sink(receiveCompletion:receiveValue:)
sink(receiveValue:)
Subscribers.Assign
Assign
会将接收到的值赋值给一个类对象的属性或者一个另一个 @Published
publisher 上,它对 publisher 的 demand
也是 .unlimited
。
Publisher
有两个 assign
扩展方法:
assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on object: Root)
assign(to published: inout Published<Self.Output>.Publisher)
写在最后
现在我们完全理解了Combine
订阅交互流程,是不是对Combine
框架有了进一步的认识呢?
在实际开发过程中,不建议我们自己去实现Publisher
,Subscriber
和Subscription
,因为一个逻辑错误可能会破坏发布者和订阅者之间的所有连接,这可能会导致意想不到的结果。
最后,希望能够帮助到有需要的朋友,如果觉得有帮助,还望点个赞,添加个关注,笔者也会不断地努力,写出更多更好用的文章。