flume 单机问题解决与架构更改
[TOC]
引言
今天针对线上生产环境下单机 flume 拉取kafka数据并存储数据入Hdfs 出现大批量数据延迟. 在网上官网各种搜索数据,并结合官网数据,现进行以下总结
1. 线上单机存在问题简述
当前flume拉取kafa数据量并不大 ,根据flume客户端日志 ,每半分钟hdfs文件写入一次数据生成文件
发现问题:
**拉取kafka数据过慢**
2. 解决思路
- 加大kafka拉取数据量
- 加大flume中channel,source,sink 各通道的单条数据量
- 将flume拉取数据单机版本改成多数据拉取,通过flume-avore-sink-> flume-avore-source 进行数据多数据采取并合并
3 加大kafka拉取数据量
3.1 kafka-source简述
- flume 输入单线程拉取数据并将数据发送内置channel并通过sink组件进行数据转发和处理,故对于kafka集群多副本方式拉取数据的时候,应适当考虑多个flume节点拉取kafka多副本数据,以避免flume节点在多个kafka集群副本中轮询。加大flume拉取kafka数据的速率。
- flume-kafka-source 是flume内置的kafka source数据组件,是为了拉取kafka数据,配置如下:
agent.sources = r1 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource agent.sources.r1.batchSize = 50000 agent.sources.r1.batchDurationMillis = 2000 agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092 agent.sources.r1.kafka.topics = topicTest agent.sources.r1.kafka.consumer.group.id = groupTest
- flume-kafka-source 的offset是交由zk集群去维护offset
3.2 kafka-source配置详解
Kafka Source是一个Apache Kafka消费者,它从Kafka主题中读取消息。 如果您正在运行多个Kafka源,则可以使用相同的使用者组配置它们,以便每个源都读取一组唯一的主题分区。
Property Name | Default | Description |
---|---|---|
channels | – | 配置的channels 可配置多个channels 后续文章会说到 |
type | – | org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | – | 配置kafka集群地址 |
kafka.consumer.group.id | flume | 唯一确定的消费者群体。 在多个源或代理中设置相同的ID表示它们是同一个使用者组的一部分 |
kafka.topics | – | 你需要消费的topic |
kafka.topics.regex | – | 正则表达式,用于定义源订阅的主题集。 此属性的优先级高于kafka.topics ,如果存在则覆盖kafka.topics 。 |
batchSize | 1000 | 一批中写入Channel的最大消息数 (优化项) |
batchDurationMillis | 1000 | 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项) |
backoffSleepIncrement | 1000 | Kafka主题显示为空时触发的初始和增量等待时间。 等待时间将减少对空kafka 主题的激进ping操作。 一秒钟是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。 |
maxBackoffSleep | 5000 | Kafka主题显示为空时触发的最长等待时间。 5秒是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。 |
useFlumeEventFormat | false | 默认情况下,事件从Kafka主题直接作为字节直接进入事件主体。 设置为true以将事件读取为Flume Avro二进制格式。 与KafkaSink上的相同属性或Kafka Channel上的parseAsFlumeEvent属性一起使用时,这将保留在生成端发送的任何Flume标头。 |
setTopicHeader | true | 设置为true时,将检索到的消息的主题存储到标题中,该标题由topicHeader 属性定义。 |
topicHeader | topic | 如果setTopicHeader 属性设置为true ,则定义用于存储接收消息主题名称的标题的名称。 如果与Kafka SinktopicHeader 属性结合使用,应该小心,以避免在循环中将消息发送回同一主题。 |
migrateZookeeperOffsets | true | 如果找不到Kafka存储的偏移量,请在Zookeeper中查找偏移量并将它们提交给Kafka。 这应该是支持从旧版本的Flume无缝Kafka客户端迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果未找到Zookeeper偏移量,则Kafka配置kafka.consumer.auto.offset.reset定义如何处理偏移量。 查看[Kafka文档](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解详细信息 |
kafka.consumer.security.protocol | PLAINTEXT | 如果使用某种级别的安全性写入Kafka,则设置为SASL_PLAINTEXT,SASL_SSL或SSL。 |
Other Kafka Consumer Properties | – | 这些属性用于配置Kafka Consumer。 可以使用Kafka支持的任何消费者财产。 唯一的要求是在前缀为“kafka.consumer”的前缀中添加属性名称。 例如:kafka.consumer.auto.offset.reset |
注意:
Kafka Source会覆盖两个Kafka使用者参数:source.com将auto.commit.enable设置为“false”,并提交每个批处理。 Kafka源至少保证一次消息检索策略。 源启动时可以存在重复项。 Kafka Source还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值。 不建议修改这些参数。官方配置示例:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.channels = channel1 tier1.sources.source1.batchSize = 5000 tier1.sources.source1.batchDurationMillis = 2000 tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 tier1.sources.source1.kafka.topics = test1, test2 tier1.sources.source1.kafka.consumer.group.id = custom.g.id Example for topic subscription by regex tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.channels = channel1 tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$ # the default kafka.consumer.group.id=flume is used
本案例kafka-source配置
agent.sources = r1 agent.sources.r1.channels=c1 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource agent.sources.r1.batchSize = 50000 agent.sources.r1.batchDurationMillis = 2000 agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092 agent.sources.r1.kafka.topics = topicTest agent.sources.r1.kafka.consumer.group.id = groupTest
官网配置文件地址 : kafka-source
3.3 配置优化
主要是在放入flume-channels 的批量数据加大
更改参数: agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
更改解释:
**即每2秒钟拉取 kafka 一批数据 批数据大小为50000 放入到flume-channels 中 。即flume该节点 flume-channels 输入端数据已放大**
更改依据:
- 需要配置kafka单条数据 broker.conf 中配置
message.max.bytes
- 当前flume channel sink 组件最大消费能力如何?
4. 加大flume中channel,source,sink 各通道的单条数据量
4.1 source 发送至channels 数据量大小已配置 见 3.3
4.2 channel 配置
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be memory |
capacity | 100 | 通道中存储的最大事件数 (优化项) |
transactionCapacity | 100 | 每个事务通道从源或提供给接收器的最大事件数 (优化项) |
keep-alive | 3 | 添加或删除事件的超时(以秒为单位) |
byteCapacityBufferPercentage | 20 | 定义byteCapacity与通道中所有事件的估计总大小之间的缓冲区百分比,以计算标头中的数据。 见下文。 |
byteCapacity | see description | 允许的最大总字节作为此通道中所有事件的总和。 实现只计算Eventbody ,这也是提供byteCapacityBufferPercentage 配置参数的原因。 默认为计算值,等于JVM可用的最大内存的80%(即命令行传递的-Xmx值的80%)。 请注意,如果在单个JVM上有多个内存通道,并且它们碰巧保持相同的物理事件(即,如果您使用来自单个源的复制通道选择器),那么这些事件大小可能会因为通道byteCapacity目的而被重复计算。 将此值设置为“0”将导致此值回退到大约200 GB的内部硬限制。 |
配置 capacity 和 transactionCapacity 值 。默认配置规则为:
$$ channels.capacity >= channels.transactionCapacity >= source.batchSize $$
官方channels配置示例
a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000
本案例修改之后的channels 配置
agent.channels.c1.type = memory agent.channels.c1.capacity=550000 agent.channels.c1.transactionCapacity=520000
5. 将flume拉取数据单机版本改成多数据拉取,通过flume-avore-sink-> flume-avore-source 进行数据多数据采取并合并
5.1 存在问题
通过上续修改会发现单机版本的flume会在多副本kafka轮询造成效率浪费
单机版本flume处理数据时会存在单机瓶颈,单机channels可能最多只能处理最大数据无法扩充
单机flume配置多个数据源不方便,不能适合后续多需求开发
5.2 修改架构
5.3采集节点配置文件
收集节点配置(3台):
agent.sources = r1 agent.channels = c1 agent.sinks = k1 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource agent.sources.r1.batchSize = 50000 agent.sources.r1.batchDurationMillis = 2000 agent.sources.r1.kafka.bootstrap.servers = qcloud-test-hadoop03:9092 agent.sources.r1.kafka.topics = topicTest agent.sources.r1.kafka.consumer.group.id = groupTest agent.channels.c1.type = memory agent.channels.c1.capacity=550000 agent.channels.c1.transactionCapacity=520000 agent.sinks.k1.type = avro agent.sinks.k1.hostname = test-hadoop03 agent.sinks.k1.port=4545 agent.sources.r1.channels = c1 agent.sinks.k1.channel = c1
汇总节点配置(1台):
agent.sources = r1 agent.channels = memoryChannel agent.sinks = hdfsSink agent.sources.r1.type = avro agent.sources.r1.bind = ip agent.sources.r1.port = 4545 agent.sources.r1.batchSize = 100000 agent.sources.r1.batchDurationMillis = 1000 agent.channels.memoryChannel.type=memory agent.channels.memoryChannel.keep-alive=30 agent.channels.memoryChannel.capacity=120000 agent.channels.memoryChannel.transactionCapacity=100000 agent.sinks.hdfsSink.type=hdfs agent.sinks.hdfsSink.hdfs.path=hdfs://nameser/data/hm2/%Y-%m-%d-%H agent.sinks.hdfsSink.hdfs.writeFormat=Text agent.sinks.hdfsSink.hdfs.rollCount = 0 agent.sinks.hdfsSink.hdfs.rollSize = 134217728 agent.sinks.hdfsSink.hdfs.rollInterval = 60 agent.sinks.hdfsSink.hdfs.fileType=DataStream agent.sinks.hdfsSink.hdfs.idleTimeout=65 agent.sinks.hdfsSink.hdfs.callTimeout=65000 agent.sinks.hdfsSink.hdfs.threadsPoolSize=300 agent.sinks.hdfsSink.channel = memoryChannel agent.sources.r1.channels = memoryChannel
5.4 架构注意点
- 当前架构需要保证聚合节点机器的性能
- 当前架构新的瓶颈可能会存在存储Hdfs数据时过慢 ,导致聚合节点Channels 占用率居高不下,导致堵塞 。
- 需要关注avro 自定义source sink 的发送效率
6.flume 监控工具(http)
flume 监控工具总共有三种方式 ,我们这里为方便简单,使用内置http接口监控方式进行操作
6.1 配置
在启动脚本处设置 参数 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
即可
6.2 访问 地址 :
6.3 返回结果示例 和字段解释 :
{ "CHANNEL.memoryChannel": { "ChannelCapacity": "550000", "ChannelFillPercentage": "0.18181818181818182", "Type": "CHANNEL", "ChannelSize": "1000", "EventTakeSuccessCount": "33541400", "EventTakeAttemptCount": "33541527", "StartTime": "1536572886273", "EventPutAttemptCount": "33542500", "EventPutSuccessCount": "33542500", "StopTime": "0" }, "SINK.hdfsSink": { "ConnectionCreatedCount": "649", "ConnectionClosedCount": "648", "Type": "SINK", "BatchCompleteCount": "335414", "BatchEmptyCount": "27", "EventDrainAttemptCount": "33541500", "StartTime": "1536572886275", "EventDrainSuccessCount": "33541400", "BatchUnderflowCount": "0", "StopTime": "0", "ConnectionFailedCount": "0" }, "SOURCE.avroSource": { "EventReceivedCount": "33542500", "AppendBatchAcceptedCount": "335425", "Type": "SOURCE", "EventAcceptedCount": "33542500", "AppendReceivedCount": "0", "StartTime": "1536572886465", "AppendAcceptedCount": "0", "OpenConnectionCount": "3", "AppendBatchReceivedCount": "335425", "StopTime": "0" } }
参数定义:
字段名称 | 含义 | 备注 |
---|---|---|
SOURCE.OpenConnectionCount | 打开的连接数 | |
SOURCE.TYPE | 组件类型 | |
SOURCE.AppendBatchAcceptedCount | 追加到channel中的批数量 | |
SOURCE.AppendBatchReceivedCount | source端刚刚追加的批数量 | |
SOURCE.EventAcceptedCount | 成功放入channel的event数量 | |
SOURCE.AppendReceivedCount | source追加目前收到的数量 | |
SOURCE.StartTime(StopTIme) | 组件开始时间、结束时间 | |
SOURCE.EventReceivedCount | source端成功收到的event数量 | |
SOURCE.AppendAcceptedCount | source追加目前放入channel的数量 | |
CHANNEL.EventPutSuccessCount | 成功放入channel的event数量 | |
CHANNEL.ChannelFillPercentage | 通道使用比例 | |
CHANNEL.EventPutAttemptCount | 尝试放入将event放入channel的次数 | |
CHANNEL.ChannelSize | 目前在channel中的event数量 | |
CHANNEL.EventTakeSuccessCount | 从channel中成功取走的event数量 | |
CHANNEL.ChannelCapacity | 通道容量 | |
CHANNEL.EventTakeAttemptCount | 尝试从channel中取走event的次数 | |
SINK.BatchCompleteCount | 完成的批数量 | |
SINK.ConnectionFailedCount | 连接失败数 | |
SINK.EventDrainAttemptCount | 尝试提交的event数量 | |
SINK.ConnectionCreatedCount | 创建连接数 | |
SINK.Type | 组件类型 | |
SINK.BatchEmptyCount | 批量取空的数量 | |
SINK.ConnectionClosedCount | 关闭连接数量 | |
SINK.EventDrainSuccessCount | 成功发送event的数量 | |
SINK.BatchUnderflowCount | 正处于批量处理的batch数 |
参考地址
flume-document : http://flume.apache.org/FlumeUserGuide.html
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
深度解析大快DKM大数据运维管理平台功能
深度解析大快DKM大数据运维管理平台功能之前几周的时间一直是在围绕DKhadoop的运行环境搭建写分享,有一些朋友留言索要了dkhadoop安装包,不知道有没有去下载安装一探究竟。关于DKHadoop下载安装基本已经讲清楚了,这几天有点空闲把大快DKM大数据运维管理平台的内容整理了一些,作为DKHadoop相配套的管理平台,是有必要对DKM有所了解的。DKM 是DKHadoop管理平台。作为大数据平台端到端Apache Hadoop 的管理应用,DKM 对 DKH 的每个部件都提供了细粒度的可视化和控制。通过DKM ,运维人员是可以提高集群的性能,提升服务质量,提高合规性并降低管理成本。DKM 设计的目的是为了使得对于企业数据中心的管理变得简单和直观。通过DKM ,可以方便地部署,并且集中式的操作完整的大数据软件栈。该应用软件会自动化安装过程,从而减少了部署集群的时间。通过DKM 可以提供一个集群范围内的节点实时运行状态视图。同时,还提供了一个中央控制台,可以用于配置集群。总结DKM 能够提供的功能主要有以下几点:1.自动化Hadoop 安装过程,大幅缩短部署时间;2.提供实时的集群概...
- 下一篇
资源消耗降低2/3,Flink在唯品会实时平台的应用
本文主要内容包括以下几个方面: ●唯品会实时平台现状; ●Flink在唯品会的实践; ●Flink On K8S; ●最新项目进展。 一、唯品会实时平台现状 目前在唯品会,实时平台并不是一个统一的计算框架,而是包括Storm、Spark、Flink在内的三个主要计算框架。由于历史原因,当前在Storm平台上的job数量是最多的,但是从去年开始,业务重心逐渐切换到Flink上面,所以今年在Flink上面的应用数量有了大幅增加。 实时平台的核心业务包含八大部分: ●RTRS(实时推荐引擎):实时推荐作为电商的重点业务,包含多个实时特征; ●Dataeye(大促看板):包含各种维度的统计指标(例如:各种维度的订单、UV、转化率、漏斗等),供领导层、运营、产品决策使用; ●实时数据清洗:从用户埋点收集来数据,进行实时清洗和关联,为下游的各个业务提供更好的数据; ●互联网金融; ●安全风控; ●与友商比价; ●Logview、Mercury、Titan作为内部服务的监控系统; ●VDRC实时数据同步系统等。 共计有1400台机器和600+应用。 图1 实时核心业务 实时平台的职责主要包括实时计算...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Hadoop3单机部署,实现最简伪集群