关于flink的时间处理不正确的现象复现&原因分析
跟朋友聊天,说输出的时间不对,之前测试没关注到这个,然后就在processing模式下看了下,发现时间确实不正确
然后就debug,看问题在哪,最终分析出了原因,记录如下:
具体我在朋友的https://github.com/apache/flink/pull/7180 最下面给出了复现方案及原因分析
let me show how to generate the wrong result background: processing time in tumbling window flink:1.5.0 the invoke stack is as follows: [1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747) [2] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:53) [3] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74) [4] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72) [5] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39) [6] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46) [7] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550) [8] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505) [9] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266) [10] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281) [11] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511) [12] java.util.concurrent.FutureTask.run (FutureTask.java:266) [13] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180) [14] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293) [15] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142) [16] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617) [17] java.lang.Thread.run (Thread.java:748) now ,we are at [1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747) and the code is as follows: public static Timestamp internalToTimestamp(long v) { return new Timestamp(v - LOCAL_TZ.getOffset(v)); } let us print the value of windowStart:v print v v = 1544074830000 let us print the value of windowEnd:v print v v = 1544074833000 after this, come back to [1] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:51) then,we will execute ` if (windowStartOffset.isDefined) { output.setField( lastFieldPos + windowStartOffset.get, SqlFunctions.internalToTimestamp(windowStart)) } if (windowEndOffset.isDefined) { output.setField( lastFieldPos + windowEndOffset.get, SqlFunctions.internalToTimestamp(windowEnd)) } ` before execute,the output is output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,null,null,null" after execute,the output is output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,2018-12-06 05:40:30.0,2018-12-06 05:40:33.0,null" so,do you think the long value 1544074830000 translated to be 2018-12-06 05:40:30.0 long value 1544074833000 translated to be 2018-12-06 05:40:33.0 would be right? I am in China, I think the timestamp should be 2018-12-06 13:40:30.0 and 2018-12-06 13:40:33.0 okay,let us continue now ,the data will be write to kafka,before write ,the data will be serialized let us see what happened! the call stack is as follows: [1] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer._timestamp (DateSerializer.java:41) [2] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:48) [3] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:15) [4] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue (DefaultSerializerProvider.java:130) [5] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue (ObjectMapper.java:2,444) [6] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree (ObjectMapper.java:2,586) [7] org.apache.flink.formats.json.JsonRowSerializationSchema.convert (JsonRowSerializationSchema.java:189) [8] org.apache.flink.formats.json.JsonRowSerializationSchema.convertRow (JsonRowSerializationSchema.java:128) [9] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize (JsonRowSerializationSchema.java:102) [10] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize (JsonRowSerializationSchema.java:51) [11] org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue (KeyedSerializationSchemaWrapper.java:46) [12] org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke (FlinkKafkaProducer010.java:355) [13] org.apache.flink.streaming.api.operators.StreamSink.processElement (StreamSink.java:56) [14] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560) [15] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535) [16] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515) [17] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679) [18] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657) [19] org.apache.flink.streaming.api.operators.StreamMap.processElement (StreamMap.java:41) [20] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560) [21] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535) [22] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515) [23] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679) [24] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657) [25] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51) [26] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37) [27] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28) [28] DataStreamCalcRule$88.processElement (null) [29] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:66) [30] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:35) [31] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66) [32] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560) [33] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535) [34] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515) [35] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679) [36] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657) [37] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51) [38] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:65) [39] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74) [40] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72) [41] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39) [42] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46) [43] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550) [44] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505) [45] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266) [46] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281) [47] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511) [48] java.util.concurrent.FutureTask.run (FutureTask.java:266) [49] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180) [50] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293) [51] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142) [52] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617) [53] java.lang.Thread.run (Thread.java:748) and the code is as follows: protected long _timestamp(Date value) { return value == null ? 0L : value.getTime(); } here,use windowEnd for example,the value is value = "2018-12-06 05:40:33.0" value.getTime() = 1544046033000 see,the initial value is 1544074833000 and the final value is 1544046033000 the minus value is 28800000, ---> 8 hours ,because I am in China. why? the key reason is SqlFunctions.internalToTimestamp public static Timestamp internalToTimestamp(long v) { return new Timestamp(v - LOCAL_TZ.getOffset(v)); } in the code, It minus the LOCAL_TZ , I think it is redundant!
刚才又看了下,其实根本原因就是时间转换来转换去,没有用同一个类,用了2个类的方法
结果就乱套了,要改的话就是SqlFunctions的那个类
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
CSDN文章自动展开全文无需登录插件(仅限Chrome)!
众所周知csdn里所有blog都记录了程序员们多年的技术积累,他们不吝啬技术,免费分享经验,随着资料的丰富,那些踩过的坑,报过的错,全被前人当成树种下了,现在各类问题基本都能在网上得到解决方案。而csdn却拿着这些资源当王牌,限制行内人士自由,连看文章都需要登录(如果你是作者,没有登录的话,可能连你自己的文章都看不了!)。早年csdn在国内基本一家独大,有些大牛们的blog也在csdn,没有别的选择。但是现在大牛们都有了自己的blog,或者有些已经不更新了,同时随着osc的日益壮大,导致csdn流量暴跌,他们试图以登录的方式来挽回损失的流量,搞了这么一出,登录才能看全文,简直恶心至极!! 对于csdn这种暴力行为,作为码农第一反应当然是拒绝的,我就想安安静静看个文章,你让我手动展开全文就算了(根本不知道你们加这个功能到底干啥,跟风好玩?),最主要还要求我登录,真是觉得手握资源就可以为所欲为?既然你这样,就不要怪我那样。 csdn文章内容其实在一开始就加载完了,只是设置了height和overflow把一部分隐藏起来了,想要自动展开全文只需要把这两个去掉就行了,用chrome或者...
- 下一篇
SpringBoot服务器压测对比(jetty、tomcat、undertow)
(麻烦看这篇的大大们,穿越到这https://my.oschina.net/shyloveliyi/blog/2980868) 1、本次对比基础环境信息如下: springboot版本1.5.10 centos虚机4c6G,版本7.4 centos实机2u16c40G,版本7.4,虚机运行在实机上 ab版本2.3 jprofiler版本9.1.1 2、压测接口说明 天花板:指的是一个空接口,没有任何实现,直接返回,如 @RequestMapping(value = "/test", method = RequestMethod.GET) public void test() { } 服务接口:指的是具有一定业务代码的接口,连接数据库/Redis然后返回json数据 异步接口:指的是开启了http异步 3、压测过程 JETTY 先以Jetty开始,这里通过优化参数来不断摸底。以下是参数说明: jettyMin:最小连接数 jettyMax:最大连接数 mvcCore:线程池core数量 mvcMax:线程池最大量 mvcQueue:线程池队列大...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- Red5直播服务器,属于Java语言的直播服务器
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Hadoop3单机部署,实现最简伪集群
- CentOS8编译安装MySQL8.0.19
- CentOS7,8上快速安装Gitea,搭建Git服务器