Flink + Kafka: how to get a message's meta information during deserialization
First, let's deliberately trigger an exception.
We send a JSON record with some fields missing and see what error comes out!
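For example (values invented for illustration), a payload like {"pro": "demo", "throwable": "java.lang.NullPointerException", "level": "ERROR", "SPT": "2018-11-23 13:24:31.000"} — note the missing "ip" key — is enough to reproduce the failure below.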
Sure enough, we capture the call stack:
[2018-11-23 13:24:32,877] INFO Source: MyKafka010JsonTableSource -> from: (pro, throwable, level, ip, SPT) -> where: (AND(IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(level), IS NOT NULL(ip))), select: (pro, throwable, level, ip, SPT) (3/3) (7da5fcb7cd77ff35b64d8bbe367a84d7) switched from RUNNING to FAILED.
java.io.IOException: Failed to deserialize JSON object.
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:97)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:50)
	at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:721)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Could not find field with name 'ip'.
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertRow(JsonRowDeserializationSchema.java:189)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:95)
	... 10 more
(Subtasks 2/3 and 1/3 fail with the identical exception, each followed by a "Freeing task resources for Source: MyKafka010JsonTableSource ..." INFO line; the repeated traces are omitted here.)
This is only meant to give you a feel for where the internal deserialization happens: KeyedDeserializationSchemaWrapper.deserialize(), invoked from Kafka09Fetcher.runFetchLoop(). That wrapper is exactly the hook we are about to replace.
If that is clear, let's get to the main point.
1) Override createKafkaConsumer in the custom TableSource
@Override
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties,
        DeserializationSchema<Row> deserializationSchema) {
    // Functionally this builds the same consumer as
    // super.createKafkaConsumer(topic, properties, deserializationSchema),
    // except that the schema is wrapped so we can inject the Kafka metadata.
    return new FlinkKafkaConsumer010<>(Collections.singletonList(topic),
            new MyKeyedDeserializationSchemaWrapper<>(deserializationSchema),
            properties);
}
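One thing the override implies but does not show: the wrapper in step 2 locates the metadata columns by name, so the schema the table source exposes must declare fields literally named KAFKA_TOPIC, KAFKA_PARTITION and KAFKA_OFFSET; otherwise getFieldIndex() returns -1 and that column is silently skipped. Below is a minimal sketch of such a schema, assuming Flink 1.6-era APIs; the class name SchemaSketch is this article's illustration, not part of the original code.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.TableSchema;

public class SchemaSketch {
    // Hypothetical helper: the business fields from this article's example
    // plus the three metadata columns the wrapper fills in. The metadata
    // columns never appear in the JSON payload, so the JSON format must
    // tolerate missing fields (failOnMissingField = false).
    public static TableSchema build() {
        return TableSchema.builder()
                .field("pro", Types.STRING)
                .field("throwable", Types.STRING)
                .field("level", Types.STRING)
                .field("ip", Types.STRING)
                .field("SPT", Types.SQL_TIMESTAMP)
                .field("KAFKA_TOPIC", Types.STRING)
                .field("KAFKA_PARTITION", Types.INT)
                .field("KAFKA_OFFSET", Types.LONG)
                .build();
    }
}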
2) MyKeyedDeserializationSchemaWrapper.java
package org.apache.flink.api.java.io.kafka;

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.types.Row;

public class MyKeyedDeserializationSchemaWrapper<T> extends KeyedDeserializationSchemaWrapper<T> {

    private static final long serialVersionUID = -6984024233082983820L;

    private static final int NOT_FOUND = -1;

    private static final String KAFKA_TOPIC = "KAFKA_TOPIC";
    private int KAFKA_TOPIC_INDEX = NOT_FOUND;

    private static final String KAFKA_PARTITION = "KAFKA_PARTITION";
    private int KAFKA_PARTITION_INDEX = NOT_FOUND;

    private static final String KAFKA_OFFSET = "KAFKA_OFFSET";
    private int KAFKA_OFFSET_INDEX = NOT_FOUND;

    private boolean inited = false;

    public MyKeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
        super(deserializationSchema);
    }

    @SuppressWarnings("unchecked")
    @Override
    public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        Row row = (Row) super.deserialize(messageKey, message, topic, partition, offset);

        // Executed only once: resolve the metadata column indexes from the
        // produced type, e.g. Row(pro: String, throwable: String,
        // level: String, ip: String, SPT: Timestamp, ...).
        if (!inited) {
            RowTypeInfo rowTypeInfo = (RowTypeInfo) super.getProducedType();
            KAFKA_TOPIC_INDEX = rowTypeInfo.getFieldIndex(KAFKA_TOPIC);
            KAFKA_PARTITION_INDEX = rowTypeInfo.getFieldIndex(KAFKA_PARTITION);
            KAFKA_OFFSET_INDEX = rowTypeInfo.getFieldIndex(KAFKA_OFFSET);
            inited = true;
        }

        // Executed for every record: copy the Kafka metadata into the row.
        if (NOT_FOUND != KAFKA_TOPIC_INDEX) {
            row.setField(KAFKA_TOPIC_INDEX, topic);
        }
        if (NOT_FOUND != KAFKA_PARTITION_INDEX) {
            row.setField(KAFKA_PARTITION_INDEX, partition);
        }
        if (NOT_FOUND != KAFKA_OFFSET_INDEX) {
            row.setField(KAFKA_OFFSET_INDEX, offset);
        }

        // Return the final result.
        return (T) row;
    }
}
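The wrapper is not tied to the Table API either; it can be plugged straight into a FlinkKafkaConsumer010. The sketch below is only an illustration under stated assumptions: the topic, servers, group id and class name are made up, and it presumes the Flink 1.6-era JsonRowDeserializationSchema constructor that takes a TypeInformation<Row>.

import java.util.Collections;
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.types.Row;

public class MetadataConsumerSketch {
    public static FlinkKafkaConsumer010<Row> create() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "meta-demo");               // placeholder

        // Row type: the JSON fields plus the three metadata columns. The JSON
        // deserializer leaves the metadata columns null (failOnMissingField is
        // false by default on this class), and the wrapper then overwrites
        // them for every record.
        TypeInformation<Row> rowType = Types.ROW_NAMED(
                new String[] {"pro", "throwable", "level", "ip", "SPT",
                              "KAFKA_TOPIC", "KAFKA_PARTITION", "KAFKA_OFFSET"},
                Types.STRING, Types.STRING, Types.STRING, Types.STRING,
                Types.SQL_TIMESTAMP, Types.STRING, Types.INT, Types.LONG);

        return new FlinkKafkaConsumer010<>(
                Collections.singletonList("my-topic"), // placeholder topic
                new MyKeyedDeserializationSchemaWrapper<>(
                        new JsonRowDeserializationSchema(rowType)),
                props);
    }
}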
Run a test and check the result!
Perfect!