iceberg的java api使用
【前言】
了解一个组件的最好方式是先使用该组件,今天我们就来聊聊如何通过java api对iceberg进行操作。
为什么是选择api进行介绍,而不是更通用的flink、spark、hive等。一方面是觉得flink、spark使用iceberg的介绍网上已经有很多,官网的介绍也比较清晰,而java api的介绍则相对少些;另一方面,不管是flink,spark最终都还是调用这些基本的api完成相关的操作的,因此先从api入手,后续对flink,spark,trino等组件对iceberg的操作原理理解起来也会更容易些。所以就有了本文的内容。
【catalog的创建】
在创建数据库,表之前需要先创建catalog,这里主要介绍hive类型的catalog。
import org.apache.iceberg.hive.HiveCatalog;
HiveCatalog catalog = new HiveCatalog();
catalog.setConf(conf);
Map <String, String> properties = new HashMap<String, String>();
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "/user/hive/warehouse")
properties.put(CatalogProperties.URI, "thrift://172.16.55.21:9083");
properties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.hive.HiveCatalog");
// 初始化catalog
catalog.initialize("hive",properties);
对于hive类型的catalog,主要指定数据库存储位置,以及hive metastore server的URI。
【创建表】
对于iceberg表,可以理解由四部分组成,表结构定义(schema)、分区定义(partitionSpec)、表的属性(properties),以及表的唯一识别信息(identity)即表所属的数据库与表名。创建表时只需要分别制定这些内容即可。
// 定义表结构schema
Schema schema = new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.required(2, "name", Types.StringType.get()),
Types.NestedField.required(3, "birth", Types.StringType.get()));
// 分区定义(以birth字段按月进行分区)
PartitionSpec spec = PartitionSpec.builderFor(schema).month("birth").build();
// 数据库名,表名
TableIdentifier name = TableIdentifier.of("iceberg_db", "developer");
// 表的属性
Map<String, String> properties = new HashMap<>();
properties.put("engine.hive.enabled", "true");
// 建表
Table table = catalog.createTable(name, schema, spec, properties);
这里需要注意的是:分区定义中的字段必须是schema中已有的字段,如果在schema中找不到对应的字段,会报错抛异常。
但是,通过sql方式建表时,分区字段会隐式地加入到表字段定义中,即不用强制写到schema的字段定义。例如通过如下hivesql语句建表:
create table developer(
id int,
name string
)
partitioned by (birth Date)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
tblproperties('engine.hive.enabled'='true');
建表后的情况如下所示:
【插入数据】
插入数据可以分为3个步骤,首先根据表格式构造对应的数据记录,然后将记录写入到指定格式(parquet、orc等)的文件中,最后将文件列表写入到表中。
// 1. 构建记录
GenericRecord record = GenericRecord.create(schema);
ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
builder.add(ImmutableMap.of("id", 1, "name", "chen", "birth", "2020-03-08"));
builder.add(ImmutableMap.of("id", 2, "name", "yuan", "birth", "2021-03-09"));
builder.add(ImmutableMap.of("id", 3, "name", "jie", "birth", "2023-03-10"));
builder.add(ImmutableMap.of("id", 4, "name", "ma", "birth", "2023-03-11"));
ImmutableList<GenericRecord> records = builder.build();
// 2. 将记录写入parquet文件
String filepath = table.location() + "/" + UUID.randomUUID().toString();
OutputFile file = table.io().newOutputFile(filepath);
DataWriter<GenericRecord> dataWriter =
Parquet.writeData(file)
.schema(schema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
.build();
try {
for (GenericRecord record : builder.build()) {
dataWriter.write(record);
}
} finally {
dataWriter.close();
}
// 3. 将文件写入table中
DataFile dataFile = dataWriter.toDataFile();
table.newAppend().appendFile(dataFile).commit();
这里,对于数据文件的存储位置是有一定规范的,如果没有在指定路径下存放,那么对于其他组件来说(比如表同步到hive后),会出现数据不完整或者查不到的情况。
【行级别的查询数据】
查询是通过构造ScanBuilder,并配合IcebergGenerics.read来完成的。ScanBuilder还可以进行select选择列,以及通过where指定查询条件。
Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "developer"));
IcebergGenerics.ScanBuilder scanBuilder = IcebergGenerics.read(table);
// 查询全部
CloseableIterable<Record> records = scanBuilder.build();
for (Record record : records) {
}
// 指定select列与where条件的查询
//CloseableIterable<Record> records = scanBuilder.select("id", "name").where(Expressions.lessThan("id", Integer.valueOf(10))).build();
【表结构变更】
iceberg所具备的一项特点就是可以对表结构进行变更,例如新增,删除已有字段,字段名或类型的变更,新增分区等。
1)新增列字段
Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "developer"));
UpdateSchema newSchema = table.updateSchema();
// 字段名, 字段类型
newSchema.addColumn("skill", Type.StringType.get());
updateSchema.commit();
对于已经写入的记录数据,其新增字段的值为NULL。
当然还可以UpdateSchema进行删除字段、重命名字段、更新字段(类型),调整字段位置等操作。
2)新增分区
通过UpdatePartitionSpec可以进行分区的相关操作。
Table table = catalog.loadTable(TableIdentifier.of(dbName, tblName));
UpdatePartitionSpec updatePartitionSpec = table.updateSpec();
updatePartitionSpec.addField("skill");
updatePartitionSpec.commit();
【snapshot的操作】
完成表的加载后,可以得到表的所有snapshot信息,也可以删除指定的snapshot,或指定时间之前的snapshot。
Table table = catalog.loadTable(TableIdentifier.of(dbName, tblName));
for (Snapshot snapshot : table.snapshots()) {
System.out.println(snapshot.sequenceNumber() + " " + snapshot.snapshotId() + " " + snapshot.parentId() + " " + snapshot.timestampMillis());
}
ExpireSnapshots expireSnapshot = table.expireSnapshots();
expireSnapshot.expireOlderThan(table.currentSnapshot().timestampMillis());
expireSnapshot.commit();
【删除表】
删除表的操作则很简单,通过catalog对表进行删除。
TableIdentifier name = TableIdentifier.of("iceberg_db", "developer");
catalog.dropTable(name, true);
【总结】
本文主要介绍iceberg api的一些基本操作,这里未涉及数据的更新与删除,因为这是一个比较大的知识点。另外,分区的新增,添加新的列这些操作的背后逻辑和iceberg的文件存储格式都有一定的关系,我们后续会逐一介绍。
好了,这就是本文的全部内容,如果觉得本文对您有帮助,请点赞+转发,如果觉得有不正确的地方,也可以拍砖指点,最后,欢迎加我微信交流~
本文分享自微信公众号 - 陈猿解码(gh_383bc7486c1a)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
详解AQS的7个同步组件
摘要:AQS的全称为Abstract Queued Synchronizer,是在J.U.C(java.util.concurrent)下子包中的类。 本文分享自华为云社区《【高并发】AQS案例详解》,作者: 冰 河。 AQS的全称为Abstract Queued Synchronizer,是在J.U.C(java.util.concurrent)下子包中的类。 一、AQS的设计如下 (1)使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架。 (2)利用了一个int类型表示状态 在AQS类中,有一个叫做state的成员变量。 基于AQS有一个同步组件ReentrantLock,在ReentrantLock中,state表示获取锁的线程数。如果state=0,则表示还没有线程获取锁;如果state=1,则表示有线程获取了锁;如果state>1,则表示重入锁的数量。 (3)使用方法是继承 设计上基于模板方法,使用时需要继承AQS,并覆写其中的方法 (4)子类通过继承并通过实现它的方法管理其状态{acquire和release}的方法操纵状态 (5)可以同时实现排它...
- 下一篇
知识蒸馏、轻量化模型架构、剪枝…几种深度学习模型压缩方法
摘要:模型压缩算法旨在将一个大模型转化为一个精简的小模型。工业界的模型压缩方法有:知识蒸馏、轻量化模型架构、剪枝、量化。 本文分享自华为云社区《深度学习模型压缩方法综述》,作者:嵌入式视觉 。 一,模型压缩技术概述 因为嵌入式设备的算力和内存有限,因此深度学习模型需要经过模型压缩后,方才能部署到嵌入式设备上。 在一定程度上,网络越深,参数越多,模型也会越复杂,但其最终效果也越好。而模型压缩算法是旨在将一个庞大而复杂的预训练模型转化为一个精简的小模型。本文介绍了卷积神经网络常见的几种压缩方法。 按照压缩过程对网络结构的破坏程度,《解析卷积神经网络》一书中将模型压缩技术分为“前端压缩”和“后端压缩”两部分: 前端压缩,是指在不改变原网络结构的压缩技术,主要包括知识蒸馏、轻量级网络(紧凑的模型结构设计)以及滤波器(filter)层面的剪枝(结构化剪枝)等; 后端压缩,是指包括低秩近似、未加限制的剪枝(非结构化剪枝/稀疏)、参数量化以及二值网络等,目标在于尽可能减少模型大小,会对原始网络结构造成极大程度的改造。 总结:前端压缩几乎不改变原有网络结构(仅仅只是在原模型基础上减少了网络的层数或者滤...
相关文章
文章评论
共有0条评论来说两句吧...