etcd的使用

news2024/7/6 20:04:43

什么是etcd

ETCD是一个分布式、可靠的key-value存储的分布式系统,用于存储分布式系统中的关键数据;当然,它不仅仅用于存储,还提供配置共享及服务发现;基于Go语言实现 。

etcd的特点

  • 完全复制:集群中的每个节点都可以使用完整的存档
  • 高可用性:Etcd可用于避免硬件的单点故障或网络问题
  • 一致性:每次读取都会返回跨多主机的最新写入
  • 简单:包括一个定义良好、面向用户的API(gRPC)
  • 安全:实现了带有可选的客户端证书身份验证的自动化TLS
  • 可靠:使用Raft算法实现了强一致、高可用的服务存储目录

etcd的应用场景

服务注册与发现

服务发现还能注册

服务注册发现解决的是分布式系统中最常见的问题之一,即在同一个分布式系统中,找到我们需要的目标服务,建立连接,然后完成整个链路的调度。

本质上来说,服务发现就是想要了解集群中是否有进程在监听 udp 或 tcp 端口,并且通过名字就可以查找和连接。要解决服务发现的问题,需要有下面三大支柱,缺一不可。

1、一个强一致性、高可用的服务存储目录。基于Raft算法的etcd天生就是这样一个强一致性高可用的服务存储目录。

2、一种注册服务和监控服务健康状态的机制。用户可以在etcd中注册服务,并且对注册的服务设置key TTL,定时保持服务的心跳以达到监控健康状态的效果。

3、一种查找和连接服务的机制。通过在 etcd 指定的主题下注册的服务也能在对应的主题下查找到。为了确保连接,我们可以在每个服务机器上都部署一个Proxy模式的etcd,这样就可以确保能访问etcd集群的服务都能互相连接。

image

一个用户的api请求,可能调用多个微服务资源,这些服务我们可以使用etcd进行服务注册和服务发现,当每个服务启动的时候就注册到etcd中,当我们需要使用的时候,直接在etcd中寻找,调用即可。

当然,每个服务的实例不止一个,比如我们的用户服务,我们可能启动了多个实例,这些实例在服务启动过程中全部注册到了etcd中,但是某个实例可能出现故障重启,这时候就etcd在进行转发的时候,就会屏蔽到故障的实例节点,只向正常运行的实例,进行请求转发。

image

来看个服务注册发现的demo

这里放一段比较核心的代码,这里摘录了我们线上正在使用的etcd实现grpc服务注册和发现的实现,具体的实现可参考,etcd实现grpc的服务注册和服务发现

对于etcd中的连接,我们每个都维护一个租约,通过KeepAlive自动续保。如果租约过期则所有附加在租约上的key将过期并被删除,即所对应的服务被拿掉。

package discovery

import (
	"context"
	"encoding/json"
	"errors"
	"net/http"
	"strconv"
	"strings"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
)

// Register for grpc server
type Register struct {
	EtcdAddrs   []string
	DialTimeout int

	closeCh     chan struct{}
	leasesID    clientv3.LeaseID
	keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse

	srvInfo Server
	srvTTL  int64
	cli     *clientv3.Client
	logger  *zap.Logger
}

// NewRegister create a register base on etcd
func NewRegister(etcdAddrs []string, logger *zap.Logger) *Register {
	return &Register{
		EtcdAddrs:   etcdAddrs,
		DialTimeout: 3,
		logger:      logger,
	}
}

// Register a service
func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) {
	var err error

	if strings.Split(srvInfo.Addr, ":")[0] == "" {
		return nil, errors.New("invalid ip")
	}

	if r.cli, err = clientv3.New(clientv3.Config{
		Endpoints:   r.EtcdAddrs,
		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
	}); err != nil {
		return nil, err
	}

	r.srvInfo = srvInfo
	r.srvTTL = ttl

	if err = r.register(); err != nil {
		return nil, err
	}

	r.closeCh = make(chan struct{})

	go r.keepAlive()

	return r.closeCh, nil
}

// Stop stop register
func (r *Register) Stop() {
	r.closeCh <- struct{}{}
}

// register 注册节点
func (r *Register) register() error {
	leaseCtx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second)
	defer cancel()

	leaseResp, err := r.cli.Grant(leaseCtx, r.srvTTL)
	if err != nil {
		return err
	}
	r.leasesID = leaseResp.ID
	if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), leaseResp.ID); err != nil {
		return err
	}

	data, err := json.Marshal(r.srvInfo)
	if err != nil {
		return err
	}
	_, err = r.cli.Put(context.Background(), BuildRegPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))
	return err
}

// unregister 删除节点
func (r *Register) unregister() error {
	_, err := r.cli.Delete(context.Background(), BuildRegPath(r.srvInfo))
	return err
}

// keepAlive
func (r *Register) keepAlive() {
	ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second)
	for {
		select {
		case <-r.closeCh:
			if err := r.unregister(); err != nil {
				r.logger.Error("unregister failed", zap.Error(err))
			}
			if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil {
				r.logger.Error("revoke failed", zap.Error(err))
			}
			return
		case res := <-r.keepAliveCh:
			if res == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed", zap.Error(err))
				}
			}
		case <-ticker.C:
			if r.keepAliveCh == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed", zap.Error(err))
				}
			}
		}
	}
}

消息发布和订阅

在分布式系统中,最适用的一种组件间通信方式就是消息发布与订阅。即构建一个配置共享中心,数据提供者在这个配置中心发布消息,而消息使用者则订阅他们关心的主题,一旦主题有消息发布,就会实时通知订阅者。通过这种方式可以做到分布式系统配置的集中式管理与动态更新

  • 应用中用到的一些配置信息放到 etcd 上进行集中管理。这类场景的使用方式通常是这样:应用在启动的时候主动从etcd获取一次配置信息,同时,在etcd节点上注册一个Watcher并等待,以后每次配置有更新的时候,etcd都会实时通知订阅者,以此达到获取最新配置信息的目的。
  • 分布式搜索服务中,索引的元信息和服务器集群机器的节点状态存放在etcd中,供各个客户端订阅使用。使用etcd的key TTL功能可以确保机器状态是实时更新的。
  • 分布式日志收集系统。这个系统的核心工作是收集分布在不同机器的日志。收集器通常是按照应用(或主题)来分配收集任务单元,因此可以在 etcd 上创建一个以应用(主题)命名的目录 P,并将这个应用(主题相关)的所有机器 ip,以子目录的形式存储到目录 P 上,然后设置一个etcd递归的Watcher,递归式的监控应用(主题)目录下所有信息的变动。这样就实现了机器 IP(消息)变动的时候,能够实时通知到收集器调整任务分配。
  • 系统中信息需要动态自动获取与人工干预修改信息请求内容的情况。通常是暴露出接口,例如 JMX 接口,来获取一些运行时的信息。引入 etcd 之后,就不用自己实现一套方案了,只要将这些信息存放到指定的 etcd 目录中即可,etcd 的这些目录就可以通过 HTTP 的接口在外部访问。

image

消息发布被订阅的实际应用

我们一个性能要求比较高的项目,所需要的配置信息,存放到本地的localCache中,通过etcd的消息发布和订阅实现,实现配置信息在不同节点同步更新。

来看下如何实现

func init() {
	handleMap = make(map[string]func([]byte) error)
}

var handleMap map[string]func([]byte) error

func RegisterUpdateHandle(key string, f func([]byte) error) {
	handleMap[key] = f
}

type PubClient interface {
	Pub(ctx context.Context, key string, val string) error
}

var Pub PubClient

type PubClientImpl struct {
	client *clientv3.Client
	logger *zap.Logger
	prefix string
}

// 监听变化,实时更新到本地的map中
func (c *PubClientImpl) Watcher() {
	ctx, cancel := context.WithCancel(context.Background())
	rch := c.client.Watch(ctx, c.prefix, clientv3.WithPrefix())
	defer cancel()

	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT:
				c.logger.Warn("Cache Update", zap.Any("value", ev.Kv))
				err := handleCacheUpdate(ev.Kv)
				if err != nil {
					c.logger.Error("Cache Update", zap.Error(err))
				}
			case mvccpb.DELETE:
				c.logger.Error("Cache Delete NOT SUPPORT")
			}
		}
	}
}

func handleCacheUpdate(val *mvccpb.KeyValue) error {
	if val == nil {
		return nil
	}
	f := handleMap[string(val.Key)]
	if f != nil {
		return f(val.Value)
	}
	return nil
}

func (c *PubClientImpl) Pub(ctx context.Context, key string, val string) error {
	ctx, _ = context.WithTimeout(ctx, time.Second*10)
	_, err := c.client.Put(ctx, key, val)
	if err != nil {
		return err
	}
	return nil
}

负载均衡

关于负载均衡,通常意义上有两种

  • 软负载,顾名思义就是靠软件手段来实现的负载均衡。软负载也通常被称为 4层或 7 层负载!
  • 硬负载,就是靠硬件实现的负载均衡,数据包转发功能。常见的就是 F5。

通过etcd实现的负载均衡就是软负载,在分布式系统中,高并发的场景下,我们通常会构建服务的集群,当某一个机器宕机了,别的机器可以马上顶替上来。

etcd中实现负载均衡,例如我们上文的例子服务注册和发现,对于一个用户服务来讲,后面的用户服务的实例可能是多个,每个都有自己的ip和port,这些服务会在项目启动的时候全部注册到etcd中,所以当使用的时候,每次etcd会轮询出一个健康的服务实例,来处理用户的请求。

image

分布式通知与协调

这里说到的分布式通知与协调,与消息发布和订阅有些相似。都用到了etcd中的Watcher机制,通过注册与异步通知机制,实现分布式环境下不同系统之间的通知与协调,从而对数据变更做到实时处理。实现方式通常是这样:不同系统都在etcd上对同一个目录进行注册,同时设置Watcher观测该目录的变化(如果对子目录的变化也有需要,可以设置递归模式),当某个系统更新了etcd的目录,那么设置了Watcher的系统就会收到通知,并作出相应处理。

  • 通过etcd进行低耦合的心跳检测。检测系统和被检测系统通过 etcd 上某个目录关联而非直接关联起来,这样可以大大减少系统的耦合性。
  • 通过etcd完成系统调度。某系统有控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台作的一些操作,实际上是修改了etcd上某些目录节点的状态,而etcd就把这些变化通知给注册了Watcher的推送系统客户端,推送系统再作出相应的推送任务。
  • 通过etcd完成工作汇报。大部分类似的任务分发系统,子任务启动后,到etcd来注册一个临时工作目录,并且定时将自己的进度进行汇报(将进度写入到这个临时目录),这样任务管理者就能够实时知道任务进度。

image

分布式锁

因为etcd使用Raft算法保持了数据的强一致性,某次操作存储到集群中的值必然是全局一致的,所以很容易实现分布式锁。锁服务有两种使用方式,一是保持独占,二是控制时序。

首先,来看一下分布式锁应该具备哪些条件。

  • 互斥性:在任意时刻,对于同一个锁,只有一个客户端能持有,从而保证一个共享资源同一时间只能被一个客户端操作;
  • 安全性:即不会形成死锁,当一个客户端在持有锁的期间崩溃而没有主动解锁的情况下,其持有的锁也能够被正确释放,并保证后续其它客户端能加锁;
  • 可用性:当提供锁服务的节点发生宕机等不可恢复性故障时,“热备” 节点能够接替故障的节点继续提供服务,并保证自身持有的数据与故障节点一致。
  • 对称性:对于任意一个锁,其加锁和解锁必须是同一个客户端,即客户端 A 不能把客户端 B 加的锁给解了。

etcd的 Watch 机制、Lease 机制、Revision 机制和 Prefix 机制,这些机制赋予了 Etcd 实现分布式锁的能力。

  • Lease 机制
    即租约机制(TTL,Time To Live),Etcd 可以为存储的 Key-Value 对设置租约,当租约到期,Key-Value 将失效删除;同时也支持续约,通过客户端可以在租约到期之前续约,以避免 Key-Value 对过期失效。Lease 机制可以保证分布式锁的安全性,为锁对应的 Key 配置租约,即使锁的持有者因故障而不能主动释放锁,锁也会因租约到期而自动释放。

  • Revision 机制
    每个 Key 带有一个 Revision 号,每进行一次事务便加一,因此它是全局唯一的,如初始值为 0,进行一次 put(key, value),Key 的 Revision 变为 1,同样的操作,再进行一次,Revision 变为 2;换成 key1 进行put(key1, value)操作,Revision将变为 3;这种机制有一个作用:通过 Revision 的大小就可以知道写操作的顺序。在实现分布式锁时,多个客户端同时抢锁,根据 Revision 号大小依次获得锁,可以避免 “羊群效应” (也称“惊群效应”),实现公平锁。

  • Prefix 机制
    即前缀机制,也称目录机制,例如,一个名为 /mylock 的锁,两个争抢它的客户端进行写操作,实际写入的Key分别为:key1=“/mylock/UUID1”,key2=“/mylock/UUID2”,其中,UUID表示全局唯一的ID,确保两个Key的唯一性。很显然,写操作都会成功,但返回的Revision不一样,那么,如何判断谁获得了锁呢?通过前缀“/mylock”查询,返回包含两个Key-Value对的Key-Value列表,同时也包含它们的Revision,通过Revision大小,客户端可以判断自己是否获得锁,如果抢锁失败,则等待锁释放(对应的 Key 被删除或者租约过期),然后再判断自己是否可以获得锁。

  • Watch 机制
    即监听机制,Watch机制支持监听某个固定的Key,也支持监听一个范围(前缀机制),当被监听的Key或范围发生变化,客户端将收到通知;在实现分布式锁时,如果抢锁失败,可通过Prefix机制返回的Key-Value列表获得Revision比自己小且相差最小的 Key(称为 Pre-Key),对Pre-Key进行监听,因为只有它释放锁,自己才能获得锁,如果监听到Pre-Key的DELETE事件,则说明Pre-Key已经释放,自己已经持有锁。

来看下etcd中锁是如何实现的

// Mutex implements the sync Locker interface with etcd
type Mutex struct {
	s *Session

	pfx   string // 前缀
	myKey string // key
	myRev int64 // 自增的Revision
	hdr   *pb.ResponseHeader
}

// Lock 使用可取消的context锁定互斥锁。如果context被取消
// 在尝试获取锁时,互斥锁会尝试清除其过时的锁条目。
func (m *Mutex) Lock(ctx context.Context) error {
	resp, err := m.tryAcquire(ctx)
	if err != nil {
		return err
	}
	// if no key on prefix / the minimum rev is key, already hold the lock
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}
	client := m.s.Client()

	// waitDeletes 有效地等待,直到所有键匹配前缀且不大于
	// 创建的version。
	_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
	// release lock key if wait failed
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}

	// make sure the session is not expired, and the owner key still exists.
	gresp, werr := client.Get(ctx, m.myKey)
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}

	if len(gresp.Kvs) == 0 { // is the session key lost?
		return ErrSessionExpired
	}
	m.hdr = gresp.Header

	return nil
}

func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
	s := m.s
	client := m.s.Client()
	// s.Lease()租约
	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
	// 比较Revision, 这里构建了一个比较表达式
	// 具体的比较逻辑在下面的client.Txn用到
	// 如果等于0,写入当前的key,并设置租约,
	// 否则获取这个key,重用租约中的锁(这里主要目的是在于重入)
	// 通过第二次获取锁,判断锁是否存在来支持重入
	// 所以只要租约一致,那么是可以重入的.
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	// 通过 myKey 将自己锁在waiters;最早的waiters将获得锁
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	// 获取已经拿到锁的key的信息
	get := v3.OpGet(m.myKey)
	// 仅使用一个 RPC 获取当前持有者以完成无竞争路径
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	// 这里是比较的逻辑,如果等于0,写入当前的key,否则则读取这个key
	// 大佬的代码写的就是奇妙
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return nil, err
	}

	// 根据比较操作的结果写入Revision到m.myRev中
	m.myRev = resp.Header.Revision
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	return resp, nil
}

// 抽象出了一个session对象来持续保持租约不过期
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
	...
	ctx, cancel := context.WithCancel(ops.ctx)
	// 保证锁,在线程的活动期间,实现锁的的续租
	keepAlive, err := client.KeepAlive(ctx, id)
	if err != nil || keepAlive == nil {
		cancel()
		return nil, err
	}

	...
	return s, nil
}

设计思路:

1、多个请求来前抢占锁,通过Revision来判断锁的先后顺序;

2、如果有比当前key的Revision小的Revision存在,说明有key已经获得了锁;

3、等待直到前面的key被删除,然后自己就获得了锁。

通过etcd实现的锁,直接包含了锁的续租,如果使用Redis还要自己去实现,相比较使用更简单。

image

来实现一个etcd的锁

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// m1来抢锁
	go func() {
		s1, err := concurrency.NewSession(cli)
		if err != nil {
			log.Fatal(err)
		}
		defer s1.Close()
		m1 := concurrency.NewMutex(s1, "/my-lock/")

		// acquire lock for s1
		if err := m1.Lock(context.TODO()); err != nil {
			log.Fatal(err)
		}
		fmt.Println("m1---获得了锁")

		time.Sleep(time.Second * 3)

		// 释放锁
		if err := m1.Unlock(context.TODO()); err != nil {
			log.Fatal(err)
		}
		fmt.Println("m1++释放了锁")
	}()

	// m2来抢锁
	go func() {
		s2, err := concurrency.NewSession(cli)
		if err != nil {
			log.Fatal(err)
		}
		defer s2.Close()
		m2 := concurrency.NewMutex(s2, "/my-lock/")
		if err := m2.Lock(context.TODO()); err != nil {
			log.Fatal(err)
		}
		fmt.Println("m2---获得了锁")

		// mock业务执行的时间
		time.Sleep(time.Second * 3)

		// 释放锁
		if err := m2.Unlock(context.TODO()); err != nil {
			log.Fatal(err)
		}

		fmt.Println("m2++释放了锁")
	}()

	time.Sleep(time.Second * 10)
}

打印下输出

m2---获得了锁
m2++释放了锁
m1---获得了锁
m1++释放了锁

分布式队列

即创建一个先进先出的队列保持顺序。另一种比较有意思的实现是在保证队列达到某个条件时再统一按顺序执行。这种方法的实现可以在/queue这个目录中另外建立一个/queue/condition节点。

  • condition 可以表示队列大小。比如一个大的任务需要很多小任务就绪的情况下才能执行,每次有一个小任务就绪,就给这个 condition 数字加 1,直到达到大任务规定的数字,再开始执行队列里的一系列小任务,最终执行大任务。
  • condition 可以表示某个任务在不在队列。这个任务可以是所有排序任务的首个执行程序,也可以是拓扑结构中没有依赖的点。通常,必须执行这些任务后才能执行队列中的其他任务。
  • condition 还可以表示其它的一类开始执行任务的通知。可以由控制程序指定,当 condition 出现变化时,开始执行队列任务。

来看下实现

入队

func newUniqueKV(kv v3.KV, prefix string, val string) (*RemoteKV, error) {
	for {
		newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
		// 创建对应的key
		rev, err := putNewKV(kv, newKey, val, v3.NoLease)
		if err == nil {
			return &RemoteKV{kv, newKey, rev, val}, nil
		}
		// 如果之前已经创建了,就返回
		if err != ErrKeyExists {
			return nil, err
		}
	}
}

// 只有在没有创建的时候才能创建成功
func putNewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (int64, error) {
	cmp := v3.Compare(v3.Version(key), "=", 0)
	req := v3.OpPut(key, val, v3.WithLease(leaseID))
	// 这里还用到了这种比较的逻辑
	txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
	if err != nil {
		return 0, err
	}
	// 已经存在则返回错误
	if !txnresp.Succeeded {
		return 0, ErrKeyExists
	}
	return txnresp.Header.Revision, nil
}

出队

// Dequeue处理的是一个先进新出的队列
// 如果队列为空,Dequeue将会阻塞直到里面有值塞入
func (q *Queue) Dequeue() (string, error) {
	resp, err := q.client.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...)
	if err != nil {
		return "", err
	}

	kv, err := claimFirstKey(q.client, resp.Kvs)
	if err != nil {
		return "", err
	} else if kv != nil {
		return string(kv.Value), nil
		// more 表示在请求的范围内是否还有更多的键要返回。
		// 则进行Dequeue重试
	} else if resp.More {
		// missed some items, retry to read in more
		return q.Dequeue()
	}

	// nothing yet; wait on elements
	ev, err := WaitPrefixEvents(
		q.client,
		q.keyPrefix,
		resp.Header.Revision,
		[]mvccpb.Event_EventType{mvccpb.PUT})
	if err != nil {
		return "", err
	}

	ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
	if err != nil {
		return "", err
	} else if !ok {
		// 如果删除失败,重试
		return q.Dequeue()
	}
	return string(ev.Kv.Value), err
}

总结

1、这里的入队是一个先进新出的队列;

2、出队的实现也很简单,如果队列为空,Dequeue将会阻塞直到里面有值塞入;

来个demo

package main

import (
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	recipe "go.etcd.io/etcd/client/v3/experimental/recipes"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"localhost:2379"},
	})
	if err != nil {
		log.Fatalf("error New (%v)", err)
	}

	go func() {
		q := recipe.NewQueue(cli, "testq")
		for i := 0; i < 5; i++ {
			if err := q.Enqueue(fmt.Sprintf("%d", i)); err != nil {
				log.Fatalf("error enqueuing (%v)", err)
			}
		}
	}()

	go func() {
		q := recipe.NewQueue(cli, "testq")
		for i := 10; i < 100; i++ {
			if err := q.Enqueue(fmt.Sprintf("%d", i)); err != nil {
				log.Fatalf("error enqueuing (%v)", err)
			}
		}
	}()

	q := recipe.NewQueue(cli, "testq")
	for i := 0; i < 100; i++ {
		s, err := q.Dequeue()
		if err != nil {
			log.Fatalf("error dequeueing (%v)", err)
		}
		fmt.Println(s)
	}

	time.Sleep(time.Second * 3)
}

集群监控与Leader竞选

通过etcd来进行监控实现起来非常简单并且实时性强

1、前面几个场景已经提到Watcher机制,当某个节点消失或有变动时,Watcher会第一时间发现并告知用户。

2、节点可以设置TTL key,比如每隔 30s 发送一次心跳使代表该机器存活的节点继续存在,否则节点消失。

这样就可以第一时间检测到各节点的健康状态,以完成集群的监控要求

另外,使用分布式锁,可以完成Leader竞选。这种场景通常是一些长时间CPU计算或者使用IO操作的机器,只需要竞选出的Leader计算或处理一次,就可以把结果复制给其他的Follower。从而避免重复劳动,节省计算资源。

这个的经典场景是搜索系统中建立全量索引。如果每个机器都进行一遍索引的建立,不但耗时而且建立索引的一致性不能保证。通过在etcd的CAS机制同时创建一个节点,创建成功的机器作为Leader,进行索引计算,然后把计算结果分发到其它节点。

image

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/743335.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Boot 系列1 -- 概念、创建和使用

目录 1. 什么是Spring Boot? 2. Spring Boot 的优点 3. Spring Boot 项目的创建 3.1 使用IDEA创建 3.2 网页版创建 4. 项目目录和项目运行 4.1 项目目录 4.2 运行项目 4.3 使用Spring Boot项目实现网页输出Hello World 5. 路径问题 1. 什么是Spring Boot? Spring …

【vue+vant使用请求loading】【vant如何关闭Toast】

vuevant使用请求loading 文档&#xff1a;https://vant-contrib.gitee.io/vant/v2/#/zh-CN/toast 需求&#xff1a;目前需求是在请求中使用toast-loading&#xff0c;请求完成后关闭这个toast&#xff1b; 问题&#xff1a;vant如何关闭toast呢&#xff1f; 解决&#xff1a…

【UE4 C++】05-添加组件

在“SCharacter.h”中添加如下代码&#xff0c;从而为“SCharacter”添加弹簧臂和摄像机组件。 在“SCharacter.cpp”中添加如下代码 重新生成解决方案 打开虚幻编辑器&#xff0c;此时在视口中可以看到新添加的摄像机组件&#xff0c;摄像机处于世界坐标原点&#xff0c;并不会…

VisProg解析:根据自然语言指令解决复杂视觉任务

VisProg&#xff1a;根据自然语言指令解决复杂视觉任务 1. 介绍 VisProg 是一种神经符号系统&#xff0c;可以根据自然语言指令解决复杂的组合视觉任务。VisProg 使用 GPT3 的上下文学习能力来生成 Python 程序&#xff0c;然后执行这些程序以获得解决方案和全面且可解释的基…

前端学习——css盒子模型、css3新特性、伪类、布局0711TODO

样式还是得具体使用才能理解&#xff0c;不然会忘记也理解不透彻&#xff1b;还有定位&#xff0c;元素溢出&#xff0c;浮动&#xff0c;布局水平&垂直对齐&#xff1a; css3新特性 1过渡 2 动画 3 2D、3D转换 伪类 三种定位方式 弹性布局/栅格布局

VS+QT+OpenCV+C++多线程多摄像头视频监控采集窗体

程序示例精选 VSQTOpenCV多线程多摄像头视频监控采集窗体 如需安装运行环境或远程调试&#xff0c;见文章底部个人QQ名片&#xff0c;由专业技术人员远程协助&#xff01; 前言 这篇博客针对<<VSQTOpenCV多线程多摄像头视频监控采集窗体>>编写代码&#xff0c;代码…

【Matlab】智能优化算法_算数优化算法AOA

【Matlab】智能优化算法_算数优化算法AOA 1.背景介绍2.数学模型2.1 初始化阶段2.2 勘探阶段2.3 开采阶段 3.文件结构4.伪代码5.详细代码及注释5.1 AOA.m5.2 func_plot.m5.3 Get_F.m5.4 initialization.m5.5 main.m 6.运行结果7.参考文献 1.背景介绍 算术是数论的基本组成部分&a…

MySQL的表操作DML,DDL

建表 mysql> create table work(-> dept_id int(11) not null comment 部门号,-> staff_id int(11) not null comment 职工号,-> work_time date not null comment 工作时间,-> wage float(8.2) not null comment 工资,-> poli_face varchar(20) not null …

软考A计划-系统集成项目管理工程师-项目成本管理-中

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff…

如何在本地组策略编辑器中启用或禁用剪贴板历史记录

复制粘贴是我们大家都会做的事情,可能一天要做多次。但是,如果你需要一次又一次地复制同样的几件事,你该怎么办?如何在设备上复制内容? 从Windows 10版本17666开始,微软正在解决这一问题,并将剪贴板提升到一个新的水平,只需按下Win+V,你将获得全新的剪贴板体验。 你…

session共享问题和其他常见问题及解决方案

目录 1.shirospringboot中session的共享问题 1.1 如何解决session共享问题 2. 解决前端不支持cookie的效果 2.1.如何把sessionId放入请求头 2.2.重写DefaultWebSessionManager的方法 3.设置前端前置路由守卫 4.如何防止恶意重复登录 5.退出 6.获取当前登录用户的信息 …

jQuery根据数据动态创建表格:列固定,行超出滚动条,绑定td点击事件等

示例如图&#xff0c;代码如下: html: <div class"layui-row" id"avTableulL"><ul></ul></div><div id"avTableulR"><div id"avTableulT"><ul></ul></div><div id"avT…

【华为认证】HCIP-Datacom 2023最新题库

正在备考华为认证的小伙伴应该知道&#xff0c;除了理论知识外&#xff0c;刷题也相当重要&#xff0c;周工这里有一份HCIAHCIP-Datacom带解析的最新题库 点赞留言 即可领取。 1. &#xff08;多选题&#xff09;ISIS的Hello报文主要分为哪几种类型? A.P2P LAN IIH B.…

UnityVR--机械臂场景12-简单流水线应用4

目录 一. 手爪 二. 红外线传感器 三. 工件生成器 四. 总结 上一篇已经实现了机械臂各种动作的控制&#xff0c;本篇实现一下其余的组成部分&#xff0c;比如手爪、传感器和自动放置工件等。 一. 手爪 手爪的模型调整就不多说了&#xff0c;需要设置的是Rigidbody、Collide…

在Visual Studio Code里导出8266固件

1.编辑 .vscode目录下 arduion.json 添加 一个配置项output即输出目录.当然你不设置其它软固件一样会生成,只是就不知道你能不能找到了.我的配置如下 当然这个路径你想写什么 就是什么 . 2. 切换到 arduion的项目文件 xxxx.ino.点击vsc右上的验证 即可在上面设置的目录下找到…

Nginx系列之 一 入门

目录 一、Nginx概述 二、yum安装 三、nginx.conf配置文件详解 3.1 全局块 3.2 events 块 3.3 HTTP 块 四、Nginx 常用命令 五、Nginx代理 4.1 正向代理 4.2 反向代理 六、Nginx的Master-Worker模式 6.1 Master进程的作用是&#xff1f; 6.2 Worker进程的作用是&am…

Layui动态树详解

Layui动态树详解 一、什么是动态树形&#xff1f;二、Layui动态树形基本使用三、动态加载数据4.案列1.实体类2.dao方法3.子实现类4.jsp页面 前言 在前端开发过程中&#xff0c;树形控件是比较常用的控件之一。而Layui框架中&#xff0c;也提供了基于jQuery的树形控件。除了普通…

小程序接口返回errno: 600009 errMsg: “request:fail invalid url “异常问题排查修复记录

小程序封装wxrequest更换域名baseurl后调用接口返回errMsg: "request:fail invalid url "&#xff0c;errno: 600009 控制台输出request的url也是正常的&#xff0c;起初怀疑是没配置域名白名单&#xff0c;但是小程序模拟器勾选了不校验合法域名的&#xff0c;而且…

PWM技术在嵌入式设备运行中的调节应用

PWM&#xff08;脉宽调制&#xff09;是一种通过改变信号的脉冲宽度来控制电压或电流的技术。PWM的等效电压是指将PWM信号转换为相应的直流电压或电流的数值。 在PWM信号中&#xff0c;占空比表示高电平和低电平脉冲宽度的比例。例如&#xff0c;一个占空比为50%的PWM信号意味…

ApiPost - 踩坑指南

1.应用场景 主要用于记录apipost遇到的坑, 以及为遇到的开发者提供参考. 2.学习/操作 1.文档阅读 chatgpt & 其他资料 ApiPost问答-localhost的坑的问题列表 localhost 不能正确解析为本机-ApiPost使用-ApiPost问答 断网了&#xff0c;还能ping通 127.0.0.1 吗&#xff1…