etcd选举源码分析和例子

news2024/12/27 13:50:28

本文主要介绍etcd在分布式多节点服务中如何实现选主。

1、基础知识

在开始之前,先介绍etcd中 Version, Revision, ModRevision, CreateRevision 几个基本概念。
1、version
作用域为key,表示某个key的版本,每个key刚创建的version为1,每次更新key这个值都会自增,表示这个key自创建以来更新的次数。
2、revision
作用域为集群,单调递增,集群内任何key的增删改都会使它自增。可以把它理解为集群的一个逻辑状态标志。记录了每一次集群内的增删改操作。
3、ModRevision
作用域为key,表示某个key修改时的版本,它等于修改这个key时的revision的值。
4、CreateRevision
作用域为key,表示某个key创建时的版本,等于创建这个key时revision的值,再删除之前会保持不变。
现在在etcd里面放一个key:
!
可以看到,Version是1,CreateRevision与Modrevision和Revision相等,都是7677720。
在修改了key的值以后:
在这里插入图片描述
version自增变为2,CreateRevision没有变动。modRevision等于修改时的Revsion。Revision已经跑到前面去了,因为此时还有其他的程序在修改etcd里面的key。

2、选举

先初始化一个session和选举:
	electionKey := "/my-election"

	// Create an election session
	session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	election := concurrency.NewElection(session, electionKey)
在启动了三台服务后,在etcd里面找到以下三个key:

在这里插入图片描述
现在的leader是第一台:
在这里插入图片描述
去看下compaign的源码:

func (e *Election) Campaign(ctx context.Context, val string) error {
	s := e.session
	client := e.session.Client()

	//用leaseID与前面的key前缀拼成key
	k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
	//判断当前key的createRevision是否是0,也就是否创建
	txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
	txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
	txn = txn.Else(v3.OpGet(k))
	resp, err := txn.Commit()
	if err != nil {
		return err
	}
	//获取key的Revision
	e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
	if !resp.Succeeded {
		kv := resp.Responses[0].GetResponseRange().Kvs[0]
		e.leaderRev = kv.CreateRevision
		if string(kv.Value) != val {
			if err = e.Proclaim(ctx, val); err != nil {
				e.Resign(ctx)
				return err
			}
		}
	}

	_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
	if err != nil {
		// clean up in case of context cancel
		select {
		case <-ctx.Done():
			e.Resign(client.Ctx())
		default:
			e.leaderSession = nil
		}
		return err
	}
	e.hdr = resp.Header

	return nil
}

用当前key和leaseid在etcd中创建一个key,并获取到key此时的Revision。

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
	getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
	for {
		resp, err := client.Get(ctx, pfx, getOpts...)
		if err != nil {
			return nil, err
		}
		if len(resp.Kvs) == 0 {
			return resp.Header, nil
		}
		lastKey := string(resp.Kvs[0].Key)
		if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
			return nil, err
		}
	}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wr v3.WatchResponse
	wch := client.Watch(cctx, key, v3.WithRev(rev))
	for wr = range wch {
		for _, ev := range wr.Events {
			if ev.Type == mvccpb.DELETE {
				return nil
			}
		}
	}
	if err := wr.Err(); err != nil {
		return err
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	return fmt.Errorf("lost watcher waiting for delete")
}

这里get的option,找到/my-ection前缀下最新创建的key,并且createRev的值小于等于当前key创建的createRev-1.

// WithLastCreate gets the key with the latest creation revision in the request range.
func WithLastCreate() []OpOption { return withTop(SortByCreateRevision, SortDescend) }
// WithMaxCreateRev filters out keys for Get with creation revisions greater than the given revision.
func WithMaxCreateRev(rev int64) OpOption { return func(op *Op) { op.maxCreateRev = rev } }

此时/my-election,在这个revision之前没有任何key,所以node2启动直接竞选到了主。
紧接着node3启动,在这个get,它能获取到node2创建的key,所以node3去watchnode2的的删除事件。
同理,node4启动以后,在这里的get,它获取的是node3的key,它去wachnode3的删除事件。
当node2释放后,node3获取到node2的删除事件变成主。node3释放以后,node4变成watch到node3
的删除事件变成主。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3、观察者

在上面的例子中,如果node4挂了以后,node2能继续变回为leader吗?
在引入observe以后,可以做到。
在这里插入图片描述
找到/my-election这个前缀下最开始创建的key,也就是node2。
然后把这个key放入返回ch中。

func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
	client := e.session.Client()

	defer close(ch)
	for {
		resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
		if err != nil {
			return
		}

		var kv *mvccpb.KeyValue
		var hdr *pb.ResponseHeader

		if len(resp.Kvs) == 0 {
			cctx, cancel := context.WithCancel(ctx)
			// wait for first key put on prefix
			opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
			wch := client.Watch(cctx, e.keyPrefix, opts...)
			for kv == nil {
				wr, ok := <-wch
				if !ok || wr.Err() != nil {
					cancel()
					return
				}
				// only accept puts; a delete will make observe() spin
				for _, ev := range wr.Events {
					if ev.Type == mvccpb.PUT {
						hdr, kv = &wr.Header, ev.Kv
						// may have multiple revs; hdr.rev = the last rev
						// set to kv's rev in case batch has multiple Puts
						hdr.Revision = kv.ModRevision
						break
					}
				}
			}
			cancel()
		} else {
			hdr, kv = resp.Header, resp.Kvs[0]
		}

		select {
		case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:
		case <-ctx.Done():
			return
		}

		cctx, cancel := context.WithCancel(ctx)
		wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
		keyDeleted := false
		for !keyDeleted {
			wr, ok := <-wch
			if !ok {
				cancel()
				return
			}
			for _, ev := range wr.Events {
				if ev.Type == mvccpb.DELETE {
					keyDeleted = true
					break
				}
				resp.Header = &wr.Header
				resp.Kvs = []*mvccpb.KeyValue{ev.Kv}
				select {
				case ch <- *resp:
				case <-cctx.Done():
					cancel()
					return
				}
			}
		}
		cancel()
	}
}

wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
接下里watch/my-election下第一个创建的key,在当前revision以后的delete事件。如果有delete事件则把这个消息通知出去。也就是发现了当前主的节点发生了delete事件,主发生了变化。
在这里插入图片描述
同理,当node2 down掉,它的key被删除,node3变成了主,observe此时watch的就是node3的key,因为此时node3创建的key是/my-election下的第一个key。
完整代码:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

const (
	FOLLOWER = "follower"
	LEADER   = "leader"
)

var state string = FOLLOWER
var preState string = FOLLOWER

func compaign(election *concurrency.Election, val string) {
	// Campaign for leadership
	fmt.Println("start compaign!")
	if err := election.Campaign(context.Background(), val); err != nil {
		log.Fatal(err)
	}

	fmt.Println("Became leader!")
	preState = state
	state = LEADER

	// Hold leadership until a key press
	fmt.Println("Press Enter to release leadership...")
	fmt.Scanln()

	// Resign leadership
	if err := election.Resign(context.Background()); err != nil {
		log.Fatal(err)
	}

	//preState = state
	//state = FOLLOWER
	fmt.Println("Released leadership.")
}

func observe(election *concurrency.Election, val string) {
	ch := election.Observe(context.Background())

	for {
		select {
		case rsp, ok := <-ch:
			if !ok {
				fmt.Println("now I am follower")
				//election.Campaign(context.Background(), args[1])
				//重新开始观察
				go observe(election, val)
				return
			} else {
				fmt.Printf("leader now is:%s\n", string(rsp.Kvs[0].Value))
				if string(rsp.Kvs[0].Value) == val {
					fmt.Println("still be leader")
					preState = state
					state = LEADER
				} else {
					fmt.Println("now become follower")
					preState = state
					state = FOLLOWER
					if preState == LEADER {
						go compaign(election, val)
					}
				}
			}
		}
	}
}

func main() {
	// Connect to etcd

	args := os.Args

	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"ipdizhi"}, // Replace with your etcd endpoints
		DialTimeout: 5 * time.Second,
		Username:    "user",
		Password:    "password",
	})
	if err != nil {
		log.Fatal(err)
	}

	defer client.Close()

	// Key for leader election
	electionKey := "/my-election"

	// Create an election session
	session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	election := concurrency.NewElection(session, electionKey)

	go compaign(election, args[1])
	go observe(election, args[1])
	for {

	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/988773.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

切片机制和MR工作机制

InputFormat基类 TextInputFormat&#xff1a;TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量&#xff0c; LongWritable类型。 CombineTextInputFormat&#xff1a;CombineTextInputFormat用于小文件过多的场景…

什么是正向代理和反向代理

一、什么是正向代理 正向代理&#xff08;Forward Proxy&#xff09;是一种代理服务器&#xff0c;它位于客户端和服务端之间&#xff0c;代表客户端向其他服务器发送请求。 一般使用的场景就是&#xff0c;当客户端无法直接访问某些资源时&#xff0c;可以通过正向代理来访问…

QML实现文件十六进制数据展示

前言 将一个二进制文件直接拖放到Qt Creator中可以直接查看到以十六进制显示的数据格式&#xff0c;如&#xff1a; 要实现一个这样的效果&#xff0c;还是要花不少时间的。 在网上找了挺多示例&#xff0c;其中一个开源代码效果不错&#xff08;参考这里&#xff09;&#…

Linux:【Mysql】Centos7安装mysql8.0

目录 一、环境及版本介绍 二、安装前准备 三、开始安装 一、环境及版本介绍 Linux环境&#xff1a;Centos7 Mysql版本&#xff1a;8.0.26 安装时使用的用户&#xff1a;root 二、安装前准备 1.1、下载Centos7镜像 网上寻找相关资源即可 1.2、下载VMwareWorkstation Pro并…

cpolar内网穿透

目录 一、引言二、什么是cpolar三、内网穿透四、如何使用cpolar1、下载cpolar软件安装包2、注册cpolar账号3、使用cpolar 一、引言 当我们完成了一个tomcat的web项目之后&#xff0c;如果我们想让其他电脑访问到这个项目&#xff0c;我们可以让其他电脑和本机连接到同一个局域…

python如何学习

功能如此强大、高效的Python&#xff0c;却非常的简单好学&#xff0c;这让学它的同学爱不释手&#xff0c;也让越来越多的互联网企业开始用Python来做主要的开发语言&#xff0c;比如谷歌、Facebook&#xff08;现Meta&#xff09;、豆瓣、知乎等知名互联网公司都在使用Python…

idea2018修改大小写提示(敏感)信息

操作步骤如下&#xff1a; File > Settings > Editor > Code Completion > Code Completion&#xff08;默认是首字母&#xff0c;选为none将不区分大小写&#xff09;

花生壳内网穿透+Windows系统,如何搭建网站?

1. 准备工作 在百度搜索“Win7下安装ApachePHPMySQL”&#xff0c;根据搜到的教程自行安装WAMP环境。 如果在网页上键入http://127.0.0.1/ 出现以下页面表示您的服务器已经建好&#xff0c;下一步就是关键&#xff0c;如何通过花生壳内网穿透&#xff0c;让外网的用户访问到您…

1.4 空间中的曲线和曲面

空间中的曲线与曲面 知识点1 曲面方程定义 定义1 如果曲面 S 与方程F (x,y,z ) 0 有下述关系&#xff1a; &#xff08;1&#xff09; 曲面 S 上的任意点的坐标都满足此方程 &#xff08;2&#xff09;不在曲面S上的点的坐标不满足此方程 则F&#xff08;x,y,z&#xff0…

无涯教程-JavaScript - IMEXP函数

描述 IMEXP函数以x yi或x yj文本格式返回复数的指数。复数的指数为- $$e ^ {((x yi)} e ^ xe ^ {yi} e ^ x(\cos y i \sin y)$$ 语法 IMEXP (inumber)争论 Argument描述Required/OptionalInumberA complex number for which you want the exponential.Required Not…

第一章: Mysql体系结构和存储引擎

文章目录 1.1 定义数据库和实例1.2 Mysql体系结构1. 3 Mysql存储引擎1. 4 常见问题解答1.5 存储引擎相关操作语法1.6 连接Mysql 1.1 定义数据库和实例 数据库和实例的区别&#xff1f; 数据库是物理操作系统或其他形式文件的集合&#xff08;数据库是文件的集合&#xff0c;是依…

Numpy包常用科学计算方法总结

numpy包的计算性能是python原始方法计算性能的几十倍到几百倍 一、引入numpy包&#xff1a; import numpy as np 二、创建数组: #定义一个pyth…

微信分账报错1908(请求中含有未在API文档中定义的参数)

开发指引-分账 | 微信支付合作伙伴平台文档中心 问题描述&#xff1a;根据微信分账文档&#xff0c;在下单接口添加是否分账参数后&#xff0c;报错如下 Client error: POST https://api.mch.weixin.qq.com/v3/pay/partner/transactions/jsapi 400 Bad Request {"code…

golang教程 beego框架笔记一

安装beego 安装bee工具 beego文档 # windos 推荐使用 go install github.com/beego/bee/v2master go get -u github.com/beego/bee/v2masterwindows使用安装bee工具时碰到的问题&#xff1b; 环境配置都没有问题&#xff0c;但是执行官网的命令&#xff1a;go get -u github…

Cadence Allegro如何添加/生成测试点?

Allegro因其功能强大、界面灵活、可适应切换复杂项目的需求&#xff0c;很快成为全球最受欢迎的EDA软件之一&#xff0c;而很多工程师在Allegro软件中添加测试点&#xff0c;这样做的好处是为了进行电路的功能测试和故障诊断&#xff0c;那么如何在Allegro添加/生成测试点&…

【Git】Git 分支

Git 分支 1.分支简介 为了真正理解 Git 处理分支的方式&#xff0c;我们需要回顾一下 Git 是如何保存数据的。 或许你还记得 起步 的内容&#xff0c; Git 保存的不是文件的变化或者差异&#xff0c;而是一系列不同时刻的 快照 。 在进行提交操作时&#xff0c;Git 会保存一…

Python 网页爬虫原理及代理 IP 使用

目录 前言 一、Python 网页爬虫原理 二、Python 网页爬虫案例 步骤1&#xff1a;分析网页 步骤2&#xff1a;提取数据 步骤3&#xff1a;存储数据 三、使用代理 IP 四、总结 前言 随着互联网的发展&#xff0c;网络上的信息量变得越来越庞大。对于数据分析人员和研究人…

基于antd+vue2来实现一个简单的绘画流程图功能

简单流程图的实现&#xff08;基于antdvue2的&#xff09;代码很多哦~ 实现页面如下 1.简单操作如下 2.弹框中使用组件&#xff1a; <vfdref"vfd"style"background-color: white;":needShow"true":fieldNames"fieldNames"openUse…