RabbitMQ练习(Remote procedure call (RPC))

news2025/1/22 19:32:51

 1、RabbitMQ教程

《RabbitMQ Tutorials》icon-default.png?t=N7T8https://www.rabbitmq.com/tutorials

2、环境准备

参考:《RabbitMQ练习(Hello World)》。

确保RabbitMQ、Sender、Receiver容器正常安装和启动。

root@k0test1:~# docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
root@k0test1:~# docker start sender
root@k0test1:~# docker start receiver
root@k0test1:~# docker network inspect bridge

网络拓扑:

 

3、RPC练习

3.1 概述

前面练习了如何使用工作队列(Work queues)来在多个worker之间分配耗时的任务(time-consuming tasks)

但是,如果我们需要在远程计算机上运行一个函数并等待结果呢?那是一个不同的故事。这种模式通常被称为远程过程调用,或RPC。

在本练习中,我们将使用RabbitMQ来构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有任何值得分配的耗时任务,我们将创建一个返回斐波那契数(Fibonacci numbers)虚拟RPC服务(dummy RPC service)

3.2 Client interface

为了展示如何使用RPC服务,将创建一个简单的客户端类(a simple client class)。这个类将暴露一个名为call的方法,该方法发送一个RPC请求,并阻塞直到收到响应。(原文:It's going to expose a method named call which sends an RPC request and blocks until the answer is received)

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print(f"fib(4) is {result}")

这段代码展示了如何使用一个RPC客户端来调用远程的斐波那契数计算服务。以下是代码的详细解释:

  1. fibonacci_rpc = FibonacciRpcClient():这行代码创建了一个FibonacciRpcClient的实例。FibonacciRpcClient是一个RPC客户端,用于发送请求到RPC服务器,并接收响应。

  2. result = fibonacci_rpc.call(4):这行代码调用了RPC客户端的call方法,并将数字4作为参数传递给服务器。服务器将计算斐波那契数列中的第4个数,并将结果返回给客户端。

  3. print(f"fib(4) is {result}"):这行代码打印出计算结果。格式化字符串"fib(4) is {result}"将计算得到的斐波那契数显示出来。

在这个例子中,客户端发送了一个请求到RPC服务器,请求计算斐波那契数列的第4个数,然后服务器返回了结果,客户端接收并打印了这个结果。这是一个典型的RPC模式的应用,允许客户端像调用本地函数一样调用远程服务。

 3.3 Callback queue

一般来说,通过RabbitMQ实现远程过程调用(RPC)是很简单的。客户端发送一个请求消息,服务器用一个响应消息来回复。为了能够接收到响应,客户端需要在请求中发送一个“回调”队列地址(原文:In order to receive a response the client needs to send a 'callback' queue address with the request.)。

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...

这段Python代码是使用RabbitMQ实现RPC(远程过程调用)的一个示例。下面是代码的中文解释:

  1. result = channel.queue_declare(queue='', exclusive=True):这行代码声明了一个私有的队列。queue_declare方法用于声明一个新的队列,如果队列不存在,则创建它。queue=''表示让RabbitMQ自动生成一个唯一的队列名称。exclusive=True表示这个队列是私有的,只对声明它的连接可见。

  2. callback_queue = result.method.queue:这行代码将声明的队列名称赋值给callback_queue变量,这个队列将被用作回调队列,以便服务器可以发送响应到这个队列。

  3. channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties(reply_to=callback_queue,), body=request):这行代码发布一个消息到RabbitMQ。exchange参数为空字符串,表示使用默认的交换机。routing_key='rpc_queue'表示消息将发送到名为rpc_queue的队列。properties参数是一个BasicProperties对象,其中reply_to属性设置为callback_queue,这告诉服务器将响应消息发送到这个回调队列。body参数是实际发送的消息体,这里假设是request变量。

  4. 注释# ... and some code to read a response message from the callback_queue ...:这部分代码应该包含从callback_queue读取响应消息的逻辑,但在这个示例中没有给出。

这段代码展示了客户端如何发送一个RPC请求,并设置回调队列以便接收服务器的响应。服务器需要监听rpc_queue队列,处理请求,并将响应消息发送到客户端指定的callback_queue

AMQP 0-9-1 协议预定义了一组共14个属性,这些属性与消息相关。除了以下几个常用的属性外,大多数属性很少使用:

  1. delivery_mode:标记消息是持久的(值为2)还是瞬时的(其他任何值)。你可能还记得这个属性是从第二个教程中学到的。
  2. content_type:用于描述消息编码的MIME类型。例如,对于常用的JSON编码,将此属性设置为application/json是一个好习惯。
  3. reply_to:通常用于命名一个回调队列。
  4. correlation_id:用于将RPC响应与请求相关联,非常有用。

这些属性在RabbitMQ中非常有用,因为它们允许你控制消息的行为,以及如何在不同的应用程序组件之间进行通信。例如,delivery_mode 可以帮助确保消息在RabbitMQ服务器重启后仍然可用,而 reply_tocorrelation_id 使得RPC模式的实现成为可能,允许服务器将响应发送回正确的客户端。content_type 则有助于接收方理解消息的内容格式,从而正确地解析消息。

3.4 Correlation id

在上述介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这种方法效率不高,但幸运的是有更好的解决方案——让我们为每个客户端创建一个单一的回调队列。

这引发了一个新的问题:在队列中收到响应时,不清楚该响应属于哪个请求。这时,我们使用correlation_id属性。我们将为每个请求设置一个唯一的值。之后,当我们在回调队列中收到消息时,我们会查看这个属性,并基于它将响应与请求匹配起来。如果我们看到一个未知的correlation_id值,我们可以安全地丢弃该消息——它不属于我们的请求。

你可能会问,为什么我们应该忽略回调队列中的未知消息,而不是以错误失败?这是由于服务器端可能存在竞争条件。虽然不太可能,但RPC服务器可能在我们收到答案后但在发送请求确认消息之前就崩溃了。如果发生这种情况,重启的RPC服务器将再次处理请求。这就是为什么在客户端我们必须优雅地处理重复的响应,而且RPC应该是幂等的(原文: That's why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.)。

RPC(远程过程调用)应该是幂等的,意味着对于同一个请求,无论服务器端接收到多少次相同的RPC调用,其结果都应该是一致的,不会对系统状态产生额外的影响。换句话说,即使一个请求被重复发送多次,它也只会产生一次实际的效应或结果。

幂等性在分布式系统中非常重要,特别是在网络通信和服务器处理中可能存在延迟或重试的情况下。如果RPC调用不是幂等的,那么重复的请求可能会导致数据不一致或系统状态错误。例如,如果一个RPC调用是用来增加某个计数器的值,幂等性保证了无论这个调用被执行多少次,计数器的最终值都只会增加一次。

在实际应用中,确保RPC调用的幂等性可以通过以下方式实现:

  1. 唯一标识符:为每个请求分配一个唯一的ID,这样即使请求被重复发送,服务器也可以通过这个ID识别并避免重复处理。
  2. 状态检查:在执行操作之前,服务器可以检查请求是否已经被处理过,如果已经处理,则不再执行。
  3. 事务控制:使用数据库事务或其他形式的事务控制来确保操作的原子性,即要么完全执行,要么完全不执行,从而避免部分执行导致的问题。
  4. 重试机制:设计客户端时,可以包含重试逻辑,但要确保重试不会对系统状态产生负面影响。

幂等性是分布式系统设计中的一个关键原则,有助于提高系统的健壮性和可靠性。

3.5 Summary 

我们的RPC将按照以下方式工作:

  • 当客户端启动时,它会创建一个匿名的专用回调队列(an anonymous exclusive callback queue)
  • 对于RPC请求,客户端发送一个带有两个属性的消息:reply_to,设置为回调队列,correlation_id,为每个请求设置一个唯一值。
  • 请求被发送到一个名为rpc_queue的队列。
  • RPC工作器(RPC worker)(也就是服务器)在该队列上等待请求。当出现请求时,它执行任务,并将结果消息发送回客户端,使用的队列来自reply_to字段。
  • 客户端在回调队列上等待数据。当出现一条消息时,它会检查correlation_id属性。如果它与请求中的值匹配,它将响应返回给应用程序。

 3.6 代码说明

3.6.1 rpc_server.py

进入receiver容器,vi编写rpc_server.py:

root@receiver:/# vi rpc_server.py
root@receiver:/# cat rpc_server.py 
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='172.17.0.2'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(f" [.] fib({n})")
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()
root@receiver:/# 

这段代码是一个使用Python编写的RabbitMQ RPC(远程过程调用)服务器的实现。它使用pika库与RabbitMQ进行交互。下面是对代码中每个部分的详细说明:

  1. import pika:导入pika库,这是一个Python客户端库,用于与RabbitMQ消息代理进行通信。

  2. 创建RabbitMQ连接:

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='172.17.0.2'))

    这行代码创建了一个到RabbitMQ服务器的阻塞式连接。host='172.17.0.2'指定了RabbitMQ服务器的IP地址。

  3. 创建通信通道:

    channel = connection.channel()

    通过连接创建了一个通信通道,用于发送和接收消息。

  4. 声明队列:

    channel.queue_declare(queue='rpc_queue')

    声明了一个名为rpc_queue的队列。如果该队列不存在,则会被创建。

  5. 定义斐波那契函数:

    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n - 1) + fib(n - 2)

    这是一个递归函数,用于计算斐波那契数列的第n项。

  6. 定义请求处理回调函数:

    def on_request(ch, method, props, body):
        n = int(body)
        print(f" [.] fib({n})")
        response = fib(n)
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    这个函数是当队列rpc_queue接收到消息时会被调用的回调函数。它执行以下操作:

    • 将消息体(body)转换为整数n
    • 调用fib函数计算斐波那契数。
    • 打印正在处理的请求。
    • 使用basic_publish方法将响应消息发送到客户端指定的回复队列(reply_to)。
    • 使用basic_ack方法确认消息已被处理。
  7. 设置QoS(服务质量):

    channel.basic_qos(prefetch_count=1)

    这行代码设置了预取数量为1,意味着每次只有一个消息被分派给消费者,直到消费者确认该消息。

  8. 设置消息消费:

    channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

    这行代码设置了消息消费,指定了队列rpc_queue和消息处理回调函数on_request

  9. 打印等待RPC请求的消息:

    print(" [x] Awaiting RPC requests")
  10. 开始消费消息:

    channel.start_consuming()

    这行代码启动了消息通道的消费过程,使得服务器开始监听和处理消息。

整个代码的目的是创建一个RPC服务器,它监听rpc_queue队列中的请求,对每个请求计算斐波那契数,并将结果发送回请求者。这个服务器是阻塞式的,意味着它会一直运行,直到被显式地停止或发生错误。

以下具体解释on_request是怎么被调用的:

on_request 函数被调用的过程涉及到RabbitMQ的RPC(远程过程调用)模式和消息确认机制。以下是详细步骤:

  1. 客户端发送RPC请求:客户端向RabbitMQ发送一个RPC请求消息,这个消息会被发送到指定的队列(在这个例子中是'rpc_queue')。

  2. 消息到达队列:RabbitMQ将客户端发送的消息放入'rpc_queue'队列中。

  3. 服务端监听队列:服务端(即上述代码)通过channel.basic_consume方法设置了一个监听器,指定了队列'rpc_queue'和消息处理回调函数on_request

  4. 消息被消费:当RabbitMQ中的'rpc_queue'队列接收到新消息时,它会通知服务端。

  5. 回调函数被触发:服务端收到通知后,调用on_request函数。这个函数接收四个参数:

    • ch:当前的通道对象。
    • method:描述了消息的接收方法。
    • props:包含了消息的属性,如reply_to(客户端期望接收响应的队列名称)和correlation_id(用于匹配请求和响应的相关ID)。
    • body:消息体,包含了客户端发送的数据。
  6. 处理请求on_request函数内部,首先将消息体body转换为整数n,然后调用fib函数计算斐波那契数。

  7. 发送响应:计算完成后,使用ch.basic_publish方法将计算结果发送回客户端。这里会使用reply_to属性指定的路由键,以及correlation_id来确保响应能够正确匹配到原始请求。

  8. 确认消息:最后,通过ch.basic_ack方法确认消息已被处理,这样RabbitMQ就知道可以移除这条消息了。

整个过程是异步的,服务端在channel.start_consuming()调用后进入等待状态,一旦有消息到达,就会触发on_request函数的执行。

服务器代码相当直接明了:

首先,我们建立连接并声明队列rpc_queue。 我们声明了斐波那契函数。它假设输入的是有效的正整数。(不要指望这个函数能处理大数字,这可能是最慢的递归实现方式)。 我们为basic_consume声明了一个回调函数on_request,这是RPC服务器的核心。当收到请求时,它会被执行。它完成工作并将响应发送回去。 我们可能想要运行多个服务器进程。为了将负载平均分配给多个服务器,我们需要设置prefetch_count参数。

3.6.2 rpc_client.py

进入sender容器,vi编写rpc_client.py:

root@sender:/# vi rpc_client.py
root@sender:/# cat rpc_client.py 
import pika
import uuid


class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='172.17.0.2'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events(time_limit=None)
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(f" [.] Got {response}")
root@sender:/# 

这段代码是一个使用Python编写的RabbitMQ RPC(远程过程调用)客户端的实现。它使用pika库与RabbitMQ进行交互。下面是对代码中每个部分的详细说明:

  1. import pikaimport uuid:导入pika库用于与RabbitMQ进行通信,导入uuid库用于生成唯一的标识符。

  2. 定义FibonacciRpcClient类:

    class FibonacciRpcClient(object):

    这个类封装了RPC客户端的功能。

  3. 类初始化方法__init__

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='172.17.0.2'))
        self.channel = self.connection.channel()
    • 创建一个到RabbitMQ服务器的阻塞式连接,服务器的IP地址是172.17.0.2
    • 通过连接创建了一个通信通道。
  4. 声明一个独占队列:

    result = self.channel.queue_declare(queue='', exclusive=True)
    self.callback_queue = result.method.queue
    • 使用queue_declare声明一个独占队列,这个队列在连接关闭时会被删除。
    • 将声明的队列名称存储在self.callback_queue中,用于接收服务器的响应。
  5. 设置消息消费:

    self.channel.basic_consume(
        queue=self.callback_queue,
        on_message_callback=self.on_response,
        auto_ack=True)
    • 使用basic_consume设置消息消费,指定队列、消息处理回调函数on_response和自动确认消息。
  6. 定义on_response方法:

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
    • 这个方法是当客户端接收到响应时会被调用的回调函数。
    • 检查响应的消息属性中的correlation_id是否与请求时设置的correlation_id相匹配,如果匹配,则将响应的消息体存储在self.response中。
  7. 定义call方法:

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events(time_limit=None)
        return int(self.response)
    • 这个方法用于发送RPC请求并等待响应。
    • 首先,将self.response设置为None,生成一个新的correlation_id
    • 使用basic_publish方法发送RPC请求,指定交换机、路由键、消息属性(包括回复队列和相关ID)和消息体。
    • 通过循环等待响应,使用process_data_events方法处理数据事件,直到收到响应。
    • 收到响应后,将响应的消息体转换为整数并返回。
  8. 创建FibonacciRpcClient实例并发送请求:

    fibonacci_rpc = FibonacciRpcClient()
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(30)
    print(f" [.] Got {response}")
    • 创建FibonacciRpcClient类的实例。
    • 发送请求计算斐波那契数列的第30项,并打印响应结果。

整个代码的目的是创建一个RPC客户端,它发送请求到RabbitMQ服务器,等待服务器处理请求并返回结果。这个客户端是阻塞式的,意味着它会一直等待直到收到响应。

客户端代码稍微复杂一些:

  1. 我们建立一个连接,创建一个通道(channel),并声明一个用于接收响应的独占回调队列。
  2. 我们订阅(basic_consume)这个回调队列,以便我们能够接收RPC响应。
  3. on_response回调函数在每次响应时执行一个非常简单的任务,对于每个响应消息,它检查correlation_id是否是我们正在寻找的那个。如果是,它将响应保存在self.response中,并中断消费循环。
  4. 接下来,我们定义了主要的call方法——它执行实际的RPC请求。
  5. call方法中,我们生成一个唯一的correlation_id并保存它——on_response回调函数将使用这个值来捕获适当的响应。
  6. 同样在call方法中,我们发布了请求消息(basic_publish),附带两个属性:reply_tocorrelation_id
  7. 最后,我们等待适当的响应到达,并将响应返回给用户。

3.7 开始测试

 1、启动RPC服务器(Server)

root@receiver:/# python3 rpc_server.py
 [x] Awaiting RPC requests

 2、查看mq server信息

root@5d37b5b451ba:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
rpc_queue       0

root@5d37b5b451ba:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topic
        direct
root@5d37b5b451ba:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        rpc_queue       queue   rpc_queue       []
root@5d37b5b451ba:/# 

3、启动RPC客户端(Client),验证结果

root@sender:/# python3 rpc_client.py
 [x] Requesting fib(30) <--观察容器终端输出,确认客户端已发送请求并正在等待响应。
 [.] Got 832040  <--客户端将等待服务器处理请求并返回结果。一旦服务器处理完毕,客户端将接收响应并显示结果。
root@sender:/# 

4、服务器端显示内容

root@receiver:/# python3 rpc_server.py
 [x] Awaiting RPC requests
 [.] fib(30)

5、一些说明

展示的设计并非RPC服务的唯一可能实现方式,但它有一些重要的优点:

  1. 如果RPC服务器运行太慢,你可以通过简单地运行另一个服务器来扩展。尝试在新的控制台中运行第二个rpc_server.py

  2. 在客户端,RPC只需要发送和接收一条消息。不需要像queue_declare这样的同步调用。因此,对于单个RPC请求,RPC客户端只需要一个网络往返。

尽管如此,我们的代码仍然相当简单,并没有尝试解决更复杂(但重要)的问题,比如:

  • 如果没有服务器运行,客户端应该如何反应?
  • 客户端是否应该有RPC的某种超时机制?
  • 如果服务器出现故障并引发异常,是否应该将异常转发给客户端?
  • 在处理之前,如何保护系统免受无效输入消息的影响(例如,检查边界)?

这些问题都是构建健壮的RPC服务时需要考虑的实际问题,它们涉及到错误处理、超时机制、异常处理和安全验证等方面。

4、Wireshark抓包

4.1 抓包方式

参考:《RabbitMQ练习(Hello World)》

4.2 抓包信息

典型数据包:

1、rpc client发送rpc请求时,携带reply_to和correlation_id:

 

Frame 47: 156 bytes on wire (1248 bits), 156 bytes captured (1248 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:03 (02:42:ac:11:00:03), Dst: 02:42:ac:11:00:02 (02:42:ac:11:00:02)
Internet Protocol Version 4, Src: 172.17.0.3 (172.17.0.3), Dst: 172.17.0.2 (172.17.0.2)
Transmission Control Protocol, Src Port: 52506, Dst Port: 5672, Seq: 492, Ack: 673, Len: 90
Advanced Message Queuing Protocol
    Type: Content header (2)
    Channel: 1
    Length: 82
    Class ID: Basic (60)
    Weight: 0
    Body size: 2
    Property flags: 0x0600
    Properties
        Correlation-Id: c1e5f310-0996-45fb-a3d8-bf2081cd891c
        Reply-To: amq.gen-p169XGGXAjsVdvgeJjMlOw

这段信息描述的是AMQP(Advanced Message Queuing Protocol,高级消息队列协议)中的一个消息头(Content header),它包含了消息的元数据和属性。下面是对这些字段的解释:

  1. Type: 消息头的类型,这里指的是内容头(Content header)。

  2. Channel: 消息发送的通道号,这里是1。

  3. Length: 消息头的长度,这里是82字节。

  4. Class ID: 消息所属的类标识符,Basic (60)表示这是一个基本类消息。

  5. Weight: 消息的优先级权重,这里设置为0,表示没有特别的优先级。

  6. Body size: 消息体的大小,这里是2字节。

  7. Property flags: 属性标志,这里是0x0600,表示设置了某些属性。

  8. Properties:

    • Correlation-Idc1e5f310-0996-45fb-a3d8-bf2081cd891c,这是消息的唯一标识符,用于将响应与请求关联起来。
    • Reply-Toamq.gen-p169XGGXAjsVdvgeJjMlOw,这是回复队列的名称,当消费者处理完消息后,会将响应发送到这个队列。

AMQP是一种提供高度可靠的异步消息传递协议,广泛应用于分布式系统中。消息头中的这些属性使得消息传递更加灵活和可靠,例如,通过Correlation-Id可以将请求和响应正确地匹配起来,而Reply-To属性则允许消费者指定一个队列来接收响应消息。

4.3 流量图

 5、小结

这篇文章是 RabbitMQ 官方教程的第六部分的练习,主题是关于如何使用 Python 客户端库 Pika 实现远程过程调用(RPC)。以下是主要内容小结:

1、预备条件

  • 假设 RabbitMQ 已经安装并运行在标准端口(5672)上。
  • 如果使用不同的主机、端口或凭据,需要调整连接设置。
  • 使用 Pika 客户端库版本 1.0.0。

2、RPC 客户端接口

  • 教程提供了一个简单的 RPC 客户端类,它公开了一个名为 call 的方法,用于发送 RPC 请求并阻塞等待响应。

3、RPC 的注意事项

  • RPC 是计算中常见的模式,但也常受到批评,因为它可能导致系统不可预测和调试复杂化。
  • 建议明确哪些函数调用是本地的,哪些是远程的,并且文档化系统,处理错误情况。

4、回调队列

  • 客户端发送请求消息时,需要提供一个“回调”队列地址,以便接收响应。
  • 教程中展示了如何声明一个独占的回调队列,并使用它来接收响应。

5、correlation_id

  • 为了匹配请求和响应,教程介绍了 correlation_id 属性的使用。每个请求都被赋予一个唯一的 correlation_id
  • 如果客户端在回调队列中收到一个未知的 correlation_id,它应该安全地丢弃该消息,因为这表示该响应不属于客户端的任何请求。

6、RPC 工作流程

  • 客户端启动时创建一个匿名的独占回调队列。
  • 客户端发送带有 reply_to 和 correlation_id 属性的 RPC 请求消息。
  • RPC 工作者(服务器)在 rpc_queue 队列上等待请求,处理请求后将结果消息发送回客户端。
  • 客户端在回调队列上等待数据,当消息出现时,检查 correlation_id 属性,如果匹配,则将响应返回给应用程序。

7、服务器和客户端代码

  • 教程提供了 rpc_server.py 和 rpc_client.py 的示例代码,展示了如何实现 RPC 服务器和客户端。

8、扩展性和问题

  • 如果 RPC 服务器运行缓慢,可以通过运行另一个服务器实例来扩展。
  • RPC 客户端只需要一次网络往返即可完成单个 RPC 请求。
  • 代码示例相对简单,没有解决一些更复杂但重要的问题,例如服务器不可用时客户端的反应、RPC 超时、服务器异常的处理以及对无效输入消息的保护。

9、实验和资源

  • 教程建议使用 RabbitMQ 管理 UI 来查看队列,这可能对实验很有帮助。

这个教程为读者提供了一个使用 RabbitMQ 和 Python 实现 RPC 机制的实用指南,包括代码示例和一些最佳实践建议。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2092190.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端内存泄露案例与解决方案

什么是内存泄漏&#xff1f; 内存泄露&#xff08;Memory Leaks&#xff09;&#xff1a;是指应用程序已经不再需要的内存&#xff0c;由于某种原因未返回给操作系统或者空闲内存池&#xff08;Pool of Free Memory&#xff09;。 内存泄露可能带来的问题&#xff1a;变慢、卡…

SAP LE学习笔记07 - MM与WM跨模块收货到仓库的流程中 如何实现 先上架再入库

上一章讲了LE中收货的一些特殊情况&#xff1a; 1&#xff0c;MM模块收货时&#xff0c;特别移动指标来标识的物料直接产生TO 2&#xff0c;MM中直接收货到仓库的固定Storage Bin(棚番)上 SAP LE学习笔记06 - MM与WM跨模块收货到仓库的流程中 带特别移动指标的物料也可以直接…

spring security 会话管理

一、简介 当浏览器调用登录接口登录成功后&#xff0c;服务端会和浏览器之间建立一个会话(Session)浏览器在每次发送请求时都会携带一个 Sessionld&#xff0c;服务端则根据这个 Sessionld 来判断用户身份当浏览器关闭后&#xff0c;服务端的 Session 并不会自动销毁&#xff0…

结构型设计模式-适配器(adapter)模式-python实现

设计模式汇总&#xff1a;查看 通俗示例 想象一下&#xff0c;你刚从国外带回一台最新的笔记本电脑&#xff0c;但是你发现它的电源插头是德标插头&#xff0c;而家里的电源插座是中式插座&#xff0c;这时怎么办呢&#xff1f;你需要一个电源适配器来将德标插头转换成中式插座…

“萌宠经济”全球化浪潮:宠物品牌如何利用TikTok达人破局出海

在全球“萌宠经济”不断升温的背景下&#xff0c;宠物品牌出海成为了重要的战略。随着市场的增长和消费者对宠物产品的需求增加&#xff0c;品牌需要寻找有效的方式进入新的海外市场。在这种情况下&#xff0c;TikTok平台的崛起和宠物达人的影响力成为了宠物品牌破局出海的关键…

数据结构与算法(快速基础C++版)

数据结构与算法&#xff08;快速基础C版&#xff09; 1. 基本概念第1章 绪论1.1 数据结构的研究内容1.2 基本概念和术语1.2.1 数据、数据元素、数据项和数据对象1.2.2 数据结构1.2.3 数据类型和抽象数据类型1.2.4 概念小结 1.3 算法和算法分析1.4 总结 2. 基本的数据结构第2章 …

【PyTorch常用库函数】一文教你快速上手torch.abs()函数:获取张量的绝对值

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 引言 在深度学习领域&#xff0c;PyTorch是一个非常受欢迎的框架&#xff0c;它提供了丰富的库函数来支持各种复杂的计算任务。…

利用Leaflet.js创建交互式地图:多种形状单个区域绘制

引言 在地图应用开发中&#xff0c;用户经常需要对特定区域进行标识和规划。本文将深入探讨如何利用Vue.js的响应式特性与Leaflet.js的地图功能&#xff0c;打造一个支持多边形、矩形、圆形等多种形状绘制的交互式地图编辑器。 功能亮点 自由绘制多边形&#xff1a;用户可以自…

mysql基础语法——个人笔记

0 前言 以前学习且实践过mysql&#xff0c;但后来用得少&#xff0c;随着岁月更替&#xff0c;对其印象渐浅&#xff0c;所以每次需要用时&#xff0c;都会去再看一眼语法规范&#xff0c;然后才能放心动手操作 然而&#xff0c;在信息爆炸的时代&#xff0c;查语法规范时&am…

BUUCTF PWN wp--jarvisoj_level0

第一步 checksec &#xff0c;该题为64位。 分析一下二进制保护机制&#xff1a; Arch: amd64-64-little 这个字段表示二进制程序的架构是 64 位的小端序的 x86-64 架构。小端序意味着低位字节存储在内存中的低地址上&#xff0c;高位字节存储在高地址上。RELRO: No RELRO …

迁移学习之领域自适应(domain adaptation)

比如有一堆有标注的训练数据&#xff0c;这些数 据来自源领域&#xff0c;用这些数据训练出一个模型&#xff0c;这个模型可以用在不一样的领域。在训练的时 候&#xff0c;我们必须要对测试数据所在的目标领域有一些了解。 随着了解的程度不同&#xff0c;领域自适应的方法也不…

(C++ STL)vector类的简单模拟实现与源码展示

vector类的简单模拟实现 一、前言二、vector 的成员变量三、vector 部分函数实现size、capacityreserveresizeinsert 与注意事项erase构造、析构、赋值拷贝 四、vector 源代码 以下代码环境为 VS2022 C。 一、前言 vector类 本质上就是数据结构中的顺序表。(可参考&#xff1…

【最新华为OD机试E卷】boos的收入(100分)-多语言题解-(Python/C/JavaScript/Java/Cpp)

🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-E/D卷的三语言AC题解 💻 ACM金牌🏅️团队| 多次AK大厂笔试 | 编程一对一辅导 👏 感谢大家的订阅➕ 和 喜欢💗 🍿 最新华为OD机试D卷目录,全、新、准,题目覆盖率达 95% 以上,…

4.负载均衡

文章目录 1.多级部署2.实现请求计数器3.负载均衡3.1服务端负载均衡3.2客户端负载均衡3.3自定义负载均衡3.4负载均衡策略3.5 LoadBalance 原理 4.部署实现 大家好&#xff0c;我是晓星航。今天为大家带来的是 负载均衡 相关的讲解&#xff01;&#x1f600; 1.多级部署 复制一…

C语言 | Leetcode C语言题解之第378题有序矩阵中第K小的元素

题目&#xff1a; 题解&#xff1a; bool check(int **matrix, int mid, int k, int n) {int i n - 1;int j 0;int num 0;while (i > 0 && j < n) {if (matrix[i][j] < mid) {num i 1;j;} else {i--;}}return num > k; }int kthSmallest(int **matri…

面试题小总结

一、为什么要使用Redis&#xff1f; 因为它是内存数据库&#xff0c;运行速度快因为它的工作线程是单线程&#xff0c;具有串行化&#xff0c;原子性具有IO模型&#xff0c;天生支撑高并发是kv模型&#xff0c;v具有多个数据结构具有本地方法&#xff0c;可以计算数据移动是二…

Mac用户必备:轻松添加Git SSH密钥全攻略

最近新买了一台MacBook笔记本&#xff0c;然后安装了git&#xff0c;准备下载代码&#xff0c;正好遇到配置GitHub的ssh密钥&#xff0c;记录一下整个操作流程。 操作步骤 在Mac上添加Git SSH密钥的步骤如下&#xff1a; 检查是否已有SSH密钥&#xff1a; 打开终端&#xff0…

Nginx: https解决安全问题

https原理 1 &#xff09;http协议存在的问题 数据使用明文传输&#xff0c;可能被黑客窃取 (需要信息加密)报文的完整性无法验证&#xff0c;可能被黑客篡改 (需要完整性校验)无法验证通信双方的身份&#xff0c;可能被黑客伪装 (需要身份认证) 2 ) https 原理 所谓 https,…

新160个crackme - 043-riijj_cm_20041121

运行分析 除了主程序还有一个dll文件&#xff0c;应该是要加载pf1.dll这个动态链接库运行主程序&#xff0c;需破解Name和Serial&#xff0c;点击注册无反应 PE分析 C程序&#xff0c;32位&#xff0c;无壳 静态分析&动态调试 尝试ida动调加载动态链接库pf1.dll&#xff0c…

全能型AI“草莓”:未来趋势还是市场泡沫?

你好&#xff0c;我是三桥君 近日&#xff0c;OpenAI宣布将在秋季推出代号为“草莓”的新AI模型。这一消息迅速引起了科技界和市场的广泛关注。 OpenAI的新项目“草莓”&#xff08;Strawberry&#xff09;是一个备受关注的人工智能模型&#xff0c;预计将在今年秋季发布。这个…