RabbitMQ 消息类型

news2024/11/26 10:29:32

RabbitMQ 消息类型

下面我们简单介绍下RabbitMQ的一些消息种类,并结合Java代码进行学习。

如果需要执行代码,需要下载RabbitMQ的客户端(例如java客户端: https://www.rabbitmq.com/java-client.html)
使用maven:

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.16.0</version>
</dependency>

注意先启动消费者,在启动生产者
5.x 版本系列需要 JDK 8
java-client 的文档:https://rabbitmq.github.io/rabbitmq-java-client/api/current/index.html

创建一个连接工具类ConnectionUtil

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 连接工具类
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 13:49
 */
public class ConnectionUtil {

    /**
     * 获取MQ的连接
     * @return
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        // 定义一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置服务地址
        factory.setHost("localhost");
        // AMQP 5672
        factory.setPort(5672);
        // vhost
        factory.setVirtualHost("/vhost01");
        // 用户名
        factory.setUsername("admin");
        // 密码
        factory.setPassword("123456");
        return factory.newConnection();
    }
}

simple 简单队列

在这里插入图片描述

P:消息生产者
红色:队列
C:消费者
3个对象:生产者 队列 消费者

生产者直接发送消息到队列,消费者直接从队列获取消息。发送消息时,只需要指定队列,不需要指定交换机,以及路由key,只有一个消费者

示例

  1. 消息生产者:Send
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 简单队列,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 13:58
 */
public class Send {
    private static final String QUEUE_NAME="test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取一个连接
        Connection connection = ConnectionUtil.getConnection();

        // 从连接中获取一个通道
        Channel channel = connection.createChannel();

        // 声明一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String message = "hello simple !";

        // 第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

        System.out.println("send message");

        channel.close();
        connection.close();
    }
}
  1. 消息消费者:Recv
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 简单队列,消息消费者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:00
 */
public class Recv {
    private static final String QUEUE_NAME="test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtil.getConnection();

        // 创建通道
        Channel channel = connection.createChannel();

        // 队列声明
        channel.queueDeclare(QUEUE_NAME, false, false,false, null);

        // 定义队列的消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            // 获取到达的消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msgString = new String(body,"utf-8");
                System.out.println("new api recv: " + msgString);
            }
        };

        // 监听队列
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

简单队列的不足:耦合性高,生产者一一对应消费者(如果我想有多个消费者消费队列中消息,这时候就不行),队列名变更,这时候得同时变更。

work queues 工作队列

在这里插入图片描述

工作队列可以细分为轮询分发和公平分发。发送消息时,只需要指定队列,不需要指定交换机,以及路由key,设定多个消费者

为什么会出现工作队列,因为simple 队列是一一对应的,而且我们实际开发,生产者发送消息是毫不费力的,而消费者一般是要跟业务相结合的,消费者接收到消息之后就需要处理,可能需要花费时间,这时候队列就会积压了很多消息。

round robin 轮询分发

  1. 消息生产者:Send
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 工作队列,轮询分发,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:12
 */
public class Send {
    public static final String QUEUE_NAME = "test_round_robin_work_queue";

    /**
     *                  |--> C2
     * P ---> Queue ----|
     *                  |--> C1
     * @param args
     * @throws IOException
     * @throws TimeoutException
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 获取连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取channel
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 发送消息
        for (int i = 0; i < 50; i++) {
            String msg= "send hello " + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("【WQ】 send msg = " + msg);
            Thread.sleep(i*20);
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
}
  1. 消息消费者:Recv1
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 工作队列,轮询分发,消息消费者1号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:14
 */
public class Recv1 {
    public static final String QUEUE_NAME = "test_round_robin_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取channel
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false,null);

        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel) {
            // 消息到达,触发这个方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("Recv [1] msg = " + msg);
                try{
                    Thread.sleep(2000);
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    System.out.println("Recv [1] done!");
                }
            }
        };

        // 监听队列
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}
  1. 消息消费者:Recv2
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 工作队列,轮询分发,消息消费者2号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:14
 */
public class Recv2 {
    public static final String QUEUE_NAME = "test_round_robin_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取channel
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false,null);

        // 定义一个消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // 消息到达,触发这个方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("Recv [2] msg = " + msg);
                try{
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally{
                    System.out.println("Recv [2] done!");
                }
            }
        };

        // 监听队列
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);
    }
}

现象:

  • 消费者1 和消费者2处理消息的数量是一样的。
  • 消费者1:偶数。
  • 消费者2:奇数。
  • 这种方式叫做轮询分发(round-robin)结果就是不管谁忙或者谁清闲 都不会多给一个消息,任务总是你一个我一个。

fair dispatch 公平分发

公平分发,需要消费者进行手动回执

// MQ一次只发一个请求给消费者,当消费者处理完消息后会手动回执,然后MQ再发一个消息给消费者
channel.basicQos(1);

boolean autoAck = false; //false 手动回执,处理完消息后,告诉MQ
channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);
  1. 消息生产者:Send
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 工作队列,公平分发,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:30
 */
public class Send {

    public static final String QUEUE_NAME = "test_fair_dispatch_work_queue";

    /**
     *                  |--> C2
     * P ---> Queue ----|
     *                  |--> C1
     * @param args
     * @throws IOException
     * @throws TimeoutException
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 获取连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取channel
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        /**
         * 每个消费者 发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息
         * 限制发送给同一个消费者 不得超过一条消息
         */
        channel.basicQos(1);

        // 发送消息
        for (int i = 0; i < 50; i++) {
            String msg= "send hello " + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("【WQ】 send msg = " + msg);
            Thread.sleep(i*5);
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
}
  1. 消息消费者:Recv1
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 工作队列,公平分发,消息消费者1号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:14
 */
public class Recv1 {
    public static final String QUEUE_NAME = "test_fair_dispatch_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取channel
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false,null);

        channel.basicQos(1);

        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel) {
            // 消息到达,触发这个方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("Recv [1] msg = " + msg);
                try{
                    Thread.sleep(2000);
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    System.out.println("Recv [1] done!");

                    // 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 监听队列
        // boolean autoAck = true; //自动应答
        boolean autoAck = false; //手动应答
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}
  1. 消息消费者:Recv2
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 工作队列,公平分发,消息消费者2号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:14
 */
public class Recv2 {
    public static final String QUEUE_NAME = "test_fair_dispatch_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取channel
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false,null);

        channel.basicQos(1);
        // 定义一个消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // 消息到达,触发这个方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("Recv [2] msg = " + msg);
                try{
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally{
                    System.out.println("Recv [2] done!");
                    // 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 监听队列
        boolean autoAck = false; //false 手动回执
        channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);
    }
}

现象:消费者2 处理的消息比消费者1多,能者多劳

publish/subscribe 发布-订阅模式

在这里插入图片描述

  • 一个生产者,多个消费者,需要新建fanout交换机
  • 每个消费者都有自己的队列,并绑定到交换机上
  • 生产者没有直接把消息发送到队列,而是发送到交换机
  • 消息发送时需要指定交换机,消息接收时需要指定队列
  • 生产者发送的消息,经过交换机,到达队列。就能实现一个消息被多个消费者消费

注册成功时,既要发邮件,又要发短信

示例

  1. 消息生产者:Send
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 发布订阅模式队列,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:51
 */
public class Send {

    public static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //分发

        // 发送消息
        String msg = "hello ps";
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        System.out.println("Send msg = " + msg);
        channel.close();
        connection.close();
    }
}

在这里插入图片描述

消息哪去了?? 丢失了,因为交换机没有存储的能力,在rabbitmq里面只有队列有存储的能力。因为还没有队列绑定到这个交换机,所以数据丢失了。

  1. 消息消费者:Recv1
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 发布订阅模式队列,消息消费者1号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:59
 */
public class Recv1 {

    public static final String QUEUE_NAME = "test_queue_fanout_email";

    public static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取channel
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false,null);

        // 保证一次只分发一个
        channel.basicQos(1);

        // 绑定到交换机 转发器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel) {
            // 消息到达,触发这个方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("Recv [1] msg = " + msg);
                try{
                    Thread.sleep(2000);
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    System.out.println("Recv [1] done!");
                    // 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 监听队列
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}
  1. 消息消费者:Recv2
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 发布订阅模式队列,消息消费者1号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:59
 */
public class Recv2 {

    public static final String QUEUE_NAME = "test_queue_fanout_sms";

    public static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取channel
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false,null);

        // 绑定到交换机 转发器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 保证一次只分发一个
        channel.basicQos(1);
        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel) {
            // 消息到达,触发这个方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("Recv [2] msg = " + msg);
                try{
                    Thread.sleep(2000);
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    System.out.println("Recv [2] done!");
                    // 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 监听队列
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

现象:消费者1和消费者2都受到了消息。

在这里插入图片描述

routing 路由选择通配符模式

  • direct交换机类型
  • 生产者需要将交换机和routing key绑定
  • 消费者需要将队列,交换机,routing key 三者绑定
  • 每个消息会根据不同的Routing key,发送到不同的消费者队列
  • 不支持通配符(*,#

在这里插入图片描述

示例

  1. 消息生产者:Send
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 路由模式队列,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 15:12
 */
public class Send {
    public static final String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        // exchange direct:直连
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String msg = "hello direct !";
        // routing key
        String routingKey = "info";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

        System.out.println("send :" + msg);

        channel.close();
        connection.close();
    }
}
  1. 消息消费者:Recv1
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 路由模式队列,消息消费者1号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 15:13
 */
public class Recv1 {
    public static final String EXCHANGE_NAME = "test_exchange_direct";

    public static final String QUEUE_NAME = "test_queue_direct_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false,null);

        // 绑定队列到交换机,并绑定 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

        // 保证一次只分发一个
        channel.basicQos(1);

        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel){
            // 消息到达,触发这个方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("Recv [1] msg = " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally{
                    // 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    System.out.println("Recv [1] done!");
                }
            }
        };

        // 监听队列 autoAck(消息应答):false 手动回执 (消息回执 channel.basicAck(envelope.getDeliveryTag(), false);)
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
  1. 消息消费者:Recv2
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 路由模式队列,消息消费者2号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 15:13
 */
public class Recv2 {
    public static final String EXCHANGE_NAME = "test_exchange_direct";

    public static final String QUEUE_NAME = "test_queue_direct_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false,null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "waring");

        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("Recv [2] msg = " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally{
                    // 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    System.out.println("Recv [2] done!");
                }
            }
        };

        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

现象:队列绑定的路由key和消息发送时指定的路由key匹配时,才会接收到消息。

Topics 主题

  • Topic交换机类型,生产者需要申明topic交换机,并指定routing key。
  • ​和路由模式类似,但是Topic可以支持通配符,# 匹配一个或者多个字符;​* 匹配一个字符

在这里插入图片描述

在这里插入图片描述

商品:发布、删除、修改、查询

  1. 消息生产者:Send
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 主题队列,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 15:21
 */
public class Send {
    public static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        // 声明交换机 topic:主题模式
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String message = "商品...";

        // goods.delete 只有消费者2号能收到(goods.#)
        channel.basicPublish(EXCHANGE_NAME, "goods.delete", null, message.getBytes());
        // 两个消费者都能收到
        // channel.basicPublish(EXCHANGE_NAME, "goods.add", null, message.getBytes());

        System.out.println("send message = " + message);

        channel.close();
        connection.close();
    }
}
  1. 消息消费者:Recv1
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 主题队列,消息消费者1号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 15:21
 */
public class Recv1 {
    public static final String EXCHANGE_NAME = "test_exchange_topic";

    public static final String QUEUE_NAME = "test_queue_topic_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false,false,false,null);

        //绑定 商品新增 goods.add
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");

        channel.basicQos(1);

        //定义消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[1] recv msg: " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally{
                    System.out.println("[1] recv done!");
                    // 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 监听队列
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
  1. 消息消费者:Recv2
import com.goudong.modules.rabbitmq.demo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 类描述:
 * 主题队列,消息消费者2号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 15:21
 */
public class Recv2 {
    public static final String EXCHANGE_NAME = "test_exchange_topic";

    public static final String QUEUE_NAME = "test_queue_topic_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false,false,false,null);

        //绑定 goods.# (`#` 匹配一个或者多个字符)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");

        channel.basicQos(1);

        //定义消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[2] recv msg: " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally{
                    System.out.println("[2] recv done!");
                    // 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 监听队列
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

现象:只有满足生产者指定的路由模式,才会将消息发送到队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/143551.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

聊天突然尬住?教你用Python一键获取斗图表情包,各种表情包轻松化解尴尬

很多兄弟在聊天上没有下太多的功夫&#xff0c;导致自己聊天的时候很容易尬住&#xff0c;然后就不知道聊啥了&#xff0c;这时候合适表情包分分钟就能救场&#xff0c;但是一看自己收藏的表情包&#xff0c;好家伙&#xff0c;两只手都数得过来。 所以今天来给兄弟们分享一下…

IDEA书签,备份使用,全分支共享

IDEA原本设计就是给单分支保存书签使用的&#xff0c;但是我比较喜欢多个分支用同一个IDEA书签。然后在网上找到很久的IDEA书签全分支共享的办法&#xff0c;真心没找到合适的&#xff0c;自己浅浅总结了一下。首先找到我们需要备份书签的项目的目录然后在项目的目录下打开隐藏…

小程序 超长页面截图保存web-view+html2canvas

web-view文档建议参考----支付宝提供的文档&#xff0c;html2canvas官方文档&#xff08;官网可以下载html2canvas.js 和 html2canvas.min.js&#xff09;。由于篇幅受限&#xff0c;这里就贴了一下用法&#xff0c;对于web-view的配置情况&#xff0c;需要自己去查看文档&…

【测试】开发模型+测试模型

努力经营当下&#xff0c;直至未来明朗&#xff01; 文章目录一、开发模型和测试模型概述二、 开发模型一&#xff09; 瀑布模型二&#xff09;螺旋模型三&#xff09;增量模型和迭代模型四&#xff09;敏捷模型【重点:sunny:】三、 测试模型一&#xff09;V模型二&#xff09;…

【4.2】Ribbon负载均衡策略

【4.2】Ribbon负载均衡策略1 Ribbon--负载均衡策略2.1 修改负载均衡规则--代码方式2.1.1 具体测试&#xff1a;2.2 修改负载均衡规则--配置文件方式2.2.1 具体配置3 总结Ribbon负载均衡原理 中学习到&#xff1a; IRule接口决定了负载均衡的策略。 接下来学习IRule接口的实现有…

【Java编程进阶】Object类及常用方法详解

Java 编程基础教程系列&#xff1a;Java 编程进阶之路【从入门到精通】 &#xff0c;从入门到精通一站学习&#xff0c;买不了吃亏&#xff0c;买不了上当&#xff01;&#xff01; 文章目录1. Object类2. 常用的方法2.1 toString 方法2.2 equals 方法2.3 hashcode 方法3. 注意…

Verilog语法笔记(夏宇闻第三版)-数据类型及其常量、变量

目录 常量&#xff1a; 整数&#xff1a; x和z值: 负数: 下划线(underscore_)&#xff1a; 参数(Parameter)型&#xff1a; 变量&#xff1a; wire型&#xff1a; reg型&#xff1a; memory型&#xff1a; Verilog HDL中总共有十九种数据类型,数据类型是用来表示数字…

SSH远程连接服务详解

远程连接服务器 一&#xff0c;远程连接服务器简介 1、什么是远程连接服务器 远程连接服务器通过文字或图形接口方式来远程登录系统&#xff0c;让你在远程终端前登录 linux 主机以取得可操作主机接口&#xff08;shell&#xff09;&#xff0c;而登录后的操作感觉就像是坐在…

MyBatis Plus学习笔记

MyBatis Plus 国产的开源框架&#xff0c;基于 MyBatis 在Mybatis-Plus中&#xff0c;内置了代码生成器&#xff0c;我们可以通过该工具&#xff0c;生成我们需要的代码&#xff0c;例如&#xff1a;entity层&#xff0c;controller层&#xff0c;mapper层&#xff0c;service…

Java面向对象:构造器、this

目录构造器学构造器的目的构造器的作用样例构造器的注意事项总结this关键字this关键字是什么样例this关键字的作用总结构造器 学构造器的目的 真正知道对象具体是通过调用什么代码得到的。能够掌握为对象赋值的其他简便写法。为以后学习面向对象编程的其他内容做支撑。 构造…

Python实现可视化案例:采集天气数据并可视化分析

前言 最近长沙的天气&#xff0c;真的就是不能理解&#xff0c;大起大落的&#xff0c;就跟我的心情一样… 有点无聊就来采集一些天气数据&#xff0c;做个可视化的小案例吧&#xff08;我采集的是以前北上广深的天气数据哈&#xff09; 实现案例的步骤 一.分析数据来源 从…

狂神说笔记——Linux快速入门27

Linux快速入门 参考于&#xff1a;B站狂神视频&#xff01; Java开发之路&#xff1a;JavaSE、MySQL、前端&#xff08;HTML、Css、JS&#xff09;、JavaWeb、SSM框架、SpringBoot、Vue、SpringCloud、Mybatis-plus、Git、Linux &#xff08;CentOS 7&#xff09; 操作系统&…

【Linux】-- 程序地址空间

目录 程序地址空间 进程地址空间 - 虚拟地址空间 概念引入&#xff08;浅&#xff09; 初步理解结构 深入理解虚拟地址 为什么要有地址空间&#xff1f; 程序地址空间的角度理解挂起 程序地址空间 C/C在Linux下的程序地址空间分布&#xff1a; 栈向低地址增长&#xff0…

透过现象看本质,我找到了Netty粘包与半包的这几种解决方案

1、粘包与半包 啥也不说了&#xff0c;直接上代码是不是有点不太友好&#xff0c;我所谓了&#xff0c;都快过年了&#xff0c;还要啥自行车 我上来就是一段代码猛如虎 1.1 服务器代码 public class StudyServer {static final Logger log LoggerFactory.getLogger(StudyS…

怎样进行股票量化对冲策略分析?

股票量化对冲策略的分析需要从各方面去深入了解&#xff0c;就比如说明确量化和对冲的概念&#xff0c;可以先下载OA系统中“量化对冲 产品基础知识的学习&#xff0c;也要知道量化对冲产品在构建股票多头的同时&#xff0c;也构建期货空头。在市场不稳定的操作情绪之下&#x…

Git——初识git

1、git概述 1.1 简介 Git 是一个免费的、开源的分布式版本控制系统&#xff0c;可以快速高效地处理从小型到大型的各种 项目。 Git 易于学习&#xff0c;占地面积小&#xff0c;性能极快。 它具有廉价的本地库&#xff0c;方便的暂存区域和多个工作 流分支等特性。其性能优于…

rtl8188eus Linux驱动移植

rtl8188eus Linux驱动移植 rlt8188eus作为无线USB网卡&#xff0c;可以给我们的Linux设备提供无线上网能力&#xff0c;也能配置为AP&#xff0c;给其它无线设备提供上网能力。在使用较低版本的内核时&#xff0c;内核中不含rtl8188eus驱动&#xff0c;因此尝试自己移植&#…

1、常见的存储设备

文章目录较为常见的存储设备机械硬盘简介固态硬盘简介U盘简介固态U盘MMC卡SD卡简介TF卡NM卡MS卡CF卡CFExpress卡磁带光盘较为常见的存储设备 目前较为常见的存储设备&#xff0c;从电脑用的&#xff1a; 机械硬盘固态硬盘U盘固态U盘 到单反相机、运动相机、手机、行车记录仪…

BigDecimal 基本使用和常用方法

背景 涉及到比较大的数字之间的计算&#xff0c;使用float、double这样的浮点数就不那么准确了。因为不论是float 还是double都是浮点数&#xff0c;而计算机是二进制的&#xff0c;浮点数会失去一定的精确度。所以在商业计算中基本要用java.math.BigDecimal 一、初始化使用…

蹭秦霄贤流量,郭德纲凌晨时分转发老秦动态

都知道德云社董事长王慧很忙&#xff0c;每天除了打理公司业务&#xff0c;还要照顾众多徒弟们的衣食住行。王慧作为德云社董事长&#xff0c;她的忙都在情理之中&#xff0c;而郭德纲作为德云社总班主&#xff0c;他的时间就更加弥足珍贵了。 可是谁能想到&#xff0c;就是这样…