Flink WindowOperator 源码分析
0x1 摘要
WindowOperator可以说是Flink窗口功能非常核心核心的类,是窗口功能源码的一条主线,延着这条主线去慢慢看源码会轻松很多。注:此文基于Flink 1.4.2 版本源码。
0x2 WindowOperator 类结构分析
先来看一下类结构图,可以使用idea来生成类图,下图经过稍微加工,去掉一些不重要类的结构图:
我们核心重点关注以下一个接口:
- OneInputStreamOperator
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> { /** * Processes one element that arrived at this operator. * This method is guaranteed to not be called concurrently with other methods of the operator. */ void processElement(StreamRecord<IN> element) throws Exception; /** * Processes a {@link Watermark}. * This method is guaranteed to not be called concurrently with other methods of the operator. * * @see org.apache.flink.streaming.api.watermark.Watermark */ void processWatermark(Watermark mark) throws Exception; void processLatencyMarker(LatencyMarker latencyMarker) throws Exception; }
0x3 OneInputStreamOperator 具体实现分析
此接口三个方法WindowOperator
类只实现了processElement
方法,其余两个方法实现全部在AbstractStreamOperator
抽象类中,此文不去讲解,此文重点介绍processElement
方法,这个方法也是最重要的方法。
从方法注释可以看出,每一条消息过来都会调用此方法,此方法主体很清晰,看下面条件判断语句:
final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext); //if element is handled by none of assigned elementWindows boolean isSkippedElement = true; final K key = this.<K>getKeyedStateBackend().getCurrentKey(); if (windowAssigner instanceof MergingWindowAssigner) { ... } else { ... } // side output input event if // element not handled by any window // late arriving tag has been set // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp if (isSkippedElement && isElementLate(element)) { if (lateDataOutputTag != null){ sideOutput(element); } else { this.numLateRecordsDropped.inc(); } }
分为合并窗口分配器和非合并窗口分配器,我们平时使用的TumblingProcessingTimeWindows
都属于非合并窗口,今天就介绍非合并窗口,即代码中else
逻辑。
原代码如下:
for (W window: elementWindows) { // drop if the window is already late if (isWindowLate(window)) { continue; } isSkippedElement = false; windowState.setCurrentNamespace(window); windowState.add(element.getValue()); triggerContext.key = key; triggerContext.window = window; TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { ACC contents = windowState.get(); if (contents == null) { continue; } emitWindowContents(window, contents); } if (triggerResult.isPurge()) { windowState.clear(); } registerCleanupTimer(window); }
第一步:判断窗口是否延迟,如果延迟直接踩过,判断延迟的逻辑相对简单可自行查看源码
第二步:设置isSkippedElement
标志位,此标志位等于false
说明,当前元素可以匹配到窗口,true
说明匹配不到窗口,后面会有处理逻辑
第三步:下面四行代码是一些状态设置
第四步:根据当前元素返回一个触发器结果
第五步:判断触发器结果是否需要执行,如果需要执行,则调用emitWindowContents
方法执行
第六步:判断是否需要清理窗口状态信息
第七步:注册清除定时器
protected void registerCleanupTimer(W window) { long cleanupTime = cleanupTime(window); if (cleanupTime == Long.MAX_VALUE) { // don't set a GC timer for "end of time" return; } if (windowAssigner.isEventTime()) { triggerContext.registerEventTimeTimer(cleanupTime); } else { triggerContext.registerProcessingTimeTimer(cleanupTime); } }
首先计算清除时间:
private long cleanupTime(W window) { if (windowAssigner.isEventTime()) { long cleanupTime = window.maxTimestamp() + allowedLateness; return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE; } else { return window.maxTimestamp(); } }
如果是事件时间则需要算上允许延迟时间,调用triggerContext
注册Time
注:processElement
方法开头代码
final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext);
这段代码是窗口的分配,后面单独文章来分析窗口分配实现原理。
0x4 结束语
整个WindowOperator
核心流程代码不多,但代码量还是比较大,里面涉及到窗口分配、时间触发器,每个点都涉及比较多的源码,不能一次性去讲完,需要慢慢去挖。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Schedulerx2.0支持MapReduce模型
1. 前言 Schedulerx2.0提供了map模型,通过一个map方法就能将海量数据分布式到多台机器上分布式执行,随着业务方的深入使用,又提出了更多的需求,比如: 监听所有子任务完成的事件 处理所有子任务返回的订单号 汇总结果进行工作流数据传输 2. 简介 MapReduce模型是Map模型的扩展,废弃了postProcess方法,新增reduce接口,需要实现MapReduceJobProcessor。 MapReduce模型只有一个reduce,所有子任务完成后会执行reduce方法,可以在reduce方法中返回该任务实例的执行结果,作为工作流的上下游数据传递。如果有子任务失败,reduce不会执行。 MapReduce模型,还能处理所有子任务的结果。子任务通过return ProcessResult(true, result)返回结果(
- 下一篇
突破Java面试(17)-ElasticSearch的部署架构
1 面试题 ES集群部署架构如何 每个索引的数据量大概多少 每个索引大概有多少分片 2 考点分析 问你生产环境咋部署的,说白了,没啥技术含量,就是看你有没有在真正的生产环境里做过ES! 有些同学可能没在生产环境做过,没在线上部署过ES集群,也没实际玩过,也没往ES集群里面导入过几千万甚至是几亿的数据量,可能你就不太清楚这里面的一些生产项目中的细节 如果你是自己就玩过demo,没碰过真实的ES集群,那你可能此时会懵,但是别怕!你一定要云淡风轻的回答,表示你确实干过ES! 3 详解 如果你确实干过ES,那你肯定了解你们生产es集群的实际情况,部署了几台机器?有多少个索引?每个索引有多大数据量?每个索引给了多少个分片?你肯定知道! 但是如果你确实没干过,也别虚,我给你说一个基本的版本,你到时候就简单说一下就好了 ES生产集群我们部署了5台机器,每台机器是6核
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2全家桶,快速入门学习开发网站教程
- Docker安装Oracle12C,快速搭建Oracle学习环境
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS7,CentOS8安装Elasticsearch6.8.6