golang分布式缓存项目 Day5 分布式节点

news2024/12/22 19:10:47

该项目原作者:https://geektutu.com/post/geecache-day1.html。本文旨在记录本人做该项目时的一些疑惑解答以及部分的测试样例以便于本人复习

1 流程回顾注:

在这里插入图片描述
我们在GeeCache 第二天 中描述了 geecache 的流程。在这之前已经实现了流程 ⑴ 和 ⑶,今天实现流程 ⑵,从远程节点获取缓存值。

我们进一步细化流程 ⑵:
在这里插入图片描述

2 抽象 PeerPicker

package geecache

// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface {
	PickPeer(key string) (peer PeerGetter, ok bool)
}

// PeerGetter is the interface that must be implemented by a peer.
type PeerGetter interface {
	Get(group string, key string) ([]byte, error)
}
  • 在这里,抽象出 2 个接口,PeerPicker 的 PickPeer() 方法用于根据传入的 key 选择相应节点
    PeerGetter。

  • 接口 PeerGetter 的 Get() 方法用于从对应 group 查找缓存值。PeerGetter 就对应于上述流程中的 HTTP客户端。

3 节点选择与 HTTP 客户端

在 GeeCache 第三天 中我们为 HTTPPool 实现了服务端功能,通信不仅需要服务端还需要客户端,因此,我们接下来要为 HTTPPool 实现客户端的功能。

首先创建具体的 HTTP 客户端类 httpGetter,实现 PeerGetter 接口。

type httpGetter struct {
	baseURL string
}

func (h *httpGetter) Get(group string, key string) ([]byte, error) {
	u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.QueryEscape(group),
		url.QueryEscape(key),
	)
	res, err := http.Get(u)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned: %v", res.Status)
	}

	bytes, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response body: %v", err)
	}

	return bytes, nil
}

var _ PeerGetter = (*httpGetter)(nil)
  • baseURL 表示将要访问的远程节点的地址,例如 http://example.com/_geecache/。
  • 使用 http.Get() 方式获取返回值,并转换为 []bytes 类型。

:var _ PeerGetter = (*httpGetter)(nil) 是为了实现PeerGetter 接口防止报错

第二步,为 HTTPPool 添加节点选择的功能。

const (
	defaultBasePath = "/_geecache/"
	defaultReplicas = 50
)
// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
	// this peer's base URL, e.g. "https://example.net:8000"
	self        string
	basePath    string
	mu          sync.Mutex // guards peers and httpGetters
	peers       *consistenthash.Map
	httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"
}
  • 新增成员变量 peers,类型是一致性哈希算法的 Map,用来根据具体的 key 选择节点。
  • 新增成员变量 httpGetters,映射远程节点与对应的 httpGetter。每一个远程节点对应一个 httpGetter,因为 httpGetter 与远程节点的地址 baseURL 有关。

第三步,实现 PeerPicker 接口。

// Set updates the pool's list of peers.
func (p *HTTPPool) Set(peers ...string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.peers = consistenthash.New(defaultReplicas, nil)
	p.peers.Add(peers...)
	p.httpGetters = make(map[string]*httpGetter, len(peers))
	for _, peer := range peers {
		p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
	}
}

// PickPeer picks a peer according to key
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if peer := p.peers.Get(key); peer != "" && peer != p.self {
		p.Log("Pick peer %s", peer)
		return p.httpGetters[peer], true
	}
	return nil, false
}

var _ PeerPicker = (*HTTPPool)(nil)
  • Set() 方法实例化了一致性哈希算法,并且添加了传入的节点。
  • 并为每一个节点创建了一个 HTTP 客户端 httpGetter。
  • PickerPeer() 包装了一致性哈希算法的 Get() 方法,根据具体的 key,选择节点,返回节点对应的 HTTP 客户端。

至此,HTTPPool 既具备了提供 HTTP 服务的能力,也具备了根据具体的 key,创建 HTTP 客户端从远程节点获取缓存值的能力。

4 实现主流程

最后,我们需要将上述新增的功能集成在主流程(geecache.go)中。

// A Group is a cache namespace and associated data loaded spread over
type Group struct {
	name      string
	getter    Getter
	mainCache cache
	peers     PeerPicker
}

// RegisterPeers registers a PeerPicker for choosing remote peer
func (g *Group) RegisterPeers(peers PeerPicker) {
	if g.peers != nil {
		panic("RegisterPeerPicker called more than once")
	}
	g.peers = peers
}

func (g *Group) load(key string) (value ByteView, err error) {
	if g.peers != nil {
		if peer, ok := g.peers.PickPeer(key); ok {
			if value, err = g.getFromPeer(peer, key); err == nil {
				return value, nil
			}
			log.Println("[GeeCache] Failed to get from peer", err)
		}
	}

	return g.getLocally(key)
}

func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
	bytes, err := peer.Get(g.name, key)
	if err != nil {
		return ByteView{}, err
	}
	return ByteView{b: bytes}, nil
}
  • 新增 RegisterPeers() 方法,将 实现了 PeerPicker 接口的 HTTPPool 注入到 Group 中。
  • 新增 getFromPeer() 方法,使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值。
  • 修改 load 方法,使用 PickPeer() 方法选择节点,若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()。

5 main 函数测试。

var db = map[string]string{
	"Tom":  "630",
	"Jack": "589",
	"Sam":  "567",
}

func createGroup() *geecache.Group {
	return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
		func(key string) ([]byte, error) {
			log.Println("[SlowDB] search key", key)
			if v, ok := db[key]; ok {
				return []byte(v), nil
			}
			return nil, fmt.Errorf("%s not exist", key)
		}))
}

func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
	peers := geecache.NewHTTPPool(addr)
	peers.Set(addrs...)
	gee.RegisterPeers(peers)
	log.Println("geecache is running at", addr)
	log.Fatal(http.ListenAndServe(addr[7:], peers))
}

func startAPIServer(apiAddr string, gee *geecache.Group) {
	http.Handle("/api", http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			key := r.URL.Query().Get("key")
			view, err := gee.Get(key)
			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			w.Header().Set("Content-Type", "application/octet-stream")
			w.Write(view.ByteSlice())

		}))
	log.Println("fontend server is running at", apiAddr)
	log.Fatal(http.ListenAndServe(apiAddr[7:], nil))

}

func main() {
	var port int
	var api bool
	flag.IntVar(&port, "port", 8001, "Geecache server port")
	flag.BoolVar(&api, "api", false, "Start a api server?")
	flag.Parse()

	apiAddr := "http://localhost:9999"
	addrMap := map[int]string{
		8001: "http://localhost:8001",
		8002: "http://localhost:8002",
		8003: "http://localhost:8003",
	}

	var addrs []string
	for _, v := range addrMap {
		addrs = append(addrs, v)
	}

	gee := createGroup()
	if api {
		go startAPIServer(apiAddr, gee)
	}
	startCacheServer(addrMap[port], []string(addrs), gee)
}

main 函数的代码比较多,但是逻辑是非常简单的。

  • startCacheServer() 用来启动缓存服务器:创建 HTTPPool,添加节点信息,注册到 gee 中,启动 HTTP服务(共3个端口,8001/8002/8003),用户不感知。
  • startAPIServer() 用来启动一个 API 服务(端口 9999),与用户进行交互,用户感知。
  • main() 函数需要命令行传入 port 和 api 2 个参数,用来在指定端口启动 HTTP 服务。

为了方便,我们将启动的命令封装为一个 shell 脚本:

#!/bin/bash
trap "rm server;kill 0" EXIT

go build -o server
./server -port=8001 &
./server -port=8002 &
./server -port=8003 -api=1 &

sleep 2
echo ">>> start test"
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &

wait

运行结果

$ ./run.sh
2020/02/16 21:17:43 geecache is running at http://localhost:8001
2020/02/16 21:17:43 geecache is running at http://localhost:8002
2020/02/16 21:17:43 geecache is running at http://localhost:8003
2020/02/16 21:17:43 fontend server is running at http://localhost:9999
>>> start test
2020/02/16 21:17:45 [Server http://localhost:8003] Pick peer http://localhost:8001
2020/02/16 21:17:45 [Server http://localhost:8003] Pick peer http://localhost:8001
2020/02/16 21:17:45 [Server http://localhost:8003] Pick peer http://localhost:8001
...
630630630

此时,我们可以打开一个新的 shell,进行测试:

$ curl "http://localhost:9999/api?key=Tom"
630
$ curl "http://localhost:9999/api?key=kkk"
kkk not exist

测试的时候,我们并发了 3 个请求 ?key=Tom,从日志中可以看到,三次均选择了节点 8001,这是一致性哈希算法的功劳。但是有一个问题在于,同时向 8001 发起了 3 次请求。试想,假如有 10 万个在并发请求该数据呢?那就会向 8001 同时发起 10 万次请求,如果 8001 又同时向数据库发起 10 万次查询请求,很容易导致缓存被击穿。

三次请求的结果是一致的,对于相同的 key,能不能只向 8001 发起一次请求?这个问题下一次解决。

阶段性总结

概括性流程:

开始
startCacheServer
将多个服务器节点绑定到本地服务器gee上
http.ListenAndServe
peers 监听8001, 8002, 8003端口
并将端口监听到的请求交由peers绑定的ServeHTTP处理
go startAPIServer
创建与用户面对的api客户端
http.ListenAndServe 监听9999端口
http.Handle处理路径为/api的请求
发出http://localhost:9999/api?key=Tom的HTTP请求
startAPIServer函数中http.Handle方法处理
查找本地缓存
不存在则从远程节点上查找
找到与本地节点绑定的多个节点
并通过consistenthash算法找出这个请求应该被处理的节点
来到这个节点
用http.Get方法再发送一个路径为peer + p.basePath/_geecache/scores/Tom的请求
startCacheServer函数中的peers上实现的ServeHTTP方法处理
查找这个节点的缓存是否有所要找的值
没找到
再次用consistenthash算法计算所要找的节点
因为key没有变,所以找到了同一个节点,也就是这个节点本身
回到节点本地
调用getLocally方法
进行从数据库中查找的慢过程
并将从数据库中找出的键值对存储到这个节点的缓存中
END

PS:用mermaid画流程图的时候,用逗号要切换到英文输入法再输入,否则会导致输出不了图片!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2239813.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[产品管理-76]:延续是创新与颠覆式创新的比较

目录 一、概述 1、定义与特征 2、市场影响与竞争策略 3、实施难度与风险 4、案例分析 二、示例 1. 延续性创新示例 2. 创新示例 3. 颠覆式创新示例 一、概述 延续性创新与颠覆式创新是技术创新领域的两种重要策略&#xff0c;它们在多个方面存在显著差异。 以下是对…

【 AI写作鹅-注册安全分析报告-无验证方式导致安全隐患】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 1. 暴力破解密码&#xff0c;造成用户信息泄露 2. 短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉 3. 带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造…

AI:重塑电商行业的创新引擎,开启电商数字化转型新征程

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;…

语义分割数据增强,图像和标签同步对应详细增强教程(附代码)

&#x1f4aa; 专业从事且热爱图像处理&#xff0c;图像处理专栏更新如下&#x1f447;&#xff1a; &#x1f4dd;《图像去噪》 &#x1f4dd;《超分辨率重建》 &#x1f4dd;《语义分割》 &#x1f4dd;《风格迁移》 &#x1f4dd;《目标检测》 &#x1f4dd;《图像增强》 &a…

pgsql和mysql的自增主键差异

1. 当有历史数据存在时&#xff0c; mysql的自增主键是默认从最大值自增。 pgsql的自增主键取初始值开始逐个尝试&#xff0c;所以存在可能与历史数据的主键重复的情况。 pgsql解决上述问题的方式&#xff1a;重设自增值。 SELECT SETVAL(t_db_filed_id_seq, (SELECT MAX(&q…

Spring Cloud Contract快速入门Demo

1.什么是Spring Cloud Contract &#xff1f; Spring Cloud Contract 是 Spring 提供的一套工具&#xff0c;用于帮助开发者通过契约&#xff08;Contract&#xff09;驱动的方式进行微服务的测试和集成。它主要解决微服务之间通信时&#xff0c;如何确保服务提供者和消费者之…

OceanBase JDBC (Java数据库连接)的概念、分类与兼容性

本章将介绍 OceanBase JDBC的 概念与分类&#xff0c;已帮助使用 JDBC 的用户及技术人员更好的 了解JDBC&#xff0c;以及 OceanBase JDBC在与 MySQL 及 Oracle 兼容性方面的相关能力。 一、JDBC 基础 1.1 JDBC 的概念 JDBC 一般指 Java 数据库连接。Java 数据库连接&#xf…

自定义springCloudLoadbalancer简述

概述 目前后端用的基本都是springCloud体系&#xff1b; 平时在dev环境开发时&#xff0c;会把自己的本地服务也注册上去&#xff0c;但是这样的话&#xff0c;在客户端调用时请求可能会打到自己本地&#xff0c;对客户端测试不太友好. 思路大致就是前端在请求头传入指定ip&a…

Go/Golang语言各种数据类型内存字节占用大小和最小值最大值

具体请前往&#xff1a;Go/golang语言基本数据类型字节大小和取值范围(最小值~最大值)

力扣104 : 二叉树最大深度

补&#xff1a;二叉树的最大深度 描述&#xff1a; 给定一个二叉树 root &#xff0c;返回其最大深度。二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 何解&#xff1f; 树一般常用递归&#xff1a;递到叶子节点开始倒着处理

Android GPU纹理数据拷贝

在 Android 开发中读取纹理数据有以下几种方法&#xff1a; glReadPixelsImageReaderPBO&#xff08;Pixel BufferObject&#xff09; HardwareBuffer 1. glReadPixels glReadPixels 是 OpenGL ES 的 API&#xff0c;通常用于从帧缓冲区中读取像素数据&#xff0c;OpenGL ES…

畅捷通T+ RecoverPassword.aspx

用友 畅捷通T RecoverPassword.aspx 存在未授权管理员密码修改漏洞&#xff0c;攻击者可以通过漏洞修改管理员账号密码登录后台 #漏洞影响版本 12.0&#xff0c;12.1.12.2.12.3 13.0 15.0 16.0 18.0 补丁号291都影响 19.0 补丁号167之前也有影响 对于老版本畅捷通已经…

最全最简单理解迭代器

1. 迭代器的基础概念(iterator) 1.1 本质 迭代器能够用来遍历容器的对象,与能够遍历数组的指针类似,是广义指针。 1.2 作用: 能够让迭代器与算法不干扰的相互发展,最后又能无间隙的粘合起来。重载了*,++,==,!=,=运算符。用以操作复杂的数据结构。容器提供迭代…

python数据写入excel文件

主要思路&#xff1a;数据 转DataFrame后写入excel文件 一、数据格式为字典形式1 k e &#xff0c; v [‘1’, ‘e’, 0.83, 437, 0.6, 0.8, 0.9, ‘好’] 1、这种方法使用了 from_dict 方法&#xff0c;指定了 orient‘index’ 表示使用字典的键作为行索引&#xff0c;然…

[CKS] Create/Read/Mount a Secret in K8S

最近准备花一周的时间准备CKS考试&#xff0c;在准备考试中发现有一个题目关于读取、创建以及挂载secret的题目。 ​ 专栏其他文章: [CKS] Create/Read/Mount a Secret in K8S-CSDN博客[CKS] Audit Log Policy-CSDN博客 -[CKS] 利用falco进行容器日志捕捉和安全监控-CSDN博客[C…

docker之容器设置开机自启(4)

命令语法&#xff1a; docker update --restartalways 容器ID/容器名 选项&#xff1a; --restart参数 no 默认策略&#xff0c;在容器退出时不重启容器 on-failure 在容器非正常退出时&#xff08;退出状态非0&#xff09;&#xff0c;才会重启容器 …

机器学习:决策树——ID3算法、C4.5算法、CART算法

决策树是一种常用于分类和回归问题的机器学习模型。它通过一系列的“决策”来对数据进行分类或预测。在决策树中&#xff0c;每个内部节点表示一个特征的测试&#xff0c;每个分支代表特征测试的结果&#xff0c;而每个叶节点则表示分类结果或回归值。 决策树工作原理 根节点&…

Angular 和 Vue2.0 对比

前言 &#xff1a;“业精于勤&#xff0c;荒于嬉&#xff1b;行成于思&#xff0c;毁于随” 很久没写博客了&#xff0c;大多记录少进一步探查。 Angular 和 Vue2.0 对比&#xff1a; 一.概念 1.1 Angular 框架&#xff1a; 是一款由谷歌开发的开源web前端框架&#xff08;核…

Python酷库之旅-第三方库Pandas(208)

目录 一、用法精讲 971、pandas.MultiIndex.set_levels方法 971-1、语法 971-2、参数 971-3、功能 971-4、返回值 971-5、说明 971-6、用法 971-6-1、数据准备 971-6-2、代码示例 971-6-3、结果输出 972、pandas.MultiIndex.from_arrays类方法 972-1、语法 972-2…

[Linux]:IO多路转接之epoll

1. IO 多路转接之epoll 1.1 epoll概述 epoll是Linux内核为处理大规模并发网络连接而设计的高效I/O多路转接技术。它基于事件驱动模型&#xff0c;通过在内核中维护一个事件表&#xff0c;能够快速响应多个文件描述符上的I/O事件&#xff0c;如可读、可写、异常等&#xff0c;…