RabbitMQ的6种工作模式

news2024/12/26 11:00:16

RabbitMQ的6种工作模式

官方文档:

http://www.rabbitmq.com/

https://www.rabbitmq.com/getstarted.html

RabbitMQ 常见的 6 种工作模式:
在这里插入图片描述

1、simple简单模式

在这里插入图片描述

1)、消息产生后将消息放入队列。

2)、消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。

3)、存在的问题:消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。

4)、应用场景:聊天(中间有一个过度的服务器)。

5)、代码实现:

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbitmq-java</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>
    </dependencies>

</project>

工具类

package com.example;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

    // 连接rabbitmq服务,共享一个工厂对象
    private static ConnectionFactory factory;

    static {
        factory=new ConnectionFactory();
        //设置rabbitmq属性
        factory.setHost("127.0.0.1");
        factory.setUsername("zsx242030");
        factory.setPassword("zsx242030");
        factory.setVirtualHost("/");
        factory.setPort(5672);
    }
    public static Connection getConnection(){
        Connection connection=null;
        try {
            //获取连接对象
            connection = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }
}

消息提供者

package com.example.simple;

import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //通过通道创建队列,后续所有的操作都是基于channel实现(队列也可以由消费方创建)
            channel.queueDeclare("queue1", false, false, false, null);
            //向队列中发送消息
            channel.basicPublish("", "queue1", null, "Hello RabbitMQ!!!".getBytes());
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者

package com.example.simple;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //监听队列中的消息(消费的是队列,而不是交换机)
            channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者获得消息为:Hello RabbitMQ!!!

2、work工作模式(资源的竞争)

在这里插入图片描述

1)、消息产生者将消息放入队列,消费者可以有多个,消费者1,消费者2,同时监听同一个队列。消息被消费,

C1 和 C2 共同争抢当前的消息队列内容,谁先拿到谁负责消费消息。

2)、存在的问题:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关

(synchronized,与同步锁的性能不一样),保证一条消息只能被一个消费者使用。

3)、应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到

消息队列中,空闲的系统自动争抢);对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

4)、代码实现:

消息提供者

package com.example.work;

import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //通过通道创建队列
            channel.queueDeclare("queue1", false, false, false, null);
            //向队列中发送消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("", "queue1", null, ("Hello RabbitMQ!!!" + i).getBytes());
            }
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者1

package com.example.work;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //监听队列中的消息
            channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            // connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者2

package com.example.work;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //监听队列中的消息
            channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            // connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!10

3、publish/subscribe发布订阅(共享资源)

在这里插入图片描述

1)、X代表交换机,rabbitMQ 内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消

息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费。

Exchange 有常见以下 3 种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。

  • Direct:定向,把消息交给符合指定 routing key 的队列。

  • Topic:通配符,把消息交给符合 routing pattern (路由模式)的队列。

Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者

没有符合路由规则的队列,那么消息会丢失。

2)相关场景:邮件群发,群聊天,广播(广告)。

3)、代码实现:

消息提供者

package com.example.publishsubscribe;

import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建
public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)
            // 1.参数一:交换机名称    参数二:交换机类型
            channel.exchangeDeclare("fanout_exchange", "fanout");
            //通过通道创建队列
            //channel.queueDeclare("queue1",false,false,false,null);
            //向队列中发送消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("fanout_exchange", "", null, ("Hello RabbitMQ!!!" + i).getBytes());
            }
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者1

package com.example.publishsubscribe;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {

    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("fanout_queue1", false, false, false, null);
            //给队列绑定交换机
            channel.queueBind("fanout_queue1", "fanout_exchange", "");
            //监听队列中的消息
            channel.basicConsume("fanout_queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者2

package com.example.publishsubscribe;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {

    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("fanout_queue2", false, false, false, null);
            //给队列绑定交换机
            channel.queueBind("fanout_queue2", "fanout_exchange", "");
            //监听队列中的消息
            channel.basicConsume("fanout_queue2", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10
消费者2获得消息为:Hello RabbitMQ!!!1
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!3
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!5
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!7
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!10

4、routing路由模式

在这里插入图片描述

1)、消息生产者将消息发送给交换机按照路由判断,路由是字符串,当前产生的消息携带路由字符,交换机根据路

由的 key,只能匹配上路由 key 对应的消息队列,对应的消费者才能消费消息。队列与交换机的绑定,不能是任意

绑定了,而是要指定一个 RoutingKey (路由 key)。消息的发送方在向 Exchange 发送消息时,也必须指定消息的

RoutingKey 。Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列

的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

2)、根据业务功能定义路由字符串。

3)、从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4)、业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可

以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误。

5)、代码实现:

消息提供者

package com.example.souting;

import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建

public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)
            // 1.参数一:交换机名称    参数二:交换机类型
            channel.exchangeDeclare("direct_exchange", "direct");
            //向队列中发送消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("direct_exchange",
                        //设置路由键,符合路由键的队列,才能拿到消息
                        "insert",
                        null,
                        ("Hello RabbitMQ!!!" + i).getBytes());
            }
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者1

package com.example.souting;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {

    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("direct_queue1", false, false, false, null);
            //绑定交换机(routingKey:路由键)
            channel.queueBind("direct_queue1", "direct_exchange", "select");
            channel.queueBind("direct_queue1", "direct_exchange", "insert");
            //监听队列中的消息
            channel.basicConsume("direct_queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者2

package com.example.souting;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("direct_queue2", false, false, false, null);
            //绑定交换机(routingKey:路由键)
            channel.queueBind("direct_queue2", "direct_exchange", "delete");
            channel.queueBind("direct_queue2", "direct_exchange", "select");
            //监听队列中的消息
            channel.basicConsume("direct_queue2", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

5、topic 主题模式(路由模式的一种)

在这里插入图片描述

1)、Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型

Exchange 可以让队列在绑定 Routing key 的时候使用通配符。

2)、Routingkey 一般都是有一个或多个单词组成,多个单词之间以 . 分割,例如:item.insert。

通配符规则:

# :匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.# :能够匹配item.insert.abc或者item.insert

item.* :只能匹配 item.insert

usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到

#.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

3)、路由功能添加模糊匹配。

4)、消息产生者产生消息,把消息交给交换机。

5)、交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。

6)、代码实现:

消息提供者

package com.example.topic;


import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建

public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)   //1.参数一:交换机名称    参数二:交换机类型
            channel.exchangeDeclare("topic_exchange", "topic");
            //向队列中发送消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("topic_exchange",
                        // #:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况)   *(匹配一个单词)
                        "emp.hello world",
                        null,
                        ("Hello RabbitMQ!!!" + i).getBytes());
            }
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者1

package com.example.topic;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("topic_queue1", false, false, false, null);
            //绑定交换机(routingKey:路由键)  #:匹配0-n个单词(之间以.区分,两点之间算一个单词)
            channel.queueBind("topic_queue1", "topic_exchange", "emp.#");
            //监听队列中的消息
            channel.basicConsume("topic_queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者2

package com.example.topic;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("topic_queue2", false, false, false, null);
            //绑定交换机(routingKey:路由键)  *:匹配1个单词(之间以.区分,两点之间算一个单词)
            channel.queueBind("topic_queue2", "topic_exchange", "emp.*");
            //监听队列中的消息
            channel.basicConsume("topic_queue2", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

6、RPC

在这里插入图片描述

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1)、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2)、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

3)、服务端将RPC方法 的结果发送到RPC响应队列。

4)、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

5)、代码实现:

Client端

package com.example.rpc;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Client {

    public static void main(String[] argv) throws IOException, InterruptedException {
        String message = "Hello World!!!";
        // 建立一个连接和一个通道,并为回调声明一个唯一的回调队列
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 定义一个临时变量的接受队列名
        String replyQueueName = channel.queueDeclare().getQueue();
        // 生成一个唯一的字符串作为回调队列的编号
        String corrId = UUID.randomUUID().toString();
        // 发送请求消息,消息使用了两个属性:replyTo和correlationId
        // 服务端根据replyTo返回结果,客户端根据correlationId判断响应是不是给自己的
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
                .build();
        // 发布一个消息,rpc_queue路由规则
        channel.basicPublish("", "rpc_queue", props, message.getBytes("UTF-8"));
        // 由于我们的消费者交易处理是在单独的线程中进行的,因此我们需要在响应到达之前暂停主线程。
        // 这里我们创建的容量为1的阻塞队列ArrayBlockingQueue,因为我们只需要等待一个响应。
        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
        // String basicConsume(String queue, boolean autoAck, Consumer callback)
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                //检查它的correlationId是否是我们所要找的那个
                if (properties.getCorrelationId().equals(corrId)) {
                    //如果是,则响应BlockingQueue
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });
        System.out.println(" 客户端请求的结果:" + response.take());
    }
}

Server端

package com.example.rpc;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Server {

    public static void main(String[] args) {
        Connection connection = null;
        try {
            connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("rpc_queue", false, false, false, null);
            channel.basicQos(1);
            System.out.println("Awaiting RPC requests:");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId()).build();
                    String response = "";
                    try {
                        response = new String(body, "UTF-8");
                        System.out.println("response (" + response + ")");
                    } catch (RuntimeException e) {
                        System.out.println("错误信息 " + e.toString());
                    } finally {
                        // 返回处理结果队列
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        // 确认消息,已经收到后面参数 multiple:是否批量.true:将一次性确认所有小于envelope.getDeliveryTag()的消息。
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        // RabbitMq consumer worker thread notifies the RPC
                        // server owner thread
                        synchronized (this) {
                            this.notify();
                        }
                    }
                }
            };
            // 取消自动确认
            boolean autoAck = false;
            channel.basicConsume("rpc_queue", autoAck, consumer);
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (consumer) {
                    try {
                        consumer.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Awaiting RPC requests:
response (Hello World!!!)
response (Hello World!!!)
response (Hello World!!!)

# 客戶端发起3次请求
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!

7、发布订阅模式与工作队列模式的区别

1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使

用默认交换机)。

3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将

队列绑定到默认的交换机 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/840820.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入探究Spring核心模块

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

Flowise AI:用于构建LLM流的拖放UI

推荐&#xff1a;使用NSDT场景编辑器助你快速搭建可二次编辑的3D应用场景 什么是Flowise AI&#xff1f; Flowise AI是一个开源的UI可视化工具&#xff0c;用于帮助开发LangChain应用程序。在我们详细介绍 Flowise AI 之前&#xff0c;让我们快速定义 LangChain。LangChain是…

谈谈如何使用ShellExecute的返回值

之前的一篇文章中&#xff0c;我们讲到了在 16 位 Windows 中&#xff0c;实例句柄(HINSTANCE)唯一标识了一个进程。到了 32 位 Windows&#xff0c;内核得到了完全的重新设计&#xff0c;其中之一是&#xff1a;它引入了 “内核对象” 和 “安全描述符”。 在 16 位 Windows …

vue项目实战-脑图编辑管理系统kitymind百度脑图

前言 项目为前端vue项目&#xff0c;把kitymind百度脑图整合到前端vue项目中&#xff0c;显示了脑图的绘制&#xff0c;编辑&#xff0c;到处为json&#xff0c;png&#xff0c;text等格式的功能 文章末尾有相关的代码链接&#xff0c;代码只包含前端项目&#xff0c;在原始的…

GET和POST的区别,java模拟postman发post请求

目录 一、先说一下get和post1、看一下人畜无害的w3schools怎么说&#xff1a;2、问一下文心你言哥&#xff0c;轻轻松松给你一个标准答案&#xff1a;3、卧槽&#xff0c;懂了&#xff0c;好像又没懂 二、让我们扒下GET和POST的外衣&#xff0c;坦诚相见吧&#xff01;三、我们…

MySQL5.7源码编译Debug版本

编译环境Ubuntu22.04LTS 1 官方下载MySQL源码 https://dev.mysql.com/downloads/mysql/?spma2c6h.12873639.article-detail.4.68e61a14ghILh5 2 安装基础软件 cmakeclangpkg-configperl 参考&#xff1a;https://dev.mysql.com/doc/refman/5.7/en/source-installation-prere…

网络安全进阶学习第十二课——SQL手工注入3(Access数据库)

文章目录 注入流程&#xff1a;1、判断数据库类型2、判断表名3、判断列名4、判断列数1&#xff09;判断显示位 5、判断数据长度6、爆破数据内容 注入流程&#xff1a; 判断数据库类型 ——> 判断表名 ——> 判断列名 ——> 判断列名长度 ——> 查出数据。 asp的网…

Acwing.877 扩展欧几里得算法

题目 给定n对正整数ai , bi&#xff0c;对于每对数&#xff0c;求出一组ai ,g&#xff0c;使其满足ai* xi bi * yi gcd(ai ,bi)。 输入格式 第一行包含整数n。 接下来n行&#xff0c;每行包含两个整数ai , bi。 输出格式 输出共n行&#xff0c;对于每组ai, bi&#xff0c…

hcip的ospf综合实验

题目 拓扑图 IP地址分配及环回 R1 < Huawei>sy Enter system view, return user view with CtrlZ. [Huawei]sysname r1 [r1]int g0/0/0 [r1-GigabitEthernet0/0/0]ip add 172.16.0.1 27 Aug 6 2023 19:03:33-08:00 r1 %%01IFNET/4/LINK_STATE(l)[0]:The line protocol I…

第3章 语言基础

引言 任何语言的核心所描述的都是这门语言在最基本的层面上如何工作&#xff0c;涉及语法、操作符、数据类型以及内置功能&#xff0c;在此基础之上才可以构建复杂的解决方案 本章接下来的内容主要基于ECMAScript第6版。ES6 语法 js的语法借鉴了c/c&#xff0c;java。js是相对…

如何在PCB设计过程中处理好散热

在现代高性能电子设备中&#xff0c;散热是一个常见而重要的问题。正确处理散热问题对于确保电子设备的可靠性、稳定性和寿命至关重要。 下面将介绍在PCB设计过程中处理散热问题的方法和技巧&#xff0c;以帮助大家提高设计质量和性能。 首先&#xff0c;在处理散热问题之前&…

项目部署(前后端分离)

1、前端项目 &#xff08;打包成dist文件,放到nginx的html目录下面&#xff09;&#xff0c;然后配置nginx 2、后端项目部署 使用之前的shell脚本&#xff08;然后赋予用户权限&#xff09;&#xff0c;最后运行脚本 查看进程

Flutter编译一直显示Running Gradle task ‘assembleDebug‘

&#x1f525; 目前开发的Android Studio版本 &#x1f525; &#x1f525; 当前Flutter SDK 版本 &#x1f525; Flutter 3.10.6 • channel stable • https://github.com/flutter/flutter.git Framework • revision f468f3366c (3 周前) • 2023-07-12 15:19:05 -0700 Eng…

算法与数据结构-哈希表

文章目录 什么是散列表散列函数的设计原则散列冲突的解决办法1. 开放寻址法2. 链表法 什么是散列表 散列表用的是数组支持按照下标随机访问数据的特性&#xff0c;所以散列表其实就是数组的一种扩展&#xff0c;由数组演化而来。可以说&#xff0c;如果没有数组&#xff0c;就…

网络安全预警分类流程

网络安全预警指南 随着信息技术的广泛应用与快速发展&#xff0c;传统业务与信息系统的融合程度不断加深&#xff0c;网络安全对国家政治、经济、文化、公共服务活动的影响进一步增大。网络安全形势日趋复杂&#xff0c;安全威胁不断变化&#xff0c;利用网络漏洞、恶意程序从…

HIVE语法优化之Join优化

桶用两表关联字段,MapJoin时需要将小表填入内存,这时候,分桶就起到了作用 一个stage阶段代表一个mr执行,好几个MR,会吧每一个MR的结果都压缩 Mysql 慢查询 如果sql语句执行超过指定时间,定义该sql为慢查询,存储日志, 查问题: SQL日志,模拟慢SQL 然后查询执行计划 分组聚合 就…

使用webpack建立React+TS项目

之前写过类似的文章&#xff0c;这次看到一本新书里也介绍了这个知识点&#xff0c;故尝试之。 Refer: 《Learn React With TypeScript - A Beginners Guide To Reactive Web Development With React 18 and TypeScript》chapter3 Creating a project with webpack 1.先建立一…

Mysql主从复制-主库/从库

介绍 mysql的主从复制是一个异步的复制过程&#xff0c;底层是基于Mysql数据库自带的二进制日志功能&#xff0c;就是一台或多台数据库&#xff08;slave,从库&#xff09;从另一台MYSQL数据库&#xff08;master,主库&#xff09;进行日志的复制然后再解析并应用到自己&#…

小程序wx:else提示 Bad attr `wx

问题&#xff1a;以下wx:for里的wx:if &#xff0c; wx:else 会报这个错&#xff1a;Bad attr wx <scroll-view class"scroll1" scroll-x enable-flex"true"><view wx:if"{{playlist.length>0}}" class"item" wx:for"…

2023.8.7论文阅读

文章目录 CMUNeXt: An Efficient Medical Image Segmentation Network based on Large Kernel and Skip Fusion摘要本文方法实验结果 Boundary Difference Over Union Loss For Medical Image Segmentation&#xff08;损失函数&#xff09;摘要本文方法实验结果 CMUNeXt: An E…