Rust 操作 Rabbitmq
使用docker快速部署rabbitmq
docker pull rabbitmq:management
# Port 15672 is the RabbitMQ management (admin) port; the default username and password are both "guest".
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
Rust 项目添加 AMQP 客户端库 lapin(后文的 demo 使用了 #[tokio::test],因此还需要依赖 tokio)
cargo add lapin
1. 连接到rabbitmq
// Connect to the broker at localhost:5672 with lapin's default connection properties.
let amqp_addr = "amqp://localhost:5672";
let conn = lapin::Connection::connect(amqp_addr, lapin::ConnectionProperties::default()).await?;
// Open a channel on that connection; the following steps issue their AMQP calls through it.
let chan = conn.create_channel().await?;
2. 交换机创建和队列创建
//创建一个名为itest的交换机,模式为话题模式
chan.exchange_declare(
"itest",
lapin::ExchangeKind::Topic,
lapin::options::ExchangeDeclareOptions::default(),
lapin::types::FieldTable::default(),
)
.await?;
//创建一个名为queue1的队列
chan.queue_declare(
"queue1",
lapin::options::QueueDeclareOptions::default(),
lapin::types::FieldTable::default(),
)
.await?;
//绑定队列到交换机,将名为队列queue1绑到交换机itest,并设置路由名为/queue1
chan.queue_bind(
"queue1",
"itest",
"/queue1",
lapin::options::QueueBindOptions::default(),
lapin::types::FieldTable::default(),
).await?;
3. 生产者发布消息
// 发送给itest交换机,交换机会把消息交给路由/queue1
chan.basic_publish(
"itest",
"/queue1",
lapin::options::BasicPublishOptions::default(),
"hello".as_bytes(),
lapin::BasicProperties::default(),
).await.expect("publish message failed");
4. 消费者订阅消息
// Start a consumer on "queue1"; the consumer tag is passed as an empty string.
let consumer = chan
    .basic_consume(
        "queue1",
        "",
        lapin::options::BasicConsumeOptions::default(),
        lapin::types::FieldTable::default(),
    )
    .await?;
// Register a callback that handles each delivery result from the consumer.
consumer.set_delegate(|delivery: lapin::message::DeliveryResult| async move {
    let delivery = match delivery {
        Ok(Some(delivery)) => delivery,
        // No delivery to process.
        Ok(None) => return,
        Err(err) => {
            eprintln!("subscribe message error {err}");
            return;
        }
    };
    // Decode lossily and by reference: a payload that is not valid UTF-8 must
    // not panic inside the delegate, and the bytes do not need to be cloned.
    println!("accept msg {}", String::from_utf8_lossy(&delivery.data));
    // Acknowledge only after the message has been handled.
    if let Err(err) = delivery
        .ack(lapin::options::BasicAckOptions::default())
        .await
    {
        eprintln!("ack failed {err}");
    }
});
最终demo
#[cfg(test)]
mod mq {
    /// End-to-end demo against a broker at `localhost:5672`: declares the
    /// `queue1` queue and the `itest` topic exchange, binds them with routing
    /// key `/queue1`, publishes `"hello"` from a spawned task, and consumes
    /// from `queue1` through a delegate callback.
    ///
    /// # Errors
    /// Returns any error from connecting, creating channels, declaring or
    /// binding, or starting the consumer. It also returns the join error if
    /// the publishing task panicked.
    #[tokio::test]
    async fn rabbitmq() -> Result<(), Box<dyn std::error::Error>> {
        // Connect to RabbitMQ.
        let conn = lapin::Connection::connect(
            "amqp://localhost:5672",
            lapin::ConnectionProperties::default(),
        )
        .await?;
        let publish_chan = conn.create_channel().await?;

        // Set up the queue and the exchange, then bind them together.
        publish_chan
            .queue_declare(
                "queue1",
                lapin::options::QueueDeclareOptions::default(),
                lapin::types::FieldTable::default(),
            )
            .await?;
        publish_chan
            .exchange_declare(
                "itest",
                lapin::ExchangeKind::Topic,
                lapin::options::ExchangeDeclareOptions::default(),
                lapin::types::FieldTable::default(),
            )
            .await?;
        publish_chan
            .queue_bind(
                "queue1",
                "itest",
                "/queue1",
                lapin::options::QueueBindOptions::default(),
                lapin::types::FieldTable::default(),
            )
            .await?;

        // Publish the message from a separate task. The handle is kept so that
        // a failed publish (which panics via `expect`) can be observed below
        // instead of being silently dropped with the task.
        let publish_task = tokio::spawn(async move {
            publish_chan
                .basic_publish(
                    "itest",
                    "/queue1",
                    lapin::options::BasicPublishOptions::default(),
                    "hello".as_bytes(),
                    lapin::BasicProperties::default(),
                )
                .await
                .expect("publish message failed");
        });

        // The first channel was moved into the publishing task, so consume on
        // a second channel of the same connection.
        let consume_chan = conn.create_channel().await?;
        let consumer = consume_chan
            .basic_consume(
                "queue1",
                "",
                lapin::options::BasicConsumeOptions::default(),
                lapin::types::FieldTable::default(),
            )
            .await?;

        // Use a callback to react to newly received messages; with
        // `StreamExt` from futures_lite the callback can be avoided.
        consumer.set_delegate(|delivery: lapin::message::DeliveryResult| async move {
            let delivery = match delivery {
                Ok(Some(delivery)) => delivery,
                // No delivery to process.
                Ok(None) => return,
                Err(err) => {
                    eprintln!("subscribe message error {err}");
                    return;
                }
            };
            // Decode lossily and by reference: a payload that is not valid
            // UTF-8 must not panic inside the delegate, and the bytes do not
            // need to be cloned.
            println!("accept msg {}", String::from_utf8_lossy(&delivery.data));
            if let Err(err) = delivery
                .ack(lapin::options::BasicAckOptions::default())
                .await
            {
                eprintln!("ack failed {err}");
            }
        });

        // Join the publisher so that a panic inside the task fails this test.
        publish_task.await?;
        // Give the delegate time to receive and acknowledge the message.
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        Ok(())
    }
}
结果展示
在 RabbitMQ 管理后台页面可以看到我们创建的 itest 交换机与 queue1 队列已相互绑定,绑定使用的路由键(routing key)为 /queue1
简言
AMQP 客户端库无论是 Rust 的 lapin 还是 Golang 的 streadway/amqp,整体的操作方式都是一样的;RabbitMQ 的其它几种模式,可以参考我在 Golang 下对 RabbitMQ 几种模式的操作方式来类推