etcd
Go 操作 Etcd 参考
go get go.etcd.io/etcd/client/v3
-
民间文档:http://www.topgoer.com/%E6%95%B0%E6%8D%AE%E5%BA%93%E6%93%8D%E4%BD%9C/go%E6%93%8D%E4%BD%9Cetcd/%E6%93%8D%E4%BD%9Cetcd.html
-
官方文档:https://github.com/etcd-io/etcd/blob/main/client/v3/README.md
单机安装
下载地址 https://github.com/etcd-io/etcd/releases,解压后查看版本
etcd --version
etcdctl version
创建配置文件:conf.yaml
name: "hezebin-etcd"
data-dir: /usr/local/etcd/data
listen-client-urls: "http://127.0.0.1:2379"
advertise-client-urls: "http://127.0.0.1:2379"
initial-cluster-token: "hezebin-etcd-token"
initial-cluster: "hezebin-etcd=http://127.0.0.1:2380"
initial-advertise-peer-urls: "http://127.0.0.1:2380"
initial-cluster-state: "new"
启动 etcd 服务:
etcd --config-file=/usr/local/etcd/conf.yaml
打开终端,执行以下命令来检查 etcd 服务的健康状态:
etcdctl endpoint health
如果一切正常,您将看到类似以下的输出:
127.0.0.1:2379 is healthy: successfully committed proposal: took = 11.667µs
如果 etcd 服务正在运行且健康,您会收到健康状态的确认。
您还可以通过执行以下命令来查看 etcd 集群的成员状态:
etcdctl member list
这将列出 etcd 集群中的成员信息,包括成员的 ID、名称和状态。
设置键值对
etcdctl put name "hezebin"
etcdctl get name
# 查询Etcd所有的key
etcdctl get --prefix ""
# 只读取键 name 的值的命令
etcdctl get name --print-value-only
# 以 name 为前缀的所有键的命令,结果数量限制为2:
etcdctl get --prefix --limit=2 name
# 访问修订版本为 4 时的键的版本
etcdctl get --prefix --rev=4 name
# 假设 etcd 集群已经有下列键:
a = 123
b = 456
z = 789
# 读取大于等于键 b 的 byte 值的键的命令:
etcdctl get --from-key b
# 清空数据
etcdctl del name
# 删除所有 n 前缀的节点
etcdctl del n -- prefix
# 删除键 zoo 并返回被删除的键值对的命令:
etcdctl del --prev-kv zoo
watch操作
watch 监测一个键值的变化,一旦键值发生更新,就会输出最新的值并退出
etcdctl watch name
#更新键值
etcdctl put name "korbin"
# watch 阻塞处返回结果
PUT
name
korbin
压缩修订版本
如我们提到的,etcd 保存修订版本以便应用可以读取键的过往版本。但是,为了避免积累无限数量的历史数据,压缩过往的修订版本就变得很重要。压缩之后,etcd 删除历史修订版本,释放资源来提供未来使用。所有修订版本在压缩修订版本之前的被替代的数据将不可访问。
这是压缩修订版本的命令:
$ etcdctl compact 5
compacted revision 5
# 在压缩修订版本之前的任何修订版本都不可访问
$ etcdctl get --rev=4 foo
Error: rpc error: code = 11 desc = etcdserver: mvcc: required revision has been compacted
注意: etcd 服务器的当前修订版本可以在任何键(存在或者不存在)以json格式使用get命令来找到。下面展示的例子中 mykey 是在 etcd 服务器中不存在的:
$ etcdctl get mykey -w=json
{"header":{"cluster_id":14841639068965178418,"member_id":10276657743932975437,"revision":15,"raft_term":4}}
lease租约(过期机制)
应用可以为 etcd 集群里面的键授予租约。当键被附加到租约时,它的存活时间被绑定到租约的存活时间,而租约的存活时间相应的被 time-to-live
(TTL)管理。在租约授予时每个租约的最小TTL值由应用指定。租约的实际 TTL 值是不低于最小 TTL,由 etcd 集群选择。一旦租约的 TTL 到期,租约就过期并且所有附带的键都将被删除。
授予租约
# 授予租约,TTL为10秒
$ etcdctl lease grant 10
lease 32695410dcc0ca06 granted with TTL(10s)
# 附加键 foo 到租约32695410dcc0ca06
$ etcdctl put --lease=32695410dcc0ca06 foo barOK
应用通过租约 id 可以撤销租约。撤销租约将删除所有它附带的 key。
撤销租约
$ etcdctl lease revoke 32695410dcc0ca06
lease 32695410dcc0ca06 revoked
$ etcdctl get foo
# 空应答,因为租约撤销导致foo被删除
keepAlive续约
应用可以通过刷新键的 TTL 来维持租约,以便租约不过期。维持同一个租约的命令:
$ etcdctl lease keep-alive 32695410dcc0ca06
lease 32695410dcc0ca06 keepalived with TTL(10)
lease 32695410dcc0ca06 keepalived with TTL(10)
lease 32695410dcc0ca06 keepalived with TTL(10)
...
获取租约信息
应用程序可能想知道租约信息,以便可以更新或检查租约是否仍然存在或已过期。应用程序也可能想知道有那些键附加到了特定租约。
获取租约信息的命令:
$ etcdctl lease timetolive 694d5765fc71500b
lease 694d5765fc71500b granted with TTL(500s), remaining(258s)
获取租约信息和租约附带的键的命令:
$ etcdctl lease timetolive --keys 694d5765fc71500b
lease 694d5765fc71500b granted with TTL(500s), remaining(132s), attached keys([zoo2 zoo1])
# 如果租约已经过期或者不存在,它将给出下面的应答:
Error: etcdserver: requested lease not found
事务
etcd 的事务(Transaction)机制允许你在一个原子操作中执行一系列操作,这些操作要么全部成功,要么全部失败,确保数据的一致性和完整性。
etcdctl txn 并没有对事务提供过多的支持,执行事务最好通过 go 来实现:
go get go.etcd.io/etcd/client/v3
package main
import (
"context"
"fmt"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
// 创建 etcd 客户端
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"http://localhost:2379"}, // 替换为你的 etcd 地址
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 创建一个 etcd 事务
etcdTxn := clientv3.NewKV(cli).Txn(context.Background())
// 定义事务的比较和操作
etcdTxn.If(
clientv3.Compare(clientv3.Value("my_key"), "=", "10"),
).
Then(
clientv3.OpPut("my_key", "20"),
).
Else(
clientv3.OpPut("my_key", "30"),
)
// 提交事务
txnResp, err := etcdTxn.Commit()
if err != nil {
log.Fatal(err)
}
// 检查事务是否成功
if !txnResp.Succeeded {
fmt.Println("Transaction failed")
} else {
fmt.Println("Transaction succeeded")
}
}
注意:etcdTxn.If
中指定的条件是 clientv3.Compare(clientv3.Value("my_key"), "=", "10")
,也就是检查键 “my_key” 的值是否等于 “10”。如果这个条件不满足(例如,键 “my_key” 的值为空),那么事务的 Else
分支会执行,而且整个事务会被标记为失败。
基于etcd实现分布式锁
原生实现
要在 etcd 中实现分布式锁,你可以利用 etcd 的事务特性和租约(Lease)机制。以下是一个使用 etcd 实现分布式锁的示例:
# 伪代码,etcdctl 并不支持下述命令,需要以 go 方式实现,api 更丰富
etcdctl txn -- \
put my_lock some_value --lease=0d7689bc575d6611 \
if_not_exists
# if_not_exists: 这是一个条件,表示只有在键不存在时才执行上述的 put 操作。这个条件确保只有第一次创建节点的时候才会成功,从而实现锁的获取。
这个命令的目的是在一个 etcd 事务中,尝试创建一个指定键名的键值对,如果该键名在 etcd 中尚不存在(即尝试获取锁),则创建成功,否则操作失败。这个操作模式可以帮助实现分布式锁的基本机制。
Go 代码实现:
package main
import (
"context"
"fmt"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
// 创建 etcd 客户端
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"http://localhost:2379"}, // 替换为你的 etcd 地址
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 创建租约
leaseResp, err := cli.Grant(context.Background(), 10) // 10 秒的租约时间
if err != nil {
log.Fatal(err)
}
// 锁的键名
lockKey := "my_lock"
// 尝试获取锁
txnResp, err := cli.Txn(context.Background()).
If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
Then(clientv3.OpPut(lockKey, "lock_holder", clientv3.WithLease(leaseResp.ID))).
Commit()
if err != nil {
log.Fatal(err)
}
// 检查是否成功获取锁
if !txnResp.Succeeded {
log.Println("Failed to acquire lock")
return
}
log.Println("Lock acquired")
// 续约循环(保持锁)
keepAliveCh, err := cli.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
log.Fatal(err)
}
go func() {
for range keepAliveCh {
// 续约成功,执行你的逻辑,或者也可以不做任何处理
}
}()
// 在这里执行需要保护的临界区代码
// 释放锁(取消续约)
_, err = cli.Revoke(context.Background(), leaseResp.ID)
if err != nil {
log.Fatal(err)
}
log.Println("Lock released")
}
Go 封装实现
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 创建两个单独的会话用来演示锁竞争
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")
s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")
// 会话s1获取锁
if err := m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")
m2Locked := make(chan struct{})
go func() {
defer close(m2Locked)
// 等待直到会话s1释放了/my-lock/的锁
if err := m2.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
}()
if err := m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")
<-m2Locked
fmt.Println("acquired lock for s2")
通过源码可以发现concurrency包的实现原理为执行一个 key 的前缀,并在最终设置到 etcd key 的尾部拼接上 /lease_id,租约为 60 秒,且每隔 20 秒续租一次。
在 tryAcquire 函数中通过 put 上述的键值,判断版本号,若设置成功则拿到锁,否则阻塞等待锁:
阻塞期间先通过查询一次 key 是否存在判断是否阻塞,若不存在表示拿到锁,结束阻塞;存在则通过 watch 监听 key 的变更,仅允许DELETE类型,其他变更操作会报错,并做强制解锁。
服务注册与发现
当使用 etcd 实现服务注册与发现时,通常需要以下步骤:
- 引入 etcd 的 Go 客户端库
- 连接到 etcd 服务器
- 注册服务:将服务的信息写入 etcd 中
- 发现服务:从 etcd 中获取已注册的服务信息
以下是一个简单示例,演示如何使用 Go 语言操作 etcd 实现基本的服务注册与发现功能。请确保已经安装了 etcd 并启动了 etcd 服务器。
package main
import (
"context"
"fmt"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
// 创建 etcd 客户端
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"}, // etcd 服务器地址
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 注册服务
serviceName := "my-service"
serviceIP := "192.168.1.100"
servicePort := "8080"
serviceKey := fmt.Sprintf("/services/%s/%s:%s", serviceName, serviceIP, servicePort)
serviceValue := "some-metadata-about-the-service"
ctx := context.Background()
_, err = cli.Put(ctx, serviceKey, serviceValue)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Service registered: %s\n", serviceKey)
// 发现服务
discoveryKey := fmt.Sprintf("/services/%s", serviceName)
resp, err := cli.Get(ctx, discoveryKey, clientv3.WithPrefix())
if err != nil {
log.Fatal(err)
}
fmt.Println("Discovered services:")
for _, kv := range resp.Kvs {
fmt.Printf("Key: %s, Value: %s\n", kv.Key, kv.Value)
}
}
可以结合 watch 机制优化更新服务变更。