在 Confluent Cloud 上使用 Databend Kafka Connect 构建实时数据流同步
作者:韩山杰
Databend Cloud 研发工程师
Confluent Cloud
Confluent Cloud 是由 Confluent 公司提供的云服务,它是基于 Apache Kafka 的企业级事件流平台是由 Confluent 公司提供的云服务,它是基于 Apache Kafka 的企业级事件流平台,允许用户轻松构建和管理分布式流处理应用。Confluent 由 Apache Kafka 的原始创建者创立,专注于提供围绕 Kafka 技术的产品和服务 。
Confluent Cloud 提供的主要优势包括简化的部署和管理,高可用性,以及自动扩展能力。它是一个云端托管服务,是 Confluent Enterprise 的云端版本,增加了云端管理控制台的组件,使得用户无需担心底层基础设施的维护和扩展问题,可以更专注于业务逻辑的实现 。
Confluent Cloud 与其他 Confluent 产品一样,提供了一系列的工具和服务,例如 Kafka Connect 用于连接外部系统,Schema Registry 用于管理数据格式的变更,以及 KSQL 用于流处理查询等。这些工具和服务帮助用户构建统一而灵活的数据流平台,实现数据的实时处理和分析 。
此外,Confluent Cloud 还提供了不同层级的服务,根据可用性、安全等企业特性分为 Basic、Standard 和 Dedicated 三个版本,支持按需创建资源,并且按量收费,为用户提供了灵活的付费选项。
Databend Kafka Connect
Databend 提供了 Databend-Kafka-Connect 作为 Apache Kafka 的 Sink connector作为 Apache Kafka 的 Sink connector,直接接入到 Confluent Cloud 平台,就可以实时消费 kafka topic 中的数据并写入 Databend table。
Databend kafka connect 提供了诸多特性,例如自动建表,Append Only 和 Upsert 写入模式,自动的 schema evolution。
这篇文章我们将会介绍如何在 Confluent Cloud 上使用 Databend Kafka Connector 构建实时的数据同步管道。
实现
创建自定义 connector
Confluent 提供了一个 connector hub,在这里可以找到所有已经内置到 Confluent Cloud 中的 Connector。对于没有内置的,Confluent 支持创建自定义 connector。
Add plugin
配置 plugin
填写 connector 的名称、描述以及入口 class,在这里 databend connect 的入口类是:com.databend.kafka.connect.DatabendSinkConnector
。
git clone ``https://github.com/databendcloud/databend-kafka-connect.git
下载源码编译或者直接到 release 页面下载打包好的 jar 文件。将其上传至 Confluent Cloud。
创建 Topic
创建一个新的 kafka topic
为新创建的 topic 定义一个 schema 以确定其数据结构。这里我们确定 kafka topic 中的数据格式为 AVRO,其 schema 为:
{ "doc": "Sample schema to help you get started.", "fields": [ { "doc": "The int type is a 32-bit signed integer.", "name": "id", "type": "int" }, { "doc": "The string is a unicode character sequence.", "name": "name", "type": "string" }, { "doc": "The string is a unicode character sequence.", "name": "age", "type": "int" } ], "name": "sampleRecord", "type": "record" }
Add connector for topic
为刚创建的 topic 添加一个 sink connector
选择上面我们自定义的 databend connector 并配置 API key 和 secret。
添加 databend connector 的配置文件
{ "auto.create": "true", "auto.evolve": "true", "batch.size": "1", "confluent.custom.schema.registry.auto": "true", "connection.attempts": "5", "connection.backoff.ms": "10000", "connection.database": "testsync", "connection.password": "password", "connection.url": "jdbc:databend://tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443?ssl=true", "connection.user": "cloudapp", "errors.tolerance": "all", "insert.mode": "upsert", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "max.retries": "10", "pk.fields": "id", "pk.mode": "record_value", "table.name.format": "testsync.${topic}", "topics": "topic_avrob", "value.converter": "io.confluent.connect.avro.AvroConverter" }
配置文件中指定了目标端的连接参数,数据库名,表名,kafka 相关信息以及 connector converter 。 key.converter 和 value.converter 就是指定的转换器,分别指定了消息键和消息值所使用的的转换器,用于在 Kafka Connect 格式和写入 Kafka 的序列化格式之间进行转换。这控制了写入 Kafka 或从 Kafka 读取的消息中键和值的格式。由于这与 Connector 没有任何关系,因此任何 Connector 可以与任何序列化格式一起使用。默认使用 Kafka 提供的 JSONConverter。有些转换器还包含了特定的配置参数。例如,通过将 key.converter.schemas.enable 设置成 true 或者 false 来指定 JSON 消息是否包含 schema。
配置网络白名单
将数据写入的目标端的 host 填入 confluent cloud 的 connection endpoint 配置,confluent 会在 kafka 和目标端之间建立 private link 以确保网络通畅。
确认配置并启动 kafka connector
向 topic 中发送样例数据
确认 kafka connect 处于 running 状态后,
我们使用 confluent CLI 工具往 topic 中发送数据:
confluent kafka topic produce topic_avrob --value-format avro --schema schema.json
其中 schema.json
就是我们为 topic 定义的 schema。
通过 confluent cloud 的日志我们可以看到 kafka connect 已经接收到来自 topic 的消息并开始写入 databend table:
同时我们在 databend cloud 中可以看到 testsync.topic_avrob
表已经被自动创建且数据已写入:
总结
通过以上步骤,我们就可以在 Confluent Cloud 与 Databend Cloud 之间,使用 Databend Kafka Connector 构建起二者之间的实时数据同步管道。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Databend 开源周报第 156 期
Databend 是一款现代云数仓。专为弹性和高效设计,为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务:https://app.databend.cn 。 What's On In Databend 探索 Databend 本周新进展,遇到更贴近你心意的 Databend。 支持 Share Catalog 此前,共享视图是通过创建数据库(CREATE DATABASE xx FROM SHARE xxx;)的方式来实现的,这种方法在使用和管理上存在一定的复杂性和局限性。 随着 Share Catalog 功能的引入,共享视图现在作为目录的一部分存在,而不是数据库,进一步简化和优化了共享视图的访问方式。 CREATE CATALOG xx TYPE = 'share' CONNECTION = (...); 如果您想了解更多信息,欢迎联系 Databend 团队,或查看下面列出的资源。 PR #16172 | feat: add share catalog Beyond the Code 一起来探索 Databend 社区和周边生态中的新鲜事。 了解如何共享 Workshee...
- 下一篇
如何有效避免Redis缓存击穿
引言 Redis,作为当前最受欢迎的高性能键值对存储系统,不仅广泛应用于缓存,还在消息传递系统中发挥着重要作用。其出色的处理速度大大提升了现代应用的响应时间和数据处理能力。然而,在面对高并发场景时,缓存系统可能会遇到所谓的"缓存击穿"问题,特别是当某一热点数据的缓存突然失效,而大量请求随之直击数据库,可能导致服务短时间内不稳定甚至宕机。本文旨在探讨在Spring Boot环境下,如何通过几种策略有效防止Redis缓存击穿,确保应用的高可用性和稳定性。 一、基本概念 1.Redis Redis是一个多功能的内存数据存储系统,主要用作数据库、缓存和消息代理。它的高性能来源于所有数据都在内存中进行处理,使得访问速度极快。支持的原子操作和丰富的数据类型使得Redis不仅可以高效处理数据,还能保证数据的一致性和稳定性,是构建需求响应迅速的现代应用的理想选择。 从上图中,我们可以看出Redis在实际开发当中是如何使用的。 2.布隆过滤器 布隆过滤器是一种高效的数据结构,用于快速判断一个元素是否存在于一个集合中,主要特点是高效的空间和时间性能以及一定的误判率。虽然它可能错误地判断某些不存在的元素为存...
相关文章
文章评论
共有0条评论来说两句吧...