您现在的位置是:首页 > 文章详情

Flink落HDFS数据按事件时间分区解决方案

日期:2019-09-28点击:799

0x1 摘要

Hive离线数仓中为了查询分析方便,几乎所有表都会划分分区,最为常见的是按天分区,Flink通过以下配置把数据写入HDFS,

BucketingSink<Object> sink = new BucketingSink<>(path); //通过这样的方式来实现数据跨天分区 sink.setBucketer(new DateTimeBucketer<>("yyyy/MM/dd")); sink.setWriter(new StringWriter<>()); sink.setBatchSize(1024 * 1024 * 256L); sink.setBatchRolloverInterval(30 * 60 * 1000L); sink.setInactiveBucketThreshold(3 * 60 * 1000L); sink.setInactiveBucketCheckInterval(30 * 1000L); sink.setInProgressSuffix(".in-progress"); sink.setPendingSuffix(".pending");

0x2 问题点

如果要做到数据完全正确的落到相应分区,那必须用eventTime来划分,我们先来看看DateTimeBucketer桶实现代码,

public class DateTimeBucketer<T> implements Bucketer<T> { private static final long serialVersionUID = 1L; private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH"; private final String formatString; private final ZoneId zoneId; private transient DateTimeFormatter dateTimeFormatter; /** * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"} using JVM's default timezone. */ public DateTimeBucketer() { this(DEFAULT_FORMAT_STRING); } /** * Creates a new {@code DateTimeBucketer} with the given date/time format string using JVM's default timezone. * * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine * the bucket path. */ public DateTimeBucketer(String formatString) { this(formatString, ZoneId.systemDefault()); } /** * Creates a new {@code DateTimeBucketer} with the given date/time format string using the given timezone. * * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine * the bucket path. * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket path. */ public DateTimeBucketer(String formatString, ZoneId zoneId) { this.formatString = Preconditions.checkNotNull(formatString); this.zoneId = Preconditions.checkNotNull(zoneId); this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(zoneId); } @Override public Path getBucketPath(Clock clock, Path basePath, T element) { //分桶关键代码在这里,通过clock获取当前时间戳后格式 String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(clock.currentTimeMillis())); return new Path(basePath + "/" + newDateTimeString); } }

以上代码clock实例是在BucketingSink#open方法中实例化,代码如下:

this.clock = new Clock() { @Override public long currentTimeMillis() { //直接返回当前处理时间 return processingTimeService.getCurrentProcessingTime(); } };

结合以上源码分析发现,使用DateTimeBucketer分桶是采用当前处理时间,采用当前处理时间必然会跟事件事件存在差异,因此会导致数据跨分区落入HDFS文件,举个例子,假设有一条数据事件时间是2019-09-29 23:59:58,那这条数据应该落在2019/09/29分区,但由于这条数据延迟了3秒过来,当处理过来时当前处理时间已经是2019-09-30 00:00:01,所以这条数据会被落到2019/09/30分区,针对一些重要场景数据这样的结果是不可接受的。

0x3 解决方案

从以上第二节源码分析可以看出,解决问题的核心在getBucketPath方法中时间的获取,只要把这里的时间改为事件即可,而正好这个方法的第三参数就是element,代表每一条记录,只要记录中有事件时间就可以获取。既然现有的实现源码不好改,那我们可以自己基于Bucketer接口实现一个EventTimeBucketer分桶器,实现源码如下:

public class EventTimeBucketer implements Bucketer<BaseCountVO> { private static final String DEFAULT_FORMAT_STRING = "yyyy/MM/dd"; private final String formatString; private final ZoneId zoneId; private transient DateTimeFormatter dateTimeFormatter; public EventTimeBucketer() { this(DEFAULT_FORMAT_STRING); } public EventTimeBucketer(String formatString) { this(formatString, ZoneId.systemDefault()); } public EventTimeBucketer(ZoneId zoneId) { this(DEFAULT_FORMAT_STRING, zoneId); } public EventTimeBucketer(String formatString, ZoneId zoneId) { this.formatString = formatString; this.zoneId = zoneId; this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(this.zoneId); } //记住,这个方法一定要加,否则dateTimeFormatter对象会是空,此方法会在反序列的时候调用,这样才能正确初始化dateTimeFormatter对象 //那有的人问了,上面构造函数不是初始化了吗?反序列化的时候是不走构造函数的 private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); this.dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId); } @Override public Path getBucketPath(Clock clock, Path basePath, BaseCountVO element) { String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(element.getTimestamp())); return new Path(basePath + "/" + newDateTimeString); } }

大家实际项目中可以把BaseCountVO改成自己的实体类即可,使用的时候只要换一下setBucketer值,代码如下:

BucketingSink<Object> sink = new BucketingSink<>(path); //通过这样的方式来实现数据跨天分区 sink.setBucketer(new EventTimeBucketer<>("yyyy/MM/dd")); sink.setWriter(new StringWriter<>()); sink.setBatchSize(1024 * 1024 * 256L); sink.setBatchRolloverInterval(30 * 60 * 1000L); sink.setInactiveBucketThreshold(3 * 60 * 1000L); sink.setInactiveBucketCheckInterval(30 * 1000L); sink.setInProgressSuffix(".in-progress"); sink.setPendingSuffix(".pending");
原文链接:https://yq.aliyun.com/articles/719786
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章