数栈技术分享:用短平快的方式告诉你Flink-SQL的扩展实现
数栈是云原生—站式数据中台PaaS,我们在github和gitee上有一个有趣的开源项目:FlinkX,FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,也可以采集实时变化的数据,是全域、异构、批流一体的数据同步引擎。大家喜欢的话请给我们点个star!star!star!
github开源项目:https://github.com/DTStack/flinkx
gitee开源项目:https://gitee.com/dtstack_dev_0/flinkx
首先,本文所述均基于flink 1.5.4。
一、我们为什么扩展Flink-SQL?
由于Flink 本身SQL语法并不提供在对接输入源和输出目的的SQL语法。数据开发在使用的过程中需要根据其提供的Api接口编写Source和 Sink, 异常繁琐,不仅需要了解FLink 各类Operator的API,还需要对各个组件的相关调用方式有了解(比如kafka,redis,mongo,hbase等),并且在需要关联到外部数据源的时候没有提供SQL相关的实现方式,因此数据开发直接使用Flink编写SQL作为实时的数据分析时需要较大的额外工作量。
我们的目的是在使用Flink-SQL的时候只需要关心做什么,而不需要关心怎么做。不需要过多的关心程序的实现,专注于业务逻辑。
接下来,我们一起来看下Flink-SQL的扩展实现吧!
二、扩展了哪些flink相关sql
1、创建源表语句
2、创建输出表语句
3、创建自定义函数
4、维表关联
三、各个模块是如何翻译到flink的实现
1、如何将创建源表的sql语句转换为flink的operator
Flink中表的都会映射到Table这个类。然后调用注册方法将Table注册到environment。
StreamTableEnvironment.registerTable(tableName, table);
当前我们只支持kafka数据源。Flink本身有读取kafka 的实现类, FlinkKafkaConsumer09,所以只需要根据指定参数实例化出该对象。并调用注册方法注册即可。
另外需要注意在flink sql经常会需要用到rowtime, proctime, 所以我们在注册表结构的时候额外添加rowtime,proctime。
当需要用到rowtime的使用需要额外指定DataStream.watermarks(assignTimestampsAndWatermarks),自定义watermark主要做两个事情:1:如何从Row中获取时间字段。 2:设定最大延迟时间。
2、 如何将创建的输出表sql语句转换为flink的operator
Flink输出Operator的基类是OutputFormat, 我们这里继承的是RichOutputFormat, 该抽象类继承OutputFormat,额外实现了获取运行环境的方法getRuntimeContext(), 方便于我们之后自定义metric等操作。
我们以输出到mysql插件mysql-sink为例,分两部分:
- 将create table 解析出表名称,字段信息,mysql连接信息。
该部分使用正则表达式的方式将create table 语句转换为内部的一个实现类。该类存储了表名称,字段信息,插件类型,插件连接信息。
- 继承RichOutputFormat将数据写到对应的外部数据源。
主要是实现writeRecord方法,在mysql插件中其实就是调用jdbc 实现插入或者更新方法。
3、如何将自定义函数语句转换为flink的operator;
Flink对udf提供两种类型的实现方式:
1)继承ScalarFunction
2)继承TableFunction
需要做的将用户提供的jar添加到URLClassLoader, 并加载指定的class (实现上述接口的类路径),然后调用TableEnvironment.registerFunction(funcName, udfFunc);即完成了udf的注册。之后即可使用改定义的udf;
4、维表功能是如何实现的?
流计算中一个常见的需求就是为数据流补齐字段。因为数据采集端采集到的数据往往比较有限,在做数据分析之前,就要先将所需的维度信息补全,但是当前flink并未提供join外部数据源的SQL功能。
实现该功能需要注意的几个问题:
1)维表的数据是不断变化的
在实现的时候需要支持定时更新内存中的缓存的外部数据源,比如使用LRU等策略。
2)IO吞吐问题
如果每接收到一条数据就串行到外部数据源去获取对应的关联记录的话,网络延迟将会是系统最大的瓶颈。这里我们选择阿里贡献给flink社区的算子RichAsyncFunction。该算子使用异步的方式从外部数据源获取数据,大大减少了花费在网络请求上的时间。
3)如何将sql 中包含的维表解析到flink operator
为了从sql中解析出指定的维表和过滤条件, 使用正则明显不是一个合适的办法。需要匹配各种可能性。将是一个无穷无尽的过程。查看flink本身对sql的解析。它使用了calcite做为sql解析的工作。将sql解析出一个语法树,通过迭代的方式,搜索到对应的维表;然后将维表和非维表结构分开。
通过上述步骤可以通过SQL完成常用的从kafka源表,join外部数据源,写入到指定的外部目的结构中。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
前端安全 — 浅谈JavaScript拦截XSS攻击
XSS/跨站脚本攻击,是一种代码注入网页攻击,攻击者可以将代码植入到其他用户都能访问到的页面(如论坛、留言板、贴吧等)中。 如今,XSS 攻击所涉及的场景愈发广泛。越来越多的客户端软件支持 html 解析和 JavaScript 解析,比如:HTML 文档、XML 文档、Flash、PDF、QQ、一些音乐播放器以及浏览器的功能界面等。这些用户经常使用的场景往往都是 XSS 攻击的高发地带。 一、XSS攻击类型 1. 存储型XSS(持久型) 攻击者在表单内提交恶意 js 代码 ( 如 <script>alert('hello')</script> ),网站后端对提交数据不做任何安全处理,直接存储在数据库中。当其他用户访问这个已被攻击的网站,js 代码攻击就会被触发。这个类型的 XSS 攻击会存储在数据库中,持续时间长,影响范围大。 2. 反射型XSS(非持久型) 反射型 XSS 攻击,是正常用户请求一个非法资源时触发的攻击。此类型攻击通常需要用户主动去访问带攻击的链接,一旦点开了链接,大概率被成功攻击(比如:我有一部电影资源,请点击 http://xxxxx 下载...
- 下一篇
Waifu2x-Extension-GUI v3.61.01-beta 发布,机器学习多媒体处理应用
Waifu2x-Extension-GUI v3.61.01-beta 已经发布,这是一个机器学习多媒体处理应用。 此版本更新内容包括: ⚠⚠ 这是一个 测试版, 点此获取 稳定版 ⚠⚠ 更新日志: 新特性: PSD(Photoshop Document)图片处理. 修复bug: 无法正常调整界面大小. 下载(PRC): 因本软件属于独立开发的项目, 没有给各大杀毒软件交过保护费, 所以会被一些杀毒软件误报病毒. ➡超星云盘(不限速, 直链) ➡百度网盘 提取码: d6qx ➡GitHub.com Waifu2x-Extension-GUI-v3.61.01-beta-Win64.7z SHA256: cf5465ec98b72660c1e35da23a3c9e2f10d87b2bcfabe1ef718fce25902cb98a 详情查看:https://gitee.com/aaronfeng0711/Waifu2x-Extension-GUI/releases/v3.61.01-beta
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS关闭SELinux安全模块
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker使用Oracle官方镜像安装(12C,18C,19C)