Flink 闭包清除源码分析
0x1 摘要
本文主要讲解Flink里为什么需要做闭包清除?Flink是怎么实现闭包清除的?
0x2 Flink 为什么要做闭包清除
大家都知道Flink中算子都是通过序列化分发到各节点上,所以要确保算子对象是可以被序列化的,很多时候大家比较喜欢直接用匿名内部类实现算子,而匿名内部类就会带来闭包问题,当匿名内部类引用的外部对象没有实现序列化接口时,就会导致内部类无法被序列化,因此Flink框架底层必须做好清除工作。
0x3 Flink 闭包清除实现
先来看一个Map算子代码:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final DataStreamSource<String> source = env.addSource(new SourceFunction<String>() { @Override public void run(SourceContext<String> ctx) throws Exception { } @Override public void cancel() { } }); source.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return null; } });
跟进源码查看map方法:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) { TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), Utils.getCallLocationName(), true); return transform("Map", outType, new StreamMap<>(clean(mapper))); }
重点关注clean(mapper)
代码,继续跟进源码,最终会走到StreamExecutionEnvironment
类的以下方法:
@Internal public <F> F clean(F f) { if (getConfig().isClosureCleanerEnabled()) { ClosureCleaner.clean(f, true); } ClosureCleaner.ensureSerializable(f); return f; }
到这里已经可以看出来闭包清除工具类ClosureCleaner
,下面我们详细剖析一下此类。
先看clean
方法:
public static void clean(Object func, boolean checkSerializable) { if (func == null) { return; } final Class<?> cls = func.getClass(); // First find the field name of the "this$0" field, this can // be "this$x" depending on the nesting boolean closureAccessed = false; for (Field f: cls.getDeclaredFields()) { if (f.getName().startsWith("this$")) { // found a closure referencing field - now try to clean closureAccessed |= cleanThis0(func, cls, f.getName()); } } if (checkSerializable) { try { InstantiationUtil.serializeObject(func); } catch (Exception e) { String functionType = getSuperClassOrInterfaceName(func.getClass()); String msg = functionType == null ? (func + " is not serializable.") : ("The implementation of the " + functionType + " is not serializable."); if (closureAccessed) { msg += " The implementation accesses fields of its enclosing class, which is " + "a common reason for non-serializability. " + "A common solution is to make the function a proper (non-inner) class, or " + "a static inner class."; } else { msg += " The object probably contains or references non serializable fields."; } throw new InvalidProgramException(msg, e); } } }
方法参数:
-
func
:要清除的对应 -
checkSerializable
:清除完成后是否需要调用序列方法进行验证
第一步:查找闭包引用的成员变量,通过反射判断成员变量名是否包含this$
来判定,代码片断:
for (Field f: cls.getDeclaredFields()) { if (f.getName().startsWith("this$")) { // found a closure referencing field - now try to clean closureAccessed |= cleanThis0(func, cls, f.getName()); } }
找到闭包引用的成员变量后,调用内部私有方法cleanThis0
方法处理,看方法源码:
private static boolean cleanThis0(Object func, Class<?> cls, String this0Name) { This0AccessFinder this0Finder = new This0AccessFinder(this0Name); getClassReader(cls).accept(this0Finder, 0); final boolean accessesClosure = this0Finder.isThis0Accessed(); if (LOG.isDebugEnabled()) { LOG.debug(this0Name + " is accessed: " + accessesClosure); } if (!accessesClosure) { Field this0; try { this0 = func.getClass().getDeclaredField(this0Name); } catch (NoSuchFieldException e) { // has no this$0, just return throw new RuntimeException("Could not set " + this0Name + ": " + e); } try { this0.setAccessible(true); this0.set(func, null); } catch (Exception e) { // should not happen, since we use setAccessible throw new RuntimeException("Could not set " + this0Name + " to null. " + e.getMessage(), e); } } return accessesClosure; }
核心代码this0.set(func, null);
将闭包引用置空处理,此方法还用到了ASM包,具体逻辑没完成整明白。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Phoenix映射HBase时间戳的一种实现
Phoenix映射HBase时间戳的一种实现 官方实现 Apache Phoenix从4.6版本开始,提供了ROW_TIMESTAMP标签,来映射HBase的原生时间戳。但使用起来有以下限制: 只有主键中的TIME, DATE, TIMESTAMP, BIGINT, UNSIGNED_LONG类型的字段才能设置成ROW_TIMESTAMP 只能有一个主键列能被设置成ROW_TIMESTAMP ROW_TIMESTAMP标志的字段不能为null值 只有在建表的时候,某一列才能被设置成ROW_TIMESTAMP ROW_TIMESTAMP标志的列不能为负数 除了上面使用上的限制,还有应用场景的限制。根据上面的描述,ROW_TIMESTAMP字段有以下几种形式。 业务主键在前 ROW_TIMESTAMP字段在前 只有ROW_TIMESTAMP字段 我们来看下各
- 下一篇
使用EMR Spark Relational Cache跨集群同步数据 | 6月6号云栖夜读
点击订阅云栖夜读日刊,专业的技术干货,不容错过! 阿里专家原创好文 1.使用EMR Spark Relational Cache跨集群同步数据 Relational Cache是EMR Spark支持的一个重要特性,主要通过对数据进行预组织和预计算加速数据分析,提供了类似传统数据仓库物化视图的功能。除了用于提升数据处理速度,Relational Cache还可以应用于其他很多场景,本文主要介绍如何使用Relational Cache跨集群同步数据表。阅读更多》》 2.容器服务kubernetes federation v2实践一:基于External-DNS的多集群Ingress DNS实践 概要 External-DNS提供了编程方式管理Kubernetes Ingress资源的DNS的功能,方便用户从Ingress管理DNS解析记录。而在kubernetes federation v2环境中,使用External-DNS可以快速的管理多个联邦集群的Ingress DNS解析,降低用户的操作成本。阅读更多》》 3.漫谈分布式计算框架 本文主要谈了一些分布式计算框架方面的心得。阅读更多...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Docker安装Oracle12C,快速搭建Oracle学习环境