machinery入门看这一篇(异步任务队列)
前言
哈喽,大家好,我是asong,这次给大家介绍一个go的异步任务框架machinery。使用过python的同学们都知道
Celery
框架,machinery
框架就类似于Celery
框架。下面我们就来学习一下machinery
的基本使用。自己翻译一个粗略版的
machinery
中文文档,有需要的伙伴们公众号自取无水印版:后台回复:machinery即可领取。或者github下载:https://github.com/asong2020/Golang_Dream/tree/master/machinery
抛砖引玉
我们在使用某些APP时,登陆系统后一般会收到一封邮件或者一个短信提示我们在某个时间某某地点登陆了。而邮件或短信都是在我们已经登陆后才收到,这里就是采用的异步机制。大家有没有想过这里为什么没有使用同步机制实现呢?我们来分析一下。假设我们现在采用同步的方式实现,用户在登录时,首先会去检验一下账号密码是否正确,验证通过后去给用户发送登陆提示信息,假如在这一步出错了,那么就会导致用户登陆失败,这样是大大影响用户的体验感的,一个登陆提示的优先级别并不是很高,所以我们完全可以采用异步的机制实现,即使失败了也不会影响用户的体验。前面说了这么多,那么异步机制该怎么实现呢?对,没错,就是machinery
框架,听说你们还不会使用它,今天我就写一个小例子,我们一起来学习一下他吧。
特性
上面只是简单举了个例子,任务队列有着广泛的应用场景,比如大批量的计算任务,当有大量数据插入,通过拆分并分批插入任务队列,从而实现串行链式任务处理或者实现分组并行任务处理,提高系统鲁棒性,提高系统并发度;或者对数据进行预处理,定期的从后端存储将数据同步到到缓存系统,从而在查询请求发生时,直接去缓存系统中查询,提高查询请求的响应速度。适用任务队列的场景有很多,这里就不一一列举了。回归本文主题,既然我们要学习machinery
,就要先了解一下他都有哪些特性呢?
-
任务重试机制 -
延迟任务支持 -
任务回调机制 -
任务结果记录 -
支持Workflow模式:Chain,Group,Chord -
多Brokers支持:Redis, AMQP, AWS SQS -
多Backends支持:Redis, Memcache, AMQP, MongoDB
架构
任务队列,简而言之就是一个放大的生产者消费者模型,用户请求会生成任务,任务生产者不断的向队列中插入任务,同时,队列的处理器程序充当消费者不断的消费任务。基于这种框架设计思想,我们来看下machinery的简单设计结构图例:
-
Sender:业务推送模块,生成具体任务,可根据业务逻辑中,按交互进行拆分; -
Broker:存储具体序列化后的任务,machinery中目前支持到Redis, AMQP,和SQS; -
Worker:工作进程,负责消费者功能,处理具体的任务; -
Backend:后端存储,用于存储任务执行状态的数据;
e.g
学习一门新东西,我都习惯先写一个demo,先学会了走,再学会跑。所以先来看一个例子,功能很简单,异步计算1到10的和。
先看一下配置文件代码:
broker: redis://localhost:6379
default_queue: "asong"
result_backend: redis://localhost:6379
redis:
max_idle: 3
max_active: 3
max_idle_timeout: 240
wait: true
read_timeout: 15
write_timeout: 15
connect_timeout: 15
normal_tasks_poll_period: 1000
delayed_tasks_poll_period: 500
delayed_tasks_key: "asong"
这里broker
与result_backend
来实现。
主代码,完整版github获取:
func main() {
cnf,err := config.NewFromYaml("./config.yml",false)
if err != nil{
log.Println("config failed",err)
return
}
server,err := machinery.NewServer(cnf)
if err != nil{
log.Println("start server failed",err)
return
}
// 注册任务
err = server.RegisterTask("sum",Sum)
if err != nil{
log.Println("reg task failed",err)
return
}
worker := server.NewWorker("asong", 1)
go func() {
err = worker.Launch()
if err != nil {
log.Println("start worker error",err)
return
}
}()
//task signature
signature := &tasks.Signature{
Name: "sum",
Args: []tasks.Arg{
{
Type: "[]int64",
Value: []int64{1,2,3,4,5,6,7,8,9,10},
},
},
}
asyncResult, err := server.SendTask(signature)
if err != nil {
log.Fatal(err)
}
res, err := asyncResult.Get(1)
if err != nil {
log.Fatal(err)
}
log.Printf("get res is %v\n", tasks.HumanReadableResults(res))
}
运行结果:
INFO: 2020/10/31 11:32:15 file.go:19 Successfully loaded config from file ./config.yml
INFO: 2020/10/31 11:32:15 worker.go:58 Launching a worker with the following settings:
INFO: 2020/10/31 11:32:15 worker.go:59 - Broker: redis://localhost:6379
INFO: 2020/10/31 11:32:15 worker.go:61 - DefaultQueue: asong
INFO: 2020/10/31 11:32:15 worker.go:65 - ResultBackend: redis://localhost:6379
INFO: 2020/10/31 11:32:15 redis.go:100 [*] Waiting for messages. To exit press CTRL+C
DEBUG: 2020/10/31 11:32:16 redis.go:342 Received new message: {"UUID":"task_9f01be1f-3237-49f1-8464-eecca2e50597","Name":"sum","RoutingKey":"asong","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"[]int64","Value":[1,2,3,4,5,6,7,8,9,10]}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
DEBUG: 2020/10/31 11:32:16 worker.go:261 Processed task task_9f01be1f-3237-49f1-8464-eecca2e50597. Results = 55
2020/10/31 11:32:16 get res is 55
好啦,现在我们开始讲一讲上面的代码流程,
-
读取配置文件,这一步是为了配置 broker
和result_backend
,这里我都选择的是redis
,因为电脑正好有这个环境,就直接用了。 -
Machinery
库必须在使用前实例化。实现方法是创建一个Server
实例。Server
是Machinery
配置和注册任务的基本对象。 -
在你的 workders
能消费一个任务前,你需要将它注册到服务器。这是通过给任务分配一个唯一的名称来实现的。 -
为了消费任务,你需有有一个或多个worker正在运行。运行worker所需要的只是一个具有已注册任务的 Server
实例。每个worker将只使用已注册的任务。对于队列中的每个任务,Worker.Process()方法将在一个goroutine中运行。可以使用server.NewWorker
的第二参数来限制并发运行的worker.Process()调用的数量(每个worker)。 -
可以通过将 Signature
实例传递给Server
实例来调用任务。 -
调用 HumanReadableResults
这个方法可以处理反射值,获取到最终的结果。
多功能
1. 延时任务
上面的代码只是一个简单machinery
使用示例,其实machiney
也支持延时任务的,可以通过在任务signature
上设置ETA时间戳字段来延迟任务。
eta := time.Now().UTC().Add(time.Second * 20)
signature.ETA = &eta
2. 重试任务
在将任务声明为失败之前,可以设置多次重试尝试。斐波那契序列将用于在一段时间内分隔重试请求。这里可以使用两种方法,第一种直接对tsak signature
中的retryTimeout
和RetryCount
字段进行设置,就可以,重试时间将按照斐波那契数列进行叠加。
//task signature
signature := &tasks.Signature{
Name: "sum",
Args: []tasks.Arg{
{
Type: "[]int64",
Value: []int64{1,2,3,4,5,6,7,8,9,10},
},
},
RetryTimeout: 100,
RetryCount: 3,
}
或者,你可以使用return.tasks.ErrRetryTaskLater
返回任务并指定重试的持续时间。
func Sum(args []int64) (int64, error) {
sum := int64(0)
for _, arg := range args {
sum += arg
}
return sum, tasks.NewErrRetryTaskLater("我说他错了", 4 * time.Second)
}
3. 工作流
上面我们讲的都是运行一个异步任务,但是我们往往做项目时,一个需求是需要多个异步任务以编排好的方式执行的,所以我们就可以使用machinery
的工作流来完成。
3.1 Groups
Group
是一组任务,它们将相互独立地并行执行。还是画个图吧,这样看起来更明了:
一起来看一个简单的例子:
// group
group,err :=tasks.NewGroup(signature1,signature2,signature3)
if err != nil{
log.Println("add group failed",err)
}
asyncResults, err :=server.SendGroupWithContext(context.Background(),group,10)
if err != nil {
log.Println(err)
}
for _, asyncResult := range asyncResults{
results,err := asyncResult.Get(1)
if err != nil{
log.Println(err)
continue
}
log.Printf(
"%v %v %v\n",
asyncResult.Signature.Args[0].Value,
tasks.HumanReadableResults(results),
)
}
group
中的任务是并行执行的。
3.2 chords
我们在做项目时,往往会有一些回调场景,machiney
也为我们考虑到了这一点,Chord
允许你定一个回调任务在groups
中的所有任务执行结束后被执行。
来看一段代码:
callback := &tasks.Signature{
Name: "call",
}
group, err := tasks.NewGroup(signature1, signature2, signature3)
if err != nil {
log.Printf("Error creating group: %s", err.Error())
return
}
chord, err := tasks.NewChord(group, callback)
if err != nil {
log.Printf("Error creating chord: %s", err)
return
}
chordAsyncResult, err := server.SendChordWithContext(context.Background(), chord, 0)
if err != nil {
log.Printf("Could not send chord: %s", err.Error())
return
}
results, err := chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
log.Printf("Getting chord result failed with error: %s", err.Error())
return
}
log.Printf("%v\n", tasks.HumanReadableResults(results))
上面的例子并行执行task1、task2、task3,聚合它们的结果并将它们传递给callback任务。
3.3 chains
chain
就是一个接一个执行的任务集,每个成功的任务都会触发chain
中的下一个任务。
看这样一段代码:
//chain
chain,err := tasks.NewChain(signature1,signature2,signature3,callback)
if err != nil {
log.Printf("Error creating group: %s", err.Error())
return
}
chainAsyncResult, err := server.SendChainWithContext(context.Background(), chain)
if err != nil {
log.Printf("Could not send chain: %s", err.Error())
return
}
results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
log.Printf("Getting chain result failed with error: %s", err.Error())
}
log.Printf(" %v\n", tasks.HumanReadableResults(results))
上面的例子执行task1,然后是task2,然后是task3。当一个任务成功完成时,结果被附加到chain
中下一个任务的参数列表的末尾,最终执行callback
任务。
文中代码地址:https://github.com/asong2020/Golang_Dream/tree/master/machinery/example
总结
这一篇文章到这里就结束了,machinery
还有很多用法,比如定时任务、定时任务组等等,就不在这一篇文章介绍了。更多使用方法解锁可以观看machinery
文档。因为machiney
没有中文文档,所以我在学习的过程自己翻译了一篇中文文档,需要的小伙伴们自取。
获取步骤:关注公众号【Golang梦工厂】,后台回复:machiney即可获取无水印版~~~
好啦,这一篇文章到这就结束了,我们下期见~~。希望对你们有用,又不对的地方欢迎指出,可添加我的golang交流群,我们一起学习交流。
结尾给大家发一个小福利吧,最近我在看[微服务架构设计模式]这一本书,讲的很好,自己也收集了一本PDF,有需要的小伙可以到自行下载。获取方式:关注公众号:[Golang梦工厂],后台回复:[微服务],即可获取。
我翻译了一份GIN中文文档,会定期进行维护,有需要的小伙伴后台回复[gin]即可下载。
我是asong,一名普普通通的程序猿,让gi我一起慢慢变强吧。我自己建了一个golang
交流群,有需要的小伙伴加我vx
,我拉你入群。欢迎各位的关注,我们下期见~~~
推荐往期文章:
-
手把手教姐姐写消息队列 -
常见面试题之缓存雪崩、缓存穿透、缓存击穿 -
详解Context包,看这一篇就够了!!! -
go-ElasticSearch入门看这一篇就够了(一) -
面试官:go中for-range使用过吗?这几个问题你能解释一下原因吗 -
学会wire依赖注入、cron定时任务其实就这么简单! -
听说你还不会jwt和swagger-饭我都不吃了带着实践项目我就来了 -
掌握这些Go语言特性,你的水平将提高N个档次(二) -
go实现多人聊天室,在这里你想聊什么都可以的啦!!! -
grpc实践-学会grpc就是这么简单 -
go标准库rpc实践 -
2020最新Gin框架中文文档 asong又捡起来了英语,用心翻译 -
基于gin的几种热加载方式
本文分享自微信公众号 - Golang梦工厂(AsongDream)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
HashMap面试专题,不看后悔
以前也零零碎碎发过一些HashMap的文章,这次简短总结一下有关HashMap的重要考点,这也是求职面试的经常问的,看完记得点个赞和在看哦~ 1、Hash的概念 将任意长度的输入通过散列算法之后映射成固定长度的输出。 2、Hash冲突 当关键字集合很大时(key的数量很多的时候),关键字值不同的元素可能会映像到哈希表的同一地址上,即K1!=K2,但f(K1)=f(K2),这种现象称为hash冲突,实际中冲突是不可避免的,只能通过改进哈希函数的性能来减少冲突。 3、你认为好的Hash算法的点应该有哪些? (1)效率得高,做到长文本也能高效计算出Hash值 (2)根据Hash值不能逆推出原文 (3)两次输入,如果有一点不同也得保证Hash值是不同的 (4)尽可能要分散,因为在table中slot大部分都处于空闲状态时要尽可能降低Hash冲突 4、HashMap的存储结构长啥样? JDK1.8: (1)数组+链表+红黑树构成,每个数据单元为一个Node结构,Node结构中有key字段、value字段、next字段、hash字段(2)next字段就是发生Hash冲突的时候,当前桶位中的Node...
- 下一篇
RestTemplate设置固定的url参数
在使用RestTemplate请求三方接口时:三方接口一般都要求在url后面拼接上固定的几个参数,一般如accessToken进行权限校验。而我们在开发时,请求这些地址,如何避免在url拼接accessToken这种重复固定的编码操作呢。 方法当然有很多,本文提供一种通过反射偷梁换柱的写法来实现。 以微信小程序服务端接口请求作为请求对象。 微信小程序要求在请求时带上 ?accesss_token=ACCESS_TOKEN image.png 如何实现..? # 基础配置 微信小程序配置类 /***微信小程序配置类**@authorfutao*@date2020/10/29*/@ConfigurationProperties(prefix=WxMiniProgramProperties.PROPERTY_PREFIX)publicclassWxMiniProgramProperties{/***微信小程序配置前缀*/publicstaticfinalStringPROPERTY_PREFIX=Consts.System.FRAMEWORK_BASE_NAME+"."+Consts.WxM...
相关文章
文章评论
共有0条评论来说两句吧...