数栈基于Flink CEP与规则热更新扩展的深度解析
本文通过实际案例深入探讨了 Flink CEP 在复杂事件处理中的核心作用,详细分析了其优缺点,并探讨了在实时计算平台中规则热更新的重要性和创新实现方式,旨在帮助读者更好地理解和应用 Flink CEP。 Flink CEP 1.1 什么是 Flink CEP FlinkCEP 是在 Flink 上层实现的复杂事件处理库。它可以让你在无限事件流中检测出特定的事件模型,并允许用户做出针对性的处理。它更多被应用在实时营销、实时风控和物联网等场景。 在传统的数据处理中,用户更多关注于单一数据的特征,而忽略了数据间的特征。随着业务的不断扩展和数据量的快速增长,用户在某些业务场景中不再满足于单一数据特征的处理,而是需要基于多个数据特征之间的关联进行更复杂的响应处理。 Flink CEP 则基于此需求而诞生。在 Flink CEP 中,每一个数据被视为一个事件,由众多数据组成的数据流就是一串事件流,Flink CEP 能够根据用户提供的事件流匹配规则,在一串事件流中进行摄取。 在实际应用中,Flink CEP 的优势主要体现在以下几个方面: (1)高性能:Flink CEP 在底层使用 Flink 的分布式计算框架,能够高效地处理大规模数据流。它支持实时流处理和批处理,并能够根据业务需求进行灵活的扩展。 (2)事件模式匹配:Flink CEP 提供了一组事件模式匹配器,可以根据预设的模式规则来匹配事件序列中的特定模式。这些规则可以是简单的模式(如出现特定事件),也可以是复杂的模式(如多个事件按照特定顺序发生)。 (3)事件时间戳和时间窗口:Flink CEP 支持事件时间戳和时间窗口的概念,能够根据事件的实际时间戳进行时间相关的计算和分析。有助于处理具有时间相关性的复杂事件序列。 (4)自定义逻辑:Flink CEP 允许用户根据具体需求编写自定义逻辑来处理复杂事件。用户可以通过扩展 Flink CEP 的 API 来实现自己的事件处理逻辑,从而更好地满足业务需求。在实际案例中,Flink CEP 可以应用于多个领域。例如,在金融领域中,它可以用于实时监测交易异常行为,如识别出可能存在欺诈行为的交易模式;在物联网领域中,它可以用于实时分析传感器数据流,如监测设备的运行状态和预警故障;在社交媒体领域中,它可以用于实时分析用户行为和舆情趋势,为营销策略提供支持。 总的来说,Flink CEP 是一种强大的复杂事件处理工具,能够应对传统数据处理方式难以应对的问题。通过关注并捕获一系列有特定规律的事件,Flink CEP 可以帮助我们提供更深入的业务洞察和实时决策支持。未来,随着业务需求的不断变化和技术的发展,Flink CEP 在数据处理和分析中的应用将会更加广泛和深入。 1.2 Flink CEP 现有的缺点 用户可以通过 DataStream API 或者 Flink SQL 进行 Flink CEP 任务的编写,然后通过数栈将任务提交到 Yarn 上运行。但是对于一些复杂多变的应用场景,用户可能需要针对业务变更,频繁地对 Flink CEP 任务进行规则修改,规则修改之后又需要先停止正在运行中的任务,然后重新提交任务。这一业务变更流程通常至少需要5-10分钟的变更时间,在此期间没有办法对事件进行捕获处理。这样做的时间成本非常高,对于延迟敏感的作业来说是很难接受的。此外,如果我们的事件规则窗口设置的比较长,并且在运行过程中产生了比较大的状态的话,重启作业的时间、代价会进一步更高。 因此 Flink CEP 的规则热更新对于提高 Flink CEP 的生产可用能力是一个很重要的功能。
数栈对于 Flink CEP 规则热更新的扩展 数栈基于 Flink 1.16 并借鉴学习了业界现有的方案后实现了 Flink CEP 规则热更新。 2.1 Flink CEP 使用案例 在介绍 Flink CEP 规则热更新之前我们先来看下 Flink CEP 常规的 DataStream API 和 SQL 是怎么处理以下一个场景的: 针对于一条包含了复数个 Event(id int, volume double, name varchar) 对象的事件流,我们希望能在流中找到一条子流,该子流中第一个 Event 对象的 id 为 42,然后紧接着第二个 Event 对象的 volume 大于 10.0,然后紧接着第二个 Event 对象的 name 必须是 "end"。对于这样的一个子流,我们要获得子流中所有 value 的值的总和。 基于 Data Stream API 开发 DataStream input = ...;
Pattern<Event, ?> pattern = Pattern.begin("start") .where( new SimpleCondition() { @Override public boolean filter (Event event) { return event.getId() == 42; } ) .next("middle").where( new SimpleCondition { @Override public boolean filter(SubEvent subEvent) { return subEvent.getVolume() >= 10.0; } } ).next("end" ).where ( new SimpleCondition() { @Override public boolean filter (Event event) { return event.getName().equals("end"); } } );
PatternStream patternStream = CEP.pattern(input, pattern); DataStream result = patternStream.process ( new PatternProcessFunction<Event, Double>() { @Override public void processMatch( Map<String, List> pattern, Context ctx, Collector out) throws Exception { Double total = 0d; for (Map.Entry<String, List> entry : pattern.entrySet()) ‹ total = total + entry.getValue().get(0).getVolume(); } out.collect(total); }); 基于 SQL 开发 CREATE TABLE source ( id INT, name VARCHAR, volume DOUBLE procTime AS PROCTIME () ) WITH ( 'connector' = 'kafka', 'topic' = 'dtstack', 'properties.group.id' = 'dtstack', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'format'= 'json' )
SELECT total FROM source MATCH_RECOGNIZE ( ORDER BY procTime MEASURES A. volume + B.volume + C.volume as total ONE ROW PER MATCH PATTERN (A B C) DEFINE A AS A. id == 42, B AS B. volume >= 10.0 C AS C. name == 'end' ) 在 Data Stream API 的案例中,3-25 行的代码描述了子流的一个捕获规则,28-40 行的代码描述了对捕获到的子流如何处理的,并将处理完成后的数据发送给下游的算子。 在 SQL 案例中 ,22-26 行 SQL 描述了子流的一个捕获规则,19-20 行的 SQL 描述了对捕获到的子流如何处理的。 如果用户对于上述 CEP 规则有变更,则需要修改相对应的代码或者 SQL。 2.2 Flink CEP 规则热更新使用案例 2.2.1 任务本体和规则剥离 Flink CEP 规则热更新功能在上层提供了相应的 Data Stream API 以及的 SQL 语法扩展用于用户编写能够使用 Flink CEP 热更新的程序。用户所需要做的是对上述代码进行改造,将代码中的规则(子流捕获逻辑)以及规则处理逻辑的代码片段去掉,因为这两块逻辑将储存在外部存储中。 对于 Data Stream API DataStream input = ...;
PatternStream output = CEP.dynamicPatterns( input, new JDBCPeriodicPatternProcessorDiscovererFactory<>( JDBC_URL, JDBC_DRIVE, TABLE_NAME, null, JDBC_INTERVAL_MILLIS), TimeBehaviour.ProcessingTime, TypeInformation.of(new TypeHint(){}) ) 对于 SQL CREATE TABLE source ( id INT, name VARCHAR, volume DOUBLE procTime AS PROCTIME () ) WITH ( 'connector' = 'kafka', 'topic' = 'dtstack', 'properties.group.id' = 'dtstack', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'format'= 'json' )
SELECT total FROM source DYNAMIC MATCH_RECOGNIZE ( ORDER BY procTime OUTPUT (total double) WITH_PATTERN ( 'tableName' = 'dynamic_cep', 'user' = 'dtstack', 'password' = '***', 'driver' = 'com.mysql.cj jdbc.Driver', 'jdbcUrl' = 'jdbc:mysql://127.0.0.1:3306/cep', 'jdbcIntervalMillis' = '1000' ) ) AS T; 将规则从任务本体中剥离之后,任务侧可以单独通过数栈提交到 Yarn 上,但是此时若用户还没有为任务设置规则,任务将会进入一种 block 的状态,不会从对上游的数据进行处理。 2.2.2 CEP 规则 JSON 描述文件编写 外部化的规则将通过特定的 JSON 格式进行描述并将其存储到数栈中。用户在数栈上可以通过编辑器对 JSON 格式的规则进行编辑保存以及更新。 CEP 规则使用 JSON 描述文件编写。每一个规则可以被描述为一个规则节点,多个规则节点可以组合成一个有向图(Graph)。"图"可以被视为一种特殊的复合规则节点,因此"图"与"图"之间、"图"与规则节点之间也可以相互组合。 通过规则节点的配置用户可以匹配到 n 个数据流中的节点,再通过规则节点之间的组合搭配用户可以实现更加复杂的规则逻辑匹配。 比如下面的一个规则描述文件: 通过定义一个 COMPOSITE 类型的规则节点即上文中的"图","图"中的 edges 定义了"图"中的子节点的顺序:start -> middle -> end。 然后在其 nodes 中会定义每个节点的匹配细节:start 规则匹配一个 id 字段为 45 的事件,middle 规则匹配一个 volume >= 10.0 的事件,end 规则匹配一个 name 为 "end" 的事件。 {
"name": "rule",
"type": "COMPOSITE", "edges": [{
"source": "start",
"target": "middle",
"type": "SKIP_TILL_NEXT"
},{
"source": "middle",
"target": "end",
"type": "SKIP_TILL_NEXT"
}],
"nodes": [{
"name": "start",
"condition": {
"expression": "id == 45",
"type": "AVIATOR"
},
"type": "ATOMIC"
},
{
"name": "middle",
"condition": {
"expression": "volume >= 10.0",
"type": "AVIATOR"
},
"type": "ATOMIC"
}, {
"name": "end",
"condition": {
"expression": "name == 'end'",
"type": "AVIATOR"
},
"type": "ATOMIC"
}
]...
} Flink Runtime 层通过解析 JSON 并将其转换为 Graph 用于描述规则状态机的状态转换,规则内部的匹配条件使用 Google Aviator 对表达式进行解析并求值计算。 在运行阶段 Flink Jobmanager 一旦检测到 CEP 规则内容变更后,会对新的规则进行校验然后会推送规则更新的系统事件给复数个 Flink Taskmanager,其中规则更新的系统事件中包含着新规则的详细描述信息。Taskmanager 在接受到规则更新的系统事件后会从其中获得新规则的信息并将其应用给 CEP 算子,之后 CEP 算子会将新的规则设置到算子内部的状态机中,完成对事件规则的热更新。 整个更新过程中不涉及到任务的重启,在几秒内就能完成更新,在新规则未更新完成前任务将依旧以旧规则运行直到新规则替换旧规则,新旧规则可以在用户无感的情况下完成替换。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
2025年,微服务架构和大模型能“玩出”什么新花样?
2025 年开年,DeepSeek 开源模型以“低成本、高性能”成功掀起 AI 平价化浪潮,并以惊人的速度渗透至各个领域。在 AI 平价化浪潮的推动下,微服务架构正迎来前所未有的变革机遇。微服务架构通过将系统拆解为多个小型、独立的服务,每个服务运行在自己的进程中,负责特定的业务功能。与单体架构相比,微服务架构实现了更高的灵活性、可扩展性和可维护性,这些特性使其成为现代软件开发的首选。 然而,随着企业应用规模和复杂度的不断提升,微服务架构也面临着服务治理、数据一致性、性能监控等诸多挑战。如何借助大模型技术优化微服务架构,提升开发效率与系统稳定性,成为企业技术实践中的关键课题。 3 月 22 日,以《云原生 × AI 时代的微服务架构与技术实践》为主题的 CloudWeGo 技术沙龙将在深圳科技园希尔顿惠庭酒店正式启幕。本次活动汇聚了多位来自字节跳动、火山引擎、智谱 AI 的技术大咖,共同探讨云原生与 AI 技术融合趋势下,微服务架构的演进方向与最佳实践。 📍 时间:2025 年 3 月 22 日(周六)14:00-17:00 📍 地点:深圳·南山区科技园希尔顿惠庭酒店 📍 规模:1...
- 下一篇
「IvorySQL 用户案例征集令」:用你的故事,点亮开源之光!
🌟 活动背景 IvorySQL 作为新一代开源关系型数据库,每一次技术突破都离不开社区的共同推动。 现在,我们需要你的声音! 无论是企业级应用实践,还是个人项目的创新玩法,你的真实使用案例都将: 帮助全球开发者更好地了解 IvorySQL 推动核心功能的优化方向 构建更完善的开源生态图谱 📅 活动主题 & 时间 主题:"IvorySQL 用户案例征集" 征集时间:2025 年 2 月 26 日开始 本活动长期有效! 🎯 参与对象 正在使用 IvorySQL 的企业技术团队 基于 IvorySQL 开发应用的独立开发者 在学术研究中采用 IvorySQL 的院校师生 任何通过 IvorySQL 解决实际问题的社区成员 📝 参与方式 1.回复 GitHub Discussion 或参与问卷填写 只要您在使用 IvorySQL,就可以来参与 GitHub Discussion 的讨论,告诉我们是谁在用 IvorySQL。我们设置了回复参考模板,以减轻您的参与成本。 务必留下您的使用邮箱,方便我们与您取得联系。后续我们将按照您的回复情况,给您发放对应的奖品。 如果您不习惯使用 ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- 设置Eclipse缩进为4个空格,增强代码规范
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8安装Docker,最新的服务器搭配容器使用
- SpringBoot2全家桶,快速入门学习开发网站教程
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS7设置SWAP分区,小内存服务器的救世主
- Linux系统CentOS6、CentOS7手动修改IP地址