cache教程 5.分布式节点的通信

news2025/1/23 12:59:10

0.对原教程的一些见解

其回顾完请求流程就是抽象了两个接口,PeerPicker和PeerGetter。这样操作,读者阅读时可能很难快速明白其含义,不好理解为什么就创建出两个接口,感觉会比较疑惑。原教程的评论中也有讨论这点。

 本教程就先不创建接口,而是使用struct方式,这样可能好理解点。

1.节点请求处理的流程

先弄清楚我们查询缓存的逻辑。

单节点: 

客户发送查询请求到节点A,该节点有缓存就立即返回,若是没有就执行用户设置的回调函数获取值并添加到缓存中,然后返回。

分布式节点:

客户端发送查询请求到某个缓存节点,该节点会判断该key是否在本地,若是不在本地,使用一致性哈希选择节点,若不是在远程节点,则就退回到本地节点处理;若在远程节点,该节点会发送请求去访问其他 node 节点。(不是客户端再去访问其他节点)

从这可以看出,一个node要处理两种请求,一个是来自客户端的外部请求,一个是来自其他远端节点的内部请求

为了清晰,划分职责,我们可以在一个node中启动两种HTTP服务,一个处理客户端请求(APIServer), 一个处理节点之间的请求(CacheServer)

2.HTTP客户端

之前我们为 HTTPPool 实现了服务端功能,通信不仅需要服务端还需要客户端,因此,我们接下来先实现客户端的功能。这个客户端是节点作为客户端去访问其他节点

  • baseURL 表示将要访问的远程节点的地址,例如 http://example.com/geecache/
type httpGetter struct {
	baseURL string
}

func (h *httpGetter) Get(group string, key string) ([]byte, error) {
	//QueryEscape 对字符串进行转义,以便可以将其安全地放置在 URL 查询中。
	u := fmt.Sprintf("%v/%v/%v", h.baseURL,
		url.QueryEscape(group),
		url.QueryEscape(key))

	res, err := http.Get(u)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned: %v", res.Status)
	}

	bytes, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response body: %v", err)
	}
	return bytes, nil
}

3.回顾上一章节实现的单节点的访问流程

func (g *Group) Get(key string) (ByteView, error) {
    //现在本地查询
	if v, ok := g.mainCache.get(key); ok {
		return v, nil
	}

	return g.load(key)   
}

func (g *Group) load(key string) (ByteView, error) {
	bytes, err := g.getter.Get(key)
	if err != nil {
		return ByteView{}, err
	}
	value := ByteView{b: cloneByte(bytes)}
	g.mainCache.add(key, value)
	return value, nil
}

那很明显是需要修改load方法,让其可以去访问远程节点。

在load方法中,伪代码如下。

func func (g *Group) load(key string) (ByteView, error){
	if 有远程节点 {
		if 找到key所在的远程节点 {
			本地作为客户端去访问该远程节点
		}
	}

	没有远程节点,只能在本地调用回调函数去源地方获取
}

要想在Group中访问节点,那么就要在Group中存储节点集合。

节点结合结构体Peers

那节点集合是不是又要创建一个结构体?那先试试创建一个结构体Peers。

因为 hash 环的 map 不是线程安全的,所以这里要加锁。

成员变量 httpGetters,映射远程节点与对应的 httpGetter。(httpGetter就是个客户端,是一个节点作为客户端),每一个远程节点对应一个 httpGetter,因为 httpGetter 与远程节点的地址 baseURL 有关,map的key是远程节点的地址,比如"http://localhost:10000"

type Peers struct {
	addr          string //这个是用于进行选择节点时用来判断是不是本地节点
	basePath      string
	mutex         sync.Mutex    //guards peersHashRing and httpGetters
	peersHashRing *consistenthash.HashRing
	httpGetters   map[string]*httpGetter
}

//这是HTTP服务端章节的HTTPPool,这是很相似的
type HTTPPool struct {
	addr     string
	basePath string
}

那么该结构体Peers就要有添加远程节点和通过key去获取远程节点的方法。

增添远程节点方法Set

通过该方法可以知道其map的key是远程节点的地址。

// 使用用例:Set("http://localhost:8001","http://localhost:8002")
func (p *Peers) Set(peers ...string) {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	p.peersHashRing = consistenthash.NewHash(50, nil)
	p.peersHashRing.Add(peers...) //在 hash 环上添加真实节点和虚拟节点
	//存储远端节点信息
	p.httpGetters = make(map[string]*httpGetter)
	for _, peer := range peers {
		p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
	}
}

通过key去获取远程节点的方法PickPeer

Peers结构体中的变量addr在这里派上用场了,返回的地址要是等于本身addr,那就返回false,不用自己作为客户端再去访问自己。

func (p *Peers) PickPeer(key string) (*httpGetter, bool) {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	//这里返回的peer是个地址,可以查看(Peers).Set函数中的参数
	if peer := p.peersHashRing.Get(key); peer != "" && peer != p.addr {
		fmt.Println("pick peer ", peer)
		return p.httpGetters[peer], true
	}
	return &httpGetter{}, false
}

Peers这个结构体就实现了,可以看到其与HTTPPool是很相似的。对比HTTPPool,就是成员变量添加了一些,方法也添加了一些,也没有改变HTTPPool原有的逻辑,只是扩张了。所以可以把Peers的内容添加到HTTPPool中去,具体的代码就不在这里显示了。

type HTTPPool struct {
	addr     string
	basePath string

	//新添加的,把Peers内容增添到HTTPPool中
	mutex         sync.Mutex
	peersHashRing *consistenthash.HashRing
	httpGetters   map[string]*httpGetter
}

4.集成,实现主流程

最后,我们需要将上述新增的功能集成在主流程(geecache.go)中。

在Group结构体中有改变。

新增 RegisterPeers() 方法,将 peers 注入到 Group 中。

type Group struct {
	name      string
	mainCache cache
	getter    Getter

	peers *Peers //添加了节点集合
}

// 往分组内注册节点集合
func (g *Group) RegisterPeers(peers *Peers) {
	if g.peers != nil {
		panic("RegisterPeerPicker called more than once")
	}
	g.peers = peers
}

最终再回到load函数,这个函数是需要修改的。

func (g *Group) load(key string) (value ByteView, err error) {
	if g.peers != nil {    //有远程节点的情况
		if peer, ok := g.peers.PickPeer(key); ok {    //通过key找到该远程节点
			if value, err = g.getFromPeer(peer, key); err == nil {
				return value, nil        //找到值
			}
			log.Println("[GeeCache] Failed to get from peer", err)
		}
	}

	return g.getLocally(key)    //回到本地处理
}

func (g *Group) getFromPeer(peer *httpGetter, key string) (ByteView, error) {
	bytes, err := peer.Get(g.name, key)
	if err != nil {
		return ByteView{}, err
	}
	return ByteView{b: bytes}, nil
}

func (g *Group) getLocally(key string) (ByteView, error) {
	bytes, err := g.getter.Get(key)
	if err != nil {
		return ByteView{}, err
	}
	value := ByteView{b: cloneByte(bytes)}
	g.mainCache.add(key, value)
	return value, nil
}
  • 新增 getFromPeer() 方法,使用httpGetter 访问远程节点,获取缓存值。
  • 修改 load 方法,使用 PickPeer() 方法选择节点,若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()

5. 测试

总结——缓存节点启动的流程

  1. 创建 Group 对象.(用于存储我们的缓存数据)
  2. 启动缓存 http 服务.(创建 HTTPPool,添加节点信息,注册到缓存分组中)
  3. 启动 API 服务.(用于与客户端进行交互)

 测试代码:

var db = map[string]string{
	"Tom":  "630",
	"Jack": "589",
	"Sam":  "567",
}

func main() {
	var port int
	var api bool
	flag.IntVar(&port, "port", 8001, "Geecache server port")
	flag.BoolVar(&api, "api", false, "Start a api server?")
	flag.Parse()

	apiAddr := "http://localhost:9999"
	addrMap := map[int]string{
		8001: "http://localhost:8001",
		8002: "http://localhost:8002",
		8003: "http://localhost:8003",
	}

	var addrs []string
	for _, v := range addrMap {
		addrs = append(addrs, v)
	}

	gee := createGroup()
	if api {
		go startAPIServer(apiAddr, gee)
	}
	startCacheServer(addrMap[port], addrs, gee)
	time.Sleep(time.Second * 1000)
}

func createGroup() *cache.Group {
	return cache.NewGroup("scores", 2<<10, cache.GetterFunc(func(key string) ([]byte, error) {
		if v, ok := db[key]; ok {
			return []byte(v), nil
		}
		return nil, fmt.Errorf("%s not exit", key)
	}))
}

func startCacheServer(addr string, addrs []string, groups *cache.Group) {
	//HTTPPool是节点结合和HTTP服务端
	peers := cache.NewHTTPPool(addr, cache.DefaultBasePath)
	peers.Set(addrs...)         //添加节点
	groups.RegisterPeers(peers) //注册节点集合
	log.Println("geecache is running at", addr)
	http.ListenAndServe(addr[7:], peers)
}

func startAPIServer(apiAddr string, groups *cache.Group) {
	http.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) {
		key := r.URL.Query().Get("key")
		view, err := groups.Get(key)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)

			return
		}
		w.Header().Set("Content-Type", "application/octet-stream")
		w.Write(view.ByteSlice())
	})

	log.Println("fontend server is running at", apiAddr)
	http.ListenAndServe(apiAddr[7:], nil)
}

为了方便,我们将启动的命令封装为一个 shell 脚本:

我们开启了三个节点(都是在同一个台机器上的,只是用不同端口来当做一个节点,进行区分)。

在端口8003的节点上开启APIServer,用户去访问时候,都是访问端口8003的那个节点。

#!/bin/bash

#trap 命令用于在 shell 脚本退出时,删掉临时文件,结束在该shell脚本运行的后台程序
trap "rm server;kill 0" EXIT

go build -o server
./server -port=8001 &
./server -port=8002 &
./server -port=8003 -api=1 &

sleep 2
echo ">>> start test"
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &

wait

结果

测试的时候,我们并发了 3 个请求 ?key=Tom,从日志中可以看到,三次均选择了节点 8001,这是一致性哈希算法的功劳。

但是会有一个问题,同时向 8001 发起了 3 次请求。试想,假如有 10 万个在并发请求该数据呢?那就会向 8001 同时发起 10 万次请求,如果 8001 又同时向数据库发起 10 万次查询请求,很容易导致缓存被击穿。

三次请求的结果是一致的,对于相同的 key,能不能只向 8001 发起一次请求?这个问题下一次解决。

6.多节点的访问流程图

完整代码:https://github.com/liwook/Go-projects/tree/main/go-cache/5-multi-nodes

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1303328.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker mysql8 设置不区分大小写

docker安装Mysql8.0的坑之lower_case_table_names_docker mysql lower_case_table_names-CSDN博客https://blog.csdn.net/p793049488/article/details/108365929 docker run ‐di ‐‐nametensquare_mysql ‐p 33306:3306 ‐e MYSQL_ROOT_PASSWORD123456 mysql

node.js express JWT token生成与校验

目录 JWT header&#xff08;标头&#xff09; payload&#xff08;有效负载&#xff09; signature&#xff08;签名&#xff09; 访问令牌&#xff08;token&#xff09; express jwt生成、验证 生成jwt 验证jwt JWT JWT 是轻量级的数据交换格式&#xff0c;相对于传…

微服务-理论 分布式事务

一、分布式事务理论模型 分布式事务问题也叫分布式数据一致性问题&#xff0c;简单来说就是如何在分布式场景中保证多个节点数据的一致性。分布式事务产生的核心原因在于存储资源的分布性&#xff0c;比如多个数据库&#xff0c;或者MySQL和Redis两种不同存储设备的数据一致性…

R语言,table()函数实现统计每个元素出现的频数+并将最终统计频数结果转换成dataframe数据框形式

在 R中&#xff0c;要统计dataframe数据框中每个元素出现的频数&#xff0c;可以使用table()函数。以下是一个示例&#xff1a; 目录 一、创建数据 二、统计第一列每个元素出现的频数 三、统计第二列每个元素出现的频数 四、将频数结果转换为数据框&#xff0c;并改列名 一…

【OPNEGIS】Geoserver原地升级jetty,解决Apache HTTP/2拒绝服务漏洞 (CVE-2023-44487)

Geoserver是我们常用的地图服务器&#xff0c;在开源系统中的应用比较广泛。在实际环境中&#xff0c;我们可能会选用官方的二进制安装包进行部署&#xff0c;这样只要服务器上有java环境就可以运行&#xff0c;方便在现场进行部署。 1.问题来源 这次由于甲方一月一次的漏洞扫…

开源框架Apache NiFi调研

开源框架Apache NiFi调研 NiFi背景介绍一、什么是NiFi1.1 Apache NiFi特点&#xff1a;流管理、易用性、安全性、可扩展的体系结构和灵活的伸缩模型。1.2 Apache NiFi特性1.2 Apache NiFi核心概念1.3架构 二、NiFi的诞生&#xff0c;要致力于解决的问题有哪些&#xff1f;三、为…

day01、什么是数据库系统?

数据库系统介绍 1.实例化与抽象化数据库系统2.从用户角度看数据库管理系统的功能2.1 数据库定义功能2.2 数据库操纵2.3 数据库控制2.4 数据库维护功能2.5 数据库语言与高级语言 3.从系统&#xff1a;数据库管理系统应具有什么功能 来源于战德臣的B站网课 1.实例化与抽象化数据库…

React Native android环境搭建,使用夜神模拟器进行开发(适用于0.73+版本)

前言 本文基于&#xff1a;“react-native” : “^0.73.0” 1.安装 Node Node.js&#xff0c;下载时选择 > 18 版本 2.下载并安装 JDK Java SE Development Kit (JDK)&#xff0c;下载时选择 17 版本 安装 验证是否安装成功 打开命令提示符输入 javac -version 回车 3.…

【打印机如何设置只打印黑色】

目录 1. Window X 打开&#xff0c;选择“设置” 2. 选择“打印机和扫描仪” 3. 选择对应的“打印机” 4. 选择“打印首选项” 5. 选择“页设置”&#xff0c;并选择“打印选项” 6. 用于打印的墨水&#xff0c;改为“仅黑色” 7. 点击“确定”&#xff0c;关闭即可 1. Wi…

智慧储能数字孪生:能源未来的智慧引擎

随着社会对清洁能源的需求不断增加&#xff0c;智能储能技术成为能源转型的关键驱动力。在这一领域中&#xff0c;数字孪生技术的应用为智慧储能带来了全新的可能性。数字孪生是指数字化、实时、可视化的模拟系统&#xff0c;通过复制现实世界中的对象或过程&#xff0c;为智能…

基于查表法的水流量算法设计与实现

写在前面 本文分享的是一种基于查表法的水流量的算法方案设计与实现&#xff0c;算法简单易懂&#xff0c;主要面向初学者&#xff0c;有两个目的&#xff1a;一是给初学者一些算法设计的思路引导&#xff1b;二是引导初学者学习怎样用C语言编程实现。 一、设计需求 基于“19…

nodejs微信小程序+python+PHP个性化服装搭配系统APP-计算机毕业设计推荐 android

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

网络安全——SSH密码攻击实验

一、实验目的要求&#xff1a; 二、实验设备与环境&#xff1a; 三、实验原理&#xff1a; 四、实验步骤&#xff1a;​ 五、实验现象、结果记录及整理&#xff1a; 六、分析讨论与思考题解答&#xff1a; 一、实验目的要求&#xff1a; 1、了解SSH密码攻击、FTP密码攻击…

【小白专用】MySQL查询数据库所有表名及表结构其注释

一、先了解下INFORMATION_SCHEMA 1、在MySQL中&#xff0c;把INFORMATION_SCHEMA看作是一个数据库&#xff0c;确切说是信息数据库。其中保存着关于MySQL服务器所维护的所有其他数据库的信息。如数据库名&#xff0c;数据库的表&#xff0c;表栏的数据类型与访问权 限等。在INF…

YOLOv8改进 | 2023主干篇 | EfficientViT替换Backbone(高效的视觉变换网络)

一、本文介绍 本文给大家带来的改进机制是EfficientViT&#xff08;高效的视觉变换网络&#xff09;&#xff0c;EfficientViT的核心是一种轻量级的多尺度线性注意力模块&#xff0c;能够在只使用硬件高效操作的情况下实现全局感受野和多尺度学习。本文带来是2023年的最新版本…

无线且列窄图片如何转excel?

写此文原因&#xff1a;图片要转excel&#xff0c;这放以前&#xff0c;是不能实现的功能&#xff0c;但随着人工智能的蓬勃发展&#xff0c;人们已克服了这一难题&#xff0c;但是&#xff0c;我们知道&#xff0c;要将图片识别成excel&#xff0c;识别程序首先要先识别图片中…

Django讲课笔记02:Django环境搭建

文章目录 一、学习目标二、相关概念&#xff08;一&#xff09;Python&#xff08;二&#xff09;Django 三、环境搭建&#xff08;一&#xff09;安装Python1. 从官方网站下载最新版本的Python2. 运行安装程序并按照安装向导进行操作3. 勾选添加到路径复选框4. 完成安装过程5.…

分布式之raft一致性算法

1.CAP定理 在一个分布式系统中&#xff0c;CAP三者不可兼得&#xff0c;最多只有两者可以满足&#xff0c;正所谓鱼和熊掌不可兼得 一致性 Consistency&#xff1a;所有的节点在同一时间的数据一致可用性 Availability&#xff1a;服务在正常响应时间内可用分区容错性 Partit…

〖大前端 - 基础入门三大核心之JS篇(51)〗- 面向对象之认识上下文与上下文规则

说明&#xff1a;该文属于 大前端全栈架构白宝书专栏&#xff0c;目前阶段免费&#xff0c;如需要项目实战或者是体系化资源&#xff0c;文末名片加V&#xff01;作者&#xff1a;哈哥撩编程&#xff0c;十余年工作经验, 从事过全栈研发、产品经理等工作&#xff0c;目前在公司…

UnoCSS 原子化开发初体验

UnoCSS 是一个即时的原子化 CSS 引擎&#xff0c;旨在灵活和可扩展。核心是不拘一格的&#xff0c;所有的 CSS 工具类都是通过预设提供的。再也不用为了取一个 classname 类名而烦恼了。 一、UnoCSS 特点 完全可定制&#xff1a;无核心工具&#xff0c;所有功能都通过预设提供…