从 AutoMQ Kafka 导出数据到 Databend
作者:韩山杰
Databend Cloud 研发工程师
Databend是使用 Rust 研发、开源的、完全面向云架构、基于对象存储构建的新一代云原生数据仓库,为企业提供湖仓一体化、计 算和存储分离的大数据分析平台。
本文将介绍如何通过 bend-ingest-kafka 将数据从 AutoMQ for Kafka 导入 Databend。
本文中提及的 AutoMQ Kafka 术语,均特指安托盟丘(杭州)科技有限公司通过 GitHub AutoMQ 组织下开源的 automq-for-kafka 项目。
环境准备
准备 Databend Cloud 以及测试数据
首先到 Databend Cloud 开启你的 Warehouse ,并在 worksheet 中创建数据库库和测试表:
create database automq_db; create table users ( id bigint NOT NULL, name string NOT NULL, ts timestamp, status string );
准备 AutoMQ Kafka 环境和测试数据
参考 部署 AutoMQ 到 AWS▸ 部署好 AutoMQ Kafka 集群,确保 AutoMQ Kafka 与 StarRocks 之间保持网络连通。
在AutoMQ Kafka中快速创建一个名为 example_topic 的主题并向其中写入一条测试 JSON 数据,可以通过以下步骤实现:
创建Topic:
使用 Apache Kafka 命令行工具来创建主题。你需要有 Kafka 环境的访问权限,并且确保 Kafka 服务正在运行。以下是创建主题的命令:
./kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1
注意:执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。
创建 topic 之后可以用以下命令检查 topic 创建的结果。
./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092
生成测试数据:
生成一条简单的 JSON 格式的测试数据,和前文的表需要对应。
{ "id":1, "name":"Test User", "ts":"2023-11-10T12:00:00", "status":"active" }
写入测试数据
使用 Kafka 的命令行工具或者编程方式将测试数据写入到 example_topic
。以下是使用命令行工具的一个示例:
echo '{"id": 1, "name": "测试用户", "ts": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic
使用如下命令可以查看刚写入的 topic 数据:
sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning
创建 bend-ingest-databend job
bend-ingest-kafka 能够监控 kafka 并将数据批量写入 Databend Table。
部署 bend-ingest-kafka
之后,即可开启数据导入 job。
bend-ingest-kafka --kafka-bootstrap-servers="localhost:9094" --kafka-topic="example_topic" --kafka-consumer-group="Consumer Group" --databend-dsn="https://cloudapp:password@host:443" --databend-table="automq_db.users" --data-format="json" --batch-size=5 --batch-max-interval=30s
注意:将 kafka_broker_list 替换为实际使用的 Kafka 服务器地址。
参数说明
databend-dsn
Databend Cloud 提供的连接到 warehouse 的 DSN,可以参考该文档 获取。
batch-size
bend-ingest-kafka
会积攒到 batch-size 条数据再触发一次数据同步。
验证数据导入
到 Databend Cloud worksheet 中查询 automq_db.users
表,可以看到数据已经从 AutoMq 同步到 Databend Table。
关于 Databend
Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。
👨💻 Databend Cloud:databend.cn
📖 Databend 文档:databend.rs/
💻 Wechat:Databend
✨ GitHub:github.com/datafuselab…

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
作业帮基于 DolphinScheduler 的数据开发平台实践
摘要 随着任务数量、任务类型需求不断增长,对我们的数据开发平台提出了更高的要求。本文主要分享我们将调度引擎升级到 Apache DolphinScheduler 的实践经验,以及对数据开发平台的一些思考。 1. 背景 首先介绍下我们的大数据平台架构: 数据计算层承接了全公司的数据开发需求,负责运行各类指标计算任务。 其中批计算任务运行在 UDA 数据开发平台,支持任务全链路的开发场景:开发、调试、环境隔离、运维、监控。这些功能的支持、任务的稳定运行,强依赖底层的调度系统。 原有调度系统是 2015 年 (抑或更早) 自研的,随着任务类型新增、任务数量增多,暴露出诸多问题: 稳定性:频繁出现 mysql 连接不释放、锁超时等问题;数据库压力进一步导致调度性能瓶颈,任务无法及时调度。 可维护性:核心调度器通过 php 开发,代码古老又经历多次交接,外围模块实现时采用了 go java python 多种语言;再加上功能上也存在单点,维护成本很高。 扩展性:业务高速发展,不同任务类型需求越来越多,但是调度作为底层服务在支撑上一直力不从心。 可观测性:由于是定时nohup启动任务进程的方式,经...
- 下一篇
银联商务:基于 Apache Doris 升级数据平台,查询提速 10-15 倍,挖掘增长新机遇
本文导读: 在长期服务广大规模商户的过程中,银联商务已沉淀了庞大、真实、优质的数据资产数据,这些数据不仅是银联商务开启新增长曲线的基础,更是进一步服务好商户的关键支撑。为更好提供数据服务,银联商务实现了从 Hadoop 到 Apache Doris 的架构升级,使数据导入性能提升 2-5 倍、ETL 场景性能提升 3-12 倍、查询分析响应速度提升 10-15 倍,满足大规模数据导入和实时极速查询的业务需求,解决了业务和数据快速增长问题,提升了数据应用构建的效率,充分助力业务提效与数字资产的服务化,推进数字化进程的落地,展示了 Apache Doris 在推动金融科技创新方面的巨大潜力。 作者:银联商务 杨劲雄、周阳 银联商务是国内大型的非银行支付机构,提供以银行卡收单、网络支付为基础的综合支付服务,以及多样化和专业化的商户增值和科技创新服务,始终致力于为商户、合作伙伴及消费者构筑普惠、便捷、高效、安全的支付环境。截至 2023 年 12月,银联商务已累计服务大中型、知名企业在内的各类商户超过 2500 万家,累计铺设终端超 4000 万台,实体服务网络覆盖中国大陆所有地级以上城市及港...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS关闭SELinux安全模块
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Hadoop3单机部署,实现最简伪集群
- CentOS6,7,8上安装Nginx,支持https2.0的开启