Flink1.9 Create table 语句转换 为 Operation流程分析
本文主要描述 Flink1.9 新提供的 create table sql 通过 Calcite 解析,校验 并注册到catalog的过程。
样例SQL:
CREATE TABLE user_log ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'xc_user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 'connector.properties.0.key' = 'zookeeper.connect', -- 连接信息 'connector.properties.0.value' = '172.16.8.107:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = '172.16.8.107:9092', 'update-mode' = 'append', 'format.type' = 'json', -- 数据源格式为 json 'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则 )
带着两个疑问:1:如何自定义create table 语法(之后会另外写一篇), 2:create table 的表信息是如何注册到catalog上的,供之后的select 语句查询;
一:StreamTableEnvironment.create() 关注:Planner 的初始化
-
实例化一个CatalogManager(元数据管理器)
--初始化一个GenericInMemroyCatalog作为默认的Catalog
- 实例化StreamPlanner(本文以流作为背景,所以实例化目标为StreamPlanner)
- 初始化StreamPlanner继承的超类PlannerContext
- 可以看到提供了统一生成Calcite FrameworkConfig的函数createFrameworkConfig,关注其中对defaultSchema的设置,(之前实例的CatalogManager的包装类CatalogManagerCalciteSchema作为默认的Schema),另外ParserFactory参数也需要关注下,FlinkSqlParserImpl.FACTORY 这个是flink 基于Calcite 扩展的支持DDL的解析类;
初始化查询优化器:
RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());//基于cost的优化器
到这一步Planner环境初始化相关的操作已经完成。
二:解析Create table Sql语句:StreamTableEnvironment.sqlUpdate(createSqlStr)
-
StreamTableEnvironment 调用 之前初始化环境过程中创建的 StreamPlanner 解析create table 语句 生成 CreateTableOperation;
其中生成Operation 的部分主体是:
- 生成Planner
- PlannerContext基于统一提供的createFrameworkConfig() 生成FlinkPlannerImpl。
- FlinkPlannerImpl 解析 DDL语句 生成SqlNode(如何根据Calcite自定义Table create 这个会后会另外单独写一篇)。
- 调用validate() 对create table 信息进行校验, 包括table 的column, primaryKeys,uniqueKeys,partitionKey设置是否正确,这个校验不包括对create table 语法的校验,create 语法的校验在parse阶段已经处理过;
- org.apache.flink.table.planner.operations.SqlToOperationConverter#convert:将SqlNode转换为org.apache.flink.table.operations.ddl.CreateTableOperation,当然convert()方法包含对Query, Create, Drop, Insert 的处理,本文只关注convertCreateTable模块;
- 调用TableEnvironmentImpl#registerCatalogTableInternal 把生成的CreateTableOperation注册到CatalogManager 上, 注册的元数据信息之后可以被sql 查询引用;
调用GenericInMemoryCatalog#createTable 将表的信息添加到catalog 元数据上;
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
【Kubernetes系列】第9篇 网络原理解析(上篇)
1. Linux网络基础 1.1 名词解释 Network Namespace(网络命名空间):Linux在网络栈中引入网络命名空间,将独立的网络协议栈隔离到不同的命令空间中,彼此间无法通信;docker利用这一特性,实现不容器间的网络隔离。 Iptables/Netfilter:Netfilter负责在内核中执行各种挂接的规则(过滤、修改、丢弃等),运行在内核模式中;Iptables模式是在用户模式下运行的进程,负责协助维护内核中Netfilter的各种规则表;通过分析进入主机的网络封包,将封包的表头数据提取出来进行分析,以决定该联机为放行或抵挡的机制。 由于这种方式可以直接分析封包表头数据,所以包括硬件地址(MAC), 软件地址 (IP), TCP, UDP, ICMP 等封包的信息都可以进行过滤分析。 Veth设备对:Veth设备对的引入是为了实现在不同网络命名空间的通信。 Bridge(网桥):网桥是一个二层网络设备,是最简单的CNI网络插件,它首先在Host创建一个网桥,然后再通过veth pair连接该网桥到container netns。另外,Bridge模式下,多主机网...
- 下一篇
HBase最佳实践
本文致力于从架构原理、集群部署、性能优化与使用技巧等方面,阐述在如何基于HBase构建 容纳大规模数据、支撑高并发、毫秒响应、稳定高效的OLTP实时系统 。 一、架构原理 1.1 基本架构 从上层往下可以看到HBase架构中的角色分配为: Client Zookeeper HMaster RegionServer HDFS Client Client是执行查询、写入等对HBase表数据进行增删改查的使用方,可以是使用HBase Client API编写的程序,也可以是其他开发好的HBase客户端应用。 Zookeeper 同HDFS一样,HBase使用Zookeeper作为集群协调与管理系统。 在HBase中其主要的功能与职责为: 存储整个集群HMaster与RegionServer的运行状态 实现HMaster的故障恢复与自动切换 为Client提供元数据表的存储信息 HMaster、RegionServer启动之后将会在Zookeeper上注册并创建节点(/hbasae/master 与 /hbase/rs/*),同时 Zookeeper 通过Heartbeat的心跳机制来维护与监...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Hadoop3单机部署,实现最简伪集群
- MySQL8.0.19开启GTID主从同步CentOS8