RabbitMQ-默认读、写方式介绍
RabbitMQ-发布/订阅模式
RabbitMQ-直连交换机(direct)使用方法
目录
1、概述
2、topic交换机使用方法
2.1 适用场景
2.2 解决方案
3、代码实现
3.1 源代码实现
3.2 运行记录
4、小结
1、概述
topic 交换机是比直连交换机功能更加强大的交换方式,通过不同的路由规则,可以实现fanout、direct两种交换机的功能。
2、topic交换机使用方法
2.1 适用场景
假设我们要对动物做一个描述,根据速度、颜色、种类等特征对其进行分别入到不同的mq队列中,routing key的格式为:"<speed>.<colour>.<species>
",比如说,所有黄色动物入队列1,跑的速度慢的,还有小兔子入队列2,哪该如何实现该需求呢?
2.2 解决方案
结合2.1描述的需求,我们可以画出如下框图:
知识点解释:
*
(star) :和正则的功能类似,可以代表一整个单词。
#
(hash) :代表0个或者多个单词。
如果一条消息的routing key为「quick.orange.rabbit」,将会被同时路由到Q1和Q2,「lazy.orange.elephant」的routing key同样也将会被同时路由到Q1和Q2,「quick.orange.fox」的消息只会被路由Q1,【lazy.brown.fox】只会被路由到Q2,【lazy.pink.rabbit】只会被路由到Q2一次,虽然匹配了两个binding,【quick.brown.fox】没有匹配到任何的绑定,那么消息将会被丢弃。
如果一个队列绑定的是【#】,那么他将会接收到所有的消息,会忽略调binding key,实现类似扇形交换机的功能。
如果一个routing key中没有设计【#】和【*】,那么他会实现类似直连交换机的功能。
3、代码实现
3.1 源代码实现
生产者:
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println("Failed to connect to RabbitMQ")
return
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
fmt.Println("Failed to open a channel")
return
}
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Println("Failed to declare an exchange,err:", err)
return
}
body := "Hello World by topic exchange"
err = ch.Publish(
"logs_topic", // exchange
"quick.orange.fox", // routing key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
fmt.Println("Failed to publish a message")
return
}
}
代码示例中routing key为【quick.orange.fox】,这条消息将会被路由到2.2中的Q1队列中。
消费侧代码:
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println("Failed to connect to RabbitMQ")
return
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
fmt.Println("Failed to open a channel")
return
}
err = ch.ExchangeDeclare("logs", "direct", true, false, false, false, nil)
if err != nil {
fmt.Println("Failed to declare an exchange")
return
}
q, err := ch.QueueDeclare(
"logs_topic", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
err = ch.QueueBind(
q.Name, // queue name
"*.orange.*", // routing key(binding key)
"logs_topic", // exchange
false,
nil,
)
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
true, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
var forever chan struct{}
go func() {
for d := range msgs {
fmt.Printf(" [x] %s\n", d.Body)
}
}()
fmt.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
3.2 运行记录
发送消息:
接收消息:
4、小结
学到这里发现,topic交换机完全具备fanout、direct两种交换机的全部功能,日常开发完全可以使用topic交换机,根据不同routing key即可以实现扇形和直连交换机的功能。
比如第3章节中消费者,routing key设置为【#】,则这个队列可以接收所有消息,类似扇形交换机功能。