【消息中间件】RabbitMQ的工作模式

news2024/11/17 16:10:36

在这里插入图片描述

前 言
🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端
☕专栏简介:深入、全面、系统的介绍消息中间件
🌰 文章简介:本文将介绍RabbitMQ的工作模式
🍓文章推荐:【消息中间件】1小时快速上手RabbitMQ

文章目录

  • 1.WorkQueue工作队列模式
  • 2.Pub/Sub订阅模式
  • 3.Routing工作模式
  • 4.Topics模式
  • 5.总结

上一篇文章已经介绍RabbitMQ的基本概念、安装、管控台使用和基于简单模式的helloworld。这篇文章将介绍RabbitMQ的其它工作模式。

1.WorkQueue工作队列模式

在这里插入图片描述
代码实现也很简单,只需要多一个消费者即可。

生产者

public class Producer_WorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/ittest");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("work_queues",true,false,false,null);
        /*
        basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        参数:
            1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
            2. routingKey:路由名称
            3. props:配置信息
            4. body:发送消息数据

         */
        for (int i = 1; i <= 10; i++) {
            String body = i+"hello rabbitmq~~~";

            //6. 发送消息
            channel.basicPublish("","work_queues",null,body.getBytes());
        }



        //7.释放资源
        channel.close();
        connection.close();

    }
}

消费者1.

public class Consumer_WorkQueues1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/ittest");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("work_queues",true,false,false,null);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法

                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);*/
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume("work_queues",true,consumer);


        //关闭资源?不要

    }
}

消费者2

public class Consumer_WorkQueues2 {
    public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/ittest");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
                *
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。

         */
        //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("work_queues",true,false,false,null);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法

                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);*/
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume("work_queues",true,consumer);


        //关闭资源?不要

    }
}

先启动consumer1,2;再启动producer,即可看到两个消费者会争抢消费生产者生产的消息。
在这里插入图片描述
在这里插入图片描述

小结下。
在这里插入图片描述

2.Pub/Sub订阅模式

在这里插入图片描述
生产者

public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/ittest");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest

        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
       /*

       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配

        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */

        String exchangeName = "test_fanout";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"",null,body.getBytes());

        //9. 释放资源
        channel.close();
        connection.close();
    }
}

消费者1

public class Consumer_PubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/ittest");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();


        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法

                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);*/
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台.....");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);


        //关闭资源?不要

    }
}

消费者2

public class Consumer_PubSub2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/ittest");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();


        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法

                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);*/
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息保存到数据库.....");
            }
        };
        channel.basicConsume(queue2Name,true,consumer);


        //关闭资源?不要

    }
}

启动生产者,可以看到rabbitmq主控台是这样的。
在这里插入图片描述
在这里插入图片描述
启动消费者1
在这里插入图片描述
启动消费者2.
在这里插入图片描述
主控台是这样的。
在这里插入图片描述

3.Routing工作模式

在这里插入图片描述
下面实现下列需求,对于error级别的log输出到控制台并保存到数据库,其它级别的log打印到控制台。

实现如下。
生产者。


/**
 * 发送消息
 */
public class Producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {

      //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/ittest");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
       /*

       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配

        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */

       String exchangeName = "test_direct";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        //队列1绑定 error
        channel.queueBind(queue1Name,exchangeName,"error");
        //队列2绑定 info  error  warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");

        String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"warning",null,body.getBytes());

        //9. 释放资源
        channel.close();
        connection.close();

    }
}

消费者1.

public class Consumer_Routing1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("172.16.98.133");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("heima");//用户名 默认 guest
        factory.setPassword("heima");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();


        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法

                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);*/
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台.....");
            }
        };
        channel.basicConsume(queue2Name,true,consumer);


        //关闭资源?不要

    }
}

消费者2

public class Consumer_Routing2 {
    public static void main(String[] args) throws IOException, TimeoutException {

       //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/ittest");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();


        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法

                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);*/
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息存储到数据库.....");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);


        //关闭资源?不要

    }
}

请读者自测。

4.Topics模式

看图说明一切
在这里插入图片描述
实现如下需求。对Q1,error级别的信息,order系统的信息存入数据库;对Q2都打到控制台。

生产者。


/**
 * 发送消息
 */
public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {

      //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/ittest");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
       /*

       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配

        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */

       String exchangeName = "test_topic";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */

        // routing key  系统的名称.日志的级别。
        //=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");
        channel.queueBind(queue2Name,exchangeName,"*.*");

        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());

        //9. 释放资源
        channel.close();
        connection.close();

    }
}

消费者1.

public class Consumer_Topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
		//1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/ittest");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();


        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法

                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);*/
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息存入数据库.......");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);


        //关闭资源?不要

    }
}

消费者2

public class Consumer_Topic2 {
    public static void main(String[] args) throws IOException, TimeoutException {
		//1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("localhost");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/ittest");//虚拟机 默认值/
        factory.setUsername("guest");//用户名 默认 guest
        factory.setPassword("guest");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();


        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调对象

         */
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel){
            /*
                回调方法,当收到消息后,会自动执行该方法

                1. consumerTag:标识
                2. envelope:获取一些信息,交换机,路由key...
                3. properties:配置信息
                4. body:数据

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);*/
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印控制台.......");
            }
        };
        channel.basicConsume(queue2Name,true,consumer);


        //关闭资源?不要

    }
}

请读者自测。

5.总结

总结下。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/136105.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

cubeIDE开发, stm32的CRC计算CubeMX配置及HAL库底层实现分析

一、stm32的CRC 1.1 CRC的简介及MCU关联说明 STM32的CRC(Cyclic Redundancy Check&#xff0c;循环冗余校验)计算单元使用一个固定的多项式发生器&#xff0c;从一个32位的数据字产生一个CRC码。在业务开发应用中&#xff0c;会基于CRC的技术用于验证数据传输或存储完整性。在E…

【python基础_05】面向对象1_对象和类、魔术方法

文章目录1. 类和对象1.1 使用对象组织数据的模版1.2 成员变量和成员方法1.3 实现代码2. 内置方法&#xff08;魔术方法&#xff09;2.1 构造方法&#xff1a; __init__()2.2 __call__()2.3 __len__()2.3 __str__()2.4 __getitem__()2.5 __setitem__()2.6 __delitem__()2.7 __lt…

【从零开始学习深度学习】35. 门控循环神经网络之门控循环单元(gated recurrent unit,GRU)介绍、Pytorch实现GRU并进行训练预测

在循环神经网络中&#xff0c;当时间步数较大或者时间步较小时&#xff0c;循环神经网络的梯度较容易出现衰减或爆炸。上一篇文章中介绍的裁剪梯度可以应对梯度爆炸&#xff0c;但无法解决梯度衰减的问题。因此&#xff0c;循环神经网络在实际中较难捕捉时间序列中时间步距离较…

Elastic-Job分布式任务调度(1):概述

1 什么是任务调度 我们可以先思考一下下面业务场景的解决方案&#xff1a; 某电商系统需要在每天上午10点&#xff0c;下午3点&#xff0c;晚上8点发放一批优惠券。某银行系统需要在信用卡到期还款日的前三天进行短信提醒。某财务系统需要在每天凌晨0:10结算前一天的财务数据…

【自学Python】Linux安装Python

Linux安装Python Python下载 Python下载地址 https://www.python.org/ftp/python/3.7.4/Python-3.7.4.tar.xzPython下载 我们在 Linux 终端中&#xff0c;直接使用 wget 命令&#xff0c;下载 Linux 版 Python 的安装包&#xff0c;我们在终端输入以下命令&#xff1a; wg…

PAT乙级|1094 谷歌的招聘

题源https://pintia.cn/problem-sets/994805260223102976/exam/problems/1071785997033074688 提交1&#xff1a;一个用例没过 提交2&#xff1a;AC 错因&#xff1a;输出需为字符串&#xff0c;例如在 200236 中找 4 位素数&#xff0c;解是0023 关键&#xff1a;第33行代码…

linphone android sdk 源码下载编译

前言 前面的有写过Android 使用Linphone SDK开发SIP客户端相关的文章, 在后续的开发过程中, 为了更深入了解linphone, 便尝试下载SDK源码自行编译. 关于linphone这里不作过多介绍, 可以参考前面的文章. Linphone-SDK 是一个将 Liblinphone 及其依赖项捆绑为 git 子模块的项目&a…

HTC FOCUS3在PC端串流FOHEART H1数据手套(手柄)

本教程介绍使用FOHEART H1数据手套与HTC手柄驱动VR中的虚拟手运动&#xff0c;实现手部的追踪及定位。 本教程内容与之前使用腕带定位&#xff08;HTC FOCUS3在PC端串流FOHEART H1数据手套&#xff08;腕带&#xff09;&#xff09;不同&#xff0c;这次我们使用头显中自带的…

【Kuangbin简单DP】挤奶时间

4561. 挤奶时间 - AcWing题库 题意&#xff1a; 思路&#xff1a; 一开始的思路是把这么多的区间当作物品&#xff0c;然后选与不选&#xff0c;这样去搞线性DP 显然是不行的&#xff0c;因为这样答案就不知道怎么统计了 而且&#xff0c;我们是设阶段&#xff01;&#xf…

HSK汉语考试变革,您需要了解以下几点

2023年HSK考试可能有哪些变化汉语考试难度增加了还是减低了&#xff1f; 对现在的课程和教材有影响&#xff1f; 汉语老师怎么样应对&#xff1f;HSK考试变化猜想1.HSK3级考试和HSKK初级结合在一起 2.HSK4级考试和HSKK中级结合在一起 3.HSK5,6级考试和HSKK高级结合在一起HSKK考…

INTERSPEECH 2022|面向零样本声音克隆的内容相关细粒度说话人表征方法

本文由清华大学与腾讯 AI Lab、香港中文大学合作。 零样本说话人自适应&#xff08;zero-shot speaker adaptation&#xff09;&#xff0c;或称为零样本声音克隆&#xff0c;旨在根据任意一条参考语音&#xff08;reference speech&#xff09;合成训练过程中从未见过的说话人…

Leetcode:239. 滑动窗口最大值(C++)

目录 问题描述&#xff1a; 实现代码和解析&#xff1a; 暴力法&#xff08;会超时&#xff09;&#xff1a; 原理思路&#xff1a; 单调队列法&#xff1a; 原理思路&#xff1a; 单调队列&#xff1a; 模拟过程&#xff1a; 问题描述&#xff1a; 给你一个整数数组…

Python基础知识(二)

目录 顺序语句 条件语句 条件语句书写格式一及对比&#xff1a;if条件语句 条件语句书写格式二及对比&#xff1a;if...else...语句 条件语句书写格式三及对比&#xff1a;if...elif...else语句 空语句pass 条件语句的总结&#xff1a; 循环语句 while循环 与c/java/…

对于Muduo主从Reactor模式的理解

从12月20号开始看Muduo网络库&#xff0c;到28号的时候弄懂了EventLoop, Poller, Channel是怎么一回事&#xff0c;一番琢磨之后觉得还是应该发到博客上跟大家分享&#xff0c;特此记录。 对照linyacool那个webserver的实现&#xff0c;再看了一遍muduo的EventLoop, Poller ,C…

IDEA快速启动多个微服务模块 -idea如何开启Run DashBoard

文章目录 缘起 Run DashBoard面板如何开启开启 Run DashBoard 注意&#xff1a; 缘起 在idea里面如果需要启动多个项目的话&#xff0c;尤其是是比如微服务项目&#xff0c;动辄要启动五六个七八个应用&#xff0c;如果通过右上角那边启动会很不方便&#xff0c;你需要选择…

基于GIS简单处理世界土壤数据库(HWSD)的中国土壤数据集

来源&#xff1a;GIS前沿 一、 数据介绍 土壤属性表主要字段包括&#xff08;图1&#xff09;&#xff1a;详细描述请参考Harmonized World Soil Database (version 1.1).pdf文件&#xff0c;其中以T开头的土壤属性表示土壤上层的属性&#xff08;0-30cm&#xff09;&#xff…

【曲线全局逼近】

曲线全局逼近 本文是基于 这篇文章 翻译而来的&#xff0c;仅学习。 在插值中&#xff0c;插值曲线以给定的顺序通过所有给定的数据点。正如在全局插值页面中所讨论的&#xff0c;插值曲线可能会在所有数据点上摆动&#xff0c;而不是紧紧跟随数据多边形。为了克服这个问题&…

包装类的使用

文章目录一、单元测试方法的使用步骤二、包装类的使用基本数据类型、包装类、String类型之间的相互转化基本数据类型——>包装类注意包装类——>基本数据类型自动装箱与自动拆箱&#xff08;jdk5.0后&#xff09;基本数据类型、包装类——>String类型String类型——&g…

史上最全 Appium 自动化测试从基础到框架实战精华学习笔记(一)

1080402 31.8 KB 对测试人来说&#xff0c;Appium 是非常重要的一个开源跨平台自动化测试工具&#xff0c;它允许测试人员在不同的平台&#xff08;iOS、Android 等&#xff09;使用同一套 API 来写自动化测试脚本&#xff0c;这样可大幅提升代码复用率和工作效率。 本文汇总了…

郭盛华:警惕家庭智能扬声器中潜在的窃听风险

一名安全研究人员因识别Google Home智能扬声器中的安全问题而获得了107500美元的漏洞赏金&#xff0c;这些问题可能被用来安装后门并将其变成窃听设备。 国际知名网络黑客安全专家、东方联盟创始人郭盛华在一篇技术文章中透露&#xff1a;这些漏洞“允许无线附近的攻击者在设备…