DataHub服务用户迁移文档
前言
原Odps版内测DataHub(下文统称为老DataHub服务),于2016年11月21日起已经处于维护状态,新版DataHub届时已经开启公测,公测至今已有半年以上时间,我们决定开始逐步下线老DataHub服务,老版部分用户需要迁移至新版DataHub。
新版本具有更多的特性,性能功能都有不少提升,可以同时支持数据一份数据同步到Odps、OSS、ElasticSearch等多个不同服务中,且提供WebConsole控制台进行更简单的操作。
准备工作
本文档针对使用Logstash、Fluentd、Flume以及使用SDK写入老DataHub服务的用户,提供迁移到新服务的指引,过程中遇到任何困难可以联系我们
![dingtalk dingtalk]()
新版DataHub相关文档
DataHub产品使用文档
DataHub控制台
创建新datahub project
新版DataHub中存在项目空间-Project概念,与Odps中Project类似,但是不等于Odps中的Project,为了方便管理,我们建议迁移时在DataHub中创建与Odps Project同名的Project(不同名称也可以)
- 登录DataHub官网控制台,使用阿里云账号登录;
- 点击创建Project,输入名称及描述,点击创建(Project描述中建议携带Project用处及Owner的邮箱或联系方式)
创建新DataHub topic
新版DataHub存在主题-Topic的概念,与Odps的Table类似,但是不等于Odps的Table,通常如果是需要导入数据到Odps的话,需要为每张表创建一个Topic,且字段类型、顺序与名称需要一致,Odps中的分区字段当做普通的Topic字段处理,新版DataHub会根据该分区字段再DataHub中的数据值,将数据同步到Odps离线表中。
例如:
MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, pt string)
对应Topic应为如下的Schema:
Topic: topic_test(f1 string, f2 string, f3 double, ds string, pt string)
创建Topic可以通过以下方式:
- 若Topic数量较少,可以再WebConsole控制台,进入Project页面后点击创建Topic按钮,选择从MaxCompute导入,输入配置信息后勾选“自动创建DataConnector”,点击“导入表结构”即可导入odps表对应的格式,确认格式无误后选择Shard数量及生命周期, Shard数量建议与老服务一样,生命周期建议3天,点击创建即可。
- 若Topic过多,可以使用迁移工具DataHub表结构迁移工具,工具将对列表中的所有表创建对应Topic及Connector。
DataHub与MaxCompute字段类型对应表
| MaxCompute表中的类型 |
DataHub Topic中的类型 |
| STRING |
STRING |
| DOUBLE |
DOUBLE |
| BIGINT |
BIGINT |
| DATETIME |
TIMESTAMP (注:以微秒为度量单位) |
| BOOLEAN |
BOOLEAN |
| DECIMAL |
不支持 |
| MAP |
不支持 |
| ARRAY |
不支持 |
映射Odps分区
老DataHub在写入数据时需要直接指定分区,如果是通过fluend或logstash等插件写入的用户是需要配置分区信息或者通过某个时间字段转为固定格式作为分区
新版DataHub在这一行为上有所改变,Odps表的分区字段再DataHub中将会变成一个普通字段,后台Connector同步任务在同步数据到Odps表时会根据分区字段比如pt具体每条记录的值写入Odps对应分区中。
例如:
MaxCompute表: table_test(f1 string, f2 string, f3 double) partitioned by (ds string, pt string)
对应Topic应为如下的Schema:
Topic: topic_test(f1 string, f2 string, f3 double, ds string, pt string)
数据1: ("test", "test", "0.14", "a1", "20170405")
数据2: ("test", "test", "0.14", "aa", "20170406")
则数据1将会同步到odps分区ds=a1,pt=20170405
则数据2将会同步到odps分区ds=a2,pt=20170406
- 若使用插件导入,并且是通过字符串转换为固定格式的分区值的用户,新的插件需要使用fluentd/logstash的filter功能,对分区字段的值进行转换,具体使用方式可以参考这些开源工具的官方文档
不同类型接入方式迁移
使用Java SDK
需要换成新版本DataHub的SDK,Mvn依赖变化
原依赖
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>0.xxx</version>
</dependency>
新依赖
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.3.0-public</version>
</dependency>
Client初始化
原Client初始化步骤
Account account = new AliyunAccount(accessId, accessKey);
odps = new Odps(account);
odps.setDefaultProject(project);
odps.setEndpoint(odpsEndpoint);
DatahubClient client = new DatahubClient(odps, project, table, datahubEndpoint);
client.loadShard(shardNumber);
client.waitForShardLoad();
新Client初始化步骤
AliyunAccount account = new AliyunAccount(accessId, accessKey);
DatahubConfiguration conf = new DatahubConfiguration(account, datahubEndpoint);
DatahubClient client = new DatahubClient(conf);
获取Shard列表
原获取Shard列表及状态方式
HashMap<Long, DatahubClient.ShardState> shardStatus = client.getShardStatus();
新方式
ListShardResult listShardResult = client.listShard(projectName, topicName);
写入数据
原写入方式
DatahubWriter writer = client.openDatahubWriter(shardId);
TableSchema schema = client.getStreamSchema();
DatahubRecordPack recordPack = new DatahubRecordPack(schema);
/* Write another 20 records recordPack into another partition */
for (int i = 0; i < 20; i++) {
Record record = makeRecord(schema);
recordPack.append(record);
}
partSpec = "pt='20150809'";
packId = writer.write(new PartitionSpec(partSpec), recordPack)
.getPackId();
System.out.println("record append to the pack: " + packId);
新写入方式
List<RecordEntry> recordEntries = new ArrayList<RecordEntry>();
RecordEntry entry = new RecordEntry(schema);
entry.setString(0, "Test");
entry.setBigint(1, 5L);
entry.setShardId(shardId);
recordEntries.add(entry);
PutRecordsResult result = client.putRecords(projectName, topicName, recordEntries);
if (result.getFailedRecordCount() != 0) {
List<ErrorEntry> errors = result.getFailedRecordError();
// deal with result.getFailedRecords()
}
完整写入新DataHub示例代码
使用Fluentd
通过Fluend插件写入数据的用户,迁移除了上述准备工作外,还需进行三个步骤
- 更换,安装新插件包
- 根据配置文件对比,修改现有配置文件
- 使用新配置文件重新启动fluend进程
插件包更换
新版Fluentd插件使用文档
原安装语句
gem install fluent-plugin-aliyun-odps
新安装语句(也可按照新版文档提供的一键安装包安装logstash)
gem install fluent-plugin-datahub
配置对比
部分配置不需更改,更改match 部分配置即可。
| 老服务配置项 |
新服务配置项 |
备注 |
| type |
type |
需要从aliyun_odps改为dataHub |
| aliyun_access_id |
access_id |
云账号accessid |
| aliyun_access_key |
access_key |
云账号accesskey |
| aliyun_odps_hub_endpoint |
endpoint |
Datahub服务域名,需要改为新服务的域名 |
| aliyun_odps_endpoint |
无 |
不再需要 |
| buffer_chunk_limit |
buffer_chunk_limit |
不需要变化,但是新配置不能超过3MB |
| buffer_queue_limit |
buffer_queue_limit |
不需要变化 |
| flush_interval |
flush_interval |
不需要变化 |
| project |
project_name |
datahub的Project,非odps project |
| table |
topic_name |
datahub的topic,非odps table |
| fields |
column_names |
指定需要采集的列 |
| partition |
无 |
不再需要 |
| time_format |
无 |
不再需要 |
| shard_number |
无 |
不再需要 |
| enable_fast_crc |
无 |
不再需要 |
| retry_time |
retry_time |
重试次数 |
| retry_interval |
retry_interval |
重试间隔 |
| abandon_mode |
无 |
不再需要 |
新增配置
| 新服务配置项 |
备注 |
| dirty_data_continue |
true/false遇到增数据是否继续,若为true 遇到脏数据会重试,重试次数用完,会将脏数据写入脏数据文件 |
| dirty_data_file |
指定脏数据文件的位置 |
| put_data_batch_size |
每1000条record写一次DataHub |
| shard_id |
指定shard_id写入,默认round-robin方式写入 |
| shard_keys |
指定用作分区key,用key值hash后作为写入shard的索引 |
[TODO] 能否放一个新老的diff文件example
使用Logstash
通过Logstash插件写入数据的用户,迁移除了上述准备工作外,还需进行三个步骤
- 更换,安装新插件包
- 根据配置文件对比,修改现有配置文件
- 使用新配置文件重新启动Logstash进程
插件包更换
新版Logstash插件使用文档
配置对比
input部分配置不需更改,更改output部分配置即可。
| 老服务配置项 |
新服务配置项 |
备注 |
| type |
type |
需要从aliyun_odps改为dataHub |
| aliyun_access_id |
access_id |
云账号accessid |
| aliyun_access_key |
access_key |
云账号accesskey |
| aliyun_odps_hub_endpoint |
endpoint |
Datahub服务域名,需要改为新服务的域名 |
| aliyun_odps_endpoint |
无 |
不再需要 |
| value_field |
无 |
不再需要 |
| project |
project_name |
datahub的Project,非odps project |
| table |
topic_name |
datahub的topic,非odps table |
| partition |
无 |
不再需要 |
| partition_time_format |
无 |
不再需要 |
| shard_number |
无 |
不再需要 |
| batch_size |
通过logstash启动参数设置 |
logstash -f <上述配置文件地址> -b 256 (256即为每次batch大小) |
| batch_timeout |
无 |
不再需要 |
新增配置
| 新服务配置项 |
备注 |
| dirty_data_continue |
true/false遇到增数据是否继续,若为true 遇到脏数据会重试,重试次数用完,会将脏数据写入脏数据文件 |
| dirty_data_file |
指定脏数据文件的位置 |
| put_data_batch_size |
每1000条record写一次DataHub |
| shard_keys |
数组类型,数据落shard的字段名称,插件会根据这些字段的值计算hash将每条数据落某个shard, 注意shard_keys和shard_id都未指定,默认轮询落shard |
| shard_id |
所有数据落指定的shard,注意shard_keys和shard_id都未指定,默认轮询落shard |
| retry_times |
重试次数,-1为无限重试、0为不重试、>0表示需要有限次数, 默认值为-1 |
| retry_interval |
下一次重试的间隔,单位为秒,默认值为5 |
使用Apache Flume
通过Flume工具写入数据的用户,迁移除了上述准备工作外,还需进行三个步骤
- 更换,安装新Flume工具插件
- 根据配置文件对比,修改现有配置文件
- 使用新配置文件重新启动Flume进程
插件更新
新版Flume工具文档
配置对比
| 老服务配置项 |
新服务配置项 |
备注 |
| a1.sinks.k1.type |
a1.sinks.k1.type |
从com.aliyun.odps.flume.sink.OdpsSink改为com.aliyun.datahub.flume.sink.DatahubSink |
| a1.sinks.k1.accessID |
a1.sinks.k1.datahub.accessID |
云账号accessid |
| a1.sinks.k1.accessKey |
a1.sinks.k1.datahub.accessKey |
云账号accesskey |
| a1.sinks.k1.odps.endPoint |
a1.sinks.k1.datahub.endPoint |
Datahub服务域名,需要改为新服务的域名 |
| aliyun_odps_endpoint |
无 |
不再需要 |
| a1.sinks.k1.odps.project |
a1.sinks.k1.datahub.project |
datahub的Project,非odps project |
| a1.sinks.k1.odps.table |
a1.sinks.k1.datahub.topic |
datahub的topic,非odps table |
| a1.sinks.k1.odps.partition |
无 |
不再需要 |
| a1.sinks.k1.batchSize |
a1.sinks.k1.batchSize |
批次大小 |
| a1.sinks.k1.serializer |
a1.sinks.k1.serializer |
无变化 |
| a1.sinks.k1.serializer.delimiter |
a1.sinks.k1.serializer.delimiter |
无变化 |
| a1.sinks.k1.serializer.fieldnames |
a1.sinks.k1.serializer.fieldnames |
无变化 |
| a1.sinks.k1.serializer.charset |
a1.sinks.k1.serializer.charset |
无变化 |
| a1.sinks.k1.serializer.delimiter |
a1.sinks.k1.serializer.delimiter |
无变化 |
| a1.sinks.k1.shard.number |
无 |
不再需要 |
| a1.sinks.k1.shard.maxTimeOut |
a1.sinks.k1.shard.maxTimeOut |
无变化 |
| a1.sinks.k1.autoCreatePartition |
无 |
不再需要 |
使用OGG
通过OGG工具写入数据的用户,迁移除了上述准备工作外,还需进行三个步骤
- 更换,安装新OGG工具插件
- 根据配置文件对比,修改现有配置文件
- 使用新配置文件重新启动OGG进程
插件更新
新版OGG工具文档
配置对比
| 老服务配置项 |
新服务配置项 |
备注 |
| gg.handlerlist |
gg.handlerlist |
不需修改,仍然为ggdatahub |
| gg.handler.ggdatahub.type |
gg.handler.ggdatahub.type |
不需修改,仍然为com.aliyun.odps.ogg.handler.datahub.DatahubHandler |
| gg.classpath |
gg.classpath |
YOUR_DATAHUB_HANDLER_DIRECTORY/datahub_lib/改为{YOUR_HOME}/datahub-ogg-plugin/lib/ |
除以上配置外,其他DataHub相关配置均独立到configure.xml文件配置,具体含义请参看新版OGG工具文档。