RocketMQ Flink Catalog 设计与实践
摘要:本文为 RocketMQ Flink Catalog 使用指南。主要内容包括:
- Flink 和 Flink Catalog
- RocketMQ Flink Connector
- RocketMQ Flink Catalog
作者:李晓双 ,Apache RocketMQ Contributor
Mentor:蒋晓峰,Apache RocketMQ Committer
一、Flink 和 Flink Catalog
Flink 是一个分布式计算引擎,目前已经实现批流一体,可以实现对有界数据和无界数据的处理。需要有效分配和管理计算资源才能执行流式应用程序。
目前 Flink API 共抽象为四个部分:
- 最顶层的抽象为 SQL。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。
- 第二层抽象为 Table API。Table API 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。
- 第三层抽象是 Core APIs 。 许多程序可能使用不到最底层的 API , 而是可以使用 Core APIs 进行编程:其中包含 DataStream API(应用于有界/无界数据流场景)和 DataSet API(应用于有界数据集场景)两部分。
- 第四层抽象为有状态的实时流处理。
Flink Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。Flink 对于元数据的管理分为临时的、持久化的两种。内置的 GenericInMemoryCatalog
是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。JdbcCatalog 和 HiveCatalog 就是可以持久化元数据的 Catalog。
Flink Catalog 是扩展的,支持用户自定义。为了在 Flink SQL 中使用自定义 Catalog,用户需要通过实现CatalogFactory
接口来实现对应的 Catalog 工厂。该工厂是使用 Java 的服务提供者接口 (SPI) 发现的。可以将实现此接口的类添加到 META_INF/services/org.apache.flink.table.factories.Factory
JAR 文件中。
二、RocketMQ Flink Connector
RocketMQ 连接器为 Flink 提供从 RocketMQ Topic 中消费和写入数据的能力。Flink 的 Table API & SQL 程序可以连接到其他外部系统,用于读取和写入批处理和流式表。Source 提供对存储在外部系统(例如数据库、键值存储、消息队列或文件系统)中的数据的访问。Sink 将数据发送到外部存储系统。
该项目的 Github 仓库是: https://github.com/apache/rocketmq-flink
三、RocketMQ Flink Catalog
3.1 设计与实现
3.1.1 RocketMQ Flink Catalog 的设计主要分为两步
- 实现一个 RocketMqCatalogFactory 基于字符串属性创建已配置 Catalog 实例的工厂。将此实现类添加到
META_INF/services/org.apache.flink.table.factories.Factory
中。 - 继承 AbstractCatalog 实现 RocketMqCatalog,通过实现 Catalog 接口中的方法,完成对数据库、表、分区等信息的查询操作。
类图如下:
3.1.2 RocketMQ Flink Catalog 的存储
RocketMQ Flink Catalog 的底层存储使用的是 RocketMQ Schema Registry。Flink 调用 Catalog 的时候,在 AbstractCatalog 的实现类中通过 RocketMQ Schema Registry 的客户端和 RocketMQ Schema Registry 服务端进行交互。
- Database : 返回默认的 default 。
- Table : 从 RocketMQ Schema Registry 获取对应的 Schema,然后解析 IDL 转换成 DataType。
- Partition : 通过
DefaultMQAdminExt
从 RocketMQ 中获取到 Partition 相关信息。
RocketMQ Schema Registry 是一个 Topic Schema 的管理中心。它为 Topic(RocketMQ Topic)的注册、删除、更新、获取和引用模式提供了一个 RESTful 接口。New RocketMQ 客户端通过将 Schema 与 Subject 关联起来,可以直接发送结构化数据。用户不再需要关心序列化和反序列化的细节。
3.1.3 RocketMQ Flink Catalog 支持的 API
目前 RocketMQ Flink Catalog 支持对 Database、Table、Partition 的查询和判断是否存在的操作,不支持创建、修改、删除。所以在使用之前需要通过 RocketMQ Schema Registry 来创建好对应的 Schema。
3.2 使用指南
表环境(TableEnvironment)是 Flink 中集成 Table API & SQL 的核心概念。它负责:
- 在内部的 Catalog 中注册 Table。
- 注册外部的 Catalog。
- 加载可插拔模块。
- 执行 SQL 查询。
- 注册自定义函数 (scalar、table 或 aggregation)。
- 将 DataStream 或 DataSet 转换成 Table。
- 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用。
3.2.1 创建并注册 Catalog
Table API :
RocketMQCatalog rocketMqCatalog = new RocketMQCatalog("rocketmq_catalog", "default", "http://localhost:9876", "http://localhost:8080"); tableEnvironment.registerCatalog("rocketmq_catalog", rocketMqCatalog);
SQL:
TableResult tableResult = tableEnvironment.executeSql( "CREATE CATALOG rocketmq_catalog WITH (" + "'type'='rocketmq_catalog'," + "'nameserver.address'='http://localhost:9876'," + "'schema.registry.base.url'='http://localhost:8088');");
3.2.2 修改当前的 Catalog
Table API :
tableEnvironment.useCatalog("rocketmq_catalog");
SQL:
tableEnvironment.executeSql("USE CATALOG rocketmq_catalog");
3.2.3 列出可用的 Catalog
Table API :
String[] catalogs = tableEnvironment.listCatalogs();
SQL:
TableResult tableResult = tableEnvironment.executeSql("show catalogs");
3.2.4 列出可用的 Database
Table API :
String[] databases = tableEnvironment.listDatabases();
SQL:
TableResult tableResult = tableEnvironment.executeSql("show databases");
3.2.5 列出可用的 Table
Table API:
String[] tables = tableEnvironment.listTables();
SQL:
TableResult tableResult = tableEnvironment.executeSql("show tables");
3.3 Quick Start
需要提前准备可用的 RocketMQ 、RocketMQ Schema Registry:
- RocketMQ 部署:https://rocketmq.apache.org/docs/介绍/02quickstart
- RocketMQ Schema Registry 部署:https://github.com/apache/rocketmq-schema-registry
3.3.1 创建 Topic
创建两个 Topic,rocketmq_source 和 rocketmq_sink。
3.3.2 注册 Source Schema
curl -X POST -H "Content-Type: application/json" \ -d '{"schemaIdl":"{"type":"record","name":"rocketmq_source_schema","namespace":"namespace","fields":[{"name":"name","type":"string"}]}"}' \ http://localhost:8088/schema-registry/v1/subject/rocketmq_source/schema/rocketmq_source_schema
3.3.3 注册 Sink Schema
curl -X POST -H "Content-Type: application/json" \ -d '{"schemaIdl":"{"type":"record","name":"rocketmq_sink_schema","namespace":"namespace","fields":[{"name":"name","type":"string"}]}"}' \ http://localhost:8088/schema-registry/v1/subject/rocketmq_sink/schema/rocketmq_sink_schema
3.3.4 添加依赖
创建一个任务项目 ,添加 rocketmq-flink 的依赖 :
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-flink</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency>
目前 RocketMQ Schema Registry 还没有发布正式的版本,只有快照版,如果发现 jar 找不到,可以尝试以下方法:
<repositories> <repository> <id>snapshot-repos</id> <name>Apache Snapshot Repository</name> <url>https://repository.apache.org/snapshots/</url> <snapshots> <enabled>true</enabled> </snapshots> <layout>default</layout> </repository> </repositories>
3.3.5 创建任务
/** * @author lixiaoshuang */ public class RocketMqCatalog { public static void main(String[] args) { // 初始化表环境参数 EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); // 创建 table 环境 TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings); // 注册 rocketmq catalog tableEnvironment.executeSql( "CREATE CATALOG rocketmq_catalog WITH (" + "'type'='rocketmq_catalog'," + "'nameserver.address'='http://localhost:9876'," + "'schema.registry.base.url'='http://localhost:8088');"); tableEnvironment.executeSql("USE CATALOG rocketmq_catalog"); // 从 rocketmq_source 中获取数据写入到 rocketmq_sink 中 TableResult tableResult = tableEnvironment.executeSql("INSERT INTO rocketmq_sink /*+ OPTIONS" + "('producerGroup'='topic_producer_group') */ select * from rocketmq_source /*+ OPTIONS" + "('consumerGroup'='topic_consumer_group') */"); } }
启动任务并运行以后,打开 RocketMQ 控制台,往 rocketmq_source 这个 Topic 发送一条消息。
然后再查看 rocketmq_sink 的状态,就会发现消息已经通过写入到 rocketmq_sink 中了。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
RustDesk 跻身 2022 第三季度前十大成长最快的开源初创项目
RustDesk是一款开源远程桌面软件,使用Rust语言和Flutter框架构建。经过一年多的发展,目前已经成为全球成长最快的远程桌面项目,GitHub突破3万星,Rust语言开源项目国际排名第7。
- 下一篇
将 NGINX 部署为 API 网关,第 3 部分:发布 gRPC 服务
原文作者:Liam Crilly of F5 原文链接:将 NGINX 部署为 API 网关,第 3 部分:发布 gRPC 服务 转载来源:NGINX 官方网站 本文是“将 NGINX 开源版和 NGINX Plus 部署为 API 网关”系列博文的第三篇。 第 1 部分详细说明了 NGINX 开源版和 NGINX Plus 作为基于 HTTP 的 RESTful API 的 API 网关的一些用例。 第 2 部分对这些用例进行了扩展,探讨了一系列可用于保护生产环境中后端 API 服务的安全措施。 本文解释了如何将 NGINX 开源版和 NGINX Plus 部署为 gRPC 服务的 API 网关。文章最初发布于 2018 年,随着NGINX Plus Release 23中引入了对原生 gRPC 健康检查协议的支持,特此更新,以方便大家充分利用 NGINX 开源版和 NGINX Plus。更新详情请参阅下文“实施健康检查”一节。 注:除非另有说明,否则本文中的所有信息都适用于 NGINX Plus 和 NGINX 开源版。为了便于阅读,当讨论内容同时适用于两个版本时,下文将它们统称为...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- 设置Eclipse缩进为4个空格,增强代码规范
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS8编译安装MySQL8.0.19