如何设计一个可扩展的短信发送队列模块
项目地址
前言
在日常开发中,短信发送是一个非常常见的功能需求。无论是用户注册验证码、密码重置,还是营销通知,短信服务都扮演着重要角色。然而,直接调用短信服务商的 API 往往会带来一些问题:
- 网络延迟影响用户体验:同步发送短信会阻塞请求,导致响应变慢
- 高并发场景下的限流问题:短信服务商通常有 QPS 限制
- 发送失败需要重试:网络抖动或服务商临时故障可能导致发送失败
- 多服务商切换成本高:不同服务商的 API 接口差异大,切换时需要大量修改代码
本文将基于 gin-fast-tenant 项目中的短信插件模块,分享如何设计一个可扩展的短信发送队列模块。
一、整体架构设计
1.1 核心设计理念
我们的设计遵循以下几个核心原则:
- 接口驱动:通过接口抽象,实现与具体实现的解耦
- 生产者-消费者模式:异步处理短信发送任务
- 单例模式:确保队列全局唯一,避免资源浪费
- 回调机制:灵活处理发送结果
1.2 模块结构
plugins/plusms/smshelper/
├── smsqueue/ # 核心队列模块(与具体服务商无关)
│ ├── interfaces.go # 接口定义
│ ├── queue.go # 队列实现
│ ├── callback.go # 回调函数
│ ├── errors.go # 错误定义
│ └── global.go # 全局队列
└── alisms/ # 阿里云短信实现
├── alisms.go # 阿里云客户端
└── queue.go # 阿里云队列封装
这种分层设计使得核心队列逻辑与具体短信服务商完全解耦,未来要接入腾讯云、华为云等服务商,只需实现相应的 Provider 即可。
二、核心接口设计
2.1 TemplateConfig 接口
所有短信模板必须实现此接口:
// TemplateConfig 短信模板配置接口
type TemplateConfig interface {
Json() string // 返回模板参数的JSON字符串
GetTemplateCode() string // 返回模板代码
GetSignName() string // 返回签名名称
}
设计思路:不同短信服务商的模板参数格式可能不同,但通过统一的接口,我们可以让上层代码无需关心底层差异。
示例实现:
type TemplateDemo1 struct {
Code string `json:"code"`
}
func (t *TemplateDemo1) Json() string {
b, _ := json.Marshal(t)
return string(b)
}
func (t *TemplateDemo1) GetTemplateCode() string {
return "SMS_218038860"
}
func (t *TemplateDemo1) GetSignName() string {
return "奇讯科技"
}
2.2 SMSProvider 接口
短信提供商必须实现此接口:
// SMSProvider 短信提供者接口
type SMSProvider interface {
Send(phoneNumbers string, tpl TemplateConfig) (SMSResponse, error)
}
设计思路:这是整个模块最核心的抽象。通过这个接口,我们可以轻松切换不同的短信服务商,而无需修改上层调用代码。
阿里云实现示例:
type AliSMS struct {
Debug bool
AccessKeyId string
AccessKeySecret string
*dysmsapi20170525.Client
}
func (ali *AliSMS) Send(phoneNumbers string, tpl smsqueue.TemplateConfig) (res smsqueue.SMSResponse, _err error) {
if ali.Debug {
// 调试模式,不实际发送
return
}
sendSmsRequest := &dysmsapi20170525.SendSmsRequest{
SignName: tea.String(tpl.GetSignName()),
TemplateCode: tea.String(tpl.GetTemplateCode()),
PhoneNumbers: tea.String(phoneNumbers),
TemplateParam: tea.String(tpl.Json()),
}
// 调用阿里云 API...
}
2.3 Callback 接口
回调函数接口:
// Callback 回调函数接口
type Callback interface {
OnSuccess(task *SMSTask, response SMSResponse) // 发送成功时调用
OnError(task *SMSTask, err error) // 发送失败时调用
}
设计思路:回调机制让调用方可以灵活处理发送结果,比如记录日志、更新数据库状态等。
函数适配器:
为了让普通函数也能作为回调使用,我们提供了 CallbackFunc 适配器:
type CallbackFunc func(task *SMSTask, response SMSResponse, err error)
func (f CallbackFunc) OnSuccess(task *SMSTask, response SMSResponse) {
f(task, response, nil)
}
func (f CallbackFunc) OnError(task *SMSTask, err error) {
f(task, nil, err)
}
这样,我们可以用更简洁的方式创建回调:
callback := smsqueue.NewCallback(func(task *smsqueue.SMSTask, response smsqueue.SMSResponse, err error) {
if err != nil {
fmt.Printf("发送失败: %v\n", err)
return
}
fmt.Printf("发送成功: BizId=%s\n", response.GetBizId())
})
三、队列实现详解
3.1 队列结构
type smsQueue struct {
mu sync.RWMutex
taskChan chan *SMSTask // 任务通道
workerNum int // 工作协程数量
stopChan chan struct{} // 停止信号
wg sync.WaitGroup // 等待组
defaultProvider SMSProvider // 默认短信提供者
globalCallbacks []Callback // 全局回调
isRunning bool // 运行状态
maxQueueSize int // 队列最大容量
currentQueueSize int // 当前队列大小
}
3.2 创建队列
func NewSMSQueue(workerNum int, maxQueueSize int) Queue {
if workerNum <= 0 {
workerNum = 3
}
return &smsQueue{
taskChan: make(chan *SMSTask, maxQueueSize),
workerNum: workerNum,
stopChan: make(chan struct{}),
maxQueueSize: maxQueueSize,
currentQueueSize: 0,
globalCallbacks: make([]Callback, 0),
}
}
设计要点:
- 使用带缓冲的 channel 作为任务队列,避免阻塞
maxQueueSize 为 0 时表示无限制
- 默认 3 个工作协程,可根据实际需求调整
3.3 添加任务
func (q *smsQueue) AddTask(task *SMSTask) error {
q.mu.Lock()
defer q.mu.Unlock()
// 检查队列容量
if q.maxQueueSize > 0 && q.currentQueueSize >= q.maxQueueSize {
return ErrQueueFull
}
// 生成任务ID(如果未设置)
if task.ID == "" {
task.ID = generateTaskID()
}
// 设置默认重试次数
if task.MaxRetries <= 0 {
task.MaxRetries = 3
}
// 尝试将任务推送到任务通道
select {
case q.taskChan <- task:
q.currentQueueSize++
case <-time.After(5 * time.Second):
return ErrQueueFull
}
return nil
}
设计要点:
- 使用
select + time.After 实现超时机制
- 自动生成任务 ID,便于追踪
- 默认重试 3 次
3.4 启动工作协程
func (q *smsQueue) Start() {
q.mu.Lock()
defer q.mu.Unlock()
if q.isRunning {
return
}
q.isRunning = true
// 启动工作协程
for i := 0; i < q.workerNum; i++ {
q.wg.Add(1)
go q.worker()
}
}
3.5 工作协程实现
func (q *smsQueue) worker() {
defer q.wg.Done()
for {
select {
case <-q.stopChan:
return
case task, ok := <-q.taskChan:
if !ok {
return
}
// 任务已从通道取出,减少队列大小
q.mu.Lock()
q.currentQueueSize--
q.mu.Unlock()
q.processTask(task)
}
}
}
设计要点:
- 使用
select 监听停止信号和任务通道
- 任务取出后立即减少队列计数
- 使用
WaitGroup 确保所有协程优雅退出
3.6 任务处理与重试机制
func (q *smsQueue) processTask(task *SMSTask) {
// 选择提供者
provider := task.Provider
if provider == nil {
provider = q.defaultProvider
}
if provider == nil {
q.handleError(task, ErrNoProvider)
return
}
// 发送短信
response, err := provider.Send(task.PhoneNumbers, task.Template)
// 处理结果
if err != nil {
// 发送失败,检查是否需要重试
if task.RetryCount < task.MaxRetries {
task.RetryCount++
// 延迟重试(指数退避)
time.Sleep(time.Duration(task.RetryCount) * time.Second)
// 重新加入队列
if err := q.AddTask(task); err != nil {
q.handleError(task, err)
}
return
}
q.handleError(task, err)
} else {
q.handleSuccess(task, response)
}
}
设计要点:
- 支持任务级 Provider,可以为不同任务指定不同的服务商
- 自动重试机制,默认重试 3 次
- 指数退避策略,避免短时间内频繁重试
3.7 回调处理
func (q *smsQueue) handleSuccess(task *SMSTask, response SMSResponse) {
// 调用任务级回调
if task.Callback != nil {
task.Callback.OnSuccess(task, response)
}
// 调用全局回调
for _, cb := range q.globalCallbacks {
cb.OnSuccess(task, response)
}
}
func (q *smsQueue) handleError(task *SMSTask, err error) {
// 调用任务级回调
if task.Callback != nil {
task.Callback.OnError(task, err)
}
// 调用全局回调
for _, cb := range q.globalCallbacks {
cb.OnError(task, err)
}
}
设计要点:
- 先调用任务级回调,再调用全局回调
- 全局回调可以用于统一记录日志、更新数据库等
四、单例模式与封装
4.1 阿里云队列单例
var (
aliQueue smsqueue.Queue
aliQueueOnce sync.Once
)
func GetQueue() smsqueue.Queue {
aliQueueOnce.Do(func() {
// 创建队列,3个工作协程,队列容量100
aliQueue = smsqueue.NewSMSQueue(3, 100)
// 设置默认提供者为阿里云短信服务
aliQueue.SetDefaultProvider(GetSingletonAliSMS())
// 启动队列
aliQueue.Start()
})
return aliQueue
}
设计要点:
- 使用
sync.Once 确保队列只初始化一次
- 懒加载,首次调用时才创建队列
4.2 便捷发送函数
func Send(ctx context.Context, phoneNumbers string, template smsqueue.TemplateConfig, callback ...smsqueue.Callback) error {
var cb smsqueue.Callback
if len(callback) > 0 {
cb = callback[0]
}
task := &smsqueue.SMSTask{
PhoneNumbers: phoneNumbers,
Template: template,
Callback: cb,
Context: ctx,
}
return GetQueue().AddTask(task)
}
设计要点:
- 使用可变参数,回调函数可选
- 自动创建任务并添加到队列
五、配置管理
5.1 配置文件
# config/sms.yml
alisms:
AccessKeyId: "" # 阿里云短信服务AccessKeyId
AccessKeySecret: "" # 阿里云短信服务AccessKeySecret
Debug: false # 是否开启调试模式
5.2 配置读取
func NewAliSMS() *AliSMS {
AccessKeyId := smsconfig.GetSmsConfig().GetString("alisms.AccessKeyId")
AccessKeySecret := smsconfig.GetSmsConfig().GetString("alisms.AccessKeySecret")
Debug := smsconfig.GetSmsConfig().GetBool("alisms.Debug")
config := &openapi.Config{
AccessKeyId: tea.String(AccessKeyId),
AccessKeySecret: tea.String(AccessKeySecret),
}
config.Endpoint = tea.String("dysmsapi.aliyuncs.com")
client, err := dysmsapi20170525.NewClient(config)
if err != nil {
panic(err)
}
return &AliSMS{
Debug: Debug,
AccessKeyId: AccessKeyId,
AccessKeySecret: AccessKeySecret,
Client: client,
}
}
设计要点:
- 调试模式下不实际发送短信,方便开发测试
- 配置与代码分离,便于部署
六、使用示例
6.1 基础用法
import "gin-fast/plugins/plusms/smshelper/alisms"
// 发送验证码短信
err := alisms.Send(context.Background(), "13800138000", &alisms.TemplateDemo1{
Code: "123456",
})
if err != nil {
// 处理错误
}
6.2 使用回调
callback := smsqueue.NewCallback(func(task *smsqueue.SMSTask, response smsqueue.SMSResponse, err error) {
if err != nil {
fmt.Printf("发送失败: %v\n", err)
return
}
fmt.Printf("发送成功: BizId=%s\n", response.GetBizId())
})
err := alisms.Send(context.Background(), "13800138000", &alisms.TemplateDemo1{Code: "123456"}, callback)
6.3 注册全局回调
// 注册全局回调(所有短信发送都会调用)
alisms.RegisterGlobalCallback(&MyGlobalCallback{})
6.4 自定义短信提供商
type MySMSProvider struct{}
func (p *MySMSProvider) Send(phoneNumbers string, tpl smsqueue.TemplateConfig) (smsqueue.SMSResponse, error) {
// 实现发送逻辑
return &myResponse{}, nil
}
// 设置为默认提供者
alisms.SetDefaultProvider(&MySMSProvider{})
七、设计亮点总结
7.1 高度解耦
通过接口抽象,核心队列逻辑与具体短信服务商完全解耦。未来要接入新的服务商,只需实现 SMSProvider 接口即可,无需修改队列代码。
7.2 灵活的回调机制
支持任务级回调和全局回调,可以灵活处理发送结果,满足不同业务场景的需求。
7.3 可靠的重试机制
自动重试 + 指数退避策略,有效应对网络抖动和临时故障。
7.4 优雅的并发控制
使用 channel + worker pool 模式,有效控制并发度,避免资源耗尽。
7.5 易于测试
通过接口抽象,可以轻松 mock Provider 和 Queue,便于单元测试。
八、扩展建议
8.1 优先级队列
当前实现是 FIFO 队列,如果需要支持优先级,可以考虑:
type PriorityTask struct {
*SMSTask
Priority int
}
// 使用 heap 实现优先级队列
type PriorityQueue []*PriorityTask
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
return pq[i].Priority < pq[j].Priority
}
func (pq PriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
func (pq *PriorityQueue) Push(x interface{}) { *pq = append(*pq, x.(*PriorityTask)) }
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[0 : n-1]
return item
}
8.2 持久化队列
当前队列是内存队列,应用重启会丢失任务。可以考虑:
- 使用 Redis 作为任务队列
- 使用数据库存储任务状态
- 实现任务恢复机制
8.3 限流控制
在 AddTask 中增加限流逻辑:
type RateLimiter struct {
tokens chan struct{}
}
func NewRateLimiter(qps int) *RateLimiter {
rl := &RateLimiter{
tokens: make(chan struct{}, qps),
}
for i := 0; i < qps; i++ {
rl.tokens <- struct{}{}
}
go rl.refill()
return rl
}
func (rl *RateLimiter) refill() {
ticker := time.NewTicker(time.Second / time.Duration(cap(rl.tokens)))
for range ticker.C {
select {
case rl.tokens <- struct{}{}:
default:
}
}
}
func (rl *RateLimiter) Acquire() {
<-rl.tokens
}
8.4 监控指标
增加监控指标:
结语
本文分享了一个可扩展的短信发送队列模块的设计思路和实现细节。通过接口抽象、生产者-消费者模式、回调机制等设计模式,我们构建了一个既简单又强大的短信发送系统。
这个模块的核心价值在于:
- 开箱即用:简单的 API 设计,几行代码即可发送短信
- 易于扩展:通过接口抽象,轻松接入新的短信服务商
- 可靠稳定:自动重试、并发控制、优雅停止等机制保证系统稳定性
希望本文能对大家有所帮助,欢迎交流讨论!