Java使用RabbitMQ实战,Springboot使用rabbitMQ实战

news2024/12/25 1:30:31

文章目录

  • 一、Java原生API
    • 1、简单实例
    • 2、延迟消息
    • 3、消费端限流
    • 4、消息属性设置
    • 5、消息可靠投递
  • 二、Spring-API
    • 1、简单实例
      • (1)引入rabbitMQ.xml
      • (2)生产者
      • (3)消费者
      • (4)测试类
  • 三、SpringBoot-API
    • 1、spring-amqp介绍
      • (1)Spring-AMQP核心对象
      • (2)官方文档
      • (3)官方文档翻译
      • (4)SpringBoot参数(2.1.5)
    • 2、配置信息
      • (1)手动配置连接
      • (2)配置Container
      • (3)配置template
    • 3、简单实例
    • 4、消费者消息自动转换
    • 5、延迟消息
    • 6、消息可靠投递

一、Java原生API

引入依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

1、简单实例


import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * 消息消费者
 */
public class MyConsumer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";
    private final static String QUEUE_NAME = "SIMPLE_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 连接IP
        factory.setHost("192.168.56.10");
        // 默认监听端口
        factory.setPort(5672);
        // 虚拟机
        factory.setVirtualHost("/");

        // 设置访问的用户
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        // 声明交换机
        // String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
        channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null);

        // 声明队列
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" Waiting for message....");

        // 绑定队列和交换机,以及routingKey
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"my.test");

        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Received message : '" + msg + "'");
                System.out.println("consumerTag : " + consumerTag );
                System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
            }
        };

        // 开始获取消息
        // String queue, boolean autoAck, Consumer callback
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息生产者
 */
public class MyProducer {
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 连接IP
        factory.setHost("192.168.56.10");
        // 连接端口
        factory.setPort(5672);
        // 虚拟机
        factory.setVirtualHost("/");
        // 用户
        factory.setUsername("admin");
        factory.setPassword("admin");

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        // 发送消息
        String msg = "Hello world, Rabbit MQ";

        // String exchange, String routingKey, BasicProperties props, byte[] body
        channel.basicPublish(EXCHANGE_NAME, "my.test", null, msg.getBytes());

        channel.close();
        conn.close();
    }
}

2、延迟消息

RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

3、消费端限流

//非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
channel.basicQos(2);
channel.basicConsume(QUEUE_NAME, false, consumer);

4、消息属性设置

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("name", "gupao");
headers.put("level", "top");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)   // 2代表持久化
        .contentEncoding("UTF-8")  // 编码
        .expiration("10000")  // TTL,过期时间
        .headers(headers) // 自定义属性
        .priority(5) // 优先级,默认为5,配合队列的 x-max-priority 属性使用
        .messageId(String.valueOf(UUID.randomUUID()))
        .build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());

5、消息可靠投递

RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

二、Spring-API

引包:

<!--rabbitmq依赖 -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>

1、简单实例

(1)引入rabbitMQ.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">

    <!--配置connection-factory,指定连接rabbit server参数 -->
    <rabbit:connection-factory id="connectionFactory" virtual-host="/" username="admin" password="admin" host="192.168.56.10" port="5672" />

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />

    <!--######分隔线######-->
    <!--定义queue -->
    <rabbit:queue name="MY_FIRST_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    <!--定义direct exchange,绑定MY_FIRST_QUEUE -->
    <rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
        <rabbit:bindings>
            <rabbit:binding queue="MY_FIRST_QUEUE" key="FirstKey">
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--定义rabbit template用于数据的接收和发送 -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="MY_DIRECT_EXCHANGE" />

    <!--消息接收者 -->
    <bean id="messageReceiver" class="com.gupaoedu.consumer.FirstConsumer"></bean>

    <!--queue listener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="MY_FIRST_QUEUE" ref="messageReceiver" />
    </rabbit:listener-container>




    <!--定义queue -->
    <rabbit:queue name="MY_SECOND_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    <!-- 将已经定义的Exchange绑定到MY_SECOND_QUEUE,注意关键词是key -->
    <rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
        <rabbit:bindings>
            <rabbit:binding queue="MY_SECOND_QUEUE" key="SecondKey"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 消息接收者 -->
    <bean id="receiverSecond" class="com.gupaoedu.consumer.SecondConsumer"></bean>

    <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="MY_SECOND_QUEUE" ref="receiverSecond" />
    </rabbit:listener-container>

    <!--######分隔线######-->
    <!--定义queue -->
    <rabbit:queue name="MY_THIRD_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    <!-- 定义topic exchange,绑定MY_THIRD_QUEUE,注意关键词是pattern -->
    <rabbit:topic-exchange name="MY_TOPIC_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
        <rabbit:bindings>
            <rabbit:binding queue="MY_THIRD_QUEUE" pattern="#.Third.#"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--定义rabbit template用于数据的接收和发送 -->
    <rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="MY_TOPIC_EXCHANGE" />

    <!-- 消息接收者 -->
    <bean id="receiverThird" class="com.gupaoedu.consumer.ThirdConsumer"></bean>

    <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="MY_THIRD_QUEUE" ref="receiverThird" />
    </rabbit:listener-container>

    <!--######分隔线######-->
    <!--定义queue -->
    <rabbit:queue name="MY_FOURTH_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    <!-- 定义fanout exchange,绑定MY_FIRST_QUEUE 和 MY_FOURTH_QUEUE -->
    <rabbit:fanout-exchange name="MY_FANOUT_EXCHANGE" auto-delete="false" durable="true" declared-by="connectAdmin" >
        <rabbit:bindings>
            <rabbit:binding queue="MY_FIRST_QUEUE"></rabbit:binding>
            <rabbit:binding queue="MY_FOURTH_QUEUE"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- 消息接收者 -->
    <bean id="receiverFourth" class="com.gupaoedu.consumer.FourthConsumer"></bean>

    <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="MY_FOURTH_QUEUE" ref="receiverFourth" />
    </rabbit:listener-container>
</beans>

(2)生产者

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

/**
 * 消息生产者
 */
@Service
public class MessageProducer {
    private Logger logger = LoggerFactory.getLogger(MessageProducer.class);

    @Autowired
    @Qualifier("amqpTemplate")
    private AmqpTemplate amqpTemplate;

    @Autowired
    @Qualifier("amqpTemplate2")
    private AmqpTemplate amqpTemplate2;

    /**
     * 演示三种交换机的使用
     */
    public void sendMessage(Object message) {


        // amqpTemplate 默认交换机 MY_DIRECT_EXCHANGE
        // amqpTemplate2 默认交换机 MY_TOPIC_EXCHANGE

        // Exchange 为 direct 模式,直接指定routingKey
        amqpTemplate.convertAndSend("FirstKey", "[Direct,FirstKey] "+message);
        amqpTemplate.convertAndSend("SecondKey", "[Direct,SecondKey] "+message);

        // Exchange模式为topic,通过topic匹配关心该主题的队列
        amqpTemplate2.convertAndSend("msg.Third.send","[Topic,msg.Third.send] "+message);

        // 广播消息,与Exchange绑定的所有队列都会收到消息,routingKey为空
        amqpTemplate2.convertAndSend("MY_FANOUT_EXCHANGE",null,"[Fanout] "+message);
    }
}

(3)消费者

public class FirstConsumer implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(FirstConsumer.class);

    public void onMessage(Message message) {
        logger.info("The first consumer received message : " + message.getBody());
    }
}
public class SecondConsumer implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(SecondConsumer.class);

    public void onMessage(Message message) {
        logger.info("The second consumer received message : " + message);
    }
}
public class ThirdConsumer implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(ThirdConsumer.class);

    public void onMessage(Message message) {
        logger.info("The third cosumer received message : " + message);
    }
}

public class FourthConsumer implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(FourthConsumer.class);

    public void onMessage(Message message) {
        logger.info("The fourth consumer received message : " + message);
    }
}


(4)测试类

import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.gupaoedu.producer.MessageProducer;
public class RabbitTest {
    private ApplicationContext context = null;

    @Test
    public void sendMessage() {
        context = new ClassPathXmlApplicationContext("applicationContext.xml");
        MessageProducer messageProducer = (MessageProducer) context.getBean("messageProducer");
        int k = 100;
        while (k > 0) {
            messageProducer.sendMessage("第" + k + "次发送的消息");
            k--;
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

三、SpringBoot-API

1、spring-amqp介绍

Spring AMQP是对Spring基于AMQP的消息收发解决方案,它是一个抽象层,不依赖于特定的AMQP Broker实现和客户端的抽象,所以可以很方便地替换。比如我们可以使用spring-rabbit来实现。

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>2.1.6.RELEASE</version>
</dependency>

其中包含了3个jar包:
amqp-client(java api的包)、spring-amqp(对amqp的封装)、spring.rabbit(rabbitmq对AMQP在Spring中的实现)

(1)Spring-AMQP核心对象

在Spring AMQP中,对RabbitMQ的Java API进一步进行了封装,让我们实现更加简单,主要封装对象:
在这里插入图片描述

(2)官方文档

https://docs.spring.io/spring-amqp/docs/2.4.13/reference/html/

(3)官方文档翻译

Spring整合RabbitMQ,SpringBoot整合RabbitMQ,Spring-AMQP官方文档详解

(4)SpringBoot参数(2.1.5)

注:前缀spring.rabbitmq全部省略了。

全部配置总体上分为三大类:连接类、消息消费类、消息发送类
在这里插入图片描述

2、配置信息

(1)手动配置连接

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpConfig {

    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        // admin.setAutoStartup(true);
        return admin;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            public String createConsumerTag(String queue) {
                return null;
            }
        });
        return container;
    }

}

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan(basePackages = "com.amqp")
public class AdminTest {

    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AdminTest.class);
        RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);

        // 声明一个交换机
        rabbitAdmin.declareExchange(new DirectExchange("ADMIN_EXCHANGE", false, false));

        // 声明一个队列
        rabbitAdmin.declareQueue(new Queue("ADMIN_QUEUE", false, false, false));

        // 声明一个绑定
        rabbitAdmin.declareBinding( new Binding("ADMIN_QUEUE", Binding.DestinationType.QUEUE,
                "ADMIN_EXCHANGE", "admin", null));

    }
}

(2)配置Container

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.UUID;

@Configuration
public class ContainerConfig {

    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        // admin.setAutoStartup(true);
        return admin;
    }


    @Bean("secondQueue")
    public Queue getSecondQueue(){
        return new Queue("BASIC_SECOND_QUEUE");
    }

    @Bean("thirdQueue")
    public Queue getThirdQueue(){
        return new Queue("BASIC_THIRD_QUEUE");
    }

    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(getSecondQueue(), getThirdQueue()); //监听的队列
        container.setConcurrentConsumers(1); // 最小消费者数
        container.setMaxConcurrentConsumers(5); //  最大的消费者数量
        container.setDefaultRequeueRejected(false); //是否重回队列
        container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式
        container.setExposeListenerChannel(true);

        container.setConsumerTagStrategy(new ConsumerTagStrategy() {    //消费端的标签策略
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });
        return container;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter()); // 消息转换器
        factory.setAcknowledgeMode(AcknowledgeMode.NONE); // 签收模式
        factory.setAutoStartup(true);

        factory.setConcurrentConsumers(2); // 最小消费者数
        factory.setMaxConcurrentConsumers(6); //最大消费者数
        factory.setTransactionManager(rabbitTransactionManager(connectionFactory));
        return factory;
    }

    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }


}

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.net.URI;
import java.net.URISyntaxException;

/**
 * 配置类的代码用不到,只用来演示
 */
public class ContainerSender {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new CachingConnectionFactory(new URI("amqp://guest:guest@localhost:5672"));
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        SimpleMessageListenerContainer container = factory.createListenerContainer();
        // 不用工厂模式也可以创建
        // SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setQueueNames("BASIC_SECOND_QUEUE");
        container.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                System.out.println("收到消息:"+message);
            }
        });
        container.start();

        AmqpTemplate template = new RabbitTemplate(connectionFactory);
        template.convertAndSend("BASIC_SECOND_QUEUE", "msg 1");
        template.convertAndSend("BASIC_SECOND_QUEUE", "msg 2");
        template.convertAndSend("BASIC_SECOND_QUEUE", "msg 3");
    }

}

(3)配置template

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TemplateConfig {
    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
            public void returnedMessage(Message message,
                                        int replyCode,
                                        String replyText,
                                        String exchange,
                                        String routingKey){
                System.out.println("回发的消息:");
                System.out.println("replyCode: "+replyCode);
                System.out.println("replyText: "+replyText);
                System.out.println("exchange: "+exchange);
                System.out.println("routingKey: "+routingKey);
            }
        });

        rabbitTemplate.setChannelTransacted(true);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (!ack) {
                    System.out.println("发送消息失败:" + cause);
                    throw new RuntimeException("发送异常:" + cause);
                }
            }
        });
        return rabbitTemplate;
    }
}
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

@ComponentScan(basePackages = "com.amqp.template")
public class TemplateSender {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TemplateSender.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息确认成功");
                } else {
                    // nack
                    System.out.println("消息确认失败");
                }
            }
        });

        rabbitTemplate.convertAndSend("BASIC_FANOUT_EXCHANGE", "", "this is a msg");
    }
}

3、简单实例

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    /**
     * 都可以使用缺省对象
     * @return
     * @throws Exception
     */
    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setUri("amqp://guest:guest@localhost:5672");
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        admin.setAutoStartup(true);
        return admin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            public String createConsumerTag(String queue) {
                return null;
            }
        });
        return container;
    }

    // 两个交换机
    @Bean("topicExchange")
    public TopicExchange getTopicExchange(){
        return new TopicExchange("GP_BASIC_TOPIC_EXCHANGE");
    }

    @Bean("fanoutExchange")
    public FanoutExchange getFanoutExchange(){
        return  new FanoutExchange("GP_BASIC_FANOUT_EXCHANGE");
    }

    // 三个队列
    @Bean("firstQueue")
    public Queue getFirstQueue(){
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-message-ttl",6000);
        Queue queue = new Queue("GP_BASIC_FIRST_QUEUE", false, false, true, args);
        return queue;
    }

    @Bean("secondQueue")
    public Queue getSecondQueue(){
        return new Queue("GP_BASIC_SECOND_QUEUE");
    }

    @Bean("thirdQueue")
    public Queue getThirdQueue(){
        return new Queue("GP_BASIC_THIRD_QUEUE");
    }

    // 两个绑定
    @Bean
    public Binding bindSecond(@Qualifier("secondQueue") Queue queue, @Qualifier("topicExchange") TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("#.gupao.#");
    }

    @Bean
    public Binding bindThird(@Qualifier("thirdQueue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }
}

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

/**
 * 生产者
 */
@ComponentScan(basePackages = "com.basic")
public class BasicSender {

    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(BasicSender.class);
        RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        rabbitTemplate.convertAndSend("","BASIC_FIRST_QUEUE","-------- a direct msg");

        rabbitTemplate.convertAndSend("BASIC_TOPIC_EXCHANGE","shanghai.teacher","-------- a topic msg : shanghai.teacher");
        rabbitTemplate.convertAndSend("BASIC_TOPIC_EXCHANGE","changsha.student","-------- a topic msg : changsha.student");

        rabbitTemplate.convertAndSend("BASIC_FANOUT_EXCHANGE","","-------- a fanout msg");

    }
}

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 消费者
 */
@Component
@RabbitListener(queues = "BASIC_FIRST_QUEUE")
public class FirstConsumer {

    @RabbitHandler
    public void process(String msg, Channel channel,long deliveryTag) throws IOException {
        channel.basicAck(deliveryTag, true);
        System.out.println(" first queue received msg : " + msg);
    }
}

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 */
@Component
@RabbitListener(queues = "BASIC_SECOND_QUEUE")
public class SecondConsumer {

    @RabbitHandler
    public void process(String msg){
        System.out.println(" second queue received msg : " + msg);
    }
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 */
@Component
@RabbitListener(queues = "BASIC_THIRD_QUEUE")
public class ThirdConsumer {

    @RabbitHandler
    public void process(String msg){
        System.out.println(" third queue received msg : " + msg);
    }
}

4、消费者消息自动转换

/**
 * 在消费端转换JSON消息
 * 监听类都要加上containerFactory属性
 * @param connectionFactory
 * @return
 */
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter()); // 序列化方式
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 应答方式
    factory.setAutoStartup(true);
    return factory;
}

import com.gupaoedu.entity.Merchant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.PropertySource;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@PropertySource("classpath:mq.properties")
// 指定监听一个或多个队列
@RabbitListener(queues = "${com.firstqueue}", containerFactory="rabbitListenerContainerFactory")
public class FirstConsumer {

    // 处理逻辑,可以接收一个对象类型的消息
    @RabbitHandler
    public void process(@Payload Merchant merchant){
        System.out.println("First Queue received msg : " + merchant.getName());
    }

}

public class Merchant implements Serializable {
    int id; // 商户编号
    String name; // 商户名称
    String address; // 商户地址
    String accountNo; // 商户账号
    String accountName; // 户名
    String state; // 状态 1 激活 2 关闭
    String stateStr; // 状态中文
}

5、延迟消息

RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

6、消息可靠投递

RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/693495.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用VSCODE跑orbslam2踩的坑

我用的是ubuntu22.04&#xff0c;opencv是4.7&#xff0c;使用其他的库感觉就算版本不一样&#xff0c;也能跑。 一、运行build.sh能够产生可执行文件遇到的问题 1.由于opencv版本高带来的问题 这些问题怎么定位出现在哪些文件中&#xff0c;你通过命令行&#xff0c;运行下…

更灵活的CSS3新特性:帮你简化样式管理和优化网站性能

文章目录 I. 前言&#xff1a;介绍CSS3的进化和发展趋势CSS3的历史和版本CSS3的标准化和浏览器支持情况 II. 新的CSS选择器&#xff1a;扩展选择器的功能属性选择器&#xff1a;更多方式选择元素伪类和伪元素&#xff1a;更方便地定义样式 III. 改进的排版和布局&#xff1a;实…

在 EulerOS 系统中设置 Chrony 时间同步服务

以下是在 EulerOS 系统中设置 Chrony 时间同步服务的所有步骤。 1.查看系统版本 [rootservice11 ~]# cat /etc/redhat-release EulerOS release 2.0 (SP5)2.检查是否已安装chrony软件 [rootservice11 ~]# rpm -qa|grep chrony chrony-3.2-2.eulerosv2r7.x86_64如果没有安装…

Openlayers实战教程学习大纲及引导

本系列教程是Openlayers的实战教程&#xff0c;介绍Openlayes的一些基础知识&#xff0c;并重点讲述哪些地方是openlayers项目中常用的&#xff0c;给出具体示例&#xff0c;起到一个很好的引导学习作用。 版本说明 Openlayers的实战教程 分为**图文版** 和 **视频版**&#x…

【经验分享】全志科技官方Ubuntu16.04根文件系统镜像的替换和测试方法

本文主要基于全志A40i开发板——TLA40i-EVM&#xff0c;一款基于全志科技A40i处理器设计的4核ARM Cortex-A7高性能低功耗国产评估板&#xff0c;演示Ubuntu根文件系统镜像的替换和测试方法。 创龙科技TLA40i-EVM评估板接口资源丰富&#xff0c;引出双路网口、双路CAN、双路USB…

7.5_1散列查找(上)

基于一种数据结构&#xff1a; 散列表&#xff08;Hash Table&#xff09;&#xff0c;又称作哈希表 特点&#xff1a;数据元素的关键字与其存储地址直接相关 其实这个散列表也是基于数组实现的 加入19对13取余 加入再次插入1的话&#xff0c;塞不进去 数据元素不会直接存放到…

深入浅出设计模式 - 适配器模式

博主介绍&#xff1a; ✌博主从事应用安全和大数据领域&#xff0c;有8年研发经验&#xff0c;5年面试官经验&#xff0c;Java技术专家✌ Java知识图谱点击链接&#xff1a;体系化学习Java&#xff08;Java面试专题&#xff09; &#x1f495;&#x1f495; 感兴趣的同学可以收…

Presto(Trino)分布式(物理)执行计划的生成和调度

文章目录 1.前言2.物理执行生成(Stage)的生成2.1不同的调度分区策略2.1.1 Connector自己提供的分区策略2.1.2 Presto提供的Partition策略(SystemPartitioningHandle)&#xff1a; 2.2 为Stage创建StageScheduler2.2.1 普通的非bucket表的TableScan StageSplit 放置策略解析 2.2…

UE5.1.1 c++从0开始(14.用C++写UMG类)

先在这里放一个链接防止第一次看的朋友们不知道我在讲什么&#xff1a;https://www.bilibili.com/video/BV1nU4y1X7iQ/ 这一段的教程不难&#xff0c;唯一新建的C类是UMG的一个类。这个类用来写绑定在ai身上的血条。 总结一下一共做了什么事情&#xff1a; 给ai写了一个血条…

LeetCode Java两个单链表相交的一系列问题

题目描述 单链表可能有环&#xff0c;也可能无环。给定两个单链表的头节点 head1和head2&#xff0c;这两个链表可能相交&#xff0c;也可能不相交。 请实现一个函数&#xff0c;如果两个链表相交&#xff0c;请返回相交的第一个节点&#xff1b;如果不相交&#xff0c;返回n…

Android 渐变背景色

目录 一、背景 二、渐变 2.1 线性渐变背景色 1.新建资源文件 2.编辑样式文件 3.使用 4.编辑样式参数说明 2.2 圆角按钮渐变背景色 2.3 放射渐变 2.4 扫描线渐变 一、背景 单纯的颜色背景已经不能够满足UI大佬们的发挥&#xff0c;渐变色背景无疑成了一个炫技的方向。现在…

chatgpt赋能python:Python调用同一个类中方法详解

Python调用同一个类中方法详解 在Python编程中&#xff0c;类是一种非常重要的概念&#xff0c;可用于组织和管理代码。在同一个类中&#xff0c;可以定义多个方法。本文将详细介绍如何调用同一个类中的方法。 什么是类方法&#xff1f; 在Python中&#xff0c;类方法是指类…

魔兽世界自己架设任务

在魔兽世界中&#xff0c;玩家可以使用游戏内的任务编辑器自己架设任务来增加游戏的乐趣和挑战性。以下是详细的步骤&#xff1a; 第一步&#xff1a;打开任务编辑器 玩家可以在游戏中按下“ESC”键&#xff0c;进入游戏设置页面。在这个页面中&#xff0c;有一个“编辑器”选…

DSL查询分类与全文检索查询

DSL查询分类 Elasticsearch提供了基于JSON的DSL&#xff08;Domain Specific Language&#xff09;来定义查询。常见的查询类型包括&#xff1a; 查询所有&#xff1a;查询出所有数据&#xff0c;一般测试用。例如&#xff1a;match_all全文检索&#xff08;full text&#x…

Idea新建springboot项目遇到的问题及解决

1.更换阿里云 方法&#xff1a; 找到文件路径&#xff1a;Settings > Build,Execution,Deployment > Build Tools > Maven 如下图&#xff1a; 找到相应的settings文件 如果没有就新建一个同名文件&#xff0c;内容如下&#xff1a; <settings xmlns"h…

Gitlab回退到指定版本的方法与步骤

一、先根据分支获取代码 如下&#xff1a; 下载好后&#xff0c;通过右键菜单进入git bash here 就进入下面界面 去gitlab上面去寻找需要的faf0af86d24f7de73b024785ad864f36da4284e2 git reset --hard cf2a5283b9a79f8cf04b003d05cdd94b2b3ff166 执行命令“git push -f”&…

vue中对语句的语义进行比较

一、安装 string-similarity库 npm install string-similarity二、html <div><input type"text" v-model"string1" placeholder"文本1" /> </div> <div><input type"text" v-model"string2" p…

[计算机入门]了解键盘

2.1 了解键盘 键盘一般可以根据按键的功能进行分区&#xff0c;一般分为&#xff1a;主键盘区、小键盘区、控制键区、功能键区、指示灯区。下面介绍键盘的各个分区按键及功能。 2.1.1 主键盘区 主键盘区又叫打字键盘区或字符键区&#xff0c;具有标准英文打字机键盘的格式。…

【Java】Java 中的栈和堆内存

本文仅供学习参考&#xff01; 相关教程地址&#xff1a; https://www.cnblogs.com/whgw/archive/2011/09/29/2194997.html https://www.developer.com/java/stack-heap-java-memory/ https://zhuanlan.zhihu.com/p/529280783 Java 数据类型在执行过程中存储在两种不同形式的内…

HOT24-回文链表

leetcode原题链接&#xff1a;回文链表 题目描述 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,2,1] 输出&#xff1a…