typora-copy-images-to: imgs
Zookeeper的使用
1、Zookeeper简介
Apache ZooKeeper 是 Apache 软件基金会的一个软件项目,为大型分布式系统提供开源分布式配置服务、同步服务和命名注册。ZooKeeper原本是Hadoop的一个子项目,但现在它本身已经是一个顶级项目了。
zookeeper是经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能,高可用,且具有严格顺序访问控制能力的分布式协调存储服务。
2、使用Docker快速部署zookeeper
2.1、Docker官方镜像
Docker Zookeeper
2.2、Docker安装zookeeper
下载zookeeper最新版的镜像
docker search zookeeper
docker pull zookeeper
docker images
docker inspect zookeeper
docker inspect zookeeper用来查看zookeeper的详细信息
在/root/docker/目录下新建一个zookeeper挂载点文件夹
mkdir /root/docker/zookeeper
挂载本地文件夹并启动服务
docker run -e TZ="Asia/Shanghai" -d -p 2181:2181 -v /root/docker/zookeeper:/data --name zookeeper --restart always zookeeper
参数解释
-e TZ="Asia/Shanghai" # 指定上海时区
-d # 指示后台运行容器
-p 2181:2181 # 端口映射,前面的端口为本地的2181端口,后者为容器内的端口
--name # 设置创建的容器的名称
-v # 挂在文件,将本地目录或文件挂在到容器指定目录
--restart always # 始终重新启动zookeeper
2.3、进入zookeeper容器客户端
方式一
docker run -it --rm --link zookeeper:zookeeper zookeeper zkCli.sh -server zookeeper
运行上诉命令后会进入到zkCli
方式二
前台进入zookeeper容器执行脚本新建一个Client
docker exec -it zookeeper bash //进入zookeeper容器,退出时不会关闭容器
./bin/zkCli.sh //执行脚本新建一个Client
3、docker构建zookeeper集群
3.1、创建docker-compose.yml文件
cd /root/docker/docker-compose/zookeeper # 进入你想要存放docker-compose.yml的文件
vim docker-compose.yml # 把下面的代码复制到docker-compose.yml文件中
version: 'latest'
services:
zoo1:
image: zookeeper
restart: always
hostname: zoo1
container_name: zoo1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
volumes:
- /root/docker/docker-compose/zoo1/data:/data
- /root/docker/docker-compose/zoo1/datalog:/datalog
zoo2:
image: zookeeper
restart: always
hostname: zoo2
container_name: zoo2
ports:
- 2182:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
volumes:
- /root/docker/docker-compose/zoo2/data:/data
- /root/docker/docker-compose/zoo2/datalog:/datalog
zoo3:
image: zookeeper
restart: always
hostname: zoo3
container_name: zoo3
ports:
- 2183:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
volumes:
- /root/docker/docker-compose/zoo3/data:/data
- /root/docker/docker-compose/zoo3/datalog:/datalog
volumes表示的是文件映射要根据自己的情况进行修改
3.2、执行构建集群命令
3.2.1、安装Docker Compose(Linux下)
我们先要搭建以下docker-compose的环境,执行以下命令下载Docker Compose,要更改版本的话替换v2.2.2就好
sudo curl -L "https://github.com/docker/compose/releases/download/v2.2.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
赋予可执行权限
sudo chmod +x /usr/local/bin/docker-compose
创建软链接
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
3.2.2、启动docker-compose.yml文件
在执行命令之前要保证2181-2183端口没有被占用
cd /root/docker/docker-compose/zookeeper # 到docker-compose.yml所在的目录
docker-compose up -d
3.2.3、验证集群是否搭建成功
docker exec -it zoo1 bash
zkServer.sh status
看到如下信息表示集群已经搭建成功,可以看出zoo1的角色是follower,查看其他的会发现zoo2是follower,zoo3是leader。
4、Zookeeper的简单使用
3.1、Go语言与Zookeeper服务端建立连接
3.1.1、建立连接的代码
package main
import (
"fmt"
"log"
"time"
"github.com/samuel/go-zookeeper/zk"
)
//打印节点状态信息的函数
func StatePrintf(state *zk.Stat) {
fmt.Println("State->")
fmt.Printf("Czxid = %v,\nMzxid = %v,\nCtime = %v,\nMtime = %v,\nVersion = %v,\nCversion = %v,\nAversion = %v,\nEphemeralOwner = %v,\nDataLength = %v,\nNumChildren = %v,\nPzxid = %v,\n",
state.Czxid, //创建该节点的zxid
state.Mzxid, //最后一个修改该节点的zxid
state.Ctime, //创建该节点的时间
state.Mtime, //最后一次修改该节点的时间
state.Version, //修改该节点数据的次数
state.Cversion, //修改该节点儿子节点的次数
state.Aversion, //修改ACL的次数
state.EphemeralOwner, //创建该临时节点的会话id
state.DataLength, //节点数据的长度
state.NumChildren, //该节点的儿子节点的数量
state.Pzxid, //最后一个修改的儿子节点的zxid(当创建或者删除子节点时才会改变)
)
}
//输出错误信息的函数
func FailOnError(msg string, err error) {
if err != nil {
log.Fatalf("%v : %v\n", msg, err)
}
}
func main() {
hosts := []string{"192.168.17.95:2181"}
conn, _, err := zk.Connect(hosts, time.Second*5)
if err != nil {
fmt.Println("Failed to Connected to zookeeper")
}
CreateNode(conn)
SetTest(conn)
GetTest(conn)
DeleteTest(conn)
//休眠5分钟再关闭,以便看到创建的临时节点
time.Sleep(5 * time.Minute)
defer conn.Close()
}
3.1.2、关于zxid的一些解释
Zookeeper中的**zxid**是一个长为64位的数字。高32位用来表示当前Leader的周期,低32位用来表示当前请求产生的事务在当前Leader周期内的顺序。每产生一个新的事务,zxid的低32位就会自动加1。当zxid达到最大值,即zxid的低32位达到`0xffffffff`,就会触发集群强制选主,Leader变更后高32位都会自增1,并重置zxid低32位的计数值(zxid高32位变为新Leader的周期,低32位变为0)。
如果一个zookeeper集群每秒能操作10000次,即10k/s ops,那么
2^32/(86400*10000)≈4.97天
也就是说4.97天之后就会进行自动切主的操作,对于一些服务来说平均五天切一次主是难以容许的,我们可以重新设计zxid,增加低位技术的位数到自己需要的值,假设64位全部用做低位计数。
2^64/(86400*10000)≈21350398233.46天,即58494241.73年
一般来说集群不可能可靠的运行这么多年,所以重新设计zxid还是要根据业务需求来进行。
3.2、创建节点
3.2.1、zkCli操作
create /节点路径 value # 可以在创建节点的同时设置节点的值,创建的节点是持久化的节点
create -e /节点路径 value # 创建临时节点,在客户端断开后会自动删除的节点
create -s /节点路径 value # 创建顺序节点,zookeeper会自动在节点路径后面加顺序递增的编号
直接创建节点
临时节点,顺序节点,quit退出之后再进入刚创建的temp节点会消失。
3.2.2、Go语言API操作
// 创建节点
func CreateNode(conn *zk.Conn) {
//创建永久节点
path, err := conn.Create("/app3", []byte("zhangsan"), 0, zk.WorldACL(zk.PermAll))
FailOnError("Failed to Create node", err)
fmt.Printf("Created node path[%v]\n", path)
//创建临时节点,在会话结束时会自动删除临时节点
ephemeral, err := conn.Create("/ephemeral", nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
FailOnError("Failed to Create ephemeral node", err)
fmt.Printf("Created ephemeral node path[%v]\n", ephemeral)
//创建顺序节点
sequence, err := conn.Create("/sequence", nil, zk.FlagSequence, zk.WorldACL(zk.PermAll))
FailOnError("Failed to Create sequence node", err)
fmt.Printf("Created sequence node path[%v]\n", sequence)
//创建临时顺序节点 create -es /ephemeralsequece
ephemeralsequece, err := conn.Create("/ephemeralsequece", nil, zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
FailOnError("Failed to Create ephemeralsequece node", err)
fmt.Printf("Created ephemeralsequece node path[%v]\n", ephemeralsequece)
}
3.3、修改节点
3.3.1、zkCli操作
set /节点路径 value
示例如下
3.3.2、Go语言API操作
// Set操作
func SetTest(conn *zk.Conn) {
//获取节点的version信息
_, state, _ := conn.Get("/app3")
//set /app3 lisi
state, err := conn.Set("/app3", []byte("lisi"), state.Version)
FailOnError("Failed to Set Value", err)
StatePrintf(state)
//获取修改后的值
value, _, err := conn.Get("/app3")
FailOnError("Failed to Get New value", err)
fmt.Println("New Value = ", value)
}
3.4、查询节点
3.4.1、zkCli操作
ls 目录 # 查看目录下的所有子节点
get /节点路径
ls -s 目录 # 查看目录的所有详细信息
示例如下
3.4.2、Go语言API操作
// 查询节点
func GetTest(conn *zk.Conn) {
result, state, err := conn.Get("/app3")
//获取子节点
//children,state,err:=conn.Children("/app3")
FailOnError("Failed to Get Node Info", err)
fmt.Printf("result:[%v]\n", string(result))
StatePrintf(state)
}
3.5、删除节点
3.5.1、zkCli操作
delete /节点路径 # 删除单个节点
deleteall /节点路径 # 删除带有子节点的节点
示例如下
3.5.2、Go语言API操作
// 删除节点
func DeleteTest(conn *zk.Conn) {
path := "/app3"
//先判断节点存不存在
exists, state, _ := conn.Exists(path)
fmt.Printf("path[%s] exists:%v\n", path, exists)
//删除节点
err := conn.Delete("/app3", state.Version)
FailOnError("Failed to Delete node", err)
fmt.Printf("path[%s] is deleted.", path)
exists, _, _ = conn.Exists(path)
fmt.Printf("path[%s] exists: %v\n", path, exists)
}
5、go-zookeeper权限(ACL)
zookeeper的节点有五种权限:Create、Read、Write、Delete、Admin。
ACL权限由schema:id:permissions
组成
schema
有四种方式
- world
- auth
- digest
- ip
下面对这四种方式都测试一遍
4.1、world
默认方式,相当于全世界都能访问。
将/app3
节点的权限修改为 crwa
后尝试删除其子节点 /p1
:
package main
import (
"fmt"
"log"
"time"
"github.com/go-zookeeper/zk"
)
func FailOnError(msg string, err error) {
if err != nil {
log.Fatalf("%v : %v\n", msg, err)
}
}
func main() {
hosts := []string{"192.168.17.95:2181"}
conn, _, err := zk.Connect(hosts, time.Second*5)
FailOnError("Failed to Connected to zookeeper", err)
defer conn.Close()
//获取/app3节点的acl信息
acl, state, err := conn.GetACL("/app3")
FailOnError("Failed to GetACL", err)
fmt.Println("\nget acl:")
fmt.Println("scheme =", acl[0].Scheme)
fmt.Println("id =", acl[0].ID)
fmt.Println("permissions =", acl[0].Perms)
//修改/app3节点的权限修改为crwa
perms := zk.PermCreate | zk.PermRead | zk.PermWrite | zk.PermAdmin
_, err = conn.SetACL("/app3", zk.WorldACL(int32(perms)), state.Aversion)
FailOnError("Failed to SetACL", err)
fmt.Println("SetAcl successful.")
//create child node
_, err = conn.Create("/app3/p1", nil, 0, zk.WorldACL(zk.PermAll))
FailOnError("Failed to Create node", err)
//get state of child node
_, state, err = conn.Get("/app3/p1")
FailOnError("Failed to Get node info", err)
//delete /app3/p1
err = conn.Delete("/app3/p1", state.Version)
FailOnError("Failed to Delete Node", err)
}
测试结果如下:因为我们没有赋予/app3节点Delete权限,即使子结点/p1赋予了全部权限也不能删除该子节点。
4.2、auth
auth 用来授予用户权限,所以需要先创建用户。
package main
import (
"fmt"
"log"
"time"
"github.com/go-zookeeper/zk"
)
func FailOnError(msg string, err error) {
if err != nil {
log.Fatalf("%v : %v\n", msg, err)
}
}
func main() {
hosts := []string{"192.168.17.95:2181"}
conn, _, err := zk.Connect(hosts, time.Second*5)
FailOnError("Failed to Connected to zookeeper", err)
defer conn.Close()
//获取/auth节点的状态信息
_, state, err := conn.Get("/auth")
FailOnError("Failed to Get node info", err)
//用户授权,用户不存在的话会新建
err = conn.AddAuth("digest", []byte("user1:123456"))
FailOnError("Failed to AddAuth", err)
acl := zk.ACL{
Scheme: "auth",
Perms: zk.PermAll,
ID: "user1:123456",
}
//为用户授权
_, err = conn.SetACL("/auth", []zk.ACL{acl}, state.Version)
FailOnError("Failed to SetACL", err)
fmt.Println("AddAuthSuccess")
}
授权成功之后如果在其他连接中要查询节点信息要先验证用户信息才能进入下一步操作,也就是把conn.AddAuth操作提前,如果使用不正确的用户名和密码,得到的会是同样的用户认证失败的结果。
package main
import (
"fmt"
"log"
"time"
"github.com/go-zookeeper/zk"
)
func FailOnError(msg string, err error) {
if err != nil {
log.Fatalf("%v : %v\n", msg, err)
}
}
func main() {
hosts := []string{"192.168.17.95:2181"}
conn, _, err := zk.Connect(hosts, time.Second*5)
FailOnError("Failed to Connected to zookeeper", err)
defer conn.Close()
//先进行访问/auth节点的话会报错
_, _, err = conn.Get("/auth")
if err != nil {
fmt.Println("Get Node info error:", err)
}
//要先进行AddAuth操作
err = conn.AddAuth("digest", []byte("user1:123456"))
FailOnError("Failed to Add Auth", err)
//再获取/auth节点的信息就不会报错了
acl, _, err := conn.GetACL("/auth")
FailOnError("Failed to Get node info", err)
fmt.Println("acl 信息:")
for i := 0; i < len(acl); i++ {
fmt.Println("scheme =", acl[0].Scheme)
fmt.Println("id =", acl[0].ID)
fmt.Println("permissions =", acl[0].Perms)
}
}
从测试结果来看在未进行AddAuth操作时我们是获取不到/auth节点信息的,节点的密码返回的是加密后的密码。
4.3、digest
digest
与auth
基本相同,唯一的区别在于设置权限时,密码需要使用密文。
zk golang 库中有专为digest
构造的方法:
zk.DigestACL(perms int32, user, password string)
此方法传入的密码需要是明文,其内部逻辑会将明文转为密文再向 zookeeper 传递。
使用示例:
package main
import (
"fmt"
"log"
"time"
"github.com/go-zookeeper/zk"
)
func FailOnError(msg string, err error) {
if err != nil {
log.Fatalf("%v : %v\n", msg, err)
}
}
func main() {
hosts := []string{"192.168.17.95:2181"}
conn, _, err := zk.Connect(hosts, time.Second*5)
FailOnError("Failed to Connected to zookeeper", err)
defer conn.Close()
//获取/digest
_, state, err := conn.Get("/digest")
FailOnError("Failed to Get node info", err)
//用户授权,用户不存在的话会新建
err = conn.AddAuth("digest", []byte("user1:123456"))
FailOnError("Failed to AddAuth", err)
//zk.DigestACL会将传入的明文转换成密文acl
acl := zk.DigestACL(zk.PermAll, "user1", "123456")
//为用户授权
_, err = conn.SetACL("/digest", acl, state.Version)
FailOnError("Failed to SetACL", err)
fmt.Println("节点[/digest]已对用户 user1 授权")
}
4.4、ip
ip 权限顾名思义,就是限制 ip 地址的访问权限。
把节点的权限设置给指定的 ip 地址后,其他 ip 将无法访问该节点。
package main
import (
"fmt"
"log"
"time"
"github.com/go-zookeeper/zk"
)
func FailOnError(msg string, err error) {
if err != nil {
log.Fatalf("%v : %v\n", msg, err)
}
}
func main() {
hosts := []string{"192.168.17.95:2181"}
conn, _, err := zk.Connect(hosts, time.Second*5)
FailOnError("Failed to Connected to zookeeper", err)
defer conn.Close()
//获取/ip节点的状态信息
_, state, err := conn.Get("/ip")
FailOnError("Failed to Get node info", err)
acl := zk.ACL{
Scheme: "ip",
Perms: zk.PermAll,
ID: "192.168.17.1",
}
//为用户授权
_, err = conn.SetACL("/ip", []zk.ACL{acl}, state.Aversion)
FailOnError("Failed to SetACL", err)
fmt.Println("节点[/ip]已对用户 192.168.17.1 授权")
//获取以下节点的acl权限
acls, _, err := conn.GetACL("/ip")
FailOnError("Failed to Get node info", err)
fmt.Println("acl 信息:")
for i := 0; i < len(acls); i++ {
fmt.Println("scheme =", acls[0].Scheme)
fmt.Println("id =", acls[0].ID)
fmt.Println("permissions =", acls[0].Perms)
}
}
这里我用的是VMware的虚拟机的zookeeper然后用本地Windows去连接zookeeper发送的消息这个过程会经过虚拟路由转发,所以这里授权的ip地址是VMware虚拟网卡的地址,不然的话会报没有权限的错误。
6、watch机制
5.1、watch的事件类型
watch 用来实现发布/订阅功能,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者。
每个 watch 仅有一次触发的机会,一旦触发会立即失效,想要持续监听,就需要一直注册。
go-zookeeper监听的事件类型分为五种:
- zk.EventNodeCreated: 节点创建事件,需要watch一个不存在的节点,当节点被创建时触发,此watch通过conn.ExistsW(path string)设置
- zk. EventNodeDeleted :节点删除事件,需要watch一个已存在的节点,当节点被移除时触发,此watch通过conn.ExistsW(path string)设置
- zk. EventNodeDataChanged: 节点数据变化事件,此watch通过conn.GetW(path string) 以及 conn.ExistsW(path string) 设置
- zk. EventNodeChildrenChanged : 子节点改变事件(数量改变),此watch通过conn.ChildrenW(path string)设置, 当path 下面增删子节点时触发(修改path下的子节点的内容时,不会触发通知)。
- zk.EventNoWatching: watch移除事件,服务端出于某些原因不再为客户端watch节点时触发。
5.2、监听的方式
方式一、全局监听
全局监听的方式会在有监听事件发生时会执行监听器的回调函数
package main
import (
"fmt"
"log"
"time"
"github.com/samuel/go-zookeeper/zk"
)
func main() {
callbackOption := zk.WithEventCallback(callback)
hosts := []string{"192.168.17.95:2181"}
conn, _, err := zk.Connect(hosts, time.Second*5, callbackOption)
if err != nil {
fmt.Println("Failed to Connected to zookeeper")
}
//注册一个监听事件
exists, state, _, err := conn.ExistsW("/global")
if err != nil {
log.Println(err)
}
//如果节点不存在则创建
if !exists {
//创建一个临时的global节点
_, err = conn.Create("/global", []byte("globaltest"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
log.Println(err)
}
//在注册一个监听事件监听/global节点的删除
_, state, _, err = conn.ExistsW("/global")
if err != nil {
log.Println(err)
}
}
err = conn.Delete("/global", state.Version)
if err != nil {
log.Println(err)
}
defer conn.Close()
}
// 监听事件的回调函数
func callback(event zk.Event) {
fmt.Println("###########################")
fmt.Println("path: ", event.Path)
fmt.Println("type: ", event.Type.String())
fmt.Println("state: ", event.State.String())
fmt.Println("---------------------------")
}
测试结果
方式二、局部监听
package main
import (
"fmt"
"log"
"time"
"github.com/samuel/go-zookeeper/zk"
)
// 输出错误信息的函数
func FailOnError(msg string, err error) {
if err != nil {
log.Printf("%v : %v\n", msg, err)
}
}
func main() {
hosts := []string{"192.168.17.95:2181"}
conn, _, err := zk.Connect(hosts, time.Second*5)
if err != nil {
fmt.Println("Failed to Connected to zookeeper")
}
//先等待连接上再启动监听携程
time.Sleep(5 * time.Second)
go watchNodeCreated("/partial", conn)
go watchNodeDataChanged("/partial", conn)
go watchNodeChildrenChanged("/partial", conn)
go watchNodeDeleted("/partial", conn)
defer conn.Close()
//等待操作结束
time.Sleep(1 * time.Hour)
}
// 监听节点的创建事件
func watchNodeCreated(path string, conn *zk.Conn) {
log.Printf("watching node[%v] Create\n", path)
for {
//利用channel通信机制将Event数据传递给ch
//ch:=make(chan Event)
_, _, ch, err := conn.ExistsW(path)
//当err为nil时ch才会有数据否者会阻塞协程
if err == nil {
e := <-ch
if e.Type == zk.EventNodeCreated {
log.Printf("Node[%v] Created\n", path)
}
} else {
FailOnError("Failed to watchNodeCreated", err)
}
}
}
// 监听节点数据修改事件
func watchNodeDataChanged(path string, conn *zk.Conn) {
log.Printf("watching node[%v] Data Change\n", path)
for {
_, _, ch, err := conn.GetW(path)
if err == nil {
e := <-ch
if e.Type == zk.EventNodeDataChanged {
log.Printf("Node[%v] Data Changed", path)
}
}
}
}
// 监听节点子节点的修改事件
func watchNodeChildrenChanged(path string, conn *zk.Conn) {
log.Printf("watching node[%v] Children Change", path)
for {
_, _, ch, err := conn.ChildrenW(path)
if err == nil {
e := <-ch
if e.Type == zk.EventNodeChildrenChanged {
log.Printf("Node[%v] Children Changed", path)
}
}
}
}
// 监听节点的删除事件
func watchNodeDeleted(path string, conn *zk.Conn) {
log.Printf("watching node[%v] Delete", path)
for {
_, _, ch, err := conn.ExistsW(path)
if err == nil {
e := <-ch
if e.Type == zk.EventNodeDeleted {
log.Printf("Node[%v] Deleted", path)
}
} else {
FailOnError("Failed to watchNodeDeleted", err)
}
}
}
启动程序之后在虚拟机上运行客户端程序执行以下命令
程序会输出以下结果
7、go-zookeeper实现分布式锁
zookeeper的分布式锁可以利用每个节点的唯一性来完成,但所有服务监听一个节点对于分布式系统来说完全是资源浪费。而zookeeper可以利用临时顺序节点来创建一个有序的临时节点列表来完成分布式锁:
- 客户端获取锁时,在lock节点下创建临时顺序节点。
- 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁之后,将该节点删除。
- 如果发现自己创建的子节点并非所有子节点中最小的,说明自己没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
- 如果发现比自己小的那个节点删除,则客户端的Watcher会收到相应的同支,此时再次判断自己创建的节点是否时lock子节点中序号最小的,如果是则获取到锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。
假如服务 A 创建了节点 a,此时节点 a 的前面没有节点,所以服务 A 可以执行。此时服务 B 创建了节点 b,节点 b 是节点 a 的下一个节点,那么服务 B 只需要监听节点 a 即可。
也就是说,因为临时有序节点列表是有序的,所以每个服务只需要监听自己创建的节点的前一个节点即可。
我们利用golang的goroutine来模拟客户端实现分布式锁的过程,以下是50个goroutine进行抢锁的示例:
package main
import (
"fmt"
"sync"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
hosts := []string{"192.168.17.95:2181"}
conn, _, err := zk.Connect(hosts, time.Second*5)
if err != nil {
fmt.Println("Failed to Connected to zookeeper")
}
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
//新建一个锁
lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll))
//加锁
err = lock.LockWithData([]byte("it is a lock"))
if err != nil {
panic(err)
}
fmt.Println("第", n, "个 goroutine 获取到了锁")
time.Sleep(time.Second) // 1 秒后释放锁
//解锁
lock.Unlock()
}(i)
}
//等待协程运行结束
wg.Wait()
}
运行结果如下(只截取了一部分)