RabbitMQ实践——搭建单人聊天服务

news2025/1/11 11:48:05

大纲

  • 创建Core交换器
  • 用户登录
  • 发起聊天邀请
  • 接受邀请
  • 聊天
  • 实验过程
  • 总结
  • 代码工程

经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。
基本结构如下。为了避免Server有太多连线导致杂乱,下图将Server画成两个模块,实则是一个服务。
在这里插入图片描述
该服务由两个核心交换器构成。
Core交换器是服务启动时创建的,它主要是为了向不同用户传递“系统通知型”消息。比如Jerry向Tom发起聊天邀请,则是通过上面黑色字体6-10的流程发给了Core交换器。然后Core交换器将该条消息告知Tom。
Fanout交换器是用来消息传递的。Jerry和Tom都向其发送消息,然后路由到两个队列。它们两各自订阅一个队列,就可以看到彼此的聊天内容了。

创建Core交换器

package com.rabbitmq.chat.service;

import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;

@Service
public class Core {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private ConnectionFactory connectionFactory;

    final String exchangeName = "Core";

    @PostConstruct
    public void init() {
        connectionFactory = rabbitTemplate.getConnectionFactory();
        createExchange(exchangeName);
    }

    private void createExchange(String exchangeName) {
        rabbitTemplate.execute(channel -> {
            channel.exchangeDeclare(exchangeName, "direct", false, true, null);
            return null;
        });
    }

用户登录

用户登录后,我们会创建一个“系统通知”队列。然后用户就会通过长连接形式,持续等待系统发出通知。

    private final ReentrantLock lock = new ReentrantLock();
    final private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();
    
    public Flux<String> Login(String username) {
        createExclusiveQueue(username);
        createBanding(exchangeName, username, username);
        return Flux.create(emitter -> {
           SimpleMessageListenerContainer container = getListener(username, (Message message) -> {
               String msg = new String(message.getBody());
               System.out.println("Received message: " + msg);
               emitter.next(msg);
           });
           container.start();
       });
    }
 
     private void createExchange(String exchangeName) {
        rabbitTemplate.execute(channel -> {
            channel.exchangeDeclare(exchangeName, "direct", false, true, null);
            return null;
        });
    }

    private void createBanding(String exchangeName, String queueName, String routingKey) {
        rabbitTemplate.execute(channel -> {
            channel.queueBind(queueName, exchangeName, routingKey);
            return null;
        });
    }
    
    private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {
        lock.lock();
        try {
            SimpleMessageListenerContainer listener = listeners.get(queueName);
            if (listener == null && messageListener != null) {
                listener = new SimpleMessageListenerContainer();
                listener.setConnectionFactory(connectionFactory);
                listener.setQueueNames(queueName);
                listener.setMessageListener(messageListener);
                listeners.put(queueName, listener);
            }
            return listener;
        } finally {
            lock.unlock();
        }
    }

Controller如下

package com.rabbitmq.chat.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.rabbitmq.chat.service.Core;

import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/user")
public class UserController {
    @Autowired
    private Core core;

    @PostMapping(value = "/login", produces = "text/event-stream")
    public Flux<String> login(@RequestParam String username) {
        return core.Login(username);
    }
}

发起聊天邀请

发起聊天邀请时,系统会预先创建一个聊天室(ChatRoomInfo )。它包含上图中Fanout交换器、以及聊天双方需要订阅的消息队列。
这些创建完后,发起方就会等待对方发送的消息,也可以自己和自己聊天。因为消息队列已经创建好了,只是对方还没使用。

package com.rabbitmq.chat.service;

import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import jakarta.annotation.PostConstruct;
import lombok.Data;
import reactor.core.publisher.Flux;

@Service
public class ChatRoom {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private ConnectionFactory connectionFactory;

    @Data
    private class ChatRoomInfo {
        private String exchange;
        private Map<String, String> usernameToQueuename;
    }

    private final Map<String, ChatRoomInfo> chatRooms = new java.util.HashMap<>();
    private final ReentrantLock lock = new ReentrantLock();   
    
    @PostConstruct
    public void init() {
        connectionFactory = rabbitTemplate.getConnectionFactory();
    }

    public Flux<String> invite(String fromUsername, String toUsername) {
        String chatRoomName = getChatRoomName(fromUsername, toUsername);
        ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);
        if (chatRoomInfo == null) {
            createChatRoom(fromUsername, toUsername);
        }
        return talk(chatRoomName, fromUsername);
    }
    
    private void createChatRoom(String fromUsername, String toUsername) {
        String chatRoomName = getChatRoomName(fromUsername, toUsername);
        String exchangeName = chatRoomName;
        String fromQueueName = "queue-" + fromUsername + "-" + toUsername;
        String toQueueName = "queue-" + toUsername + "-" + fromUsername;
        
        rabbitTemplate.execute(action -> {
            action.exchangeDeclare(exchangeName, "fanout", false, true, null);
            action.queueDeclare(fromQueueName, false, true, false, null);
            action.queueDeclare(toQueueName, false, true, false, null);
            action.queueBind(fromQueueName, exchangeName, "");
            action.queueBind(toQueueName, exchangeName, "");
            return null;
        });

        lock.lock();
        try {
            ChatRoomInfo chatRoomInfo = new ChatRoomInfo();
            chatRoomInfo.setExchange(exchangeName);
            chatRoomInfo.setUsernameToQueuename(Map.of(fromUsername, fromQueueName, toUsername, toQueueName));
            chatRooms.put(chatRoomName, chatRoomInfo);
        } finally {
            lock.unlock();
        }
    }

接受邀请

被邀请方通过Core交换器得知有人要和它聊天。
然后接受邀请的请求会寻找聊天室信息,然后订阅聊天记录队列。

    public Flux<String> accept(String fromUsername, String toUsername) {
        String chatRoomName = getChatRoomName(fromUsername, toUsername);
        return talk(chatRoomName, toUsername);
    }

    private Flux<String> talk(String chatRoomName, String username) {
        ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);
        if (chatRoomInfo == null) {
            throw new IllegalArgumentException("Chat room not found");
        }
        String queueName = chatRoomInfo.getUsernameToQueuename().get(username);
        return Flux.create(emitter -> {
            SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();
            listener.setConnectionFactory(connectionFactory);
            listener.setQueueNames(queueName);
            listener.setMessageListener((Message message) -> {
                String msg = new String(message.getBody());
                System.out.println(username + " received message: " + msg);
                emitter.next(msg);
            });
            listener.start();
        });
    }

聊天

聊天的逻辑就是找到聊天室信息,然后向交换器发送消息。

    public void chat(String fromUsername, String toUsername, String message) {
        String chatRoomName = getChatRoomName(fromUsername, toUsername);
        ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);
        if (chatRoomInfo == null) {
            chatRoomName = getChatRoomName(toUsername, fromUsername);
            chatRoomInfo = chatRooms.get(chatRoomName);
        }
        if (chatRoomInfo == null) {
            throw new IllegalArgumentException("Chat room not found");
        }
        rabbitTemplate.convertAndSend(chatRoomInfo.getExchange(), "", fromUsername + ": " + message);
    }
    
    private String getChatRoomName(String fromUsername, String toUsername) {
        return fromUsername + "-" + toUsername + "-chat-room";
    }

Controller侧代码

package com.rabbitmq.chat.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.rabbitmq.chat.service.ChatRoom;
import com.rabbitmq.chat.service.Core;

import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/chat")
public class ChatController {
    @Autowired
    private Core core;

    @Autowired
    private ChatRoom chatRoom;

    @PutMapping(value = "/invite", produces = "text/event-stream")
    public Flux<String> invite(@RequestParam String fromUsername, @RequestParam String toUsername) {
        core.invite(fromUsername, toUsername);
        return chatRoom.invite(fromUsername, toUsername);
    }

    @PutMapping(value = "/accept", produces = "text/event-stream")
    public Flux<String> accept(@RequestParam String fromUsername, @RequestParam String toUsername) {
        core.accept(fromUsername, toUsername);
        return chatRoom.accept(fromUsername, toUsername);
    }

    @PostMapping("/send")
    public void send(@RequestParam String fromUsername, @RequestParam String toUsername, @RequestParam String message) {
        chatRoom.chat(fromUsername, toUsername, message);
    }
}

实验过程

在Postman中,我们先让tom登录,然后jerry登录。
在这里插入图片描述
在这里插入图片描述
在后台,我们看到创建两个队列
在这里插入图片描述
以及Core交换器的绑定关系也被更新
在这里插入图片描述
Jerry向Tom发起聊天邀请
在这里插入图片描述
可以看到Tom收到了邀请
在这里插入图片描述
同时新增了两个队列
在这里插入图片描述
以及一个交换器
在这里插入图片描述
在这里插入图片描述
Tom通过下面请求接受邀请
在这里插入图片描述
Jerry收到Tom接受了邀请的通知
在这里插入图片描述
后面它们就可以聊天了
在这里插入图片描述
在这里插入图片描述
它们的聊天窗口都收到了消息
在这里插入图片描述
在这里插入图片描述

总结

本文主要使用的知识点:

  • direct交换器以及其绑定规则
  • fanout交换器
  • 自动删除的交换器
  • 自动删除的队列
  • 只有一个消费者的队列
  • WebFlux响应式编程

代码工程

https://github.com/f304646673/RabbitMQDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1873350.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

竞赛选题 python+大数据校园卡数据分析

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于yolov5的深度学习车牌识别系统实现 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;4分工作量&#xff1a;4分创新点&#xff1a;3分 该项目较为新颖&am…

5款提高工作效率的免费工具推荐

SimpleTex SimpleTex是一款用于创建和编辑LaTeX公式的简单工具。它能够识别图片中的复杂公式并将其转换为可编辑的数据格式。该软件提供了一个直观的界面&#xff0c;用户可以在编辑LaTeX代码的同时实时预览公式的效果&#xff0c;无需额外的编译步骤。此外&#xff0c;SimpleT…

VS对齐代码格式

制表符Tab与空格有所区别&#xff0c;如果用到Tab键进行格式对齐&#xff0c;后续回车键自动对齐代码格式&#xff0c;在提交git时将明显看到Tab制表符&#xff0c;影响代码观感。例如&#xff0c;可能就长下面这个样子&#xff1a; 解决方式&#xff1a;CtrlF-->输入Tab转义…

Android学习

安卓期末考题复习 知识点总结 View 布局管理 线性布局 实现垂直或者水平布局。 orientation属性控制控件排列方向&#xff0c;包含两个属性值&#xff1a;vertical(垂直)、horizontal(水平)&#xff1b;weight属性表示权重。 相对布局 根据控件之间的相对位置进行布局。…

css 布局出现无法去除的空白

案件介绍&#xff1a;在没有设置任何的css样式的情况下 文字顶部出现无法去除的空白 源代码 <div click"onClick" ><div class"tableTextButton--container"></div><Icon v-if"loading || thisLoading" type"ios-lo…

图数据库 vs 向量数据库

最近大模型出来之后&#xff0c;向量数据库重新翻红&#xff0c;业界和市场上有不少声音认为向量数据库会极大的影响图数据库&#xff0c;图数据库市场会萎缩甚至消失&#xff0c;今天就从技术原理角度来讨论下图数据库和向量数据库到底差别在哪里&#xff0c;适合什么场景&…

广和通 OpenCPU 二次开发(一) —— 串口

广和通 OpenCPU 二次开发&#xff08;一&#xff09; —— 串口 1.port&#xff0c;端口号2.引脚序列号对应芯片引脚图找&#xff0c;也可以对照GPIO功能复用表找3.要复用的pin脚对应的功能mode根据GPIO功能复用表选择 一、核心配置## 标题代码 int port 1; fibo_gpio_mode_s…

iML6602-无滤波器2×30W,60W音频放大器

iML6602是一款由集创北方推出的国产高性能、高效率的双声道D类音频功率放大器&#xff1b;它提供2X30W和60W的功率输出&#xff0c;支持无滤波器立体声&#xff0c;适用于蓝牙/无线扬声器、条形音响、LCD/LED电视和家庭影院等应用&#xff1b;可替代TI-TPA3128/3118/3110/3130/…

【知识图谱系列】Neo4j使用Py2neo与python进行链接

目录 一、安装py2neo 二、打开Neo4j 三、使用Python操作Neo4j 一、安装py2neo pip install --upgrade py2neo -i https://pypi.tuna.tsinghua.edu.cn/simple 可以先阅读下文档&#xff1a;https://py2neo.org/v4/index.html 这个文档里有好多关于这个工具包的API介绍&#x…

awk的用法

目录 awk简述 awk的用法 选项 内置变量 命令格式 打印行号 打印指定行 打印奇偶行 按行取列 BEGIN打印模式 乘法计算 awk -v 变量赋值 awk的条件判断 面试题awk的三元表达式 awk的精确筛选 逻辑且、或关系 awk做小数运算 curl 练习 1.获取其中的所有子域名…

MATLAB-振动问题:单自由度阻尼振动系统受迫振动

一、基本理论 二、MATLAB实现 单自由度阻尼振动系统受迫振动&#xff0c;MATLAB代码如下&#xff1a; clear; clc; close allA 1; psi 0; F0 10; D 20; Rm 0.5; M 1; omega 2; delta Rm / (2*M); omega0 sqrt(D / M); Omega sqrt(omega0^2 - delta^2); Zm Rm i *…

Python学习笔记25:进阶篇(十四)常见标准库使用之性能测试timeit模块学习使用

前言 本文是根据python官方教程中标准库模块的介绍&#xff0c;自己查询资料并整理&#xff0c;编写代码示例做出的学习笔记。 根据模块知识&#xff0c;一次讲解单个或者多个模块的内容。 教程链接&#xff1a;https://docs.python.org/zh-cn/3/tutorial/index.html 性能测量…

视频共享融合赋能平台LntonCVS安防监控平台现场方案实现和应用场景

LntonCVS国标视频融合云平台采用端-边-云一体化架构&#xff0c;部署简单灵活&#xff0c;功能多样化。支持多协议&#xff08;GB28181/RTSP/Onvif/海康SDK/Ehome/大华SDK/RTMP推流等&#xff09;和多类型设备接入&#xff08;IPC/NVR/监控平台&#xff09;。主要功能包括视频直…

数据中心机柜如何正确选择

选择适合数据中心的机柜是一个综合考量多个因素的过程&#xff0c;以下是一些关键点&#xff0c;帮助您做出正确的选择&#xff1a; 结构与承重&#xff1a; 考虑机柜的类型&#xff08;开放式或封闭式&#xff09;、重量载荷能力&#xff0c;确保它能承受数据中心内设备的总重…

期末C语言易错知识点整理

1.在定义多维数组时&#xff0c;除了最左边的维度&#xff0c;其余的维度必须明确指定大小 2.int m[1][4]{4}; 定义的是一个 1 行 4 列的二维数组&#xff0c;初始化时提供了一个元素 4&#xff0c;其余元素默认初始化为 0&#xff0c;因此是正确的。 3.二维数组 a[3][6] 中的索…

flash-Attention2安装和使用

flash-Attention2安装和使用 文章目录 flash-Attention2安装和使用写在前面解决方案 写在前面 就怕你不知道怎么查 pytorch、cuda 的版本 配置cuda&#xff1a;vim ~/.bashrc export CUDA_HOME/usr/local/cuda/ export PATH$PATH:$CUDA_HOME/bin export LD_LIBRARY_PATH$LD_LIB…

分页插件 count有数据,代码不往下执行

如下:如果打印了sql那么当row>0时会有图2下面sql详情的输出 问题出在了分页参数上,pageNum为1,并且pageSize>2才能打印出图二的结果,图一为pageNum值是0,注意,查询第一页,分页应该传入的是1而不是0

松下的台灯值得入手吗?书客、飞利浦真实横评大分享!

我们都知道&#xff0c;无论是学习还是工作&#xff0c;都需要一个良好的照明环境&#xff0c;而台灯就是我们日常生活中非常重要的照明工具。它不仅能够提供额外的光线&#xff0c;还能减少眼睛疲劳&#xff0c;提高我们的工作和学习效率。 所以&#xff0c;选购一款合适的台…

根据后端返回的省市区重新封装树结构(省市区通过children表示)

对比图&#xff08;截取部分&#xff09;&#xff1a; 注&#xff1a;先看分步&#xff0c;最后会附上完整代码&#xff08;如果有用&#xff0c;可以给小编点个赞吗&#xff1f;十分感谢&#xff09; 1.首先将前端返回相同的省份只展示一次 const obj {}; let keyList []r…