IM系统设计之websocket消息转发

news2025/1/16 13:57:26

Websocket消息转发

项目地址:git@github.com:muyixiaoxi/Link.git

上周面试被面试官问到:“在分布式IM系统中,如何实现多个websocket集群之间的通信”。

我在思考了良久后回答:“不会”。

随着我的回答,我和面试官的故事也到此完结了…

为什么会出现websocket集群

在IM系统中,需要在服务端和客户端之间维持一个长连接,而这个长连接可以通过websocket实现。
但服务端能维持websocket的数量并不是无限的。

WebSocket的并发连接数受到多种因素的影响,其中最主要的瓶颈通常在于服务器资源。在传统的模型中,一台服务器上的最大WebSocket连接数受到操作系统中TCP/IP连接数的限制。在Linux系统中,每个IPv4地址允许的最大连接数为65535,这意味着如果每个连接都使用不同的IP地址,一台服务器最多只能维持65535个WebSocket连接。

当用户量很多时,一台websocket服务器远远是不够的,所以需要多台websocket服务器。

假如现在只有一台IM服务器(即 Websocket 服务器),用户A、用户B均在线,用户A向用户B发送一条消息
在这里插入图片描述
单台IM服务器发送消息大概流程如下:

  1. 客户端向IM服务端发送消息
  2. IM服务端收到消息判断用户B是否在线
    • 在线,Websocket转发
    • 离线,将消息存储到B的离线消息库
  3. 用户B立即了消息,或者在下次上线时收到了消息

因为现在只有一台IM服务器,所以直接可以判断用户B是否在线,并且转发。

假如现在我有多台IM服务器,重复上面的操作

如果恰巧A和B连接在一台IM服务器上,那么和上面的流程一样

假如现在A连接在IM1上,B连接在IM2上
在这里插入图片描述

其中红线的部分,就是我们要解决的部分

websocket集群通信

我查阅了网上的一些资料(这一块具体采用哪种技术栈实现,网上的资料很少),大概分为两种

方法一:互为客户端

分别将IM服务端与其他IM服务端连接起来,可以通过网络编程或者MQ来实现
在这里插入图片描述
优点:

  • 不需要额外的服务
  • 转发过程中,各个IM服务负载相对均匀

缺点:

  • 每增加一个IM服务端都需要其他服务端多维持一个连接或者MQ
  • 水平扩展有点繁琐

方法二:C/S

采用c/s架构,新建一个transmit服务,单独实现转发功能,可以通过网络编程或者MQ实现。
因为各个 IM 都与 transmit 连接,所以扩展只需要该配置文件的运行端口
在这里插入图片描述
优点:

  • 高可用,IM server扩展方便,只需要修改自己的运行端口

缺点:

  • 不同服务端之间的消息需要通过 transmit 转发,当海量消息时,对 transmit 压力比较大
代码实现

为了实现消息的时效性,以及高可用,我采用net包中的tcp实现了c/s架构
项目目录如下:

transmit:
│  client.go
│  main.go
│
├─common
│  └─proto
│          proto.go
│
└─types
        types.go
proto

使用 net 包的 tcp 可能会出现粘包现象,封装编码与解码方法从而避免粘包

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"github.com/zeromicro/go-zero/core/logx"
	"net"
	"sync"
	"transmit/common/proto"
	"transmit/types"
)

var Connects sync.Map

func main() {
	listenClient()
}

// listenClient 监听
func listenClient() {
	lister, err := net.Listen("tcp", "127.0.0.1:8333")
	if err != nil {
		fmt.Println("net.Listen failed:", err)
	}
	for {
		conn, err := lister.Accept()
		if err != nil {
			continue
		}
		fmt.Println(conn.RemoteAddr().String())
		Connects.Swap(conn.RemoteAddr().String(), conn)
		go addReceiver(conn)
	}
}

// addReceiver 向连接添加接收器
func addReceiver(conn net.Conn) {
	defer conn.Close()
	reader := bufio.NewReader(conn)
	for {
		m, err := proto.Decode(reader)
		if err != nil {
			fmt.Println("与客户端", conn.LocalAddr(), "断开连接")
			return
		}
		transmit := types.TransmitMap{}
		json.Unmarshal([]byte(m), &transmit)
		// 读到消息后,根据服务器进行转发
		for connect := range transmit.Users {
			transmitMessage(conn, connect, transmit)
		}
	}
}

func transmitMessage(conn net.Conn, ip string, transmit types.TransmitMap) {
	c, ok := Connects.Load(ip)
	message := types.TransmitMap{
		Users: map[string][]uint64{},
	}
	if !ok {
		message = types.TransmitMap{
			Message: types.Message{
				Id:          "",
				From:        0,
				To:          0,
				Type:        100,
				ContentType: 0,
				Time:        "",
				Content:     "客户端离线",
			},
		}
		s, _ := json.Marshal(message)
		msg, _ := proto.Encode(string(s))
		conn.Write(msg)
		fmt.Println("客户端离线:", ip)
		logx.Error("connect ip offline:", ip)
		return
	}
	message.Users[ip] = transmit.Users[ip]
	message.Message = transmit.Message
	j, _ := json.Marshal(message)
	msg, _ := proto.Encode(string(j))
	fmt.Println("ip:", ip, "msg:", string(msg))
	c.(net.Conn).Write(msg)
}

main

通过监听某个端口,让 IM server与其建立间接,实现转发功能

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"github.com/zeromicro/go-zero/core/logx"
	"net"
	"sync"
	"transmit/common/proto"
	"transmit/types"
)

var Connects sync.Map

func main() {
	listenClient()
}

// listenClient 监听
func listenClient() {
	lister, err := net.Listen("tcp", "127.0.0.1:8333")
	if err != nil {
		fmt.Println("net.Listen failed:", err)
	}
	for {
		conn, err := lister.Accept()
		if err != nil {
			continue
		}
		fmt.Println(conn.RemoteAddr().String())
		Connects.Swap(conn.RemoteAddr().String(), conn)
		go addReceiver(conn)
	}
}

// addReceiver 向连接添加接收器
func addReceiver(conn net.Conn) {
	defer conn.Close()
	reader := bufio.NewReader(conn)
	for {
		m, err := proto.Decode(reader)
		if err != nil {
			fmt.Println("与客户端", conn.LocalAddr(), "断开连接")
			return
		}
		transmit := types.TransmitMap{}
		json.Unmarshal([]byte(m), &transmit)
		// 读到消息后,根据服务器进行转发
		for connect := range transmit.Users {
			transmitMessage(conn, connect, transmit)
		}
	}
}

func transmitMessage(conn net.Conn, ip string, transmit types.TransmitMap) {
	c, ok := Connects.Load(ip)
	message := types.TransmitMap{
		Users: map[string][]uint64{},
	}
	if !ok {
		message = types.TransmitMap{
			Message: types.Message{
				Id:          "",
				From:        0,
				To:          0,
				Type:        100,
				ContentType: 0,
				Time:        "",
				Content:     "客户端离线",
			},
		}
		s, _ := json.Marshal(message)
		msg, _ := proto.Encode(string(s))
		conn.Write(msg)
		fmt.Println("客户端离线:", ip)
		logx.Error("connect ip offline:", ip)
		return
	}
	message.Users[ip] = transmit.Users[ip]
	message.Message = transmit.Message
	j, _ := json.Marshal(message)
	msg, _ := proto.Encode(string(j))
	fmt.Println("ip:", ip, "msg:", string(msg))
	c.(net.Conn).Write(msg)
}
client

模拟客户端,根据自己的项目拆分到 IM server中

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"net"
	"time"
	"transmit/common/proto"
	"transmit/types"
)

func client() {
	conn, _ := InitConnect()
	go Consumer(conn)
	var ip string
	message := types.Message{
		Id:          "123",
		From:        1,
		To:          2,
		Type:        1,
		ContentType: 1,
		Time:        "123",
		Content:     "你好",
	}
	for {
		fmt.Scan(&ip)
		users := map[string][]uint64{}
		users[ip] = []uint64{1, 2}
		time.Sleep(2 * time.Second)
		if err := Producer(conn, users, message); err != nil {
			fmt.Println("Producer(conn, ip, message) failed", err)
		}
	}

}

func InitConnect() (conn net.Conn, err error) {
	conn, err = net.Dial("tcp", "127.0.0.1:8333")
	fmt.Println(conn.LocalAddr())
	return
}

func Producer(conn net.Conn, user map[string][]uint64, mes types.Message) (err error) {
	transmit := types.TransmitMap{
		Users:   user,
		Message: mes,
	}
	message, _ := json.Marshal(transmit)
	m, _ := proto.Encode(string(message))
	_, err = conn.Write(m)
	if err != nil {
		// 重试三次,一次休眠一秒
		for i := 0; i < 3 && err != nil; i++ {
			time.Sleep(1 * time.Second)
			_, err = conn.Write(m)
		}
	}
	return
}

// Consumer 消费者 读消息
func Consumer(conn net.Conn) {
	defer conn.Close()
	reader := bufio.NewReader(conn)
	for {
		m, err := proto.Decode(reader)
		if err != nil {
			continue
		}
		transmit := types.TransmitMap{}
		json.Unmarshal([]byte(m), &transmit)
		// 读到消息后,进行转发
		for _, uIds := range transmit.Users {
			for _, id := range uIds {
				fmt.Println(id, transmit.Message)
			}
		}
	}
}

types

在转发群聊消息时,需要将 m 个用户转发到 n 个IM服务端上,如果单独发送需要多次发送,所以封装成 TransmitMap 进行转发。

type Message struct {
	Id          string `json:"id"`
	From        uint64 `json:"from,optional"`
	To          uint64 `json:"to"`
	Type        uint32 `json:"type"`
	ContentType uint32 `json:"contentType"`
	Time        string `json:"time"`
	Content     string `json:"content"`
}


type TransmitMap struct {
	Users map[string][]uint64 `json:"users"`  // map[主机地址]用户集合
	Message
}

为什么这里要封装一个?

type TransmitMap struct {
	Users map[string][]uint64 `json:"users"`
	Message
}

比如用户A、B、C、D、E、F、G在同一个群聊里,各自连接到的 IM server 如图所示
在这里插入图片描述
如果群聊消息采用上面单聊的转发方式

  1. 用户A发送一条消息
  2. IM server1 收到群聊消息,查找群里的其他用户(B、C、D、E、F、G)
  3. 判断哪些用户在当前 IM servier 上,发现 B,直接转发
  4. 遍历(B、C、D、E、F、G): 将在线用户消息转发消息到 transmit

如果不做任何操作的化,光过程 4 就需要转发5次消息

用户A发送一条群聊消息的过程

  1. 用户A发送一条群聊消息
  2. IM server1 收到群聊消息,查找群里的其他用户(B、C、D、E、F、G)
  3. 判断哪些用户在当前 IM servier 上,发现 B,直接转发
  4. 通过 redis 判断哪些用户在线并获取主机地址,将通过map将相同地址的用户分类 map[主机地址]用户集合,一块转发到 transmit
  5. transmit 以主机地址为组,将消息发送给 IM server2 和 IM server3
  6. IM server2 和 IM server3 收到消息后,将消息进行转发
  7. 离线用户同步离线消息库

这样的好处是,有效的减少群聊消息转发的次数。

ps:如果存在哪些不足,欢迎大家在评论区指正~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1539373.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++航海王:追寻罗杰的编程之路】stack

目录 1 -> stack的介绍和使用 1.1 -> stack的介绍 1.2 -> stack的使用 1.3 -> stack的模拟实现 1 -> stack的介绍和使用 1.1 -> stack的介绍 stack的文档介绍 1. stack是一种容器适配器&#xff0c;专门用在具有后进先出操作的上下文环境中&#xff0c…

React【Day1】

B站视频链接 一、React介绍 React由Meta公司开发&#xff0c;是一个用于 构建Web和原生交互界面的库 React的优势 相较于传统基于DOM开发的优势 组件化的开发方式不错的性能 相较于其它前端框架的优势 丰富的生态跨平台支持 React的市场情况 全球最流行&#xff0c;大…

案例实践 | 基于长安链的煤质检测智慧实验室

案例名称-煤质检测智慧实验室 ■ 建设单位 国能数智科技开发&#xff08;北京&#xff09;有限公司 ■ 用户群体 煤炭生产单位、电力单位、化工单位等产业链上下游单位 ■ 应用成效 化验效率提升50%&#xff0c;出验时间缩短40%&#xff0c;提高化验数据市场公信力 案例…

数据结构->手把手教入门栈与列队(基础)

✅作者简介&#xff1a;大家好&#xff0c;我是橘橙黄又青&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;橘橙黄又青-CSDN博客 1.什么是栈 1.1栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许…

利用Scala与Apache HttpClient实现网络音频流的抓取

概述 在当今数字化时代&#xff0c;网络数据的抓取和处理已成为许多应用程序和服务的重要组成部分。本文将介绍如何利用Scala编程语言结合Apache HttpClient工具库实现网络音频流的抓取。通过本文&#xff0c;读者将学习如何利用强大的Scala语言和Apache HttpClient库来抓取网…

IS210BPPCH1AEC的数据分析

GE IS210BPPCH1AEC 是一款专为石油和天然气行业设计的压力传感器。 该传感器可以监测油井、气井以及管道系统中的压力&#xff0c;并且具备数据分析和远程监控的能力。这使得它在确保油气生产过程安全和效率方面发挥关键作用。具体来看&#xff0c;以下是一些特点和应用&#x…

Webman全局异常捕获处理

最近在使用webman这个框架做项目开发&#xff0c;涉及到需要统一处理异常捕获。由于官网给的并不详细&#xff0c;于是自己实现了一下全局异常处理类。 一、配置效果 例如&#xff1a;我要在项目中统一返回json 格式数据&#xff0c;并不想在业务层写try,catch逻辑。 或者在业务…

校招应聘流程讲解

在整个应聘流程中&#xff0c;记得保持积极的态度、认真准备面试&#xff0c;同时也要对自己的能力和经验有清晰的认识&#xff0c;这样才能在竞争激烈的校园招聘中脱颖而出&#xff0c;成功获得心仪的工作机会. 1. 校招资源获取 想要参加校招&#xff0c;首先需要获取校招资…

操作系统内功篇:硬件结构之CPU是如何执行任务的?

一 CPU是如何读写数据的&#xff1f; 1.1 CPU架构(组成) 当代CPU一般是多核心的&#xff0c;每个核心都有自己的一个L1和L2Cache&#xff0c;L3Cache是一个CPU所有核心共享的&#xff0c;一个CPU只有一个。L1Cache分为数据缓存和指令缓存。 CPU有三层高速缓存的目的就是将Cac…

基于SSM+Jsp+Mysql的记账管理系统

开发语言&#xff1a;Java框架&#xff1a;ssm技术&#xff1a;JSPJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包…

C# Solidworks二次开发:获取主窗口API和创建新活动窗口API详解

今天要讲的是Solidworks中的两个API。 &#xff08;1&#xff09;Frame Method (ISldWorks)&#xff1a;获取SOLIDWORKS主框架。 下面是API中给出的例子&#xff1a; public void Main(){ModelDoc2 swModelDoc default(ModelDoc2);Frame swFrame default(Frame);ModelWindow…

jmeter之接口功能自动化

一、接口测试简述 接口&#xff1a;用来连接前端&#xff0c;后端还有移动端的程序模块。由于不同端的工作进度不一样&#xff0c;需要对最开始出来的接口进行接口测试。 接口分类&#xff1a;POST&#xff0c;GET&#xff0c;PUT&#xff0c;DELETE。 POST请求的数据是放在…

战略精进·第1篇:市场洞察的操作要点分享

首发&#xff1a;麦子禾咨询 作者&#xff1a;石头 近期本来没有计划写战略相关主题的&#xff0c;毕竟在3月初&#xff0c;石头定下来的方向是「大客户经营」&#xff0c;也在围绕大客户主题做些积累。 为什么要打乱现有的节奏&#xff0c;原因很简单&#xff0c;近期正在接…

NIO简介以及用NIO实现一个群聊系统

一、BIO的工作原理 传统Io(BIO)的本质就是面向字节流来进行数据传输的 ①:当两个进程之间进行相互通信&#xff0c;我们需要建立一个用于传输数据的管道(输入流、输出流)&#xff0c;原来我们传输数据面对的直接就是管道里面一个个字节数据的流动&#xff08;我们弄了一个 by…

利用Base64加密算法将数据加密解密

1. Base64加密算法 Base64准确来说并不像是一种加密算法&#xff0c;而更像是一种编码标准。 我们知道现在最为流行的编码标准就是ASCLL&#xff0c;它用八个二进制位&#xff08;一个char的大小&#xff09;表示了127个字符&#xff0c;任何二进制序列都可以用这127个字符表…

实用工具推荐:适用于 TypeScript 网络爬取的常用爬虫框架与库

随着互联网的迅猛发展&#xff0c;网络爬虫在信息收集、数据分析等领域扮演着重要角色。而在当前的技术环境下&#xff0c;使用TypeScript编写网络爬虫程序成为越来越流行的选择。TypeScript作为JavaScript的超集&#xff0c;通过类型检查和面向对象的特性&#xff0c;提高了代…

武汉星起航电商:跨境创业领航者,一站式服务助您轻松启航

在当今全球化的浪潮中&#xff0c;跨境电商以其独特的优势&#xff0c;成为越来越多创业者的首选。然而&#xff0c;如何在这个竞争激烈的市场中脱颖而出&#xff0c;实现业务的快速增长&#xff0c;却成为摆在创业者面前的一大难题。武汉星起航电子商务有限公司&#xff0c;正…

【go从入门到精通】if else 条件控制

作者简介&#xff1a; 高科&#xff0c;先后在 IBM PlatformComputing从事网格计算&#xff0c;淘米网&#xff0c;网易从事游戏服务器开发&#xff0c;拥有丰富的C&#xff0c;go等语言开发经验&#xff0c;mysql&#xff0c;mongo&#xff0c;redis等数据库&#xff0c;设计模…

面试算法-82-不同路径

题目 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish” &#xff09;。 问总共有多少条不同的路径&#xff1f; …

OpenHarmony实战开发-手写板应用开发操作流程

分布式手写板&#xff08;ArkTS&#xff09; 介绍 本篇Codelab使用设备管理及分布式键值数据库能力&#xff0c;实现多设备之间手写板应用拉起及同步书写内容的功能。操作流程&#xff1a; 设备连接同一无线网络&#xff0c;安装分布式手写板应用。进入应用&#xff0c;点击…