事件领域模型 kaka-core 5.0 预览版发布
kaka-core 是一项服务于 Java 后端的事件领域模型,全局事件通知框架。
kaka-core 已移至 https://gitee.com/zkpursuit/kaka-core , 并支持 maven 直接安装。
本次更新新增与第三方消息队列对接的功能
1、稍加编码就能对接市面上所有第三方消息队列。
2、通过消息队列派发和消费事件可由远程事件处理器处理并返回处理结果。
3、返回处理结果与本地执行事件完全相同(注:SyncResult消费处理远程事件时不可用)。
4、事件保证一次消费处理,不会多次重复处理。
5、稳定性完全由第三方消息队列决定。
原理:每个事件调度中心为消息的发布者亦为消息的订阅者,派发事件即将事件发布到消息队列,订阅者消费到事件后本地化处理事件,处理完成后再次将事件发布到消息队列,根据事件ID在发送方找到缓存在内存的原始事件对象并进行结果赋值或回调。
原理执行流程图(感谢用户 微信名:碧涛 提供此图):
基本范例:
Facade facade = FacadeFactory.getFacade();
//以下通过ActiveMQ消息队列消费处理事件,并获得事件处理结果
facade.initRemoteMessageQueue(new ActiveMQ("event_exec_before", "event_exec_after")); //此行全局一次设定
Message message = new Message("20000", "让MyCommand接收执行");
IResult<String> result4 = message.setResult("ResultMsg", new AsynResult<>(5000));
facade.sendMessageByQueue(message);
System.out.println("消息队列消费处理事件结果:" + result4.get());
facade.sendMessageByQueue(new Message("40000", "", (IResult<Object> result) -> {
String clasz = ((CallbackResult<Object>) result).eventHanderClass;
StringBuilder sb = new StringBuilder("消息队列消费处理事件结果异步回调:\t" + clasz + "\t");
Object resultObj = result.get();
if (resultObj instanceof Object[]) {
Object[] ps = (Object[]) resultObj;
sb.append(Arrays.toString(ps));
} else {
sb.append(resultObj);
}
System.out.println(sb);
}));
package kaka.test;
import com.kaka.notice.RemoteMessageQueue;
import kaka.test.util.KryoSerializer;
import kaka.test.util.Serializer;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* 本类仅为测试用例,ActiveMQ消息队列的访问实现代码是否为最优不做考虑,在此仅为范例参考
*/
public class ActiveMQ extends RemoteMessageQueue {
private static class TestLocalMessageCache implements LocalMessageCache {
private final Map<String, com.kaka.notice.Message> localMap = new ConcurrentHashMap<>();
@Override
public void add(String id, com.kaka.notice.Message message) {
localMap.put(id, message);
}
@Override
public com.kaka.notice.Message remove(String id) {
return localMap.remove(id);
}
}
private static final String BROKER_URL = "tcp://101.34.22.36:61616";
private final ActiveMQConnectionFactory activeMQConnectionFactory;
private final Serializer<MessageWrap> eventSerializer = new KryoSerializer<>();
public ActiveMQ(String beforeTopic, String afterTopic) {
super(beforeTopic, afterTopic);
this.activeMQConnectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
this.init();
}
@Override
protected LocalMessageCache initLocalMessageCache() {
return new TestLocalMessageCache();
}
@Override
protected void publishEventMessage(MessageWrap msgWrap, String topic) {
byte[] bytes = this.eventSerializer.serialize(msgWrap);
try {
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(topic));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(bytes);
producer.send(bytesMessage);
producer.close();
connection.close();
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
private void init() {
new Thread(() -> {
try {
consume(beforeTopic, (byte[] bytes) -> {
MessageWrap remoteMsgWrap = eventSerializer.deserialize(bytes);
receivedBeforeEventMessage(remoteMsgWrap); //这里很重要,必须调用
});
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
consume(afterTopic, (byte[] bytes) -> {
MessageWrap remoteMsgWrap = eventSerializer.deserialize(bytes);
receivedAfterEventMessage(remoteMsgWrap); //这里很重要,必须调用
});
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
private void consume(String topic, Consumer<byte[]> callback) throws Exception {
Connection connection = this.activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(topic);
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
Message message = consumer.receive();
if (null == message) break;
if (message instanceof BytesMessage) {
BytesMessage bytesMessage = (BytesMessage) message;
try {
int byteSize = (int) bytesMessage.getBodyLength();
byte[] bytes = new byte[byteSize];
bytesMessage.readBytes(bytes);
callback.accept(bytes);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
consumer.close();
session.close();
connection.close();
}
}
以上范例完整代码可在源码 test 中查阅,
源码地址:https://gitee.com/zkpursuit/kaka-core/tree/master/src/test/java/kaka/test
关注公众号
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
GIMP 2.99.8 发布
GIMP2.99.8 现已发布,此版本再次带来了大量改进。 GIMP 即GNU Image Manipulation Program(GNU 图像处理程序)的首字母组成,是一个自由开源的位图图像编辑器,用于图像照片润饰及编辑、自由绘图、调整大小、裁剪、照片蒙太奇、装换图像格式以及其他专业任务。GIMP 几乎拥有所有图象处理所需的功能,号称 Linux 下的 Photoshop。 新版本更新亮点有: 在 multiple layers 上的 Clone-type工具 修复了 Wayland 和 macOS 上的Selection cue 由于 Windows Ink 支持,输入设备的覆盖范围更广 Canvas-focus by toolbox clicking 删除thumbnail icon 改进的文件格式支持:JPEG-XL、PSD/PSB等 插件开发 内存泄漏修复 持续集成变化 Windows 开发安装程序“nightlies” 自动发布安装程序 安装程序脚本的一致性 Linux MacOS mergerequests的自动构建 更新的编码风格指南 GEGL和 babl 官方表示,...
-
下一篇
DeepMind 收购并开源物理模拟引擎 MuJoCo
近日,谷歌旗下的人工智能实验室 DeepMind 宣布,它已经收购并发布了 MuJoCo 物理模拟引擎,在 Github上开放源代码,免费提供给研究人员。 MuJoCo 全名 multi-joint dynamics with contact,是一款多关节接触动力学的物理模拟引擎,由神经科学家 Emo Todorov 开发,最初是用于解决最优控制、状态估计和系统识别等领域的问题。2015 年被 Robi LLC 打造成商业产品后,被广泛使用于机器人动作研究领域,而 DeepMind 机器人团队的首选物理模拟引擎正是 MuJoCo。 MuJoCo 一直都需要付费使用,而且价格非常昂贵,DeepMind估计也是觉得它的商业版收费太高了,最近发财了之后(注:DeepMind 在 2020 年实现了第一次盈利:4380 万英镑,2019年还在亏钱),直接就把 Robi LLC整个公司都买过来了。买下 Robi LLC 的第一时间,DeepMind 就公布了 MuJoCo 的开源计划。 此前, Deepmind 创始人Demis Hassabis表示,公司的初衷就是用人工智能推动科学发展,造福于...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- MySQL数据库在高并发下的优化方案
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- SpringBoot2全家桶,快速入门学习开发网站教程


微信收款码
支付宝收款码