Flink落HDFS数据按事件时间分区解决方案
0x1 摘要
Hive离线数仓中为了查询分析方便,几乎所有表都会划分分区,最为常见的是按天分区,Flink通过以下配置把数据写入HDFS,
BucketingSink<Object> sink = new BucketingSink<>(path); //通过这样的方式来实现数据跨天分区 sink.setBucketer(new DateTimeBucketer<>("yyyy/MM/dd")); sink.setWriter(new StringWriter<>()); sink.setBatchSize(1024 * 1024 * 256L); sink.setBatchRolloverInterval(30 * 60 * 1000L); sink.setInactiveBucketThreshold(3 * 60 * 1000L); sink.setInactiveBucketCheckInterval(30 * 1000L); sink.setInProgressSuffix(".in-progress"); sink.setPendingSuffix(".pending");
0x2 问题点
如果要做到数据完全正确的落到相应分区,那必须用eventTime
来划分,我们先来看看DateTimeBucketer
桶实现代码,
public class DateTimeBucketer<T> implements Bucketer<T> { private static final long serialVersionUID = 1L; private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH"; private final String formatString; private final ZoneId zoneId; private transient DateTimeFormatter dateTimeFormatter; /** * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"} using JVM's default timezone. */ public DateTimeBucketer() { this(DEFAULT_FORMAT_STRING); } /** * Creates a new {@code DateTimeBucketer} with the given date/time format string using JVM's default timezone. * * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine * the bucket path. */ public DateTimeBucketer(String formatString) { this(formatString, ZoneId.systemDefault()); } /** * Creates a new {@code DateTimeBucketer} with the given date/time format string using the given timezone. * * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine * the bucket path. * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket path. */ public DateTimeBucketer(String formatString, ZoneId zoneId) { this.formatString = Preconditions.checkNotNull(formatString); this.zoneId = Preconditions.checkNotNull(zoneId); this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(zoneId); } @Override public Path getBucketPath(Clock clock, Path basePath, T element) { //分桶关键代码在这里,通过clock获取当前时间戳后格式 String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(clock.currentTimeMillis())); return new Path(basePath + "/" + newDateTimeString); } }
以上代码clock
实例是在BucketingSink#open
方法中实例化,代码如下:
this.clock = new Clock() { @Override public long currentTimeMillis() { //直接返回当前处理时间 return processingTimeService.getCurrentProcessingTime(); } };
结合以上源码分析发现,使用DateTimeBucketer
分桶是采用当前处理时间,采用当前处理时间必然会跟事件事件存在差异,因此会导致数据跨分区落入HDFS文件,举个例子,假设有一条数据事件时间是2019-09-29 23:59:58
,那这条数据应该落在2019/09/29
分区,但由于这条数据延迟了3秒过来,当处理过来时当前处理时间已经是2019-09-30 00:00:01
,所以这条数据会被落到2019/09/30
分区,针对一些重要场景数据这样的结果是不可接受的。
0x3 解决方案
从以上第二节源码分析可以看出,解决问题的核心在getBucketPath
方法中时间的获取,只要把这里的时间改为事件即可,而正好这个方法的第三参数就是element
,代表每一条记录,只要记录中有事件时间就可以获取。既然现有的实现源码不好改,那我们可以自己基于Bucketer
接口实现一个EventTimeBucketer
分桶器,实现源码如下:
public class EventTimeBucketer implements Bucketer<BaseCountVO> { private static final String DEFAULT_FORMAT_STRING = "yyyy/MM/dd"; private final String formatString; private final ZoneId zoneId; private transient DateTimeFormatter dateTimeFormatter; public EventTimeBucketer() { this(DEFAULT_FORMAT_STRING); } public EventTimeBucketer(String formatString) { this(formatString, ZoneId.systemDefault()); } public EventTimeBucketer(ZoneId zoneId) { this(DEFAULT_FORMAT_STRING, zoneId); } public EventTimeBucketer(String formatString, ZoneId zoneId) { this.formatString = formatString; this.zoneId = zoneId; this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(this.zoneId); } //记住,这个方法一定要加,否则dateTimeFormatter对象会是空,此方法会在反序列的时候调用,这样才能正确初始化dateTimeFormatter对象 //那有的人问了,上面构造函数不是初始化了吗?反序列化的时候是不走构造函数的 private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); this.dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId); } @Override public Path getBucketPath(Clock clock, Path basePath, BaseCountVO element) { String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(element.getTimestamp())); return new Path(basePath + "/" + newDateTimeString); } }
大家实际项目中可以把BaseCountVO
改成自己的实体类即可,使用的时候只要换一下setBucketer
值,代码如下:
BucketingSink<Object> sink = new BucketingSink<>(path); //通过这样的方式来实现数据跨天分区 sink.setBucketer(new EventTimeBucketer<>("yyyy/MM/dd")); sink.setWriter(new StringWriter<>()); sink.setBatchSize(1024 * 1024 * 256L); sink.setBatchRolloverInterval(30 * 60 * 1000L); sink.setInactiveBucketThreshold(3 * 60 * 1000L); sink.setInactiveBucketCheckInterval(30 * 1000L); sink.setInProgressSuffix(".in-progress"); sink.setPendingSuffix(".pending");
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MongoDB Spark Connector 实战指南
Why Spark with MongoDB? 高性能,官方号称 100x faster,因为可以全内存运行,性能提升肯定是很明显的 简单易用,支持 Java、Python、Scala、SQL 等多种语言,使得构建分析应用非常简单 统一构建 ,支持多种数据源,通过 Spark RDD 屏蔽底层数据差异,同一个分析应用可运行于不同的数据源; 应用场景广泛,能同时支持批处理以及流式处理 MongoDB Spark Connector 为官方推出,用于适配 Spark 操作 MongoDB 数据;本文以 Python 为例,介绍 MongoDB Spark Connector 的使用,帮助你基于 MongoDB 构建第一个分析应用。 准备 MongoDB 环境 安装 MongoDB 参考 Install MongoDB Community Ed
- 下一篇
MaxCompute问答整理之9月
本文是基于本人对MaxCompute产品的学习进度,再结合开发者社区里面的一些问题,进而整理成文。希望对大家有所帮助。 问题一、如何查看information_schema的tables?在使用ODPS建表时,有可能会建出几千张表,那我们寻找需要的表时就需要知道表名称,可以在数据地图中查看表,也可以使用Pyodps批量获取表名称。具体可参考文档:https://help.aliyun.com/document_detail/90412.html 问题二、不小心drop删除表可以恢复吗?不可以。在客户端和IDE中drop表是一个不可逆操作。表操作要谨慎。 问题三、在哪里可以看到所有执行的SQL?通过Information_Schema元数据的TASKS_HISTORY明细来查,元数据服务Information_Schema已经全面开放,大
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题