直播弹幕系统(七)- 利用动态创建队列完成直播间独立聊天
- 前言
- 一. 动态创建队列
- 1.1 测试 - 动态创建队列
- 1.2 测试 - 聊天室独立
前言
上一篇 SpringBoot + STOMP + RabbitMQ(使用MQ替代Spring代理) 中主要讲解了如何整合STOMP
以及RabbitMQ
代替Spring
代理。
其中代码的设计有一点还并不完善:
- 所有的直播间共用同一个队列。就会造成直播间聊天内容窜了。
因为设计起来,希望是每个直播间有一个独立的队列和交换机。这样就能做到不同直播间的人聊天内容不会乱窜。但是我们又不可能提前去为每个直播间去创建队列和交换机。(排除创建直播间时的操作),我们这里就通过动态创建和监听的方式来完成这个功能。
一. 动态创建队列
我是这样设想的:
- 在打开任何一个直播间的时候,
Java
后端这里我们是能够感应到WebSocket
的创建的。我们主要在这里进行队列和交换机的动态创建过程。 - 每个直播间的消息都往统一的交换机发送。和上篇文章保持一致:
stomp-exchange
交换机。 - 根据我们的
RabbitMQ
配置,对这个交换机对应的队列stomp-queue
进行监听。再由业务代码来决定,消息该往哪个直播间的交换机发送。(发送的消息体中包含了直播间号)
我们按照这个思路顺序来编写代码。
稍微复习一下,在上篇文章中我们写了个WebSocketEventListener
监听类,下面有这么一个函数:
@EventListener
public void handleWebSocketConnectListener(SessionConnectEvent event) {}
当建立WebSocket
链接的时候,这个函数就会走进来。那么我们在原本代码基础上,增加动态创建队列的逻辑即可。
@EventListener
public void handleWebSocketConnectListener(SessionConnectEvent event) {
// 老代码...
// 如果没有队列就创建一个队列
createQueueAndExchangeIfNeed(roomId);
}
public void createQueueAndExchangeIfNeed(String roomId) {
String exchangeName = "Live_" + roomId + "-Exchange";
String queueName = "Live_" + roomId + "-Queue";
// 判断是否有队列创建过了
QueueInformation queueInfo = RabbitMQUtil.getQueueInfo(queueName);
// 如果创建过队列了,就直接返回,不要重复创建
if (queueInfo != null) {
return;
}
// 创建新队列
RabbitMQUtil.createAndBindQueue(queueName);
// 创建新交换机
RabbitMQUtil.createAndBindExchange(exchangeName, ExchangeTypeEnum.TOPIC);
// 绑定队列和交换机
RabbitMQUtil.binding(queueName, exchangeName);
}
RabbitMQUtil
工具类代码,核心:通过AmqpAdmin
去创建队列、交换机以及绑定动作。
import kz.constants.ExchangeTypeEnum;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
/**
* @Date 2023/1/5 15:32
* @Created by jj.lin
*/
public class RabbitMQUtil {
private static AmqpAdmin getAmqpAdmin() {
return SpringBeanUtil.getBean("amqpAdmin", AmqpAdmin.class);
}
public static void createAndBindQueue(String queueName) {
AmqpAdmin amqpAdmin = getAmqpAdmin();
Queue queue = new Queue(queueName, true);
if (StringUtils.isBlank(queueName)) {
return;
}
amqpAdmin.declareQueue(queue);
}
public static QueueInformation getQueueInfo(String queueName) {
if (StringUtils.isBlank(queueName)) {
return null;
}
return getAmqpAdmin().getQueueInfo(queueName);
}
public static void createAndBindExchange(String exchangeName, ExchangeTypeEnum typeEnum) {
AbstractExchange exchange = null;
switch (typeEnum) {
case DIRECT:// 直连交换机
exchange = new DirectExchange(exchangeName, true, false);
break;
case TOPIC: // 主题交换机
exchange = new TopicExchange(exchangeName, true, false);
break;
case FANOUT: //扇形交换机
exchange = new FanoutExchange(exchangeName, true, false);
break;
case HEADERS: // 头交换机
exchange = new HeadersExchange(exchangeName, true, false);
break;
}
getAmqpAdmin().declareExchange(exchange);
}
public static void binding(String queueName, String exchangeName) {
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, "", null);
// 绑定队列和交换机
getAmqpAdmin().declareBinding(binding);
}
}
这里我们还创建了一个枚举类ExchangeTypeEnum
:
public enum ExchangeTypeEnum {
/**
* 直连交换机
*/
DIRECT,
/**
* 主题交换机
*/
TOPIC,
/**
* 扇形交换机
*/
FANOUT,
/**
* 头交换机
*/
HEADERS;
}
接下来第二点,我们无需改动,我们需要关注的是业务代码,如何通过业务代码去控制将消息发送到对应直播间的对应交换机上,我们看ChatService.sendMsg()
这个函数:
public boolean sendMsg(String message) {
if (StringUtils.isBlank(message)) {
return false;
}
ChatMessage chatMessage = JsonUtil.parseJsonToObj(message, ChatMessage.class);
if (chatMessage == null) {
return false;
}
LiveMessage liveMessage = new LiveMessage();
liveMessage.setType(MessageType.CHAT.toString());
liveMessage.setContent("用户 [" + chatMessage.getSender() + "] 说 (来自MQ):" + chatMessage.getContent());
String roomId = chatMessage.getRoomId();
String exchangeName = "Live_" + roomId + "-Exchange";
// 主要的修改部分就是这里,根据roomId去拼接对应的交换机名称
rabbitTemplate.convertAndSend(exchangeName, "", JsonUtil.toJSON(liveMessage));
return true;
}
后端到这里就改好了(后面会根据流程跑一遍),前端部分很简单,我们只需要更换一下订阅的队列名称即可:
const onMQConnected = () => {
console.log('RabbitMQ初始化成功');
// 订阅交换机
const exchangeName = `/exchange/Live_${roomId}-Exchange`;
stompMQClient.subscribe(exchangeName, function(data:any) {
const res = data.body;
const entity = JSON.parse(res);
const arr :any = [ entity.content ];
setBulletList((pre: any[]) => [].concat(...pre, ...arr));
data.ack();
}, { ack: 'client' });
};
1.1 测试 - 动态创建队列
首先来看下RabbitMQ
的控制台:一共有4个队列。
接下来我打开URL
:http://localhost:4396/zong/?userId=Zong4&roomId=6
。那么对应的就应该自动创建一个名为 Live_6-Queue
的队列。我们跟着代码来跑一遍。打开URL
,代码进入到此:
紧接着,分别创建了以下对象:
- 队列:
Live_6-Queue
- 交换机:
Live_6-Exchange
控制台验证:绑定关系也有了。
1.2 测试 - 聊天室独立
我们在直播间号为6的地方聊天(点击右侧按钮),这里发送的是HTTP
请求。
Controller
层接收:
此时将这条信息(包含了直播间号等数据)发送给了交换机stomp-exchange
。根据项目启动时RabbitMQ
的相关配置,对stomp-queue
这个队列进行了监听:
监听到后,将消息委派给ChatService.sendMsg()
这个函数来处理:
这样前端监听的时候,就可以直接拿到自己直播间的消息啦:
结果如下:
当然,每个直播间的聊天内容也是独立的哦:
本篇文章到这里就结束啦,最后我也想说一下:
- 其实这一系列的文章都是自己思考后得出的一些设计思路。问题肯定是存在的,当然,现在也是在不断地学习和摸索,看是否有更好的实现方案。但是如果说这种思路或者编码方式对你们有一点帮助,那么这些都是值得的。
- 后面会研究前端方面,如何实现弹幕的滚动效果。毕竟我这里是一个简单的聊天室功能。后端方面也会继续更新。