RabbitMq应用

news2024/11/24 17:16:38

1.RabbitMQ介绍

1.1现存问题

服务调用:两个服务调用时,我们可以通过传统的HTTP方式,让服务A直接去调用服务B的接口,但是这种方式是同步的方式,虽然可以采用SpringBoot提供的@Async注解实现异步调用,但是这种方式无法确保请求一定回访问到服务B的接口。[那如何保证服务A的请求信息一定能送达到服务B去完成一些业务操作呢?如何实现异步调用

海量请求:在我们在做一些秒杀业务时,可能会在某个时间点突然出现大量的并发请求,这可能已经远远超过服务器的并发瓶颈,这时我们需要做一些削峰的操作,也就是将大量的请求缓冲到一个队列中,然后慢慢的消费掉。[如何提供一个可以存储千万级别请求的队列呢?

 在微服务架构下,可能一个业务会出现同时调用多个其他服务的场景,而且这些服务之间一般会用到Feign的方式进行轻量级的通讯,如果存在一个业务,用户创建订单成功后,还需要去给用户添加积分、通知商家、通知物流系统、扣减商品库存,而在执行这个操作时,如果任意一个服务出现了问题,都会导致整体的下单业务失败,并且会导致给用户反馈的时间延长。这时就造成了服务之间存在一个较高的耦合性的问题。[如何可以降低服务之间的耦合性呢?

 1.2 处理问题

RabbitMQ就可以解决上述的全部问题

服务之间如何想实现可靠的异步调用,可以通过RabbitMQ的方式实现,服务A只需要保证可以把消息发送到RabbitMQ的队列中,服务B就一定会消费到队列中的消息只不过会存在一定的延时。异步访问

 忽然的海量请求可以存储在RabbitMQ的队列中,然后由消费者慢慢消费掉,RabbitMQ的队列本身就可以存储上千万条消息

 在调用其他服务时,如果允许延迟效果的出现,可以将消息发送到RabbitMQ中,再由消费者慢慢消费

2. RabbitMQ介绍

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用[Erlang](https://baike.baidu.com/item/Erlang)语言编写的,而集群和故障转移是构建在[开放电信平台](https://baike.baidu.com/item/开放电信平台)框架上的。所有主要的[编程语言](https://baike.baidu.com/item/编程语言/9845131)均有与代理接口通讯的[客户端](https://baike.baidu.com/item/客户端/101081)库。

首先RabbitMQ基于AMQP协议开发,所以很多基于AMQP协议的功能RabbitMQ都是支持的,比如SpringCloud中的消息总线bus

其次RabbitMQ是基于Erlang编写,这是也是RabbitMQ天生的优势,Erlang被称为面向并发编程的语言,并发能力极强,在众多的MQ中,RabbitMQ的延迟特别低,在微秒级别,所以一般的业务处理RabbitMQ比Kafka和RocketMQ更有优势。

最后RabbitMQ提供自带了图形化界面,操作方便,还自带了多种集群模式,可以保证RabbitMQ的高可用,并且SpringBoot默认就整合RabbitMQ,使用简单方便。

3.RabbitMQ构架

RabbitMQ的架构可以查看官方地址:https://rabbitmq.com/tutorials/amqp-concepts.html

可以看出RabbitMQ中主要分为三个角色:

- Publisher:消息的发布者,将消息发布到RabbitMQ中的Exchange
- RabbitMQ服务:Exchange接收Publisher的消息,并且根据Routes策略将消息转发到Queue中
- Consumer:消息的消费者,监听Queue中的消息并进行消费

官方提供的架构图相对简洁,我们可以自己画一份相对完整一些的架构图:

可以看出Publisher和Consumer都是单独和RabbitMQ服务中某一个Virtual Host建立Connection的客户端

后续通过Connection可以构建Channel通道,用来发布、接收消息

一个Virtual Host中可以有多个Exchange和Queue,Exchange可以同时绑定多个Queue

在基于架构图查看图形化界面,会更加清晰

4.RabbitMQ通讯方式

4.1 RabbitMQ提供的通讯方式

- [Hello World!](https://rabbitmq.com/tutorials/tutorial-one-python.html):为了入门操作!
- [Work queues](https://rabbitmq.com/tutorials/tutorial-two-python.html):一个队列被多个消费者消费
- [Publish/Subscribe](https://rabbitmq.com/tutorials/tutorial-three-python.html):手动创建Exchange(FANOUT)
- [Routing](https://rabbitmq.com/tutorials/tutorial-four-python.html):手动创建Exchange(DIRECT)
- [Topics](https://rabbitmq.com/tutorials/tutorial-five-python.html):手动创建Exchange(TOPIC)
- [RPC](https://rabbitmq.com/tutorials/tutorial-six-python.html):RPC方式
- [Publisher Confirms](https://rabbitmq.com/tutorials/tutorial-seven-java.html):保证消息可靠性

 5.构建Connection工具类

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>

 5.1构建工具类:

package com.mashibing.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author zjw
 * @description
 */
public class RabbitMQConnectionUtil {

    public static final String RABBITMQ_HOST = "192.168.11.32";

    public static final int RABBITMQ_PORT = 5672;

    public static final String RABBITMQ_USERNAME = "guest";

    public static final String RABBITMQ_PASSWORD = "guest";

    public static final String RABBITMQ_VIRTUAL_HOST = "/";

    /**
     * 构建RabbitMQ的连接对象
     * @return
     */
    public static Connection getConnection() throws Exception {
        //1. 创建Connection工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2. 设置RabbitMQ的连接信息
        factory.setHost(RABBITMQ_HOST);
        factory.setPort(RABBITMQ_PORT);
        factory.setUsername(RABBITMQ_USERNAME);
        factory.setPassword(RABBITMQ_PASSWORD);
        factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);

        //3. 返回连接对象
        Connection connection = factory.newConnection();
        return connection;
    }

}

5.2采用通信方式

5.3 构建生产者:

package com.mashibing.helloworld;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

/**
 * @author zjw
 * @description
 * @date 2022/1/24 22:54
 */
public class Publisher {

    public static final String QUEUE_NAME = "hello";

    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //4. 发布消息
        String message = "Hello World!";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送成功!");
    }
}

 5.4消费者:

package com.mashibing.helloworld;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

/**
 * @author zjw
 * @description
 * @date 2022/1/24 23:02
 */
public class Consumer {

    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建队列 //要与publisher的队列保持一直,生产了什么消息就要消费相同的消息否则会报错
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:" + new String(body,"UTF-8"));
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
        System.out.println("开始监听队列");

        System.in.read();
    }
}

6. Work Queues多个consumer消费

 - 生产者:生产者和Hello World的形式是一样的,都是将消息推送到默认交换机。
- 消费者:让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息

 6.1构建两个consumer

package com.mashibing.workqueues;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

/**
 * @author zjw
 * @description
 * @date 2022/1/25 19:52
 */
public class Consumer {

    @Test
    public void consume1() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);

        //3.5 设置消息的流控消费者一次拿几个消息
        channel.basicQos(3);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号-获取到消息:" + new String(body,"UTF-8"));
				//手动设置Ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Publisher.QUEUE_NAME,false,callback);//false表示手动设置Ack,true表示自动设置
        System.out.println("开始监听队列");

        System.in.read();
    }

    @Test
    public void consume2() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建队列
        channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);

        channel.basicQos(3);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号-获取到消息:" + new String(body,"UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
、、
        channel.basicConsume(Publisher.QUEUE_NAME,false,callback);
        System.out.println("开始监听队列");

        System.in.read();
    }
}

6.2构建消息发送者 

package com.msb.util.mes;

import com.msb.util.RabbitMQConnectionutil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

import java.nio.charset.StandardCharsets;

public class Publisher {
    /***
     Connection connection = RabbitMQConnectionutil.getConnection();
     Channel channel = connection.createChannel();
     channel.queueDeclare(HELLO,false,false,false,null);
     String message = "HelloWord";
     channel.basicPublish("",HELLO,null,message.getBytes());
     System.out.println("消息发送成功");
     System.in.read();
     */
    public static final String WORK="work";

    @Test
    public void publish() throws Exception{
        Connection connection = RabbitMQConnectionutil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(WORK,false,false,false,null);
        for (int i = 0; i < 10; i++) {
            String ms1 = "Hello World"+i;
            channel.basicPublish("",WORK,null,ms1.getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("消息发送成功");
        System.in.read();
    }
}

7. Publish/Subscribe

 生产者:自行构建Exchange并绑定指定队列[(FANOUT类型)]

package com.mashibing.pubsub;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

/**
 * @author zjw
 * @description
 * @date 2022/1/25 20:08
 */
public class Publisher {

    public static final String EXCHANGE_NAME = "pubsub";
    public static final String QUEUE_NAME1 = "pubsub-one";
    public static final String QUEUE_NAME2 = "pubsub-two";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);

        //5. 绑定交换机和队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");

        //6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"45jk6h645jk",null,"publish/subscribe!".getBytes());
        System.out.println("消息成功发送!");
    }
}

8. Routing

 生产者:在绑定Exchange和Queue时,需要指定好routingKey,同时在发送消息时,也指定routingKey,只有routingKey一致时,才会把指定的消息路由到指定的Queue

package com.mashibing.routing;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

/**
 * @author zjw
 * @description
 * @date 2022/1/25 20:20
 */
public class Publisher {

    public static final String EXCHANGE_NAME = "routing";
    public static final String QUEUE_NAME1 = "routing-one";
    public static final String QUEUE_NAME2 = "routing-two";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);

        //5. 绑定交换机和队列
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");

        //6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"大橙子!".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"黑布林大狸子".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔!".getBytes());
        System.out.println("消息成功发送!");


    }

}

 生产者:TOPIC类型可以编写带有特殊意义的routingKey的绑定方式

package com.mashibing.topics;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

/**
 * @author zjw
 * @description
 * @date 2022/1/25 20:28
 */
public class Publisher {

    public static final String EXCHANGE_NAME = "topic";
    public static final String QUEUE_NAME1 = "topic-one";
    public static final String QUEUE_NAME2 = "topic-two";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);

        //5. 绑定交换机和队列,
        // TOPIC类型的交换机在和队列绑定时,需要以aaa.bbb.ccc..方式编写routingkey
        // 其中有两个特殊字符:*(相当于占位符),#(相当通配符)
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");

        //6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙兔子!".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog.dog.dog.dog",null,"懒狗狗狗狗狗狗".getBytes());
        System.out.println("消息成功发送!");

    }
}

9.SpringBoot操作RabbitMQ

9.1 SpringBoot声明信息

- 创建项目
- 导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  rabbitmq:
    host: 192.168.11.32
    port: 5672
    username: guest
    password: guest
    virtual-host: /

9.2声明交换机&队列

package com.mashibing.rabbitmqboot.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author zjw
 * @description
 * @date 2022/2/8 20:25
 */
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE = "boot-exchange";
    public static final String QUEUE = "boot-queue";
    public static final String ROUTING_KEY = "*.black.*";


    @Bean
    public Exchange bootExchange(){
        // channel.DeclareExchange
        return ExchangeBuilder.topicExchange(EXCHANGE).build();
    }

    @Bean
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE).build();
    }

    @Bean
    public Binding bootBinding(Exchange bootExchange,Queue bootQueue){
        return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
    }
}

9.3生产者操作 

package com.mashibing.rabbitmqboot;

import com.mashibing.rabbitmqboot.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @author zjw
 * @description
 * @date 2022/2/8 21:05
 */
@SpringBootTest
public class PublisherTest {

    @Autowired
    public RabbitTemplate rabbitTemplate;

    @Test
    public void publish(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
        System.out.println("消息发送成功");
    }


    @Test
    public void publishWithProps(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "messageWithProps", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setCorrelationId("123");
                return message;
            }
        });
        System.out.println("消息发送成功");
    }
}

9.4 消费者操作

package com.mashibing.rabbitmqboot;

import com.mashibing.rabbitmqboot.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author zjw
 * @description
 * @date 2022/2/8 21:11
 */
@Component
public class ConsumeListener {

    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public void consume(String msg, Channel channel, Message message) throws IOException {
        System.out.println("队列的消息为:" + msg);
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("唯一标识为:" + correlationId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

10. RabbitMQ保证消息可靠性(原生api)

 产生消息丢失的地方:

1.生产者发消息的时候网络不稳定

2.Exchange不会持久化消息,断电后就会丢失

3.Queue默认是不会持久化消息(只保证队列还在)

4.消费者没有消费Ack模式需要开启

10.1 保证消息一定送达到Exchange

Confirm机制

可以通过Confirm效果保证消息一定送达到Exchange,官方提供了三种方式,选择了对于效率影响最低的异步回调的效果

//4. 开启confirms
channel.confirmSelect();

//5. 设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息成功的发送到Exchange!");
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作!");
    }
});

10.2保证消息可以路由到Queue

Return机制

为了保证Exchange上的消息一定可以送达到Queue

//6. 设置Return回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消息没有路由到指定队列,做其他的补偿措施!!");
    }
});
//7. 在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调

 true只能保证重启后的队列还在但是消息不会存在; 解决方法如下:

10.3保证Queue可以持久化消息

DeliveryMode设置消息持久化

DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化。

//7. 设置消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties()
    .builder()
    .deliveryMode(2)
    .build();

//7. 发布消息
channel.basicPublish("","confirms",true,props,message.getBytes());

10.4 保证消费者可以正常消费消息

`详情看WorkQueue模式`

11.pringBoot( RabbitMQ保证消息可靠性)

11.1 Confirm

- 编写配置文件开启Confirm机制

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 新版本
    publisher-confirms: true  # 老版本 

在发送消息时,配置RabbitTemplate

@Test
public void publishWithConfirms() throws IOException {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                System.out.println("消息已经送达到交换机!!");
            }else{
                System.out.println("消息没有送达到Exchange,需要做一些补偿操作!!retry!!!");
            }
        }
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
    System.out.println("消息发送成功");

    System.in.read();
}

11.2Return

- 编写配置文件开启Return机制

spring:
  rabbitmq:
    publisher-returns: true # 开启Return机制

 在发送消息时,配置RabbitTemplate

@Test
public void publishWithReturn() throws IOException {
    // 新版本用 setReturnsCallback ,老版本用setReturnCallback
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            String msg = new String(returned.getMessage().getBody());
            System.out.println("消息:" + msg + "路由队列失败!!做补救操作!!");
        }
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
    System.out.println("消息发送成功");

    System.in.read();
}

11.3消息持久化 

@Test
public void publishWithBasicProperties() throws IOException {
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 设置消息的持久化!
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }
    });
    System.out.println("消息发送成功");
}

 12.RabbitMQ死信队列&延迟交换机

12.1什么是死信

死信队列的应用:

- 基于死信队列在队列消息已满的情况下,消息也不会丢失
- 实现延迟消费的效果。比如:下订单时,有15分钟的付款时间

12.2实现死信队列

12.3 准备Exchange&Queue

package com.mashibing.rabbitmqboot.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author zjw
 * @description
 * @date 2022/2/10 15:04
 */
@Configuration
public class DeadLetterConfig {

    public static final String NORMAL_EXCHANGE = "normal-exchange";
    public static final String NORMAL_QUEUE = "normal-queue";
    public static final String NORMAL_ROUTING_KEY = "normal.#";

    public static final String DEAD_EXCHANGE = "dead-exchange";
    public static final String DEAD_QUEUE = "dead-queue";
    public static final String DEAD_ROUTING_KEY = "dead.#";


    @Bean
    public Exchange normalExchange(){
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
    }

    @Bean
    public Queue normalQueue(){
        return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").build();
    }

    @Bean
    public Binding normalBinding(Queue normalQueue,Exchange normalExchange){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }


    @Bean
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
    }

    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    @Bean
    public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }

}

12.4实现效果

- 基于消费者进行reject或者nack实现死信效果

package com.mashibing.rabbitmqboot;

import com.mashibing.rabbitmqboot.config.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author zjw
 * @description
 * @date 2022/2/10 15:17
 */
@Component
public class DeadListener {

    @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
    public void consume(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到normal队列的消息:" + msg);
        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }
}
@Test
public void publishExpire(){
    String msg = "dead letter expire";
    rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setExpiration("5000");
            return message;
        }
    });
}

 给队列设置消息的生存时间

@Bean
public Queue normalQueue(){
    return QueueBuilder.durable(NORMAL_QUEUE)
            .deadLetterExchange(DEAD_EXCHANGE)
            .deadLetterRoutingKey("dead.abc")
            .ttl(10000)
            .build();
}

 设置Queue中的消息最大长度

@Bean
public Queue normalQueue(){
    return QueueBuilder.durable(NORMAL_QUEUE)
            .deadLetterExchange(DEAD_EXCHANGE)
            .deadLetterRoutingKey("dead.abc")
            .maxLength(1)
            .build();
}

 只要Queue中已经有一个消息,如果再次发送一个消息,这个消息会变为死信!

13.延迟交换机

13.1设置死信的问题:

设置死信的生存时间时mq只会按顺序执行,mq只会监听最外侧的消息

如A消息设置了10s的生存时间,B消息设置了5s的生存时间,A先到达队列里面,那么MQ会先开始监听A,A的时间过去了,才会监听B,这时B的生存时间已经过去了,这就会导致B的消息也存活了10s

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9

死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。

安装延时交换机

1.docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq_docker_rabbitmq_1:/opt/rabbitmq/plugins

2.rabbitmq-plugins enable rabbitmq_delayed_message_exchang

3.docker restart rabbitmq_docker_rabbitmq_1

13.2构建配置

package com.mashibing.rabbitmqboot.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author zjw
 * @description
 */
@Configuration
public class DelayedConfig {

    public static final String DELAYED_EXCHANGE = "delayed-exchange";
    public static final String DELAYED_QUEUE = "delayed-queue";
    public static final String DELAYED_ROUTING_KEY = "delayed.#";

    @Bean
    public Exchange delayedExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","topic");
        Exchange exchange = new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);
        return exchange;
    }

    @Bean
    public Queue delayedQueue(){
        return QueueBuilder.durable(DELAYED_QUEUE).build();
    }

    @Bean
    public Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

13.3构建消息发送者 

import com.mashibing.rabbitmqboot.config.DelayedConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @author zjw
 * @description
 */
@SpringBootTest
public class DelayedPublisherTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void publish(){
        rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(30000);
                return message;
            }
        });
    }
}

 14.RabbitMQ的集群

RabbitMQ的镜像模式

14.1搭建RabbitMQ集群

- 准备两台虚拟机(克隆)
- 准备RabbitMQ的yml文件

  rabbitmq1:

version: '3.1'
services:
  rabbitmq1:
    image: rabbitmq:3.8.5-management-alpine
    container_name: rabbitmq1
    hostname: rabbitmq1
    extra_hosts:
      - "rabbitmq1:192.168.11.32"
      - "rabbitmq2:192.168.11.33"
    environment: 
      - RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS
    ports:
      - 5672:5672
      - 15672:15672
      - 4369:4369
      - 25672:25672

rabbitmq2:

version: '3.1'
services:
  rabbitmq2:
    image: rabbitmq:3.8.5-management-alpine
    container_name: rabbitmq2
    hostname: rabbitmq2
    extra_hosts:
      - "rabbitmq1:192.168.11.32"
      - "rabbitmq2:192.168.11.33"
    environment: 
      - RABBITMQ_ERLANG_COOKIE=SDJHFGDFFS
    ports:
      - 5672:5672
      - 15672:15672
      - 4369:4369
      - 25672:25672

准备完毕之后,启动两台RabbitMQ 

让RabbitMQ服务实现join操作

需要四个命令完成join操作

让rabbitmq2   join  rabbitmq1,需要进入到rabbitmq2的容器内部,去执行下述命令

rabbitmqctl stop_app
rabbitmqctl reset 
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app

 执行成功后:

设置镜像模式

在指定的RabbitMQ服务中设置好镜像策略即可

 

15.Headers类型Exchange

headers就是一个基于key-value的方式,让Exchange和Queue绑定的到一起的一种规则

相比Topic形式,可以采用的类型更丰富。

15.1headers绑定方式

注:x-match为all时代表全匹配 只有name与age相同时才能到Queue1中

        x-match为any时代表满足一个条件就可以 进入到Queue2中

15.2构建config

package com.mashibing.headers;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

/**
 * @author zjw
 * @description
 */
public class Publisher {

    public static final String HEADER_EXCHANGE = "header_exchange";
    public static final String HEADER_QUEUE = "header_queue";


    @Test
    public void publish()throws  Exception{
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();


        //3. 构建交换机和队列并基于header的方式绑定
        channel.exchangeDeclare(HEADER_EXCHANGE, BuiltinExchangeType.HEADERS);
        channel.queueDeclare(HEADER_QUEUE,true,false,false,null);
        Map<String,Object> args = new HashMap<>();
        // 多个header的key-value只要可以匹配上一个就可以
        // args.put("x-match","any");
        // 多个header的key-value要求全部匹配上!
        args.put("x-match","all");
        args.put("name","jack");
        args.put("age","23");
        channel.queueBind(HEADER_QUEUE,HEADER_EXCHANGE,"",args);

        //4. 发送消息
        String msg = "header测试消息!";
        Map<String, Object> headers = new HashMap<>();
        headers.put("name","jac");
        headers.put("age","2");
        AMQP.BasicProperties props = new AMQP.BasicProperties()
                .builder()
                .headers(headers)
                .build();

        channel.basicPublish(HEADER_EXCHANGE,"",props,msg.getBytes());

        System.out.println("发送消息成功,header = " + headers);

    }
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/86882.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring之AOP

谈起AOP就不得不说起代理&#xff0c;Java 源代码经过编译生成字节码&#xff0c;然后再由 JVM 经过类加载&#xff0c;连接&#xff0c;初始化成 Java 类型&#xff0c;可以看到字节码是关键&#xff0c;静态和动态的区别就在于字节码生成的时机 静态代理&#xff1a;由程序员…

BLE MESH中的Secure Network beacon包

作用&#xff1a;节点使用安全网络信标来识别子网及其安全状态。可以用来更新Key和Iv Index。 数据包结构&#xff1a; 数据包格式&#xff1a; 大小含义 Beacon Type 1安全网络信标&#xff08;0x01&#xff09; Flags1包含密钥刷新标志和IV更新标志 Network ID8包含网络ID的值…

代码随想录算法训练营第四天 | 24. 两两交换链表中的节点 19.删除链表的倒数第N个节点 面试题 02.07. 链表相交 142.环形链表II

今天是链表章节最后一天&#xff0c;加油&#x1f4aa; 24. 两两交换链表中的节点 题目&#xff1a;给你一个链表&#xff0c;两两交换其中相邻的节点&#xff0c;并返回交换后链表的头节点。你必须在不修改节点内部的值的情况下完成本题&#xff08;即&#xff0c;只能进行节…

html练习11:案例仿制

1.目标效果 2.布局效果 3.顶端部分制作效果 问题&#xff1a;img和p无法同时垂直居中显示&#xff0c;img会顶端对齐&#xff0c;p会底部对齐 解决方法&#xff1a;把img作为背景加入&#xff1b;用两个div分别做img和p的容器再进行格式调整 4.导航栏部分制作效果 要点&#…

保证项目如期上线,测试人能做些什么?

要保证项目按照正常进度发布&#xff0c;需要整个研发团队齐心协力。 有很多原因都可能会造成项目延期。 1、产品经理频繁修改需求 2、开发团队存在技术难题 3、测试团队测不完 今天我想跟大家聊一下&#xff0c;测试团队如何保证项目按期上线&#xff0c;以及在这个过程中可能…

词法分析程序

一、实验原理 1.1实验内容 通过本实验&#xff0c;应达到以下目标&#xff1a; 1.掌握从源程序文件中读取有效字符的方法和产生源程序的内部表示文件的方法。 2.掌握词法分析的实现方法。 3.上机调试编出的词法分析程序。 1.2实验内容 词法分析是作为相对独立的阶段来完成的…

C# 事件

一 C#中的事件 大致上&#xff1a;事件-----回调函数&#xff1b; 二 用户界面中的事件 ① 按钮点击事件 ② 基本的写法 this.button1.Clicknew System.EventHandler(this.button1_Click); private void button1_Click(object sender,EventHandler e) {this.label1.TextDat…

C++智能指针weak_ptr

C智能指针weak_ptr 学习路线&#xff1a;C智能指针shared_ptr->C智能指针unique_ptr->C智能指针weak_ptr 简介&#xff1a;本文讲解常用的智能指针的用法和原理&#xff0c;包括shared_ptr,unique_ptr,weak_ptr。 概述 weak_ptr设计的目的是为配合 shared_ptr 而引入…

静电场方程与边界面上的衔接条件 工程电磁学 P6

我们现在已经知道两个公式 我们可以得到微分形式 对于体密度&#xff0c;面密度&#xff0c;线密度&#xff0c;点电荷的理解 很多同学会问空间中为什么要有面密度&#xff0c;线密度的存在呢&#xff1f; 其实这个是模型的需要&#xff0c;因为介质不一定是连续的&#xff0…

如何设计一个高性能的图 Schema

本文整理自青藤云安全工程师——文洲在青藤云技术团队内部分享&#xff0c;分享视频参考&#xff1a;https://www.bilibili.com/video/BV1r64y1R72i 图数据库的性能和 schema 的设计息息相关&#xff0c;但是 NebulaGraph 官方本身对图 schema 的设计其实没有一个定论&#xff…

Codeforces Round #837 (Div. 2) C. Hossam and Trainees

Problem - C - Codeforces 翻译&#xff1a; 胡萨姆有&#x1d45b;名学员。他给&#x1d456;-th的学员分配了一个号码&#x1d44e;&#x1d456;。 一双&#x1d456;-th和&#x1d457;-th(&#x1d456;≠&#x1d457;)学员被称为成功的如果有一个整数&#x1d465;(&…

基于springboot的企业员工工资管理系统(财务系统)

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。这里根据疫情当下&#xff0c;你想解决的问…

Vue渲染器(二):挂载与更新

渲染器&#xff08;二&#xff09;&#xff1a;挂载与更新 前面介绍了渲染器的基本概念和整体框架&#xff0c;接下来就可以介绍渲染器的核心功能&#xff1a;挂载与更新。 1.挂载子节点和元素的属性&#xff1a; vnode.children的值为字符串类型时&#xff0c;会把它设置为…

019 | 在线电影娱乐网站系统设计含论文 | 大学生毕业设计 | 极致技术工厂

作为一个在线电影娱乐网站系统&#xff0c;它展示给浏览者的是各种电影信息&#xff0c;把这些信息能够按用户的需要友好的展示出来是很重要的&#xff0c;同时&#xff0c;能够实现对这些信息的有条不紊的管理也是不可以忽视的。对浏览者和会员的功能而言叫做前台实现&#xf…

[附源码]Node.js计算机毕业设计电子购物商城Express

项目运行 环境配置&#xff1a; Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境&#xff1a;最好是Nodejs最新版&#xff0c;我…

[附源码]计算机毕业设计电商小程序Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis MavenVue等等组成&#xff0c;B/S模式…

【GRU时序预测】基于卷积神经网络结合门控循环单元CNN-GRU实现时间序列预测附matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

代码随想录训练营第49天|LeetCode 121. 买卖股票的最佳时机、122.买卖股票的最佳时机II

参考 代码随想录 题目一&#xff1a;LeetCode 121. 买卖股票的最佳时机 注意这个题只买卖一次&#xff01;&#xff01; 贪心 class Solution { public:int maxProfit(vector<int>& prices) {int low INT_MAX;int result 0;for(int i 0; i < prices.size(…

Redis框架(十):大众点评项目 订单功能 Redis实现全局唯一ID、 秒杀基本环境

大众点评项目 订单功能 秒杀基本环境需求&#xff1a;订单功能 秒杀基本环境Redis实现全局唯一ID业务实现代码总览总结SpringCloud章节复习已经过去&#xff0c;新的章节Redis开始了&#xff0c;这个章节中将会回顾Redis实战项目 大众点评 主要依照以下几个原则 基础实战的Dem…

揭秘!全球2022年Salesforce不同招聘职位的平均薪资

Salesforce可以说是发展最快的企业软件公司。此外&#xff0c;还一直被评选为全球最佳工作场所之一&#xff0c;2021年赢得了Glassdoor评选的最佳工作场所&#xff0c;并且在《财富》杂志的100家最佳工作公司中排名第四。除了非常重视员工福利&#xff0c;强调工作与生活的平衡…