taskq 是一个带有 Redis、SQS、IronMQ 和内存后端的 Golang 异步任务/作业队列。
特性:
- Redis、SQS、IronMQ 和内存后端。
- 自动缩放用于获取(fetcher)和处理消息(worker)的 goroutines 的数量。
- 全局速率限制。
- 工人的全球限制。
- 调用一次 - 删除具有相同名称的重复消息。
- 使用指数退避自动重试。
- 当队列中的所有消息都失败时自动暂停。
- 用于处理失败消息的后备处理程序。
- 消息批处理。它用于 SQS 和 IronMQ 后端批量添加/删除消息。
- 使用 snappy / s2 自动压缩消息。
API overview
t := myQueue.RegisterTask(&taskq.TaskOptions{
Name: "greeting",
Handler: func(name string) error {
fmt.Println("Hello", name)
return nil
},
})
// Say "Hello World".
err := myQueue.Add(t.WithArgs(context.Background(), "World"))
if err != nil {
panic(err)
}
// Say "Hello World" with 1 hour delay.
msg := t.WithArgs(ctx, "World")
msg.Delay = time.Hour
_ = myQueue.Add(msg)
// Say "Hello World" once.
for i := 0; i < 100; i++ {
msg := t.WithArgs(ctx, "World")
msg.Name = "hello-world" // unique
_ = myQueue.Add(msg)
}
// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
msg := t.WithArgs(ctx, "World")
msg.Name = "hello-world"
msg.Delay = time.Hour
_ = myQueue.Add(msg)
}
// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour)
_ = myQueue.Add(msg)
}
// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour, "World", "europe")
_ = myQueue.Add(msg)
}