如果你想通过 RabbitMQ 的死信队列功能实现消费者拒绝消息投递到死信交换机的行为,你可以按照以下步骤操作:
- 创建原始队列,并将其绑定到一个交换机上:
export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672
export RABBITMQ_USER=mingcai
export RABBITMQ_PASSWORD=password
rabbitmqadmin list exchanges
rabbitmqadmin declare queue name=my_queue durable=true
rabbitmqadmin declare exchange name=my_exchange type=direct
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
- 创建死信交换机和死信队列,并将死信队列绑定到死信交换机:
rabbitmqadmin declare exchange name=my_dlx type=direct
rabbitmqadmin declare queue name=my_dlq durable=true
rabbitmqadmin declare binding source=my_dlx destination=my_dlq routing_key=dlx_routing_key
- 设置原始队列的参数,包括死信交换机和死信路由键:
rabbitmqadmin declare queue name=my_queue durable=true arguments='{"x-dead-letter-exchange":"my_dlx", "x-dead-letter-routing-key":"dlx_routing_key"}'
- 编写消费者代码,在消费者代码中拒绝投递消息到死信交换机:
//Consumer.ts
import * as amqp from 'amqplib';
async function main() {
const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
const channel = await connection.createChannel();
const queue = 'my_queue';
await channel.consume(queue, async (msg) => {
if (msg !== null) {
try {
await new Promise(resolve => {
setTimeout(()=>{
resolve(true)
},3000)
})
// Process the message
console.log("Received message:", msg.content.toString());
// Simulate rejection of message delivery to dead letter exchange
throw new Error("Failed to process message, rejecting delivery to dead letter exchange");
} catch (error) {
// Reject the message delivery to dead letter exchange
channel.reject(msg, false);
console.error("Error processing message:", error.message);
}
}
});
console.log('Waiting for messages...');
}
main().catch(console.error);
在这个示例中,当消费者处理消息时发生错误时,它会将消息的投递拒绝到死信交换机。这样,这些被拒绝的消息将被重新路由到死信队列 my_dlq
中。
记得根据你的具体需求修改队列、交换机和消费者的配置,确保它们符合你的预期行为。
rabbitmqadmin purge queue name=my_dlq #删除队列中的所有消息
//Producer.ts
import * as amqp from 'amqplib';
async function sendMsg() {
try {
const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
const channel = await connection.createChannel(); // 创建通道
const delayQueue = 'my_queue'; // 延迟队列名称
// 每秒发送一条消息
for (let i = 0; i < 100; i++) {
await new Promise(resolve => {
setTimeout(() => {
resolve(true)
}, 1000)
})
const message = 'Hello RabbitMQ!';
channel.sendToQueue(delayQueue, Buffer.from(message));
console.log('消息已发送:', message);
}
} catch (error) {
console.error('错误:', error);
}
}
sendMsg()