分布式事务的21种武器 - 6

news2025/1/19 11:13:58

在分布式系统中,事务的处理分布在不同组件、服务中,因此分布式事务的ACID保障面临着一些特殊难点。本系列文章介绍了21种分布式事务设计模式,并分析其实现原理和优缺点,在面对具体分布式事务问题时,可以选择合适的模式进行处理。原文: Exploring Solutions for Distributed Transactions (6)

Duncan Meyer @Unsplash
Duncan Meyer @Unsplash

在不同业务场景下,可以有不同的解决方案,常见方法有:

  1. 阻塞重试(Blocking Retry)
  2. 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
  3. 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
  4. TCC补偿(TCC Compensation Matters)
  5. 本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  6. MQ事务(MQ Transaction)
  7. Saga模式(Saga Pattern)
  8. 事件驱动(Event Sourcing)
  9. 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
  10. 原子提交(Atomic Commitment)
  11. 并行提交(Parallel Commits)
  12. 事务复制(Transactional Replication)
  13. 一致性算法(Consensus Algorithms)
  14. 时间戳排序(Timestamp Ordering)
  15. 乐观并发控制(Optimistic Concurrency Control)
  16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  17. 分布式锁(Distributed Locking)
  18. 分片(Sharding)
  19. 多版本并发控制(Multi-Version Concurrency Control, MVCC)
  20. 分布式快照(Distributed Snapshots)
  21. 主从复制(Leader-Follower Replication)

本文将介绍拜占庭容错、分布式锁以及分片三种模式。

16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  • 一种容错机制,允许分布式系统在存在故障节点的情况下正常运行。
  • 节点之间相互通信从而就决策达成共识,即使存在某些恶意或错误节点,也可以工作。
  • 涉及如下步骤:
    1. 系统中每个节点向所有其他节点发送消息,其中包含建议的决策或值的信息。
    2. 每个节点检查接收到的消息的有效性。如果接收到来自故障节点或攻击者的消息,则忽略该消息。
    3. 每个节点收集接收到的所有有效消息,并创建系统视图(view)。视图是一组特定节点的有效消息,每个节点与系统中的所有其他节点共享视图。
    4. 每个节点检查从其他节点接收到的视图的有效性。如果接收到无效视图,则忽略该视图。
    5. 每个节点创建包含其系统视图的证书(certificate),并与所有其他节点共享。证书是经过签名的声明,用于证明视图的有效性。
    6. 每个节点检查从其他节点收到的证书的有效性。如果收到无效证书,应该忽略该证书。
    7. 一旦节点验证了收到的所有证书,就可以对提议的决策或值达成共识。每个节点根据收到的证书对相同的值达成共识。
    8. 一旦达成共识,每个节点执行已达成一致的决策。
from typing import List, Dict, Tuple

class Message:
    def __init__(self, sender: int, content: str):
        self.sender = sender
        self.content = content

class ByzantineNode:
    def __init__(self, id: int, network: Dict[int, List[Message]], threshold: int):
        self.id = id
        self.network = network
        self.threshold = threshold
        self.decisions = {}

    def send_message(self, receiver: int, content: str):
        message = Message(self.id, content)
        self.network[receiver].append(message)

    def receive_messages(self) -> List[Message]:
        messages = self.network[self.id]
        self.network[self.id] = []
        return messages

    def generate_vote(self, messages: List[Message]) -> bool:
        count = 0
        for message in messages:
            if message.content == 'True':
                count += 1
            elif message.content == 'False':
                count -= 1
        return count >= self.threshold

    def run_bft(self, decision_content: str):
        # Phase 1: Broadcast proposal to all nodes
        proposal = Message(self.id, decision_content)
        for node_id in self.network:
            self.send_message(node_id, str(proposal))

        # Phase 2: Receive messages and generate votes
        messages = self.receive_messages()
        vote = self.generate_vote(messages)

        # Phase 3: Broadcast decision to all nodes
        decision = Message(self.id, str(vote))
        for node_id in self.network:
            self.send_message(node_id, str(decision))

        # Phase 4: Receive decisions and count votes
        decisions = [m.content for m in self.receive_messages()]
        count_true = decisions.count('True')
        count_false = decisions.count('False')

        # Record decision if it meets threshold, else record failure
        if count_true >= self.threshold:
            self.decisions[decision_content] = True
        elif count_false >= self.threshold:
            self.decisions[decision_content] = False
        else:
            self.decisions[decision_content] = None

示例代码

  • 只要故障节点数量小于阈值,该算法就可以容忍故障。
  • 由两个类组成:
    1. Message —— 表示网络中节点之间发送的消息,包含发送者ID和消息内容。
    2. ByzantineNode —— 表示网络节点,包含ID、网络拓扑、容忍的故障数量阈值,以及存储节点决策的字典。
    • ByzantineNode类提供了几个方法:
      • send_message()方法 —— 发送消息到网络中的另一个节点
      • receive_messages()方法 —— 检索自上次调用receive_messages()以来收到的所有消息
      • generate_vote()方法 —— 将消息列表作为输入,并根据消息内容生成投票。如果"True"消息的数量大于或等于阈值,则该方法返回True,否则返回False。
      • run_bft()方法 —— 实现BFT算法的4个阶段。
        • 阶段1 —— 用send_message()方法向网络中所有节点广播提案。提案是个Message对象,其内容是作为参数传递给run_bft()方法的decision_content。
        • 阶段2 —— 用receive_messages()方法接收来自网络中其他节点的消息。用generate_vote()方法根据收到的消息生成投票,根据收到的"True"和"False"数量,投票"True"或"False"。
        • 阶段3 —— 用send_message()方法将决策广播到网络中所有节点。决策是个Message对象,其内容为上一阶段生成的投票。
        • 阶段4 —— 计算收到的票数,如果票数达到阈值,则记录决策。使用receive_messages()方法检索自上次调用receive_messages()以来收到的所有消息。检查每条消息内容,并计算"True"和"False"消息的数量。如果"True"的数量大于或等于阈值,则将该决策记录为True。如果"False"的数量大于或等于阈值,则该决策被记录为False。如果两个条件都不满足,则该决策记录为None。

优点

  • 在分布式系统中容忍一定数量的错误或失败
  • 即使存在故障节点或恶意攻击,也能确保分布式系统中所有节点达成一致的决策
  • 为用于加密货币和其他应用的区块链网络提供高水平的安全性和弹性

缺点

  • 可能需要昂贵的计算,并且需要节点之间有高质量网络通信,否则可能会增加延迟并降低系统性能
  • 因为可能需要节点之间的高级协调和通信,因此可能不适合所有类型的分布式系统
  • 不能为分布式系统中所有类型的故障或攻击提供完整解决方案

适用场景

  • 金融系统 —— 股票交易
  • 基础设施系统 —— 电网或运输系统
  • 区块链网络 —— 加密货币和其他应用

挑战

  • 设计和实现BFT系统可能很复杂,并且需要在分布式系统、密码学和安全性方面具有高水平专业知识。
  • 确保所有节点都是可信、没有恶意的。
  • 在BFT系统中实现高性能和低延迟具有挑战性。

17. 分布式锁(Distributed Locking)
  • 管理分布式系统中共享资源的访问。
  • 保证系统中多个节点不能同时访问或修改相同的资源,避免可能的不一致和数据损坏。
  • 涉及以下步骤:
    1. 节点请求对共享资源加锁。请求包含资源的唯一标识符以及所请求的锁类型(例如,读或写)。
    2. 锁管理器管理锁,接收请求,并检查资源是否已经锁定。如果资源未被锁定,锁管理器将锁授予请求节点并发送确认。
    3. 如果资源已经被锁定,锁管理器检查请求节点是否被授权访问该资源。如果该节点已获得授权,锁管理器将该请求添加到资源的挂起请求队列中,并向请求节点发送确认信息。如果该节点未被授权,则锁管理器拒绝该请求并发送拒绝消息。
    4. 在等待授予锁时,请求节点定期轮询锁管理器以获取锁状态。
    5. 当节点访问完资源后,通过向锁管理器发送释放请求来释放锁。锁管理器从资源中删除锁,并将锁授予队列中的下一个节点(如果有的话)。
    6. 如果持有锁的节点发生故障或崩溃,锁管理器将检测到该故障,并代表发生故障的节点释放锁,然后将锁授予队列中的下一个节点(如果有的话)。
    7. 如果节点请求锁,但没有收到锁管理器的响应,那么假定锁管理器已经失败,并通过选举新的领导节点来接管锁管理器的角色。
from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout
import time

class DistributedLock:
    def __init__(self, zk_address, lock_path):
        self.zk = KazooClient(hosts=zk_address)
        self.lock_path = lock_path
        self.lock = None

    def __enter__(self):
        self.zk.start()
        self.lock = self.zk.Lock(self.lock_path)
        try:
            self.lock.acquire(timeout=10)
        except LockTimeout:
            self.zk.stop()
            raise Exception("Timeout while waiting for lock")

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.lock.release()
        self.zk.stop()

if __name__ == '__main__':
    zk_address = 'localhost:2181'
    lock_path = '/my_lock'
    with DistributedLock(zk_address, lock_path):
        print("Acquired lock!")
        time.sleep(10)
    print("Released lock!")

示例代码

  • 基于 Apache ZooKeeper分布式协调服务
  • 导入所需的库 —— KazooClient和LockTimeout
    • KazooClient —— ZooKeeper的Python客户端
    • LockTimeout —— 当不能在指定超时内获得锁时引发异常。
  • 定义DistributedLock类 —— 接受2个参数: zk_addresslock_path
    • zk_address —— ZooKeeper服务器地址
    • lock_path —— 锁节点在ZooKeeper中的路径
    • init方法中初始化ZooKeeper客户端,并存储 lock_path,并且将锁变量初始化为None。
    • enter方法中启动ZooKeeper客户端,创建锁对象,并尝试获取锁。如果锁不能在10秒内获得,则引发LockTimeout异常。
    • exit方法中释放锁并停止ZooKeeper客户端。
  • 在代码主体部分,用指定的ZooKeeper地址和锁路径创建DistributedLock类实例。
  • with语句获取锁。
  • 当获得锁时,打印一条消息,表示已获得锁,然后sleep 10秒,以模拟持有锁时正在完成的一些工作。
  • sleep后,锁会被with语句自动释放,打印一条消息,表明锁已被释放。

优点

  • 在分布式系统中,通过确保一次只有一个进程可以修改共享资源来维护数据一致性
  • 防止多个进程同时访问共享资源,以确保该资源在需要时始终可用
  • 允许多个进程跨多个节点访问共享资源

缺点

  • 需要在分布式系统中的多个节点之间进行协调
  • 在分布式系统中引入延迟并降低性能
  • 如果分布式锁定机制失败,可能会导致整个分布式系统失败

适用场景

  • 分布式数据库
  • 电子商务系统 —— 管理对购物车的访问或防止多个用户同时购买相同的商品

挑战

  • 需要公平分配资源,以确保所有进程都能平等访问共享资源
  • 不正确实现的分布式锁定可能导致死锁,多个进程等待彼此释放锁

18. 分片(Sharding)
  • 用于在多个服务器之间对数据进行水平分区,称为分片。
  • 每个分片包含数据的一个子集,所有分片组合起来就构成了完整的数据集。
  • 用于提高分布式数据库的可伸缩性、性能和可用性。
  • 涉及如下步骤:
    1. 根据分片键将数据划分为更小的子集。分片键的选择使得数据可以均匀分布在各个分片上,并且可以将查询路由到正确的分片。
    2. 数据分区后,分片分布在多个服务器上。每个分片一个特定服务器,多个分片可以分配给同一个服务器。
    3. 当客户端向数据库发送查询时,该查询首先路由到协调器节点。协调器节点负责确定哪个分片包含执行查询所需的数据。
    4. 一旦协调节点确定了正确的分片,查询将被发送到包含该分片的服务器。服务器执行查询并将结果返回给协调器节点。
    5. 如果需要来自多个分片的数据完成查询,协调节点将每个分片的结果聚合并将最终结果返回给客户端。
import mysql.connector

# Connect to MySQL database
mydb = mysql.connector.connect(
  host="localhost",
  user="yourusername",
  password="yourpassword",
  database="mydatabase"
)

# Define sharding rules
shard_key = "user_id"
num_shards = 4

# Create sharded tables
for i in range(num_shards):
    cursor = mydb.cursor()
    cursor.execute(f"CREATE TABLE users_{i} (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255), email VARCHAR(255))")

# Insert data into sharded tables
users = [
    {"id"1"name""John""email""john@example.com"},
    {"id"2"name""Jane""email""jane@example.com"},
    {"id"3"name""Bob""email""bob@example.com"},
    # ...
]

for user in users:
    shard_id = user[shard_key] % num_shards
    cursor = mydb.cursor()
    cursor.execute(f"INSERT INTO users_{shard_id} (id, name, email) VALUES (%s, %s, %s)", (user["id"], user["name"], user["email"]))

# Query data from sharded tables
cursor = mydb.cursor()
cursor.execute("SELECT * FROM users_0 UNION SELECT * FROM users_1 UNION SELECT * FROM users_2 UNION SELECT * FROM users_3")
users = cursor.fetchall()

print(users)

示例代码

  • 使用MySQL
  • 可能因用例和所使用的数据库系统而异。
  • 连接MySQL数据库,定义分片规则。在本例中,我们用 user_id作为分片键,并创建4个分片表来存储数据。
  • 通过计算基于用户ID的分片ID,并用该ID将数据插入到相应的分片表中,从而将数据插入到分片表。
  • 使用UNION语句查询所有分片表中的数据并打印结果。

优点

  • 允许数据库水平扩容,因此随着数据增长,可以向系统添加额外的服务器来处理增加的负载。
  • 因为每个服务器只需要搜索较小的数据集,因此可以更快执行查询。
  • 如果一台服务器出现故障,只会影响部分数据,系统的其余部分可以继续正常运行。

缺点

  • 需要确保对数据进行了正确的分区,并且分片均匀分布在各个服务器上。
  • 维护所有分片之间的数据一致性可能具有挑战
  • 实现分片需要额外的硬件、软件和维护成本

适用场景

  • 适用于高读写负载的大型数据库,可以横向扩展以处理增加的流量。当存在地理或法规限制,需要将数据存储在不同位置时,会非常有用。
  • 生成大量数据并需要快速存储和检索的社交媒体平台
  • 处理大量交易,需要快速存储和检索数据的电商平台
  • 需要快速安全的存储和检索大量患者数据的医疗保健应用

参考文献

Byzantine Fault Tolerance (BFT) | River Glossary

What Is Byzantine Fault Tolerance?

Byzantine Fault Tolerance (BFT) Explained

Distributed Locks with Redis

How to do distributed locking

Distributed Locking

Sharding

What is Database Sharding?

What is Sharding?

Understanding Database Sharding


你好,我是俞凡,在Motorola做过研发,现在在Mavenir做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。微信公众号:DeepNoMind

- END -

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/575985.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux——进程轮换

目录 一.进程切换 1.定义: 2.硬件上下文: 总结: 一.进程切换 1.定义: 进程切换(process switch),作为抢占式多任务的一个重要功能,其实质就是OS内核挂起正在运行的进程A,然后将先前被挂起的另一个进程B恢复运行。 2.硬…

天书奇谈3D服务端搭建架设教程Centos

天书奇谈3D服务端搭建架设教程Centos 大家好,我是艾西,今天给大家分享一款回合制MMORPG手游的搭建教程。也算是G 内回合制手游的第一梯队吧,回合制手游总会有那么一帮热爱的玩家我们话不多说直接进入正题开始操作: 架设准备&…

00后们劝你们不要去外包,3年外包,废了....

先说一下自己的情况,专科生,19年通过校招进入杭州某个外包软件公司,干了接近3年的功能测试,今年年初,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落! 而我已经在一个企业干了3年的功…

UNIX网络编程卷一 学习笔记 第十六章 非阻塞式IO

套接字的默认状态是阻塞的,当发出一个不能立即完成的套接字调用时,进程将被投入睡眠,等待相应操作完成。可能阻塞的套接字调用有以下四类: 1.输入操作:包括read、readv、recv、recvfrom、recvmsg函数。如果进程对一个阻…

公司study three

ctrlwind:新建桌面 ctrlwin 箭头 切换桌面 WIN CTRL F4 删除桌面 mybatis-plusstreamlambda lambda遍历 存值 if (bpmBoEntityList ! null && !bpmBoEntityList.isEmpty()) {bpmBoEntityList.forEach(x -> {BpmBoEntityDTO dto new BpmBoEntityDT…

一.《HIT2台服韩服》背包遍历和物品品质潜规则

首先找背包遍历 1.通过物品数量我们入手找 2.首先CE搜索当前药品数量 3.然后消耗一瓶血药 4.CE继续搜索10,你会发现还剩下1423个结果 5.经过我们几次的筛选,最终找到几个结果 6.拿到地址后,我们用XDBG附加游戏后查看这个地址 7.随后我们在这个地址上下写入断点,通过消耗血药,就…

English Learning - L3 作业打卡 Lesson3 Day19 2023.5.23 周二

English Learning - L3 作业打卡 Lesson3 Day19 2023.5.23 周二 引言🍉句1: She also told us “you have to break some eggs to make an omelet”.成分划分弱读连读爆破语调 🍉句2: This means you have to do what is necessary to move forward.成分…

English Learning - L3 作业打卡 Lesson3 Day20 2023.5.24 周三

English Learning - L3 作业打卡 Lesson3 Day20 2023.5.24 周三 引言🍉句1: She would always give us nutritious food.成分划分连读语调 🍉句2: She liked serving us meat and potatoes for dinner.成分划分弱读连读爆破语调 # 🍉句3: Mea…

ACL 2019 - AMR Parsing as Sequence-to-Graph Transduction

AMR Parsing as Sequence-to-Graph Transduction 论文:https://arxiv.org/pdf/1905.08704.pdf 代码:https://github.com/sheng-z/stog 期刊/会议:ACL 2019 摘要 我们提出了一个基于注意力的模型,将AMR解析视为序列到图的转导。…

Doris---索引

前缀索引 doris中,对于前缀索引有如下约束: 他的索引键最大长度是36个字节 当他遇到了varchar数据类型的时候,即使没有超过36个字节,也会自动截断 示例1:以下表中我们定义了: user_id,age,message作为表的key ; C…

【C++】“最强查找“哈希表的底层实现

哈希表的查找的时间复杂度是O(1)~ 文章目录 前言一、哈希冲突和哈希函数二、哈希表底层实现 1.开放地址法2.链地址法总结 前言 哈希概念: 顺序结构以及平衡树 中,元素关键码与其存储位置之间没有对应的关系,因此在 查…

100道护网面试题大全(附答案)

最近日入1000的护网行动已经开始摇人了, 不少大学生在后台私信我如何参加护网、面试问些什么、有没有护网内推 作为一个负责任的博主,收到大家反馈的我,连夜发动钞能力,收集整理了一套护网蓝初面试文档 1. 什么是DDoS攻击&#x…

深度学习编译器

1.为什么需要深度学习编译器 深度学习编译器主要为解决不同框架下训练的模型部署到指定的某些设备上时所遇到的一系列复杂的问题,即将各种深度学习训练框架的模型部署到各种硬件所面临的问题; 首先深度学习领域,从训练框架看,当前…

安科瑞消防设备电源监控系统选型及介绍

安科瑞 徐浩竣 江苏安科瑞电器制造有限公司 zx acrelxhj 摘要:自 2014 年《火灾自动报警系统设计规范》实施以来,由于针对消防设备电源监控系统的规定较为模糊,尚未确立详细的规定,导致当前消防设备电源监控系统的设计和建立呈…

【C++】unordered_set 和 unordered_map 使用 | 封装

文章目录 1. 使用1. unordered_set的使用2. unordered_map的使用 2. 封装修改结构定义针对insert参数 data的两种情况复用 哈希桶的insertKeyOfT模板参数的作用 迭代器operator()beginendunordered_set对于 begin和end的复用unordered_map对于 begin和end的复用unordered_map中…

计组期末复习---个人版

(一)计算机系统概论 1.1计算机分类与发展历史 分类:电子模拟计算机和电子数字计算机 电子模拟计算机:数值由连续量来表示,运算过程是连续的 电子数字计算机:按位运算,并且不是连续地跳动运算…

【JavaSE】Java基础语法(二十四):时间日期类

文章目录 1. Date类2. Date类常用方法3. SimpleDateFormat类(应用) 1. Date类 计算机中时间原点 1970年1月1日 00:00:00 时间换算单位 1秒 1000毫秒 Date类概述 Date 代表了一个特定的时间,精确到毫秒 Date类构造方法 示例代码 publi…

数据结构-顺序表

数据结构-顺序表 线性表顺序表的概念和结构静态顺序表和动态顺序表 接口的实现顺序表的初始化顺序表的打印顺序表的销毁顺序表的增容顺序表的尾插顺序表的尾删顺序表的头插顺序表的头删顺序表的任意位置插入顺序表的任意位置删除顺序表中元素的查找 完整代码 线性表 线性表是n…

数据包伪造替换、会话劫持、https劫持之探索和测试

(一)数据包替换攻击 该攻击过程如下:伪造服务器响应客户端的数据包。监听客户端的数据包,用预先伪造的数据包,伪装成服务器返回的数据发送给客户端。 因为攻击者跟目标在同一个局域网,所以攻击者发送的数…

无监督学习——k均值

文章目录 聚类k均值代码实现1. 引入依赖2. 数据加载3. 算法实现4. 测试 无监督学习重要的应用有两类:聚类、降维。 聚类: k均值 基于密度的聚类 最大期望聚类 降维: 潜语义分析(LSA) 主成分分析(PCA&a…