[集群聊天服务器]----(十一) 使用Redis实现发布订阅功能

news2025/1/15 20:36:45

接着上文,[集群聊天服务器]----(十)Nginx的tcp负载均衡配置–附带截图,我们配置nginx,使用了多台服务端来提高单机的并发量,接下来我们回到项目中,思考一下,各个服务端之间怎么进行通信呢?

配置Nginx以后,怎么保证跨服务器通信呢?

使用集群服务器,有多个服务器维护自己的用户列表。ChatServer1与ChatServer2的用户聊天,ChatServer1在自己服务器的用户表中找不到,但是可能用户在线,所以我们需要保证跨服务器间的通信!
但是如果让后端的服务器之间互相连接,让各个ChatServer服务器互相之间直接建立TCP连接进行通信,相当于在服务器网络之间进行广播。ChatServer维护了一个连接的用户表,每次向别的用户发消息都会从用户表中查看对端用户是否在线。然后再判断是直接发送,还是转为离线消息。这样的设计使得各个服务器之间耦合度太高 ,不利于系统扩展,并且会占用系统大量的socket资源,各服务器之间的带宽压力很大,不能够节省资源给更多的客户端提供服务,并且存在一个服务器瘫痪其余都崩溃的情况,不采用
在这里插入图片描述
所以引入中间件消息队列,解耦各个服务器, 使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源,在集群分布式环境中,经常使用的中间件消息队列有ActiveMQ、RabbitMQ、Kafka等,都是应用场景广泛并且性能很好的消息队列,供集群服务器之间,分布式服务之间进行消息通信。但是本项目业务类型并不是非常复杂,对并发请求量也没有太高的要求,因此我们的中间件消息队列选型的是-基于发布-订阅模式的redis。有关于redis的安装,在我的另一篇博客中有详细的介绍,Linux下安装redis并配置开机自启保姆级教程-----附带每一步截图
在这里插入图片描述

Redis发布-订阅

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
Redis 客户端可以订阅任意数量的通道。当有新消息通过 publish 命令发送给通道 时, 这个消息就会被发送给订阅它的客户端。
需要注意的是:这里的subscribe是以阻塞的形式等待publish端发送消息的,publish是一有消息就发送的。

实现

重要成员变量

// hiredis同步上下文对象,负责publish消息
redisContext *_publish_context;

// hiredis同步上下文对象,负责subscribe消息
redisContext *_subcribe_context;

// 回调操作,收到订阅的消息,给service层上报
function<void(int, string)> _notify_message_handler;
  • redisContext为redis提供的类

重要成员函数

Redis();
~Redis();

// 连接redis服务器 
bool connect();

// 向redis指定的通道channel发布消息
bool publish(int channel, string message);

// 向redis指定的通道subscribe订阅消息
bool subscribe(int channel);

// 向redis指定的通道unsubscribe取消订阅消息
bool unsubscribe(int channel);

// 在独立线程中接收订阅通道中的消息
void observer_channel_message();

// 初始化向业务层上报通道消息的回调对象
void init_notify_handler(function<void(int, string)> fn);

构造与析构函数

Redis::Redis()
    : _publish_context(nullptr), _subcribe_context(nullptr)
{
}

Redis::~Redis()
{
    //释放资源
    if (_publish_context != nullptr)
    {
        redisFree(_publish_context);
    }

    if (_subcribe_context != nullptr)
    {
        redisFree(_subcribe_context);
    }
}
  • 构造与析构函数重要完成对两个对象的初始化以及释放资源

连接函数

bool Redis::connect()
{
    _publish_context = redisConnect("127.0.0.1", 6379);
    if (nullptr == _publish_context)
    {
        cerr << "connect redis failed!" << endl;
        return false;
    }

    // 负责subscribe订阅消息的上下文连接
    _subcribe_context = redisConnect("127.0.0.1", 6379);
    if (nullptr == _subcribe_context)
    {
        cerr << "connect redis failed!" << endl;
        return false;
    }

    // 在单独的线程中,监听通道上的事件,有消息给业务层进行上报
    thread t([&]()
             { observer_channel_message(); });
    t.detach();

    cout << "connect redis-server success!" << endl;

    return true;
}
  • _publish_context 负责publish发布消息的上下文连接 6379 是 redis-server 监听的端口号
  • _subcribe_context负责subscribe订阅消息的上下文连接
  • 在单独的线程中,监听通道上的事件,有消息给业务层进行上报

发布消息

bool Redis::publish(int channel, string message)
{
    redisReply *reply = (redisReply *)redisCommand(_publish_context, "PUBLISH %d %s", channel, message.c_str());
    if (nullptr == reply)
    {
        cerr << "publish command failed!" << endl;
        return false;
    }
    freeReplyObject(reply); //释放
    return true;
}
  • 主要完成向redis指定的通道channel发布消息
  • 值得注意的是: redisCommand相当于在redis中敲了一个命令 通道号和消息,先把要发送的命令 缓存到本地 调用了redisAppendCommand,然后调用了redisBufferWrite 把命令发送到redis-server上,最后调用redisGetReply 阻塞等待redis server响应消息,publish一执行马上就回复了,所以可以使用redisCommand
  • 注意释放资源

订阅消息

bool Redis::subscribe(int channel)
{
    if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel))
    {
        cerr << "subscribe command failed!" << endl;
        return false;
    }
    // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
    int done = 0;
    while (!done)
    {
        if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done))
        {
            cerr << "subscribe command failed!" << endl;
            return false;
        }
    }
    // redisGetReply

    return true;
}
  • 主要完成向redis指定的通道subscribe订阅消息
  • 值得注意的是: 订阅消息不会向发布消息一样使用redisCommand命令。因为subscribe命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息,通道消息的接收专门在observer_channel_message函数中的独立线程中进行,只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源

取消订阅

bool Redis::unsubscribe(int channel)
{
    if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "UNSUBSCRIBE %d", channel))
    {
        cerr << "unsubscribe command failed!" << endl;
        return false;
    }
    // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
    int done = 0;
    while (!done)
    {
        if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done))
        {
            cerr << "unsubscribe command failed!" << endl;
            return false;
        }
    }
    return true;
}
  • 主要完成向redis指定的通道unsubscribe取消订阅消息

在独立线程中接收订阅通道中的消息

void Redis::observer_channel_message()
{
    redisReply *reply = nullptr;
    while (REDIS_OK == redisGetReply(this->_subcribe_context, (void **)&reply))
    {
        if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr)
        {
            // 给业务层上报通道上发生的消息
            _notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);
        }

        freeReplyObject(reply);
    }

    cerr << ">>>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<<" << endl;
}
  • 订阅收到的消息是一个带三元素的数组 element[2] 就是消息 element[1] 通道号

初始化向业务层上报通道消息的回调对象

void Redis::init_notify_handler(function<void(int, string)> fn)
{
    this->_notify_message_handler = fn;
}

怎么在项目中使用呢?

在前面的剖析中,我们多多少少也看到了redis的身影,主要是在业务模块使用了它,下面我们在具体看一下在那些部分使用到了redis。

  • 在ChatService类中,首先我们创建了一个redis的操作对象
Redis _redis;
  • 在用户登录成功以后,我们向redis订阅了通道,这里使用id作为通道号
_redis.subscribe(id);
  • 然后创建了一个函数从redis消息队列中获取订阅的消息
void handleRedisSubscribeMessage(int, string);
  • 利用这个操作对象,我们连接了服务器,并设置了上报消息的回调
if(_redis.connect())
{
    _redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage, this, _1,_2));
}
  • 在查询到用户在线,但是不在同一个服务端的时候,我们就会调用redis的回调函数
if (user.getState() == "online")
    {
        _redis.publish(toid, js.dump());
        return;
    }
  • 具体实现如下:
void ChatService::handleRedisSubscribeMessage(int userid, string msg)
{
    lock_guard<mutex> lock(_connMutex);
    auto it = _userConnMap.find(userid);
    if (it != _userConnMap.end())
    {
        it->second->send(msg);
        return;
    }

    // 存储该用户的离线消息
    _offlineMsgModel.insert(userid, msg);
}
  • 根据userid寻找用户是否存在,存在就发送消息,不存在就存储他的离线消息
  • 用户注销,在redis中取消订阅通道
 _redis.unsubscribe(userid);

具体的使用就这么多,实现起来还是很简单的,完全足够本项目的开发。

项目测试

剖析到这里,整个项目就完结撒花了,接下来我们来做一个简单的测试,这里再次给出源码地址,在readme中,给出了详细的编译步骤,也给出了一键编译脚本,感兴趣的伙伴们可以拉下来试试。
在这里插入图片描述

  • 编译结束以后,我们启动两个服务端6000 6002 在nginx配置的两个
    在这里插入图片描述

  • 然后开启三个客户端,记得打开8000端口
    在这里插入图片描述
    在这里插入图片描述

  • 此时客户端,分配给了两个服务端
    在这里插入图片描述
    在这里插入图片描述

  • 进入一个终端,我们查看表里的内容
    在这里插入图片描述

  • 注册三个用户
    在这里插入图片描述

  • 表中数据,1 2是之前创建过的
    在这里插入图片描述

  • 登录
    在这里插入图片描述

  • 一对一聊天 3向4聊天,4不在线
    在这里插入图片描述

  • 查看离线消息
    在这里插入图片描述

  • 添加好友 3添加4
    在这里插入图片描述

  • 查看好友列表
    在这里插入图片描述

  • 1创建群
    在这里插入图片描述
    在这里插入图片描述

  • 4登录,显示离线消息
    在这里插入图片描述

  • 加入群
    在这里插入图片描述

  • 查看表
    在这里插入图片描述

  • 5登录,加入群

  • 群聊天
    在这里插入图片描述

  • 3与4聊天
    在这里插入图片描述

  • 3客户端退出
    在这里插入图片描述
    在这里插入图片描述

  • 服务端退出【ctrl+c】
    在这里插入图片描述

好了~ 关于集群聊天服务器的剖析就到此结束了,希望能够帮助到大家,也希望路过的大佬看到问题可以指出,感谢大家的支持,完结撒花~

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1705707.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

专业145+总410+成电电子科技大学858信号与系统考研经验电子信息与通信工程,抗干扰,空天,资环,真题,大纲,参考书。

今年考研总分410,专业课858信号与系统145&#xff0c;顺利上岸成电&#xff0c;毕设已经搞得七七八八&#xff0c;就等毕业了&#xff0c;抽空整理回顾一下去年的复习&#xff0c;给群里的同学提供一些参加&#xff0c;少走弯路&#xff0c;对于整体复习的把握有个大概得规划。…

Unity 之 Android 【获取设备的序列号 (Serial Number)/Android_ID】功能的简单封装

Unity 之 Android 【获取设备的序列号 (Serial Number)/Android_ID】功能的简单封装 目录 Unity 之 Android 【获取设备的序列号 (Serial Number)/Android_ID】功能的简单封装 一、简单介绍 二、获取设备的序列号 (Serial Number) 实现原理 1、Android 2、 Unity 三、注意…

notepad++ 模糊替换规则

AUTO_INCREMENT\d AUTO_INCREMENT0 ALTER TABLE .* AUTO_INCREMENT0;

计算机网络——在地址栏输入网址(URL)之后都发生了什么

网址&#xff0c;也叫域名&#xff0c;域名就像一个 IP 地址的可读版本&#xff0c;比如&#xff0c;百度的域名 www.baidu.com&#xff0c;他的 ip 是 110.242.68.3&#xff0c;输入 IP 一样可以跳转到百度搜索的页面&#xff0c;我想没有一个人没去记百度的 IP 吧。其实我们真…

Docker 快速更改容器的重启策略(Restart Policies)以及重启策略详解

目录 1. 使用 docker update 命令2. 在启动容器时指定重启策略3. 在 Docker Compose 文件中指定重启策略4. 总结 官方文档&#xff1a;Start containers automatically 1. 使用 docker update 命令 Docker 提供了 docker update 命令&#xff0c;可以在容器运行时更改其重启策…

Audition 2024 for Mac/Win:音频录制与编辑的卓越之选

随着数字媒体的不断发展&#xff0c;音频内容创作已经成为各行各业中不可或缺的一部分。无论是音乐制作、广播节目、播客录制还是影视配音&#xff0c;都需要高品质的音频录制和编辑工具来实现专业水准的作品。在这个充满竞争的时代&#xff0c;要想在音频创作领域脱颖而出&…

JAVASE总结一

1、 2、引用也可以是成员变量&#xff08;实例变量&#xff09;&#xff0c;也可以是局部变量&#xff1b;引用数据类型&#xff0c;引用&#xff0c; 我们是通过引用去访问JVM堆内存当中的java对象&#xff0c;引用保存了java对象的内存地址&#xff0c;指向了JVM堆内存当中…

java项目启动报错

java项目启动报错&#xff1a;java: java.lang.NoSuchFieldError: Class com.sun.tools.javac.tree.JCTree$JCImport does not have member field ‘com.sun.tools.javac.tree.JCTree qualid’ 原因&#xff1a;编译和运行的版本不一样 点击idea文件 点击项目结构 把这两个版本…

埃及媒体分发投放-新闻媒体通稿发布

埃及商业新闻 大舍传媒近日宣布将在埃及商业新闻领域展开新的媒体分发投放。作为埃及最具影响力的商业新闻平台之一&#xff0c;埃及商业新闻将为大舍传媒提供广阔的市场和受众群体。这一合作意味着大舍传媒将有机会通过埃及商业新闻的平台向埃及的商业精英和投资者传递最新的…

记录一次安装k8s初始化失败

实例化 kubeadm init --configkubeadm.yaml --ignore-preflight-errorsSystemVerification报错 [init] Using Kubernetes version: v1.25.0 [preflight] Running pre-flight checks error execution phase preflight: [preflight] Some fatal errors occurred:[ERROR CRI]: co…

引领智能校对行业的革新者:爱校对

我们很高兴向大家介绍爱校对&#xff0c;这是交互未来&#xff08;北京&#xff09;科技有限公司推出的一款前沿智能校对产品。爱校对的诞生&#xff0c;源自清华大学计算机智能人机交互实验室&#xff0c;结合了最先进的技术与理念&#xff0c;旨在为用户提供高效、精准的智能…

【Chapter5】死锁与饥饿,计算机操作系统教程,第四版,左万利,王英

文章目录 1.1 什么是死锁1.2 死锁的类型1.2.1 竞争资源引起的死锁1.2.2 进程间通信引起的死锁1.2.3 其他原因引起的死锁 1.3 死锁产生必要条件1.4 死锁的处理策略1.5 死锁的预防1.5.1 破坏资源独占条件1.5.2 破坏不可剥夺条件1.5.3 破坏保持申请条件1.5.4 破坏循环等待条件 1.6…

R可视化:另类的箱线图

介绍 方格状态的箱线图 加载R包 knitr::opts_chunk$set(echo TRUE, message FALSE, warning FALSE) library(patternplot) library(png) library(ggplot2) library(gridExtra)rm(list ls()) options(stringsAsFactors F)导入数据 data <- read.csv(system.file(&qu…

记一次Chanakya靶机的渗透测试

Chanakya靶机渗透测试 首先通过主机发现发现到靶机的IP地址为:172.16.10.141 然后使用nmap工具对其进行扫描:nmap -sC -sV -sS -p- 172.16.10.141 发现目标靶机开启了80,22,21等多个端口&#xff0c; 访问80端口,发现是一个普通的页面,点击进入多个界面也没有其他有用的信息,然…

PaliGemma – 谷歌的最新开源视觉语言模型(一)

引言 PaliGemma 是谷歌推出的一款全新视觉语言模型。该模型能够处理图像和文本输入并生成文本输出。谷歌团队发布了三种类型的模型&#xff1a;预训练&#xff08;PT&#xff09;模型、混合&#xff08;Mix&#xff09;模型和微调&#xff08;FT&#xff09;模型&#xff0c;每…

【二叉树】非递归实现前中后序遍历

目录 前言 算法思想 非递归实现前序遍历 过程分析 代码 非递归实现中序遍历 过程分析 代码 非递归实现后序遍历 过程分析 代码 前言 1&#xff09;前序&#xff1a;根 左子树 右子树 2&#xff09;中序&#xff1a;左子树 根 右子树 3&#xff09;后序&#xff1…

Mysql 8.0 主从复制及读写分离搭建记录

前言 搭建参考&#xff1a;搭建Mysql主从复制 为什么要做主从复制&#xff1f; 做数据的热备&#xff0c;作为后备数据库&#xff0c;主数据库服务器故障后&#xff0c;可切换到从数据库继续工作&#xff0c;避免数据丢失。架构的扩展。业务量越来越大&#xff0c;I/O访问频…

PID控制中积分项目的理解,消除稳态误差的作用,表示着过去(PID积分控制)

1&#xff0c;消除稳态误差 积分项目是对于历史误差进行的累积&#xff0c;可以理解&#xff0c;系统的误差累积表示不断的在减少误差&#xff0c;最终消除误差&#xff0c;这个过程需要将误差进行累加&#xff0c;才可以真正知道误差的大小是多少&#xff0c;用最终累加的误差…

C++模板方法模式

文章目录 1. 定义抽象基类&#xff08;Abstract Class&#xff09;2. 实现具体子类&#xff08;Concrete Class&#xff09;3. 使用模板方法模板方法模式的优点模板方法模式的应用场景注意事项实现示例抽象类&#xff08;模板&#xff09;具体实现类客户端代码 总结 模板方法模…

期望薪资25K,新浪微博测试4轮面试,没想到过了。。

一面60min 1、离职原因 2、简单的算法题&#xff0c;就是我会什么让写什么&#xff1a; 冒泡排序&#xff0c;二分查找&#xff08;其实这么简单&#xff0c;我还是在指引下写出来的&#xff0c;自己实在太菜&#xff09; 3、简历问答&#xff08;随机抽几个点问&#xff0…