详解 Flink Catalog 在 ChunJun 中的实践之路
我们知道 Flink 有Table(表)、View(视图)、Function(函数/算子)、Database(数据库)的概念,相对于这些耳熟能详的概念,Flink 里还有一个 Catalog(目录) 的概念。
本文将为大家带来 Flink Catalog 的介绍以及 Flink Catalog 在 ChunJun 中的实践之路。
Flink Catalog 简介
Catalog 提供元数据,如数据库、表、分区、视图,以及访问存储在数据库或其他外部系统中的数据所需的函数和信息。
Flink Catalog 作用
数据处理中最关键的一个方面是管理元数据:
· 可能是暂时性的元数据,如临时表,或针对表环境注册的 UDFs;
· 或者是永久性的元数据,比如 Hive 元存储中的元数据。
Catalog 提供了一个统一的 API 来管理元数据,并使其可以从表 API 和 SQL 查询语句中来访问。
Catalog 使用户能够引用他们数据系统中的现有元数据,并自动将它们映射到 Flink 的相应元数据。例如,Flink 可以将 JDBC 表自动映射到 Flink 表,用户不必在 Flink 中手动重写 DDL。Catalog 大大简化了用户现有系统开始使用 Flink 所需的步骤,并增强了用户体验。
Flink Catalog 的结构
● Flink Catalog 原生结构
• GenericInMemoryCatalog:基于内存实现的 Catalog
• Jdbc Catalog:可以将 Flink 通过 JDBC 协议连接到关系数据库,目前 Flink 在1.12和1.13中有不同的实现,包括 MySql Catalog 和 Postgres Catalog
• Hive Catalog:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口
● Flink Iceberg Catalog
● Flink Hudi Catalog
HoodieCatalog、HoodieHiveCatalog
Flink Catalog 详解
GenericInMemoryCatalog
final CatalogManager catalogManager = CatalogManager.newBuilder() .classLoader(userClassLoader) .config(tableConfig) .defaultCatalog( settings.getBuiltInCatalogName(), new GenericInMemoryCatalog( settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())) .build(); defaultCatalog = new GenericInMemoryCatalog( defaultCatalogName, settings.getBuiltInDatabaseName()); CatalogManager catalogManager = builder.defaultCatalog(defaultCatalogName, defaultCatalog).build();
GenericInMemoryCatalog 所有的数据都保存在 HashMap 里面,无法持久化。
JDBC Catalog
CREATE CATALOG my_catalog WITH( 'type' = 'jdbc', 'default-database' = '...', 'username' = '...', 'password' = '...', 'base-url' = '...' ); USE CATALOG my_catalog;
如果创建并使用 Postgres Catalog 或 MySQL Catalog,请配置 JDBC 连接器和相应的驱动。
JDBC Catalog 支持以下参数:
• name:必填,Catalog 的名称
• default-database:必填,默认要连接的数据库
• username:必填,Postgres/MySQL 账户的用户名
• password:必填,账户的密码
• base-url: 必填,(不应该包含数据库名)
对于 Postgres Catalog base-url 应为 "jdbc:postgresql://:" 的格式
对于 MySQL Catalog base-url 应为 "jdbc:mysql://:" 的格式
Hive Catalog
CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'mydatabase', 'hive-conf-dir' = '/opt/hive-conf' ); -- set the HiveCatalog as the current catalog of the session USE CATALOG myhive;
Iceberg Catalog
● Hive Catalog 管理 Iceberg 表
(Flink) default_database.flink_table -> (Iceberg) default_database.flink_table CREATE TABLE flink_table ( id BIGINT, data STRING ) WITH ( 'connector'='iceberg', 'catalog-name'='hive_prod', 'uri'='thrift://localhost:9083', 'warehouse'='hdfs://nn:8020/path/to/warehouse' ); (Flink)default_database.flink_table -> (Iceberg) hive_db.hive_iceberg_table CREATE TABLE flink_table ( id BIGINT, data STRING ) WITH ( 'connector'='iceberg', 'catalog-name'='hive_prod', 'catalog-database'='hive_db', 'catalog-table'='hive_iceberg_table', 'uri'='thrift://localhost:9083', 'warehouse'='hdfs://nn:8020/path/to/warehouse' );
● Hadoop Catalog 管理 Iceberg 表
CREATE TABLE flink_table ( id BIGINT, data STRING ) WITH ( 'connector'='iceberg', 'catalog-name'='hadoop_prod', 'catalog-type'='hadoop', 'warehouse'='hdfs://nn:8020/path/to/warehouse' );
● 自定义 Catalog 管理 Iceberg 表
CREATE TABLE flink_table ( id BIGINT, data STRING ) WITH ( 'connector'='iceberg', 'catalog-name'='custom_prod', 'catalog-impl'='com.my.custom.CatalogImpl', -- More table properties for the customized catalog 'my-additional-catalog-config'='my-value', ... );
• connector:iceberg
• catalog-name:用户指定的目录名称,这是必须的,因为连接器没有任何默认值
• catalog-type:内置目录的 hive 或 hadoop(默认为hive),或者对于使用 catalog-impl 的自定义目录实现,不做设置
• catalog-impl:自定义目录实现的全限定类名,如果 catalog-type 没有被设置,则必须被设置,更多细节请参见自定义目录
• catalog-database: 后台目录中的 iceberg 数据库名称,默认使用当前的 Flink 数据库名称
• catalog-table: 后台目录中的冰山表名,默认使用 Flink CREATE TABLE 句子中的表名
Hudi Catalog
create catalog hudi with( 'type' = 'hudi', 'mode' = 'hms', 'hive.conf.dir'='/etc/hive/conf' ); --- 创建数据库供hudi使用 create database hudi.hudidb; --- order表 CREATE TABLE hudi.hudidb.orders_hudi( uuid INT, ts INT, num INT, PRIMARY KEY(uuid) NOT ENFORCED ) WITH ( 'connector' = 'hudi', 'table.type' = 'MERGE_ON_READ' ); select * from hudi.hudidb.orders_hudi;
Flink Catalog 在 ChunJun 中的实践
下面将为大家介绍本文的重头戏,Flink Catalog 在 ChunJun 中的实践之路。
直接引入开源 Catalog
ChunJun 目前的所有 Catalog 为以下四种:
● Hive Catalog 需要的依赖
● Iceberg Catalog 需要的依赖
● JDBC Catalog
JDBC 因为 Flink 1.12 和 1.13 API 有变化,因此需要涉及源码的改动,改动一些 API 后,从源码引入。
● DT Catalog
结合内部业务,自定义的一种 Catalog ,下文将会进行详细介绍。
DT Catalog -存储元数据表设计
● 创建 mysql 元数据表 database_info
-- 创建表的 sql create table database_info ( `id` bigint PRIMARY KEY NOT NULL AUTO_INCREMENT COMMENT '项目ID',-- database id `catalog_name` varchar(255) COMMENT 'catalog 名字', `database_name` varchar(255) COMMENT 'database 名字', `catalog_type` varchar(30) COMMENT 'catalog 类型, eg: mysql,oracle...', `project_id` int(11) NOT NULL COMMENT '项目ID', `tenant_id` int(11) NOT NULL COMMENT '租户ID' ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- 创建索引 CREATE INDEX idx_catalog_name_database_name_project_id_tenant_id ON database_info (`catalog_name`, `database_name`, `project_id`, `tenant_id`);
● 创建 mysql 元数据表 table_info
-- 创建表的 sql create table table_info ( `id` bigint PRIMARY KEY NOT NULL AUTO_INCREMENT, `database_id` bigint COMMENT 'database_info 表的 id', `table_name` varchar(255) COMMENT '表名', `project_id` int(11) NOT NULL COMMENT '项目ID', `tenant_id` int(11) NOT NULL COMMENT '租户ID' ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- 创建索引 CREATE INDEX idx_catalog_id_project_id_tenant_id ON table_info (`database_id`, `project_id`, `tenant_id`); CREATE INDEX idx_database_id_table_name_project_id_tenant_id ON table_info (`database_id`, `table_name`, `project_id`, `tenant_id`);
● 创建 mysql 元数据表 properties_info
create table properties_info ( `id` bigint PRIMARY KEY NOT NULL AUTO_INCREMENT , `table_id` bigint(20) COMMENT 'table_info 表的 id', `key` varchar(255) COMMENT '表的属性 key', `value` varchar(255) COMMENT '表的属性 value' ) ENGINE = InnoDB DEFAULT CHARSET = utf8; CREATE INDEX idx_table_id ON properties_info (table_id);
● properties_info 里面存了什么?
schema.0.name=id, schema.0.data-type=INT NOT NULL, schema.1.name=name, schema.1.data-type=VARCHAR(2147483647) schema.2.name=age, schema.2.data-type=BIGINT, schema.primary-key.name=PK_3386, schema.primary-key.columns=id, connector=jdbc, url=jdbc:mysql: //172.16.83.218:3306/wujuan?useSSL=false, username=drpeco, password=DT@Stack#123, comment=, scan.auto-commit=true, lookup.cache.max-rows=20000, scan.fetch-size=10, lookup.cache.ttl=700000 table-name=t2,
使用 DT Catalog
● 创建 DT Catalog
CREATE CATALOG catalog1 WITH ( 'type' = 'dt', 'default-database' = 'default_database', 'driver' = 'com.mysql.cj.jdbc.Driver', 'url' = 'jdbc:mysql://xxx:3306/catalog_default', 'username' = 'drpeco', 'password' = 'DT@Stack#123', 'project-id' = '1', 'tenant-id' = '1' );
● 创建 Database
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ] Drop a database with the given database name. If the database to drop does not exist, an exception is thrown. IF EXISTS If the database does not exist, nothing happens. RESTRICT Dropping a non-empty database triggers an exception. Enabled by default. CASCADE Dropping a non-empty database also drops all associated tables and functions. create database if not exists catalog1.database1 drop database if exists catalog1.database1 -- 删除非空数据库,连通数据库中的所有表也一起删除 drop database if exists catalog1.database1 CASCADE
● 创建 Table
1)Rename Table
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name Rename the given table name to another new table name
2)Set or Alter Table Properties
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...) Set one or more properties in the specified table. If a particular property is already set in the table, override the old value with the new one.
-- 创建表 CREATE TABLE if not exists catalog1.default_database.table1 ( id int, name string, age bigint, primary key ( id) not enforced ) with ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false', 'table-name' = 't2', 'username' = 'drpeco', 'password' = 'DT@Stack#123' );
-- 删除表 drop table if exists mysql_catalog2.wujuan_database2.wujuan_table -- 重命名表名 ALTER TABLE catalog1.default_database.table1 RENAME TO table2; -- 设置表属性 ALTER TABLE catalog1.default_database.table1 SET ( 'tablename'='t2', 'url'='dbc:mysql://172.16.83.218:3306/wujuan?useSSL=false' )
使用 DTCatalog 的具体场景和实现原理
● 全部是 DDL,只有 Catalog 的创建
CREATE CATALOG catalog1 WITH ( 'type' = 'DT', 'default-database' = 'default_database', 'driver' = 'com.mysql.cj.jdbc.Driver', 'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default?autoReconnect=true&failOverReadOnly=false', 'username' = 'drpeco', 'password' = 'DT@Stack#123', 'project-id' = '1', 'tenant-id' = '1' ); ``` · 可以执行,但是没有意义,ChunJun 不会存储 Catalog 信息,只有平台存储; · 不支持语法校验。 ● 全部是 DDL,包含 Catalog、Database、Table 的创建
-- 初始化 Catalog CREATE CATALOG catalog1 WITH ( 'type' = 'dt', 'default-database' = 'default_database', 'driver' = 'com.mysql.cj.jdbc.Driver', 'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default', 'username' = 'drpeco', 'password' = 'DT@Stack#123', 'project-id' = '1', 'tenant-id' = '1' ); -- 创建数据库 create database if not exists database1 -- 创建表 CREATE TABLE if not exists catalog1.default_database.table1 ( id int, name string, age bigint, primary key ( id) not enforced ) with ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false', 'table-name' = 't2', 'username' = 'drpeco', 'password' = 'DT@Stack#123' );
· 无论创建数据库、表,删除数据库、表,必须包含 create catalog 语句; · 可以执行,可以创建数据库和表; · 不支持语法校验。
// 抛出异常的逻辑 StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tEnv); TableResult execute = statementSet.execute(); --> tableEnvironment.executeInternal(operations); --> Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName); --> StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(getExecutionEnvironment(), transformations); -->
// 抛出异常的方法 public static StreamGraph generateStreamGraph(StreamExecutionEnvironment execEnv, List<Transformation<?>> transformations){ if (transformations.size() <= 0) { throw new IllegalStateException( "No operators defined in streaming topology. Cannot generate StreamGraph."); } ... return generator.generate(); }
// 如果没有 insert 语句的时候,无法生成 JobGraph,但是 DDL 是执行成功的。 // 因此捕获 FlinkX 抛出的特殊异常,此语句的异常 Message 是 FlinkX 里面处理的。 try { PackagedProgramUtils.createJobGraph(program, flinkConfig, 1, false); } catch (ProgramInvocationException e) { // 仅执行 DDL FlinkX 抛出的异常 if (!e.getMessage().contains("OnlyExecuteDDL")) { throw e; } }
![file](https://oscimg.oschina.net/oscnet/up-2367d5331dd105678c6ff710fe22ebe4949.png) ● DDL + DML,包含 create + insert 语句 1)初始化 Catalog
CREATE CATALOG catalog1 WITH ( 'type' = 'dt', 'default-database' = 'default_database', 'driver' = 'com.mysql.cj.jdbc.Driver', 'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default', 'username' = 'drpeco', 'password' = 'DT@Stack#123', 'project-id' = '1', 'tenant-id' = '1' );
2.1)创建数据库
create database if not exists database1
2.2)创建源表
CREATE TABLE if not exists catalog1.default_database.table1 ( id int, name string, age bigint, primary key ( id) not enforced ) with ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false', 'table-name' = 't2', 'username' = 'drpeco', 'password' = 'DT@Stack#123' );
3.1)创建数据库
create database if not exists catalog1.database2;
3.2)创建结果表
CREATE TABLE if not exists catalog1.database2.table2 ( id int, name string, age bigint, primary key ( id) not enforced ) with ( 'connector' = 'print' );
4)执行任务
insert into catalog1.database2.table2 select * from catalog1.database1.table1
· 不可以执行,可以提交; · 支持语法校验。 ● DML,只有 Insert 语句
-- 初始化 Catalog CREATE CATALOG catalog1 WITH ( 'type' = 'dt', 'default-database' = 'default_database', 'driver' = 'com.mysql.cj.jdbc.Driver', 'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default', 'username' = 'drpeco', 'password' = 'DT@Stack#123', 'project-id' = '1', 'tenant-id' = '1' );
-- 执行任务 insert into catalog1.database2.table2 select * from catalog1.database1.table1
· 如果 Catalog 的 数据库和表都已经创建好了,那么直接写 insert 就可以提交任务; · 不可以执行,可以提交; · 支持语法校验。 《数据治理行业实践白皮书》下载地址:https://fs80.cn/380a4b 想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szkyzg 同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术qun」,交流最新开源技术信息,qun号码:30537511,项目地址:https://github.com/DTStack
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
基于FFmpeg和Wasm的Web端视频截帧方案
作者 |小萱 导读 基于实际业务需求,介绍了自定义Wasm截帧方案的实现原理和实现方案。解决传统的基于canvas的截帧方案所存在的问题,更高效灵活的实现截帧能力。 全文10103字,预计阅读时间26分钟。 01 项目背景 在视频编辑器里常见这样的功能,在用户上传完视频后抽取关键帧 ,提供给用户以便快捷选取封面,如下图: 在本文中,我们将探讨一种使用FFmpeg和WebAssembly(Wasm)的Web端视频截帧方案,以解决传统的基于canvas的截帧方案所存在的问题。通过采用这种新方法,我们可以克服video标签的限制,实现更高效、更灵活的视频截帧功能。 首先,我们需要了解一下传统的Web截帧方案的局限性。虽然该方案在处理一些常见的视频格式(如MP4、WebM和OGG)时表现良好,但其存在以下缺陷: 类型有限:video标签支持的视频格式十分有限,无法处理一些其他常见的视频格式,如FLV、MKV和AVI等。 DOM依赖:该方案依赖于DOM,只能在主线程中完成。这意味着在处理大量截帧任务时,可能会对页面性能产生负面影响。 抽帧策略局限:传统方案无法精确控制抽帧策只能传递时间交给浏览器...
- 下一篇
JeeSite V5.3.1 发布,BPM 多项更新,Java 快速开发平台
升级内容 升级 spring boot 2.7.10 新增 mybatis.scanTypeAliasesBasePackage 配置,减少启动时间 新增 JoinTable 的 lazy 懒加载属性,标记为懒加载的,默认不进行联表,当需要时再联表 新增 js.cookie 默认存 localStorage,可通过 window.cookieToLocalStorage 关闭默认 新增 oauth2.callbackUrl 回调后的跳转地址(可自定义vue版的账号绑定) 新增 子表编辑的另一种实现例子beetl(当点击行的时候开启编辑状态) 新增 BPM 多实例加减签,串行加减签、并行加减签 新增 BPM 模型的数据对象,在启动流程时作为默认流程变量 新增 BPM currentCmd 线程变量,并优化新增一些 CMD 的类型 优化 BPM 模型的字符串变量中包含逗号的时候自动转换为List,如果增加单引号或双引号,可强制为字符串 优化 BPM 下一步处理人逻辑,并行场景下不影响其他并行任务的节点;会签节点执行人与设置人数相等时自动设定每个会签人。 优化 BPM 退回、撤回、自由流跳转...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Red5直播服务器,属于Java语言的直播服务器
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- CentOS6,CentOS7官方镜像安装Oracle11G
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2全家桶,快速入门学习开发网站教程
- SpringBoot2配置默认Tomcat设置,开启更多高级功能