Swift Combine — Publisher和Subscriber的交互流程(自定义Publisher、Subscriber、Subscription)

之前的文章已经介绍过PublisherSubscriber,对于概念类的东西这里就不多介绍了,在介绍PublisherSubscriber的交互流程之前,先补充一下前面没有提到过的Subscription

Subscription

Subscription是一个协议,实现该协议的对象负责将订阅者链接到发布者。只要它在内存中,订阅者就会继续接收值。它只包含一个方法:

public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
    /// Tells a publisher that it may send more values to the subscriber.
    func request(_ demand: Subscribers.Demand)
}

当订阅者在Publisher中接收到subscription对象后,便开始调用request方法,demand参数决定了订阅者要从发布者那里获取多少个值。

demand参数有几个可选的参数值:

  • none:表示订阅者一个值都不会收到。
  • max(value): 表示订阅者要接收value个值。
  • unlimited:表示订阅者要接受无限个值。

Subscription的实例对象中包含了一个Subscriber的引用,以使其保持最新状态。
Subscription协议有继承了Cancellable协议,所以有了cancel方法,而在自定义Subscription的时候,requestcancel方法都是必须实现的。

Publisher和Subscriber的交互流程

介绍完了Subscription协议,现在看看PublisherSubscriber是如何建立的联系。

  1. Publisher调用subscribe(_:) 方法开启链接申请,同时参数传入Subscriber实例对象。
  2. 在第一步调用subscribe(_:) 方法后,即触发Publisher内部调用receive(subscriber:)方法,在该方法中创建一个连接PublisherSubscriberSubscription对象,然后调用Subscriberreceive(subscription:)方法,将Subscription对象传给Subscriber
  3. Subscriberreceive(subscription:)方法中,使用传进来的subscription对象调用request方法,并设置Subscriber的请求次数。
  4. Subscriptionrequest方法中,知道了Subscriber的请求次数,经过相关的逻辑处理后,在此方法中给Subscriber发送数据。
  5. 通过Subscriberreceive(_:)方法向Subscriber发送数据。
  6. 通过Subscriberreceive(completion:)方法向Subscriber发送结束或者失败信息。

因为Subscription是起了一个桥梁的作用,属于幕后,所以上面第5条、第6条从语义上来说相当于Publisher通过receive(_:)方法或receive(completion:)方法向Subscriber发送数据或者结束信息。实际上SubscriptionPublisher做了向下游发送数据的事情。

自定义Subscriber

首先看一下Subscriber协议的定义:

public protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible {
    associatedtype Input
    associatedtype Failure : Error

    func receive(subscription: any Subscription)
    func receive(_ input: Self.Input) -> Subscribers.Demand
    func receive(completion: Subscribers.Completion<Self.Failure>)
}

协议中有两个类型,三个方法。自定义的Subscriber需要使用class定义,而非struct,否则会报错,另外struct是值类型,Subscription没有持有最初的那个Subscriber对象。

// 自定义Subscriber
class CustomSubscriber: Subscriber {
  // 确定输入类型,需要和Publisher的输出类型一致。
  typealias Input = Int
  // 确定失败类型,需要和Publisher的失败类型一致,永远不会失败就定义为Never。
  typealias Failure = Never

  /** 交互流程中第3步
   *  接收subscription对象的方法。
   *  方法内subscription对象调用request方法,设置请求次数。
   */
  func receive(subscription: any Subscription) {
    debugPrint("CustomSubscriber subscription.request")
    subscription.request(.max(5))
  }
  
  /** 交互流程中第5步
   *  接收Publisher发送数据的方法。
   *  该方法返回`Subscribers.Demand`,用于在request方法中计算请求次数。
   */
  func receive(_ input: Int) -> Subscribers.Demand {
    print("New value \(input)")
    return .none
  }

  /** 交互流程中第6步
   *  接收Publisher发送结束的方法,或者正常结束,或者失败。
   */
  func receive(completion: Subscribers.Completion<Never>) {
    print("Completion: \(completion)")
  }
}

自定义Publisher

在自定义Publisher前,再看一下Publisher协议的定义:

public protocol Publisher<Output, Failure> {

    associatedtype Output
    associatedtype Failure : Error

    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}

自定义的Publisher需要继承这个协议,比如:

// 自定义Publisher
class CustomPublisher: Publisher {
  // 确定输出类型,需要和Subscriber的输入类型一致。
  typealias Output = Int
  // 确定失败类型,需要和Subscriber的失败类型一致,永远不会失败就定义为Never。
  typealias Failure = Never

  /** 交互流程中第2步
   *  接收subscriber对象的方法。方法传入Subscriber实例对象,开始建立联系。
   *  方法内创建Subscription对象,然后调用Subscriber的receive(subscription:)方法,将Subscription对象传给Subscriber。
   */
  func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
    // 创建Subscription对象
    let subscription = CustomSubscription(subscriber: subscriber)
    debugPrint("CustomPublisher subscriber.receive")
    // 将Subscription对象传给Subscriber
    subscriber.receive(subscription: subscription)
  }
}

自定义Subscription

先看一下Subscription协议:

public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
    /// Tells a publisher that it may send more values to the subscriber.
    func request(_ demand: Subscribers.Demand)
}

该协议中规定了要实现request方法,因为继承了Cancellable,所以还需要实现一个cancel方法。

public protocol Cancellable {
    func cancel()
}

Subscriber一样,自定义的Subscription需要使用class定义,而非struct,否则会报错,另外创建的Subscription实例对象需要在内存中保持,否则订阅就失效了。

下面是自定义Subscription

// 自定义Subscription
class CustomSubscription<S: Subscriber>: Subscription where S.Input == Int, S.Failure == Never {
  // 持有传入进来的Subscriber对象。
  private var subscriber: S
  private var counter = 0
  private var isCompleted = false

  // 初始化的时候将Subscriber对象传入进来,并持有,待后续发送数据使用。
  init(subscriber: S) {
    self.subscriber = subscriber
  }
  
  /** 交互流程中第4步
   *  该方法传入请求数据的次数,并给Subscriber发送数据。
   */
  func request(_ demand: Subscribers.Demand) {
    debugPrint("CustomSubscription request")
    guard !isCompleted else { return }

    for _ in 0..<(demand.max ?? 10) {
      _ = subscriber.receive(counter) // 给Subscriber发送数据
      counter += 1
    }

    if counter >= 5 {
      subscriber.receive(completion: .finished) // 通知Subscriber结束。
      isCompleted = true
    }
  }

  // 该方法中执行一些取消订阅的操作。
  func cancel() {
    isCompleted = true
  }
}

如何使用

定义完了上面的,现在看看怎么使用吧。还是依托SwiftUI的界面,我们在对应的ViewModel中添加方法,使用上面自定义的类。

首先定义一个ViewModel

class CustomCombineViewModel: ObservableObject {

  var subscription: AnyCancellable?

  func testMethod1() {
    // 创建自定义的Publisher
    let publisher = CustomPublisher()
    // 创建自定义的Subscriber
    let subscriber = CustomSubscriber()

    debugPrint("Begin subscribe")

    /** 交互流程中第1步,申请订阅。
     *  由Publisher对象调用subscribe方法,传入Subscriber对象开始。
     */
    publisher.subscribe(subscriber)
  }

  func testMethod2() {
    // 创建自定义的Publisher
    let publisher = CustomPublisher()
    // 通过sink方法申请订阅,并将创建的subscription持有,否则订阅失败,sink方法返回的时AnyCancellable,这里做了类型抹除。
    subscription = publisher
      .sink { completion in
        print("sink completion: \(completion)")
      } receiveValue: { value in
        print("sink new value \(value)")
      }
  }
}

在上面代码中的testMethod1方法中,分别创建了PublisherSubscriber,并用Publisher对象调用subscribe方法开启订阅,这也是订阅的开启入口。

当执行testMethod1时候,输出打印:

"Begin subscribe"
"CustomPublisher subscriber.receive"
"CustomSubscriber subscription.request"
"CustomSubscription request"
New value 0
New value 1
New value 2
New value 3
New value 4
Completion: finished

上面的输出也反应了从开始订阅到发送数据结束的过程。打印了5个数据是应为我们在Subscriber类中调用request方法的时候传入了.max(5),最多发送5个数据。

再看一下第二个方法testMethod2(),这个方法中没有明确的Publisher调用subscribe方法呢?

Subscribers有两个内置的Subscriber,分别为Subscribers.SinkSubscribers.Assign。当调用sink或者assign方法的时候,就开启了订阅流程。

当执行testMethod2时候,输出打印:

"CustomPublisher subscriber.receive"
"CustomSubscription request"
sink new value 0
sink new value 1
sink new value 2
sink new value 3
sink new value 4
sink new value 5
sink new value 6
sink new value 7
sink new value 8
sink new value 9
sink completion: finished

因为sink请求的是无限次数数据,所以将我们在Subscription中的数据都打印出来了。

Subscribers.Sink

Sink 创建的时候会立即调用 Subscription 对象的 request(.unlimited)

Publisher 有两个 sink 扩展方法:

  • sink(receiveCompletion:receiveValue:)
  • sink(receiveValue:)

Subscribers.Assign

Assign 会将接收到的值赋值给一个类对象的属性或者一个另一个 @Published publisher 上,它对 publisher 的 demand 也是 .unlimited

Publisher 有两个 assign 扩展方法:

  • assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on object: Root)
  • assign(to published: inout Published<Self.Output>.Publisher)

写在最后

现在我们完全理解了Combine订阅交互流程,是不是对Combine框架有了进一步的认识呢?
在实际开发过程中,不建议我们自己去实现PublisherSubscriberSubscription,因为一个逻辑错误可能会破坏发布者和订阅者之间的所有连接,这可能会导致意想不到的结果。

最后,希望能够帮助到有需要的朋友,如果觉得有帮助,还望点个赞,添加个关注,笔者也会不断地努力,写出更多更好用的文章。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/742383.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

软件质量保证与测试

目录 一、测试流程 二、测试用例 2.1概念 2.2用例编写格式 三、设计测试点 3.1等价类 3.1.1概念 3.1.2案例 3.1.3适用场景 3.1.4执行用例 3.2边界值 3.2.1概念 3.2.2案例 3.2.3使用场景 3.3判定表 3.3.1判定表使用原因 3.3.2概念 3.3.3案例 3.3.4使用场景 …

【Linux】Ubuntu 部署 Zabbix 7.0

实验环境&#xff1a;Ubuntu-22.04 官方下载地址&#xff1a; 下载Zabbix 7.0 LTS for Ubuntu 22.04 (Jammy), MySQL, Apache 1、下载 Zabbix 官方安装包以及环境配置 下载 zabbix 安装包 wget https://repo.zabbix.com/zabbix/7.0/ubuntu/pool/main/z/zabbix-release/zabb…

Go语言之数据类型

网站&#xff1a;http://hardyfish.top/ 免费书籍分享&#xff1a; 资料链接&#xff1a;https://url81.ctfile.com/d/57345181-61545511-81795b?p3899 访问密码&#xff1a;3899 免费专栏分享&#xff1a; 资料链接&#xff1a;https://url81.ctfile.com/d/57345181-6161623…

篮球联盟管理系统的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;管理员管理&#xff0c;球员管理&#xff0c;用户管理&#xff0c;球队管理&#xff0c;论坛管理&#xff0c;篮球资讯管理&#xff0c;基础数据管理 前台账户功能包括&#xff1a;系统首页&#xff0…

解决ssh: connect to host IP port 22: Connection timed out报错(scp传文件指定端口)

错误消息 ssh: connect to host IP port 22: Connection timed out 指出 SSH 客户端尝试连接到指定的 IP 地址和端口号&#xff08;默认 SSH 端口是 22&#xff09;&#xff0c;但是连接超时了。这意味着客户端没有在预定时间内收到来自服务器的响应。 可能的原因 SSH 服务未…

【数据结构与算法】最短路径,Floyd算法,Dijkstra算法 详解

Floyd算法 for (int k 0; k < n; k) {for (int i 0; i < n; i) {for (int j 0; j < n; j) {if (d[i][k] ! INF && d[k][j] ! INF) {d[i][j] min(d[i][j], d[i][k] d[k][j]);}}} }Dijkstra算法&#xff08;基于最小堆&#xff09; void dijkstra(int st…

【JavaEE精炼宝库】多线程进阶(1)常见锁策略 | CAS | ABA问题

目录 一、常见的锁策略&#xff1a; 1.1 悲观锁 | 乐观锁&#xff1a; 1.2 重量级锁 | 轻量级锁&#xff1a; 1.3 自旋锁 | 挂起等待锁&#xff1a; 1.4 公平锁 | 非公平锁&#xff1a; 1.5 可重入锁 | 不可重入锁&#xff1a; 1.6 互斥锁 | 读写锁&#xff1a; 1.7 面…

服务器神秘挂起:一场惊心动魄的内核探案

2024年6月17日&#xff0c;我们的运维团队突然收到了一连串的告警。监控大屏上&#xff0c;代表着不同 Sealos 可用区的绿点中&#xff0c;零星地闪烁起了一两个红点。 “奇怪&#xff0c;怎么有几台服务器突然 hang 住了&#xff1f;” 值班的小辉皱起了眉头。 这次故障的诡…

python遍历文件夹中所有图片

python遍历文件夹中的图片-CSDN博客 这个是之前的版本&#xff0c;现在这个版本会更好&#xff0c;直接进来就在列表中 path glob.glob("1/*.jpg")print(path)print(len(path))path_img glob.glob("1/*.jpg")path_img.extend(path)print(len(path_img))…

基于Hexo+GITHUB搭建个人博客网站(PS:不用域名,不用服务器,重点是免费,小白也能轻松掌握)

✌ 作者名字&#xff1a;高峰君主 &#x1f4eb; 如果文章知识点有错误的地方&#xff0c;请指正&#xff01;和大家一起学习&#xff0c;一起进步&#x1f440; &#x1f4ac; 人生格言&#xff1a;没有我不会的语言&#xff0c;没有你过不去的坎儿。&#x1f4ac; &#x1f5…

25.模式和匹配

目录 一、概念二、模式的位置2.1 match分支2.2 if let表达式2.3 while let条件循环2.4 for循环2.5 let语句2.6 函数参数 三、模式是否会匹配失效四、模式语法4.1 匹配字面量4.2 匹配命名变量4.3 解构并分解值1&#xff09;解构结构体2&#xff09;解构枚举3&#xff09;解构嵌套…

动态规划数字三角形模型——AcWing 1015. 摘花生

动态规划数字三角形模型 定义 动态规划数字三角形模型是在一个三角形的数阵中&#xff0c;通过一定规则找到从顶部到底部的最优路径或最优值。 运用情况 通常用于解决具有递推关系、需要在不同路径中做出选择以达到最优结果的问题。比如计算最短路径、最大和等 注意事项 …

MySQL之复制(十一)

复制 复制的问题和解决方案 数据损坏或丢失的错误 当一个二进制日志损坏时&#xff0c;能恢复多少数据取决于损坏的类型&#xff0c;有几种比较常见的类型: 1.数据改变&#xff0c;但事件仍是有效的SQL 不幸的是&#xff0c;MySQL甚至无法察觉这种损坏。因此最好还是经常检查…

【小程序】聊天功能

文章目录 聊天功能实现功能实现思路后端前端效果展示 聊天功能 实现功能 要实现一个聊天机器人&#xff0c;它能够解答用户疑问&#xff0c;并且能够识别到用户聊天的主题&#xff0c;涉及到饮食方面时&#xff0c;会自动决定是否要去数据库中读取用户的相关喜好信息&#xf…

录音怎么转文字更高效?5款软件带你轻松拿捏文本转换~

临近大学生们最难熬的期末考试周&#xff0c;如何在短时间内复习完所有必考的科目也就成为大家迫在眉睫的首要任务。 想要在复习的过程中&#xff0c;更加高效地捕捉和整理关键信息、提高学习效率&#xff0c;那么录音转文字免费应用无疑是你的一大好帮手&#xff01; 倘若你…

YOLOv5改进 | SPPF | 具有多尺度带孔卷积层的ASPP【CVPR2018】

&#x1f4a1;&#x1f4a1;&#x1f4a1;本专栏所有程序均经过测试&#xff0c;可成功执行&#x1f4a1;&#x1f4a1;&#x1f4a1; 专栏目录&#xff1a; 《YOLOv5入门 改进涨点》专栏介绍 & 专栏目录 |目前已有40篇内容&#xff0c;内含各种Head检测头、损失函数Loss、…

设计模式5-策略模式(Strategy)

设计模式5-策略模式 简介目的定义结构策略模式的结构要点 举例说明1. 策略接口2. 具体策略类3. 上下文类4. 客户端代码 策略模式的反例没有使用策略模式的代码 对比分析 简介 策略模式也是属于组件协作模式一种。现代软件专业分工之后的第一个结果是框架语音应用程序的划分。组…

WEB界面上使用ChatGPT

&#xff08;作者&#xff1a;陈玓玏&#xff09; 开源项目&#xff0c;欢迎star哦&#xff0c;https://github.com/tencentmusic/cube-studio 随着大模型不断发展&#xff0c;现在无论写代码&#xff0c;做设计&#xff0c;甚至老师备课、评卷都可以通过AI大模型来实现了&…

【数据结构与算法】动态查找表(二叉排序树,二叉平衡树)详解

二叉排序树的数据结构。 struct TreeNode {ElemType data;TreeNode *left, *right; }; using BiTree TreeNode *;结构体包含三个成员&#xff1a; data 是一个 ElemType 类型的变量&#xff0c;用于存储二叉搜索树节点的数据。left 是一个指向 TreeNode 类型的指针&#xff…

【Pandas驯化-16】一文搞懂Pandas中高性能query、eval函数技巧

【Pandas驯化-16】一文搞懂Pandas中高性能query、eval函数技巧 本次修炼方法请往下查看 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我工作、学习、实践 IT领域、真诚分享 踩坑集合&#xff0c;智慧小天地&#xff01; &#x1f387; 相关内容文档获取 微信公众…