Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询
1概况
本文展示如何使用 Flink CDC + Iceberg + Doris 构建实时湖仓一体的联邦查询分析,Doris 1.1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。
2系统架构
我们整理架构图如下,
Doris湖仓一体的联邦查询架构如下:
3 创建MySQL数据库表并初始化数据
CREATE DATABASE demo; USE demo; CREATE TABLE userinfo ( id int NOT NULL AUTO_INCREMENT, name VARCHAR(255) NOT NULL DEFAULT 'flink', address VARCHAR(1024), phone_number VARCHAR(512), email VARCHAR(255), PRIMARY KEY (`id`) )ENGINE=InnoDB ; INSERT INTO userinfo VALUES (10001,'user_110','Shanghai','13347420870', NULL); INSERT INTO userinfo VALUES (10002,'user_111','xian','13347420870', NULL); INSERT INTO userinfo VALUES (10003,'user_112','beijing','13347420870', NULL); INSERT INTO userinfo VALUES (10004,'user_113','shenzheng','13347420870', NULL); INSERT INTO userinfo VALUES (10005,'user_114','hangzhou','13347420870', NULL); INSERT INTO userinfo VALUES (10006,'user_115','guizhou','13347420870', NULL); INSERT INTO userinfo VALUES (10007,'user_116','chengdu','13347420870', NULL); INSERT INTO userinfo VALUES (10008,'user_117','guangzhou','13347420870', NULL); INSERT INTO userinfo VALUES (10009,'user_118','xian','13347420870', NULL);
4 创建Iceberg Catalog
CREATE CATALOG hive_catalog WITH ( 'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://localhost:9083', 'clients'='5', 'property-version'='1', 'warehouse'='hdfs://localhost:8020/user/hive/warehouse' );
5 创建 Mysql CDC 表
CREATE TABLE user_source ( database_name STRING METADATA VIRTUAL, table_name STRING METADATA VIRTUAL, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = 'MyNewPass4!', 'database-name' = 'demo', 'table-name' = 'userinfo' );
6 创建Iceberg表
---查看catalog show catalogs; ---使用catalog use catalog hive_catalog; --创建数据库 CREATE DATABASE iceberg_hive; --使用数据库 use iceberg_hive;
7 创建表
CREATE TABLE all_users_info ( database_name STRING, table_name STRING, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED ) WITH ( 'catalog-type'='hive' );
从CDC表里插入数据到Iceberg表里
use catalog default_catalog; insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;
我们去查询iceberg表
select * from hive_catalog.iceberg_hive.all_users_info
8 Doris 查询 Iceberg
8.1 创建Iceberg外表
CREATE TABLE `all_users_info` ENGINE = ICEBERG PROPERTIES ( "iceberg.database" = "iceberg_hive", "iceberg.table" = "all_users_info", "iceberg.hive.metastore.uris" = "thrift://localhost:9083", "iceberg.catalog.type" = "HIVE_CATALOG" );
参数说明
iceberg.hive.metastore.uris
:Hive Metastore 服务地址 iceberg.database
:挂载 Iceberg 对应的数据库名 iceberg.table
:挂载 Iceberg 对应的表名,挂载 Iceberg database 时无需指定。 iceberg.catalog.type
:Iceberg 中使用的 catalog 方式,默认为 HIVE_CATALOG
,当前仅支持该方式,后续会支持更多的 Iceberg catalog 接入方式。 mysql> CREATE TABLE `all_users_info` -> ENGINE = ICEBERG -> PROPERTIES ( -> "iceberg.database" = "iceberg_hive", -> "iceberg.table" = "all_users_info", -> "iceberg.hive.metastore.uris" = "thrift://localhost:9083", -> "iceberg.catalog.type" = "HIVE_CATALOG" -> ); Query OK, 0 rows affected (0.23 sec) mysql> select * from all_users_info; +---------------+------------+-------+----------+-----------+--------------+-------+ | database_name | table_name | id | name | address | phone_number | email | +---------------+------------+-------+----------+-----------+--------------+-------+ | demo | userinfo | 10004 | user_113 | shenzheng | 13347420870 | NULL | | demo | userinfo | 10005 | user_114 | hangzhou | 13347420870 | NULL | | demo | userinfo | 10002 | user_111 | xian | 13347420870 | NULL | | demo | userinfo | 10003 | user_112 | beijing | 13347420870 | NULL | | demo | userinfo | 10001 | user_110 | Shanghai | 13347420870 | NULL | | demo | userinfo | 10008 | user_117 | guangzhou | 13347420870 | NULL | | demo | userinfo | 10009 | user_118 | xian | 13347420870 | NULL | | demo | userinfo | 10006 | user_115 | guizhou | 13347420870 | NULL | | demo | userinfo | 10007 | user_116 | chengdu | 13347420870 | NULL | +---------------+------------+-------+----------+-----------+--------------+-------+ 9 rows in set (0.18 sec)
上述Doris On Iceberg我们只演示了Iceberg单表的查询,你还可以联合Doris的表,或者其他的ODBC外表,Hive外表,ES外表等进行联合查询分析,通过Doris对外提供统一的查询分析入口。
自此我们完整从搭建Hadoop,hive、flink 、Mysql、Doris 及Doris On Iceberg的使用全部介绍完了,Doris朝着数据仓库和数据融合的架构演进,支持湖仓一体的联邦查询,给我们的开发带来更多的便利,更高效的开发,省去了很多数据同步的繁琐工作。
作者:京东零售 吴化斌
来源:京东云开发者社区 转载请注明来源

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
关于「日志采样」的一些思考及实践
一、背景: 系统日志可用于追踪用户操作轨迹,异常情况下,合理的日志有助于快速排查、定位问题,毫无疑问,打印日志对于系统是很重要的。 当业务规模较小时,大家都倾向于享受日志带来的便利,从而忽略日志带来的潜在的负面影响,缺乏对日志的管控。在JD当前用户量、业务规模下,绝大多数C端系统、甚至B端系统都是高吞吐的,毫无疑问,过大的日志量对系统的性能、磁盘IO有着显著负面影响,赶上大促时,问题尤为突出。日志在为我们提供便利的同时,也无时无刻成为一根刺,时不时刺我们一下。 作为一个共性问题,由于集团暂没推出统一的日志框架,不少团队都会尝试基于log4j、logback 进行轻度的封装,通过跟配置中心联动,增加一些诸如 '动态降级' 的功能、来缓解日志带来的负面影响。降级带来的效果是显著的,但同时也让系统丧失了记录 '操作轨迹' 的能力,从而又带来了新的问题。 此时,很容易想到,可以通过对 '请求' 采样,实现请求日志的采样输出,并通过控制采样比例平衡不同场景下日志对性能的影响,系统吞吐量较大时,降级采样比例,系统吞吐量较低时,提高采样比例。 二、正文: Ⅰ 在请求入口处,通过一定的采样算法,计...
- 下一篇
面试官:你能简单聊聊MyBatis执行流程
本文分享自华为云社区《面试必问|聊聊MyBatis执行流程?》,作者: 冰 河。 MyBatis源码解析 大家应该都知道Mybatis源码也是对Jbdc的再一次封装,不管怎么进行包装,还是会有获取链接、preparedStatement、封装参数、执行这些步骤的。 配置解析过程 String resource = "mybatis-config.xml"; //1.读取resources下面的mybatis-config.xml文件 InputStream inputStream = Resources.getResourceAsStream(resource); //2.使用SqlSessionFactoryBuilder创建SqlSessionFactory SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream); //3.通过sqlSessionFactory创建SqlSession SqlSession sqlSession = sqlSessio...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS7设置SWAP分区,小内存服务器的救世主
- Mario游戏-低调大师作品
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- 2048小游戏-低调大师作品
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题