代码实战深度理解RabbitMQ 5 种消息模型

news2025/1/13 13:27:17

5种消息模式

在这里插入图片描述

  • 简单消息模式:1个生产者 + 1个队列 + 1个消费者;生产者只负责生产,消费者只负责消费,两者在同一个队列中操作
  • 工作队列消息模式:1个生产者 + 1个队列 + 多个消费者; 一条消息只能被消费一次
  • 订阅消息模式之 fanout(订阅模式):1个生产者 + 1个交换机 + 多个队列 + 多个消费者,一条消息可以被多个消费者消费;
  • 订阅消息模式之durect/router(路由模式):1个生产者 + 1个交换机 + 多个队列 + 多个消费者 ,routingKey ,一条消息发送给符合 routingKey 的队列;
  • 订阅消息模式之topic(主题模式):通配符,#:匹配一个或者多个 *:一个词;

1.简单消息模式 simple Exchange

在这里插入图片描述

P:消息的生产者
C:消息的消费者
红色:队列

生产者将消息发送到队列,消费者从队列中获取消息。

需要导入RabbitMQ的客户端依赖

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>3.4.1</version>
</dependency>

创建工具类ConnectionUtil ,用来获取MQ的连接

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {

    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("47.96.115.67");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        //factory.setVirtualHost("testhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

创建生产者发送消息到队列

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String QUEUE_NAME = "q_test_01";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 消息内容
        String message = "我是一条简单模型的消息";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

生产者发送消息到队列后,可以在RabbitMQ的管理控制台中查看该队列的消息
在这里插入图片描述
消费者从队列中拿消息


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "q_test_01";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}



当消费者从队列中将该消息拿出后,队列中的该消息就会被弹出队列.

此时我们在到控制台中查看该队列,可以发现该队列的消息已经被消费了,队列为空

在这里插入图片描述

2.工作消息模式 work Exchange

在这里插入图片描述

一个生产者、2个消费者。

一个消息只能被一个消费者获取。

**生产者:**我们来定义一个生产者向队列中发送100条信息;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 100; i++) {
            // 消息内容
            String message = "工作消息" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            //睡眠时间逐渐递增
            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}


消费者1:


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class RecvOne {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void start() throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        //channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,false表示手动返回完成状态,true表示自动
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("消费者1获得" + message + "'");
            //休眠
            Thread.sleep(10);
            // 返回确认状态,注释掉表示使用自动确认模式
            //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}


消费者2:


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class RecvTwo {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void start() throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        //channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,false表示手动返回完成状态,true表示自动
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("消费者2获得" + message + "'");
            // 休眠1秒
            Thread.sleep(1000);
            //下面这行注释掉表示使用自动确认模式
            //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}


编写线程启动类testMain,同时启动两个线程:


import java.util.concurrent.*;

public class testMain {
    public static void main(String[] args) {

        CountDownLatch countDownLatch = new CountDownLatch(2);
        //线程池的创建方式推荐使用ThreadPoolExecutor,这也是阿里官方推荐的方式,这样可以避免线程过多导致内存占用一直增加的问题
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                2,
                1,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            threadPool.submit(()->{
                try {
                    RecvOne.start();
                    //后续在此处进行业务处理
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }
            });

        threadPool.submit(()->{
            try {
                RecvTwo.start();
                //后续在此处进行业务处理
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                countDownLatch.countDown();
            }
        });
        //关闭线程处理
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //关闭线程池
        threadPool.shutdown();
    }
    }

查看结果:
内容基本上是由线程睡眠时间决定的, 随眠时间长的获取的消息少,睡眠时间短的,获得的消息多.
但是可以看到大致上消费者1和消费者2获得的消息差不多是一样多的,然而代码中的线程2的睡眠时间要远大于线程1,出现这种差不多的结果并不是我们想实现的,应该根据线程的能力来分配消息
在这里插入图片描述
总结:

  1. 消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
  2. 消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。
  • 其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。 RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。
  • 怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。
    basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。
  • 2个概念
  • 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。
  • 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

Work模式的“能者多劳”

打开上述代码的注释:

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//开启这行 表示使用手动确认模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

同时改为手动确认:

// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, false, consumer);

测试:
消费者1比消费者2获取的消息更多。

消息的确认模式

消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

  • 模式1:自动确认:只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
  • 模式2:手动确认:消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

手动模式:
在这里插入图片描述
自动模式:
在这里插入图片描述

3.订阅模式 Fanout Exchange

在这里插入图片描述
在这里插入图片描述
解读:

  1. 1个生产者,多个消费者
  2. 每一个消费者都有自己的一个队列
  3. 生产者没有将消息直接发送到队列,而是发送到了交换机
  4. 每个队列都要绑定到交换机
  5. 生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
    注意: 一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费

订阅模型中,通过交换机连接的多个队列之间的消息一致,区别在于消费者的不同,发送方的消息可以通过交换机将信息存储在各个队列中,消费者在对所属的队列进行消费,各个队列之间的消息互不影响.

在这里插入图片描述
生产者(看作是后台系统)
向交换机中发送消息。
注意 : 消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    //定义所连接的交换机名称
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        for (int i = 0; i < 100; i++) {
            // 消息内容
            String message = "工作消息" + i;
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            //睡眠时间逐渐递增
            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}


消费者1(看作是前台系统)


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;


public class RecvOne {

    private final static String QUEUE_NAME = "test_queue_work1";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void start() throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("消费者1:" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}


消费者2(看作是搜索系统)


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;


public class RecvTwo {

    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void start() throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("消费者2:" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}


测试类:



import java.util.concurrent.*;

public class testMain {
    public static void main(String[] args) {

        CountDownLatch countDownLatch = new CountDownLatch(2);
        //线程池的创建方式推荐使用ThreadPoolExecutor,这也是阿里官方推荐的方式,这样可以避免线程过多导致内存占用一直增加的问题
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                2,
                1,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            threadPool.submit(()->{
                try {
                    RecvOne.start();
                    //后续在此处进行业务处理
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }
            });

        threadPool.submit(()->{
            try {
                RecvTwo.start();
                //后续在此处进行业务处理
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                countDownLatch.countDown();
            }
        });
        //关闭线程处理
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //关闭线程池
        threadPool.shutdown();
    }
    }

测试结果:
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

在管理工具中查看队列和交换机的绑定关系:
在这里插入图片描述

4.路由模式 Direct Exchange

路由模型就是通过配置,连接到交换机的路由名称进行匹配

交换机虽然可以连接多个队列,通过路由配置信息通过交换机后只会存储到对应路由路径的队列中
在这里插入图片描述
在这里插入图片描述
生产者:



import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    //定义所连接的交换机名称
    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");


        String message = "路由1 消息传递";
        //配置消息所要送达的路径,通过这里的配置,该消息只会到”route1“路径下的队列中去
        channel.basicPublish(EXCHANGE_NAME, "route1", null, message.getBytes());
        System.out.println("[生产者] 生产"+message);

        channel.close();
        connection.close();
    }
}

消费者1:


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;


public class RecvOne {

    private final static String QUEUE_NAME = "test_queue_direct_1";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void start() throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        //设置绑定到交换机的路径为”route2“
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "route2");


        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("消费者1 只能接收到来route2中的消息:" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}


消费者2:



import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;


public class RecvTwo {

    private final static String QUEUE_NAME = "test_queue_direct_2";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void start() throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        //设置该队列包含路径 ”route1“、”route2“
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "route2");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "route1");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("消费者2 可以接收到来route1、route2中的消息:" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}


测试:
可以看到 无论生产者怎么生产数据,由于路由的绑定,产品只能进入direct2中给消费者2消费。

在这里插入图片描述

生产者配置:

        String message = "路由1 消息传递 ";
        channel.basicPublish(EXCHANGE_NAME, "route1", null, message.getBytes());
        System.out.println("[生产者] 生产"+message);

消费者1 配置:

        // 绑定队列到交换机
        //设置绑定到交换机的路径为”route2“
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "route2");

消费者2 配置:

        // 绑定队列到交换机
        //设置该队列包含路径 ”route1“、”route2“
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "route2");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "route1");

5.主题模式(通配符模式)Topic Exchange

主题模式:与路由模式类似,只不过是在路由模式的基础上,对路由匹配进行了解析配置
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,但是在单个队列中只有其中一个消费者实例会消费到消息。

生产者:


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;


public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息内容
        String message = "Hello World!!";
        channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
        System.out.println(" [生产者] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}


消费者1:


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;


public class RecvOne {

    private final static String QUEUE_NAME = "test_queue_topic_work_1";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void start() throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [消费者1] 获得 ' " + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}


消费者2:



import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class RecvTwo {

    private final static String QUEUE_NAME = "test_queue_topic_work_2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void start() throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [消费者2] 获得 '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}


多线程启动测试类:


import java.util.concurrent.*;

public class testMain {
    public static void main(String[] args) {

        CountDownLatch countDownLatch = new CountDownLatch(2);
        //线程池的创建方式推荐使用ThreadPoolExecutor,这也是阿里官方推荐的方式,这样可以避免线程过多导致内存占用一直增加的问题
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                2,
                1,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            threadPool.submit(()->{
                try {
                    RecvOne.start();
                    //后续在此处进行业务处理
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }
            });

        threadPool.submit(()->{
            try {
                RecvTwo.start();
                //后续在此处进行业务处理
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                countDownLatch.countDown();
            }
        });
        //关闭线程处理
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //关闭线程池
        threadPool.shutdown();
    }
    }

测试结果:
可以看到消费者1、消费者2都通过配置的路由,匹配到了对应的路径队列,拿到了消息
在这里插入图片描述

Springboot集成RabbitMQ

  • springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp对消息各种支持。

配置pom文件,主要是添加spring-boot-starter-amqp的支持

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


配置application.properties文件

配置rabbitmq的安装地址、端口以及账户信息

spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest


1.简单队列

配置队列

package com.zpc.rabbitmq;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean
    public Queue queue() {
        return new Queue("q_hello");
    }
}


** 发送者**

package com.zpc.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class HelloSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
        String context = "hello " + date;
        System.out.println("Sender : " + context);
        //简单对列的情况下routingKey即为Q名
        this.rabbitTemplate.convertAndSend("q_hello", context);
    }
}


接收者

package com.zpc.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_hello")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }
}


测试

package com.zpc.rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {

    @Autowired
    private HelloSender helloSender;

    @Test
    public void hello() throws Exception {
        helloSender.send();
    }
}


2.多对多使用(Work模式)

注册两个Receiver:

package com.zpc.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_hello")
public class HelloReceiver2 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver2  : " + hello);
    }

}


@Test
public void oneToMany() throws Exception {
    for (int i=0;i<100;i++){
        helloSender.send(i);
        Thread.sleep(300);
    }
}


public void send(int i) {
    String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
    String context = "hello " + i + " " + date;
    System.out.println("Sender : " + context);
    //简单对列的情况下routingKey即为Q名
    this.rabbitTemplate.convertAndSend("q_hello", context);
}


3.Topic Exchange(主题模式)

  • topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列

首先对topic规则配置,这里使用两个队列(消费者)来演示。
配置队列,绑定交换机

package com.zpc.rabbitmq.topic;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicRabbitConfig {

    final static String message = "q_topic_message";
    final static String messages = "q_topic_messages";

    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

    /**
     * 声明一个Topic类型的交换机
     * @return
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("mybootexchange");
    }

    /**
     * 绑定Q到交换机,并且指定routingKey
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}


创建2个消费者
q_topic_message 和q_topic_messages

package com.zpc.rabbitmq.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_topic_message")
public class Receiver1 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver1  : " + hello);
    }
}


package com.zpc.rabbitmq.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_topic_messages")
public class Receiver2 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver2 : " + hello);
    }
}


消息发送者(生产者)

package com.zpc.rabbitmq.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MsgSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context);
    }


    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context);
    }
}


send1方法会匹配到topic.#和topic.message,两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。
测试

package com.zpc.rabbitmq.topic;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTopicTest {

    @Autowired
    private MsgSender msgSender;

    @Test
    public void send1() throws Exception {
        msgSender.send1();
    }

    @Test
    public void send2() throws Exception {
        msgSender.send2();
    }
}


4.Fanout Exchange(订阅模式)

  • Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
    1)配置队列,绑定交换机
package com.zpc.rabbitmq.fanout;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutRabbitConfig {

    @Bean
    public Queue aMessage() {
        return new Queue("q_fanout_A");
    }

    @Bean
    public Queue bMessage() {
        return new Queue("q_fanout_B");
    }

    @Bean
    public Queue cMessage() {
        return new Queue("q_fanout_C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("mybootfanoutExchange");
    }

    @Bean
    Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(aMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(bMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(cMessage).to(fanoutExchange);
    }
}


2)创建3个消费者

package com.zpc.rabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_fanout_A")
public class ReceiverA {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("AReceiver  : " + hello + "/n");
    }
}


package com.zpc.rabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_fanout_B")
public class ReceiverB {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("BReceiver  : " + hello + "/n");
    }
}


package com.zpc.rabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "q_fanout_C")
public class ReceiverC {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("CReceiver  : " + hello + "/n");
    }
}


3)生产者

package com.zpc.rabbitmq.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MsgSenderFanout {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context);
    }
}


4)测试

package com.zpc.rabbitmq.fanout;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitFanoutTest {

    @Autowired
    private MsgSenderFanout msgSender;

    @Test
    public void send1() throws Exception {
        msgSender.send();
    }
}


结果如下

三个消费者都收到消息:
AReceiver : hi, fanout msg
CReceiver : hi, fanout msg
BReceiver : hi, fanout msg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/557989.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI作曲都这么厉害了,AI生成音乐

人工智能&#xff08;AI&#xff09;正在越来越多地应用于音乐、电影和绘画等艺术领域。在之前海森大数据已经为大家介绍了AI生成绘画&#xff0c;今天带大家了解一下AI生成音乐。 在当下的应用中&#xff0c;AI音乐生成已经相对成熟&#xff0c;AI已经可以影响音乐制作过程的…

annoconda安装使用及镜像源的添加,提高软件下载速度

1 annoconda下载 文件地址列表&#xff0c;选择版本下载https://repo.anaconda.com/archive/ win10版本&#xff1a; Anaconda3-2023.03-1-Windows-x86_64 linux版本&#xff1a; Anaconda3-2023.03-1-Linux-x86_64 win10下执行exe按向导安装&#xff0c;linu…

人工智能和物联网:如何将传感器和设备数据与机器学习相结合

第一章&#xff1a;引言 人工智能&#xff08;Artificial Intelligence, AI&#xff09;和物联网&#xff08;Internet of Things, IoT&#xff09;是当今科技领域最引人注目的技术之一。随着传感器和设备的普及&#xff0c;我们能够收集到大量的实时数据。然而&#xff0c;这…

chatgpt赋能Python-python_head__

Python的head()方法 什么是head()方法&#xff1f; head()方法是Python编程语言中的一个函数&#xff0c;它用于获取一个序列的前几项。它的用法如下&#xff1a; head(n, iterable)其中&#xff0c;n表示需要返回的序列前n项&#xff0c;iterable表示需要获取前n项的序列对…

2023年海彼特全国幼儿篮球联赛·总决赛圆满落幕

5月21日&#xff0c;由北京海彼特教育科技院主办的“2023年海彼特全国幼儿篮球联赛总决赛”。在河北体育馆隆重举行&#xff0c;精彩的比赛效果使体育馆顿时成为幼儿篮球界最高端、大气的舞台。 本次盛会联合举办方有&#xff1a; 河北体育馆 亚洲少儿体育协会 北京海彼特文…

【Linux】signal 和 sigaction 两个信号捕捉函数

目录 signal 信号捕捉函数1、函数解析2、代码示例 sigaction 信号捕捉函数1、函数解析2、代码示例 内核实现信号捕捉的过程 sigaction的用法要复杂一些&#xff0c;但一般都是用sigaction&#xff0c;signal依据不同的标准可能有不同的用法变化&#xff0c;sigaction比较稳定&a…

易基因:全基因组DNA甲基化分析揭示DNMT1在斑马鱼模型听觉系统发育中的作用 | 胚胎发育

大家好&#xff0c;这里是专注表观组学十余年&#xff0c;领跑多组学科研服务的易基因。 听力障碍通常与内耳发育不全或损伤有关&#xff0c;是影响生活质量的严重健康问题。因此研究听觉器官发生过程中的关键基因对于探索听力损伤的潜在策略至关重要。斑马鱼模型在理解内耳发…

C++ Primer笔记——查找算法

目录 一.简单查找 ①find(first, last, val); ②find_if & find_if_not ③count & count_if ④all_of & any_of & none_of 二.重复值的查找 ①adjacent_find(first, end); ②search_n(first, end, count, val); 三.查找子序列 ①search(first1, end1,…

基于html+css的图展示86

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

Redis事务和Redis管道

什么是Redis事务:Redis事务是指将多条命令加入到队列里面&#xff0c;一次批量执行多条命令&#xff0c;每一条命令会按顺序执行&#xff0c;在事务执行过程中不会受到客户端所传入的命令请求的影响 1)单独的隔离操作:Redis的事务仅仅保证事务 里面的操作会被连续独占的执行&am…

【Rust 日报】2023-05-21 Helix 23.05发布

Helix 23.05发布 Helix 是个文本编辑器&#xff1a; 新版本功能&#xff1a; 为 LSP 引用请求添加一个配置选项&#xff0c;用于排除声明&#xff08;&#xff03;6886&#xff09;。根据文件扩展名和 shebang 启用语言注入&#xff08;&#xff03;3970&#xff09;。通过最近…

Vision-CAIR/MiniGPT-4:使用先进的大型语言模型增强视觉-语言理解

Vision-CAIR/MiniGPT-4&#xff1a;使用先进的大型语言模型增强视觉-语言理解 摘要 视觉-语言理解是人工智能领域的一个重要方向&#xff0c;它涉及到图像和文本之间的复杂交互。近年来&#xff0c;大型语言模型&#xff08;LLM&#xff09;在自然语言处理&#xff08;NLP&am…

【PCIE732】基于 Kintex UltraScale 系列FPGA 的2 路40G 光纤通道适配器(5GByte/s 带宽)/XCKU060

板卡概述 PCIE732 是一款基于PCIE 总线架构的高性能数据传输卡&#xff0c;板卡具有1 个PCIex8 主机接口、2 个QSFP40G 光纤接口&#xff0c;可以实现2路QSFP 40G 光纤的数据实时采集、传输。板卡采用Xilinx 的高性能Kintex UltraScale 系列FPGA 作为实时处理器&#xff0c;板…

【Nginx】反向代理

文章目录 Nginx反向代理概述Nginx反向代理的配置语法proxy_passproxy_set_headerproxy_redirect Nginx反向代理实战Nginx的安全控制如何使用SSL对流量进行加密nginx添加SSL的支持Nginx的SSL相关指令生成证书开启SSL实例 反向代理系统调优 Nginx反向代理概述 关于正向代理和反向…

IIC-EEPROM实验

IIC I2C介绍I2C物理层&#xff08;内部结构&#xff09;I2C协议层数据有效性起始和结束信号应答响应时序图 数据传输软件模拟IIC使用方法产生IIC起始信号产生IIC停止信号产生ACK应答产生nack非应答等待应答信号到来IIC发送一个字节IIC读一个字节 AT24C02介绍硬件设计软件设计实…

chatgpt赋能Python-python_if_非

Python中的if非语句在SEO中的重要性 在Python编程中&#xff0c;if非语句是必不可少的一部分。它让程序员能够编写条件语句&#xff0c;根据不同的条件执行不同的代码。但你知道吗&#xff1f;if非语句也可以对SEO&#xff08;搜索引擎优化&#xff09;产生深远的影响。 什么…

cannot read system data from XML file

最近在使用ccs进行debug仿真时&#xff0c;不知道为什么一直报错&#xff0c;或者偶尔能够正常下载程序。一些报错情况如下&#xff1a; One or more sections of your program falls into a memory region that is not writable. Invalid Target Configuration file 有可能…

【容器化应用程序设计和开发】2.7 云原生开发工具和框架

2.7 云原生开发工具和框架 今天我们就简单来讲一下云原生下用到的开发工具和一些基本的框架。云原生开发工具和框架是为了支持现代化的应用程序开发&#xff0c;能够简化云原生应用程序的构建、部署、管理和维护。下面是一些常见的云原生开发工具和框架&#xff1a; Kubernetes…

给初学者的Vue.js项目搭建教程

部分数据来源&#xff1a;ChatGPT 1. 环境准备 在开始创建 Vue.js 项目前&#xff0c;需要保证已经安装了 Node.js&#xff08;建议版本12&#xff09;和 NPM&#xff08;Node.js 自带的包管理工具&#xff09;。 可以执行以下命令确认是否已经安装&#xff1a; node -v np…

数字孪生智慧灯杆,“多杆合一”降本增效

随着智慧城市建设的不断深入&#xff0c;智慧灯杆作为城市基础设施的重要组成部分&#xff0c;正在成为城市智能化和绿色化的重要手段之一。 图扑智慧灯杆系统在城市道路照明领域引入信息化手段&#xff0c;通过构建路灯物联网&#xff0c;实现了现代化的路灯按需维修和按需照…