golang实现延迟队列
1 延迟队列:邮件提醒、订单自动取消
延迟队列:处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有达到指定的时间点时才能从队列中取出并执行。
应用场景:
- 邮件提醒
- 订单自动取消(超过多少时间未支付,就取消订单)
- 对超时任务的处理等
由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。
2 实现
2.1 simple简单版:go自带的time包实现
思路:
- 定义Task结构体,包含
- ExecuteTime time.Time
- Job func()
- 定义DelayQueue
- TaskQueue []Task
- func AddTask
- func RemoveTask
- ExecuteTask
这种方案存在的问题:
Go程序重启时,存储在slice中的延迟处理任务将全部丢失
完整代码:
package main
import (
"fmt"
"time"
)
/*
基于go实现延迟队列
*/
type Task struct {
ExecuteTime time.Time
Job func()
}
type DelayQueue struct {
Tasks []*Task
}
func (d *DelayQueue) AddTask(t *Task) {
d.Tasks = append(d.Tasks, t)
}
func (d *DelayQueue) RemoveTask() {
//FIFO: remove the first task to enqueue
d.Tasks = d.Tasks[1:]
}
func (d *DelayQueue) ExecuteTask() {
for len(d.Tasks) > 0 {
//dequeue a task
currentTask := d.Tasks[0]
if time.Now().Before(currentTask.ExecuteTime) {
//if the task execution time is not up, wait
time.Sleep(currentTask.ExecuteTime.Sub(time.Now()))
}
//execute the task
currentTask.Job()
//remove task who has been executed
d.RemoveTask()
}
}
func main() {
fmt.Println("start delayQueue")
delayQueue := &DelayQueue{}
firstTask := &Task{
ExecuteTime: time.Now().Add(time.Second * 1),
Job: func() {
fmt.Println("executed task 1 after delay")
},
}
delayQueue.AddTask(firstTask)
secondTask := &Task{
ExecuteTime: time.Now().Add(time.Second * 7),
Job: func() {
fmt.Println("executed task 2 after delay")
},
}
delayQueue.AddTask(secondTask)
delayQueue.ExecuteTask()
fmt.Println("all tasks have been done!!!")
}
效果:
2.2 complex持久版:go+redis
为了防止Go重启后存储到delayQueue的数据丢失,我们可以将任务持久化到redis中。
思路:
- 初始化redis连接
- 延迟队列采用redis的zset(有序集合)实现
前置准备:
# 安装docker
yum install -y yum-utils
yum-config-manager \
--add-repo \
https://download.docker.com/linux/centos/docker-ce.repo
yum install docker
systemctl start docker
# docker搭建redis
mkdir -p /Users/ziyi2/docker-home/redis
docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis
完整代码:
package main
import (
"fmt"
"github.com/go-redis/redis"
log "github.com/ziyifast/log"
"time"
)
/*
基于redis zset实现延迟队列
*/
var redisdb *redis.Client
var DelayQueueKey = "delay-queue"
func initClient() (err error) {
redisdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // not set password
DB: 0, //use default db
})
_, err = redisdb.Ping().Result()
if err != nil {
log.Errorf("%v", err)
return err
}
return nil
}
func main() {
err := initClient()
if err != nil {
log.Errorf("init redis client err: %v", err)
return
}
addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix())
addTaskToQueue("task2", time.Now().Add(time.Second*8).Unix())
//执行队列中的任务
getAndExecuteTask()
}
// executeTime为unix时间戳,作为zset中的score。允许redis按照task应该执行时间来进行排序
func addTaskToQueue(task string, executeTime int64) {
err := redisdb.ZAdd(DelayQueueKey, redis.Z{
Score: float64(executeTime),
Member: task,
}).Err()
if err != nil {
panic(err)
}
}
// 从redis中取一个task并执行
func getAndExecuteTask() {
for {
tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{
Min: "-inf",
Max: fmt.Sprintf("%d", time.Now().Unix()),
Offset: 0,
Count: 1,
}).Result()
if err != nil {
time.Sleep(time.Second * 1)
continue
}
//处理任务
for _, task := range tasks {
fmt.Println("Execute task: ", task)
//执行完任务之后用 ZREM 移除该任务
redisdb.ZRem(DelayQueueKey, task)
}
time.Sleep(time.Second * 1)
}
}
效果:
redis一直从延迟队列中取数据,如果处理完一批则睡眠1s
- 具体根据大家的业务调整,此处主要介绍思路