使用 bend-ingest-kafka 将数据流实时导入到 Databend
作者:韩山杰
Databend Cloud 研发工程师
Databend是一个开源、高性能、低成本易于扩展的新一代云数据仓库。bend-ingest-kafka 是一个专为 Databend 设计的实时数据导入工具,它允许用户从 Apache Kafka 直接将数据流导入到 Databend 中,实现数据的实时分析和处理。
为什么选择bend-ingest-kafka?
- 实时性: 能够实时地从 Kafka 中读取数据并导入到 Databend。
- 高吞吐量: 支持高并发的数据导入,满足大规模数据处理的需求。
- 易用性: 提供了简单直观的配置方式,便于用户快速上手。
- 灵活性: 可二次开发支持多种数据格式和自定义转换逻辑。
环境准备
在使用 bend-ingest-kafka 之前,需要确保以下环境已经搭建好:
- 一个运行中的 Databend 实例或者在 Databend Cloud 中创建一个 warehouse(推荐)。
- 一个配置好的 Apache Kafka 集群。
- 已经安装的 bend-ingest-kafka。
快速开始
Step 1: 安装 bend-ingest-kafka
可以从 Databend 的官方 GitHub 仓库 release 页面 下载对应 OS 架构的 bend-ingest-kafka 的可执行二进制文件,或者直接执行命令安装最新版本。
go install github.com/databendcloud/bend-ingest-kafka[@latest](https://my.oschina.net/u/4418429)
Step 2: 配置 bend-ingest-kafka
配置文件通常包括 Kafka 的连接以及配置信息、Databend 的连接信息以及数据转换的逻辑。以下是一个简单的配置示例:
{ "kafkaBootstrapServers": "localhost:9092", "kafkaTopic": "ingest_test", "KafkaConsumerGroup": "test", "mockData": "", "isJsonTransform": false, "databendDSN": "https://cloudapp:password@tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443", "databendTable": "default.kfk_test", "batchSize": 10, "batchMaxInterval": 5, "dataFormat": "json", "workers": 1, "copyPurge": false, "copyForce": false, "disableVariantCheck": true, "minBytes": 1024, "maxBytes": 1048576, "maxWait": 10, "useReplaceMode": false, "userStage": "~" }
具体的配置参数可以参考 Parameter References,这里对几个比较重要的参数展开解释。
- isJsonTransform: 默认为
true
,将 Kafka Json 数据逐字段转换为 Databend 表数据。通过设置isJsonTransform
为 true 来使用此模式。如果设置为false
的话,系统将在 Databend 中自动创建一个 raw table, 列包括 (uuid, koffset, kpartition, raw_data, record_metadata, add_time),并将原始数据导入此表。其中raw_data
为导入的 kafka Json 数据,record_metadata
包含了本条数据的 kafka 元信息 -topic
,partition
,offset
,create_time
,key
,方便用户查询。 - useReplaceMode:
useReplaceMode
是一种去重模式,开启后如果表中已存在数据,新数据将替换旧数据。但 useReplaceMode 仅在 isJsonTransform 为 false 时支持,因为它需要在目标表中添加 koffset 和 kpartition 字段。在这种模式下,系统可以实现exactly once
的同步语义,否则为at-least-once
语义。 - userStage: 用户的自定义 external stage name。
Step 3: 启动数据导入
这里使用 raw-data
模式作演示。
Kafka 的 Json 数据示例为:
{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}
模拟 kafka 生产数据
可以使用下面的脚本快速生成 kafka json 数据:
from confluent_kafka import Producer # 创建一个Producer实例 p = Producer({'bootstrap.servers': 'localhost:9092'}) for i in range(1000000): json_data = '{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}' p.produce('ingest_test', json_data) print(i) p.flush()
使用配置文件启动 bend-ingest-kafka
默认读取 ./config/conf.json
配置文件,开始将 Kafka 中的数据导入到 Databend。
./bend-ingest-kafka
启动后可以看到 log 和 metrics:
到 Databend 中可以查询到已经同步的数据:
由于 raw_data
和 record_metadata
的字段格式都是 JSON ,所以可以很灵活地做一些数据分析:
select record_metadata['partition'] p, min(record_metadata['offset']::bigint) o1, max(record_metadata['offset']::bigint) o2, o2-o1+1 sub_count, count(distinct record_metadata['offset']) distinct_cnt, count(1) cnt from default.kfk_test group by p order by p;
高级特性
- 错误处理: 能够处理数据导入过程中的异常,并提供重试机制。
- 监控与日志: 提供详细的日志记录和监控指标,方便跟踪数据导入的状态。
结语
bend-ingest-kafka 作为一个强大的工具,为 Databend 用户提供了从 Kafka 实时导入数据的能力。通过本文的介绍,用户应该能够快速上手并利用这个工具来实现实时数据处理的需求。
关于 Databend
Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。
👨💻 Databend Cloud:https://databend.cn
📖 Databend 文档:https://docs.databend.cn/
💻 Wechat:Databend

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
吵了6年的数据库话题,会在冯若航这里终结吗?
谈及在 k8s/Docker 上部署数据库服务时,业界意见分歧显著,形成了一场围绕“数据库容器化”的持久辩论。 一方面,支持者强调 k8s 在提供环境无关性、自动化运维及资源优化方面的潜力;另一方面,反对者则担忧数据库的特殊需求与 k8s 的设计理念存在冲突,可能导致安全、性能及成本效率上的挑战。 本文跟踪了一下这场持续六七年的辩论,从 Mikhail Chinkov 与王渊命的早期论战,到王竹锋、姜承尧对于在 k8s 上部署数据库的必要性思考,以及冯若航提到的可能面临的“双输”情况,来看看数据库容器化在 k8s 生态中的争议与前景。 第一回合:Mikhail Chinkov VS. 王渊命:数据库容器化合理吗? 2017 年,Docker 宣布完成 7500 万美元融资,其总市值达到了 13 亿美元,这标志着容器技术在 IT 领领域的巨大潜力和影响力。当时Docker 的发展如日中天,几乎成了容器的代名词。 与此同时,一篇《为什么数据库不适合容器》(中文版:数据库不适合Docker及容器化的7大原因)引发了一场技术圈内的激烈辩论。作者 Mikhail Chinkov 提出了数据库容器...
- 下一篇
高效存储的秘诀:bitmap 数据结构在标签中的应用
在当今大数据和信息爆炸的时代,如何有效地管理和查询海量的数据成为了企业和开发者面临的重大挑战。其中,标签系统作为数据管理中的一种重要手段,被广泛应用于用户画像、商品分类、内容推荐等多个场景。然而,随着标签数量的急剧增加,传统的数据存储和查询方式已难以满足高效率、低延迟的需求。在这种背景下,Bitmap 数据结构作为一种高效的位级数据处理技术,开始在标签系统中展现出其独特的优势。 通过本文将会分享 Bitmap 方案在标签中的应用实践。 标签和群组 标签 标签用于描述一组具有相同特征的实体对象(例如实体可以是用户,对象就是具体的某一个人)。它能直接关联到具体的数据项、文档、产品、用户行为等,以实现快速检索、过滤和分类的目的。标签常被用于用户画像构建、行为分析以及个性化推荐,通过分析用户与标签的互动,系统能更精准地理解用户需求。 标签系统的核心在于通过简单直接的关键词关联,提高信息的可发现性和管理效率,适应数字化时代信息海量增长的需求。 群组 群组提供了一种结构化的方式来整理和操作具有共同属性的标签集合,将具有相似特征、类别或关系的标签集合在一起,这种结构允许用户更高效地管理和操作相关联的...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8安装Docker,最新的服务器搭配容器使用
- Linux系统CentOS6、CentOS7手动修改IP地址
- 2048小游戏-低调大师作品
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS8编译安装MySQL8.0.19
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- MySQL8.0.19开启GTID主从同步CentOS8