Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

news2025/1/12 13:12:40

Asynq[1]是一个Go实现的分布式任务队列和异步处理库,基于redis,类似Ruby的sidekiq[2]和Python的celery[3]。Go生态类似的还有machinery[4]和goworker

alt

同时提供一个WebUI asynqmon[5],可以源码形式安装或使用Docker image, 还可以和Prometheus集成

docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon,如果使用的是主机上的redis,还需加上 --redis-addr=host.docker.internal:6379,否则会报错[6]

docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379


➜  asynq-demo git:(main) ✗ tree
.
├── client.go
├── const.go
├── go.mod
├── go.sum
└── server.go

0 directories, 5 files

其中const.go:

package main

const (
 redisAddr   = "127.0.0.1:6379"
 redisPasswd = ""
)

const (
 TypeExampleTask    = "shuang:asynq-task:example"
)

client.go:


package main

import (
 "encoding/json"
 "fmt"
 "log"
 "time"

 "github.com/hibiken/asynq"
)

type ExampleTaskPayload struct {
 UserID string
 Msg    string
 // 业务需要的其他字段
}

func NewExampleTask(userID string, msg string) (*asynq.Task, error) {
 payload, err := json.Marshal(ExampleTaskPayload{UserID: userID, Msg: msg})
 if err != nil {
  return nil, err
 }
 return asynq.NewTask(TypeExampleTask, payload), nil
}

var client *asynq.Client

func main() {

 client = asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: redisPasswd, DB: 0})
 defer client.Close()

 //go startExampleTask()
 startExampleTask()

 //startGithubUpdate() // 定时触发
}

func startExampleTask() {

 fmt.Println("开始执行一次性的任务")
 // 立刻执行
 task1, err := NewExampleTask("10001""mashangzhixing!")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 info, err := client.Enqueue(task1)
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task1 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)

 // 10秒后执行(定时执行)
 task2, err := NewExampleTask("10002""10s houzhixing")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 info, err = client.Enqueue(task2, asynq.ProcessIn(10*time.Second))
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task2 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)

 // 30s后执行(定时执行)
 task3, err := NewExampleTask("10003""30s houzhixing")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 theTime := time.Now().Add(30 * time.Second)
 info, err = client.Enqueue(task3, asynq.ProcessAt(theTime))
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task3 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

server.go:

package main

import (
 "context"
 "encoding/json"
 "fmt"
 "time"

 "github.com/davecgh/go-spew/spew"
 "github.com/hibiken/asynq"
)

var AsynqServer *asynq.Server // 异步任务server

func initTaskServer() error {
 // 初始化异步任务服务端
 AsynqServer = asynq.NewServer(
  asynq.RedisClientOpt{
   Addr:     redisAddr,
   Password: redisPasswd, //与client对应
   DB:       0,
  },
  asynq.Config{
   // Specify how many concurrent workers to use
   Concurrency: 100,
   // Optionally specify multiple queues with different priority.
   Queues: map[string]int{
    "critical"6,
    "default":  3,
    "low":      1,
   },
   // See the godoc for other configuration options
  },
 )
 return nil
}

func main() {
 initTaskServer()
 mux := asynq.NewServeMux()

 mux.HandleFunc(TypeExampleTask, HandleExampleTask)
 // ...register other handlers...

 if err := AsynqServer.Run(mux); err != nil {
  fmt.Printf("could not run asynq server: %v", err)
 }
}

func HandleExampleTask(ctx context.Context, t *asynq.Task) error {

 res := make(map[string]string)

 spew.Dump("t.Payload() is:", t.Payload())
 err := json.Unmarshal(t.Payload(), &res)
 if err != nil {
  fmt.Printf("rum session, can not parse payload: %s,  err: %v", t.Payload(), err)
  return nil
 }
 //-----------具体处理逻辑------------
 spew.Println("拿到的入参为:", res, "接下来将进行具体处理")
 fmt.Println()
 // 模拟具体的处理
 time.Sleep(5 * time.Second)
 fmt.Println("--------------处理了5s,处理完成-----------------")

 return nil

}

执行redis-server


清除redis中所有的key:


执行docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379

alt

执行 go run client.go const.go (生产者,产生消息放入队列)

alt

此时能看到redis中多个几个key

alt

同时管理后台能看到队列的信息

alt

执行 go run server.go const.go (消费者,消费队列中的消息)

alt

可以看到都被处理了

alt

此时redis中的key:

alt

此处的业务处理为模拟,实际可能是某个被触发后不需要马上执行的操作




实际试一下。通过一个定时器(24h执行一次),触发代码每天向github push当天的代码等内容。收到触发后无需马上执行(可能当时其他请求量高,机器资源紧张),可以先放入队列,延迟30min后实际去执行。

完整Demo[7] push github的功能没有完全实现


另外可以配置队列的优先级,asynq队列如何配置队列优先级[8]

 // 初始化异步任务服务端
 AsynqServer = asynq.NewServer(
  asynq.RedisClientOpt{
   Addr:     redisAddr,
   Password: redisPasswd, //与client对应
   DB:       0,
  },
  asynq.Config{
   // Specify how many concurrent workers to use
   Concurrency: 100,
   // Optionally specify multiple queues with different priority.
   Queues: map[string]int{
    "critical"6,//关键队列中的任务将被处理 60% 的时间
    "default":  3,//默认队列中的任务将被处理 30% 的时间
    "low":      1,//低队列中的任务将被处理 10% 的时间
   },
   // See the godoc for other configuration options
  },
 )

go asynq 异步任务 (延迟触发) 简单案例及奇怪的错误[9]

参考资料

[1]

Asynq: https://github.com/hibiken/asynq

[2]

sidekiq: https://github.com/sidekiq/sidekiq

[3]

celery: https://github.com/celery/celery

[4]

machinery: https://blog.csdn.net/weixin_42681866/article/details/123334654

[5]

asynqmon: https://github.com/hibiken/asynqmon

[6]

报错: https://github.com/hibiken/asynqmon/issues/214

[7]

完整Demo: https://github.com/cuishuang/asynq-demo

[8]

asynq队列如何配置队列优先级: https://blog.csdn.net/itopit/article/details/126123626

[9]

go asynq 异步任务 (延迟触发) 简单案例及奇怪的错误: https://my.oschina.net/randolphcyg/blog/5539676

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/847702.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构与算法——TypeScript】哈希表

【数据结构与算法——TypeScript】 哈希表(HashTable) 哈希表介绍和特性 哈希表是一种非常重要的数据结构,但是很多学习编程的人一直搞不懂哈希表到底是如何实现的。 在这一章节中,我门就一点点来实现一个自己的哈希表。通过实现来理解哈希表背后的原理…

oracle容灾备份怎么样Oracle容灾备份

随着科学技术的发展和业务的增长,数据安全问题越来越突出。为了保证数据的完整性、易用性和保密性,公司需要采取一系列措施来防止内容丢失的风险。  Oracle是一个关系数据库管理系统(RDBMS),OracleCorporation是由美国软件公司开发和维护的。该系统功能…

构建Docker容器监控系统(cadvisor+influxDB+grafana)

目录 一、部署 1、安装docker-cd 2、阿里云镜像加速 3、下载组件镜像 4、创建自定义网络 5、创建influxdb容器 6、创建Cadvisor 容器 7、创建granafa容器 一、部署 1、安装docker-cd [rootlocalhost ~]# iptables -F [rootlocalhost ~]# setenforce 0 setenforce: SELi…

22款奔驰GLC260加装原厂香氛负离子系统,清香宜人,久闻不腻

奔驰原厂香氛合理性可通过车内空气调节组件营造芳香四溢的怡人氛围。通过更换手套箱内香氛喷雾发生器所用的香水瓶,可轻松选择其他香氛。香氛的浓度和持续时间可调。淡雅的香氛缓缓喷出,并且在关闭后能够立刻散去。车内气味不会永久改变,香氛…

IELAB-网络工程师的路由答疑10问(2)

各位小伙伴们,接下来的问题可能有些难度,你们做好准备了吗? 7. 动态路由协议做了啥? 这次咱们先解决第一个比较棘手的问题--路由协议,相信初学的同学对于路由协议的学习总是或多或少有些问题,呐&#xff…

UVA1025 城市里的间谍 A Spy in the Metro

实际上这题就是问Mario最少的总等车时间 这题我的做法是 把一个火车从左到右(或从右到左) 的过程 转化成 途中任何车站到左边(或右边)相邻车站 的过程 相当于把他切成了一段一段 (一段就是两个相邻车站中间的部分) 这样更容易操作 具体请看代码 一些注释在代码里请往下看 #inc…

.netcore下grpc概述

一、什么是grpc 是一种与语言无关的高性能远程过程调用 (RPC) 框架。基于http/2标准设计,提供了头部压缩、tcp连接上的多路复用、流量控制、流式处理(客户端流/服务端流/双向流)。提供统一使用的.proto文件,它定义 grpc 服务和消…

屏幕录制app分享,总有适合你的一款

在现今的互联网时代,屏幕录制已经成为了一项必备的技能。然而,要想将自己的屏幕录制下来并分享给别人,就需要一款好用的屏幕录制app。市面上有许多不同的屏幕录制app,每个人的需求也各不相同。本篇文章就将为大家推荐几款适合不同…

技术应用:Docker安全性的最佳实验|聊聊工程化Docker

🔥 技术相关:《技术应用》 ⛺️ I Love you, like a fire! 文章目录 首先,使用Docker Hub控制访问其次,保护密钥写在最后 不可否认,能生存在互联网上的软件都是相互关联的,当我们开发一款应用程序时&#x…

好用的Windows 10磁盘管理工具

​前几天,我给我用的戴尔笔记本电脑装上了全新的SSD,并准备将所有除Windows操作系统以外的数据,特别是游戏,全部转移到SSD上(主要是因为这样能加快游戏的加载速度)。但在我尝试用Windows 10自带的磁盘管理操…

树莓派4B, Purple Pi, Orange Pi 3B对比

1 参数 树莓派4BPurple Pi OHOrange Pi 3BSOCBroadcom BCM2711RockChip 3566RockChip 3566CPUARM Cortex-A72 四核1.5GHz主频ARM Cortex-A55 四核 2.0GHz主频ARM Cortex-A55 四核 2.0GHz主频GPU支持OpenGL ES 3.0 graphicsMali-G52 1-Core-2EE 支持 OpenGL ES 1.1/2.0/3.2&…

Python操作MySQL将数据库表中的数据导出到excel

Author: liukai 2810248865qq.com Date: 2022-08-18 04:28:52 LastEditors: liukai 2810248865qq.com LastEditTime: 2023-06-29 09:35:25 FilePath: \PythonProject01\Python操作MySQL数据库及excel将数据库表中的数据导出到excel中.py Description: 这是默认设置,请设置custo…

独立站如何进行Facebook广告投放?关于广告投放策略的真相

谷歌广告是独立站卖家推广引流的首选渠道,那么谷歌广告该如何投放?在这个过程中有哪些需要特别注意的吗? 创建Facebook广告账户: 访问Facebook广告管理平台(Ads Manager)并创建一个广告账户。您需要提供一…

Android 数据库之GreenDAO

GreenDAO 是一款开源的面向 Android 的轻便、快捷的 ORM 框架,将 Java 对象映射到 SQLite 数据库中,我们操作数据库的时候,不再需要编写复杂的 SQL语句, 在性能方面,greenDAO 针对 Android 进行了高度优化,…

dy六神参数记录分析(立秋篇)

version: 23.9 X-SSSTUB: 搜索:x-tt-dt var hashMap Java.use("java.util.HashMap");hashMap.put.implementation function (a, b) {console.log("hashMap.put: ", a, b);return this.put(a, b);}https://codeooo.blog.csdn.n…

分享一下Steam搬砖常规操作

大家好,我是阿阳,接下来我们会陆续更新一些Steam搬砖项目的操作课程,大家可以自行学习,希望对正在操作的朋友,有一定的帮助。 steam平台,对于大多数游戏玩家应该再清楚不过了,玩过pubg&#xf…

资深测试老鸟整理,性能测试-常见调优详细,卷起来...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 常见的一些性能缺…

【数据结构】单链表OJ题

🔥博客主页:小王又困了 📚系列专栏:数据结构 🌟人之为学,不日近则日退 ❤️感谢大家点赞👍收藏⭐评论✍️ 目录 一、移除链表元素 💡方法一: 💡方法二…

活动发布会邀请媒体6步走

传媒如春雨,润物细无声,大家好,我是51媒体网胡老师。 邀请媒体参加活动发布会对信息的传播,企业品牌建设有诸多的好处,今天就与大家分享下邀请媒体参加活动报道的6个步骤: 1. 策划与准备: -明…

南卡携手傅园慧,破圈背后开辟了全新营销方法

南卡官宣了!新品牌大使是傅园慧!在极短时间内,该消息迅速登上各平台热搜,并且在抖音等社交平台上也引起了强烈的共振。 一直以来,耳机行业由于本身经营的是耐消品,因此在推广大使的选择上始终持谨慎且保守的…