使用EMR-Flume将非EMR集群的数据同步至EMR集群的HDFS
1.前言
E-MapReduce从3.20.0版本开始对EMR-Flume新增了Log Service Source。借助Log Service的Logtail等工具,可以将需要同步的数据实时采集并上传到LogHub,再使用EMR-Flume将LogHub的数据同步至EMR集群的HDFS。
本文将介绍使用EMR-Flume实时同步Log Service的数据至EMR集群的HDFS,并根据record timestamp将数据存入HDFS相应的partition中。
有关采集数据到Log Service的LogHub的详细方法及步骤参见采集方式。
2.准备工作
创建Hadoop集群,在可选软件中选择Flume,详细步骤参考创建集群。
3.配置Flume
3.1 配置source
配置项 | 值 | 说明 |
---|---|---|
type | org.apache.flume.source.loghub.LogHubSource | |
endpoint | Lohub的endpoint | 如果使用vpc/经典网络的endpoint,要保证与emr集群在同一个region;如果使用公网endpoint,要保证运行Flume agent的节点有公网IP |
project | Lohub的project | |
logstore | Lohub的logstore | |
accessKeyId | Aliyun的access key id | |
accessKey | Aliyun的access key | |
useRecordTime | true | |
consumerGroup | consumer_1 | 消费组名称,默认值为consumer_1 |
配置项说明如下
- useRecordTime
默认值为false。如果header中没有timestamp属性,接收event的时间戳会被加入到header中;
但是在Flume Agent启停或者同步滞后等情况下,会将数据放入错误的时间分区中。为避免这种情况,可以将该值设置为true,使用数据收集到LogHub的时间作为timestamp。
- consumerPosition
消费组在第一次消费LogHub数据时的位置,默认值为end,即从最近的数据开始消费;
可以设置的其他值为begin或special。begin表示从最早的数据开始消费;special表示从指定的时间点开始消费,在配置为special时,需要配置startTime为开始消费的时间点,单位是秒。
首次运行后LogHub服务端会记录消费组的消费点,此时如果想更改consumerPosition,可以清除LogHub的消费组状态,参考消费组状态;或者更改配置consumerGroup为新的消费组。
- heartbeatInterval和fetchInOrder
heartbeatInterval表示消费组与服务端维持心跳的间隔,单位是毫秒,默认为30秒;fetchInOrder表示相同key的数据是否按序消费,默认值为false。
- batchSize和batchDurationMillis
通用的source batch配置,表示触发event写入channel的阈值。
- backoffSleepIncrement和maxBackoffSleep
通用的source sleep配置,表示LogHub没有数据时触发sleep的时间和增量。
3.2配置channel和sink
此处使用memory channel和hdfs sink,hdfs sink配置如下
type | hdfs |
---|---|
hdfs.path | /tmp/flume-data/loghub/datetime=%y%m%d/hour=%H |
hdfs.fileType | DataStream |
hdfs.rollInterval | 3600 |
hdfs.round | true |
hdfs.roundValue | 60 |
hdfs.roundUnit | minute |
hdfs.rollSize | 0 |
hdfs.rollCount | 0 |
memory channel配置如下
type | memory |
---|---|
capacity | 2000 |
transactionCapacity | 2000 |
4.运行Flume agent
在Console页面启动Flume agent的具体操作参见Flume使用说明。启动后,可以看到配置的HDFS路径下按照record timestamp存储的日志数据。
查看Log Service上的消费组状态
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
深度剖析阿里巴巴对Apache Flink的优化与改进
本文主要从两个层面深度剖析:阿里巴巴对Flink究竟做了哪些优化? 取之开源,用之开源 一、SQL层 为了能够真正做到用户根据自己的业务逻辑开发一套代码,能够同时运行在多种不同的场景,Flink首先需要给用户提供一个统一的API。在经过一番调研之后,阿里巴巴实时计算认为SQL是一个非常适合的选择。在批处理领域,SQL已经经历了几十年的考验,是公认的经典。在流计算领域,近年来也不断有流表二象性、流是表的ChangeLog等理论出现。在这些理论基础之上,阿里巴巴提出了动态表的概念,使得流计算也可以像批处理一样使用SQL来描述,并且逻辑等价。这样一来,用户就可以使用SQL来描述自己的业务逻辑,相同的查询语句在执行时可以是一个批处理任务,也可以是一个高吞吐低延迟的流计算任务,甚至是先使用批处理技术进行历史数据的计算,然后自动的转成流计算任务处理
- 下一篇
大数据方面核心技术有哪些?新人必读
大数据技术的体系庞大且复杂,基础的技术包含数据的采集、数据预处理、分布式存储、NoSQL数据库、数据仓库、机器学习、并行计算、可视化等各种技术范畴和不同的技术层面。首先给出一个通用化的大数据处理框架,主要分为下面几个方面:数据采集与预处理、数据存储、数据清洗、数据查询分析和数据可视化。 一、数据采集与预处理 对于各种来源的数据,包括移动互联网数据、社交网络的数据等,这些结构化和非结构化的海量数据是零散的,也就是所谓的数据孤岛,此时的这些数据并没有什么意义,数据采集就是将这些数据写入数据仓库中,把零散的数据整合在一起,对这些数据综合起来进行分析。数据采集包括文件日志的采集、数据库日志的采集、关系型数据库的接入和应用程序的接入等。在数据量比较小的时候,可以写个定时的脚本将日志写入存储系统,但随着数据量的增长,这些方法无法提供数据安全保障,并且运维困难,需要更强壮的解决方案。 Flume NG作为实时日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据,同时,对数据进行简单处理,并写到各种数据接收方(比如文本,HDFS,Hbase等)。Flume NG采用的是三层架构:Agent层...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7设置SWAP分区,小内存服务器的救世主