Flink+ice 实现可视化规则编排与灵活配置(Demo)
ice文档站:http://124.221.148.247/zh
1 Demo仓库地址:
github:https://github.com/zjn-zjn/flink-ice
gitee:https://gitee.com/waitmoon/flink-ice
2 Demo功能描述
通过netcat制造输入流(nc -l 9000 windows:nc -l -p 9000)
flink接收本地9000端口输入流,以回车(\n)分割单词
输入流经过IceProcessor处理后打印结果流
3 项目搭建
使用flink-quickstart-java快速搭建flink项目
3.1 添加ice依赖
因flink为非Spring项目,需依赖ice-core并手动初始化,Spring项目直接依赖ice-client-spring-boot-starter即可
<dependency> <groupId>com.waitmoon.ice</groupId> <artifactId>ice-core</artifactId> <version>${ice.version}</version> </dependency>
3.2 编写StreamingJob
public class StreamingJob { public static void main(String[] args) throws Exception { // 创建 Flink 执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //接收本地socket9000端口输入流,以回车分割单词 //通过netcat制造输入流 nc -l 9000 (windows nc -l -p 9000) DataStreamSource<String> stream = env.socketTextStream("localhost", 9000, "\n"); //按照单词长度keyBy,使用IceProcessor并打印结果 stream.keyBy(String::length).process(new IceProcessor()).print().setParallelism(1); //执行程序 env.execute("Flink Streaming Java API Skeleton"); } }
3.3 编写ice算子IceProcessor
在static代码块中初始化ice客户端,此处直接使用的自己部署的ice-server地址对应的app:2
算子功能: 将流内数据放入roam,组装pack并执行ice规则处理(直接根据iceId触发,iceId在server配置后台获取)
/** * ice算子 */ public class IceProcessor extends KeyedProcessFunction<Integer, String, String> { //ice 客户端 private static IceNioClient iceNioClient; static { //初始化ice客户端 try { //配置远程server地址,app,以及节点扫描路径 //此处使用了自己搭建的server,后台地址 http://eg.waitmoon.com/config/list/2 iceNioClient = new IceNioClient(2, "waitmoon.com:18121", "com.waitmoon.flink.ice.node"); //启动ice客户端 iceNioClient.start(); } catch (Exception e) { throw new RuntimeException(e); } } @Override public void processElement(String value, Context ctx, Collector<String> out) { //组装IcePack IcePack pack = new IcePack(); //设置要触发的iceId(配置后台中需要触发的ID) //http://eg.waitmoon.com/config/detail/2/1081 pack.setIceId(1081); //初始化roam,将单词和长度放入roam中 IceRoam roam = new IceRoam(); roam.put("input", value); roam.put("length", ctx.getCurrentKey()); pack.setRoam(roam); //同步执行 Ice.syncProcess(pack); //执行完成后,获取roam中的result String result = roam.getMulti("result"); if (result != null) { //result不为空,将结果放入下游算子 out.collect(result); } } @Override public void close() { if (iceNioClient != null) { //清理ice 客户端 iceNioClient.destroy(); iceNioClient = null; } } }
3.4 编写节点ContainsFlow
节点功能: 判断根据key去roam里拿的值是否在set中,是则返回true,否则返回false
/** * @author waitmoon * 过滤性质节点 * 判断值在不在集合中 */ @Data @Slf4j @EqualsAndHashCode(callSuper = true) public class ContainsFlow extends BaseLeafRoamFlow { //默认input private String key = "input"; private Set<String> set; @Override protected boolean doRoamFlow(IceRoam roam) { //判断roam中的key对应的值是否在集合中 return set.contains(roam.<String>getMulti(key)); } @Override public void afterPropertiesSet() { log.info("ContainsFlow init with key:{}, set:{} nodeId:{}", key, set, this.getIceNodeId()); } public NodeRunStateEnum errorHandle(IceContext ctx, Throwable t) { log.error("error occur id:{} e:", this.findIceNodeId(), t); return super.errorHandle(ctx, t); } }
3.5 编写节点PutNone
节点功能: 将value值放入roam的key中,不干扰流程(不返回true/false)
/** * @author waitmoon * 不干扰流程性质节点 * 将一个值放入roam */ @Data @EqualsAndHashCode(callSuper = true) public class PutNone extends BaseLeafRoamNone { //默认result private String key = "result"; private Object value; @Override protected void doRoamNone(IceRoam roam) { //将value放到roam中 roam.putMulti(key, value); } }
4 项目启动
4.1 netcat 制造输入流
mac/linux 使用 nc -l 9000命令,windows使用 nc -l -p 9000 命令 制造一个Socket输入流
4.2 运行StreamingJob
运行时可以看到ice客户端启动相关信息
5 编排ice规则
在ice-server后台编辑ice规则,用的是自己部署的ice-server,地址:http://124.221.148.247:8121
5.1 新增app
5.2 新增ice
此处Debug填2表示只打印节点执行过程,pack中的iceId即为此处的ID,点击查看详情即可编排规则
5.3编排ice规则
此编排实现逻辑:根据不同的输入单词,输出对应的结果到roam的result字段中供后续使用
如输入waitmoon,在管理员列表中,则输出"you are admin~"到roam的result字段,并最终由flink打印
6 发布与执行
在编排完规则后切记要发布后才会将变更推送到客户端并生效!!!
在终端输入单词并回车
在flink项目日志里可以看到:
ice打印了执行过程,[节点ID:节点类名简称:节点执行结果:节点执行耗时]
flink因为最后的sink是print(),所以打印了对应的输出。
这时候你就可以随意的更改与编排规则去实现自己的业务啦~~~

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MASA Framework的分布式锁设计
前言 什么是锁?什么是分布式锁?它们之间有什么样的关系? 什么是锁 加锁(lock)是2018年公布的计算机科学技术名词,是指将控制变量置位,控制共享资源不能被其他线程访问。通过加锁,可以确保在同一时刻只有一个线程在访问被锁住的代码片段,我们在单机部署时可使用最简单的加锁完成资源的独享,如: public class Program { private static readonly object Obj = new { }; public static void Main() { lock (obj) { //同一时刻只有一个线程可以访问 } } } 什么是分布式锁 但随着业务发展的需要,原单体单机部署的系统被部署成分布式集群系统后,原来的并发控制策略失效,为了解决这个问题就需要引入分布式锁,那分布式锁应该具备哪些条件? 原子性:在分布式环境下,一个方法在同一个时间点只能被一台机器下的一个线程所执行,防止数据资源的并发访问,避免数据不一致情况 高可用:具备自动失效机制,防止死锁,获取锁后如果出现错误,并且无法释放锁,则使用租约一段时间后自动释放锁 阻塞性:具...
- 下一篇
系统实时性评估指标-中断延迟简介
实时系统的主要特点是必须保证处理结果的时间确定性,我们通常使用基准程序法对其进行性能指标评估。通过对实时系统的性能评估,就可以确认系统的时间确定性、可靠性、稳定性等指标。 衡量实时操作系统实时性能的重要指标有很多,本文将对运用最为广泛的指标之一,中断延迟时间,进行介绍。那么什么是中断延迟?如何测得实时操作系统的中断延迟呢?让我们一起来看看吧! 什么是中断延迟? 中断延迟(Interrupt Latency)是指从硬件中断发生到开始执行中断处理程序第一条指令之间的这段时间。也就是计算机接收到中断信号到操作系统作出响应,并完成换到转入中断服务程序的时间。 由于外部事件的发生常常是以一个中断申请信号的形式来通知处理器,然后才运行中断服务程序中来处理该事件,所以中断延迟是影响系统实时性的一个重要因素。 为了进一步描述清楚中断延迟,我们把中断延迟的时间分为以下三种: 识别中断时间:外界硬件发生了中断后,CPU到中断处理器读取中断向量,并且查找中断向量表,找到对应的中断服务子程序(ISR)的首地址,然后跳转到对应的ISR去做相应处理。 等待中断打开时间:在允许中断嵌套的实时操作系统中,中断也是基于...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2全家桶,快速入门学习开发网站教程
- Docker安装Oracle12C,快速搭建Oracle学习环境
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS7,CentOS8安装Elasticsearch6.8.6