基于2PC和延迟更新完成分布式消息队列多条事务Golang版本
背景
分布式多消息事务问题
在消息队列使用场景中,有时需要同时下发多条消息,但现在的消息队列比如kafka只支持单条消息的事务保证,不能保证多条消息,今天说的这个方案就时kafka内部的一个子项目中基于2PC和延迟更新来实现分布式事务
2PC
2PC俗称两阶段提交,通过将一个操作分为两个阶段:准备阶段和提交阶段来尽可能保证操作的原子执行(实际上不可能,大家有个概念先)
延迟更新
延迟更新其实是一个很常用的技术手段,简单来说,当某个操作条件不满足时,通过一定手段将数据暂存,等条件满足时在进行执行
基于2PC和延迟队列的分布式事务实现
系统架构
实现也蛮简单的, 在原来的业务消息之后再添加一条事务消息(事务消息可以通过类似唯一ID来关联到之前提交的消息), worker未消费到事物提交的消息,就会一直将消息放在本地延迟存储中,只有当接收到事物提交消息,才会进行业务逻辑处理
业务流程
生产者
- 逐条发送业务消息组
- 发送事务提交消息
消费者
- 消费消息队列,将业务消息存放本地延迟存储
- 接收提交事务消息,从本地延迟存储获取所有数据,然后从延迟存储中删除该消息
代码实现
核心组件
MemoryQuue: 用于模拟消息队列,接收事件分发事件 Worker: 模拟具体业务服务,接收消息,存入本地延迟更新存储,或者提交事务触发业务回调
Event与EventListener
Event: 用于标识事件,用户将业务数据封装成事件存入到MemoryQueue中 EventListener: 事件回调接口,用于MemoryQueue接收到数据后的回调 事件在发送的时候,需要通过一个前缀来进行事件类型标识,这里有三种TaskPrefix、CommitTaskPrefix、ClearTaskPrefix
const (
// TaskPrefix 任务key前缀
TaskPrefix string = "task-"
// CommitTaskPrefix 提交任务key前缀
CommitTaskPrefix string = "commit-"
// ClearTaskPrefix 清除任务
ClearTaskPrefix string = "clear-"
)
// Event 事件类型
type Event struct {
Key string
Name string
Value interface{}
}
// EventListener 用于接收消息回调
type EventListener interface {
onEvent(event *Event)
}
MemoryQueue
MemoryQueue内存消息队列,通过Push接口接收用户数据,通过AddListener来注册EventListener, 同时内部通过poll来从chan event取出数据分发给所有的Listener
// MemoryQueue 内存消息队列
type MemoryQueue struct {
done chan struct{}
queue chan Event
listeners []EventListener
wg sync.WaitGroup
}
// Push 添加数据
func (mq *MemoryQueue) Push(eventType, name string, value interface{}) {
mq.queue <- Event{Key: eventType + name, Name: name, Value: value}
mq.wg.Add(1)
}
// AddListener 添加监听器
func (mq *MemoryQueue) AddListener(listener EventListener) bool {
for _, item := range mq.listeners {
if item == listener {
return false
}
}
mq.listeners = append(mq.listeners, listener)
return true
}
// Notify 分发消息
func (mq *MemoryQueue) Notify(event *Event) {
defer mq.wg.Done()
for _, listener := range mq.listeners {
listener.onEvent(event)
}
}
func (mq *MemoryQueue) poll() {
for {
select {
case <-mq.done:
break
case event := <-mq.queue:
mq.Notify(&event)
}
}
}
// Start 启动内存队列
func (mq *MemoryQueue) Start() {
go mq.poll()
}
// Stop 停止内存队列
func (mq *MemoryQueue) Stop() {
mq.wg.Wait()
close(mq.done)
}
Worker
Worker接收MemoryQueue里面的数据,然后在本地根据不同类型来进行对应事件事件类型处理, 主要是通过事件的前缀来进行对应事件回调函数的选择
// Worker 工作进程
type Worker struct {
name string
deferredTaskUpdates map[string][]Task
onCommit ConfigUpdateCallback
}
func (w *Worker) onEvent(event *Event) {
switch {
// 获取任务事件
case strings.Contains(event.Key, TaskPrefix):
w.onTaskEvent(event)
// 清除本地延迟队列里面的任务
case strings.Contains(event.Key, ClearTaskPrefix):
w.onTaskClear(event)
// 获取commit事件
case strings.Contains(event.Key, CommitTaskPrefix):
w.onTaskCommit(event)
}
}
事件处理任务
事件处理任务主要分为:onTaskClear(从本地清楚该数据)、onTaskEvent(数据存储本地延迟存储进行暂存)、onTaskCommit(事务提交)
func (w *Worker) onTaskClear(event *Event) {
task, err := event.Value.(Task)
if !err {
// log
return
}
_, found := w.deferredTaskUpdates[task.Group]
if !found {
return
}
delete(w.deferredTaskUpdates, task.Group)
// 还可以继续停止本地已经启动的任务
}
// onTaskCommit 接收任务提交, 从延迟队列中取出数据然后进行业务逻辑处理
func (w *Worker) onTaskCommit(event *Event) {
// 获取之前本地接收的所有任务
tasks, found := w.deferredTaskUpdates[event.Name]
if !found {
return
}
// 获取配置
config := w.getTasksConfig(tasks)
if w.onCommit != nil {
w.onCommit(config)
}
delete(w.deferredTaskUpdates, event.Name)
}
// onTaskEvent 接收任务数据,此时需要丢到本地暂存不能进行应用
func (w *Worker) onTaskEvent(event *Event) {
task, err := event.Value.(Task)
if !err {
// log
return
}
// 保存任务到延迟更新map
configs, found := w.deferredTaskUpdates[task.Group]
if !found {
configs = make([]Task, 0)
}
configs = append(configs, task)
w.deferredTaskUpdates[task.Group] = configs
}
// getTasksConfig 获取task任务列表
func (w *Worker) getTasksConfig(tasks []Task) map[string]string {
config := make(map[string]string)
for _, t := range tasks {
config = t.updateConfig(config)
}
return config
}
主流程
unc main() {
// 生成一个内存队列启动
queue := NewMemoryQueue(10)
queue.Start()
// 生成一个worker
name := "test"
worker := NewWorker(name, func(data map[string]string) {
for key, value := range data {
println("worker get task key: " + key + " value: " + value)
}
})
// 注册到队列中
queue.AddListener(worker)
taskName := "test"
// events 发送的任务事件
configs := []map[string]string{
map[string]string{"task1": "SendEmail", "params1": "Hello world"},
map[string]string{"task2": "SendMQ", "params2": "Hello world"},
}
// 分发任务
queue.Push(ClearTaskPrefix, taskName, nil)
for _, conf := range configs {
queue.Push(TaskPrefix, taskName, Task{Name: taskName, Group: taskName, Config: conf})
}
queue.Push(CommitTaskPrefix, taskName, nil)
// 停止队列
queue.Stop()
}
输出
# go run main.go
worker get task key: params1 value: Hello world
worker get task key: task1 value: SendEmail
worker get task key: params2 value: Hello world
worker get task key: task2 value: SendMQ
总结
在分布式环境中,很多时候并不需要使用CP模型,更多时候是满足最终一致性即可
基于2PC和延迟队列的这种设计,主要是依赖于事件驱动的架构
在kafka connect中, 每次节点变化都会触发一次任务的重分配,所以延迟存储直接用的就是内存中的HashMap, 因为即使分配消息的主节点挂了,那就再触发一次事件,直接将HashMap里面的数据清掉,进行下一次事务即可,并不需要保证延迟存储里面的数据不丢,
所以方案因环境、需求不同,可以做一些取舍,没必要什么东西都去加一个CP模型的中间件进来,当然其实那样更简单
未完待续!
更多文章可以访问http://www.sreguide.com/

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
马蜂窝推荐系统容灾缓存服务的设计与实现
数据库突然断开连接、第三方接口迟迟不返回结果、高峰期网络发生抖动...... 当程序突发异常时,我们的应用可以告诉调用方或者用户「对不起,服务器出了点问题」;或者找到更好的方式,达到提升用户体验的目的。 一、背景 用户在马蜂窝 App 上「刷刷刷」时,推荐系统需要持续给用户推荐可能感兴趣的内容,主要分为根据用户特性和业务场景,召回根据各种机器学习算法计算过的内容,然后对这些内容进行排序后返回给前端这几个步骤。 推荐的过程涉及到 MySQL 和 Redis 查询、REST 服务调用、数据处理等一系列操作。对于推荐系统来说,对时延的要求比较高。马蜂窝推荐系统对于请求的平均处理时延要求在 10ms 级别,时延的 99 线保持在 1s 以内。 当外部或者内部系统出现异常时,推荐系统就无法在限定时间内返回数据给到前端,导致用户刷不出来新内容,影响用户体验。 所以我们希望通过设计一套容灾缓存服务,实现在应用本身或者依赖的服务发生超时等异常情况时,可以返回缓存数据给到前端和用户,来减少空结果数量,并且保证这些数据尽可能是用户感兴趣的。 二、设计与实现 设计思路和技术选型 不仅仅是推荐系统,缓存技...
-
下一篇
用Q-learning算法实现自动走迷宫机器人
【技术沙龙002期】数据中台:宜信敏捷数据中台建设实践|宜信技术沙龙 将于5月23日晚8点线上直播,点击报名 项目描述: 在该项目中,你将使用强化学习算法,实现一个自动走迷宫机器人。 如上图所示,智能机器人显示在右上角。在我们的迷宫中,有陷阱(红色炸弹)及终点(蓝色的目标点)两种情景。机器人要尽量避开陷阱、尽快到达目的地。 小车可执行的动作包括:向上走 u、向右走 r、向下走 d、向左走l。 执行不同的动作后,根据不同的情况会获得不同的奖励,具体而言,有以下几种情况。 撞到墙壁:-10 走到终点:50 走到陷阱:-30 其余情况:-0.1 我们需要通过修改 robot.py 中的代码,来实现一个 Q Learning 机器人,实现上述的目标。 Section 1 算法理解 1.1 强化学习总览 强化学习作为机器学习算法的一种,其模式也是让智能体在“训练”中学到“经验”,以实现给定的任务。但不同于监督学习与非监督学习,在强化学习的框架中,我们更侧重通过智能体与环境的交互来学习。通常在监督学习和非监督学习任务中,智能体往往需要通过给定的训练集,辅之以既定的训练目标(如最小化损失函数),通过...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS6,CentOS7官方镜像安装Oracle11G
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2全家桶,快速入门学习开发网站教程