直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理
- 前言
- 一. Socket服务整合RabbitMQ
- 二. 弹幕服务创建
- 2.1 创建一个公共maven项目
- 2.2 弹幕服务项目创建
- 2.2.1 创建队列和广播型交换机
- 2.2.2 生产者发送最终弹幕数据
- 2.2.3 消费者监听原始弹幕数据
- 2.3 Socket服务监听弹幕数据并返回前端
- 2.3.1 配置类
- 2.3.2 消费者
- 2.4 测试
前言
上一篇文章 SpringCloud网关对WebSocket链接进行负载均衡 中把主要的架子搭建好了,这一篇文章就要开始写业务逻辑了。在分布式系统下,如何达到SpringBoot - WebSocket的使用和聊天室练习的效果。
一. Socket服务整合RabbitMQ
我们页面上,通过WebSocket
发送弹幕信息的时候,后端通过@OnMessage
注解修饰的函数进行接收。这里我们统一将原始的弹幕消息丢给MQ
。让另一个专业的弹幕服务去消费处理。目的也是希望WebSocket
服务它只负责消息的传递和WebSocket
信息的维护,业务逻辑啥也不做。
1.添加pom
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置文件bootstrap.yml
,添加RabbitMQ相关配置
server:
port: 81
spring:
application:
name: tv-service-socket
cloud:
nacos:
discovery:
server-addr: 你的Nacos地址:8848
rabbitmq:
username: guest
password: guest
# 虚拟主机,默认是/
virtual-host: /
# 超时时间
connection-timeout: 30000
listener:
simple:
# 消费模式,手动
acknowledge-mode: manual
# 并发数
concurrency: 5
# 最大并发数
max-concurrency: 10
# 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。
# 因此数值越大,内存占用越大,还需要考虑消费的速度
prefetch: 10
addresses: 你的RabbitMQ地址:5672
3.RabbitMQ
配置类:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Zong0915
* @date 2022/12/15 下午1:29
*/
@Configuration
public class RabbitMQConfig {
@Bean
public Queue initDirectQueue() {
return new Queue("originBullet-queue", true);
}
@Bean
DirectExchange initDirectExchange() {
return new DirectExchange("bulletPreProcessor-exchange", true, false);
}
@Bean
Binding initBindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("bullet.originMessage");
}
}
4.写一个简单的消息体OriginMessage
,发送到MQ
的:
import lombok.Data;
/**
* @author Zong0915
* @date 2022/12/15 下午1:30
*/
@Data
public class OriginMessage {
private String sessionId;
private String userId;
private String roomId;
private String message;
}
5.MQ
生产者OriginMessageSender
:
/**
* @author Zong0915
* @date 2022/12/15 下午1:29
*/
@Component
public class OriginMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(OriginMessage originMessage) {
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());// 唯一ID
Map<String, Object> map = new HashMap<>();
map.put("message", JSONObject.toJSONString(originMessage));
// 发送给消息预处理队列
rabbitTemplate.convertAndSend("bulletPreProcessor-exchange",// 交换机名称
"bullet.originMessage",// 路由Key
map, correlationData);
}
}
6.我们再对WebSocket
的监听类做一下小改动,将收到的消息,封装一下,然后调用生产者的API
即可。只需要注意一下多例下属性的注入方式是怎么写的即可。
import kz.cache.SocketCache;
import kz.entity.OriginMessage;
import kz.producer.OriginMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
/**
* @author Zong0915
* @date 2022/12/9 下午3:45
*/
@Component
@ServerEndpoint("/websocket/live/{roomId}/{userId}")
@Slf4j
@Getter
public class BulletScreenServer {
/**
* 多例模式下的赋值方式
*/
private static OriginMessageSender originMessageSender;
/**
* 多例模式下的赋值方式
*/
@Autowired
private void setOriginMessageSender(OriginMessageSender originMessageSender) {
BulletScreenServer.originMessageSender = originMessageSender;
}
private static final AtomicLong count = new AtomicLong(0);
private Session session;
private String sessionId;
private String userId;
private String roomId;
/**
* 打开连接
* @param session
* @OnOpen 连接成功后会自动调用该方法
* @PathParam("token") 获取 @ServerEndpoint("/imserver/{userId}") 后面的参数
*/
@OnOpen
public void openConnection(Session session, @PathParam("roomId") String roomId, @PathParam("userId") String userId) {
// 如果是游客观看视频,虽然有弹幕,但是没有用户信息,所以需要用try
count.incrementAndGet();
log.info("*************WebSocket连接次数: {} *************", count.longValue());
this.userId = userId;
this.roomId = roomId;
// 保存session相关信息到本地
this.sessionId = session.getId();
this.session = session;
SocketCache.put(sessionId, this);
}
/**
* 客户端刷新页面,或者关闭页面,服务端断开连接等等操作,都需要关闭连接
*/
@OnClose
public void closeConnection() {
SocketCache.remove(sessionId);
}
/**
* 客户端发送消息给服务端
* @param message
*/
@OnMessage
public void onMessage(String message) {
if (StringUtils.isBlank(message)) {
return;
}
// 将消息丢给MQ,业务上的处理什么也不管,交给弹幕业务来处理,并且达到削峰的目的
originMessageSender.send(buildMessage(message));
}
private OriginMessage buildMessage(String message) {
OriginMessage originMessage = new OriginMessage();
originMessage.setMessage(message);
originMessage.setRoomId(roomId);
originMessage.setSessionId(sessionId);
originMessage.setUserId(userId);
return originMessage;
}
}
备注:记得将另一个Socket
项目也改造成同样的代码。
二. 弹幕服务创建
2.1 创建一个公共maven项目
我们创建一个maven
项目:service-bulletcommon
。先看下最终的项目架构:
1.pom
依赖添加一些常用的工具:
<groupId>bullet-service</groupId>
<artifactId>service-bulletcommon</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.79</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
</dependencies>
2.创建一个常量定义类SocketConstants
:
/**
* @author Zong0915
* @date 2022/12/15 下午3:59
*/
public class SocketConstants {
/**
* 这条消息是否处理过
*/
public static final String CORRELATION_SET_PRE = "Correlation_Set_";
/**
* 同一个房间里面有哪些SessionID
*/
public static final String ROOM_LIVE_USER_SET_PRE = "ROOM_LIVE_USER_Set_";
public static final String MESSAGE = "message";
public static final String ID = "id";
/**
* 原始消息所在队列
*/
public static final String ORIGIN_BULLET_QUEUE = "originBullet-queue";
/**
* 广播队列A
*/
public static final String BULLET_SOCKET_QUEUE_A = "bulletSocket-queueA";
/**
* 广播队列B
*/
public static final String BULLET_SOCKET_QUEUE_B = "bulletSocket-queueB";
/**
* 弹幕预处理交换机
*/
public static final String BULLET_PRE_PROCESSOR_EXCHANGE = "bulletPreProcessor-exchange";
/**
* 弹幕广播交换机
*/
public static final String BULLET_FANOUT_EXCHANGE = "bulletFanOut-exchange";
/**
* 弹幕预处理路由Key
*/
public static final String BULLET_ORIGIN_MESSAGE_ROUTE_KEY = "bullet.originMessage";
}
3.创建一个消息传输体OriginMessage
:
import lombok.Data;
/**
* @author Zong0915
* @date 2022/12/15 下午2:07
*/
@Data
public class OriginMessage {
private String sessionId;
private String userId;
private String roomId;
private String message;
}
2.2 弹幕服务项目创建
1.我们创建一个maven
项目:service-bulletscreen
。先看下最终的项目架构:
1.pom
文件:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2.2.1.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>archaius-core</artifactId>
<groupId>com.netflix.archaius</groupId>
</exclusion>
<exclusion>
<artifactId>commons-io</artifactId>
<groupId>commons-io</groupId>
</exclusion>
<exclusion>
<artifactId>commons-lang3</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
<exclusion>
<artifactId>fastjson</artifactId>
<groupId>com.alibaba</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>servo-core</artifactId>
<groupId>com.netflix.servo</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.6.7</version>
<exclusions>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>bullet-service</groupId>
<artifactId>service-bulletcommon</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
2.application.properties
:
spring.application.name=tv-service-bulletscreen
spring.cloud.nacos.discovery.server-addr=你的Nacos地址:8848
3.bootstrap.yml
文件:
server:
port: 83
spring:
application:
name: tv-service-bulletscreen
redis:
database: 0 # Redis数据库索引(默认为0)
host: 你的Redis地址 # Redis的服务地址
port: 6379 # Redis的服务端口
password: 密码
jedis:
pool:
max-active: 8 # 连接池最大连接数(使用负值表示没有限制)
max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 8 # 连接池中的最大空闲连接
min-idle: 0 # 连接池中的最小空闲链接
timeout: 30000 # 连接池的超时时间(毫秒)
cloud:
nacos:
discovery:
server-addr: 你的Nacos地址:8848
rabbitmq:
username: guest
password: guest
# 虚拟主机,默认是/
virtual-host: /
# 超时时间
connection-timeout: 30000
listener:
simple:
# 消费模式,手动
acknowledge-mode: manual
# 并发数
concurrency: 5
# 最大并发数
max-concurrency: 10
# 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。
# 因此数值越大,内存占用越大,还需要考虑消费的速度
prefetch: 10
addresses: 你的RabbitMQ地址:5672
4.Redis
配置类RedisConfig
:
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
/**
* 实例化 RedisTemplate 对象
*
* @return
*/
@Bean
public RedisTemplate<String, Object> functionDomainRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
initDomainRedisTemplate(redisTemplate, redisConnectionFactory);
return redisTemplate;
}
/**
* 设置数据存入 redis 的序列化方式,并开启事务
*
* @param redisTemplate
* @param factory
*/
private void initDomainRedisTemplate(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) {
//如果不配置Serializer,那么存储的时候缺省使用String,如果用User类型存储,那么会提示错误User can't cast to String!
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
// 开启事务
redisTemplate.setEnableTransactionSupport(true);
redisTemplate.setConnectionFactory(factory);
}
@Bean
@ConditionalOnMissingBean(StringRedisTemplate.class)
public StringRedisTemplate stringRedisTemplate(
RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
2.2.1 创建队列和广播型交换机
创建一个广播模式的交换机bulletFanOut-exchange
:其实用direct
也可以,因为我只要监听的队列用同一个即可,这里只是进行一个模拟。
分别为两个Socket
服务创建个队列,用来接收处理好的消息(练习下广播模式):
bulletSocket-queueA
bulletSocket-queueB
再分别为他们和上述创建好的交换机进行绑定。
我们的弹幕服务主要做两件事:
- 监听预处理队列,数据来自:
originBullet-queue
。 - 将处理完的消息通过广播,发送给
bulletSocket-queueA/B
两个队列。
RabbitMQ
配置类如下:
import kz.common.SocketConstants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Zong0915
* @date 2022/12/15 下午1:29
*/
@Configuration
public class RabbitMQConfig {
@Bean
public Queue initDirectQueue() {
return new Queue(SocketConstants.ORIGIN_BULLET_QUEUE, true);
}
@Bean
public Queue initFanoutSocketQueueA() {
return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true);
}
@Bean
public Queue initFanoutSocketQueueB() {
return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true);
}
@Bean
DirectExchange initDirectExchange() {
return new DirectExchange(SocketConstants.BULLET_PRE_PROCESSOR_EXCHANGE, true, false);
}
@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {
return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}
@Bean
Binding initBindingDirect() {
return BindingBuilder.bind(initDirectQueue()).to(initDirectExchange()).with(SocketConstants.BULLET_ORIGIN_MESSAGE_ROUTE_KEY);
}
@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange);
}
@Bean
Binding initBindingFanoutB(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange);
}
}
2.2.2 生产者发送最终弹幕数据
创建FanoutMessageProducer
类:记得向我们上面绑定的广播交换机发送数据。
import com.alibaba.fastjson.JSONObject;
import kz.entity.OriginMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @author Zong0915
* @date 2022/12/15 下午2:51
*/
@Component
public class FanoutMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(OriginMessage originMessage) {
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());// 唯一ID
Map<String, Object> map = new HashMap<>();
map.put("message", JSONObject.toJSONString(originMessage));
rabbitTemplate.convertAndSend("bulletFanOut-exchange",// 交换机名称
"",// 路由Key
map, correlationData);
}
}
2.2.3 消费者监听原始弹幕数据
创建OriginMessageConsumer
类:
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.service.BulletScreenService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
- @author Zong0915
- @date 2022/12/15 下午1:57
*/
@Component
@Slf4j
public class OriginMessageConsumer {
@Autowired
private BulletScreenService bulletScreenService;
/**
* 处理原始消息
*
* @param testMessage Map类型的消息体
* @param headers 消息头
* @param channel 消息所在的管道
*/
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "originBullet-queue", durable = "true"),
// 默认的交换机类型就是direct
exchange = @Exchange(name = "bulletPreProcessor-exchange", type = "direct"),
key = "bullet.originMessage"
)
)
@RabbitHandler
public void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers,
Channel channel) throws IOException {
log.info("***********消费开始*************");
log.info("消费体:{}", JSONObject.toJSONString(testMessage));
bulletScreenService.processMessage(testMessage, headers, channel);
}
}
2.创建BulletScreenService
类用于原始弹幕的业务处理,主要考虑的几个点:
- 消息的合法性校验。
- 消息的幂等性保证,这里用了
Redis
做个存储。 - 将原始数据处理完后,在丢给
MQ
进行广播。
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.producer.FanoutMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @author Zong0915
* @date 2022/12/9 下午3:45
*/
@Service
@Slf4j
public class BulletScreenService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private FanoutMessageProducer fanoutMessageProducer;
@Async
public void processMessage(Map testMessage, Map<String, Object> headers,
Channel channel) throws IOException {
OriginMessage originMessage = getOriginMessage(testMessage);
// 合法性校验
if (!validMessage(testMessage, headers, originMessage)) {
return;
}
// 处理消息
log.info("***********业务处理,弹幕: {}***********", originMessage.getMessage());
String correlationId = headers.get(SocketConstants.ID).toString();
// 存入Redis并设置过期时间1天
redisTemplate.opsForSet().add(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId);
redisTemplate.expire(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), 1, TimeUnit.DAYS);
// 将处理好的消息发送给MQ,通过广播队列,将消息发送给所有的Socket服务,一般这里还会对originMessage进行一些二次封装
// 本案例就不做处理了,原样返回
fanoutMessageProducer.send(originMessage);
// 确认消息
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
public OriginMessage getOriginMessage(Map testMessage) {
String messageJson = (String) testMessage.get(SocketConstants.MESSAGE);
if (StringUtils.isBlank(messageJson)) {
return null;
}
OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class);
return originMessage;
}
/**
* 对消息进行合法性校验
*/
public boolean validMessage(Map testMessage, Map<String, Object> headers, OriginMessage originMessage) {
// 判空
if (testMessage == null || testMessage.size() == 0 || originMessage == null) {
return false;
}
if (headers == null || headers.size() == 0) {
return false;
}
// 幂等性校验,如果消息已经被消费过了,那么这个弹幕消息就不应该被二次消费,这个消息就直接把他处理掉
UUID correlationId = (UUID) headers.get(SocketConstants.ID);
Boolean exist = redisTemplate.opsForSet().isMember(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId.toString());
return !Optional.ofNullable(exist).orElse(false);
}
}
最后就是启动类BulletScreenApplication
:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* @author Zong0915
* @date 2022/12/10 下午9:44
*/
@SpringBootApplication
@EnableDiscoveryClient
@EnableAsync
public class BulletScreenApplication {
public static void main(String[] args) {
SpringApplication.run(BulletScreenApplication.class, args);
}
}
2.3 Socket服务监听弹幕数据并返回前端
记得在pom
依赖中引入上面的公共包:
<dependency>
<groupId>bullet-service</groupId>
<artifactId>service-bulletcommon</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
2.3.1 配置类
RabbitMQ
配置类增加下队列和交换机的配置信息:绑定bulletSocket-queueA
@Bean
public Queue initFanoutSocketQueueA() {
return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true);
}
@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {
return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}
@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange);
}
另一个Socket
项目,添加以下配置:绑定bulletSocket-queueB
@Bean
public Queue initFanoutSocketQueueB() {
return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true);
}
@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {
return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}
@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange);
}
再写一个缓存工具类,通过直播间号获得同一个直播间下的所有WebSocket信息:
public class SocketCache {
public static List<BulletScreenServer> getSocketGroupByRoomId(String roomId) {
ArrayList<BulletScreenServer> res = new ArrayList<>();
if (StringUtils.isBlank(roomId)) {
return res;
}
for (Map.Entry<Integer, ConcurrentHashMap<String, BulletScreenServer>> hashMapEntry : CACHE_SEGMENT.entrySet()) {
ConcurrentHashMap<String, BulletScreenServer> map = hashMapEntry.getValue();
if (map == null || map.size() == 0) {
continue;
}
for (BulletScreenServer server : map.values()) {
if (server.getSession().isOpen() && StringUtils.equals(roomId, server.getRoomId())) {
res.add(server);
}
}
}
return res;
}
}
2.3.2 消费者
重点就是消费者的业务代码了,对最终的弹幕数据进行广播,创建FanOutMessageConsumer
类:
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.cache.SocketCache;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.service.BulletScreenServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* @author Zong0915
* @date 2022/12/15 下午1:57
*/
@Component
@Slf4j
public class FanOutMessageConsumer {
/**
* 处理弹幕消息,开始广播
*
* @param testMessage Map类型的消息体
* @param headers 消息头
* @param channel 消息所在的管道
*/
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "bulletSocket-queueA", durable = "true"),
// 默认的交换机类型就是direct
exchange = @Exchange(name = "bulletFanOut-exchange", type = "fanout")
)
)
@RabbitHandler
public void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers, Channel channel) throws IOException {
log.info("***********消费开始, Socket服务A接收到广播消息*************");
log.info("消费体:{}", JSONObject.toJSONString(testMessage));
OriginMessage originMessage = getOriginMessage(testMessage);
if (originMessage == null) {
return;
}
// 根据roomID去找到同一个直播间下的所有用户并广播消息
List<BulletScreenServer> socketGroupByRoomId = SocketCache.getSocketGroupByRoomId(originMessage.getRoomId());
for (BulletScreenServer bulletScreenServer : socketGroupByRoomId) {
bulletScreenServer.getSession().getBasicRemote().sendText(JSONObject.toJSONString(originMessage));
}
// 确认消息
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
public OriginMessage getOriginMessage(Map testMessage) {
String messageJson = (String) testMessage.get(SocketConstants.MESSAGE);
if (StringUtils.isBlank(messageJson)) {
return null;
}
OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class);
return originMessage;
}
}
另一个Socket
服务则改一下消费者的监听队列和日志内容即可:
2.4 测试
打开同一个直播间的两个用户,让两个WebSocket正好建立到不同的服务器上:
此时Socket
服务A:
Socket
服务B:
页面A中随便发送一条弹幕:
页面B中随便发送一条弹幕:
1.前端发送一条弹幕,后端监听到,开始向预处理队列丢消息。
2.service-bulletscreen
服务,监听到预处理队列数据,开始进行处理。
3.经过一系列校验和幂等性处理之后,将处理完的弹幕通过交换机发送给广播队列:
4.Socket
服务B接收到消息:
Socket
服务A
接收到广播消息:
5.前端页面展示:
页面A:
页面B:
到这里,一个聊天服务就完成了。不过大家也看到在线人数这块咱没做。可以用Redis
缓存来记录每个直播间的人数。这个功能放到下一篇文章来讲。