RabbitMQ实践——搭建多人聊天服务

news2025/1/10 2:57:27

大纲

  • 用户登录
  • 创建聊天室
  • 监听Stream(聊天室)
  • 发送消息
  • 实验
    • 登录
      • Tom侧
      • Jerry侧
    • 创建聊天室
      • Jerry侧
      • Tom侧
    • 进入聊天室
      • Jerry侧
      • Tom侧
    • 发送消息
      • Jerry发送消息
        • Jerry侧聊天室
        • Tom侧聊天室
      • Tom发送消息
        • Jerry侧聊天室
        • Tom侧聊天室
  • 代码工程
  • 参考资料

在《RabbitMQ实践——搭建单人聊天服务》一文中,我们搭建了Tom和Jerry两人的聊天服务。在这个服务中,它们都向Fanout交换器发送消息。而Fanout会将消息路由到它们两各自监听的队列。这样它们就可以得到全部消息。
在这里插入图片描述
如果是多人聊天,比如10个人聊天,按上述方案,需要Fanout交换器绑定10个队列。这就会使得结构变得非常复杂。
这是因为Classic类型队列在消费者确认读取消息后,会将消息从队列中删除。这样就需要我们使用fanout向多个队列路由消息,以供不同消费者消费。如果多个消费者消费同一个队列,则会导致每个消费者得到的都是部分信息。这就不符合我们理解的聊天场景。
但是我们可以使用Stream类型队列来解决这个问题。
Stream类型队列和之前的Classic队列的不同点是:Stream队列并不会清除消息。消息会一直存在于Stream队列中,消费者可以从指定位置开始读取消息。这样我们只要有一个Stream队列保存消息,所有消费者都从队列中读取消息即可。
在这里插入图片描述

用户登录

关于用户登录的流程我们在《RabbitMQ实践——搭建单人聊天服务》中已经有详细的介绍。即上图中黑色字体1、2、3、4、5的步骤。

创建聊天室

我们会创建一个以聊天室名称命名的交换器和Stream类型队列。即上图中黑色字体6、7、8、9的步骤。
需要注意的是Stream类型队列创建方案和Classic类型类似,只需要多指定"x-queue-type"=“stream”。但是对于Durable(持久化)只能设置为True,exclusive只能设置为False,autoDelete只能设置为False。

package com.rabbitmq.chat.service;

import java.util.Collections;
import java.util.Date;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import reactor.core.publisher.Flux;

@Service
public class ChatRoomV2 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void createChatRoom(String admin, String roomName) {
        createChatRoom(roomName);
    }

    private void createChatRoom(String roomName) {
        rabbitTemplate.execute(action -> {
            action.exchangeDeclare(roomName, "fanout", false, true, null);
            action.queueDeclare(roomName, true, false, false,
                Collections.singletonMap("x-queue-type", "stream"));
            action.queueBind(roomName, roomName, "");
            return null;
        });
    }

聊天室创建完毕后,会通知所有登录的用户。

    @PostMapping("/create")
    public void create(@RequestParam String admin, @RequestParam String roomName) {
        chatRoomV2.createChatRoom(admin, roomName);
        core.notifyEveryone(roomName + " is created");
    }

监听Stream(聊天室)

    public Flux<String> receive(String username, String roomName) {
        return Flux.create(emitter -> {
            rabbitTemplate.execute(channel -> {
                channel.basicQos(100);
                Date timestamp = new Date(System.currentTimeMillis());
                channel.basicConsume(roomName, false, username,
                    false, true,
                        Collections.singletonMap("x-stream-offset", timestamp),
                        (consumerTag, message) -> {
                            String senderOfMessage = message.getProperties().getHeaders().get("username").toString();
                            String show = "You Said: ";
                            if (!senderOfMessage.equals(username)) {
                                show = senderOfMessage + " Said: ";
                            }
                            show += new String(message.getBody());
                            System.out.println(show);
                            emitter.next(show);
                            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                        },
                        consumerTag -> { }
                );
                return null;
            });
        });
    }

我们将"x-stream-offset"设置为当前毫秒数,是表示我们只读取当前时间之后发布的消息。这也符合聊天室的业务特点:不能读取历史消息。
当我们收到消息后,我们会获取消息Header中的自定义字段username,它标志了消息的发布者。如果发布者和读取者是同一人,我们将展示内容前面新增“You Said:”;如果是别人说的,则标记发布者的名称。
由于我们使用了WebFlux响应式编程,所以Controller层要做特殊处理

    @GetMapping(value = "/receive", produces = "text/event-stream")
    public Flux<String> receive(@RequestParam String username, @RequestParam String roomName) {
        return chatRoomV2.receive(username, roomName);
    }

发送消息

每个聊天室用户只要给之前创建的Fanout交换器发送消息即可。在这一步,我们给他们发送的消息Header中新增了字段username,以标记是谁发送的。

    public void send(String username, String roomName, String message) {
        Message msg = MessageBuilder.withBody(message.getBytes())
            .setHeader("username", username)
            .build();
        rabbitTemplate.send(roomName, "", msg);
    }

实验

登录

Tom侧

在这里插入图片描述

Jerry侧

在这里插入图片描述

创建聊天室

Jerry侧

Jerry申请创建一个聊天室
在这里插入图片描述
在管理后台,我们看到对应的交换器和Stream都创建出来了。
在这里插入图片描述
在这里插入图片描述
同时在刚才的登录接口界面,Jerry收到了通知
在这里插入图片描述

Tom侧

Tom也会收到通知
在这里插入图片描述

进入聊天室

Tom和Jerry在收到通知后,可以通过receive接口进入聊天室,监听聊天室内容变化。

Jerry侧

在这里插入图片描述

Tom侧

在这里插入图片描述

发送消息

Jerry发送消息

在这里插入图片描述

Jerry侧聊天室

在这里插入图片描述

Tom侧聊天室

在这里插入图片描述

Tom发送消息

在这里插入图片描述

Jerry侧聊天室

在这里插入图片描述

Tom侧聊天室

在这里插入图片描述

代码工程

https://github.com/f304646673/RabbitMQDemo

参考资料

  • https://www.rabbitmq.com/docs/streams

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1863291.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LongRAG:增强长上下文大语言模型的检索增强生成

这篇论文的标题是《LongRAG: Enhancing Retrieval-Augmented Generation with Long-context LLMs》&#xff0c;由滑铁卢大学的Ziyan Jiang、Xueguang Ma和Wenhu Chen撰写。论文主要探讨了在传统的检索增强生成&#xff08;RAG&#xff09;框架中存在的一些问题&#xff0c;并提…

Linux基础 - 存储结构与管理硬盘

目录 零. 简介 一. 文件系统 Ubuntu 文件系统结构&#xff1a; 路径: 二. 硬盘管理 零. 简介 Linux 文件系统是一种用于组织和存储文件、目录以及相关数据的架构。 常见的 Linux 文件系统有&#xff1a; Ext4&#xff08;Fourth Extended File System&#xff09;&#…

数据结构:队列详解 c++信息学奥赛基础知识讲解

目录 一、队列概念 二、队列容器 三、队列操作 四、代码实操 五、队列遍历 六、案例实操 题目描述&#xff1a; 输入格式&#xff1a; 输出格式&#xff1a; 输入样例&#xff1a; 输出样例&#xff1a; 详细代码&#xff1a; 一、队列概念 队列是一种特殊的线性…

了解负载均衡器

现代系统变得越来越复杂&#xff0c;但这种复杂性确保了处理大量的网络流量和请求。 简单来说&#xff0c;负载均衡器的主要思想就像它的名字一样&#xff0c;它跨服务器提供直接的客户端请求。换句话说&#xff0c;负载均衡器是在多台服务器之间分配网络或应用程序流量的系统…

【系统架构设计师】六、信息系统基础知识(定义|分类|企业信息化系统|生命周期|建设原则|开发方法)

目录 一、信息系统的定义 二、信息系统的分类 三、企业使用的信息化系统 四、信息系统的生命周期 五、信息系统建设原则 六、信息系统的开发方法 6.1 结构化方法 6.2 原型法 6.3 构件化开发方法 6.4 面向服务的方法 6.5 面向对象的方法 6.6 敏捷方法 历年真题考情&#x…

AMSR-E/Aqua L1A 原始观测次数,第 3 版

AMSR-E/Aqua L1A Raw Observation Counts, Version 3 简介 改进后的 V003 AMSREL1A 产品对共同登记参数 A1 和 A2 进行了经验修正&#xff0c;并更新了用于修正 AMSR-E 89 GHz 位置信息的参数文件。因此&#xff0c;第三版 AMSREL1A 数据提高了以下方面的精度&#xff1a;纬度…

【个人博客搭建】(26)发布后端webapi项目

1、选择启动的webapi&#xff0c;右击发布 2、选择左下角的“显示所有设置” 在上一页按钮那边是发布文件夹的目录 地址&#xff0c; 现在界面的就是配置的信息&#xff0c; 配置&#xff1a;Debug、Release 目标框架&#xff1a;我们用的net8.0&#xff0c;就是他&#xff…

Golang | Leetcode Golang题解之第191题位1的个数

题目&#xff1a; 题解&#xff1a; func hammingWeight(num uint32) (ones int) {for ; num > 0; num & num - 1 {ones}return }

C语言 | Leetcode C语言题解之第191题位1的个数

题目&#xff1a; 题解&#xff1a; int hammingWeight(uint32_t n) {int ret 0;while (n) {n & n - 1;ret;}return ret; }

S_LOVE多端恋爱小站小程序源码 uniapp多端

S_LOVE多端恋爱小站小程序源码&#xff0c;采用uniapp多端开发框架进行开发&#xff0c;目前已适配H5、微信小程序版本。 源码下载&#xff1a;https://download.csdn.net/download/m0_66047725/89421726 更多资源下载&#xff1a;关注我。

数据库系统体系结构-DBMS的三级模式结构、DBMS的工作方式、模式定义语言、二级映射

一、体系结构的概念 1、大多数DBMS遵循三级模式结构 &#xff08;1&#xff09;外模式 &#xff08;2&#xff09;概念模式 &#xff08;3&#xff09;内模式 2、DBMS的体系结构描述的应该是系统的组成结构及其联系以及系统结构的设计和变化的原则等 3、1978年美国国家标…

在flask中加载mnist模型,并预测图片

一、在tensorflow中新建及保存模型 启动Jupyter Notebook 新建Notebook 生成 mnist_model.h5 模型的代码 import tensorflow as tf from tensorflow.keras.datasets import mnist from tensorflow.keras.models import Sequential from tensorflow.keras.layers import…

【Oracle篇】逻辑备份工具expdp(exp)/impdp(imp)和物理备份工具rman的区别和各自的使用场景总汇(第八篇,总共八篇)

&#x1f4ab;《博主介绍》&#xff1a;✨又是一天没白过&#xff0c;我是奈斯&#xff0c;DBA一名✨ &#x1f4ab;《擅长领域》&#xff1a;✌️擅长Oracle、MySQL、SQLserver、阿里云AnalyticDB for MySQL(分布式数据仓库)、Linux&#xff0c;也在扩展大数据方向的知识面✌️…

26.4 Django 视图层

1. 视图函数 视图函数是Django框架中用于处理Web请求并返回Web响应的重要组件. 以下是对Django视图函数的详细解释: * 1. 视图函数与URL的映射.为了让Django能够知道哪个URL对应哪个视图函数, 需要在应用的urls.py文件中定义URL模式.使用path或re_path函数来定义URL模式, 并将…

计算机视觉的职业规划

Hi&#xff0c;大家好。我是茶桁。 今天这节课呢&#xff0c;咱们先不着急讲原理&#xff0c;先来讲讲职业规划的话题。 如果想要直接上手企业级的 AI 项目&#xff0c;可以看看咱们的「AI 人工智能企业项目实战」。 趋势和薪资 首先&#xff0c;先来讲讲就业的趋势。其实学…

Python学习笔记20:进阶篇(九)常见标准库使用之sys模块和re模块

前言 本文是根据python官方教程中标准库模块的介绍&#xff0c;自己查询资料并整理&#xff0c;编写代码示例做出的学习笔记。 根据模块知识&#xff0c;一次讲解单个或者多个模块的内容。 教程链接&#xff1a;https://docs.python.org/zh-cn/3/tutorial/index.html 错误输出…

python笔记3

1.通过乘法多次打印&#xff0c;以及字符串相加的合体打印 xzzz yyyy print(xy) print(x*10)#与一个数为打印多少次 2.设置俩个变量&#xff0c;可以通过下面的方法来判断是否一个元素是否在另一个元素中&#xff0c;返回bool值 xzzz yyyy print(xy) print(x*10)#与一个数为打…

算法学习笔记——单双链表及其反转—堆栈诠释

单双链表及其反转——堆栈诠释 按值传递 int、long、byte、short、char、float、double、boolean和String 都是按值传递 概念&#xff1a;在方法被调用时&#xff0c;实参通过形参把它的内容副本传入方法内部&#xff0c;此时形参接收到的内容是实参值的一个拷贝&#xff0c;…

Windows 获取打印机及端口号方法 (C#)

1. 打开注册表编辑器 regedit 2.选择如下配置 计算机\HKEY_CURRENT_USER\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Device 3. 代码 C# using System; using Microsoft.Win32;class Program {static void Main(){string registryPath "SOFTWARE\Microsoft\Windows …

解决pycharm安装dlib失败的问题

今天使用pycharm来学习opencv人脸识别库face-recognition的时候出现了一点小问题&#xff0c;在pycharm中直接安装face-recognition会失败&#xff0c;说是因为缺少依赖库dlib&#xff0c;但是直接使用pycharm安装dlib库也有问题&#xff0c;不知道大家遇到没有 错误提示 note…