etcd简介
浅谈etcd服务注册与发现
etcd官网
etcd中文文档
apt安装etcd,启动命令十分简单etcd
。
etcd分为v2版本和v3版本,命令有所不一样,使用命令etcdctl h
查看
如上图所示并没有出现API的版本,此时是使用默认的v2版本,但是v2版本很多命令使用不了,因此切换为v3版本,命令如下:
# 设置命令为v3
export ETCDCTL_API=3
# 查看所有的key,会出现两行,第一行key,第二行value
etcdctl get --prefix ""
etcd是一个k-v存储的格式和redis类似,使用etcdctl set k v
存储数据,使用etcdctl get k
获取数据。
go中使用etcd
安装
go get go.etcd.io/etcd/clientv3
客户端代码
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
// etcd client put/get demo
// use etcd/clientv3
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
//put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "zhangsan", "yes")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}
// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "zhangsan")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
}
启动etcd服务器
etcd --listen-client-urls 'http://0.0.0.0:2379' --advertise-client-urls 'http://0.0.0.0:2379'
不要直接
etcd
启动,会报错context deadline exceeded
运行客户端代码
可以看出etcd是键值对存储的。
go-zero使用etcd
etcd最具特色的功能是订阅与发布和监测变更,在etcd作为服务注册与查找时,etcd提供了检测功能,开启该功能后会持续想etcd服务器发送心跳,如果服务停掉或者心跳停止,etcd服务器就会删除该次记录(服务器自主完成)。
一般来说,rpc服务连接etcd服务器发送心跳,实现服务到服务调度中心的注册,服务名称和地址被存储到etcd服务器,另一个服务调用时不是直接连接rpc服务器,而是先根据服务名称在etcd中找到ip地址,在通过ip地址调用服务。这样以来服务名称和其对于的地址就与源代码解耦,在部署到不同ip的服务器时无需修改源代码。
- 开启etcd服务器
etcd --listen-client-urls 'http://0.0.0.0:2379' --advertise-client-urls 'http://0.0.0.0:2379'
- key存储并开启心跳检测
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
}
client, err := clientv3.New(config)
if err != nil {
fmt.Println(err)
return
}
defer client.Close()
lease := clientv3.NewLease(client) // 创建一个心跳检测
leaseResp, err := lease.Grant(context.Background(), 10) // 配置检测超时 10秒,查过检测时间etcd服务器自动删除k-v键值对
if err != nil {
fmt.Println(err)
return
}
leaseRespChan, _ := lease.KeepAlive(context.Background(), leaseResp.ID) // 通过keeplive方法定期发送心跳
kv := clientv3.NewKV(client)
key := fmt.Sprintf("/services/http/%d", leaseResp.ID)
value := `{"host": "localhost", "port": 2379}`
fmt.Println("key", key)
// 绑定带有心跳的服务并将自身服务信息put到etcd中心
_, err = kv.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
if err != nil {
fmt.Println(err)
return
}
fmt.Println("register service success")
for {
select {
case i := <-leaseRespChan:
fmt.Println("heart beat", i.String()) // 心跳信息
}
}
}
如上图所示,服务etcd心跳会一直发送到etcd服务器,那么可以在rpc服务下启动一个心跳,一直检测服务的状态,rpc服务如果宕机心跳停止,etcd服务器就会删除该服务的k-v记录。
etcd服务器的心跳检测时etcd服务器自动完成的,开发者只需要配置心跳时间,调用心跳方法就可以了,服务器会根据心跳自动管理数据。
在go-zero中已经集成了etcd,通过配置文件的形式,在goctl下都会有,如下配置:
Etcd:
Hosts:
- 127.0.0.1:2379
Key: rpcservice.rpc
该部分配置etcd服务器的接口和key。配置etcd服务器后需要设置心跳,如下步骤:
- 下载etcd客户端
go get go.etcd.io/etcd/clientv3
- 配置客户端调用心跳函数
//etcd服务端启动
etcdConfig := clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
}
//etcd客户端
etcdClient, err := clientv3.New(etcdConfig)
if err != nil {
println(err)
return
}
defer etcdClient.Close()
//配置心跳
lease := clientv3.NewLease(etcdClient)
lgr, err := lease.Grant(context.Background(), 10) //10秒发送一次心跳
if err != nil {
println(err)
return
}
c2, _ := lease.KeepAlive(context.Background(), lgr.ID)
//通过客户端获取k-v存储对象
k := clientv3.NewKV(etcdClient)
key := "order.rpc" //可以同配置文件中获取
value := "localhost:2379" //不同服务器的ip不一样,这里设置为localhost或者通过方法获取本机ip也可以
k.Put(context.Background(), key, value, clientv3.WithLease(lgr.ID))
//打印心跳检测返回的数据
for {
if len(c2) != 0 {
fmt.Println("heart beat ", <-c2)
}
}
报错如下:
# github.com/coreos/etcd/clientv3/balancer/picker
C:\Users\sspaas\go\pkg\mod\github.com\coreos\etcd@v3.3.27+incompatible\clientv3\balancer\picker\err.go:37:53: undefined: balancer.PickOptions
C:\Users\sspaas\go\pkg\mod\github.com\coreos\etcd@v3.3.27+incompatible\clientv3\balancer\picker\roundrobin_balanced.go:55:63: undefined: balancer.PickOptions
# github.com/coreos/etcd/clientv3/balancer/resolver/endpoint
C:\Users\sspaas\go\pkg\mod\github.com\coreos\etcd@v3.3.27+incompatible\clientv3\balancer\resolver\endpoint\endpoint.go:114:87: undefined: resolver.BuildOption
C:\Users\sspaas\go\pkg\mod\github.com\coreos\etcd@v3.3.27+incompatible\clientv3\balancer\resolver\endpoint\endpoint.go:182:40: undefined: resolver.ResolveNowOption
原因时grpc和etcd版本冲突问题参考
又报错:
原因主要是etcd中使用的bbolt和grpc版本冲突引起参考
解决冲突后,在服务气短通过检测心跳的方式将rpc的名称为ip的k-v存储到rpc服务器中,然后通过其他客户端通过服务名称获取值后进行rpc远程调用即可。
客户端程序
如果遇到zero集成的etcd和grpc有冲突的话,使用独立的etcd,不要在使用zero集成的etcd。代码如下:
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
// etcd client put/get demo
// use etcd/clientv3
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
//put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "zhangsan", "yes")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}
// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "demo.rpc")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
}
服务端程序
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
config := clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
}
client, err := clientv3.New(config)
if err != nil {
fmt.Println(err)
return
}
defer client.Close()
lease := clientv3.NewLease(client) // 创建一个租约
leaseResp, err := lease.Grant(context.Background(), 10) // 设置租约超时 10秒
if err != nil {
fmt.Println(err)
return
}
leaseRespChan, _ := lease.KeepAlive(context.Background(), leaseResp.ID) // 通过keeplive定期发送心跳
kv := clientv3.NewKV(client)
key := "demo.rpc"
value := "127.0.0.1:9000"
fmt.Println("key", key)
// 绑定带有心跳的租约并将自身服务信息put到etcd中心
_, err = kv.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
if err != nil {
fmt.Println(err)
return
}
fmt.Println("register service success")
for {
select {
case i := <-leaseRespChan:
fmt.Println("heart beat", i.String()) // 心跳信息
}
}
}
别忘了启动etcd服务器
etcd --listen-client-urls 'http://0.0.0.0:2379' --advertise-client-urls 'http://0.0.0.0:2379'
服务端心跳
获取键值对
go语言学习网站