Flume学习笔记
Flume [fluːm] (引水槽、运河)
王洪亮 2021/05/07 15:05
1. 简介
大数据技术解决的三个问题,海量数据的传输、存储和计算。Flume属于传输框架。专门传输日志的,多媒体不行。
2.安装
官网 http://flume.apache.org/ 下载apache-flume-1.9.0-bin.tar.gz,下载后解压即可,类似tomcat中间件,需要改配置文件 , 把apache-flume-1.9.0-bin/conf/flume-env.sh.template改成flume-env.sh,里面配置jdk地址,安装完毕。
export JAVA_HOME=D:\sde\Java\jdk1.8.0_131
3.Hello Flume样例
flume是按任务启动的,这个任务在flume这里叫agent,每个agent分为三块,source(对接数据来源)、channel(中间缓冲区)、sink(数据去向)。任务过程简述就是从一个地方采集数据后送到其他地方。
此次案例source数据采集自端口,通过channel再经过sink将数据输出到终端屏幕上。
开始编写Hello Flume样例(win环境)
- 编写配置文件,参考官网例子
习惯上把任务的配置文件和conf区分开,这里新建jobs目录下新建netcat-flume-logger.conf,命名格式最好区分出source和sink的类型
#完整的conf分几块,变量声明区、source区、interceptor区、channelSelector区、channel区、sink区、关系绑定区 #变量声明区(必须) a1指的是agent的变量名,启动时指定 #分别声明 sources、channels、sinks、sinkgroups等 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #source区(必须) 指定source类型 具体类型需要看官网文档source 部分 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 #interceptor区(可选),代码自写,配合multiplexing channelSelector使用 #channelSelector区(可选),不写默认是replicating,把数据发给每个sink a1.sources.r1.selector.type=replicating #channel区(必须) 指定channel类型 具体类型需要看官网文档channel部分 a1.channels.c1.type = memory #sink处理器区,声明sink组,选择处理器是故障转移还是负载均衡,不写就是默认 #sink区(必须) 指定sink类型 具体类型需要看官网文档sink部分 a1.sinks.k1.type = logger #关系绑定区(必须) a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 启动flume的agent
#win环境启动命令 进入到bin目录 cmd flume-ng agent --conf ../conf --conf-file ../jobs/netcat-flume-logger.conf --name a1 -property flume.root.logger=INFO,console #简写 flume-ng agent -c ../conf -f ../jobs/netcat-flume-logger.conf -n a1 -property flume.root.logger=INFO,console #linux环境启动命令 那个logger不一样 #-Dflume.root.logger=INFO,console
- 启动结果显示端口已监听
[INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
- 通过netcat工具向端口发送数据
没有netcat telnet 也可以
nc localhost 44444 > Hello Flume OK
- 查看flume的日志输出
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 Hello Flume }
4.Source种类
拉出几个常用的。其他的查看官方文档
1.Avro Source
Flume集群使用,多个Fulme连接需要使用Avro的source和sink连接,自身相当于服务端,需要优先启动
#相关source区 写法 a1.sources.r1.type=avro a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=4141
2. Exec Source
用命令监控某个文件的变化,将变化的内容传送走,适用于监控一个实时变化的日志文件,缺点是不能断点续传,容易数据丢失或是数据重复
#相关source区 写法 a1.sources.r1.type=exec #后跟 监控文件的命令 a1.sources.r1.command=tail -F /var/log/secure
3. Spooling Directory Source
监控本地文件夹,文件夹中新增文件时,会将整个文件内容取走,并加后缀做标记(.COMPLETED),表示已经取过了,标记后的文件不在处理,内容变化也不管。适用于批量上传,缺点是不能适应动态变化的文件
#相关source区 写法 a1.sources.r1.type=spooldir #后跟监控文件的目录 a1.sources.r1.spoolDir=/var/log/apache/flumeSpool
4. Taildir Source
可以监控不同文件夹下的不同文件。支持断点续传
#相关source区 写法 a1.sources.r1.type = TAILDIR #指定保存断点续传的文件 a1.sources.r1.positionFile = /var/log/flume/position.json #指定文件组 可以多个 a1.sources.r1.filegroups = f1 f2 #指定 单个文件 a1.sources.r1.filegroups.f1 = /var/log/test1/example.log #指定多个文件 * 不能出现在第一位 a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
5. NetCat TCP Source
监听TCP端口用的,还有个UDP Source 就type不一样
#相关source区 写法 udp 的type是netcatudp a1.sources.r1.type=netcat a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=6666
5.Channel种类
常用的Memory Channel和File Channel
1. Memory Channel
将数据缓存在内存中,快,闪断丢数据
#channel区 a1.channels.c1.type=memory
2. File Channel
将数据缓存在本地文件中,慢点,稳定
#channel区 a1.channels.c1.type=file #数据存放的目录,可以有多个目录,逗号分割 a1.channels.c1.dataDirs=/mnt/flume/data
6.Sink种类
常用的拉出
1. Avro Sink
Flume集群收尾连接,相对source 而言这个算客户端,去绑定服务端IP和端口
#sink 区 a1.sinks.k1.type = avro a1.sinks.k1.hostname = 10.10.10.10 a1.sinks.k1.port = 4545
2. Logger Sink
日志打印在终端屏幕上,需要agent启动的时候搭配对应命令
问题:中文没打出来;maxBytesToLog这玩意不好使
#sink区 #-property flume.root.logger=INFO,console #-Dflume.root.logger=INFO,console a1.sinks.k1.type=logger
3. File Roll Sink
日志输出到本地文件,问题是就算没有接收到数据,也会不停的生成新的空文件
#sink区 a1.sinks.k2.type = file_roll #指定生成文件夹路径,文件名会自动生成 a1.sinks.k2.sink.directory = ../data/ #指定新文件生成频率,30s a1.sinks.k2.sink.rollInterval = 30
7.Channel Selector
这个就俩,算自定义的三个
1. Replicating Channel Selector (default)
默认的,可以不写,把数据发给所有sink
#channel selector 区 a1.sources.r1.selector.type=replicating
2. Multiplexing Channel Selector
根据拦截器规则,选择发送到哪个sink
#channel selector 区 a1.sources.r1.selector.type = multiplexing #这个关键变量,拦截器里写的,写什么是什么,不一定是state a1.sources.r1.selector.header = state #根据这个state的值选择sink,如果header中的state=CZ就发个c1 a1.sources.r1.selector.mapping.CZ = c1 # 等于US就给c2 c3 a1.sources.r1.selector.mapping.US = c2 c3
8.Sink Processor
目前算默认的三种,默认的数据给谁,谁就发走
1. Failover Sink Processor
故障转移,根据优先级,可着最高优先级的用,挂了再用优先级低的
#sink处理器区 #这个算声明,可以拿到声明区 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 #选择处理器类型为 故障转移 a1.sinkgroups.g1.processor.type = failover #指定sink组成员优先级 a1.sinkgroups.g1.processor.priority.k1 = 5 #最高等级k2一直使用,直到挂了为止,才能轮到k1 a1.sinkgroups.g1.processor.priority.k2 = 10
2. Load balancing Sink Processor
负载均衡,可选随机或是均衡发送
#sink处理器区 #这个算声明,可以拿到声明区 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 #选择处理器类型为 负载均衡 a1.sinkgroups.g1.processor.type = load_balance #指定模式为随机发送,还可以选round_robin均衡发送 a1.sinkgroups.g1.processor.selector = random
9.自定义Inteceptor
idea新建maven项目开整
\> hello
1. 引入依赖
<dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency>
2. 写自己的拦截器
package com.flume; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.HashMap; import java.util.List; import java.util.Map; public class MyInterceptor implements Interceptor { @Override public void initialize() { System.out.println("---------initialize---------"); } @Override public Event intercept(Event event) { String body = new String(event.getBody()); Map<String, String> map = new HashMap<>(); if(body.contains("hello")){ map.put("myStatus","up"); event.setHeaders(map); }else{ map.put("myStatus","down"); event.setHeaders(map); } return event; } @Override public List<Event> intercept(List<Event> list) { return null; } @Override public void close() {} public static class MyBuilder implements Interceptor.Builder{ @Override public Interceptor build() { return new MyInterceptor(); } @Override public void configure(Context context) {} } }
3. 项目打个jar包放到Flume的lib目录下
4. 编写Flume配置文件
netcat-flume-doubleLogger.conf,本来想两个sink都发到logger的,后来发现终端上分不出哪个是哪个,把其中一个sink改到落到本地文件了
#变量区 a1.sources = r1 a1.channels = c1 c2 a1.sinks = k1 k2 #source区 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 #intercetor区 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.flume.MyInterceptor$MyBuilder #channel selector 区 a1.sources.r1.selector.type = multiplexing #我代码里定义的是 myStatus a1.sources.r1.selector.header = myStatus #myStatus=up就发个c1 a1.sources.r1.selector.mapping.up = c1 # 等于down就给c2 a1.sources.r1.selector.mapping.down = c2 #channel区 a1.channels.c1.type = memory a1.channels.c2.type = memory #sink区 a1.sinks.k1.type = logger a1.sinks.k2.type = file_roll a1.sinks.k2.sink.directory = ../data/ #绑定区 a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
5. 启动
flume-ng agent --conf ../conf --conf-file ../jobs/netcat-flume-doubleLogger.conf --name a1 -property flume.root.logger=INFO,console
6. 测试效果
启动netcat发送命令
> nc localhost 44444 6351 OK hello OK
一个在终端屏幕上有显示,一个在本地文件中有显示。
10自定义Source
1. 依赖
<dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency>
2. 代码
package com.flume; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource; public class MySource extends AbstractSource implements Configurable, PollableSource { private String aaa; private String bbb; /** * 定义配置文件中的变量 * @param context */ @Override public void configure(Context context) { aaa = context.getString("aaa"); //没取到值 就给默认值 bbb = context.getString("bbb", "GangZi"); } @Override public Status process(){ Status status = null; try { //创建个事件 Event e = new SimpleEvent(); String content = aaa+"->"+bbb+": "+Math.random(); e.setBody(content.getBytes("utf-8")); //把event传给channel getChannelProcessor().processEvent(e); status = Status.READY; } catch (Throwable t) { status = Status.BACKOFF; if (t instanceof Error) { throw (Error)t; } } try { //睡两秒 要不刷的太快了 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return status; } @Override public long getBackOffSleepIncrement() { return 0; } @Override public long getMaxBackOffSleepInterval() { return 0; } }
3. 写配置文件custom-flume-logger.conf
#变量区 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #source区 a1.sources.r1.type = com.flume.MySource a1.sources.r1.aaa = LiLei #a1.sources.r1.bbb = HanMeiMei #channel区 a1.channels.c1.type = memory #sink区 a1.sinks.k1.type = logger a1.sinks.k1.maxBytesToLog = 18 #绑定区 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
4. 启动命令
flume-ng agent --conf ../conf --conf-file ../jobs/custom-flume-logger.conf --name a1 -property flume.root.logger=INFO,console
5. 查看效果
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4C 69 4C 65 69 2D 3E 47 61 6E 67 5A 69 3A 20 30 LiLei->GangZi: 0 }
11自定义Sink
1. 依赖
<dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency>
2. 代码
package com.flume; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; public class MySink extends AbstractSink implements Configurable { private String ccc; private String ddd; @Override public void configure(Context context) { ccc = context.getString("ccc"); ddd = context.getString("ddd","xxxxxxxx"); } @Override public Status process() throws EventDeliveryException { Status status = null; Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { Event event = ch.take(); if(event == null){ txn.rollback(); status = Status.BACKOFF; }else { byte[] body = event.getBody(); String content = new String(body); System.out.println("ccc="+ccc); System.out.println("content="+content); System.out.println("ddd="+ddd); txn.commit(); status = Status.READY; } } catch (Exception e) { e.printStackTrace(); txn.rollback(); status = Status.BACKOFF; }finally { txn.close(); } return status; } }
3. flume配置文件netcat-flume-custom.conf
#变量区 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #source区 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 #channel区 a1.channels.c1.type = memory #sink区 a1.sinks.k1.type = com.flume.MySink a1.sinks.k1.ccc =--hello-- a1.sinks.k1.ddd =--moto-- #绑定区 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
4. 启动agent
flume-ng agent --conf ../conf --conf-file ../jobs/netcat-flume-custom.conf --name a1 -property flume.root.logger=INFO,console
5. 客户端发送命令
telnet localhost 44444 > 1 OK
6. flume终端打印
ccc=--hello-- content=1 ddd=--moto--
12.Flume监控工具
下面的实验,windows环境下怎么跟多个参数 不知道
1.Http监控,json形式文本
使用这种监控方式,只需要在启动flume的时候在启动参数上面加上监控配置,例如这样:
bin/flume-ng agent --conf conf --conf-file conf/flume_conf.properties --name collect -Dflume.monitoring.type=http -Dflume.monitoring.port=1234
其中-Dflume.monitoring.type=http表示使用http方式来监控,后面的-Dflume.monitoring.port=1234表示我们需要启动的监控服务的端口号为1234,这个端口号可以自己随意配置。然后启动flume之后,通过http://ip:1234/metrics就可以得到flume的一个json格式的监控数据。
2.ganglia监控,图形化界面
这种监控方式需要先安装ganglia然后启动ganglia,然后再启动flume的时候加上监控配置,例如:
bin/flume-ng agent --conf conf --conf-file conf/producer.properties --name collect -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=ip:port
其中-Dflume.monitoring.type=ganglia表示使用ganglia的方式来监控,而-Dflume.monitoring.hosts=ip:port表示ganglia安装的ip和启动的端口号。
flume监控还可以使用zabbix,但是这种方式需要在flume源码中添加监控模块,相对比较麻烦,由于不是flume自带的监控方式,这里不讨论这种方式。
因此,flume自带的监控方式其实就是http、ganglia两种,http监控只能通过一个http地址访问得到一个json格式的监控数据,而ganglia监控是拿到这个数据后用界面的方式展示出来了,相对比较直观。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
数栈技术文章分享:你居然是这样的initialValue
先说一下写着篇文章的契机,是因为回显,复杂表单的回显,让我觉得我对initialValue这个属性是有误解的。 一、initialValue的出处和定义 initialValue的出处: AntDesign/Form表单件/getFieldDecorator(id,options)装饰器函数/第二个参数options/options.initialValue。 链接地址:https://ant.design/components/form-cn/#getFieldDecorator(id,-options)-%E5%8F%82%E6%95%B0 关于属性initialValue,官方的解释如下: 关键字是“子节点的初始值”,初始值也就是默认值,比如Form中有一个城市的选择器,默认选择“杭州”,那么initialValue就是杭州对应的value。 所以其实我一直以为initialValue是defaultValue一样的存在。 二、initialValue和defaultValue的区别 1. defaultValue的例子 importReact,{Component,Fragme...
- 下一篇
Furion 让开发者重新认识了 .NET,v2.4.0 发布
让 .NET 开发更简单,更通用,更流行。 庆祝 5K 说点 自 2020年09月01日 发布以来,Furion 一直高速发展,Stars 趋势图也勾勒出了指数增长的线条美,截至今日,诞生 7个月12天。 今天,Furion 项目在 Gitee 平台突破了 5K Stars,QQ 交流群成员达 6200 +,Nuget 下载破 260K。或许5K Stars 对 Java 项目来说只是个小目标,但对国内.NET 开源项目来说,无疑是梦想中的目标。 当然 Stars 的多少并不能决定项目优秀与否,但是从侧面也能反映出 .NET 正在崛起。 作者贡献度: https://gitee.com/monksoul 项目概况图: https://gitee.com/dotnetchina/Furion Stars 趋势图: https://whnb.wang/dotnetchina/Furion?e=43200 贡献者画像: https://giteye.net/chart/ZS49EPL6 框架文档: https://dotnetchina.gitee.io/furion/ 本期更新 新特性 [...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2全家桶,快速入门学习开发网站教程
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8编译安装MySQL8.0.19
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Red5直播服务器,属于Java语言的直播服务器