RabbitMQ延时消息
RabbitMQ 本身并没有直接支持延时消息的功能,但是可以通过使用 RabbitMQ 插件或构建消息死信队列(Dead Letter Exchange, DLX)的方式来实现延时消息。以下是两种实现延时消息的方法:
1、死信队列 (Dead-Letter Queue):
- 当消息被拒绝接收,或者在队列中的存活时间超过了设置的TTL(Time-To-Live),或者队列达到最大长度时,消息会变成死信。
- 死信会被重新发布到另一个交换机上,这个交换机就是DLX(Dead-Letter Exchange)。
- 在声明业务队列时,可以添加一个
x-dead-letter-exchange
参数,值为死信交换机,这样死信就会被RabbitMQ重新发布到配置的这个交换机上.
2、延时插件 (Delayed Message Plugin):
需要先安装 RabbitMQ Delayed Message 插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
mv rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-<version>/plugins/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 声明一个类型为
x-delayed-message
的交换机,并添加一个x-delayed-type
参数,值为交换机的类型,用于路由键的映射。 - 这种方式允许消息在交换机中延迟一定时间,然后再根据路由键发送到相应的队列.
RocketMQ延时消息
RocketMQ把持 实现延时消息相对简单,因为它内置了对延时消息的支持。RocketMQ 通过设置消息的 定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。
比如,在分布式定时调度场景下,需要实现各类精度的定时任务,例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求。
以下是实现延时消息的详细步骤:
1、Producer发送延时消息
RocketMQ 通过设置消息的 delayTimeLevel
属性来实现延时投递。
在 Producer 端,通过设置消息的 delayTimeLevel
属性来发送延时消息。RocketMQ 内置了一组延迟级别,可以通过 delayTimeLevel
来指定延迟时间。
// 创建消息
Message message = new Message("TopicTest", "TagA", "delayed message".getBytes());
// 设置延时级别
message.setDelayTimeLevel(3); // 延时级别 3,对应于延时 10 秒
2、延时级别对照表
RocketMQ 预定义了多个延时级别,每个级别对应不同的延时时间。以下是默认的延时级别对照表:
可以根据需要选择合适的延时级别,并设置到 message.setDelayTimeLevel(level)
方法中。
3、Consumer 接收消息
在 Consumer 端,接收和处理延时消息与普通消息相同。
4、调整配置
如果默认的延时级别不满足需求,可以通过修改 RocketMQ 配置文件 broker.conf
来调整延时级别和对应的延时时间:
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
修改完配置文件后,重启 Broker 生效。
小结
可以看到RocketMQ 5.x已经更新了延时消息的实现,在官方文档可以看到已经改成定时、延时消息,原本4.x文档中的延时级别对照表已经去掉了,统一成时间戳的实现。
RocketMQ 5.x 版本对延时消息的实现进行了重大更新。与之前版本相比,5.x 版本可以更灵活地处理消息的延迟时间,允许用户指定准确的延迟时间而不仅仅是预设的延迟级别。
下面来具体看看,实时/延时消息概念:
定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。
- 定时消息:例如,当前系统时间为2022-06-09 17:30:00,您希望消息在下午19:20:00定时投递,则定时时间为2022-06-09 19:20:00,转换成时间戳格式为1654773600000。
- 延时消息:例如,当前系统时间为2022-06-09 17:30:00,您希望延时1个小时后投递消息,则您需要根据当前时间和延时时长换算成定时时刻,即消息投递时间为2022-06-09 18:30:00,转换为时间戳格式为1654770600000。
Kafka延时消息
在Apache Kafka中,延时消息的处理也不是内置功能,但可以通过一些设计模式和技术手段来实现。以下是几种常见的方法来处理Kafka中的延时消息:
1、基于时间戳的延时消息
- 生产者在发送消息时,可以在消息的头部添加一个时间戳字段,表示消息应该被消费的时间。
- 消费者在接收到消息后,检查时间戳,如果未到处理时间,则暂时不处理此消息,直到达到指定时间。
2、基于单独的延时主题(Topic)
- 创建一个专门的延时Topic
- 生产者先将延时消息发送到延时Topic
- 消费者从延时Topic拉取未到期的消息放入延时队列
- 延时消息到期后,再发送到目标Topic供实际消费
3、利用Kafka Stream做中间处理
- 创建一个Kafka Streams应用程序,用于处理延时消息。
- 定义输入Topic,用于接收原始延时消息。同时定义输出Topic,用于发送到期的延时消息。
- 使用Kafka Streams DSL定义Topology,对输入消息进行处理。
- 使用自定义的Punctuator定期从State Store中读取到期的延时消息,发送到输出Topic。
这种方式的优点是延迟时间计算由Kafka Streams完成,不需要额外线程控制。缺点是需要应用程序支持并维护Kafka Streams,运维成本较高。
4、基于第三方中间件或工具
- 利用redis、rabbitmq等其它中间件,构建一个延时消息系统
- 延时消息从外部系统发往Kafka时已经延时完成
- 例如利用RabbitMQ的延时队列特性实现
Pulsar延时消息
Pulsar自带了延时消息功能,可以在发送消息时设置消息的deliverAt或deliverAfter属性。
Apache Pulsar实现延时消息的方案:
deliverAfter
方法:通过指定一个延时时长来发送消息,消息将在该时长后被投递。deliverAt
方法:通过指定一个具体的未来时间戳来发送消息,消息将在该时间点被投递。
这两种方法都可以通过Pulsar的客户端API实现,简单且直接。