前言💬
Windows 上的 RabbitMQ 被我卸载了,在 macOS 上再安装一下,采用 brew install 还是挺方便的。
很好奇微软的程序员写代码用的是 Windows 操作系统吗?感觉有点不方便,但用 macOS 岂不是太丢撵了。
一、macOS 安装 Go 和 RabbitMQ
brew install go
brew install erlang // 若未安装,需要安装
brew install rabbitmq
是不是很方便💥!!此外根据 go 的奇怪特性,建议所有的项目都在 GOPATH 下的 src 目录中开发。
go env
mkdir -p GOPATH/{src,pkg,bin}
通过 go env 查到 GOPATH,在 GOPATH 下新建三个目录,在 src 目录中存放自己的代码。
二、RabbitMQ 核心概念
2.1 服务器启动与关闭
启动 RabbitMQ,首先要找到 brew 将其安装在哪,然后运行相应的可执行文件。
cd /usr/local/cellar/rabbitmq/3.10.7
./sbin/rabbitmq-server
启动成功后,查看 127.0.0.1:15672,15672是管理界面的默认端口,但5672是数据默认端口,开发时应连接5672端口。
通过以下命令关闭服务器。
rabbitmqctl stop
2.2 六个核心概念
- VirtualHost,虚拟主机,可以用作逻辑隔离,如测试环境和开发环境。
- Connection,建立满足 AMQP 协议的连接。
- Exchange,交换机,基于规则存储-转发消息,类似于计算机网络中的交换机。
- Queue,消息队列,两端可以视为广义的生产者和消费者。
- Channel,信道,它是生产者、消费者与RabbitMQ 通信的渠道。
- Binding,建立交换机与消息队列的绑定。
三、工作模式
所谓工作模式就是生产者与消费者间以何种规则进行生产、消费,总结如下
工作模式 | 特点 |
---|---|
Simple | 消费者监听MQ,一旦有消息就消费掉,且该消息从队列中删除 |
Work | 一个消息只能被一个消费者获取(如抢红包场景) |
Publish-Subscribe | 一个消息可以被多个消费者获取(如群聊天) |
Routing | |
Topic | |
RPC |
3.1 工具函数和工具结构体定义
/* rabbitmq.go 文件 */
package RabbitMQ
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
// url 格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost
const MQURL = "amqp://imoocuser:imoocuser@127.0.0.1:5672/imooc"
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
QueueName string // 队列名称
Exchange string // 交换机
Key string // key
Mqurl string // 连接信息
}
// 创建 RabbitMQ 结构体实例
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
}
// 断开 channel 和 connection,r 是接收者变量
func (r *RabbitMQ) Destory() {
r.channel.Close()
r.conn.Close()
}
// 打印错误信息
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
}
}
3.2 实现 Simple 模式的生产者-消费者模型
/* rabbitmq.go 文件 */
/* ============= simple 模式下的生产消费 ============= */
// 创建 simple 模式的 rabbitmq 实例,传入参数仅为 queueName
func NewRabbitMQSimple(queueName string) *RabbitMQ {
rabbitmq := NewRabbitMQ(queueName, "", "")
var err error
// 创建 rabbitmq 连接
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "创建连接时发生错误!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "获取 channel 失败!")
return rabbitmq
}
// simple 模式下的生产者代码
func (r *RabbitMQ) PublishSimple(message string) {
// 1. 申请队列,若队列不存在,创建;若存在,跳过创建(保证队列存在)
_, err := r.channel.QueueDeclare(
r.QueueName,
false, // 数据是否持久化
false, // 最后一个生产者断开连接时是否自动删除
false, // 是否排他性
false, // 是否阻塞
nil,
)
if err != nil {
fmt.Println(err)
}
// 2. 发送消息到队列中
r.channel.Publish(
r.Exchange,
r.QueueName,
false, // 若为true,根据exchange类型和routekey规则,若无法找到符合条件的队列,则把发送的消息返回给发送者
false, // 若为true,当exchange发送消息到队列后发现队列上没有绑定消费者,则把消息返回给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
// simple 模式下的消费者代码
func (r *RabbitMQ) ConsumeSimple() {
// 1. 申请队列,若队列不存在,创建;若存在,跳过创建(保证队列存在)
_, err := r.channel.QueueDeclare(
r.QueueName,
false, // 数据是否持久化
false, // 最后一个生产者断开连接时是否自动删除
false, // 是否排他性
false, // 是否阻塞
nil,
)
if err != nil {
fmt.Println(err)
}
// 2.接收(消费)消息
msgs, err := r.channel.Consume(
r.QueueName,
"", // 不区分多个消费者
true, // 是否自动应答
false, // 是否有排他性
false, // 若设置为true,表示不能将同一个connection中发送的消息传递给该connection中的消费者
false, // 队列消费是否阻塞
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
// 启动协程处理消息
go func() {
for d := range msgs {
// 实现我们要处理的逻辑函数
log.Printf("[*] Receive a message: %s", d.Body)
}
}()
log.Printf("[*] Waiting for messages, To exit press CTRL+C")
<-forever
}