Golang 搭建 WebSocket 应用(八) - 完整代码

news2025/1/11 17:49:43

本文应该是本系列文章最后一篇了,前面留下的一些坑可能后面会再补充一下,但不在本系列文章中了。

整体架构

再来回顾一下我们的整体架构:

在这里插入图片描述

在我们的 demo 中,包含了以下几种角色:

  1. 客户端:一般是浏览器,用于接收消息;
  2. Hub:消息中心,用于管理所有的客户端连接,以及将消息推送给客户端;
  3. 调用 /send 发送消息的应用:用于将消息发送给 Hub,然后由 Hub 将消息推送给客户端。

然后,每一个 WebSocket 连接都有一个关联的读协程和写协程,
用于读取客户端发送的消息,以及将消息推送给客户端。

目录结构

├── LICENSE  // 协议
├── Makefile // 一些常用的命令
├── README.md
├── authenticator.go      // 认证器
├── authenticator_test.go // 认证器测试
├── bytes.go // 字符串和 []byte 之间转换的辅助方法
├── client.go // WebSocket 客户端
├── go.mod    // 项目依赖
├── go.sum    // 项目依赖
├── hub.go    // 消息中心
├── main.go   // 程序入口
├── message   // 消息记录器
│   ├── db_logger.go
│   ├── db_logger_test.go
│   ├── log.go
│   └── stdout_logger.go
├── server.go // HTTP 服务
└── server_test.go // HTTP 接口的测试

运行

注:需要 Go 1.20 或以上版本

  1. 下载依赖:

可以使用七牛云的代理加速下载。

go mod tidy
  1. 启动 WebSocket 服务端:
go run main.go

Hub 代码

最终,我们的 Hub 代码演进成了下面这样:

// bufferSize 通道缓冲区、map 初始化大小
const bufferSize = 128

// Handler 错误处理函数
type Handler func(log message.Log, err error)

// Hub 维护了所有的客户端连接
type Hub struct {
	// 注册请求
	register chan *Client
	// 取消注册请求
	unregister chan *Client
	// 记录 uid 跟 client 的对应关系
	userClients map[string]*Client
	// 互斥锁,保护 userClients 以及 clients 的读写
	sync.RWMutex
	// 消息记录器
	messageLogger message.Logger
	// 错误处理器
	errorHandler Handler
	// 验证器
	authenticator Authenticator
	// 等待发送的消息数量
	pending atomic.Int64
}

// 默认的错误处理器
func defaultErrorHandler(log message.Log, err error) {
	res, _ := json.Marshal(log)
	fmt.Printf("send message: %s, error: %s\n", string(res), err.Error())
}

func newHub() *Hub {
	return &Hub{
		register:      make(chan *Client),
		unregister:    make(chan *Client),
		userClients:   make(map[string]*Client, bufferSize),
		RWMutex:       sync.RWMutex{},
		messageLogger: &message.StdoutMessageLogger{},
		errorHandler:  defaultErrorHandler,
		authenticator: &JWTAuthenticator{},
	}
}

// 注册、取消注册请求处理
func (h *Hub) run() {
	for {
		select {
		case client := <-h.register:
			h.Lock()
			h.userClients[client.uid] = client
			h.Unlock()
		case client := <-h.unregister:
			h.Lock()
			close(client.send)
			delete(h.userClients, client.uid)
			h.Unlock()
		}
	}
}

// 返回 Hub 的当前的关键指标
func metrics(hub *Hub, w http.ResponseWriter) {
	pending := hub.pending.Load()
	connections := len(hub.userClients)
	_, _ = w.Write([]byte(fmt.Sprintf("# HELP connections 连接数\n# TYPE connections gauge\nconnections %d\n", connections)))
	_, _ = w.Write([]byte(fmt.Sprintf("# HELP pending 等待发送的消息数量\n# TYPE pending gauge\npending %d\n", pending)))
}

其中:

  • Hub 中的 registerunregister 通道用于处理客户端的注册和取消注册请求;
  • Hub 中的 userClients 用于记录 uidClient 的对应关系;
  • Hub 中的 messageLogger 用于记录消息;
  • Hub 中的 errorHandler 用于处理错误;
  • Hub 中的 authenticator 用于验证客户端的身份;
  • Hub 中的 pending 用于记录等待发送的消息数量。

目前实现存在的问题:

  • registerunregister 通道被消费的时候需要加锁,这样会导致 registerunregister 变成串行的,性能不好;
  • userClients 也是需要加锁的,这样会导致 userClients 的读写也是串行的,性能不好;

对于这两个问题,前面我们讨论过,一种可行的办法分段 map,然后对每一个 map 都有一个对应的 sync.Mutex 互斥锁来保证其读写的安全。

Client 代码

Client 比较关键的方法是:

  • writePump:负责将消息推送给客户端。
  • serveWs:处理 WebSocket 连接请求。
  • send:处理消息发送请求。

writePump

这个方法会从 send 通道中获取消息,然后推送给客户端。
推送失败会调用 errorHandler 处理错误。
推送成功会将 pending 减一。

// writePump 负责推送消息给 WebSocket 客户端
//
// 该方法在一个独立的协程中运行,我们保证了每个连接只有一个 writer。
// Client 会从 send 请求中获取消息,然后在这个方法中推送给客户端。
func (c *Client) writePump() {
	defer func() {
		_ = c.conn.Close()
	}()

	// 从 send 通道中获取消息,然后推送给客户端
	for {
		messageLog, ok := <-c.send

		// 设置写超时时间
		_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
		// c.send 这个通道已经被关闭了
		if !ok {
			c.hub.pending.Add(int64(-1 * len(c.send)))
			return
		}

		if err := c.conn.WriteMessage(websocket.TextMessage, StringToBytes(messageLog.Message)); err != nil {
			c.hub.errorHandler(messageLog, err)
			c.hub.pending.Add(int64(-1 * len(c.send)))
			return
		}

		c.hub.pending.Add(int64(-1))
	}
}

serveWs

serveWs 方法会处理 WebSocket 连接请求,然后将其注册到 Hub 中。
在连接的时候会对客户端进行认证,认证失败会断开连接。
最后会启动读写协程。

// serveWs 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	// 升级为 WebSocket 连接
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte(fmt.Sprintf("upgrade error: %s", err.Error())))
		return
	}

	// 认证失败的时候,返回错误信息,并断开连接
	uid, err := hub.authenticator.Authenticate(r)
	if err != nil {
		_ = conn.SetWriteDeadline(time.Now().Add(time.Second))
		_ = conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("authenticate error: %s", err.Error())))
		_ = conn.Close()
		return
	}

	// 注册 Client
	client := &Client{hub: hub, conn: conn, send: make(chan message.Log, bufferSize), uid: uid}
	client.conn.SetCloseHandler(closeHandler)
	// register 无缓冲,下面这一行会阻塞,直到 hub.run 中的 <-h.register 语句执行
	// 这样可以保证 register 成功之后才会启动读写协程
	client.hub.register <- client

	// 启动读写协程
	go client.writePump()
	go client.readPump()
}

send

send 是一个 http 接口,用于处理消息发送请求。
它会从 Hub 中获取 uid 对应的 Client,然后将消息发送给客户端。

// send 处理消息发送请求
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
	uid := r.FormValue("uid")
	if uid == "" {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("uid is required"))
		return
	}

	// 从 hub 中获取 uid 关联的 client
	hub.RLock()
	client, ok := hub.userClients[uid]
	hub.RUnlock()
	if !ok {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte(fmt.Sprintf("client not found: %s", uid)))
		return
	}

	// 记录消息
	messageLog := message.Log{Uid: uid, Message: r.FormValue("message")}
	_ = hub.messageLogger.Log(messageLog)

	// 发送消息
	client.send <- messageLog

	// 增加等待发送的消息数量
	hub.pending.Add(int64(1))
}

github

完整代码可以在 github 上进行查看:https://github.com/eleven26/go-pusher

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1400049.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MyBatis 系列:MyBatis 源码环境搭建

文章目录 一、环境准备二、下载 MyBatis 源码和 MyBatis-Parent 源码三、创建空项目、导入项目四、编译 mybatis-parent五、编译 mybatis六、测试总结 一、环境准备 jdk&#xff1a;17 maven&#xff1a;3.9.5 二、下载 MyBatis 源码和 MyBatis-Parent 源码 Mybatis&#x…

【Linux】进程间通信——system V 共享内存、消息队列、信号量

需要云服务器等云产品来学习Linux的同学可以移步/–>腾讯云<–/官网&#xff0c;轻量型云服务器低至112元/年&#xff0c;优惠多多。&#xff08;联系我有折扣哦&#xff09; 文章目录 写在前面1. 共享内存1.1 共享内存的概念1.2 共享内存的原理1.3 共享内存的使用1.3.1 …

在Java中调企微机器人发送消息到群里

目录 如何使用群机器人 消息类型及数据格式 文本类型 markdown类型 图片类型 图文类型 文件类型 模版卡片类型 文本通知模版卡片 图文展示模版卡片 消息发送频率限制 文件上传接口 Java 执行语句 String url "webhook的Url"; String result HttpReque…

二叉树的直径(LeetCode 543)

文章目录 1.问题描述2.难度等级3.热门指数4.解题思路参考文献 1.问题描述 给你一棵二叉树的根节点&#xff0c;返回该树的直径 。 二叉树的 直径 是指树中任意两个节点之间最长路径的长度 。这条路径可能经过也可能不经过根节点 root 。 两节点之间路径的长度由它们之间边数…

Odrive 学习系列四:如何使用脚本自动初始化odrive配置

一、背景: 在学习markbase的教程后,发现odrive的初始化配置命令确实有点多。尽管odrive有自动补全: 且可以通过 ctrl + → 来快速补全: 但是对初学者而言,仍旧有比较大的工作量。 而针对于此,我们可以通过powershell脚本的方式来解决这个问题。 二、设计初始化…

继电器开关电路图大全

继电器是一种电控制器件&#xff0c;是当输入量&#xff08;激励量&#xff09;的变化达到规定要求时&#xff0c;在电气输出电路中使被控量发生预定的阶跃变化的一种电器。它具有控制系统&#xff08;又称输入回路&#xff09;和被控制系统&#xff08;又称输出回路&#xff0…

Siemens-NXUG二次开发-导入与导出(可移除参数)prt文件[Python UF][20240121]

Siemens-NXUG二次开发-导入与导出&#xff08;可移除参数&#xff09;prt文件[Python UF][20240121] 1.python uf函数1.1 NXOpen.UF.Part.Import1.2 NXOpen.UF.Part.ImportPartModes1.3 NXOpen.UF.Group.AskGroupData1.4 NXOpen.UF.Obj.AskTypeAndSubtype1.5 NXOpen.UF.Part.Ex…

Frenet坐标系下动态街道场景的最优轨迹生成

0 前言 有两个主要算法已经在实际中使用&#xff1a; &#xff08;1&#xff09;大多数研究组采用插值来解决规划问题&#xff0c;如奥迪、斯坦福最近演示中使用了回旋曲线&#xff0c;贝塞尔以及多项式曲线也被其他研究组使用。主要原因是在结构化环境中增强映射可以提供所需…

第十一站:多态练习ODU

实现动态切换 ODU.h #pragma once #include <iostream> using namespace std; #define ODU_TYPE_311_FLAG "311" #define ODU_TYPE_335_FLAG "335" enum class ODU_TYPE {ODU_TYPE_311,ODU_TYPE_335,ODU_TYPE_UNKNOW };class ODU{ public:ODU();//发…

Jan AI本地运行揭秘:首次体验,尝鲜科技前沿

&#x1f31f;&#x1f30c; 欢迎来到知识与创意的殿堂 — 远见阁小民的世界&#xff01;&#x1f680; &#x1f31f;&#x1f9ed; 在这里&#xff0c;我们一起探索技术的奥秘&#xff0c;一起在知识的海洋中遨游。 &#x1f31f;&#x1f9ed; 在这里&#xff0c;每个错误都…

带POE网络变压器与2.5G/5G/10G网络变压器产品特点介绍

Hqst华轩盛(石门盈盛)电子导读&#xff1a;一起来了解带POE网络变压器与2.5G/5G/10G网络变压器产品特点&#xff1f; 一﹑带POE网络变压器与2.5G/5G/10G网络变压器产品特点介绍 首先、POE网络变压器产品与常规不带POE产品的区别&#xff1a; 带POE网络变压器主要要求是耐电流等…

按键检测知识

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 例如&#xff1a;…

frida https抓包

web端导入证书、https代理即可解决大部分需求&#xff0c;但是&#xff0c;有些app需要处理ssl pinning验证。 废话不多说。frida处理ssl pin的步骤大体如下。 安装python3.x,并在python环境中安装frida&#xff1a; pip install frida pip install frida-tools下载frida-se…

C#,入门教程(22)——函数的基础知识

上一篇&#xff1a; C#&#xff0c;入门教程(21)——命名空间&#xff08;namespace&#xff09;与程序结构的基础知识https://blog.csdn.net/beijinghorn/article/details/124140653 一、函数的基本概念 一个软件的结构大体如下&#xff1a; 大厦application: a plaza { --…

分布式锁实现(mysql,以及redis)以及分布式的概念

道生一&#xff0c;一生二&#xff0c;二生三&#xff0c;三生万物 我旁边的一位老哥跟我说&#xff0c;你知道分布式是是用来干什么的嘛&#xff1f;一句话给我干懵了&#xff0c;我能隐含知道&#xff0c;大概是用来做分压处理的&#xff0c;并增加系统稳定性的。但是具体如…

最优传输学习及问题总结

文章目录 参考内容lam0.1lam3lam10lam50lam100lam300画图线性规划matlabpython代码 参考内容 https://blog.csdn.net/qq_41129489/article/details/128830589 https://zhuanlan.zhihu.com/p/542379144 我主要想强调的是这个例子的解法存在的一些细节问题 lam0.1 lam 0.1P,…

《WebKit 技术内幕》之五(3): HTML解释器和DOM 模型

3 DOM的事件机制 基于 WebKit 的浏览器事件处理过程&#xff1a;首先检测事件发生处的元素有无监听者&#xff0c;如果网页的相关节点注册了事件的监听者则浏览器会将事件派发给 WebKit 内核来处理。另外浏览器可能也需要处理这样的事件&#xff08;浏览器对于有些事件必须响应…

D - Left Right Operation

思路&#xff1a; 1、求前缀和 2、从后往前遍历&#xff0c;把某个后缀都变为R&#xff0c;记录最多让数组和减小多少 3、从前往后遍历&#xff0c;把某个前缀都变为L&#xff0c;记录最小答案&#xff08;前i个变为L&#xff0c;后面的n-i个数让减小最多的后缀变为R&#x…

项目管理十大知识领域之项目采购管理

一、项目采购管理的定义与概述 项目采购管理是指在项目实施过程中&#xff0c;对相关产品、服务或工程进行采购的管理活动。其概述包括确定采购需求、制定采购计划、供应商选择、合同签订、供应管理和结算支付等环节。项目采购管理的定义还涉及对采购目标的明确界定&#xff0…

[小程序]使用代码渲染页面

一、条件渲染 1.单个控制 使用wx:if"{{条件}}"来判断是否需要渲染这段代码&#xff0c;同时可以结合wx:elif和wx:else来判断 <view wx:if"{{type0}}">0</view> <view wx:elif"{{type1}}">1</view> <view wx:else>…