简单说说redis分布式锁

news2024/10/6 18:21:52

什么是分布式锁

分布式锁(多服务共享锁)在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问/操作。

为什么需要分布式锁

在单体应用服务里,不同的客户端操作同一个资源,我们可以通过操作系统提供的互斥(锁/信号量等等)来提供互斥的能力,保证操作资源的只有一个客户端。

在分布式的情况里,就需要第三方组件来保证对统一资源的操作的互斥。

(下单中,两个人下单,一个人下单请求走订单服务A机器,另一个人下单请求走订单服务B,这样用单体的思维处理就可能不是很合适,需要借用第三方组件配合来实现分布式锁)

分布式锁可以用redis, zookeeper , etcd等等来实现,下面我们简单说说....

redis分布式锁

简单例子

set key value ex/px nx 或setnx

以setnx 为例,可以使用 setnx key value 来进行 "加锁" ( setnx 主要 是nx加了语义:如果存在就不操作,不存在就添加),多个客户端确保只有一个加锁成功去操作统一资源

127.0.0.1:6379[1]> setnx lockObj 1   // 加锁
(integer) 1
127.0.0.1:6379[1]> setnx lockObj 1  // 存在了就不能再加锁
(integer) 0
127.0.0.1:6379[1]> get lockObj
"1"
127.0.0.1:6379[1]> del lockObj // 释放锁
(integer) 1
127.0.0.1:6379[1]> get lockObj
(nil)
127.0.0.1:6379[1]> setnx lockObj 1
(integer) 1

上面就是简单的加锁的例子,仔细思考下分布式锁使用的使用我们需要考虑哪些问题?

存在的问题

简单的总结了下,我们在使用redis分布式锁的时候需要考虑如下情况:

1- 死锁问题

2- 续锁生命周期

3- 操作原子性

4- 锁的归属权

5- redis 集群,锁的状态一致性

死锁问题

如果我们业务代码出现bug或服务器出现问题,没有及时释放锁,那么其他的客户端就永远获取不到这个加锁的资格。

这个时候我们就可以加上对应的处理逻辑:

golang 加上defer 加锁锁逻辑 , python try-Except-finally 用finally 释放锁。并且在加锁的时候加上过期时间(根据业务进行合适的加)

续锁生命周期

上面我们可以解决锁的释放问题,但是我们的业务处理时间不一定百分百能知道处理的时间,这个时候如果锁过期了但是资源操作没有做完,那么就会出现问题。

在java的 Redisson 有个watch 续命机制, golang 的话可以 借鉴 Rllock ,开启一个守护进程监听,定时续命(一定要提前续命,不要等到到时间再续)

操作原子性

在我们进行加锁 加过期时间的时候,这两个操作不能分两步操作。因为如果setnx加锁成功了,这时候失败了,那么这个锁就永远被占用了。

根据这个问题我们可以使用lua脚本或者使用第三方模块是可以同时进行这两个步骤的

`if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then return redis.call('PEXPIRE',KEYS[1],ARGV[2]) else return 0 end`

锁的归属权问题

现在有几个场景:

1- 客户端1 拿到锁处理业务, 没处理完已经过期了;这时候客户端2 拿到锁在处理,结果客户端1 处理完就释放锁了

2- 一个业务不同画像的人处理的业务不同,这时候我们就需要根据不同画像人进行分配 “锁”

我们在实际开发的时候有时候需要了解业务场景, 有时候需要给锁加一个所属权的令牌。可以在setnx key时,key设定特殊化的数值

redis 集群,锁的状态一致性

在redis采用集群(主从),如果master加锁失败了,这时候服务宕机了,slave还同步这个 key ,那么这个时候就会有客户端加锁成功

redis作者对于这个问题提出了解答:REDLOCK

zookeeper 实现分布式锁

简单例子

[zk: localhost:2181(CONNECTED) 62] create  /lock 
Created /lock
[zk: localhost:2181(CONNECTED) 63] create -s -e /lock/req  // 创建临时节点
Created /lock/req0000000000 
[zk: localhost:2181(CONNECTED) 64] create -s -e /lock/req
Created /lock/req0000000001
[zk: localhost:2181(CONNECTED) 65] create -s -e /lock/req
Created /lock/req0000000002
[zk: localhost:2181(CONNECTED) 67] ls /lock // 节点下的临时节点
[req0000000000, req0000000001, req0000000002] 
[zk: localhost:2181(CONNECTED) 68] delete /lock/req0000000000 // 释放第一锁

golang的例子

package main

import (
    "fmt"
    "github.com/samuel/go-zookeeper/zk"
    "sort"
    "time"
)

func sortChildren(children []string) {
    sort.Slice(children, func(i, j int) bool {
        return children[i] < children[j]
    })
}
func main() {
    go func() {
        conn, _, err := zk.Connect([]string{"xx.xx.xx.xx:2181"}, time.Second*5)
        if err != nil {
            fmt.Println("Connect:", err.Error())
            return
        }
        defer conn.Close()

        lockPath := "/locksObj"
        lockName := "lock"

        // 创建锁的根节点
        _, err = conn.Create(lockPath, []byte{}, int32(0), zk.WorldACL(zk.PermAll))
        if err != nil && err != zk.ErrNodeExists {
            fmt.Println("Create:", err.Error())
            return
        }

        // 获取锁
        lockNodePath, err := conn.CreateProtectedEphemeralSequential(lockPath+"/"+lockName+"-", []byte{}, zk.WorldACL(zk.PermAll))
        if err != nil {
            fmt.Println("CreateProtectedEphemeralSequential:", err.Error())
            return
        }
        // dowork

        for {
            children, _, err := conn.Children(lockPath)
            if err != nil {
                fmt.Println("Children:", err.Error())
                return
            }

            // 对子节点按照序列号进行排序
            sortChildren(children)

            // 检查自己创建的节点是否是第一个节点
            if lockNodePath == lockPath+"/"+children[0] {
                // 获取到了锁
                fmt.Println("Acquired lock")
                break
            }

            // 监听前一个节点的删除事件
            exists, _, watch, err := conn.ExistsW(lockPath + "/" + children[0])
            if err != nil {
                fmt.Println("ExistsW: ", err.Error())
                break
            }

            if !exists {
                // 前一个节点已删除,再次检查自己创建的节点是否是第一个节点
                children, _, err = conn.Children(lockPath)
                if err != nil {
                    fmt.Println("Children: ", err.Error())
                    break
                }

                sortChildren(children)

                if lockNodePath == lockPath+"/"+children[0] {
                    // 获取到了锁
                    fmt.Println("Acquired lock")
                    break
                }
            }

            // 等待前一个节点的删除事件
            <-watch
        }

        // 执行需要保护的代码
        fmt.Println("====start-1====")

        time.Sleep(3 * time.Second)
        fmt.Println(11111111)

        // 释放锁,删除自己创建的节点
        err = conn.Delete(lockNodePath, -1)
        if err != nil {
            fmt.Println("Delete: ", err.Error())
            return
        }

        fmt.Println("Released lock")
    }()
    go func() {
        conn, _, err := zk.Connect([]string{"xx.xx.xx.xx:2181"}, time.Second*5)
        if err != nil {
            fmt.Println("Connect:", err.Error())
            return
        }
        defer conn.Close()

        lockPath := "/locksObj"
        lockName := "lock"

        // 创建锁的根节点
        _, err = conn.Create(lockPath, []byte{}, int32(0), zk.WorldACL(zk.PermAll))
        if err != nil && err != zk.ErrNodeExists {
            fmt.Println("Create:", err.Error())
            return
        }

        // 获取锁
        lockNodePath, err := conn.CreateProtectedEphemeralSequential(lockPath+"/"+lockName+"-", []byte{}, zk.WorldACL(zk.PermAll))
        if err != nil {
            fmt.Println("CreateProtectedEphemeralSequential:", err.Error())
            return
        }

        for {
            children, _, err := conn.Children(lockPath)
            if err != nil {
                fmt.Println("Children:", err.Error())
                return
            }

            // 对子节点按照序列号进行排序
            sortChildren(children)

            // 检查自己创建的节点是否是第一个节点
            if lockNodePath == lockPath+"/"+children[0] {
                // 获取到了锁
                fmt.Println("Acquired lock")
                break
            }

            // 监听前一个节点的删除事件
            exists, _, watch, err := conn.ExistsW(lockPath + "/" + children[0])
            if err != nil {
                fmt.Println("ExistsW: ", err.Error())
                break
            }

            if !exists {
                // 前一个节点已删除,再次检查自己创建的节点是否是第一个节点
                children, _, err = conn.Children(lockPath)
                if err != nil {
                    fmt.Println("Children: ", err.Error())
                    break
                }

                sortChildren(children)

                if lockNodePath == lockPath+"/"+children[0] {
                    // 获取到了锁
                    fmt.Println("Acquired lock")
                    break
                }
            }

            // 等待前一个节点的删除事件
            <-watch
        }

        // 执行需要保护的代码
        fmt.Println("====start-2====")

        time.Sleep(5 * time.Second)
        fmt.Println(22222222)
        // 释放锁,删除自己创建的节点
        err = conn.Delete(lockNodePath, -1)
        if err != nil {
            fmt.Println("Delete: ", err.Error())
            return
        }

        fmt.Println("Released lock")
    }()
    time.Sleep(10 * time.Second)
}

zookeeper 怎么实现 分布式锁的

zookeeper 会建立一个长链接,监听锁对象节点的状态和事件

ETCD实现 分布式锁

简单实现

package main

import (
	"context"
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"time"
)


func main() {
	go func() {
		config := clientv3.Config{
			Endpoints:   []string{"xx.xx.xx.xx:2379"},
			DialTimeout: 5 * time.Second,
		}

		// 获取客户端连接
		client, err := clientv3.New(config)
		if err != nil {
			fmt.Println(err)
			return
		}

		//  上锁
		// 用于申请租约
		lease := clientv3.NewLease(client)

		// 申请一个10s的租约
		leaseGrantResp, err := lease.Grant(context.TODO(), 10) //10s
		if err != nil {
			fmt.Println(err)
			return
		}

		// 拿到租约的id
		leaseID := leaseGrantResp.ID

		ctx, cancelFunc := context.WithCancel(context.TODO())

		// 停止
		defer cancelFunc()
		// 确保函数退出后,租约会失效
		defer lease.Revoke(context.TODO(), leaseID)

		// 自动续租
		keepRespChan, err := lease.KeepAlive(ctx, leaseID)
		if err != nil {
			fmt.Println(err)
			return
		}

		// 处理续租应答的协程
		go func() {
			select {
			case keepResp := <-keepRespChan:
				if keepRespChan == nil {
					fmt.Println("lease has expired")
					break
				} else {
					// 每秒会续租一次
					fmt.Println("收到自动续租应答", keepResp.ID)
				}
			}
		}()

		// if key 不存在,then设置它,else抢锁失败
		kv := clientv3.NewKV(client)
		// 创建事务
		txn := kv.Txn(context.TODO())
		// 如果key不存在
		txn.If(clientv3.Compare(clientv3.CreateRevision("/lockObj/lock/job"), "=", 0)).
			Then(clientv3.OpPut("/lockObj/lock/job", "", clientv3.WithLease(leaseID))).
			Else(clientv3.OpGet("/lockObj/lock/job")) //如果key存在

		// 提交事务
		txnResp, err := txn.Commit()
		if err != nil {
			fmt.Println(err)
			return
		}

		// 判断是否抢到了锁
		if !txnResp.Succeeded {
			fmt.Println("锁被占用了:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
			return
		}

		// 处理业务

		fmt.Println("======work======")
		time.Sleep(5 * time.Second)
		fmt.Println("======END======")


	}()
	time.Sleep(20 * time.Second)
}

实现原理

etcd 支持以下功能,正是依赖这些功能来实现分布式锁的:

  • Lease机制:即租约机制(TTL,Time To Live),etcd可以为存储的kv对设置租约,当租约到期,kv将失效删除;同时也支持续约,keepalive
  • Revision机制:每个key带有一个Revision属性值,etcd每进行一次事务对应的全局Revision值都会+1,因此每个key对应的Revision属性值都是全局唯一的。通过比较Revision的大小就可以知道进行写操作的顺序
  • 在实现分布式锁时,多个程序同时抢锁,根据Revision值大小依次获得锁,避免“惊群效应”,实现公平锁
  • Prefix机制:也称为目录机制,可以根据前缀获得该目录下所有的key及其对应的属性值
  • watch机制:watch支持watch某个固定的key或者一个前缀目录,当watch的key发生变化,客户端将收到通知

执行流程

  • 步骤 1: 准备

客户端连接 Etcd,以 /lock/mylock 为前缀创建全局唯一的 key,假设第一个客户端对应的 key="/lock/mylock/UUID1",第二个为 key="/lock/mylock/UUID2";客户端分别为自己的 key 创建租约 - Lease,租约的长度根据业务耗时确定,假设为 15s;

  • 步骤 2: 创建定时任务作为租约的“心跳”

当一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃,心跳停止,key 将因租约到期而被删除,从而锁释放,避免死锁。

  • 步骤 3: 客户端将自己全局唯一的 key 写入 Etcd

进行 put 操作,将步骤 1 中创建的 key 绑定租约写入 Etcd,根据 Etcd 的 Revision 机制,假设两个客户端 put 操作返回的 Revision 分别为 1、2,客户端需记录 Revision 用以接下来判断自己是否获得锁。

  • 步骤 4: 客户端判断是否获得锁

客户端以前缀 /lock/mylock 读取 keyValue 列表(keyValue 中带有 key 对应的 Revision),判断自己 key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;否则监听列表中前一个 Revision 比自己小的 key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。

  • 步骤 5: 执行业务

获得锁后,操作共享资源,执行业务代码。

  • 步骤 6: 释放锁

完成业务流程后,删除对应的key释放锁。

扩展

马丁·克莱普曼 对 分布式锁以及对redlock的看法

分布式锁的目的是确保在可能尝试执行同一工作的多个节点中,只有一个节点实际执行该操作(至少一次只有一个)。

主要有两个功能:

1- 效率:使用锁可以避免不必要地重复相同的工作,多执行一次也无妨,只要最终正确就行

2- 正确性:锁定可以防止并发进程互相干扰并扰乱系统状态。如果锁定失败并且两个节点同时处理同一数据,则会导致文件损坏、数据丢失、永久不一致。

马丁认为锁在分布式系统使用会碰到以下三类问题:

1- 网络延迟:您可以保证数据包始终在某个保证的最大延迟内到达

2- GC问题: 导致锁无法续期等等问题

3- 时钟飘移:依赖于时钟的就容易出现问题

马丁认为redlock 强依赖于时钟,节点之间时钟不对,会使锁不可靠:

假设系统有五个 Redis 节点(A、B、C、D 和 E)和两个客户端(1 和 2)。如果其中一个 Redis 节点上的时钟向前跳动,会发生什么情况?

  1. 客户端 1 获取节点 A、B、C 上的锁。由于网络问题,无法访问 D 和 E。
  2. 节点C上的时钟向前跳跃,导致锁过期。
  3. 客户端2获取节点C、D、E上的锁。由于网络问题,无法访问A和B。
  4. 客户 1 和 2 现在都相信他们持有锁。

如果 C 在将锁持久保存到磁盘之前崩溃并立即重新启动,则可能会发生类似的问题。因此,Redlock 文档建议延迟重新启动崩溃的节点,至少要延迟最长寿命锁的生存时间。但这种重新启动延迟再次依赖于对时间的相当准确的测量,并且如果时钟跳跃就会失败。

马丁提出了 fencing token 方案

客户端 1 获取租约并获得令牌 33,但随后它进入长时间暂停状态并且租约到期。客户端 2 获取租约,获取令牌 34(数字始终增加),然后将其写入发送到存储服务,包括 34 的令牌。稍后,客户端 1 恢复正常并将其写入发送到存储服务,包括其令牌值 33。但是,存储服务器记得它已经处理了具有更高令牌编号 (34) 的写入,因此它拒绝具有令牌 33 的请求。

总结

分布式锁不是百分百安全,我们要根据实际使用情况来考虑锁的使用(解决效率问题还是正确行问题),在使用分布式锁的时候我们需要考虑锁的续期,锁归属,集群数据一致性,操作原子性,GC,时钟飘逸,网络延迟等等的问题。在cap 理论里, redis保证了ap, zk和etcd保证cp ,所以实际使用中根据业务的情况,选择redis/zk/etcd之一来实现分布式锁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1434506.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java SPI 代码示例

Java Service Provider Interface 是JDK自带的服务提供者接口又叫服务发现机制更是一种面向接口的设计思想。即JDK本身提供接口类&#xff0c; 第三方实现其接口&#xff0c;并作为jar包或其他方式注入到其中&#xff0c; 在运行时会被JDK ServiceLoader 发现并加载&#xff0c…

锐捷VSU和M-LAG介绍

参考网站 堆叠、级联和集群的概念 什么是堆叠&#xff1f; 框式集群典型配置 RG-S6230[RG-S6501-48VS8CQ]系列交换机 RGOS 12.5(4)B1005版本 配置指南 总结 根据以上的几篇文章总结如下&#xff1a; 级联&#xff1a;简单&#xff0c;交换机相连就叫级联&#xff0c;跟搭…

SaaS 电商设计 (八) 直接就能用的一套商品池完整的设计方案(建议收藏)

目录 一.前言1.1 在哪些业务场景里使用1.2 一些名词搞懂他1.3 结合业务思考一下-业务or产品的意图 二.方案设计2.1 业务主流程2.2 一步步带你分析B端如何配置2.3 数据流2.3.1 ES 数据表建设2.3.2 核心商品池流程2.3.2.1 商品池B端维护流程2.3.2.2 商品池版本更新逻辑 2.4 核心代…

PySpark(三)RDD持久化、共享变量、Spark内核制度,Spark Shuffle

目录 RDD持久化 RDD 的数据是过程数据 RDD 缓存 RDD CheckPoint 共享变量 广播变量 累加器 Spark 内核调度 DAG DAG 的宽窄依赖和阶段划分 内存迭代计算 Spark是怎么做内存计算的? DAG的作用?Stage阶段划分的作用? Spark为什么比MapReduce快&#xff1f; Spar…

Java项目服务器部署

Java项目服务器部署 Tomocat Java项目进行云服务器部署 如果有需要比赛部署的同学也可以联系我&#xff0c;我后续还会对于这个专栏继续展开 1、云服务器选购 1.1 阿里云选购&#xff08;宝塔面板&#xff09; 1.2 端口放行 这里说的就是端口放行&#xff0c;后面一些访问比…

打造直播带货商城APP:源码开发技术全解析

直播带货商城APP的创新模式吸引了用户&#xff0c;提升销售业绩&#xff0c;已经成为了近期开发者讨论的热门话题。今天&#xff0c;小编将深入讲解如何打造一款功能强大的直播带货商城APP&#xff0c;着重分析源码开发技术&#xff0c;为开发者提供全方位的指导。 一、前期准…

电商推荐系统

此篇博客主要记录一下商品推荐系统的主要实现过程。 一、获取用户对商品的偏好值 代码实现 package zb.grms;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Doub…

docker自定义镜像并使用

写在前面 本文看下如何自定义镜像。 ik包从这里 下载。 1&#xff1a;自定义带有ik的es镜像 先看下目录结构&#xff1a; /opt/program/mychinese [rootlocalhost mychinese]# ll total 16 -rw-r--r-- 1 root root 1153 Feb 5 04:18 docker-compose.yaml -rw-rw-r-- 1 el…

2024智慧城市新纪元:引领未来,重塑都市生活

随着科技的飞速发展和数字化转型的不断深入&#xff0c;2024年智慧城市领域迎来了全新的发展格局。 这一年&#xff0c;智慧城市的建设更加注重人性化、可持续性和创新性&#xff0c;为城市居民带来了前所未有的便捷与舒适。以下将重点关注智慧城市的几个核心内容&#xff0c;…

Java设计模式-模板方法模式(14)

行为型模式 行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对…

【UE 材质】扇形材质

目录 效果 步骤 &#xff08;1&#xff09;控制扇形的弧宽度 &#xff08;2&#xff09;控制扇形的角度 &#xff08;3&#xff09;完整节点 效果 步骤 &#xff08;1&#xff09;控制扇形的弧宽度 创建一个材质&#xff0c;混合模式设置为“Additive”&#xff0c;着色…

代码随想录算法训练营DAY13 | 栈与队列 (3)

一、LeetCode 239 滑动窗口最大值 题目链接&#xff1a;239.滑动窗口最大值https://leetcode.cn/problems/sliding-window-maximum/ 思路&#xff1a;使用单调队列&#xff0c;只保存窗口中可能存在的最大值&#xff0c;从而降低时间复杂度。 public class MyQueue{Deque<I…

On the Spectral Bias of Neural Networks论文阅读

1. 摘要 众所周知&#xff0c;过度参数化的深度神经网络(DNNs)是一种表达能力极强的函数&#xff0c;它甚至可以以100%的训练精度记忆随机数据。这就提出了一个问题&#xff0c;为什么他们不能轻易地对真实数据进行拟合呢。为了回答这个问题&#xff0c;研究人员使用傅里叶分析…

mysql+node.js+html+js完整扫雷项目

一.下载 可以直接下载绑定资源&#xff0c; 也可以访问&#xff1a;克隆仓库&#xff1a;mine_clearance: mysqlnode.jshtmljs完整扫雷项目 (gitee.com) 二.运行sql数据文件 将mysql数据文件导入到本地 先在本地localhost里创建数据库 mine_clearance&#xff0c; 然后如图&…

编译原理本科课程 专题5 基于 SLR(1)分析的语义分析及中间代码生成程序设计

一、程序功能描述 本程序由C/C编写&#xff0c;实现了赋值语句语法制导生成四元式&#xff0c;并完成了语法分析和语义分析过程。 以专题 1 词法分析程序的输出为语法分析的输入&#xff0c;完成以下描述赋值语句 SLR(1)文法的语义分析及中间代码四元式的过程&#xff0c;实现…

进程和线程的区别详解

&#x1f3a5; 个人主页&#xff1a;Dikz12&#x1f4d5;格言&#xff1a;那些在暗处执拗生长的花&#xff0c;终有一日会馥郁传香欢迎大家&#x1f44d;点赞✍评论⭐收藏 目录 进程 进程在系统中是如何管理的 进一步认识PCB 线程 能否一直增加线程数目来提高效率 进程和线程…

【240126】上海大学—调剂信息

上海大学 学校层级&#xff1a;211 调剂专业&#xff1a;081000 信息与通信工程 发布时间&#xff1a;2024.1.26 发布来源&#xff1a;网络发布 调剂要求&#xff1a;要求考数一英一且初试成绩在320分以上 来源说明 1、官方发布&#xff1a;学校官网、研招网 2、网络发布…

EOF和0区别

题目描述 KiKi学习了循环&#xff0c;BoBo老师给他出了一系列打印图案的练习&#xff0c;该任务是打印用“*”组成的X形图案。 输入描述&#xff1a; 多组输入&#xff0c;一个整数&#xff08;2~20&#xff09;&#xff0c;表示输出的行数&#xff0c;也表示组成“X”的反斜…

Architecture Lab:Part C【流水线通用原理/Y86-64的流水线实现/实现IIADDQ指令】

目录 任务描述 知识回顾 流水线通用原理 Y86-64流水线实现&#xff08;PIPE-与PIPE&#xff09; 开始实验 IIADDQ指令的添加 优化 ncopy.ys 仅用第四章知识&#xff0c;CEP11.55 8x1展开&#xff0c;CPE9.35 8x1展开2x1展开消除气泡&#xff0c;CPE8.10 流水线化通过…

在VM虚拟机上搭建MariaDB数据库服务器

例题&#xff1a;搭建MariaDB数据库服务器&#xff0c;并实现主主复制。 1.在二台服务器中分别MariaDB安装。 2.在二台服务器中分别配置my.cnf文件&#xff0c;开启log_bin。 3.在二台服务器中分别创建专用于数据库同步的用户replication_user&#xff0c;并授权SLAVE。&#x…