Rabbitmq 搭建使用案例
文章目录
- RabbitMQ搭建
- docker
- 代码
- golang
- 生产者
- 消费者
- 可视化
- 消费进度
RabbitMQ搭建
docker
docker run -d --hostname rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_MANAGEMENT_PLUGIN=rabbitmq_management_agent -p 15672:15672 -p 5672:5672 rabbitmq:management
代码
golang
生产者
package main
import (
"flag"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"log"
"strconv"
"time"
)
func main() {
var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
var exchange = flag.String("exchange", "logs", "Exchange name")
var key = flag.String("key", "log", "Routing key")
flag.Parse()
// 连接到RabbitMQ服务器
conn, err := amqp.Dial(*url)
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
// 声明一个交换机
err = ch.ExchangeDeclare(
*exchange, // name: 交换机名称
"fanout", // kind: 交换机类型
true, // durable: 是否持久化
false, // autoDelete: 没有队列绑定时是否自动删除
false, // internal: 是否是内部交换机
false, // noWait: 是否需要等待服务器响应
nil, // args: 其他参数
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %v", err)
}
// 发送消息
body := "Hello World!" + fmt.Sprintf(time.Now().String())
for i := 0; i < 20; i++ {
body = strconv.Itoa(i) + body
err = ch.Publish(
*exchange, // 交换机名称
*key, // 路由键
false, // 强制发布
false, // 立即发布
amqp.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp.Persistent,
Body: []byte(body),
Expiration: "10000", // 3000 3秒
})
}
if err != nil {
log.Fatalf("Failed to publish a message: %v", err)
}
fmt.Printf(" [x] Sent %s", body)
}
消费者
package main
import (
"flag"
"fmt"
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
var exchange = flag.String("exchange", "logs", "Exchange name")
var key = flag.String("key", "log", "Routing key")
flag.Parse()
// 连接到RabbitMQ服务器
conn, err := amqp.Dial(*url)
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
// 声明一个交换机
err = ch.ExchangeDeclare(
*exchange, // name: 交换机名称
"fanout", // kind: 交换机类型
true, // durable: 是否持久化
false, // autoDelete: 没有队列绑定时是否自动删除
false, // internal: 是否是内部交换机
false, // noWait: 是否需要等待服务器响应
nil, // args: 其他参数
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %v", err)
}
// 声明一个队列
q, err := ch.QueueDeclare(
"queue01", // 随机生成队列名称
true, // 持久化
false, // 删除
false, // 独占
false, // 不等消息
nil, // 其他参数
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
// 绑定队列到交换机
err = ch.QueueBind(
q.Name, // 队列名称
*key, // 路由键
*exchange, // 交换机名称
false, // 现在绑定
nil, // 其他参数
)
if err != nil {
log.Fatalf("Failed to bind a queue: %v", err)
}
// 接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"consumer01", // 消费者标签
false, // 自动ack
false, // 不独占
false, // 不等消息
false, // 不从服务器获取消息
nil, // 其他参数
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
for d := range msgs {
// 输出接收到的消息
fmt.Printf(" [x] Received %s\n", d.Body)
err = ch.Ack(d.DeliveryTag, true)
if err != nil {
log.Fatalf("Failed to ack message: %v", err)
}
}
}
可视化
看板
http://localhost:15672/
账户密码
admin
admin
消费进度
http://localhost:15672/#/queues