Go: IM系统分布式架构方案 (6)

news2025/1/16 7:51:37

分布式部署可能遇到的问题


常见 nginx 反向代理方案

  • 假设按照上述架构方案来
  • A用户接入后connA(ws客户端) 由节点1来维护
  • B用户接入后connA(ws客户端) 由节点2来维护
  • 流程: A->B 发信息: A -> connA -> 分析处理 -> connB -> B
  • 实际上,上述流程是没有办法通信的,因为 A找不到B在哪里
  • 核心问题:系统如何将消息投递到 connB?

常用解决方案

1 ) 使用消息总线

  • 优点:简单
  • 去电:各个节点不知道彼此节点状态
  • 例子:redis/kafka/MQ

2 ) 局域网通信协议

  • 节点间通过通信协议来通信
  • 优点:简单,成本低
  • 缺点:不知道节点状态
  • 例子:UDP

3 ) 实现调度应用

  • 自己实现调度应用,保存各个客户端的状态
  • 优点:可靠
  • 缺点:复杂

基于局域网通信UDP协议解决

  • 以上三种方案,我们选择第二种来解决
  • 首先,回顾单体应用
    • 开启ws接收协程recvproc/ws发送协程sendproc
    • websocket收到消息->dispatch发送给dstid
  • 基于UDP的分布式应用
    • 开启ws接收协程recvproc/ws发送协程sendproc
    • 开启udp接收协程udprecvproc/udp发送协程udpsendproc
    • websocket收到消息->broadMsg广播到局域网
    • udp接收到收到消息->dispatch发送给dstid
    • 自己是局域网一份子,所以也能接收到消息

代码实现解决,ctrl/chat.go

package ctrl

import (
	"net/http"
	"github.com/gorilla/websocket"
	"gopkg.in/fatih/set.v0"
	"sync"
	"strconv"
	"log"
	"encoding/json"
	"net"
)

const (
	CMD_SINGLE_MSG = 10
	CMD_ROOM_MSG   = 11
	CMD_HEART      = 0
)

type Message struct {
	Id      int64  `json:"id,omitempty" form:"id"` //消息ID
	Userid  int64  `json:"userid,omitempty" form:"userid"` //谁发的
	Cmd     int    `json:"cmd,omitempty" form:"cmd"` //群聊还是私聊
	Dstid   int64  `json:"dstid,omitempty" form:"dstid"`//对端用户ID/群ID
	Media   int    `json:"media,omitempty" form:"media"` //消息按照什么样式展示
	Content string `json:"content,omitempty" form:"content"` //消息的内容
	Pic     string `json:"pic,omitempty" form:"pic"` //预览图片
	Url     string `json:"url,omitempty" form:"url"` //服务的URL
	Memo    string `json:"memo,omitempty" form:"memo"` //简单描述
	Amount  int    `json:"amount,omitempty" form:"amount"` //其他和数字相关的
}
/**
消息发送结构体
1、MEDIA_TYPE_TEXT
{id:1,userid:2,dstid:3,cmd:10,media:1,content:"hello"}
2、MEDIA_TYPE_News
{id:1,userid:2,dstid:3,cmd:10,media:2,content:"标题",pic:"http://www.baidu.com/a/log,jpg",url:"http://www.a,com/dsturl","memo":"这是描述"}
3、MEDIA_TYPE_VOICE,amount单位秒
{id:1,userid:2,dstid:3,cmd:10,media:3,url:"http://www.a,com/dsturl.mp3",anount:40}
4、MEDIA_TYPE_IMG
{id:1,userid:2,dstid:3,cmd:10,media:4,url:"http://www.baidu.com/a/log,jpg"}
5、MEDIA_TYPE_REDPACKAGR //红包amount 单位分
{id:1,userid:2,dstid:3,cmd:10,media:5,url:"http://www.baidu.com/a/b/c/redpackageaddress?id=100000","amount":300,"memo":"恭喜发财"}
6、MEDIA_TYPE_EMOJ 6
{id:1,userid:2,dstid:3,cmd:10,media:6,"content":"cry"}
7、MEDIA_TYPE_Link 6
{id:1,userid:2,dstid:3,cmd:10,media:7,"url":"http://www.a,com/dsturl.html"}

7、MEDIA_TYPE_Link 6
{id:1,userid:2,dstid:3,cmd:10,media:7,"url":"http://www.a,com/dsturl.html"}

8、MEDIA_TYPE_VIDEO 8
{id:1,userid:2,dstid:3,cmd:10,media:8,pic:"http://www.baidu.com/a/log,jpg",url:"http://www.a,com/a.mp4"}

9、MEDIA_TYPE_CONTACT 9
{id:1,userid:2,dstid:3,cmd:10,media:9,"content":"10086","pic":"http://www.baidu.com/a/avatar,jpg","memo":"胡大力"}
*/

//本核心在于形成userid和Node的映射关系
type Node struct {
	Conn *websocket.Conn
	//并行转串行,
	DataQueue chan []byte
	GroupSets set.Interface
}
//映射关系表
var clientMap map[int64]*Node = make(map[int64]*Node,0)
//读写锁
var rwlocker sync.RWMutex

// ws://127.0.0.1/chat?id=1&token=xxxx
func Chat(writer http.ResponseWriter,
	request *http.Request) {
	//fmt.Printf("%+v",request.Header)
	// 检验接入是否合法
    //checkToken(userId int64,token string)
    query := request.URL.Query()
    id := query.Get("id")
    token := query.Get("token")
    userId ,_ := strconv.ParseInt(id,10,64)
	isvalida := checkToken(userId,token)
	//如果isvalida=true
	//isvalida=false

	conn,err :=(&websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return isvalida
		},
	}).Upgrade(writer,request,nil)
	if err!=nil{
		log.Println(err.Error())
		return
	}
	// 获得conn
	node := &Node{
		Conn:conn,
		DataQueue:make(chan []byte,50),
		GroupSets:set.New(set.ThreadSafe),
	}
	// 获取用户全部群Id
	comIds := contactService.SearchComunityIds(userId)
	for _,v:=range comIds{
		node.GroupSets.Add(v)
	}
	// userid和node形成绑定关系
	rwlocker.Lock()
	clientMap[userId]=node
	rwlocker.Unlock()
	//todo 完成发送逻辑,con
	go sendproc(node)
	//todo 完成接收逻辑
	go recvproc(node)
    log.Printf("<-%d\n",userId)
	sendMsg(userId,[]byte("hello,world!"))
}

// 添加新的群ID到用户的groupset中
func AddGroupId(userId,gid int64){
	//取得node
	rwlocker.Lock()
	node,ok := clientMap[userId]
	if ok{
		node.GroupSets.Add(gid)
	}
	//clientMap[userId] = node
	rwlocker.Unlock()
	//添加gid到set
}
// ws发送协程
func sendproc(node *Node) {
	for {
		select {
		case data:= <-node.DataQueue:
			err := node.Conn.WriteMessage(websocket.TextMessage,data)
			if err!=nil{
				log.Println(err.Error())
				return
			}
		}
	}
}
// ws接收协程
func recvproc(node *Node) {
	for{
		_,data,err := node.Conn.ReadMessage()
		if err!=nil{
			log.Println(err.Error())
			return
		}
		//dispatch(data)
		//把消息广播到局域网
		broadMsg(data)
		log.Printf("[ws]<=%s\n",data)
	}
}

func init(){
	go udpsendproc()
	go udprecvproc()
}

// 用来存放发送的要广播的数据
var udpsendchan chan []byte=make(chan []byte,1024)
// 将消息广播到局域网
func broadMsg(data []byte){
	udpsendchan<-data
}
// 完成udp数据的发送协程
func udpsendproc(){
	log.Println("start udpsendproc")
	//todo 使用udp协议拨号
	con,err:=net.DialUDP("udp",nil,
		&net.UDPAddr{
			IP:net.IPv4(192,168,0,255), // 这里代表网段,这个可以在部署的时候,抽离出去配置
			Port:3000,
		})
	defer con.Close()
	if err!=nil{
		log.Println(err.Error())
		return
	}
	// 通过的到的con发送消息
	// con.Write()
	for{
		select {
		case data := <- udpsendchan:
			_,err=con.Write(data)
			if err!=nil{
				log.Println(err.Error())
				return
			}
		}
	}
}
// 完成upd接收并处理功能
func udprecvproc(){
	log.Println("start udprecvproc")
	 //todo 监听udp广播端口
	 con,err:=net.ListenUDP("udp",&net.UDPAddr{
	 	IP:net.IPv4zero,
	 	Port:3000,
	 })
	 defer con.Close()
	 if err!=nil{log.Println(err.Error())}
	// 处理端口发过来的数据
	for{
		var buf [512]byte
		n,err:=con.Read(buf[0:])
		if err!=nil{
			log.Println(err.Error())
			return
		}
		//直接数据处理
		dispatch(buf[0:n])
	}
	log.Println("stop updrecvproc")
}

// 后端调度逻辑处理
func dispatch(data[]byte){
	//todo 解析data为message
	msg := Message{}
	err := json.Unmarshal(data,&msg)
	if err!=nil{
		log.Println(err.Error())
		return
	}
	// 根据cmd对逻辑进行处理
	switch msg.Cmd {
	case CMD_SINGLE_MSG:
		sendMsg(msg.Dstid,data)
	case CMD_ROOM_MSG:
		//todo 群聊转发逻辑
		for _,v:= range clientMap{
			if v.GroupSets.Has(msg.Dstid){
				v.DataQueue<-data
			}
		}
	case CMD_HEART:
		//todo 一般啥都不做
	}
}

// 发送消息
func sendMsg(userId int64,msg []byte) {
	rwlocker.RLock()
	node,ok:=clientMap[userId]
	rwlocker.RUnlock()
	if ok{
		node.DataQueue<- msg
	}
}
// 检测是否有效
func checkToken(userId int64,token string)bool{
	//从数据库里面查询并比对
	user := userService.Find(userId)
	return user.Token==token
}

nginx 反向代理

	upstream wsbackend {
			server 192.168.0.102:8080;
			server 192.168.0.100:8080;
			hash $request_uri;
	}
	map $http_upgrade $connection_upgrade {
	      default upgrade;
	      ''      close; 
	}
    server {
	  listen  80;
	  server_name localhost;
	  location / {
	   	   proxy_pass http://wsbackend;
	  }
	  location ^~ /chat {
		   proxy_pass http://wsbackend;
		   proxy_connect_timeout 500s;
	       proxy_read_timeout 500s;
		   proxy_send_timeout 500s;
		   proxy_set_header Upgrade $http_upgrade;
	       proxy_set_header Connection "Upgrade";
	  }
	 }
}

注意,这里在 server 192.168.0.102; server 192.168.0.100; 这两台服务器上启动 chat.exe 程序
需要提前 go build 并进行相关部署,此处不在赘述,下面会有说明

打包部署

  • 我们要打包应用,同时需要 asset 和 view 目录,这两个在 go build 中是不会被打包进去的
  • 所以,我们要同时把两者和二进制程序一起进行部署
  • 这里,我们写两个脚本文件,分别对应在 window 和 linux 平台的部署文件
  • 这里说下,一般会借助 jenkins 来操作

1 )windows 下

rd /s/q release
md release
::go build -ldflags "-H windowsgui" -o chat.exe
go build -o chat.exe
COPY chat.exe release\
COPY favicon.ico release\favicon.ico
XCOPY asset\*.* release\asset\  /s /e
XCOPY view\*.* release\view\  /s /e

2 )Linux 下

#!/bin/sh
rm -rf ./release
mkdir  release
go build -o chat
chmod +x ./chat 
cp chat ./release/
cp favicon.ico ./release/ 
cp -arf ./asset ./release/
cp -arf ./view ./release/
  • 注意,linux 下这里使用 nohup: nohup ./chat >>./log.log 2>&1 &

3 ) 总结

  • 上面两种是分别在不同平台手动部署的样例
  • 实际上,我们如果借助jenkins只需要配置下linux下的相关命令即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1946066.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++学习笔记03-对象和类(问题-解答自查版)

前言 以下问题以Q&A形式记录&#xff0c;基本上都是笔者在初学一轮后&#xff0c;掌握不牢或者频繁忘记的点 Q&A的形式有助于学习过程中时刻关注自己的输入与输出关系&#xff0c;也适合做查漏补缺和复盘。 本文对读者可以用作自查&#xff0c;答案在后面&#xff0…

1 深度学习网络DNN

代码来自B站up爆肝杰哥 测试版本 import torch import torchvisiondef print_hi(name):print(fHi, {name}) if __name__ __main__:print_hi(陀思妥耶夫斯基)print("HELLO pytorch {}".format(torch.__version__))print("torchvision.version:", torchvi…

C++(week13): C++基础: 标准模板库 STL

文章目录 零、标准模板库 STL一、容器 (Container)1.序列式容器(1)vector2.五种遍历10.vector的迭代器失效问题 (2)deque(3)list 2.关联式容器(1)set4.set的查找(2)find() 8.set中存储自定义类型&#xff1a;三种方法 (2)multiset7.multiset的特殊操作&#xff1a;bound系列函数…

LeetCode/NowCoder-二叉树OJ练习

励志冰檗&#xff1a;形容在清苦的生活环境中激励自己的意志。&#x1f493;&#x1f493;&#x1f493; 目录 说在前面 题目一&#xff1a;单值二叉树 题目二&#xff1a;相同的树 题目三&#xff1a;对称二叉树 题目四&#xff1a;二叉树的前序遍历 题目五&#xff1a;另…

Python | Leetcode Python题解之第275题H指数II

题目&#xff1a; 题解&#xff1a; class Solution:def hIndex(self, citations: List[int]) -> int:n len(citations)left 0; right n - 1while left < right:mid left (right - left) // 2if citations[mid] > n - mid:right mid - 1else:left mid 1retur…

C语言之2048小游戏理解分析

目录 游戏程序思维导图&#xff1a; ​编辑 功能介绍&#xff1a; 代码管理&#xff1a; 主函数&#xff1a; 头文件&#xff1a; 游戏程序思维导图&#xff1a; 功能介绍&#xff1a; 按键W --------------- 向上 按键A --------------- 向左 按键S --------------- 向…

科技云报道:算网筑基AI注智,中国联通如何讲出AI时代的“新故事”?

科技云报道原创。 AI从未停止进化&#xff0c;也从未停止给人类带来惊喜。 从ChatGPT代表的文生文、Dall-E代表的文生图&#xff0c;到Sora代表的文生视频&#xff0c;Suno为代表的文生音乐&#xff0c;生成式AI的“暴力美学”持续突破内容生产的天花板&#xff0c;大模型技术…

【黑马java基础】特殊文件,日志

目录 特殊文件&#xff1a;Properties属性文件特点、作用使用Properties读取属性文件里的键值对数据使用properties把键值对数据写到属性文件中去案例 特殊文件&#xff1a;XML文件概述读取XML文件中的数据把数据写出到XML文件中去补充知识&#xff1a;约束XML文件的编写[了解]…

打卡第21天------二叉树

我现在每天都是在与时间赛跑&#xff0c;分秒必争&#xff0c;不想浪费一点我自己的时间。 希望通过算法训练营可以把我自己的逻辑思维建立起来&#xff0c;把自己的算法能力给提上去。 一、修剪二叉搜索树 题目链接&#xff1a;669. 修剪二叉搜索树 题目描述&#xff1a; 给…

最优化原理(笔记)

内积是线性代数运算的一个结果&#xff0c;一行*一列。 内积的性质&#xff01; 什么是范数&#xff1f;&#xff1f;&#xff1f; 对称矩阵&#xff1a;关于主对角线对称&#xff01; 正定对称矩阵&#xff1a; 二阶导是正定的&#xff0c;f(x)就是严格的凸函数&#xff01;&a…

element的el-autocomplete带输入建议搜索+搜索匹配文字高亮显示

element的el-autocomplete带输入建议搜索搜索匹配文字高亮显示 直接上代码 // vue代码块 添加插槽<el-autocompleteclearableplaceholder"请输入关键词进行搜索"input"searchInput"v-model"searchInputData":fetch-suggestions"queryS…

Android APP 音视频(02)MediaProjection录屏与MediaCodec编码

说明&#xff1a; 此MediaProjection 录屏和编码实操主要针对Android12.0系统。通过MediaProjection获取屏幕数据&#xff0c;将数据通过mediacodec编码输出H264码流&#xff08;使用ffmpeg播放&#xff09;&#xff0c;存储到sd卡上。 1 MediaProjection录屏与编码简介 这里…

【测开能力提升-Javascript】JavaScript运算符流程结构

1. 递增递减运算符 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><script>// 前置递增运算符var age10age //类似于ageage1&#xff0c; 先加1后返回值alert(age)// 后置…

【数据结构 | 哈希表】一文了解哈希表(散列表)

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; &#x1f923;本文内容&#x1f923;&a…

【SQL语句大全(MySQL)】

SQL语法 添加删除修改查询基本查询条件查询分组函数/聚合函数分组查询排序分页查询&#xff08;限制查询&#xff09;多表查询连接查询根据年代分类连接查询根据连接方式分类1、内连接2、左外连接3、右外连接 多张表连接的语法格式 嵌套查询 SQL语句书写顺序 添加 INSERT INTO…

什么是STP环路保护

在运行生成树协议的网络中&#xff0c;根端口和其他阻塞端口状态是依靠不断接收来自上游设备的BPDU维持。当由于链路拥塞或者单向链路故障导致这些端口收不到来自上游交换设备的BPDU时&#xff0c;设备会重新选择根端口。原先的根端口会转变为指定端口&#xff0c;而原先的阻塞…

2019年9月全国英语等级考试第三级笔试真题

2019年9月全国英语等级考试第三级笔试真题

vue3.0学习笔记(三)——计算属性、监听器、ref属性、组件通信

1. computed 函数 定义计算属性&#xff1a; computed 函数&#xff0c;是用来定义计算属性的&#xff0c;计算属性不能修改。 计算属性应该是只读的&#xff0c;特殊情况可以配置 get set 核心步骤&#xff1a; 导入 computed 函数 执行函数 在回调参数中 return 基于响应…

尚品汇-sku存入Redis缓存(二十三)

目录&#xff1a; &#xff08;1&#xff09;分布式锁改造获取sku信息 &#xff08;2&#xff09;使用Redisson 分布式锁 AOP实现缓存 &#xff08;3&#xff09;定义缓存aop注解 &#xff08;1&#xff09;分布式锁改造获取sku信息 前面学习了本地锁的弊端&#xff0c;…

Springboot validated JSR303校验

1.导入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency> 2.测试类 package com.jmj.gulimall.product.testC;import lombok.Data;import javax.val…