Flink 事件时间的陷进及解决思路
0x1 摘要
大家都知道Flink引入了事件时间(eventTime)这个重要概念,来提升数据统计的准确性,但引入事件时间后在具体业务实现时存在一些问题必需要合理去解决,否则会造成非常严重的问题。
0x2 Flink 时间概念介绍
Flink 支持不同的时间概念,包括:
- Event Time :事件时间
- Processing Time :处理时间
- Ingestion Time :消息提取时间
参考下图可以清晰的知道这三者的关系:Ingestion Time
是介于Event Time
和Processing Time
之间的概念。
程序中可以通过env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
指定使用时间类型。
0x3 事件时间存在的问题
事件时间存在什么样的问题呢?下面先看一个简单的业务场景。
比如:要统计APP上搜索按钮每1分钟的点击次数。
前端埋点数据结构:
字段名 | 字段类型 | 描述 |
---|---|---|
eventCode | String | 事件编码 |
clickTime | Long | 点击时间 |
基于以上数据结构我们可设计如下水印处理器:
public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> { private long currentMaxTimestamp = 0L; @Override public Watermark getCurrentWatermark() { return new Watermark(currentMaxTimestamp -3000); } @Override public long extractTimestamp(Tuple2<String, Long> tuple, long previousElementTimestamp) { long eventTime = tuple.f1; currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime); return eventTime; } }
extractTimestamp
方法会拿事件时间和上一次事件时间比较,并取较大值来更新当前水印值。
假设前端发送了以下这些数据,方便直观看数据clickTime直接采用格式化后的值,并以逗号分隔数据。
001,2018-12-17 13:30:00 001,2018-12-17 13:30:01 001,2018-12-17 13:30:02 001,2018-12-18 13:30:00 001,2018-12-17 13:30:03 001,2018-12-17 13:30:04 001,2018-12-17 13:30:05
正常数据都是17号,突然来了一条18号的数据,再结合上面的水印逻辑,一旦出现这种问题数据,直接导致水位上升到18号,后面再来17号的数据全部无法处理。针对业务来讲这样的错误是致命的,统计结果出现断层。
0x4 解决思路
针对以上问题我们可以对水印实现类做如下改造:
public static class TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> { private long currentMaxTimestamp = 0L; @Override public Watermark getCurrentWatermark() { return new Watermark(currentMaxTimestamp -3000); } @Override public long extractTimestamp(Tuple2<String, Long> tuple, long previousElementTimestamp) { long eventTime = tuple.f1; if((currentMaxTimestamp == 0) || (eventTime - currentMaxTimestamp < MESSAGE_FORWARD_TIME)) { currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime); } return eventTime; } }
MESSAGE_FORWARD_TIME
变量是自定义的消息最大跳跃时间,如果超出这个范围则不更新最大水印时间。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
阿里云大数据ACP认证知识点梳理9——产品特点(DATA WORKS)
DataWorks(数据工场,原大数据开发套件)是阿里云数加重要的PaaS平台产品,它提供全面托管的工作流服务,一站式开发管理的界面,帮助企业专注于数据价值的挖掘和探索。 DataWorks(数据工场)基于MaxCompute作为核心的计算、存储引擎,提供了海量数据的离线加工分析、数据挖掘的能力. 使用DataWorks(数据工场),可对数据进行数据传输、数据转换等相关操作,从不同的数据存储引入数据,对数据进行转化处理,最后将数据提取到其他数据系统。 提供强大的调度能力,支持按照时间、依赖关系的任务触发机制,支持每日千万级别的任务按照DAG关系准确、准时运行。支持分钟、小时、天、周和月多种调度周期配置。(分钟的最小单位是5分钟) 完全托管的服务,无需关心调度服务器资源问题。租户之间提供隔离,保证不同租户之间的任务不会相互影响。 支持数据同步
- 下一篇
JSON是什么,为什么这么流行?
1JSON是什么? 前几天分享了[《Spring Boot 返回 JSON 数据,一分钟搞定!》](https://mp.weixin.qq.com/s/cFztjzQttMwBQJqAowUZ2A),好些人对 JSON 还没有一个清晰的认识,今天栈长带大家来认识一下什么是JSON。 有一种叫做JSON (JavaScript Object Notation) 的轻量级数据交换格式能够替代XML的工作。它就是JSON。 数据格式比较简单, 易于读写, 格式都是压缩的, 占用带宽小,易于解析这种语言。 客户端JavaScript可以简单的通过eval()进行JSON数据的读取,包括ActionScript, C, C#, ColdFusion,Java,JavaScript,Perl,PHP,Python,Ruby等语言服务器端语言, 便于服务器端的解析。 各语言对JSON支持的特别好,自从Ajax的流行,JSON格式传输就更流行了。JSON常被用作序列化,推荐阅读:关于Java序列化你应该知道的一切。 2 如果到这里你还不明白? JSON是什么,那么我就发大招了! 其实我在为公司面试的...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS关闭SELinux安全模块
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装