zookeeper的介绍和用docker搭建zookeeper集群,以及Go语言使用zookeeper

news2025/1/22 14:41:34

typora-copy-images-to: imgs


Zookeeper的使用

1、Zookeeper简介

 Apache ZooKeeper 是 Apache 软件基金会的一个软件项目,为大型分布式系统提供开源分布式配置服务、同步服务和命名注册。ZooKeeper原本是Hadoop的一个子项目,但现在它本身已经是一个顶级项目了。

 zookeeper是经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能,高可用,且具有严格顺序访问控制能力的分布式协调存储服务。 

2、使用Docker快速部署zookeeper

2.1、Docker官方镜像

Docker Zookeeper

2.2、Docker安装zookeeper

下载zookeeper最新版的镜像

docker search zookeeper 
docker pull zookeeper
docker images
docker inspect zookeeper 

docker inspect zookeeper用来查看zookeeper的详细信息

在/root/docker/目录下新建一个zookeeper挂载点文件夹

mkdir /root/docker/zookeeper

挂载本地文件夹并启动服务

docker run -e TZ="Asia/Shanghai" -d -p 2181:2181 -v /root/docker/zookeeper:/data --name zookeeper --restart always zookeeper

参数解释

-e TZ="Asia/Shanghai" # 指定上海时区
-d # 指示后台运行容器
-p 2181:2181 # 端口映射,前面的端口为本地的2181端口,后者为容器内的端口
--name # 设置创建的容器的名称
-v # 挂在文件,将本地目录或文件挂在到容器指定目录
--restart always # 始终重新启动zookeeper

2.3、进入zookeeper容器客户端

方式一
docker run -it --rm --link zookeeper:zookeeper zookeeper zkCli.sh -server zookeeper       

运行上诉命令后会进入到zkCli

在这里插入图片描述

方式二

前台进入zookeeper容器执行脚本新建一个Client

 docker exec -it zookeeper bash   //进入zookeeper容器,退出时不会关闭容器
 ./bin/zkCli.sh		//执行脚本新建一个Client    

3、docker构建zookeeper集群

3.1、创建docker-compose.yml文件

cd /root/docker/docker-compose/zookeeper # 进入你想要存放docker-compose.yml的文件
vim docker-compose.yml		# 把下面的代码复制到docker-compose.yml文件中


version: 'latest'
services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    container_name: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - /root/docker/docker-compose/zoo1/data:/data
      - /root/docker/docker-compose/zoo1/datalog:/datalog

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    container_name: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - /root/docker/docker-compose/zoo2/data:/data
      - /root/docker/docker-compose/zoo2/datalog:/datalog

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    container_name: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
    volumes:
      - /root/docker/docker-compose/zoo3/data:/data
      - /root/docker/docker-compose/zoo3/datalog:/datalog

volumes表示的是文件映射要根据自己的情况进行修改

3.2、执行构建集群命令

3.2.1、安装Docker Compose(Linux下)

我们先要搭建以下docker-compose的环境,执行以下命令下载Docker Compose,要更改版本的话替换v2.2.2就好

sudo curl -L "https://github.com/docker/compose/releases/download/v2.2.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

赋予可执行权限

sudo chmod +x /usr/local/bin/docker-compose

创建软链接

sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
3.2.2、启动docker-compose.yml文件

在执行命令之前要保证2181-2183端口没有被占用

cd /root/docker/docker-compose/zookeeper  # 到docker-compose.yml所在的目录
docker-compose up -d
3.2.3、验证集群是否搭建成功
docker exec -it zoo1 bash
zkServer.sh status

看到如下信息表示集群已经搭建成功,可以看出zoo1的角色是follower,查看其他的会发现zoo2是follower,zoo3是leader。

在这里插入图片描述

4、Zookeeper的简单使用

3.1、Go语言与Zookeeper服务端建立连接

3.1.1、建立连接的代码
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)
//打印节点状态信息的函数
func StatePrintf(state *zk.Stat) {
	fmt.Println("State->")
	fmt.Printf("Czxid = %v,\nMzxid = %v,\nCtime = %v,\nMtime = %v,\nVersion = %v,\nCversion = %v,\nAversion = %v,\nEphemeralOwner = %v,\nDataLength = %v,\nNumChildren = %v,\nPzxid = %v,\n",
		state.Czxid,          //创建该节点的zxid
		state.Mzxid,          //最后一个修改该节点的zxid
		state.Ctime,          //创建该节点的时间
		state.Mtime,          //最后一次修改该节点的时间
		state.Version,        //修改该节点数据的次数
		state.Cversion,       //修改该节点儿子节点的次数
		state.Aversion,       //修改ACL的次数
		state.EphemeralOwner, //创建该临时节点的会话id
		state.DataLength,     //节点数据的长度
		state.NumChildren,    //该节点的儿子节点的数量
		state.Pzxid,          //最后一个修改的儿子节点的zxid(当创建或者删除子节点时才会改变)
	)
}
//输出错误信息的函数
func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	if err != nil {
		fmt.Println("Failed to Connected to zookeeper")
	}
	CreateNode(conn)
	SetTest(conn)
	GetTest(conn)
	DeleteTest(conn)
    //休眠5分钟再关闭,以便看到创建的临时节点
	time.Sleep(5 * time.Minute)
	defer conn.Close()
}
3.1.2、关于zxid的一些解释
Zookeeper中的**zxid**是一个长为64位的数字。高32位用来表示当前Leader的周期,低32位用来表示当前请求产生的事务在当前Leader周期内的顺序。每产生一个新的事务,zxid的低32位就会自动加1。当zxid达到最大值,即zxid的低32位达到`0xffffffff`,就会触发集群强制选主,Leader变更后高32位都会自增1,并重置zxid低32位的计数值(zxid高32位变为新Leader的周期,低32位变为0)。

如果一个zookeeper集群每秒能操作10000次,即10k/s ops,那么
	2^32/(86400*10000)≈4.97天

也就是说4.97天之后就会进行自动切主的操作,对于一些服务来说平均五天切一次主是难以容许的,我们可以重新设计zxid,增加低位技术的位数到自己需要的值,假设64位全部用做低位计数。

	2^64/(86400*10000)≈21350398233.46天,即58494241.73年

一般来说集群不可能可靠的运行这么多年,所以重新设计zxid还是要根据业务需求来进行。

3.2、创建节点

3.2.1、zkCli操作
create /节点路径 value  # 可以在创建节点的同时设置节点的值,创建的节点是持久化的节点
create -e /节点路径 value  # 创建临时节点,在客户端断开后会自动删除的节点
create -s /节点路径 value  # 创建顺序节点,zookeeper会自动在节点路径后面加顺序递增的编号

直接创建节点

在这里插入图片描述

临时节点,顺序节点,quit退出之后再进入刚创建的temp节点会消失。

在这里插入图片描述

3.2.2、Go语言API操作
// 创建节点
func CreateNode(conn *zk.Conn) {
	//创建永久节点
	path, err := conn.Create("/app3", []byte("zhangsan"), 0, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create node", err)
	fmt.Printf("Created node path[%v]\n", path)

	//创建临时节点,在会话结束时会自动删除临时节点
	ephemeral, err := conn.Create("/ephemeral", nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create ephemeral node", err)
	fmt.Printf("Created ephemeral node path[%v]\n", ephemeral)

	//创建顺序节点
	sequence, err := conn.Create("/sequence", nil, zk.FlagSequence, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create sequence node", err)
	fmt.Printf("Created sequence node path[%v]\n", sequence)

	//创建临时顺序节点 create -es /ephemeralsequece
	ephemeralsequece, err := conn.Create("/ephemeralsequece", nil, zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create ephemeralsequece node", err)
	fmt.Printf("Created ephemeralsequece node path[%v]\n", ephemeralsequece)
}

3.3、修改节点

3.3.1、zkCli操作
set /节点路径 value

示例如下

在这里插入图片描述

3.3.2、Go语言API操作
// Set操作
func SetTest(conn *zk.Conn) {
	//获取节点的version信息
	_, state, _ := conn.Get("/app3")

	//set /app3 lisi
	state, err := conn.Set("/app3", []byte("lisi"), state.Version)
	FailOnError("Failed to Set Value", err)
	StatePrintf(state)

	//获取修改后的值
	value, _, err := conn.Get("/app3")
	FailOnError("Failed to Get New value", err)
	fmt.Println("New Value = ", value)
}

3.4、查询节点

3.4.1、zkCli操作
ls 目录   # 查看目录下的所有子节点 
get /节点路径
ls -s 目录   # 查看目录的所有详细信息

示例如下

在这里插入图片描述

在这里插入图片描述

3.4.2、Go语言API操作
// 查询节点
func GetTest(conn *zk.Conn) {
	result, state, err := conn.Get("/app3")
	//获取子节点
	//children,state,err:=conn.Children("/app3")
	FailOnError("Failed to Get Node Info", err)
	fmt.Printf("result:[%v]\n", string(result))
	StatePrintf(state)
}

3.5、删除节点

3.5.1、zkCli操作
delete /节点路径   # 删除单个节点
deleteall /节点路径 	# 删除带有子节点的节点

示例如下

在这里插入图片描述

3.5.2、Go语言API操作
// 删除节点
func DeleteTest(conn *zk.Conn) {
	path := "/app3"
	//先判断节点存不存在
	exists, state, _ := conn.Exists(path)
	fmt.Printf("path[%s] exists:%v\n", path, exists)
	//删除节点
	err := conn.Delete("/app3", state.Version)
	FailOnError("Failed to Delete node", err)
	fmt.Printf("path[%s] is deleted.", path)

	exists, _, _ = conn.Exists(path)
	fmt.Printf("path[%s] exists: %v\n", path, exists)
}

5、go-zookeeper权限(ACL)

zookeeper的节点有五种权限:Create、Read、Write、Delete、Admin。

ACL权限由schema:id:permissions组成

schema有四种方式

  • world
  • auth
  • digest
  • ip

下面对这四种方式都测试一遍

4.1、world

默认方式,相当于全世界都能访问。

/app3节点的权限修改为 crwa 后尝试删除其子节点 /p1

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()

	//获取/app3节点的acl信息
	acl, state, err := conn.GetACL("/app3")
	FailOnError("Failed to GetACL", err)
	fmt.Println("\nget acl:")
	fmt.Println("scheme =", acl[0].Scheme)
	fmt.Println("id =", acl[0].ID)
	fmt.Println("permissions =", acl[0].Perms)

	//修改/app3节点的权限修改为crwa
	perms := zk.PermCreate | zk.PermRead | zk.PermWrite | zk.PermAdmin
	_, err = conn.SetACL("/app3", zk.WorldACL(int32(perms)), state.Aversion)
	FailOnError("Failed to SetACL", err)
	fmt.Println("SetAcl successful.")

	//create child node
	_, err = conn.Create("/app3/p1", nil, 0, zk.WorldACL(zk.PermAll))
	FailOnError("Failed to Create node", err)

	//get state of child node
	_, state, err = conn.Get("/app3/p1")
	FailOnError("Failed to Get node info", err)

	//delete /app3/p1
	err = conn.Delete("/app3/p1", state.Version)
	FailOnError("Failed to Delete Node", err)
}

测试结果如下:因为我们没有赋予/app3节点Delete权限,即使子结点/p1赋予了全部权限也不能删除该子节点。

在这里插入图片描述

4.2、auth

auth 用来授予用户权限,所以需要先创建用户。

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()

	//获取/auth节点的状态信息
	_, state, err := conn.Get("/auth")
	FailOnError("Failed to Get node info", err)

	//用户授权,用户不存在的话会新建
	err = conn.AddAuth("digest", []byte("user1:123456"))
	FailOnError("Failed to AddAuth", err)

	acl := zk.ACL{
		Scheme: "auth",
		Perms:  zk.PermAll,
		ID:     "user1:123456",
	}

	//为用户授权
	_, err = conn.SetACL("/auth", []zk.ACL{acl}, state.Version)
	FailOnError("Failed to SetACL", err)
	fmt.Println("AddAuthSuccess")
}

在这里插入图片描述

授权成功之后如果在其他连接中要查询节点信息要先验证用户信息才能进入下一步操作,也就是把conn.AddAuth操作提前,如果使用不正确的用户名和密码,得到的会是同样的用户认证失败的结果。

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()

	//先进行访问/auth节点的话会报错
	_, _, err = conn.Get("/auth")
	if err != nil {
		fmt.Println("Get Node info error:", err)
	}

	//要先进行AddAuth操作
	err = conn.AddAuth("digest", []byte("user1:123456"))
	FailOnError("Failed to Add Auth", err)

	//再获取/auth节点的信息就不会报错了
	acl, _, err := conn.GetACL("/auth")
	FailOnError("Failed to Get node info", err)
	fmt.Println("acl 信息:")
	for i := 0; i < len(acl); i++ {
		fmt.Println("scheme =", acl[0].Scheme)
		fmt.Println("id =", acl[0].ID)
		fmt.Println("permissions =", acl[0].Perms)
	}
}

从测试结果来看在未进行AddAuth操作时我们是获取不到/auth节点信息的,节点的密码返回的是加密后的密码。

在这里插入图片描述

4.3、digest

digestauth基本相同,唯一的区别在于设置权限时,密码需要使用密文。

zk golang 库中有专为digest构造的方法:

zk.DigestACL(perms int32, user, password string)

此方法传入的密码需要是明文,其内部逻辑会将明文转为密文再向 zookeeper 传递。

使用示例:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()

	//获取/digest
	_, state, err := conn.Get("/digest")
	FailOnError("Failed to Get node info", err)

	//用户授权,用户不存在的话会新建
	err = conn.AddAuth("digest", []byte("user1:123456"))
	FailOnError("Failed to AddAuth", err)

	//zk.DigestACL会将传入的明文转换成密文acl
	acl := zk.DigestACL(zk.PermAll, "user1", "123456")

	//为用户授权
	_, err = conn.SetACL("/digest", acl, state.Version)
	FailOnError("Failed to SetACL", err)
	fmt.Println("节点[/digest]已对用户 user1 授权")
}

4.4、ip

ip 权限顾名思义,就是限制 ip 地址的访问权限。

把节点的权限设置给指定的 ip 地址后,其他 ip 将无法访问该节点。

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

func FailOnError(msg string, err error) {
	if err != nil {
		log.Fatalf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	FailOnError("Failed to Connected to zookeeper", err)
	defer conn.Close()
	//获取/ip节点的状态信息
	_, state, err := conn.Get("/ip")
	FailOnError("Failed to Get node info", err)

	acl := zk.ACL{
		Scheme: "ip",
		Perms:  zk.PermAll,
		ID:     "192.168.17.1",
	}

	//为用户授权
	_, err = conn.SetACL("/ip", []zk.ACL{acl}, state.Aversion)
	FailOnError("Failed to SetACL", err)
	fmt.Println("节点[/ip]已对用户 192.168.17.1 授权")

	//获取以下节点的acl权限
	acls, _, err := conn.GetACL("/ip")
	FailOnError("Failed to Get node info", err)
	fmt.Println("acl 信息:")
	for i := 0; i < len(acls); i++ {
		fmt.Println("scheme =", acls[0].Scheme)
		fmt.Println("id =", acls[0].ID)
		fmt.Println("permissions =", acls[0].Perms)
	}
}

这里我用的是VMware的虚拟机的zookeeper然后用本地Windows去连接zookeeper发送的消息这个过程会经过虚拟路由转发,所以这里授权的ip地址是VMware虚拟网卡的地址,不然的话会报没有权限的错误。

在这里插入图片描述

在这里插入图片描述

6、watch机制

5.1、watch的事件类型

watch 用来实现发布/订阅功能,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者。

每个 watch 仅有一次触发的机会,一旦触发会立即失效,想要持续监听,就需要一直注册。

go-zookeeper监听的事件类型分为五种:

  • zk.EventNodeCreated: 节点创建事件,需要watch一个不存在的节点,当节点被创建时触发,此watch通过conn.ExistsW(path string)设置
  • zk. EventNodeDeleted :节点删除事件,需要watch一个已存在的节点,当节点被移除时触发,此watch通过conn.ExistsW(path string)设置
  • zk. EventNodeDataChanged: 节点数据变化事件,此watch通过conn.GetW(path string) 以及 conn.ExistsW(path string) 设置
  • zk. EventNodeChildrenChanged : 子节点改变事件(数量改变),此watch通过conn.ChildrenW(path string)设置, 当path 下面增删子节点时触发(修改path下的子节点的内容时,不会触发通知)。
  • zk.EventNoWatching: watch移除事件,服务端出于某些原因不再为客户端watch节点时触发。

5.2、监听的方式

方式一、全局监听

全局监听的方式会在有监听事件发生时会执行监听器的回调函数

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	callbackOption := zk.WithEventCallback(callback)
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5, callbackOption)
	if err != nil {
		fmt.Println("Failed to Connected to zookeeper")
	}
	//注册一个监听事件
	exists, state, _, err := conn.ExistsW("/global")
	if err != nil {
		log.Println(err)
	}
	//如果节点不存在则创建
	if !exists {
		//创建一个临时的global节点
		_, err = conn.Create("/global", []byte("globaltest"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
		if err != nil {
			log.Println(err)
		}
		//在注册一个监听事件监听/global节点的删除
		_, state, _, err = conn.ExistsW("/global")
		if err != nil {
			log.Println(err)
		}
	}
	err = conn.Delete("/global", state.Version)
	if err != nil {
		log.Println(err)
	}
	defer conn.Close()
}

// 监听事件的回调函数
func callback(event zk.Event) {
	fmt.Println("###########################")
	fmt.Println("path: ", event.Path)
	fmt.Println("type: ", event.Type.String())
	fmt.Println("state: ", event.State.String())
	fmt.Println("---------------------------")
}

在这里插入图片描述

测试结果

在这里插入图片描述

方式二、局部监听
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

// 输出错误信息的函数
func FailOnError(msg string, err error) {
	if err != nil {
		log.Printf("%v : %v\n", msg, err)
	}
}
func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	if err != nil {
		fmt.Println("Failed to Connected to zookeeper")
	}
	//先等待连接上再启动监听携程
	time.Sleep(5 * time.Second)
	go watchNodeCreated("/partial", conn)
	go watchNodeDataChanged("/partial", conn)
	go watchNodeChildrenChanged("/partial", conn)
	go watchNodeDeleted("/partial", conn)
	defer conn.Close()
	//等待操作结束
	time.Sleep(1 * time.Hour)
}

// 监听节点的创建事件
func watchNodeCreated(path string, conn *zk.Conn) {
	log.Printf("watching node[%v] Create\n", path)
	for {
		//利用channel通信机制将Event数据传递给ch
		//ch:=make(chan Event)
		_, _, ch, err := conn.ExistsW(path)
		//当err为nil时ch才会有数据否者会阻塞协程
		if err == nil {
			e := <-ch
			if e.Type == zk.EventNodeCreated {
				log.Printf("Node[%v] Created\n", path)
			}
		} else {
			FailOnError("Failed to watchNodeCreated", err)
		}
	}
}

// 监听节点数据修改事件
func watchNodeDataChanged(path string, conn *zk.Conn) {
	log.Printf("watching node[%v] Data Change\n", path)
	for {
		_, _, ch, err := conn.GetW(path)
		if err == nil {
			e := <-ch
			if e.Type == zk.EventNodeDataChanged {
				log.Printf("Node[%v] Data Changed", path)
			}
		}
	}
}

// 监听节点子节点的修改事件
func watchNodeChildrenChanged(path string, conn *zk.Conn) {
	log.Printf("watching node[%v] Children Change", path)
	for {
		_, _, ch, err := conn.ChildrenW(path)
		if err == nil {
			e := <-ch
			if e.Type == zk.EventNodeChildrenChanged {
				log.Printf("Node[%v] Children Changed", path)
			}
		}
	}
}

// 监听节点的删除事件
func watchNodeDeleted(path string, conn *zk.Conn) {
	log.Printf("watching node[%v] Delete", path)
	for {
		_, _, ch, err := conn.ExistsW(path)
		if err == nil {
			e := <-ch
			if e.Type == zk.EventNodeDeleted {
				log.Printf("Node[%v] Deleted", path)
			}
		} else {
			FailOnError("Failed to watchNodeDeleted", err)
		}
	}
}

启动程序之后在虚拟机上运行客户端程序执行以下命令

在这里插入图片描述

程序会输出以下结果

在这里插入图片描述

7、go-zookeeper实现分布式锁

zookeeper的分布式锁可以利用每个节点的唯一性来完成,但所有服务监听一个节点对于分布式系统来说完全是资源浪费。而zookeeper可以利用临时顺序节点来创建一个有序的临时节点列表来完成分布式锁:
  1. 客户端获取锁时,在lock节点下创建临时顺序节点。
  2. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁之后,将该节点删除。
  3. 如果发现自己创建的子节点并非所有子节点中最小的,说明自己没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
  4. 如果发现比自己小的那个节点删除,则客户端的Watcher会收到相应的同支,此时再次判断自己创建的节点是否时lock子节点中序号最小的,如果是则获取到锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

假如服务 A 创建了节点 a,此时节点 a 的前面没有节点,所以服务 A 可以执行。此时服务 B 创建了节点 b,节点 b 是节点 a 的下一个节点,那么服务 B 只需要监听节点 a 即可。

也就是说,因为临时有序节点列表是有序的,所以每个服务只需要监听自己创建的节点的前一个节点即可。

我们利用golang的goroutine来模拟客户端实现分布式锁的过程,以下是50个goroutine进行抢锁的示例:
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/go-zookeeper/zk"
)

func main() {
	hosts := []string{"192.168.17.95:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	if err != nil {
		fmt.Println("Failed to Connected to zookeeper")
	}
	var wg sync.WaitGroup

	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			//新建一个锁
			lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll))
			//加锁
			err = lock.LockWithData([]byte("it is a lock"))
			if err != nil {
				panic(err)
			}
			fmt.Println("第", n, "个 goroutine 获取到了锁")
			time.Sleep(time.Second) // 1 秒后释放锁
			//解锁
			lock.Unlock()
		}(i)
	}
	//等待协程运行结束
	wg.Wait()
}

运行结果如下(只截取了一部分)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1119801.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新年学新语言Go之三

一、前言 这一篇简单介绍一下Go中的数组、切片、map和指针。 二、数组 Go语言的数组和Java差不多都是定长的&#xff0c;用于存储有相同类型的元素&#xff0c;数组在内存中是连续分配的&#xff0c;索引数组中任意数据速度都非常快。 注&#xff1a;Go声明变量和其它强类型…

C++前缀和算法的应用:得到连续 K 个 1 的最少相邻交换次数 原理源码测试用例

本文涉及的基础知识点 C算法&#xff1a;前缀和、前缀乘积、前缀异或的原理、源码及测试用例 包括课程视频 滑动窗口 题目 给你一个整数数组 nums 和一个整数 k 。 nums 仅包含 0 和 1 。每一次移动&#xff0c;你可以选择 相邻 两个数字并将它们交换。 请你返回使 nums 中包…

FL Studio中文最新21破解版本水果软件下载

那么&#xff0c;大家知道编曲是什么吗&#xff1f;编曲和作曲又有什么区别呢&#xff1f; 一首歌的制作过程通常是由作词或作曲开始的&#xff0c;作曲就是运用基本乐理、和声学、复调、配器法、曲式结构的技术理论体系来表达创作者音乐思想的方法。说白了其实就是制作一首歌…

学信息系统项目管理师第4版系列34_10大管理49过程ITTO

整合管理 组 过程 输入 工具和技术 输出 启动 制定项目章程 立项管理文件协议事业环境因素组织过程资产 专家判断数据收集人际关系与团队技能会议 项目章程假设日志 计划 2.制定项目管理计划 项目章程其他知识领域规划过程的输出事业环境因素组织过程资产 专家…

【软考】9.5 排序算法原理

《直接插入排序》 针对少量数据的排序情况多次比较&#xff0c;一次插入 默认第一个元素为有序队列&#xff0c;依次与前面的元素进行比较&#xff0c;直到找到第一个小于他的值&#xff0c;才插入 《希尔排序》 缩小增量排序&#xff1b;针对大数据的排序情况分组&#xff0…

STM32cubemx对FreeRTOS的适配(工程模板配置)

文章目录 前言一、工程的创建二、什么是CMSIS三、STM32cubemx生成的FreeRTOS工程分析总结 前言 本篇文章将带大家使用STM32cubemx对FreeRTOS进行工程模板的配置。 一、工程的创建 1.开始工程的创建&#xff1a; 2.芯片型号选择&#xff1a; 3.修改时钟为TIM8&#xff1a; …

00-开源离线同步工具DataX3.0重磅详解!

1 概览 DataX 是一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 1.1 设计理念 为了解决异构数据源同步问题&#xff0c;DataX将复杂的网状的同步链路变成了…

Tomcat部署项目的两种方式

第一种: 将项目放到tomcat的webapps目录下,war包会自动解压 里面有个页面 为什么会默认访问asd.html 可以配置 tomcat--->conf---->web.xml 第二种方式 在Tomcat/conf/Catalina/localhost/目录下随便建个xxx.xml文件 注意字符编码 utf-8 注意aaa就是上下文地址 …

【计算机网络笔记】OSI参考模型中端-端层(传输层、会话层、表示层、应用层)功能介绍

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能&#xff08;1&#xff09;——速率、带宽、延迟 计算机网络性能&#xff08;2&#xff09;…

C++笔记之遍历vector的所有方式

C笔记之遍历vector的所有方式 —— 2023年4月15日 上海 code review 文章目录 C笔记之遍历vector的所有方式1.普通for循环2.迭代器版3.const迭代器4.C11引入的范围for循环5.使用auto关键字和迭代器6.使用std::for_each算法7.使用std::for_each和lambda表达式8.普通版vector::at…

API接口随心搭,自由定制你的数据流

API接口是现代软件开发中非常重要的一部分。API接口可以帮助不同的系统和应用程序之间进行数据交换和通信。随着计算机网络技术的不断发展&#xff0c;API接口的适用范围也越来越广泛。如今&#xff0c;在互联网上许多网站和应用程序都提供了各种各样的API接口&#xff0c;供开…

html5语义化标签

目录 前言 什么是语义化标签 常见的语义化标签 语义化的好处 前言 HTML5 的设计目的是为了在移动设备上支持多媒体。之前网页如果想嵌入视频音频&#xff0c;需要用到 flash &#xff0c;但是苹果设备是不支持 flash 的&#xff0c;所以为了改变这一现状&#xff0c;html5 …

自然语言处理---Tr ansformer机制详解之Transformer结构

1 Encoder模块 1.1 Encoder模块的结构和作用 经典的Transformer结构中的Encoder模块包含6个Encoder Block.每个Encoder Block包含一个多头自注意力层&#xff0c;和一个前馈全连接层. 1.2 Encoder Block 在Transformer架构中&#xff0c;6个一模一样的Encoder …

《算法设计与分析(第4版)》笔记——第 1 章 算法入门

现在跟的是 b站黑马 的视频课&#xff0c;还是这个好哇 2023新版数据结构与算法Java视频教程&#xff08;上篇&#xff09; 2023新版数据结构与算法Java视频教程&#xff08;下篇&#xff09; 之前跟的是 青岛大学 张公敬教授 的《算法设计与分析》&#xff08;做了笔记就发出…

【德哥说库系列】-RHEL8环境源码编译安装MySQL8.0

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

docker 安装 sftpgo

sftpgo 简介 sftpgo 是一个功能齐全且高度可配置的 SFTP 服务器&#xff0c;具有可选的 HTTP/S、FTP/S 和 WebDAV 支持。支持多种存储后端&#xff1a;本地文件系统、加密本地文件系统、S3&#xff08;兼容&#xff09;对象存储、Google 云存储、Azure Blob 存储、SFTP。 官…

香港科技大学广州|可持续能源与环境学域博士招生宣讲会—广州大学城专场!!!(暨全额奖学金政策)

香港科技大学广州&#xff5c;可持续能源与环境学域博士招生宣讲会—广州大学城专场&#xff01;&#xff01;&#xff01;&#xff08;暨全额奖学金政策&#xff09; “面向未来改变游戏规则的——可持续能源与环境学域” &#xfffd;&#xfffd;&#xfffd;专注于能源环…

【27】c++设计模式——>迭代器模式(遍历双向链表)(2)

//实现双向链表 #pragma once #include<iostream> #include<string> #include<vector> using namespace std;class Iterator; class ForwardIterator; class ReverseIterator;//链表的最小组成部分是一个节点&#xff0c;先实现一个节点 struct Node //c中st…

【每周一测】Java阶段二第五周学习

目录 1、关于static说法不正确的是&#xff1f; 2、以下关于继承的叙述正确的是&#xff08;&#xff09; 3、Restful风格传参用的注解是 4、反射可以访问私有成员变量和方法吗&#xff1f; 5、关于SqlSession的说法&#xff0c;说法正确的是 6、自定义SpringMvc拦截器时…

Web攻防04_MySQL注入_盲注

文章目录 MYSQL-SQL操作-增删改查盲注概念盲注分类盲注语句参考&更多盲注语句/函数 注入条件-数据回显&错误处理PHP开发项目-注入相关条件&#xff1a;基于延时&#xff1a;基于布尔&#xff1a;基于报错&#xff1a; CMS案例-插入报错&删除延时-PHP&MYSQL1、x…