Websocket消息转发
项目地址:git@github.com:muyixiaoxi/Link.git
上周面试被面试官问到:“在分布式IM系统中,如何实现多个websocket集群之间的通信”。
我在思考了良久后回答:“不会”。
随着我的回答,我和面试官的故事也到此完结了…
为什么会出现websocket集群
在IM系统中,需要在服务端和客户端之间维持一个长连接,而这个长连接可以通过websocket实现。
但服务端能维持websocket的数量并不是无限的。
WebSocket的并发连接数受到多种因素的影响,其中最主要的瓶颈通常在于服务器资源。在传统的模型中,一台服务器上的最大WebSocket连接数受到操作系统中TCP/IP连接数的限制。在Linux系统中,每个IPv4地址允许的最大连接数为65535,这意味着如果每个连接都使用不同的IP地址,一台服务器最多只能维持65535个WebSocket连接。
当用户量很多时,一台websocket服务器远远是不够的,所以需要多台websocket服务器。
假如现在只有一台IM服务器(即 Websocket 服务器),用户A、用户B均在线,用户A向用户B发送一条消息
单台IM服务器发送消息大概流程如下:
- 客户端向IM服务端发送消息
- IM服务端收到消息判断用户B是否在线
- 在线,Websocket转发
- 离线,将消息存储到B的离线消息库
- 用户B立即了消息,或者在下次上线时收到了消息
因为现在只有一台IM服务器,所以直接可以判断用户B是否在线,并且转发。
假如现在我有多台IM服务器,重复上面的操作
如果恰巧A和B连接在一台IM服务器上,那么和上面的流程一样
假如现在A连接在IM1上,B连接在IM2上
其中红线的部分,就是我们要解决的部分
websocket集群通信
我查阅了网上的一些资料(这一块具体采用哪种技术栈实现,网上的资料很少),大概分为两种
方法一:互为客户端
分别将IM服务端与其他IM服务端连接起来,可以通过网络编程或者MQ来实现
优点:
- 不需要额外的服务
- 转发过程中,各个IM服务负载相对均匀
缺点:
- 每增加一个IM服务端都需要其他服务端多维持一个连接或者MQ
- 水平扩展有点繁琐
方法二:C/S
采用c/s架构,新建一个transmit服务,单独实现转发功能,可以通过网络编程或者MQ实现。
因为各个 IM 都与 transmit 连接,所以扩展只需要该配置文件的运行端口
优点:
- 高可用,IM server扩展方便,只需要修改自己的运行端口
缺点:
- 不同服务端之间的消息需要通过 transmit 转发,当海量消息时,对 transmit 压力比较大
代码实现
为了实现消息的时效性,以及高可用,我采用net包中的tcp实现了c/s架构
项目目录如下:
transmit:
│ client.go
│ main.go
│
├─common
│ └─proto
│ proto.go
│
└─types
types.go
proto
使用 net 包的 tcp 可能会出现粘包现象,封装编码与解码方法从而避免粘包
package main
import (
"bufio"
"encoding/json"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"net"
"sync"
"transmit/common/proto"
"transmit/types"
)
var Connects sync.Map
func main() {
listenClient()
}
// listenClient 监听
func listenClient() {
lister, err := net.Listen("tcp", "127.0.0.1:8333")
if err != nil {
fmt.Println("net.Listen failed:", err)
}
for {
conn, err := lister.Accept()
if err != nil {
continue
}
fmt.Println(conn.RemoteAddr().String())
Connects.Swap(conn.RemoteAddr().String(), conn)
go addReceiver(conn)
}
}
// addReceiver 向连接添加接收器
func addReceiver(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
m, err := proto.Decode(reader)
if err != nil {
fmt.Println("与客户端", conn.LocalAddr(), "断开连接")
return
}
transmit := types.TransmitMap{}
json.Unmarshal([]byte(m), &transmit)
// 读到消息后,根据服务器进行转发
for connect := range transmit.Users {
transmitMessage(conn, connect, transmit)
}
}
}
func transmitMessage(conn net.Conn, ip string, transmit types.TransmitMap) {
c, ok := Connects.Load(ip)
message := types.TransmitMap{
Users: map[string][]uint64{},
}
if !ok {
message = types.TransmitMap{
Message: types.Message{
Id: "",
From: 0,
To: 0,
Type: 100,
ContentType: 0,
Time: "",
Content: "客户端离线",
},
}
s, _ := json.Marshal(message)
msg, _ := proto.Encode(string(s))
conn.Write(msg)
fmt.Println("客户端离线:", ip)
logx.Error("connect ip offline:", ip)
return
}
message.Users[ip] = transmit.Users[ip]
message.Message = transmit.Message
j, _ := json.Marshal(message)
msg, _ := proto.Encode(string(j))
fmt.Println("ip:", ip, "msg:", string(msg))
c.(net.Conn).Write(msg)
}
main
通过监听某个端口,让 IM server与其建立间接,实现转发功能
package main
import (
"bufio"
"encoding/json"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"net"
"sync"
"transmit/common/proto"
"transmit/types"
)
var Connects sync.Map
func main() {
listenClient()
}
// listenClient 监听
func listenClient() {
lister, err := net.Listen("tcp", "127.0.0.1:8333")
if err != nil {
fmt.Println("net.Listen failed:", err)
}
for {
conn, err := lister.Accept()
if err != nil {
continue
}
fmt.Println(conn.RemoteAddr().String())
Connects.Swap(conn.RemoteAddr().String(), conn)
go addReceiver(conn)
}
}
// addReceiver 向连接添加接收器
func addReceiver(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
m, err := proto.Decode(reader)
if err != nil {
fmt.Println("与客户端", conn.LocalAddr(), "断开连接")
return
}
transmit := types.TransmitMap{}
json.Unmarshal([]byte(m), &transmit)
// 读到消息后,根据服务器进行转发
for connect := range transmit.Users {
transmitMessage(conn, connect, transmit)
}
}
}
func transmitMessage(conn net.Conn, ip string, transmit types.TransmitMap) {
c, ok := Connects.Load(ip)
message := types.TransmitMap{
Users: map[string][]uint64{},
}
if !ok {
message = types.TransmitMap{
Message: types.Message{
Id: "",
From: 0,
To: 0,
Type: 100,
ContentType: 0,
Time: "",
Content: "客户端离线",
},
}
s, _ := json.Marshal(message)
msg, _ := proto.Encode(string(s))
conn.Write(msg)
fmt.Println("客户端离线:", ip)
logx.Error("connect ip offline:", ip)
return
}
message.Users[ip] = transmit.Users[ip]
message.Message = transmit.Message
j, _ := json.Marshal(message)
msg, _ := proto.Encode(string(j))
fmt.Println("ip:", ip, "msg:", string(msg))
c.(net.Conn).Write(msg)
}
client
模拟客户端,根据自己的项目拆分到 IM server中
package main
import (
"bufio"
"encoding/json"
"fmt"
"net"
"time"
"transmit/common/proto"
"transmit/types"
)
func client() {
conn, _ := InitConnect()
go Consumer(conn)
var ip string
message := types.Message{
Id: "123",
From: 1,
To: 2,
Type: 1,
ContentType: 1,
Time: "123",
Content: "你好",
}
for {
fmt.Scan(&ip)
users := map[string][]uint64{}
users[ip] = []uint64{1, 2}
time.Sleep(2 * time.Second)
if err := Producer(conn, users, message); err != nil {
fmt.Println("Producer(conn, ip, message) failed", err)
}
}
}
func InitConnect() (conn net.Conn, err error) {
conn, err = net.Dial("tcp", "127.0.0.1:8333")
fmt.Println(conn.LocalAddr())
return
}
func Producer(conn net.Conn, user map[string][]uint64, mes types.Message) (err error) {
transmit := types.TransmitMap{
Users: user,
Message: mes,
}
message, _ := json.Marshal(transmit)
m, _ := proto.Encode(string(message))
_, err = conn.Write(m)
if err != nil {
// 重试三次,一次休眠一秒
for i := 0; i < 3 && err != nil; i++ {
time.Sleep(1 * time.Second)
_, err = conn.Write(m)
}
}
return
}
// Consumer 消费者 读消息
func Consumer(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
m, err := proto.Decode(reader)
if err != nil {
continue
}
transmit := types.TransmitMap{}
json.Unmarshal([]byte(m), &transmit)
// 读到消息后,进行转发
for _, uIds := range transmit.Users {
for _, id := range uIds {
fmt.Println(id, transmit.Message)
}
}
}
}
types
在转发群聊消息时,需要将 m 个用户转发到 n 个IM服务端上,如果单独发送需要多次发送,所以封装成 TransmitMap 进行转发。
type Message struct {
Id string `json:"id"`
From uint64 `json:"from,optional"`
To uint64 `json:"to"`
Type uint32 `json:"type"`
ContentType uint32 `json:"contentType"`
Time string `json:"time"`
Content string `json:"content"`
}
type TransmitMap struct {
Users map[string][]uint64 `json:"users"` // map[主机地址]用户集合
Message
}
为什么这里要封装一个?
type TransmitMap struct {
Users map[string][]uint64 `json:"users"`
Message
}
比如用户A、B、C、D、E、F、G在同一个群聊里,各自连接到的 IM server 如图所示
如果群聊消息采用上面单聊的转发方式
- 用户A发送一条消息
- IM server1 收到群聊消息,查找群里的其他用户(B、C、D、E、F、G)
- 判断哪些用户在当前 IM servier 上,发现 B,直接转发
- 遍历(B、C、D、E、F、G): 将在线用户消息转发消息到 transmit
- …
如果不做任何操作的化,光过程 4 就需要转发5次消息
用户A发送一条群聊消息的过程
- 用户A发送一条群聊消息
- IM server1 收到群聊消息,查找群里的其他用户(B、C、D、E、F、G)
- 判断哪些用户在当前 IM servier 上,发现 B,直接转发
- 通过 redis 判断哪些用户在线并获取主机地址,将通过map将相同地址的用户分类
map[主机地址]用户集合
,一块转发到 transmit - transmit 以
主机地址为组
,将消息发送给 IM server2 和 IM server3 - IM server2 和 IM server3 收到消息后,将消息进行转发
- 离线用户同步离线消息库
这样的好处是,有效的减少群聊消息转发的次数。
ps:如果存在哪些不足,欢迎大家在评论区指正~