Flink 1.4.2 版本踩过的坑
0x1 摘要
最近业务要实时统计半小时维度的UV、PV数据,经过调研准备用Flink时间窗来实现,主要是Flink对eventTime
的支持,可以做到更精准的统计,由于第一次尝试使用Flink,所以过程中遇到不少问题,记录下来方便后续查阅。
0x2 执行计划输出JSON问题
Flink对执行计划分析提供了支持,可以通过代码将执行计划打出来,并利用官网提供的图生成工具可以方便分析,通过env.getExecutionPlan()
方法可以获取JSON格式的执行计划,将JSON字符串拷贝到http://flink.apache.org/visualizer/
网站文本框就可以查看。
但我们在项目中调用env.getExecutionPlan()
方法后报以下异常信息:
Caused by: java.lang.IllegalArgumentException: Comparison method violates its general contract! at java.util.TimSort.mergeLo(TimSort.java:777) at java.util.TimSort.mergeAt(TimSort.java:514) at java.util.TimSort.mergeForceCollapse(TimSort.java:457) at java.util.TimSort.sort(TimSort.java:254) at java.util.Arrays.sort(Arrays.java:1512) at java.util.ArrayList.sort(ArrayList.java:1454) at java.util.Collections.sort(Collections.java:175) at org.apache.flink.streaming.api.graph.JSONGenerator.getJSON(JSONGenerator.java:61) at org.apache.flink.streaming.api.graph.StreamGraph.getStreamingPlanAsJSON(StreamGraph.java:663) ... 2 more
通过异常信息可以知道问题发生在TimSort
排序算法,意思就是比较方法违反约束,具体约束规范大家可以自行网上查阅:自反性、传递性、对称性。
我们来看一下Flink的源码,只要看JSONGenerator
类61
行就可以:
public String getJSON() throws JSONException { JSONObject json = new JSONObject(); JSONArray nodes = new JSONArray(); json.put("nodes", nodes); List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs()); Collections.sort(operatorIDs, new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { // put sinks at the back if (streamGraph.getSinkIDs().contains(o1)) { return 1; } else if (streamGraph.getSinkIDs().contains(o2)) { return -1; } else { return o1 - o2; } } }); visit(nodes, operatorIDs, new HashMap<Integer, Integer>()); return json.toString(); }
从源码可以看在对操作ID做排序时Flink自己实现compare
方法,具体这个方法的实际意义不是很明白,有明白的赐教一下。后来通过网上查阅已经有人提过此issues,地址:https://issues.apache.org/jira/browse/FLINK-8498
,但状态是关闭的,也没有回复什么时候解决,但我们通过查看Flink GitHub源码发现,此处实现已经发生变更,源码地址https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
新实现源码:
public String getJSON() { ObjectNode json = mapper.createObjectNode(); ArrayNode nodes = mapper.createArrayNode(); json.put("nodes", nodes); List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs()); Collections.sort(operatorIDs, new Comparator<Integer>() { @Override public int compare(Integer idOne, Integer idTwo) { boolean isIdOneSinkId = streamGraph.getSinkIDs().contains(idOne); boolean isIdTwoSinkId = streamGraph.getSinkIDs().contains(idTwo); // put sinks at the back if (isIdOneSinkId == isIdTwoSinkId) { return idOne.compareTo(idTwo); } else if (isIdOneSinkId) { return 1; } else { return -1; } } }); visit(nodes, operatorIDs, new HashMap<Integer, Integer>()); return json.toString(); }
我猜测是因为违反了自反性
导致的错误,那这个问题怎么解决呢?有两种方案:
- 方案一:去掉
env.getExecutionPlan()
不打印执行计划 - 方案二:设置JVM参数
-Djava.util.Arrays.useLegacyMergeSort=true
0x3 不能连续split问题
场景描述,直接看拓扑图:
希望达到上图的流拆分,但我开开心心把代码写后发布线上运行没有任何异常,等到验证数据时才发现最终统计数据不准A-1
和A-2
的结果都是一样的,
issues地址:https://issues.apache.org/jira/browse/FLINK-5031
0x4 不能先process操作再split
在发现不能连续split
后,只能想其他办法,将拓扑图改为:
改为此方案后,线下运行直接报错,异常信息:
Exception in thread "main" java.lang.NullPointerException at org.apache.flink.streaming.api.graph.StreamGraph.addOutputSelector(StreamGraph.java:444) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformSplit(StreamGraphGenerator.java:267) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:176) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformSelect(StreamGraphGenerator.java:282) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:178) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformPartition(StreamGraphGenerator.java:241) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:184) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:527) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:132) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:124) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1528) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1540) ... 2 more
网上查阅发现存在issues,地址:https://issues.apache.org/jira/browse/FLINK-9141
最终改为先split
再process
方法搞定,拓扑图如下:
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Tomcat学习笔记----本地部署servlet动态资源
前言: 链接:Tomcat学习笔记--简单了解和Web应用的目录结构以及常见的Web应用时出现404错误 有问题:什么是动态资源? 静态资源:当用户多次访问这个资源,资源的源代码永远不会改变的资源 动态资源:当用户多次访问这个资源,资源的源代码可能会发生改变。 Servlet : 用java语言来编写动态资源的开发技术。 Servlet特点: 1)普通的java类,继承HttpServlet类,覆盖doGet方法 2)Servlet类只能交给tomcat服务器运行(开发者自己不能运行) 所以要配置web.xml具体下面会讲 静态访问URL举例:http://localhost:8080/xx/zz.html动态访问URL举例:http://localhost:8080/xx/One区别看到没 动态结尾没有后缀名,这个下面会解释的。 进入正题: 第一步:创建一个servlet 为了 了解Tomcat部署的流程,我是照着例子手动敲了一个servlet例子。 编写一个servlet程序,继承HttpServlet 但是继承HttpServlet需要一个jar包:servlet-api.jar...
- 下一篇
想成为软件架构师,一定要看的经典书单
点击链接购书 参与文末话题讨论,每日赠送异步图书 ——异步小编 互联网的发展带动了各行各业信息化的趋势,一大批高新企业如雨后春笋般出现在大众的视野中。于是,不同类型的软件项目应运而生。在这些琳琅满目的项目中,有企业管理、电商平台、财务报表、金融银行、医疗器械、智慧城市和大数据分析等类型。项目的层出不穷带来了巨大的利润,让高新企业不断地成长起来,与此同时,也带来了很多相关的就业岗位。 当然,要顺利地完成这些项目,就需要大量的软件工程师。这种硬性的需求又养活了一大批培训机构,从事软件行业的人员当初是凤毛麟角,现在依然是供不应求。那么,如何提高软件工程师的开发技能就成了一个无法回避的问题。诚然,公司可以不定期进行培训,提高开发人员的技能水平,但从更普遍、更直接的意义上来说,提高技能水平的最佳方式还是系统地阅读相关书籍。 计算机语言从机器语言、汇编语言发展到现在的高级语言,这个过程中诞生了很多种语言。有些语言已经逐步退出历史舞台,有些语言仍然在小众化的范围内存在。而Java语言,经历了二十多年的发展,仍然保持着旺盛的生命力,在编程语言排行榜中高居不下,Java程序员的数量也与日俱增,这种现象...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- Mario游戏-低调大师作品
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS6,CentOS7官方镜像安装Oracle11G
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- 设置Eclipse缩进为4个空格,增强代码规范