直播弹幕系统(七)- 利用动态创建队列完成直播间独立聊天

news2025/1/17 6:04:16

直播弹幕系统(七)- 利用动态创建队列完成直播间独立聊天

  • 前言
  • 一. 动态创建队列
    • 1.1 测试 - 动态创建队列
    • 1.2 测试 - 聊天室独立

前言

上一篇 SpringBoot + STOMP + RabbitMQ(使用MQ替代Spring代理) 中主要讲解了如何整合STOMP以及RabbitMQ代替Spring代理。

其中代码的设计有一点还并不完善:

  • 所有的直播间共用同一个队列。就会造成直播间聊天内容窜了。

因为设计起来,希望是每个直播间有一个独立的队列和交换机。这样就能做到不同直播间的人聊天内容不会乱窜。但是我们又不可能提前去为每个直播间去创建队列和交换机。(排除创建直播间时的操作),我们这里就通过动态创建和监听的方式来完成这个功能。

一. 动态创建队列

我是这样设想的:

  1. 在打开任何一个直播间的时候,Java后端这里我们是能够感应到WebSocket的创建的。我们主要在这里进行队列和交换机的动态创建过程。
  2. 每个直播间的消息都往统一的交换机发送。和上篇文章保持一致:stomp-exchange交换机。
  3. 根据我们的RabbitMQ配置,对这个交换机对应的队列stomp-queue进行监听。再由业务代码来决定,消息该往哪个直播间的交换机发送。(发送的消息体中包含了直播间号)

我们按照这个思路顺序来编写代码。

稍微复习一下,在上篇文章中我们写了个WebSocketEventListener监听类,下面有这么一个函数:

@EventListener
public void handleWebSocketConnectListener(SessionConnectEvent event) {}

当建立WebSocket链接的时候,这个函数就会走进来。那么我们在原本代码基础上,增加动态创建队列的逻辑即可。

@EventListener
public void handleWebSocketConnectListener(SessionConnectEvent event) {
	// 老代码...
	// 如果没有队列就创建一个队列
    createQueueAndExchangeIfNeed(roomId);
}

public void createQueueAndExchangeIfNeed(String roomId) {
    String exchangeName = "Live_" + roomId + "-Exchange";
    String queueName = "Live_" + roomId + "-Queue";

    // 判断是否有队列创建过了
    QueueInformation queueInfo = RabbitMQUtil.getQueueInfo(queueName);
    // 如果创建过队列了,就直接返回,不要重复创建
    if (queueInfo != null) {
        return;
    }
    // 创建新队列
    RabbitMQUtil.createAndBindQueue(queueName);
    // 创建新交换机
    RabbitMQUtil.createAndBindExchange(exchangeName, ExchangeTypeEnum.TOPIC);
    // 绑定队列和交换机
    RabbitMQUtil.binding(queueName, exchangeName);
}

RabbitMQUtil工具类代码,核心:通过AmqpAdmin去创建队列、交换机以及绑定动作。

import kz.constants.ExchangeTypeEnum;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;

/**
 * @Date 2023/1/5 15:32
 * @Created by jj.lin
 */
public class RabbitMQUtil {
    private static AmqpAdmin getAmqpAdmin() {
        return SpringBeanUtil.getBean("amqpAdmin", AmqpAdmin.class);
    }

    public static void createAndBindQueue(String queueName) {
        AmqpAdmin amqpAdmin = getAmqpAdmin();
        Queue queue = new Queue(queueName, true);
        if (StringUtils.isBlank(queueName)) {
            return;
        }
        amqpAdmin.declareQueue(queue);
    }

    public static QueueInformation getQueueInfo(String queueName) {
        if (StringUtils.isBlank(queueName)) {
            return null;
        }
        return getAmqpAdmin().getQueueInfo(queueName);
    }

    public static void createAndBindExchange(String exchangeName, ExchangeTypeEnum typeEnum) {
        AbstractExchange exchange = null;
        switch (typeEnum) {
            case DIRECT:// 直连交换机
                exchange = new DirectExchange(exchangeName, true, false);
                break;
            case TOPIC: // 主题交换机
                exchange = new TopicExchange(exchangeName, true, false);
                break;
            case FANOUT: //扇形交换机
                exchange = new FanoutExchange(exchangeName, true, false);
                break;
            case HEADERS: // 头交换机
                exchange = new HeadersExchange(exchangeName, true, false);
                break;
        }
        getAmqpAdmin().declareExchange(exchange);
    }

    public static void binding(String queueName, String exchangeName) {
        Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, "", null);
        // 绑定队列和交换机
        getAmqpAdmin().declareBinding(binding);
    }
}

这里我们还创建了一个枚举类ExchangeTypeEnum

public enum ExchangeTypeEnum {
    /**
     * 直连交换机
     */
    DIRECT,

    /**
     * 主题交换机
     */
    TOPIC,
    /**
     * 扇形交换机
     */
    FANOUT,
    /**
     * 头交换机
     */
    HEADERS;
}

接下来第二点,我们无需改动,我们需要关注的是业务代码,如何通过业务代码去控制将消息发送到对应直播间的对应交换机上,我们看ChatService.sendMsg()这个函数:

public boolean sendMsg(String message) {
    if (StringUtils.isBlank(message)) {
        return false;
    }
    ChatMessage chatMessage = JsonUtil.parseJsonToObj(message, ChatMessage.class);
    if (chatMessage == null) {
        return false;
    }
    LiveMessage liveMessage = new LiveMessage();
    liveMessage.setType(MessageType.CHAT.toString());
    liveMessage.setContent("用户 [" + chatMessage.getSender() + "] 说 (来自MQ):" + chatMessage.getContent());
    String roomId = chatMessage.getRoomId();
    String exchangeName = "Live_" + roomId + "-Exchange";
	// 主要的修改部分就是这里,根据roomId去拼接对应的交换机名称
    rabbitTemplate.convertAndSend(exchangeName, "", JsonUtil.toJSON(liveMessage));
    return true;
}

后端到这里就改好了(后面会根据流程跑一遍),前端部分很简单,我们只需要更换一下订阅的队列名称即可:

const onMQConnected = () => {
  console.log('RabbitMQ初始化成功');
  // 订阅交换机
  const exchangeName = `/exchange/Live_${roomId}-Exchange`;
  stompMQClient.subscribe(exchangeName, function(data:any) {
    const res = data.body;
    const entity = JSON.parse(res);
    const arr :any = [ entity.content ];
    setBulletList((pre: any[]) => [].concat(...pre, ...arr));
    data.ack();
  }, { ack: 'client' });
};

1.1 测试 - 动态创建队列

首先来看下RabbitMQ的控制台:一共有4个队列。
在这里插入图片描述
接下来我打开URLhttp://localhost:4396/zong/?userId=Zong4&roomId=6。那么对应的就应该自动创建一个名为 Live_6-Queue的队列。我们跟着代码来跑一遍。打开URL,代码进入到此:
在这里插入图片描述
紧接着,分别创建了以下对象:

  • 队列:Live_6-Queue
  • 交换机:Live_6-Exchange

在这里插入图片描述
控制台验证:绑定关系也有了。
在这里插入图片描述

1.2 测试 - 聊天室独立

我们在直播间号为6的地方聊天(点击右侧按钮),这里发送的是HTTP请求。
在这里插入图片描述
Controller层接收:
在这里插入图片描述
此时将这条信息(包含了直播间号等数据)发送给了交换机stomp-exchange。根据项目启动时RabbitMQ的相关配置,对stomp-queue这个队列进行了监听:
在这里插入图片描述
监听到后,将消息委派给ChatService.sendMsg()这个函数来处理:
在这里插入图片描述
这样前端监听的时候,就可以直接拿到自己直播间的消息啦:
在这里插入图片描述
结果如下:
在这里插入图片描述
当然,每个直播间的聊天内容也是独立的哦:
在这里插入图片描述

本篇文章到这里就结束啦,最后我也想说一下:

  1. 其实这一系列的文章都是自己思考后得出的一些设计思路。问题肯定是存在的,当然,现在也是在不断地学习和摸索,看是否有更好的实现方案。但是如果说这种思路或者编码方式对你们有一点帮助,那么这些都是值得的。
  2. 后面会研究前端方面,如何实现弹幕的滚动效果。毕竟我这里是一个简单的聊天室功能。后端方面也会继续更新。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/151042.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue条件语句中v-if、v-else、v-else-if的用法

文章目录1、v-if和v-else结合使用1.1 出现的错误2、v-if、v-else-if和v-else的联合使用2.1 出现的错误3、如果想要同时切换多个元素3.1 效果展示1、v-if和v-else结合使用 v-else 元素必须紧跟在带 v-if 或者 v-else-if 的元素的后面&#xff0c;否则它将不会被识别。 <span…

智能指针(二) shared_ptr 注意点

智能指针(二) shared_ptr 注意点 1 不存在 int * 到 shared_ptr 的隐式类型转换 void proc(shared_ptr<int> ptr) {cout << "ptr.use_count()" << ptr.use_count() << endl;cout << "调用成功" << endl;return; }in…

独立产品灵感周刊 DecoHack #043 - 互联网从业者的灵感数据库

本周刊记录有趣好玩的独立产品设计开发相关内容&#xff0c;每周发布&#xff0c;往期内容同样精彩&#xff0c;感兴趣的伙伴可以点击订阅我的周刊。为保证每期都能收到&#xff0c;建议邮件订阅。欢迎通过 Twitter 私信推荐或投稿。很完美的断更了2期&#xff0c;有一期是因为…

RFID技术和NFC技术的原理及区别,你都了解吗?

物联网是信息技术发展的重要推动力&#xff0c;推动了农业、工业、制造业、服务业等多个行业的发展&#xff0c;物联网主要由三个关键技术组成&#xff0c;即连接、物体标识和数据传输&#xff0c;人们把RFID技术作为物体标识的代表&#xff0c;随着技术的进步起源于RFID技术的…

谷粒商城-基础篇-Day07-品牌分类关联与级联更新

将品牌分类和品牌名称的关系放在pms_category_brand_relation表中 获取该列表品牌所有的关联信息 /*** 列表*/GetMapping("/catelog/list")public R list(RequestParam("brandId") Long brandId){List<CategoryBrandRelationEntity> datacategoryBra…

Java日期时间类

Java日期时间类Datenew Date()**获取当前系统时间**通过**指定毫秒数得到时间**format**指定日期格式**SimpleDateFormat的模式字母&#xff1a;parse()可以把**格式化的String转成对应Date**Calendar&#xff08;日历&#xff09;创建日期类对象获取日历对象的某个日历字段第三…

【Linux】五、Linux 进程控制(总)|进程创建|进程终止|进程等待进程程序替换|模拟shell

目录 一、进程创建 1.1 再谈 fork 函数 1.2 fork 函数返回值问题 1.2 写时拷贝 1.3 fork 常规用法 1.4 fork调用失败的原因 二、进程终止 2.1 进程退出码 2.2 进程退出场景 2.3 进程如何退出 三、进程等待 3.1 进程等待必要性 3.2 进程等待的方法 3.2.1 通过 wai…

【二进制安全面试题】linux篇:保护机制、函数调用约定

前言 上来先道歉&#xff0c;对不起(&#xff1e;人&#xff1c;&#xff1b;)对不起&#xff0c;博客鸽了好久。私下有好多朋友问我毕业工作的事情&#xff0c;毕竟搞二进制最重要的是要有热情&#xff01;我能做的也是有限&#xff0c;每个人的学习方式不完全相同&#xff0c…

Http4s 存在输入验证不当漏洞(CVE-2023-22465)

漏洞描述 http4s 是一个用于处理 HTTP 服务的 Scala 接口。 http4s 的受影响版本延迟加载模型化标头&#xff08;modeled headers&#xff09;&#xff0c;用于处理规范化标头的请求&#xff08;如&#xff1a;Option[Header] req.headers.get(“User-Agent”.ci)&#xff0…

C语言进阶——字符串函数(一)

目录 一. strlen 二. strcpy 三. strcat 四. strcmp 五. strncpy 六. strncat 七. strncmp 八. strstr 九. strtok 一. strlen 字符串以 \0 作为结束标志&#xff0c;strlen函数返回的是在字符串中 \0 前面出现的字符个数&#xff08;不包 含 \0 …

陪诊软件开发,陪诊服务具备哪些好处,前景如何

在当下互联网快速发展的时代&#xff0c;我们要首先明确&#xff0c;一个行业的发展最重要的是什么&#xff0c;什么才能促进这个行业的前进。当然是用户的数量&#xff0c;**而我们的陪诊服务&#xff0c;潜在的用户数量是巨大的。因为自己独立不便就医的人群&#xff0c;都可…

maven导入第三方jar包,出现找不到类

我们开发时&#xff0c;会用到第三代第三方的jar包&#xff0c;私服上没有&#xff0c;只能导入使用。 导入步骤&#xff1a; 1、在项目根目录建文件夹lib&#xff0c;降jar包复制过去。 在pom.xml中引入jar包&#xff0c;如引入bcprov-jdk15on-1.59.jar <dependency>&…

c++ - 第21节 - 智能指针

1.为什么需要智能指针 分析一下下面这段程序有没有什么内存方面的问题&#xff1f;前面在异常的博客中&#xff0c;我们分析了下图一的代码Func函数中如果div()函数抛异常则程序会直接跳到主函数的catch捕获程序部分&#xff0c;然后接着主函数catch捕获程序部分往后执行代码&a…

【IOS的safari浏览器】uniapp的H5项目 safari<添加到主屏幕>功能的实现(多页面、单页面)

uniapp的H5项目safari <添加到主屏幕>功能的实现ios添加到主屏幕的需求具体效果实现前提完整的HTML页面如何判断应用是从主屏幕打开还是从浏览器打开特殊情况ios添加到主屏幕的需求 添加到主屏幕——这个功能属于ios的safari浏览器的特性之一&#xff0c;他可以让我们的…

Java环境安装、替换jdk后java编译javac无反应,但java和java -version可以成功:实操解决方案

这里写自定义目录标题问题背景方案一方案二方案三问题背景 最近换了新电脑&#xff0c;安装java环境&#xff0c;一次性下载了3个jdk版本&#xff0c;在配置后返现 cmd命令行下javac编译java文件不成功&#xff0c;但是输入java和java -version没问题 在CSDN看了许多解决方案…

linux安装go

下载地址 https://studygolang.com/dl?id2&id15&id0&id8&adinfo678baidu&adinfo678baidu%3Epage%3E go语言中文网 解压 tar -xvf go1.19.4.linux-amd64.tar.gz 解压之后在 root目录下面 有个 go的文件夹 vim ~/.bashrc 配置环境变量 export GOROOT/roo…

【Dash搭建可视化网站】项目10:疫情数据可视化大屏制作步骤详解

疫情数据可视化大屏制作步骤详解1 项目效果图2 项目架构3 文件介绍和功能完善3.1 assets文件夹介绍3.2 app.py和index.py文件完善3.3 header.py文件完善3.4 cards.py文件完善3.5 api.py和api.ipynb文件完善3.5.1 数据获取3.5.2 数据处理3.5.3 接口数据导入header.py和cards.py文…

SpringMVC基本使用

SpringMVC基本使用1、回顾MVC1.1、什么是MVC1.2、Model1时代1.3、Model2时代1.4、回顾Servlet2、什么是SpringMVC2.1、概述2.2、中心控制器2.3、SpringMVC执行原理3、HelloSpring3.1、配置版3.2、注解版3.3、小结4、Controller 及 RestFul4.1、控制器Controller4.2、实现Contro…

【笔记:第一课】学习开发一个RISC-V上的操作系统 - 汪辰 - 2021春

文章目录前言来源正文小结前言 创作开始时间&#xff1a;2023年1月9日20:02:19 如题&#xff0c;学习一下RISC-V。 来源 https://www.bilibili.com/video/BV1Q5411w7z5/ 正文 打好基础&#xff01;好好学习 本课程目的&#xff1a; 了解 RISC-V 的相关知识学会查看RISC-…

week10

T1 Einstein学画画 题目描述 Einstein 学起了画画。 此人比较懒~~&#xff0c;他希望用最少的笔画画出一张画…… 给定一个无向图&#xff0c;包含 nnn 个顶点&#xff08;编号 1∼n1 \sim n1∼n&#xff09;&#xff0c;mmm 条边&#xff0c;求最少用多少笔可以画出图中所…