message-pipe v1.0.1 发布,支持 Nacos 服务发现
Message Pipe
基于Redis
实现的分布式消息顺序消费管道。
GitHub地址:https://github.com/minbox-projects/message-pipe
Gitee地址:https://gitee.com/minbox-projects/message-pipe
I. 什么是Message Pipe?
Message Pipe
是基于Redis
实现的顺序消息管道,由于内部引入了Redisson
分布式锁所以它是线程安全的,多线程情况下也会按照写入管道的顺序执行消费。
Message Pipe
采用Client
、Server
概念进行设计,内部通过grpc-netty
来建立消息通道相互通信的长连接,消息的分发由Server
负责,而每一个管道内的消息在分发时会通过LoadBalance(负载均衡)
的方式来获取在线的Client
信息并向Client
顺序发送消息。
II. 更新日志
✨ New Features
- [ #39 ] Client通过 "Cglib动态代理" 的方式实现动态绑定管道
- [ #40 ] Client/Server 通过正则表达式进行匹配 "pipeName"
- [ #41 ] 禁用Server接收注册请求后根据每一个"Pipe Name"创建消息管道
- [ #47 ] 使用Jackson代替fastjson转换实体与json字符串之间的相互转换方式
- [ #51 ] MessageProcessor新增正则表达式方式处理消息,并为每个匹配的表达式管道建立一个Porxy代理类
- [ #59 ] Server启动时自动加载Redis内的消息管道列表,并自动创建MessagePipe实例
- [ #64 ] 重构Client连接Server的实现方式,新增支持Nacos NamingService方式
🐛 Fix Bugs
- [ #45 ] 修复Client启动时一直重试注册到Server,导致阻塞主线程
- [ #48 ] 删除客户端ReceiveMessageService处理消息时使用线程池
- [ #53 ] 修复Redisson在高并发下出现的解锁异常
- [ #55 ] 修复获取MessageProcessors实例时可能出现线程安全性问题
- [ #57 ] 消息分发时,只有存在客户端列表才进行处理消息发送逻辑
- [ #61 ] Server运行过程中CPU飙升
- [ #65 ] Client注册时偶尔会出现获取IP地址为 "127.0.0.1"的情况
III. 特性
- 自动注册
- 心跳检查
- 消息分发
- 顺序消费
- 读写分离
- 线程安全
- 负载均衡
- 自动剔除
IIII. 快速上手
为了快速上手,提供了message-pipe
使用的示例项目,项目源码:https://github.com/minbox-projects/message-pipe-example。
4.1 安装Redis
由于message-pipe
基于Redis
实现,所以我们首先需要在本机安装Redis
,下面是使用Docker
方式安装步骤:
# 拉取Redis镜像 docker pull redis # 创建一个名为"redis"的后台运行容器,端口号映射宿主机6379 docker run --name redis -d -p 6379:6379 redis
4.2 查看Redis数据
# 运行容器内命令 docker exec -it redis /bin/sh # 运行Redis客户端 redis-cli # 选择索引为1的数据库 select 1 # 查看全部的数据 keys *
4.3 启动示例项目
# 下载源码 git clone https://github.com/minbox-projects/message-pipe-example.git # 进入项目目录 cd message-pipe-example # 运行项目 mvn spring-boot:run
V. License
message-pipe
采用Apache2开源许可进行编写。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
关于 Python 装饰器的一些个人理解
装饰器 本质是一个接受参数为函数的函数。 作用:为一个已经实现的方法添加额外的通用功能,比如日志记录、运行计时等。 举例 不带参数的装饰器,不用@ # 不带参数的装饰器 def deco_test(func): def wrapper(*args, **kwargs): print("before function") f = func(*args, **kwargs) print("after function") return f return wrapper def do_something(a,b,c): print(a) time.sleep(1) print(b) time.sleep(1) print(c) return a if __name__ == '__main__': # 不用@ f = deco_test(do_something)("1","2","3") 输出: before function 1 2 3 after function 个人理解: 相当于在 do_something 函数外面套了两个输出:before function 和 after fu...
- 下一篇
Newbe.Claptrap 0.7.4 发布,增加 saga 支持
更新内容 类库 新增 saga claptrap,以支持 saga 操作。(beta) 新增 MySql 作为 Event 和 State 的存储器 开发文档 Newbe.Claptrap 框架入门,第三步 —— 定义 Claptrap,管理商品库存 Newbe.Claptrap 框架入门,第四步 —— 利用 Minion,商品下单 软件介绍 这是以反应式、事件溯源和Actor模式作为基本理论的一套服务端开发框架。于此之上,开发者可以更为简单的开发出“分布式”、“可水平扩展”、“可测试性高”的应用系统。 该项目受启发于众多开源项目与博客文章: 基于Actor框架Orleans构建的分布式、事件溯源、事件驱动、最终一致性的高性能框架——Ray Event Sourcing Pattern Event Sourcing Pattern 中文译文 Orleans - Distributed Virtual Actor Model ENode 1.0 - Saga的思想与实现 实现入门篇 Newbe.Claptrap 框架入门,第一步 —— 创建项目,实现简易购物车 Newbe.Claptra...
相关文章
文章评论
共有0条评论来说两句吧...