Debezium日常分享系列之:流式传输 Cassandra
- 一、批量 ETL 选项
- 二、流媒体选项
- 三、Kafka 作为事件源
- 四、解析提交日志
- 五、提交日志深入探讨
- 1.延迟处理
- 2.空间管理
- 3.重复的事件
- 4.无序事件
- 5.带外架构更改
- 6.行数据不完整
- 六、最低限度可行的基础设施
- 1.无状态流处理
- 2.有状态流处理
- 3.读取时处理
- 七、Cassandra数据库对Gossip协议的应用
选择Cassandra 这个 NoSQL 数据库,主要是因为它的高可用性、水平可扩展性以及处理高写入吞吐量的能力。
一、批量 ETL 选项
将 Cassandra 引入我们的基础设施后,我们的下一个挑战是找到一种方法将 Cassandra 中的数据公开给我们的数据仓库 BigQuery,以进行分析和报告。我们快速构建了一个 Airflow hook 和操作符来执行满载。这显然无法扩展,因为它会在每次加载时重写整个数据库。为了扩展管道,我们评估了两种增量加载方法,但两者都有其缺点:
- 范围查询。这是一种常见的 ETL 方法,其中通过范围查询定期(例如每小时或每天)提取数据。任何熟悉 Cassandra 数据建模的人都会很快意识到这种方法是多么不切实际。 Cassandra 表需要建模以优化生产中使用的查询模式。在大多数情况下,添加此查询模式进行分析意味着使用不同的集群键克隆表。 RDBMS 人员可能会建议二级索引来支持这种查询模式,但 Cassandra 中的二级索引是本地的,因此这种方法本身会带来性能和扩展问题。
- 处理未合并的 SSTable。 SSTables 是 Cassandra 的不可变存储文件。 Cassandra 提供了 sstabledump CLI 命令,可将 SSTable 内容转换为人类可读的 JSON。然而,Cassandra 是建立在日志结构合并 (LSM) 树概念之上的,这意味着 SSTable 会定期合并到新的压缩文件中。根据压缩策略,在带外检测未合并的 SSTable 文件可能具有挑战性(我们后来了解了 Cassandra 中的增量备份功能,该功能仅备份未压缩的 SSTable;因此这种方法也能发挥作用。)
考虑到这些挑战,在为 MySQL 构建和运营流数据管道后,我们开始探索 Cassandra 的流选项。
二、流媒体选项
双写
这个想法是每次在 Cassandra 上执行写入操作时都会发布到 Kafka。这种双重写入可以通过内置触发器或客户端周围的自定义包装器来执行。这种方法存在性能问题。首先,由于我们现在需要写入两个系统而不是一个系统,因此写入延迟增加了。更重要的是,当对一个系统的写入由于超时而失败时,写入是否成功是不确定的。为了保证两个系统上的数据一致性,我们必须实现分布式事务,但多次往返共识会增加延迟并进一步降低吞吐量。这违背了高写入吞吐量数据库的目的。
三、Kafka 作为事件源
这个想法是写给 Kafka,而不是直接写给 Cassandra;然后通过消费来自 Kafka 的事件将写入应用到 Cassandra。事件溯源是当今非常流行的方法。但是,如果您已有直接写入 Cassandra 的现有服务,则需要更改应用程序代码并进行重要的迁移。这种方法还违反了读你所写的一致性:如果一个进程执行写入,那么执行后续读取的同一进程必须观察写入的效果。由于写入是通过 Kafka 路由的,因此发出写入和应用写入之间会存在延迟;在此期间,读取 Cassandra 将导致数据过时。这可能会导致不可预见的生产问题。
四、解析提交日志
Cassandra 在 3.0 中引入了变更数据捕获 (CDC) 功能来公开其提交日志。提交日志是 Cassandra 中的预写日志,旨在在机器崩溃时提供持久性。它们通常在冲洗时被丢弃。启用 CDC 后,它们会在刷新时传输到本地 CDC 目录,然后可由 Cassandra 节点上的其他进程读取。这允许我们使用与 MySQL 流管道中相同的 CDC 机制。它将生产运营与分析分离,因此不需要应用工程师进行额外的工作。
最终,在考虑了吞吐量、一致性和关注点分离之后,最后一个选项——解析提交日志——成为了最有力的竞争者。
五、提交日志深入探讨
除了公开提交日志之外,Cassandra 还提供 CommitLogReader 和 CommitLogReadHandler 类来帮助进行日志的反序列化。看来艰苦的工作已经完成,剩下的就是应用转换——将反序列化表示转换为 Avro 记录并将其发布到 Kafka。然而,当我们进一步深入研究 CDC 功能和 Cassandra 本身的实现时,我们意识到存在许多新的挑战。
1.延迟处理
提交日志仅在 CDC 目录已满时到达,在这种情况下,它将被刷新/丢弃。这意味着记录事件和捕获事件之间存在延迟。如果执行很少或不执行写入,则事件捕获的延迟可能会任意长。
2.空间管理
在MySQL中,您可以设置binlog保留,以便在配置的保留期限后自动删除日志。然而在 Cassandra 中没有这样的选项。一旦提交日志传输到CDC目录,处理后必须进行消费以清理提交日志。如果 CDC 目录的可用磁盘空间超过给定阈值,则对数据库的进一步写入将被拒绝。
3.重复的事件
单个 Cassandra 节点上的提交日志并不反映对集群的所有写入;它们仅反映对节点的写入。这使得有必要在所有节点上处理提交日志。但如果复制因子为 N,则每个事件的 N 个副本会发送到下游。
4.无序事件
对单个 Cassandra 节点的写入在到达时会被连续记录。但是,这些事件从发出时起可能会无序到达。这些事件的下游消费者必须了解事件时间并实现与 Cassandra 的读取路径类似的最后写入获胜逻辑,以获得正确的结果。
5.带外架构更改
表的架构更改通过gossip protocol进行通信,并且不会记录在提交日志中。因此,只能尽力检测架构中的更改。
6.行数据不完整
Cassandra 不会执行先读后写的操作,因此更改事件不会捕获每个列的状态,它们仅捕获已修改列的状态。这使得更改事件不如整行可用时有用。
一旦我们深入了解 Cassandra 提交日志,我们就会根据给定的约束重新评估我们的要求,以设计最小可行的基础设施。
六、最低限度可行的基础设施
借鉴最小可行产品理念,我们希望设计一个具有最少功能和要求的数据管道,以满足我们的直接客户的需求。对于 Cassandra CDC,这意味着:
引入 CDC 不应对生产数据库的健康状况和性能产生负面影响;运营放缓和系统停机比分析管道延迟的成本要高得多
查询数据仓库中的 Cassandra 表应该与查询生产数据库的结果相匹配(排除延迟);具有重复和/或不完整的行会增加每个最终用户的后处理工作量有了这些标准,我们开始集思广益寻找解决方案,并最终提出了三种方法:
1.无状态流处理
- 该解决方案的灵感来自 Datastax 的高级复制博客文章。
- 这个想法是在每个 Cassandra 节点上部署一个代理来处理本地提交日志。每个代理都被视为基于分区键的写入子集的“主要”,这样每个事件都只有一个主要代理。
- 然后在CDC期间,为了避免重复事件,每个代理仅将事件发送到Kafka(如果它是该事件的主代理)。
- 为了处理最终的一致性,每个代理都会在事件到达时将其分类到每个表的时间切片窗口中(但不会立即发布它们);
- 当窗口到期时,该窗口中的事件将被散列,并将散列与其他节点进行比较。如果它们不匹配,则从不一致的节点获取数据,以便最后一次写入获胜可以解析正确的值。
- 最后,该窗口中更正的事件将被发送到 Kafka。
- 任何超出时间切片窗口的无序事件都必须记录到无序文件中并单独处理。
- 由于重复数据删除和排序是在内存中完成的,因此对代理故障转移导致数据丢失、影响生产数据库的 OOM 问题以及此实现的整体复杂性的担忧阻止了我们进一步探索它。
2.有状态流处理
- 该解决方案功能最丰富。
- 这个想法是,每个 Cassandra 节点上的代理将处理提交日志并将事件发布到 Kafka,而无需重复数据删除和排序。
- 然后,流处理引擎将消耗这些原始事件并完成繁重的工作(例如使用缓存过滤掉重复事件,使用事件时间窗口管理事件顺序,以及通过在状态存储上执行先读后写来捕获未修改列的状态) ),然后将这些派生事件发布到单独的 Kafka 主题。
- 最后,KCBQ 将用于消费该主题中的事件并将其上传到 BigQuery。这种方法很有吸引力,因为它一般性地解决了问题——任何人都可以订阅后一个 Kafka 主题,而无需自己处理重复数据删除和排序。
- 然而,这种方法会带来大量的运营开销;我们必须维护一个流处理引擎、一个数据库和一个缓存。
3.读取时处理
- 与之前的方法类似,其想法是处理每个 Cassandra 节点上的提交日志并将事件发送到 Kafka,无需重复数据删除和排序。
- 与之前的方法不同,流处理部分被完全消除。相反,原始事件将通过 KCBQ 直接上传到 BigQuery。视图是在原始表之上创建的,用于处理重复数据删除、排序和合并列以形成完整的行。由于 BigQuery 视图是虚拟表,因此每次查询视图时都会延迟处理。为了防止视图查询变得过于昂贵,视图将定期具体化。这种方法利用 BigQuery 的大规模并行查询引擎,消除了操作复杂性和代码复杂性。然而,缺点是非 KCBQ 下游消费者必须自己完成所有工作。
- 鉴于我们流式 Cassandra 的主要目的是数据仓库,我们最终决定实现读时处理。它为我们现有的用例提供了基本功能,并提供了将来扩展到上述其他两个更通用的解决方案的灵活性。
七、Cassandra数据库对Gossip协议的应用
Cassandra数据库使用Gossip协议主要有以下几个用处:
节点发现和自动加入:Cassandra集群中的节点使用Gossip协议进行相互通信,通过定期交换消息来发现新加入的节点并自动将其加入到集群中。这使得节点的动态加入和离开成为可能,而无需依赖于集中式的节点发现服务。
全局状态信息的传播:Cassandra使用Gossip协议来传播集群中节点的状态信息,如节点的健康状态、数据分布信息等。通过收集和传播这些信息,集群中的节点可以更好地了解整个系统的状态,并做出相应的调整和决策。
数据一致性的维护:Cassandra使用Gossip协议来传播和更新副本之间的数据变更信息。节点会将数据变更信息传播给其他节点,以保持副本之间的数据一致性。这种基于Gossip协议的数据传播方式可以在分布式环境下有效地维护数据的一致性。
故障检测和恢复:通过Gossip协议,节点可以检测到其他节点的故障,并将故障信息传播给其他节点。这使得集群可以快速地检测到故障节点并采取相应的恢复措施。
总的来说,Cassandra使用Gossip协议来实现分布式环境下的节点发现、全局状态信息传播、数据一致性维护和故障检测恢复等功能,确保集群的可靠性、容错性和一致性。