正式开源:使用Kafka FDW 加载数据到 Apache Cloudberry™
Apache Cloudberry™ (Incubating) 由 Greenplum Database 核心开发者创建,是一款领先且成熟的开源大规模并行处理(Massively Parallel Processing,MPP)数据库。它基于开源版的 Pivotal Greenplum Database® 衍生而来,但采用了更新的 PostgreSQL 内核,并具备更先进的企业级功能。Cloudberry 可以作为数据仓库使用,也很适合大规模分析和 AI/ML 工作负载。
GitHub: https://github.com/apache/cloudberry
Hi 社区小伙伴们!酷克数据 HashData 宣布推出 Apache Cloudberry 生态组件 Kafka FDW。Kafka FDW 基于社区已有的 Kafka Foreign Data Wrapper for PostgreSQL(https://github.com/adjust/kafka_fdw),并针对 Apache Cloudberry 做了适配支持和优化。
作为 Apache Cloudberry™ 开源社区的一员,酷克数据始终在为推动 Cloudberry 的生态完善贡献力量。Kafka FDW 新组件将进一步增强 Cloudberry 在大数据流处理方面的能力,帮助大家更高效地应对实时数据处理的挑战,并轻松实现高性能的数据库扩展解决方案。
-
GitHub 地址:https://github.com/cloudberry-contrib/kafka_fdw
-
使用文档:https://cloudberry.apache.org/zh/docs/data-loading/load-data-from-kafka-using-fdw/
从 Kafka 加载数据入 Cloudberry
Cloudberry 提供了多种数据加载方案,支持本地文件、Web 服务等数据源,通过将外部数据转换为外部表,然后从这些外部表中读取或写入数据以实现外部数据的加载。
Apache Kafka 已成为许多企业数据流架构的核心,而如何将 Kafka 中的实时数据导入数据库,成为了一个常见的挑战。Kafka Foreign Data Wrapper(简称 Kafka FDW)解决了这个问题,提供了 Cloudberry 与 Apache Kafka 连接的能力,使得数据库可以直接从 Kafka 中读取数据,并将其作为外部表处理。Cloudberry 用户可以更高效、灵活、可靠地处理 Kafka 中的实时数据,从而提高数据处理能力和业务效率。
主要特点
-
直接从 Kafka 中读取数据:Cloudberry 用户可以轻松将 Kafka 中的数据加载到数据库中,并使用 SQL 查询进行处理。
-
高效灵活的数据处理:Kafka FDW 允许用户通过指定分区和偏移量来精准查询 Kafka 中的数据。
-
数据导入与生产功能:除了读取数据,Kafka FDW 还支持向外部表插入数据,使得 Cloudberry 用户可以同时充当 Kafka 消息的生产者。
基本使用
-
创建插件。
postgres=# create extension kafka_fdw;
-
创建外部服务器,指定 Kafka 的集群地址。你需要将 localhost:9092 替换为你的 Kafka 集群地址。
CREATE SERVER kafka_server
FOREIGN DATA WRAPPER kafka_fdw
OPTIONS (mpp_execute 'all segments', brokers 'localhost:9092');
注:需要在语句中指定 mpp_execute 'all segments' 参数
-
创建 user mapping。
CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;
-
创建外部表
创建外部表时,必须指定两个元数据信息列 partition 和 offset,用于标识 Kafka 中的一个 Topic 的消息所属的分区和偏移。下面是一个示例:
CREATE FOREIGN TABLE kafka_test (
part int OPTIONS (partition 'true'),
offs bigint OPTIONS (offset 'true'),
some_int int,
some_text text,
some_date date,
some_time timestamp
)
SERVER kafka_server OPTIONS
(format 'csv', topic 'contrib_regress_csv', batch_size '1000', buffer_delay '1000');
参数说明:
-
batch_size:从 Kafka 读取一次数据的量。
-
buffer_delay:从 Kafka 获取数据的超时时间。
支持的数据格式
目前支持 CSV 和 JSON 两种数据格式。
查询
可以在查询的时候指定消息的分区和偏移,指定 partition 或 offset:
SELECT * FROM kafka_test WHERE part = 0 AND offs > 1000 LIMIT 60;
也可以指定多个条件:
SELECT * FROM kafka_test WHERE (part = 0 AND offs > 100) OR (part = 1 AND offs > 300) OR (part = 3 AND offs > 700);
消息生产者
目前 Kafka FDW 支持向外表中插入数据,即作为了 Kafka 的消息生产者。只需要使用 INSERT 语句即可。
INSERT INTO kafka_test(part, some_int, some_text)
VALUES
(0, 5464565, 'some text goes into partition 0'),
(1, 5464565, 'some text goes into partition 1'),
(0, 5464565, 'some text goes into partition 0'),
(3, 5464565, 'some text goes into partition 3'),
(NULL, 5464565, 'some text goes into partition selected by kafka');
插入的时候可以指定 partition 表示插入到哪个分区。
数据导入功能
如果想要通过 kafka FDW 实现类似数据导入的功能,你可以通过自定义函数来实现,例如 insert into select 语句,基本原理是将外表中的所有数据依次取出来插入到目标表中。
下面是一个简单的示例,你可以根据实际情况对此函数进行修改:
CREATE OR REPLACE FUNCTION import_kafka_data(
src_table_name text,
dest_table_name text,
dest_table_columns text[]
) RETURNS void AS $$
DECLARE
current_row RECORD;
current_part integer;
current_offs bigint;
max_off bigint;
import_progress_table_name text;
max_off_result bigint;
BEGIN
import_progress_table_name := src_table_name || '_import_progress';
-- 创建进度记录表
EXECUTE FORMAT('CREATE TABLE IF NOT EXISTS %I (part integer PRIMARY KEY, offs bigint NOT NULL)', import_progress_table_name);
-- 表的 topic 的 partition 数量有可能发生变化,所以每次导入前都要重新初始化
EXECUTE FORMAT('INSERT INTO %I SELECT DISTINCT part, 0 FROM %I ON CONFLICT (part) DO NOTHING', import_progress_table_name, src_table_name);
-- 逐个分区导入数据
FOR current_row IN
EXECUTE FORMAT('SELECT part, offs FROM %I', import_progress_table_name)
LOOP
current_part := current_row.part;
current_offs := current_row.offs;
-- 获取当前分区的最大 offset
EXECUTE FORMAT('SELECT MAX(offs) FROM %I WHERE part = %s', src_table_name, current_part) INTO max_off_result;
max_off := max_off_result;
-- 没有新数据跳过
IF max_off+1 = current_offs THEN
CONTINUE;
END IF;
-- 导入数据
EXECUTE FORMAT('
INSERT INTO %I (%s)
SELECT %s
FROM %I
WHERE part = %s AND offs >= %s AND offs <= %s',
dest_table_name,
array_to_string(dest_table_columns, ', '),
array_to_string(dest_table_columns, ', '),
src_table_name,
current_part,
current_offs,
max_off
);
-- 更新导入进度
EXECUTE FORMAT('UPDATE %I SET offs = %s WHERE part = %s', import_progress_table_name, max_off + 1, current_part);
END LOOP;
RETURN;
END;
$$ LANGUAGE plpgsql;
执行的时候只需要调用这个函数,传入外表名称、目标表名称、需要导入的字段即可,如下:
SELECT import_kafka_data('kafka_test', 'dest_table_fdw', ARRAY['some_int', 'some_text', 'some_date', 'some_time']);
定时导入
如果想要一个定时任务后台执行导入数据,可以使用 Cloudberry 中的 Task 功能(v1.4.0 及之后版本可用),定期执行导入函数。
CREATE TASK import_kafka_data schedule '1 seconds' AS $$SELECT import_kafka_data('kafka_test', 'dest_table_fdw', ARRAY['some_int', 'some_text', 'some_date', 'some_time']);$$;
在上面的例子中,每秒调度一次导入数据的函数,这样就可以基本实现不间断的使用 FDW,将源外表中的数据导入到目标表中。
期待大家积极分享使用体验与意见建议!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
TinyVue自动导入插件重大升级,@opentiny/unplugin-tiny-vue1.0.0版本开启单组件包引入新模式
近日,@opentiny/unplugin-tiny-vue 发布了其重要的1.0.0版本更新,此次更新带来了全新的单组件包引入模式,显著提升了开发者的本地编译效率以及整个项目的构建速度。 在之前的版本中,开发者需要一次性加载整个TinyVue 库,这个不仅增加了初始时间,也使得项目构建过程变得相对缓慢。然而,随着新版本的发布,这一问题得到了有效的解决。通过引入单组件包模式,开发者现在可以根据实际需求选择性地加载所需的组件,从而极大地减少了不必要的资源消耗,提高了应用的响应速度和整体性能。 具体来说,当开发者使用 @opentiny/unplugin-tiny-vue 的新功能时,他们可以指定仅加载那些在当前项目中真正被使用的组件。这意味着,对于大型项目而言,这种按需加载的方式能够大幅度减少构建时间和内存占用,进而为开发者提供了更加流畅的工作体验。 接下来我们一起来看看如何使用~ 安装配置 安装 npm i @opentiny/unplugin-tiny-vue -D 配置 单组件按需引入(推荐用法,可以加快编译和构建速度) 例如:TinyVueSingleResolver('Tiny...
- 下一篇
行云前端重构之路:从单体应用到 Monorepo 的血泪史
一、行云2.0的开篇 话说天下大势,合久必分,分久必合。 在行云2.0时代,一个原本平平无奇的业务工程,宛如一颗迅速膨胀的种子,短短两三个月,便摇身一变,成为容纳百十来个子应用的庞大“生态系统”。这些子应用来自五湖四海,各自施展浑身解数,为JDer们提供琳琅满目的产品功能,无论是与产研紧密相关,还是关联性稍弱的功能,皆涵盖其中。 作为基于Vue搭建的平台,不仅能够跨技术栈加载非Vue技术栈的应用,对于vue技术栈的应用还无私地奉献出了全局共享的Vue实例、router、vuex等等,同时连带全局组件库以及axios实例的分发,为依赖Vue技术栈的子应用提供全方位的支持,尽心哺育着有全局依赖需要的子子孙孙们。 二、回溯往昔:架构困境初现 不过,不管您用没用过这些出色的产品工具们,咱今天都不打算讨论子应用们,而是转回头审视行云前端工程最初的架构模样,着实还是有些惨不忍睹的。 惨在何处呢,我们粗略地说来: 1.代码结构混沌:平台与业务代码纠葛不清 虽设有/modules目录用以存放不同业务域的代码,然而,文件夹之间组件引用关系错综复杂。以实际场景为例,在A业务模块开发过程中,开发人员可能因便...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8安装Docker,最新的服务器搭配容器使用
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7设置SWAP分区,小内存服务器的救世主
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2全家桶,快速入门学习开发网站教程