您现在的位置是:首页 > 文章详情

Flink 1.4.2 版本踩过的坑

日期:2018-06-06点击:372

0x1 摘要

最近业务要实时统计半小时维度的UV、PV数据,经过调研准备用Flink时间窗来实现,主要是Flink对eventTime的支持,可以做到更精准的统计,由于第一次尝试使用Flink,所以过程中遇到不少问题,记录下来方便后续查阅。

0x2 执行计划输出JSON问题

Flink对执行计划分析提供了支持,可以通过代码将执行计划打出来,并利用官网提供的图生成工具可以方便分析,通过env.getExecutionPlan()方法可以获取JSON格式的执行计划,将JSON字符串拷贝到http://flink.apache.org/visualizer/网站文本框就可以查看。
但我们在项目中调用env.getExecutionPlan()方法后报以下异常信息:

Caused by: java.lang.IllegalArgumentException: Comparison method violates its general contract! at java.util.TimSort.mergeLo(TimSort.java:777) at java.util.TimSort.mergeAt(TimSort.java:514) at java.util.TimSort.mergeForceCollapse(TimSort.java:457) at java.util.TimSort.sort(TimSort.java:254) at java.util.Arrays.sort(Arrays.java:1512) at java.util.ArrayList.sort(ArrayList.java:1454) at java.util.Collections.sort(Collections.java:175) at org.apache.flink.streaming.api.graph.JSONGenerator.getJSON(JSONGenerator.java:61) at org.apache.flink.streaming.api.graph.StreamGraph.getStreamingPlanAsJSON(StreamGraph.java:663) ... 2 more

通过异常信息可以知道问题发生在TimSort排序算法,意思就是比较方法违反约束,具体约束规范大家可以自行网上查阅:自反性、传递性、对称性。
我们来看一下Flink的源码,只要看JSONGenerator61行就可以:

public String getJSON() throws JSONException { JSONObject json = new JSONObject(); JSONArray nodes = new JSONArray(); json.put("nodes", nodes); List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs()); Collections.sort(operatorIDs, new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { // put sinks at the back if (streamGraph.getSinkIDs().contains(o1)) { return 1; } else if (streamGraph.getSinkIDs().contains(o2)) { return -1; } else { return o1 - o2; } } }); visit(nodes, operatorIDs, new HashMap<Integer, Integer>()); return json.toString(); }

从源码可以看在对操作ID做排序时Flink自己实现compare方法,具体这个方法的实际意义不是很明白,有明白的赐教一下。后来通过网上查阅已经有人提过此issues,地址:https://issues.apache.org/jira/browse/FLINK-8498,但状态是关闭的,也没有回复什么时候解决,但我们通过查看Flink GitHub源码发现,此处实现已经发生变更,源码地址https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
新实现源码:

public String getJSON() { ObjectNode json = mapper.createObjectNode(); ArrayNode nodes = mapper.createArrayNode(); json.put("nodes", nodes); List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs()); Collections.sort(operatorIDs, new Comparator<Integer>() { @Override public int compare(Integer idOne, Integer idTwo) { boolean isIdOneSinkId = streamGraph.getSinkIDs().contains(idOne); boolean isIdTwoSinkId = streamGraph.getSinkIDs().contains(idTwo); // put sinks at the back if (isIdOneSinkId == isIdTwoSinkId) { return idOne.compareTo(idTwo); } else if (isIdOneSinkId) { return 1; } else { return -1; } } }); visit(nodes, operatorIDs, new HashMap<Integer, Integer>()); return json.toString(); }

我猜测是因为违反了自反性导致的错误,那这个问题怎么解决呢?有两种方案:

  • 方案一:去掉env.getExecutionPlan()不打印执行计划
  • 方案二:设置JVM参数-Djava.util.Arrays.useLegacyMergeSort=true

0x3 不能连续split问题

场景描述,直接看拓扑图:
screenshot
希望达到上图的流拆分,但我开开心心把代码写后发布线上运行没有任何异常,等到验证数据时才发现最终统计数据不准A-1A-2的结果都是一样的,
issues地址:https://issues.apache.org/jira/browse/FLINK-5031

0x4 不能先process操作再split

在发现不能连续split后,只能想其他办法,将拓扑图改为:
screenshot
改为此方案后,线下运行直接报错,异常信息:

Exception in thread "main" java.lang.NullPointerException at org.apache.flink.streaming.api.graph.StreamGraph.addOutputSelector(StreamGraph.java:444) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformSplit(StreamGraphGenerator.java:267) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:176) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformSelect(StreamGraphGenerator.java:282) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:178) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformPartition(StreamGraphGenerator.java:241) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:184) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:527) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:132) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:124) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1528) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1540) ... 2 more

网上查阅发现存在issues,地址:https://issues.apache.org/jira/browse/FLINK-9141
最终改为先splitprocess方法搞定,拓扑图如下:
screenshot

原文链接:https://yq.aliyun.com/articles/600268
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章