您现在的位置是:首页 > 文章详情

Flink1.9 Create table 语句转换 为 Operation流程分析

日期:2019-11-17点击:1239

本文主要描述 Flink1.9 新提供的 create table sql 通过 Calcite 解析,校验 并注册到catalog的过程。
样例SQL:

CREATE TABLE user_log ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'xc_user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 'connector.properties.0.key' = 'zookeeper.connect', -- 连接信息 'connector.properties.0.value' = '172.16.8.107:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = '172.16.8.107:9092', 'update-mode' = 'append', 'format.type' = 'json', -- 数据源格式为 json 'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则 )

带着两个疑问:1:如何自定义create table 语法(之后会另外写一篇), 2:create table 的表信息是如何注册到catalog上的,供之后的select 语句查询;

一:StreamTableEnvironment.create() 关注:Planner 的初始化

  1. 实例化一个CatalogManager(元数据管理器)

     --初始化一个GenericInMemroyCatalog作为默认的Catalog 
  2. 实例化StreamPlanner(本文以流作为背景,所以实例化目标为StreamPlanner)
  • 初始化StreamPlanner继承的超类PlannerContext
  • 可以看到提供了统一生成Calcite FrameworkConfig的函数createFrameworkConfig,关注其中对defaultSchema的设置,(之前实例的CatalogManager的包装类CatalogManagerCalciteSchema作为默认的Schema),另外ParserFactory参数也需要关注下,FlinkSqlParserImpl.FACTORY 这个是flink 基于Calcite 扩展的支持DDL的解析类;
  • 初始化查询优化器:

     RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());//基于cost的优化器 

    到这一步Planner环境初始化相关的操作已经完成。

二:解析Create table Sql语句:StreamTableEnvironment.sqlUpdate(createSqlStr)

  1. StreamTableEnvironment 调用 之前初始化环境过程中创建的 StreamPlanner 解析create table 语句 生成 CreateTableOperation;

     其中生成Operation 的部分主体是:
  • 生成Planner
  • PlannerContext基于统一提供的createFrameworkConfig() 生成FlinkPlannerImpl。
  • FlinkPlannerImpl 解析 DDL语句 生成SqlNode(如何根据Calcite自定义Table create 这个会后会另外单独写一篇)。
  • 调用validate() 对create table 信息进行校验, 包括table 的column, primaryKeys,uniqueKeys,partitionKey设置是否正确,这个校验不包括对create table 语法的校验,create 语法的校验在parse阶段已经处理过;
  • org.apache.flink.table.planner.operations.SqlToOperationConverter#convert:将SqlNode转换为org.apache.flink.table.operations.ddl.CreateTableOperation,当然convert()方法包含对Query, Create, Drop, Insert 的处理,本文只关注convertCreateTable模块;
  1. 调用TableEnvironmentImpl#registerCatalogTableInternal 把生成的CreateTableOperation注册到CatalogManager 上, 注册的元数据信息之后可以被sql 查询引用;

调用GenericInMemoryCatalog#createTable 将表的信息添加到catalog 元数据上;

原文链接:https://yq.aliyun.com/articles/727405
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章