直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

news2025/1/23 22:43:15

直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

  • 前言
  • 一. Socket服务整合RabbitMQ
  • 二. 弹幕服务创建
    • 2.1 创建一个公共maven项目
    • 2.2 弹幕服务项目创建
      • 2.2.1 创建队列和广播型交换机
      • 2.2.2 生产者发送最终弹幕数据
      • 2.2.3 消费者监听原始弹幕数据
    • 2.3 Socket服务监听弹幕数据并返回前端
      • 2.3.1 配置类
      • 2.3.2 消费者
    • 2.4 测试

前言

上一篇文章 SpringCloud网关对WebSocket链接进行负载均衡 中把主要的架子搭建好了,这一篇文章就要开始写业务逻辑了。在分布式系统下,如何达到SpringBoot - WebSocket的使用和聊天室练习的效果。

一. Socket服务整合RabbitMQ

我们页面上,通过WebSocket发送弹幕信息的时候,后端通过@OnMessage注解修饰的函数进行接收。这里我们统一将原始的弹幕消息丢给MQ。让另一个专业的弹幕服务去消费处理。目的也是希望WebSocket服务它只负责消息的传递和WebSocket信息的维护,业务逻辑啥也不做。

1.添加pom依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置文件bootstrap.yml,添加RabbitMQ相关配置

server:
  port: 81

spring:
  application:
    name: tv-service-socket
  cloud:
    nacos:
      discovery:
        server-addr: 你的Nacos地址:8848
  rabbitmq:
    username: guest
    password: guest
    # 虚拟主机,默认是/
    virtual-host: /
    # 超时时间
    connection-timeout: 30000
    listener:
      simple:
        # 消费模式,手动
        acknowledge-mode: manual
        # 并发数
        concurrency: 5
        # 最大并发数
        max-concurrency: 10
        # 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。
        # 因此数值越大,内存占用越大,还需要考虑消费的速度
        prefetch: 10
    addresses: 你的RabbitMQ地址:5672

3.RabbitMQ配置类:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Zong0915
 * @date 2022/12/15 下午1:29
 */
@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue initDirectQueue() {
        return new Queue("originBullet-queue", true);
    }

    @Bean
    DirectExchange initDirectExchange() {
        return new DirectExchange("bulletPreProcessor-exchange", true, false);
    }

    @Bean
    Binding initBindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("bullet.originMessage");
    }
}

4.写一个简单的消息体OriginMessage,发送到MQ的:

import lombok.Data;

/**
 * @author Zong0915
 * @date 2022/12/15 下午1:30
 */
@Data
public class OriginMessage {
    private String sessionId;
    private String userId;
    private String roomId;
    private String message;
}

5.MQ生产者OriginMessageSender

/**
 * @author Zong0915
 * @date 2022/12/15 下午1:29
 */
@Component
public class OriginMessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(OriginMessage originMessage) {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());// 唯一ID
        Map<String, Object> map = new HashMap<>();
        map.put("message", JSONObject.toJSONString(originMessage));
        // 发送给消息预处理队列
        rabbitTemplate.convertAndSend("bulletPreProcessor-exchange",// 交换机名称
                "bullet.originMessage",// 路由Key
                map, correlationData);
    }
}

6.我们再对WebSocket的监听类做一下小改动,将收到的消息,封装一下,然后调用生产者的API即可。只需要注意一下多例下属性的注入方式是怎么写的即可

import kz.cache.SocketCache;
import kz.entity.OriginMessage;
import kz.producer.OriginMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
/**
 * @author Zong0915
 * @date 2022/12/9 下午3:45
 */
@Component
@ServerEndpoint("/websocket/live/{roomId}/{userId}")
@Slf4j
@Getter
public class BulletScreenServer {
    /**
     * 多例模式下的赋值方式
     */
    private static OriginMessageSender originMessageSender;

    /**
     * 多例模式下的赋值方式
     */
    @Autowired
    private void setOriginMessageSender(OriginMessageSender originMessageSender) {
        BulletScreenServer.originMessageSender = originMessageSender;
    }

    private static final AtomicLong count = new AtomicLong(0);

    private Session session;
    private String sessionId;
    private String userId;
    private String roomId;

    /**
     * 打开连接
     * @param session
     * @OnOpen 连接成功后会自动调用该方法
     * @PathParam("token") 获取 @ServerEndpoint("/imserver/{userId}") 后面的参数
     */
    @OnOpen
    public void openConnection(Session session, @PathParam("roomId") String roomId, @PathParam("userId") String userId) {
        // 如果是游客观看视频,虽然有弹幕,但是没有用户信息,所以需要用try
        count.incrementAndGet();
        log.info("*************WebSocket连接次数: {} *************", count.longValue());
        this.userId = userId;
        this.roomId = roomId;
        // 保存session相关信息到本地
        this.sessionId = session.getId();
        this.session = session;
        SocketCache.put(sessionId, this);
    }

    /**
     * 客户端刷新页面,或者关闭页面,服务端断开连接等等操作,都需要关闭连接
     */
    @OnClose
    public void closeConnection() {
        SocketCache.remove(sessionId);
    }

    /**
     * 客户端发送消息给服务端
     * @param message
     */
    @OnMessage
    public void onMessage(String message) {
        if (StringUtils.isBlank(message)) {
            return;
        }
        // 将消息丢给MQ,业务上的处理什么也不管,交给弹幕业务来处理,并且达到削峰的目的
        originMessageSender.send(buildMessage(message));
    }

    private OriginMessage buildMessage(String message) {
        OriginMessage originMessage = new OriginMessage();
        originMessage.setMessage(message);
        originMessage.setRoomId(roomId);
        originMessage.setSessionId(sessionId);
        originMessage.setUserId(userId);
        return originMessage;
    }
}

备注:记得将另一个Socket项目也改造成同样的代码。

二. 弹幕服务创建

2.1 创建一个公共maven项目

我们创建一个maven项目:service-bulletcommon。先看下最终的项目架构:
在这里插入图片描述

1.pom依赖添加一些常用的工具:

<groupId>bullet-service</groupId>
<artifactId>service-bulletcommon</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.12.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.79</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
    </dependency>
    <dependency>
        <groupId>commons-collections</groupId>
        <artifactId>commons-collections</artifactId>
        <version>3.2.2</version>
    </dependency>
</dependencies>

2.创建一个常量定义类SocketConstants

/**
 * @author Zong0915
 * @date 2022/12/15 下午3:59
 */
public class SocketConstants {
    /**
     * 这条消息是否处理过
     */
    public static final String CORRELATION_SET_PRE = "Correlation_Set_";
    /**
     * 同一个房间里面有哪些SessionID
     */
    public static final String ROOM_LIVE_USER_SET_PRE = "ROOM_LIVE_USER_Set_";

    public static final String MESSAGE = "message";
    public static final String ID = "id";
    /**
     * 原始消息所在队列
     */
    public static final String ORIGIN_BULLET_QUEUE = "originBullet-queue";
    /**
     * 广播队列A
     */
    public static final String BULLET_SOCKET_QUEUE_A = "bulletSocket-queueA";
    /**
     * 广播队列B
     */
    public static final String BULLET_SOCKET_QUEUE_B = "bulletSocket-queueB";
    /**
     * 弹幕预处理交换机
     */
    public static final String BULLET_PRE_PROCESSOR_EXCHANGE = "bulletPreProcessor-exchange";
    /**
     * 弹幕广播交换机
     */
    public static final String BULLET_FANOUT_EXCHANGE = "bulletFanOut-exchange";
    /**
     * 弹幕预处理路由Key
     */
    public static final String BULLET_ORIGIN_MESSAGE_ROUTE_KEY = "bullet.originMessage";
}

3.创建一个消息传输体OriginMessage

import lombok.Data;

/**
 * @author Zong0915
 * @date 2022/12/15 下午2:07
 */
@Data
public class OriginMessage {
    private String sessionId;
    private String userId;
    private String roomId;
    private String message;
}

2.2 弹幕服务项目创建

1.我们创建一个maven项目:service-bulletscreen。先看下最终的项目架构:
在这里插入图片描述

1.pom文件:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        <version>2.2.1.RELEASE</version>
        <exclusions>
            <exclusion>
                <artifactId>archaius-core</artifactId>
                <groupId>com.netflix.archaius</groupId>
            </exclusion>
            <exclusion>
                <artifactId>commons-io</artifactId>
                <groupId>commons-io</groupId>
            </exclusion>
            <exclusion>
                <artifactId>commons-lang3</artifactId>
                <groupId>org.apache.commons</groupId>
            </exclusion>
            <exclusion>
                <artifactId>fastjson</artifactId>
                <groupId>com.alibaba</groupId>
            </exclusion>
            <exclusion>
                <artifactId>guava</artifactId>
                <groupId>com.google.guava</groupId>
            </exclusion>
            <exclusion>
                <artifactId>httpclient</artifactId>
                <groupId>org.apache.httpcomponents</groupId>
            </exclusion>
            <exclusion>
                <artifactId>servo-core</artifactId>
                <groupId>com.netflix.servo</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <version>2.6.7</version>
        <exclusions>
            <exclusion>
                <artifactId>log4j-api</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>bullet-service</groupId>
        <artifactId>service-bulletcommon</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>

2.application.properties

spring.application.name=tv-service-bulletscreen
spring.cloud.nacos.discovery.server-addr=你的Nacos地址:8848

3.bootstrap.yml文件:

server:
  port: 83

spring:
  application:
    name: tv-service-bulletscreen
  redis:
    database: 0 # Redis数据库索引(默认为0)
    host: 你的Redis地址 # Redis的服务地址
    port: 6379 # Redis的服务端口
    password: 密码
    jedis:
      pool:
        max-active: 8 # 连接池最大连接数(使用负值表示没有限制)
        max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制)
        max-idle: 8 # 连接池中的最大空闲连接
        min-idle: 0 # 连接池中的最小空闲链接
    timeout: 30000 # 连接池的超时时间(毫秒)
  cloud:
    nacos:
      discovery:
        server-addr: 你的Nacos地址:8848
  rabbitmq:
    username: guest
    password: guest
    # 虚拟主机,默认是/
    virtual-host: /
    # 超时时间
    connection-timeout: 30000
    listener:
      simple:
        # 消费模式,手动
        acknowledge-mode: manual
        # 并发数
        concurrency: 5
        # 最大并发数
        max-concurrency: 10
        # 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。
        # 因此数值越大,内存占用越大,还需要考虑消费的速度
        prefetch: 10
    addresses: 你的RabbitMQ地址:5672

4.Redis配置类RedisConfig

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {
    /**
     * 实例化 RedisTemplate 对象
     *
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> functionDomainRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        initDomainRedisTemplate(redisTemplate, redisConnectionFactory);
        return redisTemplate;
    }
    /**
     * 设置数据存入 redis 的序列化方式,并开启事务
     *
     * @param redisTemplate
     * @param factory
     */
    private void initDomainRedisTemplate(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) {
        //如果不配置Serializer,那么存储的时候缺省使用String,如果用User类型存储,那么会提示错误User can't cast to String!
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());

        // 开启事务
        redisTemplate.setEnableTransactionSupport(true);
        redisTemplate.setConnectionFactory(factory);
    }

    @Bean
    @ConditionalOnMissingBean(StringRedisTemplate.class)
    public StringRedisTemplate stringRedisTemplate(
            RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}

2.2.1 创建队列和广播型交换机

创建一个广播模式的交换机bulletFanOut-exchange:其实用direct也可以,因为我只要监听的队列用同一个即可,这里只是进行一个模拟。
在这里插入图片描述

分别为两个Socket服务创建个队列,用来接收处理好的消息(练习下广播模式):

  • bulletSocket-queueA
  • bulletSocket-queueB

再分别为他们和上述创建好的交换机进行绑定。
在这里插入图片描述

我们的弹幕服务主要做两件事:

  • 监听预处理队列,数据来自:originBullet-queue
  • 将处理完的消息通过广播,发送给bulletSocket-queueA/B两个队列。

RabbitMQ配置类如下:

import kz.common.SocketConstants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Zong0915
 * @date 2022/12/15 下午1:29
 */
@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue initDirectQueue() {
        return new Queue(SocketConstants.ORIGIN_BULLET_QUEUE, true);
    }
	
    @Bean
    public Queue initFanoutSocketQueueA() {
        return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true);
    }

    @Bean
    public Queue initFanoutSocketQueueB() {
        return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true);
    }

    @Bean
    DirectExchange initDirectExchange() {
        return new DirectExchange(SocketConstants.BULLET_PRE_PROCESSOR_EXCHANGE, true, false);
    }

    @Bean("fanoutExchange")
    FanoutExchange initFanoutExchange() {
        return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
    }

    @Bean
    Binding initBindingDirect() {
        return BindingBuilder.bind(initDirectQueue()).to(initDirectExchange()).with(SocketConstants.BULLET_ORIGIN_MESSAGE_ROUTE_KEY);
    }

    @Bean
    Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange);
    }

    @Bean
    Binding initBindingFanoutB(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange);
    }
}

2.2.2 生产者发送最终弹幕数据

创建FanoutMessageProducer类:记得向我们上面绑定的广播交换机发送数据。

import com.alibaba.fastjson.JSONObject;
import kz.entity.OriginMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @author Zong0915
 * @date 2022/12/15 下午2:51
 */
@Component
public class FanoutMessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(OriginMessage originMessage) {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());// 唯一ID
        Map<String, Object> map = new HashMap<>();
        map.put("message", JSONObject.toJSONString(originMessage));

        rabbitTemplate.convertAndSend("bulletFanOut-exchange",// 交换机名称
                "",// 路由Key
                map, correlationData);
    }
}

2.2.3 消费者监听原始弹幕数据

创建OriginMessageConsumer类:

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.service.BulletScreenService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 - @author Zong0915
 - @date 2022/12/15 下午1:57
 */
@Component
@Slf4j
public class OriginMessageConsumer {
    @Autowired
    private BulletScreenService bulletScreenService;

    /**
     * 处理原始消息
     *
     * @param testMessage Map类型的消息体
     * @param headers     消息头
     * @param channel     消息所在的管道
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "originBullet-queue", durable = "true"),
                    // 默认的交换机类型就是direct
                    exchange = @Exchange(name = "bulletPreProcessor-exchange", type = "direct"),
                    key = "bullet.originMessage"
            )
    )
    @RabbitHandler
    public void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers,
                                Channel channel) throws IOException {
        log.info("***********消费开始*************");
        log.info("消费体:{}", JSONObject.toJSONString(testMessage));
        bulletScreenService.processMessage(testMessage, headers, channel);
    }
}

2.创建BulletScreenService类用于原始弹幕的业务处理,主要考虑的几个点:

  • 消息的合法性校验。
  • 消息的幂等性保证,这里用了Redis做个存储。
  • 将原始数据处理完后,在丢给MQ进行广播。
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.producer.FanoutMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author Zong0915
 * @date 2022/12/9 下午3:45
 */
@Service
@Slf4j
public class BulletScreenService {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private FanoutMessageProducer fanoutMessageProducer;

    @Async
    public void processMessage(Map testMessage, Map<String, Object> headers,
                               Channel channel) throws IOException {
        OriginMessage originMessage = getOriginMessage(testMessage);
        // 合法性校验
        if (!validMessage(testMessage, headers, originMessage)) {
            return;
        }
        // 处理消息
        log.info("***********业务处理,弹幕: {}***********", originMessage.getMessage());
        String correlationId = headers.get(SocketConstants.ID).toString();
        // 存入Redis并设置过期时间1天
        redisTemplate.opsForSet().add(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId);
        redisTemplate.expire(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), 1, TimeUnit.DAYS);
        // 将处理好的消息发送给MQ,通过广播队列,将消息发送给所有的Socket服务,一般这里还会对originMessage进行一些二次封装
        // 本案例就不做处理了,原样返回
        fanoutMessageProducer.send(originMessage);
        // 确认消息
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }

    public OriginMessage getOriginMessage(Map testMessage) {
        String messageJson = (String) testMessage.get(SocketConstants.MESSAGE);
        if (StringUtils.isBlank(messageJson)) {
            return null;
        }
        OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class);
        return originMessage;
    }

    /**
     * 对消息进行合法性校验
     */
    public boolean validMessage(Map testMessage, Map<String, Object> headers, OriginMessage originMessage) {
        // 判空
        if (testMessage == null || testMessage.size() == 0 || originMessage == null) {
            return false;
        }
        if (headers == null || headers.size() == 0) {
            return false;
        }
        // 幂等性校验,如果消息已经被消费过了,那么这个弹幕消息就不应该被二次消费,这个消息就直接把他处理掉
        UUID correlationId = (UUID) headers.get(SocketConstants.ID);
        Boolean exist = redisTemplate.opsForSet().isMember(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId.toString());
        return !Optional.ofNullable(exist).orElse(false);
    }
}

最后就是启动类BulletScreenApplication

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * @author Zong0915
 * @date 2022/12/10 下午9:44
 */
@SpringBootApplication
@EnableDiscoveryClient
@EnableAsync
public class BulletScreenApplication {
    public static void main(String[] args) {
        SpringApplication.run(BulletScreenApplication.class, args);
    }
}

2.3 Socket服务监听弹幕数据并返回前端

记得在pom依赖中引入上面的公共包:

<dependency>
   <groupId>bullet-service</groupId>
    <artifactId>service-bulletcommon</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

2.3.1 配置类

RabbitMQ配置类增加下队列和交换机的配置信息:绑定bulletSocket-queueA

@Bean
public Queue initFanoutSocketQueueA() {
    return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true);
}

@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {
    return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}

@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange);
}

另一个Socket项目,添加以下配置:绑定bulletSocket-queueB

@Bean
public Queue initFanoutSocketQueueB() {
    return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true);
}

@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {
    return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}

@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange);
}

再写一个缓存工具类,通过直播间号获得同一个直播间下的所有WebSocket信息:

public class SocketCache {
	public static List<BulletScreenServer> getSocketGroupByRoomId(String roomId) {
        ArrayList<BulletScreenServer> res = new ArrayList<>();
        if (StringUtils.isBlank(roomId)) {
            return res;
        }
        for (Map.Entry<Integer, ConcurrentHashMap<String, BulletScreenServer>> hashMapEntry : CACHE_SEGMENT.entrySet()) {
            ConcurrentHashMap<String, BulletScreenServer> map = hashMapEntry.getValue();
            if (map == null || map.size() == 0) {
                continue;
            }
            for (BulletScreenServer server : map.values()) {
                if (server.getSession().isOpen() && StringUtils.equals(roomId, server.getRoomId())) {
                    res.add(server);
                }
            }
        }
        return res;
    }
}

2.3.2 消费者

重点就是消费者的业务代码了,对最终的弹幕数据进行广播,创建FanOutMessageConsumer类:

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.cache.SocketCache;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.service.BulletScreenServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * @author Zong0915
 * @date 2022/12/15 下午1:57
 */
@Component
@Slf4j
public class FanOutMessageConsumer {

    /**
     * 处理弹幕消息,开始广播
     *
     * @param testMessage Map类型的消息体
     * @param headers     消息头
     * @param channel     消息所在的管道
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "bulletSocket-queueA", durable = "true"),
                    // 默认的交换机类型就是direct
                    exchange = @Exchange(name = "bulletFanOut-exchange", type = "fanout")
            )
    )
    @RabbitHandler
    public void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        log.info("***********消费开始, Socket服务A接收到广播消息*************");
        log.info("消费体:{}", JSONObject.toJSONString(testMessage));
        OriginMessage originMessage = getOriginMessage(testMessage);
        if (originMessage == null) {
            return;
        }
        // 根据roomID去找到同一个直播间下的所有用户并广播消息
        List<BulletScreenServer> socketGroupByRoomId = SocketCache.getSocketGroupByRoomId(originMessage.getRoomId());
        for (BulletScreenServer bulletScreenServer : socketGroupByRoomId) {
            bulletScreenServer.getSession().getBasicRemote().sendText(JSONObject.toJSONString(originMessage));
        }
        // 确认消息
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }

    public OriginMessage getOriginMessage(Map testMessage) {
        String messageJson = (String) testMessage.get(SocketConstants.MESSAGE);
        if (StringUtils.isBlank(messageJson)) {
            return null;
        }
        OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class);
        return originMessage;
    }
}

另一个Socket服务则改一下消费者的监听队列和日志内容即可:

在这里插入图片描述

2.4 测试

打开同一个直播间的两个用户,让两个WebSocket正好建立到不同的服务器上:
在这里插入图片描述
此时Socket服务A:
在这里插入图片描述

Socket服务B:
在这里插入图片描述
页面A中随便发送一条弹幕:
在这里插入图片描述
页面B中随便发送一条弹幕:
在这里插入图片描述

1.前端发送一条弹幕,后端监听到,开始向预处理队列丢消息。
在这里插入图片描述
2.service-bulletscreen服务,监听到预处理队列数据,开始进行处理。

在这里插入图片描述
3.经过一系列校验和幂等性处理之后,将处理完的弹幕通过交换机发送给广播队列:
在这里插入图片描述
4.Socket服务B接收到消息:
在这里插入图片描述

Socket服务A接收到广播消息:
在这里插入图片描述

5.前端页面展示:

页面A:
在这里插入图片描述

页面B:
在这里插入图片描述

到这里,一个聊天服务就完成了。不过大家也看到在线人数这块咱没做。可以用Redis缓存来记录每个直播间的人数。这个功能放到下一篇文章来讲。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/92263.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

安卓11上的存储权限问题

这篇文章&#xff0c;想来发布的有些晚了&#xff0c;安卓11已经发布多时了&#xff0c;关于安卓11上的存储权限变更的文章数不胜数&#xff0c;所以这篇文章只做为自己的一个简单的记录吧&#xff01; 在说11之前&#xff0c;我们先回忆以下10上存储权限的变更&#xff1a;每…

GaiaX开源解读 | 给Stretch(Rust编写的Flexbox布局引擎)新增特性,我掉了好多头发

GaiaX&#xff08;盖亚&#xff09;&#xff0c;是在阿里文娱内广泛使用的Native动态化方案&#xff0c;其核心优势是性能、稳定和易用。本系列文章《GaiaX开源解读》&#xff0c;带大家看看过去三年GaiaX的发展过程。 GaiaX的布局方案 - Flexbox 阿里文娱业务作为一个内容分发…

Linux调试器——gdb

gdb什么是gdbdebug与releasegdb的基本操作查看代码与断点执行与调试监视变量什么是gdb 之前用的一直都是VS编译器进行调试&#xff0c;调试是一个非常重要的过程&#xff0c;在Linux中调试需要用到一个工具就是gdb。 在调试思路上VS编译器和gdb是一样的&#xff0c;但是调试过…

【云原生进阶之容器】第一章Docker核心技术1.5.1节——cgroup综述

1. cgroups概述 1.1 为什么需要cgroup 在Linux里,一直以来就有对进程进行分组的概念和需求,比如session group, progress group等,后来随着人们对这方面的需求越来越多,比如需要追踪一组进程的内存和IO使用情况等,于是出现了cgroup,用来统一将进程进行分组,并在分组的…

51单片机——静态数码管实验,小白讲解

数码管介绍&#xff1a; 数码管是一种半导体发光器件&#xff0c;其基本单元是发光二极管。数码管也称LED数码管&#xff0c;不同行业人士对数码管的称呼不一样&#xff0c;其实都是同样的产品。数码管按段数可分为七段数码管和八段数码管&#xff0c;八段数码管比七段数码管多…

中国计算机大会CNCC【笔记】

中国计算机大会CNCC【笔记】前言推荐中国计算机大会CNCCCNCC 青年精英思想秀主题&#xff1a;当呼吸化为空气——物联网安全云原生一站式数据管理与服务 : 构建云计算数据平台生态计算产业未来应用场景与创新方向展望用开源打造云原生数据库的生态系统CCF 优博的培养与成长最后…

华为HI第二款车,阿维塔11的智能化有什么特点?

作者 | 德新 编辑 | 于婷阿维塔11&#xff0c;这款车不用过多介绍&#xff0c;长安 x 宁德 x 华为 3家联合打造。外观独特&#xff0c;在宝马17年的顶尖设计师Nader Faghihzadeh主导的设计&#xff1b;用料也很足&#xff0c;90度 - 116度的电池&#xff0c;34.99万的起售价&am…

利用 ALV 实现增删改查系列之一:让 ALV 报表进入可编辑状态试读版

在 CSDN 和我的知识星球里有朋友向我提出同样的问题&#xff0c;询问如何在 ALV 里实现增删改查操作。 虽然需求只有一句话&#xff0c;但是这个需求背后涉及到的知识点不少&#xff0c;因此笔者会通过几篇文章的篇幅&#xff0c;来介绍这个需求的详细实现步骤。 本文先解决第…

【Linux学习】之访问命令行

【Linux学习】之访问命令行 文章目录【Linux学习】之访问命令行一、基础知识二、练习1.使用 date 命令来显示当前的日期和时间。2.以12小时制显示当前时间(例如&#xff0c;11:42:11AM)。3. 查看/home/student/zcat 的文件类型&#xff0c;是否被人读取?4.使用wc命令和 Bash 快…

看过来,2022最后一期大咖说-大厂可观测来啦~

可观测性的应用创新与落地研讨会 「UGeek大咖说-大厂可观测」 最后一期 活动时间&#xff1a;2022/12/21 15:00 — 17:30 活动平台&#xff1a;线上网络直播间 主办单位&#xff1a;优维科技 内容介绍 白驹过隙&#xff0c;转眼就到了2022年末啦~ 「UGeek大咖说-大厂可观…

A. Tower(暴力 + 看数据范围)

Problem - A - Codeforces 彭教授建造了n个不同高度的积木塔。第i座塔的高度为ai。 寿教授不喜欢这些塔&#xff0c;因为它们的高度是任意的。他决定首先精确地移除其中的m个&#xff0c;然后执行以下一些&#xff08;或不执行&#xff09;操作。 选择一座塔&#xff0c;将其…

MongoDB——Java Client API(Spring Data MongoDB)

[TOC](MongoDB——Java Client API(Spring Data MongoDB)) MongoDB——Java Client API(Spring Data MongoDB) 关于文档注解 由于mongoDB使用是BSON进行存储&#xff0c;Java则是类与对象的概念&#xff0c;所以设计了一套注解用于标注 Document 范围&#xff1a;类 作用&…

表的增删查改基本查询(where-group by-having)

文章目录表的操作表的创建修改表属性&#xff08;轻易不要改&#xff09;数据类型分类类型测试表的增删查改增加插入insert插入否则更新替换Retrieve&#xff08;检索&#xff09;查找selectwhere条件语句的添加姓孙的和孙某where语句无法使用别名的问题&#xff1f;语文成绩&g…

OAK相机depthai最全上手教程

编辑&#xff1a;OAK中国 首发&#xff1a;oakchina.cn 喜欢的话&#xff0c;请多多&#x1f44d;⭐️✍ 内容可能会不定期更新&#xff0c;官网内容都是最新的&#xff0c;请查看首发地址链接。 ▌前言 Hello&#xff0c;大家好&#xff0c;这里是OAK中国&#xff0c;我是助手…

一种数据驱动的自动驾驶汽车前馈补偿器优化方法(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 一个可靠的控制器对于自动驾驶汽车的安全和平稳操纵的执行至关重要。控制器必须对外部干扰&#xff08;如路面、天气、风况等&…

javaweb文件下载案例

html代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title> </head> <body> <a href"./img/1.jpg">图片</a> <!--这个下载效果是download实…

pikachu靶场越权漏洞实战

今天继续给大家介绍渗透测试相关知识&#xff0c;本文主要内容是pikachu靶场越权漏洞实战。 免责声明&#xff1a; 本文所介绍的内容仅做学习交流使用&#xff0c;严禁利用文中技术进行非法行为&#xff0c;否则造成一切严重后果自负&#xff01; 再次强调&#xff1a;严禁对未…

01背包问题以及有关题目

一、01背包问题详解 确定dp数组以及下标的含义 使用二维数组 dp[i] [j] 表示从下标为[0-i]的物品里任意取&#xff0c;放进容量为j的背包&#xff0c;价值总和最大是多少。 确定递推公式 dp数组的初始化 首先从dp[i][j] 的定义出发&#xff0c;如果背包容量j为0的话&#…

kobject应用实例--在/sys下创建设备的属性节点

本文讲解如何利用内核提供的接口&#xff0c;在/sys下创建设备的属性节点&#xff0c;实现属性的读写接口。 1、主要数据结构 一、kobject --> 目录&#xff1b;kobj_type --> 属性文件 使用到的内核数据结构如下&#xff1a; struct kobject {const char *name;…

1996-2020年31省主要农业机械年末拥有量相关数据

1996-2020年31省主要农业机械年末拥有量 1、时间&#xff1a;1996-2020年 2、范围&#xff1a;包括全国31省 3、指标包括&#xff1a; 农用机械总动力&#xff08;万千瓦&#xff09;、大中型拖拉机&#xff08;台&#xff09;、小型拖拉机&#xff08;台&#xff09;、大中…