数栈基于Flink CEP与规则热更新扩展的深度解析
本文通过实际案例深入探讨了 Flink CEP 在复杂事件处理中的核心作用,详细分析了其优缺点,并探讨了在实时计算平台中规则热更新的重要性和创新实现方式,旨在帮助读者更好地理解和应用 Flink CEP。 Flink CEP 1.1 什么是 Flink CEP FlinkCEP 是在 Flink 上层实现的复杂事件处理库。它可以让你在无限事件流中检测出特定的事件模型,并允许用户做出针对性的处理。它更多被应用在实时营销、实时风控和物联网等场景。 在传统的数据处理中,用户更多关注于单一数据的特征,而忽略了数据间的特征。随着业务的不断扩展和数据量的快速增长,用户在某些业务场景中不再满足于单一数据特征的处理,而是需要基于多个数据特征之间的关联进行更复杂的响应处理。 Flink CEP 则基于此需求而诞生。在 Flink CEP 中,每一个数据被视为一个事件,由众多数据组成的数据流就是一串事件流,Flink CEP 能够根据用户提供的事件流匹配规则,在一串事件流中进行摄取。 

Pattern<Event, ?> pattern = Pattern.begin("start") .where( new SimpleCondition() { @Override public boolean filter (Event event) { return event.getId() == 42; } ) .next("middle").where( new SimpleCondition { @Override public boolean filter(SubEvent subEvent) { return subEvent.getVolume() >= 10.0; } } ).next("end" ).where ( new SimpleCondition() { @Override public boolean filter (Event event) { return event.getName().equals("end"); } } );
PatternStream patternStream = CEP.pattern(input, pattern); DataStream result = patternStream.process ( new PatternProcessFunction<Event, Double>() { @Override public void processMatch( Map<String, List> pattern, Context ctx, Collector out) throws Exception { Double total = 0d; for (Map.Entry<String, List> entry : pattern.entrySet()) ‹ total = total + entry.getValue().get(0).getVolume(); } out.collect(total); }); 基于 SQL 开发 CREATE TABLE source ( id INT, name VARCHAR, volume DOUBLE procTime AS PROCTIME () ) WITH ( 'connector' = 'kafka', 'topic' = 'dtstack', 'properties.group.id' = 'dtstack', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'format'= 'json' )
SELECT total FROM source MATCH_RECOGNIZE ( ORDER BY procTime MEASURES A. volume + B.volume + C.volume as total ONE ROW PER MATCH PATTERN (A B C) DEFINE A AS A. id == 42, B AS B. volume >= 10.0 C AS C. name == 'end' ) 在 Data Stream API 的案例中,3-25 行的代码描述了子流的一个捕获规则,28-40 行的代码描述了对捕获到的子流如何处理的,并将处理完成后的数据发送给下游的算子。 在 SQL 案例中 ,22-26 行 SQL 描述了子流的一个捕获规则,19-20 行的 SQL 描述了对捕获到的子流如何处理的。 如果用户对于上述 CEP 规则有变更,则需要修改相对应的代码或者 SQL。 2.2 Flink CEP 规则热更新使用案例 2.2.1 任务本体和规则剥离 Flink CEP 规则热更新功能在上层提供了相应的 Data Stream API 以及的 SQL 语法扩展用于用户编写能够使用 Flink CEP 热更新的程序。用户所需要做的是对上述代码进行改造,将代码中的规则(子流捕获逻辑)以及规则处理逻辑的代码片段去掉,因为这两块逻辑将储存在外部存储中。 对于 Data Stream API DataStream input = ...;
PatternStream output = CEP.dynamicPatterns( input, new JDBCPeriodicPatternProcessorDiscovererFactory<>( JDBC_URL, JDBC_DRIVE, TABLE_NAME, null, JDBC_INTERVAL_MILLIS), TimeBehaviour.ProcessingTime, TypeInformation.of(new TypeHint(){}) ) 对于 SQL CREATE TABLE source ( id INT, name VARCHAR, volume DOUBLE procTime AS PROCTIME () ) WITH ( 'connector' = 'kafka', 'topic' = 'dtstack', 'properties.group.id' = 'dtstack', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'format'= 'json' )
SELECT total FROM source DYNAMIC MATCH_RECOGNIZE ( ORDER BY procTime OUTPUT (total double) WITH_PATTERN ( 'tableName' = 'dynamic_cep', 'user' = 'dtstack', 'password' = '***', 'driver' = 'com.mysql.cj jdbc.Driver', 'jdbcUrl' = 'jdbc:mysql://127.0.0.1:3306/cep', 'jdbcIntervalMillis' = '1000' ) ) AS T; 将规则从任务本体中剥离之后,任务侧可以单独通过数栈提交到 Yarn 上,但是此时若用户还没有为任务设置规则,任务将会进入一种 block 的状态,不会从对上游的数据进行处理。 
"name": "rule",
"type": "COMPOSITE", "edges": [{
"source": "start",
"target": "middle",
"type": "SKIP_TILL_NEXT"
},{
"source": "middle",
"target": "end",
"type": "SKIP_TILL_NEXT"
}],
"nodes": [{
"name": "start",
"condition": {
"expression": "id == 45",
"type": "AVIATOR"
},
"type": "ATOMIC"
},
{
"name": "middle",
"condition": {
"expression": "volume >= 10.0",
"type": "AVIATOR"
},
"type": "ATOMIC"
}, {
"name": "end",
"condition": {
"expression": "name == 'end'",
"type": "AVIATOR"
},
"type": "ATOMIC"
}
]...
} Flink Runtime 层通过解析 JSON 并将其转换为 Graph 用于描述规则状态机的状态转换,规则内部的匹配条件使用 Google Aviator 对表达式进行解析并求值计算。 在运行阶段 Flink Jobmanager 一旦检测到 CEP 规则内容变更后,会对新的规则进行校验然后会推送规则更新的系统事件给复数个 Flink Taskmanager,其中规则更新的系统事件中包含着新规则的详细描述信息。Taskmanager 在接受到规则更新的系统事件后会从其中获得新规则的信息并将其应用给 CEP 算子,之后 CEP 算子会将新的规则设置到算子内部的状态机中,完成对事件规则的热更新。 整个更新过程中不涉及到任务的重启,在几秒内就能完成更新,在新规则未更新完成前任务将依旧以旧规则运行直到新规则替换旧规则,新旧规则可以在用户无感的情况下完成替换。