RustDesk 跻身 2022 第三季度前十大成长最快的开源初创项目
RustDesk是一款开源远程桌面软件,使用Rust语言和Flutter框架构建。经过一年多的发展,目前已经成为全球成长最快的远程桌面项目,GitHub突破3万星,Rust语言开源项目国际排名第7。
摘要:本文为 RocketMQ Flink Catalog 使用指南。主要内容包括:
- Flink 和 Flink Catalog
- RocketMQ Flink Connector
- RocketMQ Flink Catalog
作者:李晓双 ,Apache RocketMQ Contributor
Mentor:蒋晓峰,Apache RocketMQ Committer
Flink 是一个分布式计算引擎,目前已经实现批流一体,可以实现对有界数据和无界数据的处理。需要有效分配和管理计算资源才能执行流式应用程序。
目前 Flink API 共抽象为四个部分:
Flink Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。Flink 对于元数据的管理分为临时的、持久化的两种。内置的 GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。JdbcCatalog 和 HiveCatalog 就是可以持久化元数据的 Catalog。
Flink Catalog 是扩展的,支持用户自定义。为了在 Flink SQL 中使用自定义 Catalog,用户需要通过实现CatalogFactory接口来实现对应的 Catalog 工厂。该工厂是使用 Java 的服务提供者接口 (SPI) 发现的。可以将实现此接口的类添加到 META_INF/services/org.apache.flink.table.factories.FactoryJAR 文件中。
RocketMQ 连接器为 Flink 提供从 RocketMQ Topic 中消费和写入数据的能力。Flink 的 Table API & SQL 程序可以连接到其他外部系统,用于读取和写入批处理和流式表。Source 提供对存储在外部系统(例如数据库、键值存储、消息队列或文件系统)中的数据的访问。Sink 将数据发送到外部存储系统。
该项目的 Github 仓库是: https://github.com/apache/rocketmq-flink
META_INF/services/org.apache.flink.table.factories.Factory 中。类图如下:
RocketMQ Flink Catalog 的底层存储使用的是 RocketMQ Schema Registry。Flink 调用 Catalog 的时候,在 AbstractCatalog 的实现类中通过 RocketMQ Schema Registry 的客户端和 RocketMQ Schema Registry 服务端进行交互。
DefaultMQAdminExt 从 RocketMQ 中获取到 Partition 相关信息。RocketMQ Schema Registry 是一个 Topic Schema 的管理中心。它为 Topic(RocketMQ Topic)的注册、删除、更新、获取和引用模式提供了一个 RESTful 接口。New RocketMQ 客户端通过将 Schema 与 Subject 关联起来,可以直接发送结构化数据。用户不再需要关心序列化和反序列化的细节。
目前 RocketMQ Flink Catalog 支持对 Database、Table、Partition 的查询和判断是否存在的操作,不支持创建、修改、删除。所以在使用之前需要通过 RocketMQ Schema Registry 来创建好对应的 Schema。
表环境(TableEnvironment)是 Flink 中集成 Table API & SQL 的核心概念。它负责:
Table API :
RocketMQCatalog rocketMqCatalog = new RocketMQCatalog("rocketmq_catalog", "default", "http://localhost:9876", "http://localhost:8080");
tableEnvironment.registerCatalog("rocketmq_catalog", rocketMqCatalog);
SQL:
TableResult tableResult = tableEnvironment.executeSql(
"CREATE CATALOG rocketmq_catalog WITH (" +
"'type'='rocketmq_catalog'," +
"'nameserver.address'='http://localhost:9876'," +
"'schema.registry.base.url'='http://localhost:8088');");
Table API :
tableEnvironment.useCatalog("rocketmq_catalog");
SQL:
tableEnvironment.executeSql("USE CATALOG rocketmq_catalog");
Table API :
String[] catalogs = tableEnvironment.listCatalogs();
SQL:
TableResult tableResult = tableEnvironment.executeSql("show catalogs");
Table API :
String[] databases = tableEnvironment.listDatabases();
SQL:
TableResult tableResult = tableEnvironment.executeSql("show databases");
Table API:
String[] tables = tableEnvironment.listTables();
SQL:
TableResult tableResult = tableEnvironment.executeSql("show tables");
需要提前准备可用的 RocketMQ 、RocketMQ Schema Registry:
创建两个 Topic,rocketmq_source 和 rocketmq_sink。
curl -X POST -H "Content-Type: application/json" \
-d '{"schemaIdl":"{"type":"record","name":"rocketmq_source_schema","namespace":"namespace","fields":[{"name":"name","type":"string"}]}"}' \
http://localhost:8088/schema-registry/v1/subject/rocketmq_source/schema/rocketmq_source_schema
curl -X POST -H "Content-Type: application/json" \
-d '{"schemaIdl":"{"type":"record","name":"rocketmq_sink_schema","namespace":"namespace","fields":[{"name":"name","type":"string"}]}"}' \
http://localhost:8088/schema-registry/v1/subject/rocketmq_sink/schema/rocketmq_sink_schema
创建一个任务项目 ,添加 rocketmq-flink 的依赖 :
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-flink</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
目前 RocketMQ Schema Registry 还没有发布正式的版本,只有快照版,如果发现 jar 找不到,可以尝试以下方法:
<repositories>
<repository>
<id>snapshot-repos</id>
<name>Apache Snapshot Repository</name>
<url>https://repository.apache.org/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<layout>default</layout>
</repository>
</repositories>
/**
* @author lixiaoshuang
*/
public class RocketMqCatalog {
public static void main(String[] args) {
// 初始化表环境参数
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
// 创建 table 环境
TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);
// 注册 rocketmq catalog
tableEnvironment.executeSql(
"CREATE CATALOG rocketmq_catalog WITH (" +
"'type'='rocketmq_catalog'," +
"'nameserver.address'='http://localhost:9876'," +
"'schema.registry.base.url'='http://localhost:8088');");
tableEnvironment.executeSql("USE CATALOG rocketmq_catalog");
// 从 rocketmq_source 中获取数据写入到 rocketmq_sink 中
TableResult tableResult = tableEnvironment.executeSql("INSERT INTO rocketmq_sink /*+ OPTIONS" +
"('producerGroup'='topic_producer_group') */ select * from rocketmq_source /*+ OPTIONS" +
"('consumerGroup'='topic_consumer_group') */");
}
}
启动任务并运行以后,打开 RocketMQ 控制台,往 rocketmq_source 这个 Topic 发送一条消息。
然后再查看 rocketmq_sink 的状态,就会发现消息已经通过写入到 rocketmq_sink 中了。
微信关注我们
转载内容版权归作者及来源网站所有!
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。
马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。
为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。
Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。