RabbitMQ的基础学习(上)

news2025/1/10 3:11:27

前言:

        RabbitMQ是一个基于AMQP规范实现的消息队列。它具有性能好、高可用、跨平台性、社区活跃等优点,比较适合中小型公司使用。掌握RabbitMQ相关知识,对工作和学习都有帮助。下面我讲详细介绍一下Rabbit的相关知识。

正文:

一、AMQP规范:

          首先,我们先要说明一下AMQP规范,这有利于我们学习RabbitMQ相关知识。

1. 概念:

        AMQP(Advanced Message Queuing Protocol)是一个应用层的高级消息队列协议,它与JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。基于此协议不受客户端,开发语言等条件的限制,RabbitMQ就是基于此协议实现的。

2. 核心组件:

  • ConnectionFactory(连接工厂):生产Connection的的工厂。
  • Connection(连接):应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手。AMQP连接通常是长连接。
  • Channel(网络信道):大部分的业务操作是在Channel这个接口中完成的,包括:
    • 队列的声明queueDeclare;交换机的声明exchangeDeclare;
    • 队列的绑定queueBind;发布消息basicPublish;消费消息basicConsume等。
  • Broker(中间件):接受客户端的连接,实现AMQP实体服务,如RabbitMQ。
  • Producer(生产者):生产消息。
  • Consumer(消费者):消费消息。
  • Queue(队列):存储着即将被应用消费掉的消息。
  • Message(消息):服务与应用程序之间传送的数据,由Properties(属性)和body(主体)组成。
  • VirtualHost(虚拟主机):用于进行逻辑隔离,一个虚拟主机理由可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange。
  • Exchange(交换机):接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。
  • Binding(绑定):Exchange和Queue之间的虚拟连接。
  • Routing Key(路由键):路由规则,虚拟机可以用它来确定如何路由一个特定消息。

3. AMQP工作过程:

  1. 生成者发布消息到交换机(Exchange)。
  2. 交换机根据路由规则,将消息分发给与当前交换机绑定的队列中。
  3. 消费者监听接收到消息之后开始业务处理。

二、4种交换机的使用:

        RabbitMQ中一供有四种交换机类型,分别是Direct exchange(直连交换机)、Fanout exchange(扇形交换机)、Topic exchange(主题交换机)、Headers exchange(头交换机)。

1. Direct exchange(直连交换机)

        要求消息与一个特定的路由键完全匹配,即一对一的,点对点的发送。

2. Fanout exchange(扇形交换机)

        一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上,即发布订阅。

3.Topic exchange(主题交换机)

        通配符匹配交换机,使用通配符去匹配,路由到对应的队列。

4. Headers exchange(头交换机)

        不适用routingKey进行路由匹配,使用请求头中键值路由匹配。 

5. 代码实现,以直连交换机为例:

5.1 直连交换机的Rabbit配置类

/**
 * 〈一句话功能简述〉<br>
 * 〈直连交换机的Rabbit配置类〉
 *
 * @author hanxinghua
 * @create 2022/9/19
 * @since 1.0.0
 */
@Order(-1)
@Configuration
public class DirectRabbitConfig implements BeanPostProcessor {


    @Resource
    private RabbitAdmin rabbitAdmin;


    //初始化rabbitAdmin对象
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    @Bean
    public Queue rabbitDirectQueue() {
        /**
         * 1、name:    队列名称
         * 2、durable: 是否持久化
         * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
         * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
         * */
        return new Queue(RabbitConstant.DIRECT_TOPIC, true, false, false);
    }

    @Bean
    public DirectExchange rabbitDirectExchange() {
        // Direct交换机
        return new DirectExchange(RabbitConstant.DIRECT_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindDirect() {
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitDirectQueue())
                //到交换机
                .to(rabbitDirectExchange())
                //并设置匹配键
                .with(RabbitConstant.DIRECT_ROUTING);
    }

    /**
     * 实例化Bean后的处理器
     * Tips:
     * 由于队列不存在,启动消费者会报错,最好的解决方法是生产者和消费者都尝试声明队列
     *
     * @param bean
     * @param beanName
     * @return
     * @throws BeansException
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        // 创建交换机
        rabbitAdmin.declareExchange(rabbitDirectExchange());
        // 创建队列
        rabbitAdmin.declareQueue(rabbitDirectQueue());
        return null;
    }

}

5.2 直连交换机的发送消息服务

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/19
 * @since 1.0.0
 */
@Service("directRabbitService")
public class DirectRabbitServiceImpl implements RabbitService {

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public String sendMsg(String msg) throws Exception {
        try {
            rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE, RabbitConstant.DIRECT_ROUTING, msg);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

5.3 直连交换机的消息消费者:

/**
 * 〈一句话功能简述〉<br>
 * 〈直连交换机消费者〉
 *
 * @author hanxinghua
 * @create 2022/9/19
 * @since 1.0.0
 */
@Component
public class DirectRabbitConsumer {

    enum Action {
        //处理成功
        SUCCESS,
        //可以重试的错误,消息重回队列
        RETRY,
        //无需重试的错误,拒绝消息,并从队列中删除
        REJECT
    }

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitConstant.DIRECT_TOPIC))
    public void process(String msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;
        try {
            System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + msg);
            if ("bad".equals(msg)) {
                throw new IllegalArgumentException("测试:抛出可重回队列的异常");
            }
            if ("error".equals(msg)) {
                throw new Exception("测试:抛出无需重回队列的异常");
            }
        } catch (IllegalArgumentException e1) {
            e1.printStackTrace();
            //根据异常的类型判断,设置action是可重试的,还是无需重试的
            action = Action.RETRY;
        } catch (Exception e2) {
            //打印异常
            e2.printStackTrace();
            //根据异常的类型判断,设置action是可重试的,还是无需重试的
            action = Action.REJECT;
        } finally {
            try {
                if (action == Action.SUCCESS) {
                    //multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息
                    channel.basicAck(tag, false);
                } else if (action == Action.RETRY) {
                    //Nack,拒绝策略,消息重回队列
                    channel.basicNack(tag, false, true);
                } else {
                    //Nack,拒绝策略,并且从队列中删除
                    channel.basicNack(tag, false, false);
                }
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

三、6种通信模型使用:

        RabbitMQ中,主要包括6种通信模型,分别是helloworld模型、work模型、pubsub模型、router模型、topic模型、rpc模型。

1. helloworld模型:

        一个生产者发送消息,一个接收者接收消息。

        相关代码:

/**
 * 〈一句话功能简述〉<br>
 * 〈接受队列中的消息〉
 * <p>
 * 一个生成者发送消息,一个接收者接收消息
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Recv {

    /**
     * 队列名称
     */
    private final static String QUEUE_NAME = "hello.mq";

    public static void main(String[] argv) throws Exception {

        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Waiting for messages.");

        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {

            /**
             * 处理交付
             *
             * @param consumerTag 这个消息的唯一标记
             * @param envelope 信封(请求的消息属性的一个封装)
             * @param properties 前面队列带过来的值
             * @param body 接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received '" + message + "'");
            }
        };

        // 启动一个消费者,并返回服务端生成的消费者标识
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}




/**
 * 〈一句话功能简述〉<br>
 * 〈发送消息到队列中〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Send {

    /**
     * 队列名称
     */
    private final static String QUEUE_NAME = "hello.mq";

    public static void main(String[] argv) throws Exception {

        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 指定一个队列
        // 第一个参数:队列名称
        // 第二个参数:false:重启后,队列没有。true:持久化队列,重启后,队列依然存在
        // 第三个参数:声明一个独占队列,仅限于此连接,连接关闭,删除这个队列 true
        // 第四个参数:最后一个消费者退出去之后,这个队列是否自动删除
        // 第五个参数:队列的其他属性
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 往队列中发出一条消息
        String message = "hello world!";
        // 第一个参数: 交换机,不能为null,但是可以设置成 ""
        // 第二个参数:路由key,不能为null,但是可以设置成 ""
        // 第三个参数:设置的队列的属性
        // 第四个参数:消息值
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("Sent '" + message + "'");
        //关闭频道和连接
        channel.close();
        connection.close();
    }
}

2. work模型: 

        多个消费者消费的数据之和才是原来队列中的所有数据,适用于流量的消峰。

         相关代码:

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Task {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        for (int i = 0; i < 100 ; i++) {
            channel.basicPublish("", TASK_QUEUE_NAME, null,
                    ("我是工作模型:"+i).getBytes("UTF-8"));
        }
        System.out.println("Sent over!");

        channel.close();
        connection.close();
    }

}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Worker1 {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages.");

//        // 消费端限流策略,同一时刻服务器只会发送一条消息给消费者
//        channel.basicQos(1)

        final Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者1接受到的消息是:"+new String(body));
                // 进行手动应答  第一个参数:自动应答的这个消息标记  第二个参数:false 就相当于告诉队列受到消息了
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }

}




/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Worker2 {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages.");


        final Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者2接受到的消息是:"+new String(body));
                // 进行手动应答  第一个参数:自动应答的这个消息标记  第二个参数:false 就相当于告诉队列受到消息了
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }
}

3. pubsub模型:

        发布订阅模式,使用Fanout交换机。

         相关代码:

/**
 * 〈一句话功能简述〉<br>
 * 〈消息发布者〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Publish {

    /**
     * 声明交换机的名字
     */
    private static final String EXCHANGE_NAME="fanout-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明交换机 第一个参数:交换机的名字  第二个参数:交换机的类型,如果使用的是发布订阅模型  只能写 fanout
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);

        // 发送消息到交换机
        for (int i = 0; i <100 ; i++) {
            channel.basicPublish(EXCHANGE_NAME,"",null,("发布订阅模型的值:"+i).getBytes());
        }
        System.out.println("Sent over!");

        // 关闭资源
        channel.close();
        connection.close();
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈消息订阅者〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Subscribe1 {

    /**
     * 声明交换机的名字
     */
    private static final String EXCHANGE_NAME = "fanout-01";

    /**
     * 队列的名字
     */
    private static final String QUEUE_NAME = "fanout-queue1";


    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明换机
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);

        // 将队列绑定到交换机  第一个参数:队列的名字 第二个参数:交换机的名字
        // 第三个参数:路由的key(现在没有用到这个路由的key)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("订阅者1接受到的数据是:" + new String(body));
            }
        };

        // 启动一个消费者
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈消息订阅者〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Subscribe2 {

    /**
     * 声明交换机的名字
     */
    private static final String EXCHANGE_NAME = "fanout-01";

    /**
     * 队列的名字
     */
    private static final String QUEUE_NAME = "fanout-queue2";


    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明换机
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);

        // 将队列绑定到交换机  第一个参数:队列的名字 第二个参数:交换机的名字
        // 第三个参数:路由的key(现在没有用到这个路由的key)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("订阅者1接受到的数据是:" + new String(body));
            }
        };

        // 启动一个消费者
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

4. router模型:

        路由模型,相当于是分布订阅的升级版,根据路由的key(routing key)来判断是否路由到哪一个队列里面去,使用Direct交换机

        相关代码:

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Producer {

    private static final String EXCHANGE_NAME = "direct-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明一个交换机,要是路由模式只能是 direct
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);

        // 发送信息到交换机
        for (int i = 0; i < 100; i++) {
            if (i % 2 == 0) {
                // 这个路由的key是可以随便设置的
                channel.basicPublish(EXCHANGE_NAME, "one", null, ("路由模型的值:" + i).getBytes());
            } else {
                // 这个路由的key是可以随便设置的
                channel.basicPublish(EXCHANGE_NAME, "two", null, ("路由模型的值:" + i).getBytes());
            }
        }
        System.out.println("Sent over!");

        channel.close();
        connection.close();
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Consumer1 {

    private static final String EXCHANGE_NAME="direct-01";
    private static final String QUEUE_NAME="direct-queue-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);

        // 绑定队列到交换机  第三个参数:表示的是路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //这里就是接受消息的地方
                System.out.println("路由key是one的这个队列接受到数据:"+new String(body));
            }
        };

        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Consumer2 {

    private static final String EXCHANGE_NAME="direct-01";
    private static final String QUEUE_NAME="direct-queue-02";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);

        // 绑定队列到交换机  第三个参数:表示的是路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"two");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //这里就是接受消息的地方
                System.out.println("路由key是two的这个队列接受到数据:"+new String(body));
            }
        };

        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

5. topic模型:

        相当于是对路由模式的一个升级,在匹配的规则上可以实现模糊匹配

        相关代码:

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Producer {

    private static final String EXCHANGE_NAME = "topic-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);

        // 发送信息到交换机
        for (int i = 0; i < 100; i++) {
            if (i % 2 == 0) {
                channel.basicPublish(EXCHANGE_NAME, "one.one.one", null, ("路由模型的值:" + i).getBytes());
            }else {
                channel.basicPublish(EXCHANGE_NAME, "one.one", null, ("路由模型的值:" + i).getBytes());
            }
        }
        System.out.println("Sent over!");

        channel.close();
        connection.close();
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Consumer1 {

    private static final String QUEUE_NAME="topic-queue-01";
    private static final String EXCHANGE_NAME="topic-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        // 绑定队列到交换机  第三个参数:表示的是路由key
        // 注意  * :只是代表一个单词  # :这个才代表  一个或者多个单词
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one.*");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("路由key是one.*的这个队列接受到数据:"+new String(body));
            }
        };
        System.out.println("Waiting for messages.");
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Consumer2 {

    private static final String QUEUE_NAME="topic-queue-02";
    private static final String EXCHANGE_NAME="topic-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        // 绑定队列到交换机  第三个参数:表示的是路由key
        // 注意  * :只是代表一个单词  # :这个才代表  一个或者多个单词
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one.#");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("路由key是one.#的这个队列接受到数据:"+new String(body));
            }
        };
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

6. rpc模型:

         相关代码:

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Server {

    private final static String QUEUE_NAME = "rpc-01";

    public static void main(String[] args) throws Exception {
        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 声明一个队列:客户端向服务器发送数据的队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 启动消费者,用来处理客户端发送到队列的消息
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 获取参数
                String message = new String(body);
                int n = Integer.parseInt(message);
                // 模拟服务端的一个功能
                String fib = handleInterface(n) + "";
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();

                // 将结果返回会客户端
                // 注意从properties去获取客户端传送过来的信息,再返回回去
                channel.basicPublish("", properties.getReplyTo(), replyProps, fib.getBytes());
            }
        });
    }

    private static int handleInterface(int n) {
        if (n == 0) {
            return 0;
        }
        return n + 2;
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈RPC模型〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Client {

    private final static String QUEUE_NAME = "rpc-01";

    public static void main(String[] args) throws Exception {
        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 声明一个队列(换了一种方式),用于存储服务器返回到客户端的数据
        String replyQueueName = channel.queueDeclare().getQueue();

        // 使用UUID随机生成一个id
        final String correlationId = UUID.randomUUID().toString();

        // 客户端发送给服务器添加的额外属性
        AMQP.BasicProperties props = new AMQP.BasicProperties()
                .builder()
                .correlationId(correlationId)
                .replyTo(replyQueueName)
                .build();

        // 客户端将数据发送到发送队列
        channel.basicPublish("", QUEUE_NAME, props, "4".getBytes());

        // 启动消费者,用来客户端从相应队列获取到处理的结果
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 通过correlationId去保证获取到的是正确的信息
                if (properties.getCorrelationId().equals(correlationId)) {
                    // 处理的结果输出
                    System.out.println("RPC返回结果:" + new String(body));
                }

                // 关闭通道,注意一定要等结果返回后再关闭,不然拿不到返回的数据
                try {
                    channel.close();
                    connection.close();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/174724.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【机器学习】缺失值的处理方法总结

目录&#xff1a;缺失值的处理一、总录二、引言三、数据缺失的原因四、数据缺失的类型五、数据缺失的处理方法5.1 删除记录5.2 数据填充5.2.1 替换缺失值5.2.2 拟合缺失值5.2.3 虚拟变量5.3 不处理六、实证演练七、小结一、总录 二、引言 业界广泛流传这样一句话&#xff1a;数…

java构造器2023021

构造器&#xff1a; 构造器是一个特殊的方法&#xff0c;用于创建实例时执行初始化。构造器是创建对象的重要途径&#xff08;即使使用工厂模式、反射等方式创建对象&#xff0c;其实质依然是依赖于构造器&#xff09;&#xff0c;因此Java类必须包含一个或一个以上的构造器。 …

23种设计模式(十五)——适配器模式【接口隔离】

文章目录 意图什么时候使用适配器真实世界类比适配器模式的实现适配器模式的优缺点亦称:封装器模式、Wrapper、Adapter 意图 将一个接口转换成客户希望的另一个接口,使接口不兼容的那些类可以一起工作,其别名为包装器。 什么时候使用适配器 1、系统需要使用现有的类,而这…

软件设计到底是什么?

软件设计是什么&#xff1a; 就是讨论要用什么技术实现功能&#xff1f;就是要考虑选择哪些框架和中间件&#xff1f;设计就是设计模式&#xff1f;设计就是Controller、Service加Model&#xff1f;…… 一百个程序员&#xff0c;就有一百种理解。若按照这些方式去了解“软件…

Java设计模式中状态模式介绍/状态模式怎么使用

继续整理记录这段时间来的收获&#xff0c;详细代码可在我的Gitee仓库SpringBoot克隆下载学习使用&#xff01; 6.6 状态模式 6.6.1定义 对有状态的对象&#xff0c;把复杂的"判断逻辑"提取到不同的状态对象中&#xff0c;允许状态对象在其内部状态发生改变时改变…

【C++】哈希表 | 闭散列 | 开散列 | unordered_map 和 unordered_set 的模拟实现

​&#x1f320; 作者&#xff1a;阿亮joy. &#x1f386;专栏&#xff1a;《吃透西嘎嘎》 &#x1f387; 座右铭&#xff1a;每个优秀的人都有一段沉默的时光&#xff0c;那段时光是付出了很多努力却得不到结果的日子&#xff0c;我们把它叫做扎根 目录&#x1f449;unordere…

Ubuntu20.04/22.04 安装 Arduino IDE 2.x

这周收到两片基于LGT8F328P LQFP32的Arduino Mini EVB, 机器上没有 Arduino 环境需要新安装, 正好感受一下新出的 Arduino IDE 2.x, 记录一下 Ubuntu 20.04/22.04 下安装 Arduino IDE 2.x 的过程. 下载解压 下载 访问 Arduino 的官网下载 https://www.arduino.cc/en/softwar…

2021-04-12

今天在练习自定义标题栏&#xff08;Android初级开发&#xff08;四&#xff09;——补充3&#xff09;的过程中遇到了隐藏系统自带标题栏的问题&#xff0c;现将几种去掉系统自带标题栏的方式做一总结。大体上可以分为两种方式&#xff0c;一种是修改xml文件&#xff08;这种方…

第六层:继承

文章目录前情回顾继承继承的作用继承的基本语法继承方式公共继承保护继承私有继承继承中的对象模型继承中的构造和析构顺序继承中同名成员访问非静态成员静态成员多继承语法注意多继承中的对象模型多继承父类成员名相同菱形继承概念菱形继承出现的问题虚继承步入第七层本章知识…

【数据分析】(task3)数据重构

note 数据的合并&#xff1a;df自带的join方法是横向合并&#xff0c;append方法是纵向&#xff08;上下&#xff09;合并拼接&#xff1b;pd的merge方法是横向合并&#xff0c;然后用刚才的apend进行纵向合并。数据的重构&#xff1a;stack函数的主要作用是将原来的列转成最内…

Redis基本类型和基本操作

2.Redis常见命令 Redis是典型的key-value数据库&#xff0c;key一般是字符串&#xff0c;而value包含很多不同的数据类型&#xff1a; Redis为了方便我们学习&#xff0c;将操作不同数据类型的命令也做了分组&#xff0c;在官网&#xff08; https://redis.io/commands &…

阿里云轻量服务器下>安装宝塔面板>安装使用Tomcat服务器>通过公网ip地址>直接访问网站目录下文件

第一步 阿里云开放Tomcat 8080端口号 和宝塔面板 8888端口 第二步 如果你的应用镜像 一开始在阿里云购买服务器时候没有选择宝塔应用镜像 先打开如下界面 将系统中应用镜像 确定更换为 宝塔面板镜像 第三步 请在应用详情中 走完紫色所框选的步骤 第四步 将上一步获取到的…

cadence SPB17.4 - allegro - align component by pin

文章目录cadence SPB17.4 - allegro - align component by pin概述笔记实验备注补充 - 2023_0120_2337ENDcadence SPB17.4 - allegro - align component by pin 概述 allegro自带的元件对齐, 默认是对齐元件中心的, 对齐后的效果是中心对齐. 但是为了走线能走的最短, 拉线方便…

2023年春节祝福第二弹——送你一只守护兔(下),CSS3 动画相关属性图例实例大全(82种),守护兔源代码免费下载

2023年春节祝福第二弹——送你一只守护兔(下&#xff09; CSS3 动画相关属性图例实例大全&#xff08;82种&#xff09;、守护兔源代码免费下载 本文目录&#xff1a; 五、CSS3 动画相关属性实例大全 &#xff08;1&#xff09;、CSS3的动画基本属性 &#xff08;2&#xf…

TCP/IP OSI七层模型

作者简介&#xff1a;一名在校云计算网络运维学生、每天分享网络运维的学习经验、和学习笔记。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​​ 目录 前言 一.OSI七层模型 1.什么是OSI七层参考模型 2.七层每层分别的作用…

Spring热部署设置

手动热部署 热部署是指在不停止应用程序的情况下更新应用程序的功能。这样可以避免短暂的服务中断&#xff0c;并且可以更快地部署新的功能和修复问题。热部署通常适用于Web应用程序和服务器端应用程序。 在pom.xml中添加依赖&#xff1a; <dependency><groupId>…

cmake 02 hello_cmake

cmake 学习笔记 一个最小化的 cmake 项目 目录结构 F:\2023\code\cmake\hello_cmake>tree /f 卷 dox 的文件夹 PATH 列表 卷序列号为 34D2-6BE8 F:. │ CMakeLists.txt │ main.cpp │ └─.vscodelaunch.jsontasks.jsonF:\2023\code\cmake\hello_cmake>源码 main.c…

Java 集合 笔记

体系 Collection接口 List接口&#xff1a;按照顺序插入数据&#xff0c;可重复 ArrayList实现类&#xff1a;LinkedList实现类&#xff1a; Set接口&#xff1a;不可重复的集合 HashSet实现类 Queue接口&#xff1a;队列 LinkedList实现类ArrayBlockingQueue实现类PriorityQu…

Python CalmAn工具包安装及环境配置过程【Windows】

文章目录CalmAn简介安装要求我的设备1>CalmAn压缩包解压2>conda创建虚拟环境3>requirements依赖包配置&#xff08;包括tensorflow&#xff09;4>caiman安装(mamba install)5>caimanmanager.py install6>PyCharm添加解释器7>Demo演示8>遇到的问题CalmA…

DB SQL 转 ES DSL(支持多种数据库常用查询、统计、平均值、最大值、最小值、求和语法)...

1. 简介 日常开发中需要查询Elasticsearch中的数据时&#xff0c;一般会采用RestHighLevelClient高级客户端封装的API。项目中一般采用一种或多种关系型数据库(如&#xff1a;Mysql、PostgreSQL、Oracle等) NoSQL(如&#xff1a;Elasticsearch)存储方案&#xff1b;不同关系数…