SpringBoot下RabbitMQ的实战应用:动态创建和动态监控队列、死信、备份交换机

news2025/1/11 4:14:55

一、应用场景

  • 业务中心根据业务需求向特定用户发送消息;发送前不确定由哪个用户接收

在这里插入图片描述

  • 特定用户接收特定消息;用户可以退出,再切换别的用户登录,用户登录后只接收与自已对应的消息

在这里插入图片描述

二、总体要求

项目要足够稳健,消息不能丢失

  • 交换机、队列、消息持久化

  • 队列有容量限制;如:3000

  • 消息发送后需要确认(非自动确认)

  • 未发送成功的消息,由缓存保存,定时重发

  • 交换机收到消息,但无法投递时,转发至备份交换机,再广播至对应队列

三、架构图

在这里插入图片描述

四、安装RabbitMQ

参考如下三篇文章

  • 【RabbitMQ】RabbitMQ入门及安装

  • 【RabbitMQ】Docker中安装RabbitMQ

  • 【图文详解】RabbitMQ集群搭建、镜像队列、负载均衡HAProxy、故障转移Keepalived

五、搭建SpringBoot项目

  • java1.8

  • spring-boot 2.6.7

1、依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.7</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tuwer</groupId>
    <artifactId>mq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- amqp-client -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- amqp-client Java原生依赖 -->
<!--        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>-->
        <!-- hutool-all -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.2</version>
        </dependency>
        <!-- jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.13.3</version>
        </dependency>
        <dependency>
            <groupId>jakarta.json</groupId>
            <artifactId>jakarta.json-api</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.3</version>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
        <!-- 工具类 -->
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <!-- 测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

2、application.yml

spring:
  rabbitmq:
    host: 192.168.3.174
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    # 交换机接收确认
    publisher-confirm-type: correlated
    # 交换机回退消息
    #publisher-returns: true

2、启动类

package com.tuwer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */
@SpringBootApplication
public class MqApp {
    public static void main(String[] args) {
        SpringApplication.run(MqApp.class, args);
    }
}

3、基础类

3.1、常量类

package com.tuwer.constant;

/**
 * <p>系统常量类</p>
 *
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */
public class Constants {
    /**
     * 队列容量、通道预取值
     * 队列容量应根据项目需要,设置合适的值;
     * 本案例中为了测试方便设为5
     */
    public static final int QUEUE_CAPACITY = 5;
    public static final int PRE_FETCH_SIZE = 10;

    /**
     * 交换机
     */
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String BACKUP_EXCHANGE = "backup_exchange";

    /**
     * 队列
     */
    public static final String BACKUP_QUEUE = "backup_queue";
}

3.2、雪花算法工具类

获取Long型id:SnowflakeUtil.getInstance().nextId()

package com.tuwer.util;

import lombok.extern.slf4j.Slf4j;

import java.text.MessageFormat;

/**
 * <p>雪花算法工具类</p>
 *
 * @author 土味儿
 * Date 2022/6/2
 * @version 1.0
 */
@Slf4j
@SuppressWarnings("all")
public class SnowflakeUtil {
    // ==============================Fields===========================================
    /**
     * 开始时间戳 (2000-01-01 00:00:00)
     */
    private static final long TWEPOCH = 946656000000L;

    /**
     * 机器id所占的位数 5
     */
    private static final long WORKER_ID_BITS = 5L;

    /**
     * 数据标识id所占的位数 5
     */
    private static final long DATA_CENTER_ID_BITS = 5L;

    /**
     * 支持的最大机器id,结果是 31
     */
    private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);

    /**
     * 支持的最大数据标识id,结果是 31
     */
    private static final long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS);

    /**
     * 序列在id中占的位数
     */
    private static final long SEQUENCE_BITS = 12L;

    /**
     * 机器ID向左移12位
     */
    private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;

    /**
     * 数据标识id向左移17位(12+5)
     */
    private static final long DATA_CENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;

    /**
     * 时间戳向左移22位(5+5+12)
     */
    private static final long TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATA_CENTER_ID_BITS;

    /**
     * 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
     */
    private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);

    /**
     * 步长 1024
     */
    private static final long STEP_SIZE = 1024;

    /**
     * unsigned int max value
     */
    private static final long UINT_MAX_VALUE = 0xffffffffL;

    /**
     * 工作机器ID(0~31)
     */
    private long workerId;

    /**
     * 工作机器ID 计数器
     */
    private long workerIdFlags = 0L;

    /**
     * 数据中心ID(0~31)
     */
    private long dataCenterId;

    /**
     * 数据中心ID 计数器
     */
    private long dataCenterIdFlags = 0L;

    /**
     * 毫秒内序列(0~4095)
     */
    private long sequence = 0L;

    /**
     * 毫秒内序列基数[0|1024|2048|3072]
     */
    private long basicSequence = 0L;

    /**
     * 上次生成ID的时间戳
     */
    private long lastTimestamp = -1L;

    /**
     * 工作模式
     */
    private final WorkMode workMode;

    public enum WorkMode {NON_SHARED, RATE_1024, RATE_4096;}

    //==============================单例模式(静态内部类)=====================================
    private static class InnerClass{
        private static final SnowflakeUtil INNER_DEMO = new SnowflakeUtil();
    }
    public static SnowflakeUtil getInstance(){
        return InnerClass.INNER_DEMO;
    }

    //==============================Constructors=====================================

    public SnowflakeUtil() {
        this(0, 0, WorkMode.RATE_4096);
    }

    /**
     * 构造函数
     *
     * @param workerId     工作ID (0~31)
     * @param dataCenterId 数据中心ID (0~31)
     */
    public SnowflakeUtil(long workerId, long dataCenterId) {
        this(workerId, dataCenterId, WorkMode.RATE_4096);
    }

    /**
     * 构造函数
     *
     * @param workerId     工作ID (0~31)
     * @param dataCenterId 数据中心ID (0~31)
     * @param workMode     工作模式
     */
    public SnowflakeUtil(long workerId, long dataCenterId, WorkMode workMode) {
        this.workMode = workMode;
        if (workerId > MAX_WORKER_ID || workerId < 0) {
            throw new IllegalArgumentException(MessageFormat.format("worker Id can't be greater than {0} or less than 0", MAX_WORKER_ID));
        }
        if (dataCenterId > MAX_DATA_CENTER_ID || dataCenterId < 0) {
            throw new IllegalArgumentException(MessageFormat.format("datacenter Id can't be greater than {0} or less than 0", MAX_DATA_CENTER_ID));
        }
        this.workerId = workerId;
        this.workerIdFlags = setSpecifiedBitTo1(this.workerIdFlags, this.workerId);
        this.dataCenterId = dataCenterId;
        this.dataCenterIdFlags = setSpecifiedBitTo1(this.dataCenterIdFlags, this.dataCenterId);
    }

    // ==============================Methods==========================================

    /**
     * 获取机器id
     *
     * @return 所属机器的id
     */
    public long getWorkerId() {
        return workerId;
    }

    /**
     * 获取数据中心id
     *
     * @return 所属数据中心id
     */
    public long getDataCenterId() {
        return dataCenterId;
    }

    /**
     * 获得下一个ID (该方法是线程安全的)
     *
     * @return SnowflakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();
        //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
        if (timestamp < this.lastTimestamp) {
            if (timestamp > TWEPOCH) {
                if (WorkMode.NON_SHARED == this.workMode) {
                    nonSharedClockBackwards(timestamp);
                } else if (WorkMode.RATE_1024 == this.workMode) {
                    rate1024ClockBackwards(timestamp);
                } else {
                    throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for {0} milliseconds", lastTimestamp - timestamp));
                }
            } else {
                throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for {0} milliseconds", lastTimestamp - timestamp));
            }
        }
        //如果是同一时间生成的,则进行毫秒内序列
        if (this.lastTimestamp == timestamp) {
            this.sequence = (this.sequence + 1) & SEQUENCE_MASK;
            //毫秒内序列溢出
            if (this.sequence == 0) {
                //阻塞到下一个毫秒,获得新的时间戳
                timestamp = tilNextMillis(this.lastTimestamp);
            }
        }
        //时间戳改变,毫秒内序列重置
        else {
            this.sequence = this.basicSequence;
        }
        //上次生成ID的时间戳
        this.lastTimestamp = timestamp;
        //移位并通过或运算拼到一起组成64位的ID
        return ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT)
                | (this.dataCenterId << DATA_CENTER_ID_SHIFT)
                | (this.workerId << WORKER_ID_SHIFT)
                | this.sequence;
    }

    /**
     * 阻塞到下一个毫秒,直到获得新的时间戳
     *
     * @param lastTimestamp 上次生成ID的时间戳
     * @return 当前时间戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp0;
        do {
            timestamp0 = timeGen();
        } while (timestamp0 <= lastTimestamp);
        return timestamp0;
    }

    /**
     * 返回以毫秒为单位的当前时间
     *
     * @return 当前时间(毫秒)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    /**
     * 尝试解决时钟回拨<br>【* 仅用于 单机生成不对外 的情况 *】
     *
     * @param timestamp 当前时间戳
     * @return void
     */
    private void nonSharedClockBackwards(long timestamp) {
        if (this.dataCenterIdFlags >= UINT_MAX_VALUE && this.workerIdFlags >= UINT_MAX_VALUE) {
            throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for {0} milliseconds", lastTimestamp - timestamp));
        } else {
            //如果仅用于生成不重复的数值,尝试变更 dataCenterId 或 workerId 修复时钟回拨问题

            log.warn("Clock moved backwards. Refusing to generate id for {} milliseconds", lastTimestamp - timestamp);
            //先尝试变更 dataCenterId,当 dataCenterId 轮询一遍之后,尝试变更 workerId 并重置 dataCenterId
            if (this.dataCenterIdFlags >= UINT_MAX_VALUE) {
                if (++this.workerId > MAX_WORKER_ID) {
                    this.workerId = 0L;
                }
                this.workerIdFlags = setSpecifiedBitTo1(this.workerIdFlags, this.workerId);
                // 重置 dataCenterId 和 dataCenterIdFlags
                this.dataCenterIdFlags = this.dataCenterId = 0L;
            } else {
                if (++this.dataCenterId > MAX_DATA_CENTER_ID) {
                    this.dataCenterId = 0L;
                }
            }
            this.dataCenterIdFlags = setSpecifiedBitTo1(this.dataCenterIdFlags, this.dataCenterId);
            this.lastTimestamp = -1L;
            log.warn("Try to fix the clock moved backwards. timestamp : {}, worker Id : {}, datacenter Id : {}", timestamp, workerId, dataCenterId);
        }
    }

    /**
     * 尝试解决时钟回拨<br>【* 仅用于每毫秒生成量 不大于 1024 的情况 *】
     *
     * @param timestamp 当前时间戳
     * @return void
     */
    private void rate1024ClockBackwards(long timestamp) {
        if (this.basicSequence > (SEQUENCE_MASK - STEP_SIZE)) {
            throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for {0} milliseconds", lastTimestamp - timestamp));
        } else {
            log.warn("Clock moved backwards. Refusing to generate id for {} milliseconds", lastTimestamp - timestamp);
            this.basicSequence += STEP_SIZE;
            this.lastTimestamp = -1L;
            log.warn("Try to fix the clock moved backwards. timestamp : {}, basicSequence : {}", timestamp, basicSequence);
        }
    }

    /**
     * Set the specified bit to 1
     *
     * @param value raw long value
     * @param index bit index (From 0~31)
     * @return long value
     */
    private long setSpecifiedBitTo1(long value, long index) {
        return value |= (1L << index);
    }

    /**
     * Set the specified bit to 0
     *
     * @param value raw long value
     * @param index bit index (From 0~31)
     * @return long value
     */
    private long setSpecifiedBitTo0(long value, long index) {
        return value &= ~(1L << index);
    }

    /**
     * Get the specified bit
     *
     * @param value raw long value
     * @param index bit index(From 0-31)
     * @return 0 or 1
     */
    private int getSpecifiedBit(long value, long index) {
        return (value & (1L << index)) == 0 ? 0 : 1;
    }

}

3.3、缓存模型类

缓存操作不是本文的重点,用模型类代替;实际部署时可换作redis

package com.tuwer.cache;

import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * 模拟缓存
 *
 * @author 土味儿
 * Date 2023/1/3
 * @version 1.0
 */
public class CacheModel {
    /**
     * 并发Map
     */
    private static ConcurrentSkipListMap<Long, String> cache = new ConcurrentSkipListMap<>();

    /**
     * 存入缓存
     *
     * @param key
     * @param value
     */
    public static void put(Long key, String value) {
        cache.put(key, value);
        System.out.println("存入缓存;【key】" + key + "【value】" + value);
        print();
    }

    /**
     * 获取value
     *
     * @param key
     * @return
     */
    public static String get(Long key) {
        String v = cache.get(key);
        System.out.println("从缓存中获取;【key】" + key + "【value】" + v);
        return v;
    }

    /**
     * 删除key
     *
     * @param key
     */
    public static void del(Long key) {
        String v = cache.remove(key);
        System.out.println("从缓存中删除;【key】" + key + "【value】" + v);
        print();
    }

    /**
     * 删除小于等于key的多个值
     *
     * @param key
     */
    public static void delMany1(Long key) {
        Set<Long> keys = cache.keySet();
        int n = 0;
        for (Long k : keys) {
            if (k <= key) {
                cache.remove(k);
                n++;
            }
        }
        System.out.println("从缓存中删除小于等于;【" + key + "】的多个值;共有 " + n + " 个");
        print();
    }

    /**
     * 删除小于等于key的多个值
     * ConcurrentNavigableMap
     * @param key
     */
    public static void delMany(Long key) {
        // 得到批量确认信息Map(只能得到小于key的值)
        ConcurrentNavigableMap<Long, String> confirmMap = cache.headMap(key);
        System.out.println("从缓存中删除小于等于;【" + key + "】的多个值;共有 " + (confirmMap.size() + 1) + " 个");
        // 清空已经确认的
        confirmMap.clear();

        // 单独再删除等于key的值
        cache.remove(key);

        print();
    }

    public static void print() {
        System.out.println("当前缓存大小:" + cache.size());
        Set<Long> keys = cache.keySet();
        for (Long key : keys) {
            System.out.print(key);
            System.out.print(" | ");
            //System.out.println(cache.get(key));
        }
        System.out.println();
    }
}

4、配置类 MqConfig.java

package com.tuwer.config;

import com.tuwer.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * <p>配置类</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */
@Configuration
public class MqConfig {
    @Resource
    private CachingConnectionFactory connectionFactory;
    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){        
        System.out.println("初始化...");
        // 删除普通空队列(排除掉备份队列),减轻RabbitMQ的压力
   }

    /**
     * 获取普通交换机
     *
     * @return
     */
    @Bean("normalExchange")
    public DirectExchange getNormalExchange() {
        return ExchangeBuilder
                .directExchange(Constants.NORMAL_EXCHANGE)
                // 持久化
                .durable(true)
                // 备份(候补)交换机
                .withArgument("alternate-exchange", Constants.BACKUP_EXCHANGE)
                .build();
    }

    /**
     * 备份交换机
     * 类型:fanout
     *
     * @return
     */
    @Bean("backupExchange")
    public FanoutExchange getBackupExchange() {
        return new FanoutExchange(Constants.BACKUP_EXCHANGE);
    }

    /**
     * 获取备份队列
     *
     * @return
     */
    @Bean("backupQueue")
    public Queue getBackupQueue() {
        return QueueBuilder.durable(Constants.BACKUP_QUEUE).build();
    }

    /**
     * 备份队列 绑定 备份交换机
     *
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding backupQueueBindBackupExchange(
            @Qualifier("backupQueue") Queue queue,
            @Qualifier("backupExchange") FanoutExchange exchange
    ) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * 用于动态创建队列、交换机,并绑定
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(){
        //return new RabbitAdmin(connectionFactory);
        return new RabbitAdmin(rabbitTemplate);
    }

    /**
     * 用于设置动态监听
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        container.setConcurrentConsumers(10);
        container.setMaxConcurrentConsumers(50);
        // 手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 预取值
        container.setPrefetchCount(Constants.PRE_FETCH_SIZE);

        // 创建队列(在用户登录后创建)
        //createNormalQueueAndBind.create(username);
        //listenerContainer.setQueueNames("q_" + username);
        //listenerContainer.setMessageListener(myAckReceiver);

        return container;
    }
}

5、普通队列动态创建类

package com.tuwer.service;

import com.tuwer.constant.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Locale;
import java.util.Objects;
import java.util.Properties;

/**
 * <p>动态创建队列</p>
 *
 * @author 土味儿
 * Date 2023/1/7
 * @version 1.0
 */
@Component
@Slf4j
public class CreateNormalQueueAndBind {
    @Resource
    private DirectExchange normalExchange;
    @Resource
    private RabbitAdmin rabbitAdmin;

    /**
     * 动态创建普通队列,并绑定至普通交换机
     *
     * @param routingKey
     */
    public void create(String routingKey) {
        String key = routingKey.toLowerCase(Locale.ROOT);
        String queueName = "q_" + key;

        // 创建普通队列
        Queue queue = null;

        // 查询队列属性,为null时,表示队列不存在
        Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);

        // 队列不存在时,创建
        if (Objects.isNull(queueProperties)) {
            log.info("队列【{}】不存在,创建中...", queueName);
            queue = QueueBuilder.durable(queueName).maxLength(Constants.QUEUE_CAPACITY).build();
            rabbitAdmin.declareQueue(queue);
        }
        // 绑定至普通交换机
        if (Objects.nonNull(queue)) {
            Binding binding = BindingBuilder.bind(queue).to(normalExchange).with(key);
            rabbitAdmin.declareBinding(binding);
        }
    }
}

经过测试,如果队列满了的时候,再向队列发送消息时,最老的消息被丢弃,且不会启用备份交换机;为了防止信息丢失,加入死信交换机死信队列,当前队列满了的时候,最老的信息进入死信交换机,再转至死信队列

  • 原代码
// 原
queue = QueueBuilder.durable(queueName)
      .maxLength(Constants.QUEUE_CAPACITY)
      .build();
  • 新代码
// 新
queue = QueueBuilder.durable(queueName)
      // 设置队列长度
      .maxLength(Constants.QUEUE_CAPACITY)
      // 设置死信交换机
      .deadLetterExchange(Constants.BACKUP_EXCHANGE)
      // 设置死信RoutingKey(死信队列)
      .deadLetterRoutingKey(Constants.BACKUP_QUEUE)
      // 改变溢出规则:当队列溢出时,拒绝接收新消息
      //.overflow(QueueBuilder.Overflow.rejectPublish)
      .build();

6、发布确认回调类

package com.tuwer.service;

import com.tuwer.cache.CacheModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * <p>发布确认回调类</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */
@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 把当前类注入到 RabbitTemplate
     *
     * @PostConstruct 表示在执行当前类的构造时运行
     * 因为 ConfirmCallback接口是 RabbitTemplate的内部类
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机确认回调方法
     *
     * @param correlationData 回调消息
     * @param ack             交换机是否确认收到了消息:true:收到了;false:没有收到
     * @param cause           没有收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // 消息ID
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            // 收到
            log.info("交换机收到了消息!编号:{}", id);
            // 从缓存中删除
            CacheModel.del(Long.parseLong(id));
        } else {
            // 未收到
            log.info("交换机没有收到编号为:{} 的消息!原因:{}", id, cause);
        }
    }
}

7、生产者服务类

package com.tuwer.service;


import com.tuwer.cache.CacheModel;
import com.tuwer.constant.Constants;
import com.tuwer.util.SnowflakeUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;

/**
 * <p>生产者</p>
 *
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */
@Component
@Slf4j
public class Producer {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private CreateNormalQueueAndBind createNormalQueueAndBind;

    /**
     * 发送消息
     *
     * @param msg
     */
    public void sendMsg(String msg, String routingKey) {
        String key = routingKey.toLowerCase(Locale.ROOT);
        // 创建普通队列,并绑定至普通交换机
        createNormalQueueAndBind.create(key);

        long id = SnowflakeUtil.getInstance().nextId();
        CorrelationData correlationData = new CorrelationData(String.valueOf(id));
        rabbitTemplate.convertAndSend(
                Constants.NORMAL_EXCHANGE,
                key,
                msg,
                correlationData);

        // 存入缓存
        CacheModel.put(id, msg);

        log.info("消息:【{}】已发送!编号:{}", msg, correlationData.getId());
    }
}

8、消费者服务类

8.1、备份队列消费者

备份队列固定不变;config中配置,系统启动后自动创建

package com.tuwer.service;

import com.tuwer.config.MqConfig;
import com.tuwer.constant.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * <p>备份消费者</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */
@Slf4j
@Component
public class BackupConsumer {
    /**
     * 监听备份队列消息
     * @param message
     */
    @RabbitListener(queues = Constants.BACKUP_QUEUE)
    public void receiveMsg(Message message){
        String msg = new String(message.getBody());
        log.info("接收到备份队列的消息:【{}】", msg);
    }
}

8.2、普通队列消费者

  • 队列不确定;用户登录后,动态切换要监控的队列;

  • 如果用户退出后,也要更新监控列表;省略

package com.tuwer.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;


/**
 * <p>普通消费者</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */
@Slf4j
@Component
public class NormalConsumer {
    @Resource
    private SimpleMessageListenerContainer listenerContainer;
    @Resource
    private MyAckReceiver myAckReceiver;
    @Resource
    private CreateNormalQueueAndBind createNormalQueueAndBind;

    /**
     * 监听普通队列消息
     * 动态切换要监控的队列   
     *   
     * @param username
     */
    //@RabbitListener(queues = "队列名称")
    public void receiveMsg(String username) {
        // 创建队列
        createNormalQueueAndBind.create(username);
        // 设置要监听的队列(用set,不是add)
        listenerContainer.setQueueNames("q_" + username);
        // 设置消息接收器
        listenerContainer.setMessageListener(myAckReceiver);

        // 当前监听的队列列表
        String[] queueNames = listenerContainer.getQueueNames();
        System.out.println("当前监听的队列:");
        for (String queueName : queueNames) {
            System.out.println(queueName);
        }
        System.out.println("----------");
    }
}
  • 消息接收器
package com.tuwer.service;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * <p>消息接收器</p>
 *
 * @author 土味儿
 * Date 2023/1/7
 * @version 1.0
 */
@Slf4j
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String consumerQueue = message.getMessageProperties().getConsumerQueue();
            String msg = new String(message.getBody());
            log.info("MyAckReceiver的【{}】队列,收到的消息:【{}】", consumerQueue, msg);
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            channel.basicReject(deliveryTag, false);
            e.printStackTrace();
        }
    }
}

9、Controller接口类

9.1、发送消息

http://localhost:8080/sendMsg/用户名/消息

package com.tuwer.controller;

import com.tuwer.service.Producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * <p>发送消息</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */
@Slf4j
@RestController
public class ProducerController {
    @Resource
    private Producer producer;

    /**
     * 发送消息
     *
     * @param msg      消息内容
     * @param username 接收消息的用户
     * @return
     */
    @GetMapping("/sendMsg/{username}/{msg}")
    public String sendMsg(
            @PathVariable("msg") String msg,
            @PathVariable("username") String username
    ) {
        producer.sendMsg(msg, username);

        return "OK!";
    }
}

9.2、模拟用户登录

[http://localhost:8080/login/用户名

package com.tuwer.controller;

import com.tuwer.service.NormalConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * <p>模拟登录</p>
 *
 * @author 土味儿
 * Date 2023/1/7
 * @version 1.0
 */
@Slf4j
@RestController
@RequestMapping("/login")
public class LoginController {
    @Resource
    private NormalConsumer consumer;

    /**
     * 登录
     *
     * @param username
     * @return
     */
    @GetMapping("/{username}")
    public String login(
            @PathVariable("username") String username
    ) {
        log.info("用户 {} 已登录", username);

        // 接收该用户的队列消息
        consumer.receiveMsg(username);

        return username;
    }
}

10、定时重发

缓存中未得到确认的消息,由定时器重新发送;省略…

11、项目结构图

在这里插入图片描述

六、测试

1、启动后自动创建交换机、队列

  • 启动前

在这里插入图片描述

在这里插入图片描述

  • 启动后

在这里插入图片描述

在这里插入图片描述

2、向用户user1发送消息

http://localhost:8080/sendMsg/user1/测试1

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3、登录user1接收消息

http://localhost:8080/login/user1

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

4、向user2user3分别发消息

当前登录用户是user1,向用户user2user3发送消息时,user1是接收不到的,消息会存储在相应对列中

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

5、切换用户user3

user1切换到user3,同时监控的队列由q_user1动态切换到q_user3,队列q_user3中的消息将被消费掉

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

6、再次向user1发消息

当前登录用户是user3,这时再向上一个登录用户user1发消息,消息应该不会被消费,存在q_user1队列中

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

7、备份交换机接收消息

  • 本案例中发送消息前,先动态创建队列,一般不会出现信息无法路由的情况,也就不会因为无法路由而启动备份交换机(除非一些极端的情况)

  • 当普通队列满的时候,再向其发送消息,最老的信息变为死信,进入死信交换机;本案例中把备份交换机当成死信交换机,备份队列当成死信队列。

  • user2连发10条信息,且user2不登录,队列q_user2容量为5,溢出后最老的信息进入备份交换机

在这里插入图片描述

在这里插入图片描述

  • 启动测试

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  • 改变溢出规则;默认是丢弃最老(开头)的消息

在这里插入图片描述

在创建队列时加上overflow(QueueBuilder.Overflow.rejectPublish),改为拒绝接收新消息,此时生产者会收到拒绝接收的消息提示,缓存中的消息将不会被删除,而是定时重发;备份交换机也不会启用,拒绝的信息不会进入备份队列,不符合设计要求,所以使用默认的溢出规则。

七、不足

  • 每个用户对应一个普通队列;当用户过多时,相应的队列也会很多,并且队列是持久化的,会占用较多的系统资源;解决思路:定时删除空队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/147743.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(3分钟了解)SLAM后端优化的四大金刚!g2o ceres gtsam SE-Sync

后端优化常用的库有g2o ceres gtsam 和 se-sync这篇博客首先介绍se-sync&#xff0c;然后比较四种库之间的差异。编辑切换为居中添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09;编辑切换为居中添加图片注释&#xff0c;不超过 140 字&#xff08;可选&…

Python实现检测图片中的人脸,将识别到的人脸向量存入数据库,并实现提交的图片的人脸信息与入库的人脸信息进行比对

facenet_face_regonistant 完整代码下载地址&#xff1a;Python实现检测图片中的人脸&#xff0c;将识别到的人脸向量存入数据库 利用facenet实现检测图片中的人脸&#xff0c;将识别到的人脸向量存入数据库&#xff0c;此外利用post提交一个新图片&#xff08;也可以提交一个…

【Qt】通过继承ui界面类的方式加载.ui转化的.h头文件显示窗体

【Qt】通过继承ui界面类的方式加载.ui转化的.h头文件显示窗体1、背景2、实例3、验证1、背景 将.ui文件转化为.h头文件参考如下博客&#xff1a; 【Qt】将QtDesigner生成的.ui文件转化为.h头文件 https://jn10010537.blog.csdn.net/article/details/128589666其中生成的ui_wid…

Web学习

Web概念JavaWeb&#xff1a;使用Java语言开发基于互联网小贷项目软件架构&#xff1a;B/S架构详解静态资源HTML举例标签学习文件标签<!--注释-->文本标签图片标签* img&#xff1a;展示图片* 属性&#xff1a;* src&#xff1a;指定图片的位置* 代码&#xff1a;<!--展…

你的PC机或者终端,是怎么上网的?怎么连接到网络的?无线网怎么接入Internet,如何访问网络资源?

你的PC机或者终端&#xff0c;是怎么上网的&#xff1f;怎么连接到网络的&#xff1f;无线网怎么接入Internet&#xff0c;如何访问网络资源&#xff1f; 2022找工作是学历、能力和运气的超强结合体&#xff0c;遇到寒冬&#xff0c;大厂不招人&#xff0c;可能很多算法学生都…

就该去造空调吗?

今天在知乎上收到一个网友的提问&#xff1a;大佬&#xff0c;我想咨询点人生问题&#xff0c;但是碍于没工作囊中羞涩&#xff0c;实在没钱问。希望能问一问人生选择问题 是这样的一个是奥克斯空调电控部&#xff0c;一个是大陆汽车电子重庆研发&#xff0c;奥克斯偏裸机&…

光耦合器:其类型和在DC/AC电路中的各种应用

光耦合器&#xff1a;其类型和在DC/AC电路中的各种应用 介绍 光耦合器是一种在两个隔离电路之间传输电信号的电子元件。光耦合器也称为光隔离器、光电耦合器或光隔离器。 光耦合器通常用于电路&#xff0c;尤其是低电压或高噪声敏感电路&#xff0c;用于隔离电路&#xff0c…

【C++常用算法】STL基础语法学习 | 排序算法

目录 ●sort ●random_shuffle ● merge ●reverse ●sort 1.功能描述&#xff1a; 对容器内元素进行排序 2.查看sort定义下底层代码的函数原型&#xff1a; 3.向vector容器中插入10个无序数&#xff0c;并且用sort排序法对其进行升序和降序&#xff08;内建仿函数greater<…

伦敦交通局在这里为您的无障碍旅程提供支持

伦敦交通局首席人事官特里西亚赖特&#xff08;Tricia Wright&#xff09;详细介绍了伦敦交通局努力使该市的交通网络更易于所有乘客使用的方式&#xff0c;并强调尽管已经做了很多工作&#xff0c;但这项工作只是一个开始。公共交通是伦敦人和游客在首都生活中必不可少的。它连…

c++写一个连接池

用c写一个数据库连接池 数据库连接池是为了提高数据库连接的性能&#xff0c;进行连接复用 对于复杂数据库进行大量引用的场景下就会出现访问瓶颈 常见的两种解决方法就是&#xff1a;为了减少磁盘 I/O的次数&#xff0c;在数据库和服务器的应用中间加一层 缓存数据库&#…

2023/1/7 Vue学习笔记-4-组件的理解

1 对组件的理解 模块与组件、模块化与组件化&#xff1a; 1.模块&#xff1a; &#xff08;1&#xff09;理解&#xff1a;向外提供特定功能的js程序&#xff0c;一般就是一个js文件 &#xff08;2&#xff09;为什么&#xff1a;js文件很多很复杂 &#xff08;3&#xff09;作…

元编程:constexpr +特例化 判断质数

重点&#xff1a; 1.constexpr 函数支持在编译期间完成计算 2.特例化是模板中一种定义 using namespace std;//编译期进行判断 constexpr bool isPrime(unsigned int p) {for (unsigned int d2;d<p/2;d){if (p % d 0){return false;}}return p > 1; }template<int…

【安全硬件】Chap.6 芯片生产猜疑链与SoC设计流程;可能会存在的安全威胁Untrusted IC Supply Chain Threats

【安全硬件】Chap.6 芯片生产猜疑链与SoC设计流程&#xff1b;可能会存在的安全威胁Untrusted IC Supply Chain Threats背景1. IC和半导体产业的全球化2. 芯片生产猜疑链——Untrusted IC Supply Chain Threats可能会存在的安全威胁3. SoC Design Flow主要参考来源背景 在现代 …

专属 Python 开发者的完美终端工具

目录 前言 1.Rich 兼容性 2.Rich 安装说明 3.Rich 的 Print 功能 4.自定义 Console 控制台输出 5.Console 控制台记录 6.表情符号 7.表格 8.进度条 9.按列输出数据 10.Markdown 11.语法突出显示 12.错误回溯(traceback) 前言 今天给大家推荐一个非常精美的终端工…

数学建模——线性规划

目录 一. 线性规划 1.基本概念 线性规划的标准形式为&#xff1a; 线性规划的解&#xff1a; 线性规划三要素&#xff1a; 灵敏度分析&#xff1a; 2.matlab的实现 二. 整形规划 1.整型规划分类 2.基础模型 2.1 非线性约束条件的线性化 3.模型求解 一.钢管下料问题…

【模板初阶】

目录 1. 泛型编程 2. 函数模板 2.1 函数模板概念 2.2 函数模板格式 2.3 函数模板的原理 2.4 函数模板的实例化 2.4.1 隐式实例化 2.4.2 显式实例化 2.5 模板参数的匹配原则 3. 类模板 3.1 类模板的定义格式 3.2 类模板的实例化 4 总结 1. 泛型编程 如何实现一个通用的…

【Linux】简单理解静态库(.a)和动态库(.so)

在程序运行的基础原理这篇文章中&#xff0c;最后的代码进行链接过程&#xff0c;我们提到了动态库和静态库的概念。那么什么是动态库和静态库呢&#xff1f;我们来简单理解一下 静态库和动态库1.静态库1.1 静态链接优点1.2 静态链接缺点2.动态库2.1 动态链接的优点2.2 动态链接…

Ae 案例:制作漏光效果

本文介绍使用 Ae 内置效果插件制作漏光效果 Light Leak的一般方法与步骤。效果视频1、新建合成。持续时间&#xff1a;10 秒。2、新建纯色图层&#xff0c;命名为“漏光效果”&#xff0c;然后添加分形杂色 Fractal Noise效果。调整出如云朵一般柔和且层次多的分形杂色图。分形…

TCP中的状态转移(三种情况)

文章目录前言一、 TCP的生命周期二、另外两种挥手情况三、经典四问总结前言 博主个人社区&#xff1a;开发与算法学习社区 博主个人主页&#xff1a;Killing Vibe的博客 欢迎大家加入&#xff0c;一起交流学习~~ 在正常情况下&#xff0c;TCP要经过三次握手建立连接&#xff0c…

部署Web项目 (Linux)

部署Web项目 -- Linux一、Linux 环境搭建二、Linux 常用命令三、搭建 Java 部署环境3.1 JDK3.2 Tomcat3.3 MySQL四、部署 Web 项目4.1 什么是部署4.2 数据库建表4.3 构建项目并打包4.4 拷贝到 Tomcat 中4.5 验证一、Linux 环境搭建 这里我们使用的方法是购买云服务器 (CentOS …