RabbitMQ高级特性

news2024/12/23 2:00:37

RabbitMQ高级特性

消息可靠性投递
Consumer ACK
消费端限流
TTL
死信队列
延迟队列
日志与监控
消息可靠性分析与追踪
管理

消息可靠性投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
confirm 确认模式
return 退回模式

rabbitmq 整个消息投递的路径为:
producer—>rabbitmq broker—>exchange—>queue—>consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递

  1. 设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。
  2. 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
  3. 设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。
  4. 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
  • 在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。使用channel下列方法,完成事务控制:

     	txSelect(), 用于将当前channel设置成transaction模式
     	txCommit(),用于提交事务
     	txRollback(),用于回滚事务
    

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itheima</groupId>
    <artifactId>rabbitmq-producer-spring</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"
                               publisher-returns="true"
    />
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>



    <!--消息可靠性投递(生产端)-->
    <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
    <rabbit:direct-exchange name="test_exchange_confirm">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>
package com.itheima;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class
ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     * 确认模式:
     * 步骤:
     * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
     * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
     */
    @Test
    public void testConfirm() throws InterruptedException {

        //2. 定义回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData 相关配置信息
             * @param ack             exchange交换机 是否成功收到了消息。true 成功,false代表失败
             * @param cause           失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm方法被执行了....");

                if (ack) {
                    //接收成功
                    System.out.println("接收成功消息" + cause);
                } else {
                    //接收失败
                    System.out.println("接收失败消息" + cause);
                    //做一些处理,让消息再次发送。
                }
            }
        });

        //3. 发送消息
        rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
        Thread.sleep(200);
    }

    /**
     * 回退模式:当消息发送给Exchange后,Exchange路由到Queue是啊比是 才会执行ReturnCallBack
     * 步骤:
     *      开启回退模式
     *      设置ReturnCallBack  publisher-returns="true"
     *      设置Exchange处理消息的模式
     *          1 消息没有路由到Queue,则丢弃消息 默认
     *          2 消息没有路由到Queue,返回消息发送方ReturnCallBack
     */
    @Test
    public void testReturn() throws InterruptedException {

        /// 设置交换机处理失败的模式 失败执行回调
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message 发送消息对象
             * @param replyCode 失败的错误码
             * @param replyText 错误信息
             * @param exchange 交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 执行了");
                /**
                 * (Body:'message confirm....' MessageProperties [headers={}, contentType=text/plain,
                 * contentEncoding=UTF-8, contentLength=0,
                 * receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
                 * 312
                 * NO_ROUTE
                 * test_exchange_confirm
                 * confirm111
                 */
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
                //处理
            }
        });

        //3. 发送消息
        rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm111", "message confirm....");
        Thread.sleep(200);
    }

}

Consumer Ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
自动确认:acknowledge=“none”
手动确认:acknowledge=“manual”
根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

  1. 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
  2. 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
  3. 如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * Consumer ACK 机制
 *  1 默认设置手动签收 acknowledge="manual"
 *  2 让监听器类ChannelAwareMessageListener接口 不要实现这个MessageListener
 *  3 如果消息成功处理 则调用channeld的basicAck签收
 *  4 如果消息处理失败,则调用channel的basicNack拒绝签收,broker重新发送consumer
 */

@Component
public class AckListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try{
            System.out.println(new String(message.getBody()));
            // 处理业务
            System.out.println("处理业务逻辑");
            //int i =3/0; //出错

            // 手动签收
            channel.basicAck(deliveryTag, true);
        }catch (Exception e) {
            /**
             * 拒绝签收 第三个参数 requeue重回队列 如果设置为true 则消息重回到queue broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);
            //channel.basicReject(deliveryTag,true);
        }
    }
}
package com.itheima.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    @Test
    public void test(){
        while (true) {

        }
    }
}
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                           port="${rabbitmq.port}"
                           username="${rabbitmq.username}"
                           password="${rabbitmq.password}"
                           virtual-host="${rabbitmq.virtual-host}"/>

<!--包扫描-->   <!--定义监听器-->
<context:component-scan base-package="com.itheima.listener" />
<!--定义监听器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
    <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
</rabbit:listener-container>

持久化
exchange要持久化
queue要持久化
message要持久化
生产方确认Confirm
消费方确认Ack
Broker高可用

1.3 消费端限流

在这里插入图片描述

package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 限流机制
 * 1 确保ack机制为手动确认
 * 2 listener-container配置熟悉
 * perfetch=1 表示消费端每次从mq拉取一条消息来消费,直到手动确认消费完毕后,才会继续拉取下一条消息
 */

@Component
public class QosListener implements ChannelAwareMessageListener {


    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(1000);
        /// 获取消息
        System.out.println(new String(message.getBody()));
        // 处理逻辑

        //签收 肯应应答,处理完消息之后提醒RabbitMQ可以删除当前队列,deliveryTag:当前队列中选中的消息;multiple:是否批量应答
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
    <!--定义监听器-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--        <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>-->
        <rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>
    </rabbit:listener-container>

测试

    @Test
    public void test(){
        while (true) {

        }
    }
}

生产者发送消息

   @Test
    public void testSend() throws InterruptedException {

        for (int i = 0; i <10 ; i++) {
            //3. 发送消息
            rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
            //Thread.sleep(200);
        }

    }

在rabbit:listener-container 中配置 prefetch属性设置消费端一次拉取多少消息

消费端的确认模式一定为手动确认。acknowledge=“manual”

1.4 TTL

TTL 全称 Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
在这里插入图片描述

/**
 *
 * ttl:过期时间
 *  1 队列统一过期
 *  2 消息单独过期
 *
 *  如果设置消息的过期时间,也设置了队列的过期时间,它以时间短的为准
 *  队列过期后,会将队列所有消息全部移除
 *  消息过期后,只有消息在队列顶端,才会去判断其是否过期(移除掉)
 */
@Test
public void testTLL() throws InterruptedException {

    队列统一过期
    //for (int i = 0; i <10 ; i++) {
    //    //3. 发送消息
    //    rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message tll....");
    //    //Thread.sleep(200);
    //}


    ///消息后处理对象,设置一些消息的参数信息
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //1 设置message的信息
            message.getMessageProperties().setExpiration("5000");//消息的过期时间
            //2 返回该信息
            return message;
        }
    };
    ///消息单独过期
    //rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message tll....", messagePostProcessor);
    for (int i = 0; i < 10; i++) {
        if(i==5){
            rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message tll....", messagePostProcessor);
        }else{
            //不过期消息
            rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message tll....");
        }

    }

}
  <!--ttl-->
    <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
        <!--设置queue的参数-->
        <rabbit:queue-arguments>
            <!--x-message-ttl指队列的过期时间-->
            <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"></entry>
        </rabbit:queue-arguments>
    </rabbit:queue>

        <!--声明交换机 绑定-->
    <rabbit:topic-exchange name="test_exchange_ttl">
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

如果两者都进行了设置,以时间短的为准。

1.5 死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
在这里插入图片描述

消息成为死信的三种情况:

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机:

给队列设置参数: x-dead-letter-exchange 设置交换机的名称 和 x-dead-letter-routing-key

在这里插入图片描述

 /**
     * 发送测试死信消息:
     *  1 过期时间
     *  2 长度限制
     *  3 消息拒收
     */
    @Test
    public void testDlx(){
        // 测试过期时间,死信消息
        //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe","我是一条消息,我会死吗?");

        //测试长度限制后,消息死信
        //for (int i = 0; i < 20; i++) {
        //    rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe","我是一条消息,我会死吗?");
        //}

        //3 测试消息拒收
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe","我是一条消息,我会死吗?");
    }
   <!--1 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)-->
    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
        <!--3 正常队列绑定死信交换机-->
        <rabbit:queue-arguments>
            <!--x-dead-letter-exchange:死信交换机名称-->
            <entry key="x-dead-letter-exchange" value="exchange_dlx"/>

            <!--x-dead-letter-routing-key:发送给死信交换机的rooutingkey-->
            <entry key="x-dead-letter-routing-key" value="dlx.hehe"/>

            <!--设置队列的过期时间 ttl-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
            <!--设置队列的长度现在 max-length-->
            <entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--2 声明死信队列queue_dlx和死信交换机exchange_dlx-->
    <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

消费者:

package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;



@Component
public class DlxListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try{
            System.out.println(new String(message.getBody()));
            // 处理业务
            System.out.println("处理业务逻辑");
            int i =3/0; //出错

            // 手动签收
            channel.basicAck(deliveryTag, true);
        }catch (Exception e) {
            /**
             * 拒绝签收 第三个参数 requeue重回队列 如果设置为true 则消息重回到queue broker会重新发送该消息给消费端
             */
            System.out.println("出现异常拒绝接受");
            //拒绝签收,不重回队列 requeue = false
            channel.basicNack(deliveryTag,true,false);
            //channel.basicReject(deliveryTag,true);
        }
    }
}
    <!--定义监听器-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
        <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>
    </rabbit:listener-container>

测试

    @Test
    public void test(){
        while (true) {

        }
    }
}
  1. 死信交换机和死信队列和普通的没有区别

  2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

  3. 消息成为死信的三种情况:

  4. 队列消息长度到达限制;

  5. 消费者拒接消费消息,并且不重回队列;

  6. 原队列存在消息过期设置,消息到达超时时间未被消费;

1.6 延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。

  2. 新用户注册成功7天后,发送短信问候。

实现方式:
3. 定时器

  1. 延迟队列

在这里插入图片描述
很可惜,在RabbitMQ中并未提供延迟队列功能。

但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
在这里插入图片描述

 <!--延迟队列:
            1 定义正常交换机(order_exchange)和队列(order_queue)
            2 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
            3 绑定,设置正常队列过期时间为30分钟
    -->
    <!--1 定义正常交换机(order_exchange)和队列(order_queue)-->
    <rabbit:queue id="order_queue" name="order_queue">
    <!--3 绑定,设置正常队列过期时间为30分钟-->
        <rabbit:queue-arguments>
            <!--x-dead-letter-exchange:死信交换机名称-->
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx"/>
            <!--x-dead-letter-routing-key:发送给死信交换机的rooutingkey-->
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel"/>
            <!--设置队列的过期时间 ttl-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- 2 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
    <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
 /**
     * 延迟队列
     */
    @Test
    public void testDelay() throws InterruptedException {
        // 发送订单消息 将来是在订单系统中下单后,发送消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2023年3月9日03:27:28");

        //打印倒计时10秒
        for (int i = 10; i >0 ; i--) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }
    }

消费者:

package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


@Component
public class OrderListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try{
            System.out.println(new String(message.getBody()));
            // 处理业务
            System.out.println("处理业务逻辑");
            System.out.println("根据订单id查询其状态");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存...");

            // 手动签收
            channel.basicAck(deliveryTag, true);
        }catch (Exception e) {
            /**
             * 拒绝签收 第三个参数 requeue重回队列 如果设置为true 则消息重回到queue broker会重新发送该消息给消费端
             */
            System.out.println("出现异常拒绝接受");
            //拒绝签收,不重回队列 requeue = false
            channel.basicNack(deliveryTag,true,false);
            //channel.basicReject(deliveryTag,true);
        }
    }
}

测试

    @Test
    public void test(){
        while (true) {

        }
    }
}
  1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。

  2. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。

1.7 日志与监控

1.7.1 RabbitMQ日志

RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log

日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。

1.7.3 rabbitmqctl管理和监控

查看队列

rabbitmqctl list_queues

查看exchanges

rabbitmqctl list_exchanges

查看用户

rabbitmqctl list_users

查看连接

rabbitmqctl list_connections

查看消费者信息

rabbitmqctl list_consumers

查看环境变量

rabbitmqctl environment

查看未被确认的队列

rabbitmqctl list_queues name messages_unacknowledged

查看单个队列的内存使用

rabbitmqctl list_queues name memory

查看准备就绪的队列

rabbitmqctl list_queues name messages_ready

1.8 消息追踪

在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。

在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。

1.8 消息追踪-Firehose

firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。

注意:打开 trace 会影响消息写入功能,适当打开后请关闭。
rabbitmqctl trace_on:开启Firehose命令

rabbitmqctl trace_off:关闭Firehose命令

1.8 消息追踪-rabbitmq_tracing

rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。

启用插件:rabbitmq-plugins enable rabbitmq_tracing

RabbitMQ应用问题

  1. 消息可靠性保障
    消息补偿机制
  2. 消息幂等性保障
    乐观锁解决方案
    在这里插入图片描述

2.2 消息幂等性保障

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

在这里插入图片描述

3.RabbitMQ集群搭建

摘要:实际生产应用中都会采用消息队列的集群方案,如果选择RabbitMQ那么有必要了解下它的集群方案原理

一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了,但是出于MQ中间件本身的可靠性、并发性、吞吐量和消息堆积能力等问题的考虑,在生产环境上一般都会考虑使用RabbitMQ的集群方案。

3.1 集群方案的原理

RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。
在这里插入图片描述

3.2 单机多实例部署

由于某些因素的限制,有时候你不得不在一台机器上去搭建一个rabbitmq集群,这个有点类似zookeeper的单机版。真实生成环境还是要配成多机集群的。有关怎么配置多机集群的可以参考其他的资料,这里主要论述如何在单机中配置多个rabbitmq实例。

主要参考官方文档:https://www.rabbitmq.com/clustering.html

首先确保RabbitMQ运行没有问题
``shell
[root@super ~]# rabbitmqctl status
Status of node rabbit@super …
[{pid,10232},
{running_applications,
[{rabbitmq_management,“RabbitMQ Management Console”,“3.6.5”},
{rabbitmq_web_dispatch,“RabbitMQ Web Dispatcher”,“3.6.5”},
{webmachine,“webmachine”,“1.10.3”},
{mochiweb,“MochiMedia Web Server”,“2.13.1”},
{rabbitmq_management_agent,“RabbitMQ Management Agent”,“3.6.5”},
{rabbit,“RabbitMQ”,“3.6.5”},
{os_mon,“CPO CXC 138 46”,“2.4”},
{syntax_tools,“Syntax tools”,“1.7”},
{inets,“INETS CXC 138 49”,“6.2”},
{amqp_client,“RabbitMQ AMQP Client”,“3.6.5”},
{rabbit_common,[],“3.6.5”},
{ssl,“Erlang/OTP SSL application”,“7.3”},
{public_key,“Public key infrastructure”,“1.1.1”},
{asn1,“The Erlang ASN1 compiler version 4.0.2”,“4.0.2”},
{ranch,“Socket acceptor pool for TCP protocols.”,“1.2.1”},
{mnesia,“MNESIA CXC 138 12”,“4.13.3”},
{compiler,“ERTS CXC 138 10”,“6.0.3”},
{crypto,“CRYPTO”,“3.6.3”},
{xmerl,“XML parser”,“1.3.10”},
{sasl,“SASL CXC 138 11”,“2.7”},
{stdlib,“ERTS CXC 138 10”,“2.8”},
{kernel,“ERTS CXC 138 10”,“4.2”}]},
{os,{unix,linux}},
{erlang_version,
“Erlang/OTP 18 [erts-7.3] [source] [64-bit] [async-threads:64] [hipe] [kernel-poll:true]\n”},
{memory,
[{total,56066752},
{connection_readers,0},
{connection_writers,0},
{connection_channels,0},
{connection_other,2680},
{queue_procs,268248},
{queue_slave_procs,0},
{plugins,1131936},
{other_proc,18144280},
{mnesia,125304},
{mgmt_db,921312},
{msg_index,69440},
{other_ets,1413664},
{binary,755736},
{code,27824046},
{atom,1000601},
{other_system,4409505}]},
{alarms,[]},
{listeners,[{clustering,25672,“::”},{amqp,5672,“::”}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,411294105},
{disk_free_limit,50000000},
{disk_free,13270233088},
{file_descriptors,
[{total_limit,924},{total_used,6},{sockets_limit,829},{sockets_used,0}]},
{processes,[{limit,1048576},{used,262}]},
{run_queue,0},
{uptime,43651},
{kernel,{net_ticktime,60}}]


停止rabbitmq服务

```shell
[root@super sbin]# service rabbitmq-server stop
Stopping rabbitmq-server: rabbitmq-server.

启动第一个节点:

[root@super sbin]# RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start

              RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /var/log/rabbitmq/rabbit1.log
  ######  ##        /var/log/rabbitmq/rabbit1-sasl.log
  ##########
              Starting broker...
 completed with 6 plugins.

启动第二个节点:

web管理插件端口占用,所以还要指定其web插件占用的端口号。

[root@super ~]# RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start

              RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /var/log/rabbitmq/rabbit2.log
  ######  ##        /var/log/rabbitmq/rabbit2-sasl.log
  ##########
              Starting broker...
 completed with 6 plugins.

结束命令:

rabbitmqctl -n rabbit1 stop
rabbitmqctl -n rabbit2 stop

rabbit1操作作为主节点:

```shell
[root@super ~]# rabbitmqctl -n rabbit1 stop_app  
Stopping node rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit1 reset	 
Resetting node rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit1 start_app
Starting node rabbit1@super ...
[root@super ~]# 

rabbit2操作为从节点:

[root@super ~]# rabbitmqctl -n rabbit2 stop_app
Stopping node rabbit2@super ...
[root@super ~]# rabbitmqctl -n rabbit2 reset
Resetting node rabbit2@super ...
[root@super ~]# rabbitmqctl -n rabbit2 join_cluster rabbit1@'super' ###''内是主机名换成自己的
Clustering node rabbit2@super with rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit2 start_app
Starting node rabbit2@super ...

查看集群状态:

[root@super ~]# rabbitmqctl cluster_status -n rabbit1
Cluster status of node rabbit1@super ...
[{nodes,[{disc,[rabbit1@super,rabbit2@super]}]},
 {running_nodes,[rabbit2@super,rabbit1@super]},
 {cluster_name,<<"rabbit1@super">>},
 {partitions,[]},
 {alarms,[{rabbit2@super,[]},{rabbit1@super,[]}]}]

web监控:

在这里插入图片描述

3.3 集群管理

rabbitmqctl join_cluster {cluster_node} [–ram]
将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。

rabbitmqctl cluster_status
显示集群的状态。

rabbitmqctl change_cluster_node_type {disc|ram}
修改集群节点的类型。在这个命令执行前需要停止RabbitMQ应用。

rabbitmqctl forget_cluster_node [–offline]
将节点从集群中删除,允许离线执行。

rabbitmqctl update_cluster_nodes {clusternode}

在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群。考虑这样一种情况,节点A和节点B都在集群中,当节点A离线了,节点C又和节点B组成了一个集群,然后节点B又离开了集群,当A醒来的时候,它会尝试联系节点B,但是这样会失败,因为节点B已经不在集群中了。

rabbitmqctl cancel_sync_queue [-p vhost] {queue}
取消队列queue同步镜像的操作。

rabbitmqctl set_cluster_name {name}
设置集群名称。集群名称在客户端连接时会通报给客户端。Federation和Shovel插件也会有用到集群名称的地方。集群名称默认是集群中第一个节点的名称,通过这个命令可以重新设置。

3.4 RabbitMQ镜像集群配置

上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。

镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。

设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。

rabbitmqctl set_policy my_ha “^” ‘{“ha-mode”:“all”}’

在这里插入图片描述

  • Name:策略名称
  • Pattern:匹配的规则,如果是匹配所有的队列,是^.
  • Definition:使用ha-mode模式中的all,也就是同步所有匹配的队列。问号链接帮助文档。

3.5 负载均衡-HAProxy

HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。

3.5.1 安装HAProxy
//下载依赖包
yum install gcc vim wget
//上传haproxy源码包
//解压
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
//进入目录、进行编译、安装
cd /usr/local/haproxy-1.6.5
make TARGET=linux31 PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
mkdir /etc/haproxy
//赋权
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
//创建haproxy配置文件
vim /etc/haproxy/haproxy.cfg
3.5.2 配置HAProxy

配置文件路径:/etc/haproxy/haproxy.cfg

#logging options
global
	log 127.0.0.1 local0 info
	maxconn 5120
	chroot /usr/local/haproxy
	uid 99
	gid 99
	daemon
	quiet
	nbproc 20
	pidfile /var/run/haproxy.pid

defaults
	log global
	
	mode tcp

	option tcplog
	option dontlognull
	retries 3
	option redispatch
	maxconn 2000
	contimeout 5s
   
     clitimeout 60s

     srvtimeout 15s	
#front-end IP for consumers and producters

listen rabbitmq_cluster
	bind 0.0.0.0:5672
	
	mode tcp
	#balance url_param userid
	#balance url_param session_id check_post 64
	#balance hdr(User-Agent)
	#balance hdr(host)
	#balance hdr(Host) use_domain_only
	#balance rdp-cookie
	#balance leastconn
	#balance source //ip
	
	balance roundrobin
	
        server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
        server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2

listen stats
	bind 172.16.98.133:8100
	mode http
	option httplog
	stats enable
	stats uri /rabbitmq-stats
	stats refresh 5s

启动HAproxy负载

/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
//查看haproxy进程状态
ps -ef | grep haproxy

访问如下地址对mq节点进行监控
http://175.24.181.110:8100/rabbitmq-stats

代码中访问mq集群地址,则变为访问haproxy地址:5672

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/404180.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker基本命令-容器

容器 基本概念 镜像&#xff08;Image&#xff09;和容器&#xff08;Container&#xff09;的关系&#xff0c;就像是面向对象程序设计中的 类 和 实例 一样&#xff0c;镜像是静态的定义&#xff0c;容器是镜像运行时的实体。容器可以被创建、启动、停止、删除、暂停等。 容…

.NET Framework .NET Core与 .NET 的区别

我们在创建C#程序时,经常会看到目标框架以下的选项,那么究竟有什么区别? 首先 .NET是一种用于构建多种应用的免费开源开发平台,可以使用多种语言,编辑器和库开发Web应用、Web API和微服务、云中的无服务器函数、云原生应用、移动应用、桌面应用、Windows WPF、Windows窗体…

搭建一个中心化的定时服务

1. 背景 在物联网络&#xff0c;很多设备之间都在进行交互&#xff0c;其中云端在远程交流中起到了很重要的作用。比如&#xff0c;一台设备想进行调温&#xff0c;但是需要知道此时房间的温度&#xff0c;那就需要定时去查询传感器测出来的房间温度&#xff0c;如果温度过高&a…

【C++学习】【STL】list容器

list 容器&#xff0c;又称双向链表容器&#xff0c;即该容器的底层是以双向链表的形式实现的。这意味着&#xff0c;list 容器中的元素可以分散存储在内存空间里&#xff0c;而不是必须存储在一整块连续的内存空间中。可以看到&#xff0c;list 容器中各个元素的前后顺序是靠指…

【NodeJs】使用ffmpeg将视频webm转换为mp4

使用Chrome浏览器录制视频文件是webm格式&#xff0c;但是很多媒体播放器是不支持的&#xff0c;不利于分享&#xff0c;需要转换为mp4格式才行&#xff0c;接下来给大家讲 ffmpeg ffmpeg是什么呢&#xff0c; 一个免费开源的视频转换工具&#xff0c;一款音视频编解码工具&…

日志与可视化方案:从ELK到EFK,再到ClickHouse

EFK方案 从ELK谈起 ELK是三个开源软件的缩写&#xff0c;分别表示&#xff1a;Elasticsearch&#xff0c;Logstash&#xff0c;Kibana。新增了一个FlieBeat&#xff0c;它是一个轻量级的日志收集处理工具&#xff0c;FlieBeat占用资源少&#xff0c;适用于在各个服务器上搜集…

JS语法(扫盲)

文章目录一、初识JavaScript二、第一个JS程序JS代码的引入JS程序的输出三、语法变量使用动态类型内置类型运算符强类型语言&弱类型语言条件语句循环语句数组创建数组获取数组元素新增数组元素删除数组元素函数语法格式形参实参个数的问题匿名函数&函数表达式作用域作用…

PHP 的运行方式有哪些?

PHP本质上的运行方式可以分为两种&#xff1a; 基于命令行的基于PHP-FPM的 但实际上&#xff0c;PHP能做的事很多&#xff0c;很多场景下&#xff0c;不同的运行方式能让开发更方便&#xff0c;减轻各种工作。 测试开发 PHP内置了一个HTTP 的server。这意味着&#xff0c;很…

stm32外设-GPIO

0. 写在最前 本栏目笔记都是基于stm32F10x 1. GPIO基本介绍 GPIO—general purpose intput output 是通用输入输出端口的简称&#xff0c;简单来说就是软件可控制的引脚&#xff0c; STM32芯片的GPIO引脚与外部设备连接起来&#xff0c;从而实现与外部通讯、控制以及数据采集的…

java Date 和 Calendar类 万字详解(通俗易懂)

Date类介绍及使用关于SimpleDateFormat类Calendar类介绍及使用LocalDateTime类介绍及使用关于DateTimeFormatter类一、前言本节内容是我们《API-常用类》专题的第五小节了。本节内容主要讲Date 类 和 Calendar 类&#xff0c;内容包括但不限于Date类简介&#xff0c;Date类使用…

【微信小程序】-- 自定义组件 - 数据监听器 (三十四)

&#x1f48c; 所属专栏&#xff1a;【微信小程序开发教程】 &#x1f600; 作  者&#xff1a;我是夜阑的狗&#x1f436; &#x1f680; 个人简介&#xff1a;一个正在努力学技术的CV工程师&#xff0c;专注基础和实战分享 &#xff0c;欢迎咨询&#xff01; &…

传奇开服流程—传奇单机架设教程

现在传奇私服还是那么的火爆&#xff0c;上次有报道发布站一年盈利几个亿&#xff0c;还是有很大的机会&#xff0c;很多玩家因为GM开服关服给折腾&#xff0c;刚充的钱服务器就关了&#xff0c;很是恼火&#xff0c;于是都想自己整个服开开&#xff0c;但又不知道从何下手&…

三菱FX5U之数据处理类指令的使用

本课程使用三菱PLC works3编程软件进行教学&#xff0c;并使用works3的仿真功能进行PLC仿真&#xff0c;学习的时候不需要有实物PLC。 补充说明&#xff1a;三菱 FX 5U系列PLC使用的是GX works3编程软件&#xff0c;FX 3U、Q系列PLC使用的是GX works3编程软件。 第一章 八个案…

YUV实践记录

文章目录YUV基础介绍&#xff1a;不同采样YUV格式的区别为什么要使用YUV格式呢&#xff1f;YUV的存储方式Android中的YUV_420_888附录&#xff1a;YUV基础介绍&#xff1a; YUV在做手机图像或者视频处理的时候会经常用到的一个格式&#xff0c;用此文来记录YUV相关介绍&#xf…

hibernate学习(五)

hibernate学习&#xff08;五&#xff09; hibernate的一对多关联映射&#xff1a; 一、数据库表与表之间关系 一对多建表原则&#xff1a; 多对多的建表原则&#xff1a; 一对一建表原则&#xff1a; &#xff08;1&#xff09;唯一外键对应&#xff1a; &#xff08;…

时间复杂度和空间复杂度的计算

目录 算法的复杂度 时间复杂的的概念 时间复杂度计算方法 大O的渐进表示法 空间复杂的概念 空间复杂的的计算方法 时间和空间复杂度的应用 消失的数字 轮转数组 算法的复杂度 算法在编写成可执行程序后&#xff0c;运行时需要耗费时间资源和空间&#xff08;内存&…

modbus转profinet网关连接5台台达ME300变频器案例

通过兴达易控Modbus转Profinet&#xff08;XD-MDPN100&#xff09;网关改善网络场景&#xff0c;变频器有掉线或数据丢失报警&#xff0c;影响系统的正常运行&#xff0c;将5台 ME300变频器modbus转Profinet到1200PLC&#xff0c;通过网关还可以实现Profinet转modbus RTU协议转…

LabVIEW中以编程方式获取VI克隆名称

LabVIEW中以编程方式获取VI克隆名称演示如何以编程方式获取VI的名称或克隆名称。如果VI作为顶级VI运行&#xff0c;则将显示VI的名称。如果VI在主VI中用作子VI&#xff0c;它将返回克隆的名称。在项目开发过程中&#xff0c;有时需要获取VI的名称。在此示例中&#xff0c;实现了…

【数论】试除法判断质数,分解质因数,筛质数

Halo&#xff0c;这里是Ppeua。平时主要更新C语言&#xff0c;C&#xff0c;数据结构算法......感兴趣就关注我吧&#xff01;你定不会失望。 &#x1f308;个人主页&#xff1a;主页链接 &#x1f308;算法专栏&#xff1a;专栏链接 现已更新完KMP算法、排序模板&#xff0c;之…

代码管理--svnadmin工具介绍

1、简介 SVNAdmin2 是一款通过图形界面管理服务端SVN的web程序。正常情况下配置SVN仓库的人员权限需要登录到服务器手动修改 authz 和 passwd 两个文件&#xff0c;当仓库结构和人员权限上了规模后&#xff0c;手动管理就变的非常容易出错&#xff0c;本系统能够识别人员和权限…