go 实现websocket以及详细设计流程过程,确保通俗易懂

news2025/1/22 19:11:03

websocket简介:

WebSocket 是一种网络传输协议,可在单个 TCP 连接上进行全双工通信,位于 OSI 模型的应用层。WebSocket 协议在 2011 年由 IETF 标准化为 RFC 6455,后由 RFC 7936 补充规范。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。

理解各种协议和通信层、套接字的含义

IP:网络层协议;(高速公路)

TCP和UDP:传输层协议;(卡车)

HTTP:应用层协议;(货物)。HTTP(超文本传输协议)是建立在TCP协议之上的一种应用。HTTP连接最显著的特点是客户端发送的每次请求都需要服务器回送响应,在请求结束后,会主动释放连接。从建立连接到关闭连接的过程称为“一次连接”。

SOCKET:套接字,TCP/IP网络的API。(港口码头/车站)Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。socket是在应用层和传输层之间的一个抽象层,它把TCP/IP层复杂的操作抽象为几个简单的接口供应用层调用已实现进程在网络中通信。

Websocket:同HTTP一样也是应用层的协议,但是它是一种双向通信协议,是建立在TCP之上的,解决了服务器与客户端全双工通信的问题,包含两部分:一部分是“握手”,一部分是“数据传输”。握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。

*注:什么是单工、半双工、全工通信?

数据只能单向传送为单工;
数据能双向传送但不能同时双向传送称为半双工;
数据能够同时双向传送则称为全双工。

上面是简单的接受了websocket情况以及和其他协议的区别及联系,在做之前,还是要了解下这块,对后期实战有帮助。

websocket开源选择:

在go语言中,websocket组件比较多的,可以到go仓库搜索下:

今天以仓库使用最多一个开源框架(gorilla)进行实战落地,以及讲解整个过程细节

websocket (github.com/gorilla/websocket)

上面介绍完整体开源情况。

业务背景:

      1、数据实时推送到前端进行图形化显示

      2、报警数据需要实时推送各端,例如web、cliet、安卓、ios等等其他客户端

      我前几年一直从事的是java,可以看看我的播客,基本上与java有关,go也是最近两三周学习的,我其他播客有具体说明,因为公司业务需要,所有就简单的学了一下go语言,作为刚接触go不久的技术人员,如何面对新技术进行探索和落地的,大家可以跟着我的思路进行学习模仿,这样后期学习稍微比较快一些。

 实战:    

        1、在实战之前,我们先看看官网使用说明:

             

这是最简单的,方式,没有其他业务,我们如何进行最佳实现呢?还是要看官网:

传送门:websocket/examples/chat at main · gorilla/websocket · GitHub

具体源码不说了,但是里面有几个重要点已经出现了,也是我们要学习的思想,这也是为什么要看开源代码的原因,要学习他们的思想和编码技巧。

稍微解释下Hub结构体里的字段:

            

   // Registered clients. 
 // 这是保存客户端连接的信息,map,从这里可以看出来,所有的客户端都要保存到这里,这时可以想到,后期保存到redis里,从这里进行扩展即可。
	clients map[*Client]bool

	// Inbound messages from the clients. 
 // 这个就是广播数据了,但是demo了用了字节链,目的是了并行执行,提高效率,字节目的是接收所有情况的数据
	broadcast chan []byte

	// Register requests from the clients.
// 这个比较好理解了,是新客户端进行连接时触发的链条,为什么走链,也是为了并行执行,也就是异步执行
	register chan *Client

	// Unregister requests from clients. 
//这个就是取消注册,也就是关闭客户端连接
	unregister chan *Client

其实这几个字段已经把我们的框架整体搭建起来了,clients负责存储客户端,broadcast负责服务端发送给客户端数据的,register负责用来监听新建连接,unregister负责关闭客户端连接的

其实websocket也就是干这个事情的,例如websocket服务端收集所有客户端,然后根据需要进行发送消息给客户,然后就是关闭,大致流程就是这样的。

在看客户端怎么进行封装的:

这个就是针对上面的Hub进行组装结构体实例的,这里就不介绍了,本次实战也是根据这个来的,大家看懂这个基本上后续其他开源的websocket都没啥大问题。

接下来真正进行项目实战:

1、第一步创建websocket的客户端管理结构体:

// clientManager
// @Description: 客户端管理者
type clientManager struct {
	//客户端存储的地方,我这边是用map进行存储,这块可以放到redis上,也是可以的,根据情况扩展即可
	//这里可以使用map[string]*client,也是可以的,如果这样设计,方便后期进行匹配比较简单,直接匹配key即可,这种方式也是可以的,匹配客户端对象里的key
	clients map[*client]bool
	//广播数据链,进行业务限制,如果没有业务限制,直接使用[]byte 比较通用;用chan进行异步处理
	broadcast chan model.BusinessDataWrapper
	//客户端注册链;用chan进行异步处理
	register chan *client
	//客户端关闭链;用chan进行异步处理
	unregister chan *client
}

结构体首字母小写,目的不用暴露出去,用于内部使用即可;每个字段不解释了,上面注释已经写好了,其实和官网是一样的。

2、客户端结构体:

// client
// @Description: 客户端信息
type client struct {
	//每个客户端连接后都要进行生成唯一key,因为业务场景需求,不同的用户或设备接受的数据要一一对应,后期这块要会做权限控制
	key string
	//客户端连接对象
	socket *websocket.Conn
	//数据发送链
	send chan []byte
}

3、编写启动方法,其实官网写的很像,稍微进行重构,更加符合当前项目

func (m *clientManager) start(callBackFunc func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool) {
	for {
		select {

		case client := <-m.register: //进行连接
			m.clients[client] = true
			msg := "有一个新连接出现,已连接成功,客户端key:" + client.key
			log.Info(ctx, msg)
			//发送数据,这块可以不用发送,等后续进行注释即可
			//m.send([]byte(msg), client)
		case client := <-m.unregister: //关闭连接,进行释放资源
			if _, ok := m.clients[client]; ok {
				close(client.send)
				delete(m.clients, client)
				msg := "客户端key:" + client.key + ",已关闭"
				log.Info(ctx, msg)
				//m.send([]byte(msg), client)
			}
		case businessDataWrapper := <-m.broadcast: //广播数据
			for client := range m.clients {
				//这里其实就是发送数据了,
				//进行转换成json字符串
				//businessDataJsonStr, _ := json.Marshal(businessDataWrapper)
				//broadCastSend(m, client, businessDataJsonStr)
				//进行回调,我这里面目的为了后期这块不在调整代码了,在启动时进行业务处理,方便后期扩展用的,如果毕竟简单,直接使用broadCastSend(m, client, businessDataJsonStr)进行推送信息
				callBackFunc(client, m, businessDataWrapper)
			}
		}
	}
}




// 广播进行发送数据
func broadCastSend(manager *clientManager, client *client, businessDataJsonByte []byte) {
	select {
	case client.send <- businessDataJsonByte:
	default:
		fmt.Println("关闭连接了,,,,,")
		close(client.send)
		delete(manager.clients, client)
	}
}

4、某一个客户端群发给其他客户端,排除自己客户端

//发送数据,这快类似群发消息
func (m *clientManager) send(message []byte, ignore *client) {

	for client := range m.clients {
      //ignore,这是忽略本身客户端,因为这是群发消息,自己可以不用接收了
		if client != ignore {
			//将数据写入到通道链
			client.send <- message
		}
	}
}

5、推送数据,第四步是写入到发送通道里,还没真正发送,这块就是真正推送到客户端机制:

func (c *client) write(manager clientManager) {
	defer func() {
		manager.unregister <- c
		c.socket.Close()
		log.Info(ctx, c.key, "客户端进行关闭")
	}()

	for {
		select {
		//如果客户端有数据要进行写出去
		case message, ok := <-c.send:
			if !ok {
				c.socket.WriteMessage(websocket.CloseMessage, []byte{})
				log.Info(ctx, c.key, "发送关闭提示")
				return
			}
			//这里才是真正的把数据推送到客户端
			err := c.socket.WriteMessage(websocket.TextMessage, message)
			if err != nil {
				manager.unregister <- c
				c.socket.Close()
				log.Info(ctx, c.key, "数据写入失败,进行关闭!")
				break
			}
		}
	}
}

6、接收客户端发送的数据

func (c *client) read(manager clientManager) {
	defer func() {
		manager.unregister <- c
		c.socket.Close()
		log.Info(ctx, c.key, "客户端进行关闭")
	}()
	for {
		_, message, err := c.socket.ReadMessage()
		if err != nil {
			manager.unregister <- c
			c.socket.Close()
			log.Info(ctx, c.key, "读数据出现异常,直接关闭。")
			break
		}
		//后期可以注释掉
		log.Info(ctx, c.key, "接收到客户端发送的数据", string(message))
		//读到数据,进行业务操作,目前我这边项目只需要推送到客户端即可,所以暂时不做业务了,其他需要做业务,这里做个监听即可
	}
}

7、提供创建客户端管理函数

// 创建客户端管理器
func newClientManager() *clientManager {
    return &clientManager{
       //广播数据,model.BusinessDataWrapper是我具体业务数据,可以换成[]byte接收
       broadcast:  make(chan model.BusinessDataWrapper),
       register:   make(chan *client),
       unregister: make(chan *client),
       clients:    make(map[*client]bool),
    }
}

以上就是整体的封装处理,可以看到和官方的demo很像,只是结合了一些业务场景而已,其他的都一样的。

接下来进行和业务进行集成:

1、创建管理器,上面也说了有两个业务场景, 一个是原始数据推送 ,另一个是报警数据推送,所以创建两个管理器出来:

// 原始ws客户端管理器
var rowDataManagerNew = newClientManager()

// 报警数据ws客户端管理器
var alarmDataManagerNew = newClientManager()

2、我们注册路由上,通过上面也能推断,需要指定两个路由路径

// 初始化websocket协议配置
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin:     func(r *http.Request) bool { return true }, //允许跨域 、 允许同源
}

// registerRawDataClientConn
//
//	@Author  zhaosy
//	@Description: 注册原始数据客户端连接
//	@date  2024-07-16 18:12:19
func registerRawDataClientConn(w http.ResponseWriter, r *http.Request, businessType string, businessId string, userName string) {
	if lang.IsEmpty(businessType) {
		io.WriteString(w, "businessType 不能为空")
	}
	//生成客户端
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Error(ctx, err.Error(), err)
		io.WriteString(w, "这是一个websocket连接,不是API.")
		return
	}

	clientId := guid.S()
//这个key是我随机生成的一个key,包含了一些业务,大家根据需要进行设置,也可以随机生成就行,就是一行字符串,如果对key没有要求,其实key不用处理也行哈
	key := websocketRowDataCachePrefix(businessType, businessId, userName, clientId)
	//初始化客户端对象
	client := &client{
		key:    key,
		socket: conn,
		send:   make(chan []byte),
	}
	rowDataManagerNew.register <- client
	//开启读
	go client.read(*rowDataManagerNew)
	//开起写
	go client.write(*rowDataManagerNew)

}

// registerAlarmClient
//
//	@Author  zhaosy
//	@Description: 注册报警客户端连接
//	@date  2024-07-16 19:40:52
func registerAlarmClientConn(w http.ResponseWriter, r *http.Request) {
	//生成客户端
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Error(ctx, err.Error(), err)
		io.WriteString(w, "这是一个websocket,不是API.")
		return
	}

	clientId := guid.S()
//这个key是我随机生成的一个key,包含了一些业务,大家根据需要进行设置,也可以随机生成就行,就是一行字符串,如果对key没有要求,其实key不用处理也行哈
	key := websocketAlarmCachePrefix(clientId)
	//初始化客户端对象
	client := &client{
		key:    key,
		socket: conn,
		send:   make(chan []byte),
	}
	alarmDataManagerNew.register <- client
	//开启读
	go client.read(*alarmDataManagerNew)
	//开启写
	go client.write(*alarmDataManagerNew)

}

上面看到 

w http.ResponseWriter, r *http.Request 这两个参数应该就知道怎么做了吧,直接绑定到路由路由,也是官网那种方式

这是注册到go的http路由上了,后续通过path路径进行访问即可。

3、如何与我们的业务数据进行绑定?

还需要提供包函数出去

// SendRowDataBusinessData
//
//	@Author  zhaosy
//	@Description: 接收业务数据进行推送到websocket
//	@date  2024-07-16 18:19:46
func SendRowDataBusinessData(data model.BusinessData) {
	rowDataManagerNew.broadcast <- model.SetRowDataWsWrapper(data)
}

// SendAlarmBusinessData
//
//	@Author  zhaosy
//	@Description: 接收报警数据,推送到websocket客户端,这是我项目的业务,大家换成string即可
//	@date  2024-07-16 19:42:13
func SendAlarmBusinessData(data model.AlarmBusinessData) {
	alarmDataManagerNew.broadcast <- model.SetAlarmWsWrapper(data)
}

仅供参考。

这里是业务数据推送websocket的入口:

4、最后一步是管理器要启动了,启动前,大家知道我写了 回调函数,要进行实现下,具体业务了,所以大家参考即可:

func init() {
	//启动原始数据websocket
	go rowDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {
		if consts.ZERO == businessDataWrapper.DataType { //原始数据,推送
			businessData := businessDataWrapper.BusinessData
			if businessData.BusinessId == "" {
				//进行转换成json字符串
				businessDataJsonStr, _ := json.Marshal(businessData)
				//广播所有客户端
				broadCastSend(manager, c, businessDataJsonStr)
				return true
			}
			//找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的
			if strings.Contains(c.key, websocketRowDataCachePrefix(businessData.BusinessType, businessData.BusinessId, "", "")) {
				//进行转换成json字符串
				businessDataJsonStr, _ := json.Marshal(businessData)
				//广播指定客户端
				broadCastSend(manager, c, businessDataJsonStr)
				return true
			}
			return false
		}

		return false
	})

	//启动报警数据推送
	go alarmDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {
		// 报警数据,推送
		if consts.ONE == businessDataWrapper.DataType {
			alarmBusinessData := businessDataWrapper.AlarmBusinessData
			if alarmBusinessData.BusinessId != "" {
				//找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的
				if strings.Contains(c.key, websocketAlarmCachePrefix("")) {
					//进行转换成json字符串
					alarmBusinessDataJsonStr, _ := json.Marshal(alarmBusinessData)
					//广播指定客户端
					broadCastSend(manager, c, alarmBusinessDataJsonStr)
					return true
				}
			}
		}
		return false
	})
}

这样就可以了,启动整体就没问题了,我这边用的是goframe框架,所以我单独提供了这两个:

func NewWs() *webSocket {
	return &webSocket{}
}


type webSocket struct {
}

// RawDataWSHandle
//
//	@Author  zhaosy
//	@Description: 原始数据websocket
//	@date  2024-07-16 15:00:04
func (w *webSocket) RawDataWSHandle(r *ghttp.Request) {
	//获取参数
	businessType := r.Get("businessType")
	businessId := r.Get("businessId")
	userName := r.Get("userName")
	registerRawDataClientConn(r.Response.BufferWriter, r.Request, businessType.String(), businessId.String(), userName.String())
}

// AlarmWSHandle
//
//	@Author  zhaosy
//	@Description: 报警websocket处理器
//	@date  2024-07-16 19:43:57
func (w *webSocket) AlarmWSHandle(r *ghttp.Request) {
	registerAlarmClientConn(r.Response.BufferWriter, r.Request)
}

在goframe里cmd里进行绑定:

	//websocket--原始数据websocket推送
				s.BindHandler("/ws/rowdata/{businessType}/{businessId}/{userName}", websocket.NewWs().RawDataWSHandle)
				//websocket--报警数据websocket推送
				s.BindHandler("/ws/alarm", websocket.NewWs().AlarmWSHandle)

5、进行测试:

启动正常,日志也输出来了,进行测试

上面是连接正常,

通过业务数据进行测试:

以上就是本次研究的结果,整体上go的websocket比较简单,后面有机会,会重新进行重构,重构单独封装可以随意使用。

发一个整的代码:

// Package websocket
// @Author zhaosy
// @Date 2024/7/16 下午2:52:00
// @Desc websocket相关
package websocket

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
	"github.com/gogf/gf/v2/util/guid"
	"github.com/gorilla/websocket"
	"io"
	"net/http"
	"skynet/internal/consts"
	"skynet/internal/model"
	"skynet/utility/lang"
	"strings"
)

var (
	ctx = context.TODO()
	log = g.Log()
)

func init() {
	//启动原始数据websocket
	go rowDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {
		if consts.ZERO == businessDataWrapper.DataType { //原始数据,推送
			businessData := businessDataWrapper.BusinessData
			if businessData.BusinessId == "" {
				//进行转换成json字符串
				businessDataJsonStr, _ := json.Marshal(businessData)
				//广播所有客户端
				broadCastSend(manager, c, businessDataJsonStr)
				return true
			}
			//找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的
			if strings.Contains(c.key, websocketRowDataCachePrefix(businessData.BusinessType, businessData.BusinessId, "", "")) {
				//进行转换成json字符串
				businessDataJsonStr, _ := json.Marshal(businessData)
				//广播指定客户端
				broadCastSend(manager, c, businessDataJsonStr)
				return true
			}
			return false
		}

		return false
	})

	//启动报警数据推送
	go alarmDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {
		// 报警数据,推送
		if consts.ONE == businessDataWrapper.DataType {
			alarmBusinessData := businessDataWrapper.AlarmBusinessData
			if alarmBusinessData.BusinessId != "" {
				//找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的
				if strings.Contains(c.key, websocketAlarmCachePrefix("")) {
					//进行转换成json字符串
					alarmBusinessDataJsonStr, _ := json.Marshal(alarmBusinessData)
					//广播指定客户端
					broadCastSend(manager, c, alarmBusinessDataJsonStr)
					return true
				}
			}
		}
		return false
	})
}

func NewWs() *webSocket {
	return &webSocket{}
}

// websocketRowDataCachePrefix
//
//	@Author  zhaosy
//	@Description: 原始数据缓存前缀
//	@date  2024-07-16 18:15:55
func websocketRowDataCachePrefix(businessType, businessId, userName, clientId string) string {
	key := "websocket:rowdata"
	if lang.IsNotEmpty(businessType) {
		key = key + ":" + businessType
		if lang.IsNotEmpty(businessId) {
			key = key + ":" + businessId
			if lang.IsNotEmpty(userName) {
				key = key + ":" + userName

			}
		}
	}
	if lang.IsNotEmpty(clientId) {
		key = key + ":" + clientId
	}
	return key
}

// websocketAlarmCachePrefix
//
//	@Author  zhaosy
//	@Description: 报警数据前缀
//	@date  2024-07-16 19:36:24
func websocketAlarmCachePrefix(clientId string) string {
	//后期要加组织机构,有权限控制这块,需要进行处理,暂时先不去处理
	key := "websocket:alarm"
	if lang.IsNotEmpty(clientId) {
		key = key + ":" + clientId
	}
	return key
}

type webSocket struct {
}

// RawDataWSHandle
//
//	@Author  zhaosy
//	@Description: 原始数据websocket
//	@date  2024-07-16 15:00:04
func (w *webSocket) RawDataWSHandle(r *ghttp.Request) {
	//获取参数
	businessType := r.Get("businessType")
	businessId := r.Get("businessId")
	userName := r.Get("userName")
	registerRawDataClientConn(r.Response.BufferWriter, r.Request, businessType.String(), businessId.String(), userName.String())
}

// AlarmWSHandle
//
//	@Author  zhaosy
//	@Description: 报警websocket处理器
//	@date  2024-07-16 19:43:57
func (w *webSocket) AlarmWSHandle(r *ghttp.Request) {
	registerAlarmClientConn(r.Response.BufferWriter, r.Request)
}

// SendRowDataBusinessData
//
//	@Author  zhaosy
//	@Description: 接收业务数据进行推送到websocket
//	@date  2024-07-16 18:19:46
func SendRowDataBusinessData(data model.BusinessData) {
	rowDataManagerNew.broadcast <- model.SetRowDataWsWrapper(data)
}

// SendAlarmBusinessData
//
//	@Author  zhaosy
//	@Description: 接收报警数据,推送到websocket客户端
//	@date  2024-07-16 19:42:13
func SendAlarmBusinessData(data model.AlarmBusinessData) {
	alarmDataManagerNew.broadcast <- model.SetAlarmWsWrapper(data)
}

// 初始化websocket协议配置
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin:     func(r *http.Request) bool { return true }, //允许跨域 、 允许同源
}

// registerRawDataClientConn
//
//	@Author  zhaosy
//	@Description: 注册原始数据客户端连接
//	@date  2024-07-16 18:12:19
func registerRawDataClientConn(w http.ResponseWriter, r *http.Request, businessType string, businessId string, userName string) {
	if lang.IsEmpty(businessType) {
		io.WriteString(w, "businessType 不能为空")
	}
	//生成客户端
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Error(ctx, err.Error(), err)
		io.WriteString(w, "这是一个websocket连接,不是API.")
		return
	}

	clientId := guid.S()
	key := websocketRowDataCachePrefix(businessType, businessId, userName, clientId)
	//初始化客户端对象
	client := &client{
		key:    key,
		socket: conn,
		send:   make(chan []byte),
	}
	rowDataManagerNew.register <- client
	//开启读
	go client.read(*rowDataManagerNew)
	//开起写
	go client.write(*rowDataManagerNew)

}

// registerAlarmClient
//
//	@Author  zhaosy
//	@Description: 注册报警客户端连接
//	@date  2024-07-16 19:40:52
func registerAlarmClientConn(w http.ResponseWriter, r *http.Request) {
	//生成客户端
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Error(ctx, err.Error(), err)
		io.WriteString(w, "这是一个websocket,不是网站.")
		return
	}

	clientId := guid.S()

	key := websocketAlarmCachePrefix(clientId)
	//初始化客户端对象
	client := &client{
		key:    key,
		socket: conn,
		send:   make(chan []byte),
	}
	alarmDataManagerNew.register <- client
	//开启读
	go client.read(*alarmDataManagerNew)
	//开启写
	go client.write(*alarmDataManagerNew)

}

// 原始ws客户端管理器
var rowDataManagerNew = newClientManager()

// 报警数据ws客户端管理器
var alarmDataManagerNew = newClientManager()

// **********************************以下是websocket进行封装,可以直接使用******************************************

// 创建客户端管理器
func newClientManager() *clientManager {
	return &clientManager{
		broadcast:  make(chan model.BusinessDataWrapper),
		register:   make(chan *client),
		unregister: make(chan *client),
		clients:    make(map[*client]bool),
	}
}

// clientManager
// @Description: 客户端管理者
type clientManager struct {
	//客户端存储的地方,我这边是用map进行存储,这块可以放到redis上,也是可以的,根据情况扩展即可
	//这里可以使用map[string]*client,也是可以的,如果这样设计,方便后期进行匹配比较简单,直接匹配key即可,这种方式也是可以的,匹配客户端对象里的key
	clients map[*client]bool
	//广播数据链,进行业务限制,如果没有业务限制,直接使用[]byte 比较通用;用chan进行异步处理
	broadcast chan model.BusinessDataWrapper
	//客户端注册链;用chan进行异步处理
	register chan *client
	//客户端关闭链;用chan进行异步处理
	unregister chan *client
}

// client
// @Description: 客户端信息
type client struct {
	//每个客户端连接后都要进行生成唯一key,因为业务场景需求,不同的用户或设备接受的数据要一一对应,后期这块要会做权限控制
	key string
	//客户端连接对象
	socket *websocket.Conn
	//数据发送链
	send chan []byte
}

// start
//
//	@Author  zhaosy
//	@Description: websocket启动
//	@date  2024-07-17 10:55:28
func (m *clientManager) start(callBackFunc func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool) {
	for {
		select {

		case client := <-m.register: //进行连接
			m.clients[client] = true
			msg := "有一个新连接出现,已连接成功,客户端key:" + client.key
			log.Info(ctx, msg)
			//发送数据,这块可以不用发送,等后续进行注释即可
			//m.send([]byte(msg), client)
		case client := <-m.unregister: //关闭连接,进行释放资源
			if _, ok := m.clients[client]; ok {
				close(client.send)
				delete(m.clients, client)
				msg := "客户端key:" + client.key + ",已关闭"
				log.Info(ctx, msg)
				//m.send([]byte(msg), client)
			}
		case businessDataWrapper := <-m.broadcast: //广播数据
			for client := range m.clients {
				//这里其实就是发送数据了,
				//进行转换成json字符串
				//businessDataJsonStr, _ := json.Marshal(businessDataWrapper)
				//broadCastSend(m, client, businessDataJsonStr)
				//进行回调,我这里面目的为了后期这块不在调整代码了,在启动时进行业务处理,方便后期扩展用的,如果毕竟简单,直接使用broadCastSend(m, client, businessDataJsonStr)进行推送信息
				callBackFunc(client, m, businessDataWrapper)
			}
		}
	}
}

// 广播进行发送数据
func broadCastSend(manager *clientManager, client *client, businessDataJsonByte []byte) {
	select {
	case client.send <- businessDataJsonByte:
	default:
		fmt.Println("关闭连接了,,,,,")
		close(client.send)
		delete(manager.clients, client)
	}
}

// send
//
//	@Author  zhaosy
//	@Description: 这快类似群发消息
//	@date  2024-07-16 16:40:37
func (m *clientManager) send(message []byte, ignore *client) {
	for client := range m.clients {
		//ignore,这是忽略本身客户端,因为这是群发消息,自己可以不用接收了
		if client != ignore {
			//将数据写入到通道链
			client.send <- message
		}
	}
}

func (c *client) read(manager clientManager) {
	defer func() {
		manager.unregister <- c
		c.socket.Close()
		log.Info(ctx, c.key, "客户端进行关闭")
	}()
	for {
		_, message, err := c.socket.ReadMessage()
		if err != nil {
			manager.unregister <- c
			c.socket.Close()
			log.Info(ctx, c.key, "读数据出现异常,直接关闭。")
			break
		}
		//后期可以注释掉
		log.Info(ctx, c.key, "接收到客户端发送的数据", string(message))
		//读到数据,进行业务操作,目前我这边项目只需要推送到客户端即可,所以暂时不做业务了,其他需要做业务,这里做个监听即可
	}
}

// write
//
//	@Author  zhaosy
//	@Description: 写入数据
//	@date  2024-07-16 16:52:47
func (c *client) write(manager clientManager) {
	defer func() {
		manager.unregister <- c
		c.socket.Close()
		log.Info(ctx, c.key, "客户端进行关闭")
	}()

	for {
		select {
		//如果客户端有数据要进行写出去
		case message, ok := <-c.send:
			if !ok {
				c.socket.WriteMessage(websocket.CloseMessage, []byte{})
				log.Info(ctx, c.key, "发送关闭提示")
				return
			}
			//这里才是真正的把数据推送到客户端
			err := c.socket.WriteMessage(websocket.TextMessage, message)
			if err != nil {
				manager.unregister <- c
				c.socket.Close()
				log.Info(ctx, c.key, "数据写入失败,进行关闭!")
				break
			}
		}
	}
}

ok。结束

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1937346.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

昇思学习打卡-21-生成式/Diffusion扩散模型

文章目录 Diffusion扩散模型介绍模型推理结果 Diffusion扩散模型介绍 关于扩散模型&#xff08;Diffusion Models&#xff09;有很多种理解&#xff0c;除了本文介绍的离散时间视角外&#xff0c;还有连续时间视角、概率分布转换视角、马尔可夫链视角、能量函数视角、数据增强…

《样式设计003:布局-自定义view模块》

描述&#xff1a;在开发小程序过程中&#xff0c;发现一些不错的案例&#xff0c;平时使用也比较多&#xff0c;稍微总结了下经验&#xff0c;以下内容可以直接复制使用&#xff0c;希望对大家有所帮助&#xff0c;废话不多说直接上干货&#xff01; 一、布局-自定义view模块 …

el-popover嵌套select弹窗点击实现自定义关闭

需求 el-popover弹窗内嵌套下拉选择框&#xff0c;点击el-popover弹出外部区域需关闭弹窗&#xff0c;点击查询、重置需关闭弹窗&#xff0c; 实现 根据需求要自定义弹窗的关闭和显示&#xff0c;首先想到的是visible属性&#xff0c;在实现过程中经过反复的测验&#xff0…

服务级别协议SLA与运营水平协议OLA

使用美团或饿了么在线订餐时&#xff0c;您将体验到即时的送餐提醒服务。首先&#xff0c;选择您想要的食品。系统会根据餐厅与您的位置、所选食品的种类&#xff0c;以及下单的具体时间&#xff0c;计算预计的等待时间和送餐费用&#xff0c;并将这些信息与您共享。这种信息的…

剖析SGI-STL二级空间配置器

概述 SGI-STL与C标准库提供的STL一样&#xff0c;都通过空间配置器allocator来申请或释放容器的空间。空间配置器的作用可以参考&#xff1a;浅谈C空间配置器allocator及其重要性 // C标准库的vector template < class T, class Alloc allocator<T> > class vec…

混淆专题一——简单AA,JJ,JSFuck混淆处理办法

以AA混淆为例 网址&#xff1a;Scrape | NBA 想要获取球员的信息&#xff0c;但找不到包。 刷新页面&#xff0c;main.js中找到混淆的代码&#xff0c;这串混淆代码就是球员信息。 如何处理&#xff1a; 复制下来&#xff0c;去除最后的笑脸 (_)&#xff0c;然后在控制台打…

启智集装箱箱号识别技术,更高效快捷

在当今这个信息技术高速发展的时代&#xff0c;集装箱箱号识别技术在全球物流领域扮演着至关重要的角色。随着物流行业的不断壮大和复杂化&#xff0c;对集装箱箱号识别的准确性、效率性和便捷性提出了更高的要求。启智集装箱箱号识别技术应运而生&#xff0c;以其高效快捷的特…

python-快速上手爬虫

目录 前言 爬虫需谨慎&#xff0c;切勿从入门到入狱&#xff01; 一点小小的准备工作 直接上手爬取网页 1.获取UA伪装 2.获取url 3.发送请求 4.获取数据并保存 总结 前言 爬虫需谨慎&#xff0c;切勿从入门到入狱&#xff01; 一点小小的准备工作 对pip进行换源&#xf…

【EI检索】第二届机器视觉、图像处理与影像技术国际会议(MVIPIT 2024)

一、会议信息 大会官网&#xff1a;www.mvipit.org 官方邮箱&#xff1a;mvipit163.com 会议出版&#xff1a;IEEE CPS 出版 会议检索&#xff1a;EI & Scopus 检索 会议地点&#xff1a;河北张家口 会议时间&#xff1a;2024 年 9 月 13 日-9 月 15 日 二、征稿主题…

vue3前端开发-小兔鲜项目-面包屑导航的渲染

vue3前端开发-小兔鲜项目-面包屑导航的渲染&#xff01;今天来完成&#xff0c;一级分类页面顶部&#xff0c;面包屑导航的渲染。 1&#xff1a;完善好一级页面内的基础模块代码。 <script setup> import {getCategoryAPI} from /apis/category import {ref,onMounted} …

【知识蒸馏】YOLO object detection 逻辑蒸馏

YOLO检测蒸馏 和分类和分割蒸馏的差异&#xff1a; 由于YOLOv3检测框的位置输出为正无穷到负无穷的连续值&#xff0c;和上面将的分类离散kdloss不同&#xff0c;而且由于yolo是基于anchor的one stage模型&#xff0c;head out中99%都是背景预测。 Object detection at 200 F…

【论文阅读笔记】Hierarchical Neural Coding for Controllable CAD Model Generation

摘要 作者提出了一种CAD的创新生成模型&#xff0c;该模型将CAD模型的高级设计概念表示为从全局部件排列到局部曲线几何的三层神经代码的层级树&#xff0c;并且通过指定目标设计的代码树来控制CAD模型的生成或完成。具体而言&#xff0c;一种带有“掩码跳过连接”的向量量化变…

【BUG】已解决:To update, run: python.exe -m pip install --upgrade pip

To update, run: python.exe -m pip install --upgrade pip 目录 To update, run: python.exe -m pip install --upgrade pip 【常见模块错误】 解决办法&#xff1a; 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博主英杰&…

「MQTT over QUIC」与「MQTT over TCP」与 「TCP 」通信测试报告

一、结论 在实车5G测试中「MQTT Over QUIC」整体表现优于「TCP」&#xff0c;可在系统架构升级时采用MQTT Over QUIC替换原有的TCP通讯&#xff1b;从实现原理上基于QUIC比基于TCP在弱网、网络抖动导致频繁重连场景延迟更低。 二、测试方案 网络类型&#xff1a;实车5G、实车…

FPGA-计数器

前言 之前一直说整理点FPGA控制器应用的内容&#xff0c;今天就从计数器这个在时序逻辑中比较重要的内容开始总结一下&#xff0c;主要通过还是通过让一个LED闪烁这个简单例子来理解。 寄存器 了解计数器之前先来认识一下寄存器。寄存器是时序逻辑设计的基础。时序逻辑能够避…

Android C++系列:Linux信号(三)

可重入函数 不含全局变量和静态变量是可重入函数的一个要素可重入函数见man 7 signal在信号捕捉函数里应使用可重入函数在信号捕捉函数里禁止调用不可重入函数例如:strtok就是一个不可重入函数,因为strtok内部维护了一个内部静态指针,保存上一 次切割到的位置,如果信号的捕捉…

android Invalid keystore format

签名的时候提示:Invalid keystore format. 点击info查看更多日志 再点击一次 stactrace 查看更多提示 提示&#xff1a;javaio异常 基本是jdk版本的问题&#xff0c;高jdk版本打的key&#xff0c;在低版本jdk开发环境上无法使用。 查看自己的key信息 keytool -list -v -keys…

Redis实现用户会话

1.分布式会话 (1)什么是会话 会话Session代表的是客户端与服务器的一次交互过程&#xff0c;这个过程可以是连续也可以是时断时续的。曾经的Servlet时代&#xff08;jsp&#xff09;&#xff0c;一旦用户与服务端交互&#xff0c;服务器tomcat就会为用户创建一个session&#…

【C++】深入理解函数重载:C语言与C++的对比

文章目录 前言1. 函数重载&#xff1a;概念与条件1.1 什么是函数重载1.2 函数重载的条件1.3 函数重载的注意点 2. 函数重载的价值2.1 书写函数名方便2.2 类中构造函数的实现2.3 模板的底层实现 3. C语言与C的对比3.1 C语言不支持函数重载的原因3.2 C支持函数重载的原因 4. Linu…

PostgreSQL的引号、数据类型转换和数据类型

一、单引号和双引号&#xff08;重要&#xff09;&#xff1a; 1、在mysql没啥区别 2、在pgsql中&#xff0c;实际字符串用单引号&#xff0c;双引号相当于mysql的,用来包含关键字&#xff1b; -- 单引号&#xff0c;表示user_name的字符串实际值 insert into t_user(user_nam…