Flume数据导入ODPS方法

一、简介

 Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统。
 ODPS Sink是基于ODPS DataHub Service开发的Flume插件,可以将Flume的Event数据导入到ODPS中。插件兼容Flume的原有功能特性,支持ODPS表自定义分区、且可以自动创建分区。

二、环境要求
1、JDK(1.6以上,推荐1.7)
2、Flume-NG 1.x

三、插件部署
1、下载ODPS Sink插件并解压:aliyun-odps-flume-plugin
2、Flume-NG 1.x下载:https://flume.apache.org/download.html
(1)下载 apache-flume-1.6.0-bin.tar.gz
(2)下载apache-flume-1.6.0-src.tar.gz
3、Flume的安装
(1) 解压apache-flume-1.6.0-src.tar.gz和apache-flume-1.6.0-bin.tar.gz
(2) 将apache-flume-1.6.0-src中的文件复制到apache-flume-1.6.0-bin中
4、部署ODPS Sink插件:将文件夹odps_sink移动到Apache Flume安装目录下:
$ mkdir {YOUR_APACHE_FLUME_DIR}/plugins.d
$mv odps_sink/ { YOUR_APACHE_FLUME_DIR }/plugins.d/
移动后,核验ODPS Sink插件是否已经在相应目录:
$ ls { YOUR_APACHE_FLUME_DIR}/plugins.d
odps_sink
部署完成后,只需要在Flume的配置文件中将sink的type字段配置为:
com.aliyun.odps.flume.sink.OdpsSink
即可使用

四、配置示例
例:将日志文件中的结构化数据进行解析,并上传到ODPS表中
需要上传的日志文件格式如下(每行为一条记录,字段之间逗号分隔):

test_basic.log

some,log,line1
some,log,line2
...
第一步、在ODPS 的 project创建ODPS Datahub表
建表语句如下所示:
CREATE TABLE hub_table_basic (col1 STRING, col2 STRING)

PARTITIONED BY (pt STRING)
INTO 1 SHARDS
HUBLIFECYCLE 1;

第二步、创建Flume作业配置文件:
在Flume安装目录的conf/文件夹下创建名为odps_basic.conf的文件,并输入内容如下:

odps_basic.conf

A single-node Flume configuration for ODPS

Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

Describe/configure the source

a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log

Describe the sink

a1.sinks.k1.type = com.aliyun.odps.flume.sink.OdpsSink
a1.sinks.k1.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}
a1.sinks.k1.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}
a1.sinks.k1.odps.endPoint = http://service.odps.aliyun.com/api
a1.sinks.k1.odps.datahub.endPoint = http://dh.odps.aliyun.com
a1.sinks.k1.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}
a1.sinks.k1.odps.table = hub_table_basic
a1.sinks.k1.odps.partition = 20150814
a1.sinks.k1.batchSize = 100
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = col1,,col2
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
a1.sinks.k1.autoCreatePartition = true

Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

第三步:启动Flume
启动Flume并指定agent的名称和配置文件路径,-Dflume.root.logger=INFO,console选项可以将日志实时输出到控制台。
$ cd { YOUR_APACHE_FLUME_DIR}
$ bin/flume-ng agent -n a1 -c conf -f conf/odps_basic.conf -Dflume.root.logger=INFO,console
写入成功,显示日志如下:
...
Write success. Event count: 2
...
在ODPS Datahub表中即可查到数据;

多数据源上传到ODPS
多个数据上传到odps,只需要配置对应的source和channel,可以有一下几种上传方式:
(1) 多个source和一个channel和一个sink
screenshot

(2) 多个source和多个channel和一个sink
screenshot

(3) 多个source,多个channel和多个sink,输出到多个地方存储
screenshot

(4)多个agent的复杂情况:
screenshot

下面给出(1)中情况的配置:

odps_basic.conf

A single-node Flume configuration for ODPS

Name the components on this agent

a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

Describe/configure the source

a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log

source2的配置

a1.sources.r2.type = exec
a1.sources.r2.command = cat {YOUR_LOG_DIRECTORY}/test_basic2.log

Describe the sink

a1.sinks.k1.type = com.aliyun.odps.flume.sink.OdpsSink
a1.sinks.k1.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}
a1.sinks.k1.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}
a1.sinks.k1.odps.endPoint = http://service.odps.aliyun.com/api
a1.sinks.k1.odps.datahub.endPoint = http://dh.odps.aliyun.com
a1.sinks.k1.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}
a1.sinks.k1.odps.table = hub_table_basic
a1.sinks.k1.odps.partition = 20150814
a1.sinks.k1.batchSize = 100
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = col1,,col2
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
a1.sinks.k1.autoCreatePartition = true

Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

source2的channel

a1.sources.r2.channels = c2

可能遇到的问题:
1、在数据sink阶段报错,数据无法传递
screenshot
这个错误是由于数据的最上面加了一行注释,它默认读取改行导致数据的行数与配置文件中 配置的行数不一致,所以报上面这个错,删出上面的注释行问题就解决了。

2、OOM 问题:
flume 报错:
java.lang.OutOfMemoryError: GC overhead limit exceeded
或者:
java.lang.OutOfMemoryError: Java heap space
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap space
Flume 启动时的最大堆内存大小默认是 20M,线上环境很容易 OOM,因此需要你在 flume-env.sh 中添加 JVM 启动参数:

JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
然后在启动 agent 的时候一定要带上 -c conf 选项,否则 flume-env.sh 里配置的环境变量不会被加载生效

优秀的个人博客,低调大师

微信关注我们

原文链接:https://yq.aliyun.com/articles/58249

转载内容版权归作者及来源网站所有!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
优质分享Android(本站安卓app)

优质分享Android(本站安卓app)

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Apache Tomcat7、8、9(Java Web服务器)

Apache Tomcat7、8、9(Java Web服务器)

Tomcat是Apache 软件基金会(Apache Software Foundation)的Jakarta 项目中的一个核心项目,由Apache、Sun 和其他一些公司及个人共同开发而成。因为Tomcat 技术先进、性能稳定,而且免费,因而深受Java 爱好者的喜爱并得到了部分软件开发商的认可,成为目前比较流行的Web 应用服务器。

Eclipse(集成开发环境)

Eclipse(集成开发环境)

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。

Sublime Text 一个代码编辑器

Sublime Text 一个代码编辑器

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。