问题起源:
某产品灰度上线后,从pulsar服务端监控发现存在一种现象:消费但未ack的信息不断增加,直到3000左右就稳定下来了且消费速度为0,但不清楚这3000是怎么来的,因为代码是消费到立马ack的;
msg = self.consumer.receive()
self.consumer.acknowledge(msg)
根因定位:
查python的pulsar说明文档:https://pulsar.apache.org/api/python/2.10.x/pulsar.html#Client.subscribe
GPT4的回答:
在Python中,receiver_queue_size参数通常与消息队列或者异步处理场景中出现。具体到Apache Pulsar这种分布式消息传递和流处理平台,receiver_queue_size参数在消费者(Consumer)的配置中定义了客户端用于接收来自Pulsar broker的消息的队列的大小。
receiver_queue_size的作用如下:
-
消息缓冲:这个队列作为缓冲区,存储了已经从Pulsar broker拉取但尚未被应用程序处理的消息。这允许消费者预先从broker拉取多个消息,以便在处理当前消息时,下一个消息已经准备好了,这样可以提高处理效率。
-
流量控制:通过限制队列的大小,客户端可以控制其从broker接收消息的速率,从而避免因为消费者处理能力不足而导致内存溢出或其他资源问题。
-
性能调优:根据消费者的处理能力和应用程序的需求,可以调整receiver_queue_size的大小以优化性能。较小的队列可能会减少内存占用并减少消息积压,但可能会导致增加从broker拉取消息的频率。较大的队列可以减少拉取次数,但增加了消息延迟和内存使用的风险。
-
背压管理:如果消费者处理慢于消息到达的速率,receiver_queue_size可以帮助管理背压(backpressure)。一旦队列满了,Pulsar客户端会停止从broker拉取新消息,直到队列中有了空位。
举例来说,如果你设置receiver_queue_size=100,那么消费者客户端会在其内部队列中缓存最多100条消息。如果消费者处理消息的速度跟不上消息到达的速率,队列可能会很快填满,此时客户端不会再从broker接收更多消息,直到已缓存的消息被处理并从队列中清除。
调整receiver_queue_size参数需要根据具体情况权衡,因为它会直接影响应用程序的吞吐量和响应能力。
案例
import pulsar
# Pulsar 服务地址
service_url = 'pulsar://localhost:6650'
# 要订阅的主题
topic_name = 'persistent://public/default/my-topic'
# 订阅名称
subscription_name = 'my-subscription'
# 创建 Pulsar 客户端
client = pulsar.Client(service_url)
# 创建消费者,设置 receiver_queue_size
consumer = client.subscribe(topic=topic_name,
subscription_name=subscription_name,
receiver_queue_size=10)
try:
while True:
# 阻塞等待接收消息
msg = consumer.receive()
try:
print(f"Received message: '{msg.data()}'")
# 确认消息,以便它不会被再次传递
consumer.acknowledge(msg)
except Exception as e:
# 如果处理消息时发生异常,未确认的消息可能会被重新传递
print(f"Failed to process message: {e}")
consumer.negative_acknowledge(msg)
finally:
# 关闭客户端
client.close()