如何在 GoFrame 项目中实现 Redis 队列功能:
配置代码:
# 消息队列配置
mq:
# 消息队列类型: rocketmq 或 rabbitmq
type: "rabbitmq"
# 是否启用消息队列
enabled: true
rocketmq:
nameServer: "127.0.0.1:9876"
producerGroup: "myProducerGroup"
consumerGroup: "myConsumerGroup"
brokerAddress: "127.0.0.1:10911" # 添加 broker 地址
rabbitmq:
url: "amqp://wanghaibin:wanghaibin@127.0.0.1:5672/"
exchange: "gf_exchange"
dlx_exchange: "gf_dlx_exchange" # 新增:死信交换机
queue: "gf_queue"
delay_queue: "gf_delay_queue" # 新增:延迟队列
routingKey: "gf_key"
vhost: "/"
# GoFrame 项目实现 Redis 队列功能
本文将介绍如何在 GoFrame 框架中实现 Redis 队列功能,采用三层架构:Controller、Service 和 Logic。
## 1. 定义接口层 (Service)
首先在 `internal/service` 目录下创建接口定义:
```go:internal/service/redis_queue.go
package service
import (
"context"
)
type IRedisQueue interface {
// 生产消息
ProduceMessage(ctx context.Context, queue string, message string) error
// 消费消息
ConsumeMessage(ctx context.Context, queue string) (string, error)
// 获取队列长度
QueueLength(ctx context.Context, queue string) (int64, error)
}
var localRedisQueue IRedisQueue
func RedisQueue() IRedisQueue {
if localRedisQueue == nil {
panic("implement not found for interface IRedisQueue")
}
return localRedisQueue
}
func RegisterRedisQueue(i IRedisQueue) {
localRedisQueue = i
}
2. 实现业务逻辑层 (Logic)
在 internal/logic
目录下实现具体业务逻辑:
package redis_queue
import (
"context"
"gf_new_web/internal/service"
"github.com/gogf/gf/v2/frame/g"
)
type sRedisQueue struct{}
func init() {
service.RegisterRedisQueue(New())
}
func New() *sRedisQueue {
return &sRedisQueue{}
}
// ProduceMessage 生产消息
func (s *sRedisQueue) ProduceMessage(ctx context.Context, queue string, message string) error {
_, err := g.Redis().RPush(ctx, queue, message)
if err != nil {
return err
}
return nil
}
// ConsumeMessage 消费消息
func (s *sRedisQueue) ConsumeMessage(ctx context.Context, queue string) (string, error) {
message, err := g.Redis().LPop(ctx, queue)
if err != nil {
return "", err
}
return message.String(), nil
}
// QueueLength 获取队列长度
func (s *sRedisQueue) QueueLength(ctx context.Context, queue string) (int64, error) {
length, err := g.Redis().LLen(ctx, queue)
if err != nil {
return 0, err
}
return length, nil
}
3. 实现控制器层 (Controller)
在 internal/controller
目录下实现 HTTP 接口:
package controller
import (
"gf_new_web/internal/service"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
)
type cRedisQueue struct{}
var RedisQueue = cRedisQueue{}
// ProduceMessage 生产消息接口
func (c *cRedisQueue) ProduceMessage(r *ghttp.Request) {
var (
queue = r.Get("queue").String()
message = r.Get("message").String()
)
err := service.RedisQueue().ProduceMessage(r.GetCtx(), queue, message)
if err != nil {
r.Response.WriteJson(g.Map{
"code": -1,
"msg": err.Error(),
})
return
}
r.Response.WriteJson(g.Map{
"code": 0,
"msg": "消息发送成功",
})
}
// ConsumeMessage 消费消息接口
func (c *cRedisQueue) ConsumeMessage(r *ghttp.Request) {
var (
queue = r.Get("queue").String()
)
message, err := service.RedisQueue().ConsumeMessage(r.GetCtx(), queue)
if err != nil {
r.Response.WriteJson(g.Map{
"code": -1,
"msg": err.Error(),
})
return
}
r.Response.WriteJson(g.Map{
"code": 0,
"msg": "消息消费成功",
"data": message,
})
}
// QueueLength 获取队列长度接口
func (c *cRedisQueue) QueueLength(r *ghttp.Request) {
var (
queue = r.Get("queue").String()
)
length, err := service.RedisQueue().QueueLength(r.GetCtx(), queue)
if err != nil {
r.Response.WriteJson(g.Map{
"code": -1,
"msg": err.Error(),
})
return
}
r.Response.WriteJson(g.Map{
"code": 0,
"data": length,
})
}
4. 路由注册
在路由配置文件中添加相关路由:
func registerRouter(group *ghttp.RouterGroup) {
group.Group("/queue", func(group *ghttp.RouterGroup) {
group.POST("/produce", controller.RedisQueue.ProduceMessage)
group.GET("/consume", controller.RedisQueue.ConsumeMessage)
group.GET("/length", controller.RedisQueue.QueueLength)
})
}
5. 使用说明
生产消息
curl -X POST "http://localhost:8000/queue/produce" \
-d "queue=test_queue" \
-d "message=hello world"
消费消息
curl "http://localhost:8000/queue/consume?queue=test_queue"
获取队列长度
curl "http://localhost:8000/queue/length?queue=test_queue"
6. 特点和优势
- 层次分明:严格遵循三层架构设计
- 接口解耦:通过接口定义实现业务解耦
- 易于扩展:可以方便地添加新的队列操作
- 统一错误处理:采用统一的错误返回格式
- 使用 GoFrame 的 Redis 客户端,保证稳定性
7. 注意事项
- 需要在配置文件中正确配置 Redis 连接信息
- 建议添加队列名称的验证
- 可以考虑添加消息的序列化和反序列化
- 在生产环境中应该添加适当的日志记录
- 可以考虑添加消息的过期策略