您现在的位置是:首页 > 文章详情

Flume学习笔记

日期:2021-05-12点击:633

Flume [fluːm] (引水槽、运河)

王洪亮 2021/05/07 15:05

1. 简介

大数据技术解决的三个问题,海量数据的传输、存储和计算。Flume属于传输框架。专门传输日志的,多媒体不行。

2.安装

官网 http://flume.apache.org/ 下载apache-flume-1.9.0-bin.tar.gz,下载后解压即可,类似tomcat中间件,需要改配置文件 , 把apache-flume-1.9.0-bin/conf/flume-env.sh.template改成flume-env.sh,里面配置jdk地址,安装完毕。

export JAVA_HOME=D:\sde\Java\jdk1.8.0_131

3.Hello Flume样例

flume是按任务启动的,这个任务在flume这里叫agent,每个agent分为三块,source(对接数据来源)、channel(中间缓冲区)、sink(数据去向)。任务过程简述就是从一个地方采集数据后送到其他地方。

此次案例source数据采集自端口,通过channel再经过sink将数据输出到终端屏幕上。

开始编写Hello Flume样例(win环境)

  1. 编写配置文件,参考官网例子

习惯上把任务的配置文件和conf区分开,这里新建jobs目录下新建netcat-flume-logger.conf,命名格式最好区分出source和sink的类型

#完整的conf分几块,变量声明区、source区、interceptor区、channelSelector区、channel区、sink区、关系绑定区 #变量声明区(必须) a1指的是agent的变量名,启动时指定 #分别声明 sources、channels、sinks、sinkgroups等 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #source区(必须) 指定source类型 具体类型需要看官网文档source 部分 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 #interceptor区(可选),代码自写,配合multiplexing channelSelector使用 #channelSelector区(可选),不写默认是replicating,把数据发给每个sink a1.sources.r1.selector.type=replicating #channel区(必须) 指定channel类型 具体类型需要看官网文档channel部分 a1.channels.c1.type = memory #sink处理器区,声明sink组,选择处理器是故障转移还是负载均衡,不写就是默认 #sink区(必须) 指定sink类型 具体类型需要看官网文档sink部分 a1.sinks.k1.type = logger #关系绑定区(必须) a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  1. 启动flume的agent
#win环境启动命令 进入到bin目录 cmd flume-ng agent --conf ../conf --conf-file ../jobs/netcat-flume-logger.conf --name a1 -property flume.root.logger=INFO,console #简写 flume-ng agent -c ../conf -f ../jobs/netcat-flume-logger.conf -n a1 -property flume.root.logger=INFO,console #linux环境启动命令 那个logger不一样 #-Dflume.root.logger=INFO,console
  1. 启动结果显示端口已监听
[INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
  1. 通过netcat工具向端口发送数据

没有netcat telnet 也可以

nc localhost 44444 > Hello Flume OK
  1. 查看flume的日志输出
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 Hello Flume }

4.Source种类

拉出几个常用的。其他的查看官方文档

1.Avro Source

Flume集群使用,多个Fulme连接需要使用Avro的source和sink连接,自身相当于服务端,需要优先启动

#相关source区 写法 a1.sources.r1.type=avro a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=4141

2. Exec Source

用命令监控某个文件的变化,将变化的内容传送走,适用于监控一个实时变化的日志文件,缺点是不能断点续传,容易数据丢失或是数据重复

#相关source区 写法 a1.sources.r1.type=exec #后跟 监控文件的命令 a1.sources.r1.command=tail -F /var/log/secure

3. Spooling Directory Source

监控本地文件夹,文件夹中新增文件时,会将整个文件内容取走,并加后缀做标记(.COMPLETED),表示已经取过了,标记后的文件不在处理,内容变化也不管。适用于批量上传,缺点是不能适应动态变化的文件

#相关source区 写法 a1.sources.r1.type=spooldir #后跟监控文件的目录 a1.sources.r1.spoolDir=/var/log/apache/flumeSpool

4. Taildir Source

可以监控不同文件夹下的不同文件。支持断点续传

#相关source区 写法 a1.sources.r1.type = TAILDIR #指定保存断点续传的文件 a1.sources.r1.positionFile = /var/log/flume/position.json #指定文件组 可以多个 a1.sources.r1.filegroups = f1 f2 #指定 单个文件 a1.sources.r1.filegroups.f1 = /var/log/test1/example.log #指定多个文件 * 不能出现在第一位 a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*

5. NetCat TCP Source

监听TCP端口用的,还有个UDP Source 就type不一样

#相关source区 写法 udp 的type是netcatudp a1.sources.r1.type=netcat a1.sources.r1.bind=0.0.0.0 a1.sources.r1.port=6666

5.Channel种类

常用的Memory Channel和File Channel

1. Memory Channel

将数据缓存在内存中,快,闪断丢数据

#channel区 a1.channels.c1.type=memory

2. File Channel

将数据缓存在本地文件中,慢点,稳定

#channel区 a1.channels.c1.type=file #数据存放的目录,可以有多个目录,逗号分割 a1.channels.c1.dataDirs=/mnt/flume/data

6.Sink种类

常用的拉出

1. Avro Sink

Flume集群收尾连接,相对source 而言这个算客户端,去绑定服务端IP和端口

#sink 区 a1.sinks.k1.type = avro a1.sinks.k1.hostname = 10.10.10.10 a1.sinks.k1.port = 4545

2. Logger Sink

日志打印在终端屏幕上,需要agent启动的时候搭配对应命令

问题:中文没打出来;maxBytesToLog这玩意不好使

#sink区 #-property flume.root.logger=INFO,console #-Dflume.root.logger=INFO,console a1.sinks.k1.type=logger

3. File Roll Sink

日志输出到本地文件,问题是就算没有接收到数据,也会不停的生成新的空文件

#sink区 a1.sinks.k2.type = file_roll #指定生成文件夹路径,文件名会自动生成 a1.sinks.k2.sink.directory = ../data/ #指定新文件生成频率,30s a1.sinks.k2.sink.rollInterval = 30

7.Channel Selector

这个就俩,算自定义的三个

1. Replicating Channel Selector (default)

默认的,可以不写,把数据发给所有sink

#channel selector 区 a1.sources.r1.selector.type=replicating

2. Multiplexing Channel Selector

根据拦截器规则,选择发送到哪个sink

#channel selector 区 a1.sources.r1.selector.type = multiplexing #这个关键变量,拦截器里写的,写什么是什么,不一定是state a1.sources.r1.selector.header = state #根据这个state的值选择sink,如果header中的state=CZ就发个c1 a1.sources.r1.selector.mapping.CZ = c1 # 等于US就给c2 c3 a1.sources.r1.selector.mapping.US = c2 c3

8.Sink Processor

目前算默认的三种,默认的数据给谁,谁就发走

1. Failover Sink Processor

故障转移,根据优先级,可着最高优先级的用,挂了再用优先级低的

#sink处理器区 #这个算声明,可以拿到声明区 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 #选择处理器类型为 故障转移 a1.sinkgroups.g1.processor.type = failover #指定sink组成员优先级 a1.sinkgroups.g1.processor.priority.k1 = 5 #最高等级k2一直使用,直到挂了为止,才能轮到k1 a1.sinkgroups.g1.processor.priority.k2 = 10

2. Load balancing Sink Processor

负载均衡,可选随机或是均衡发送

#sink处理器区 #这个算声明,可以拿到声明区 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 #选择处理器类型为 负载均衡 a1.sinkgroups.g1.processor.type = load_balance #指定模式为随机发送,还可以选round_robin均衡发送 a1.sinkgroups.g1.processor.selector = random

9.自定义Inteceptor

idea新建maven项目开整

\> hello

1. 引入依赖

<dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency>

2. 写自己的拦截器

package com.flume; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.HashMap; import java.util.List; import java.util.Map; public class MyInterceptor implements Interceptor { @Override public void initialize() { System.out.println("---------initialize---------"); } @Override public Event intercept(Event event) { String body = new String(event.getBody()); Map<String, String> map = new HashMap<>(); if(body.contains("hello")){ map.put("myStatus","up"); event.setHeaders(map); }else{ map.put("myStatus","down"); event.setHeaders(map); } return event; } @Override public List<Event> intercept(List<Event> list) { return null; } @Override public void close() {} public static class MyBuilder implements Interceptor.Builder{ @Override public Interceptor build() { return new MyInterceptor(); } @Override public void configure(Context context) {} } }

3. 项目打个jar包放到Flume的lib目录下

4. 编写Flume配置文件

netcat-flume-doubleLogger.conf,本来想两个sink都发到logger的,后来发现终端上分不出哪个是哪个,把其中一个sink改到落到本地文件了

#变量区 a1.sources = r1 a1.channels = c1 c2 a1.sinks = k1 k2 #source区 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 #intercetor区 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.flume.MyInterceptor$MyBuilder #channel selector 区 a1.sources.r1.selector.type = multiplexing #我代码里定义的是 myStatus a1.sources.r1.selector.header = myStatus #myStatus=up就发个c1 a1.sources.r1.selector.mapping.up = c1 # 等于down就给c2 a1.sources.r1.selector.mapping.down = c2 #channel区 a1.channels.c1.type = memory a1.channels.c2.type = memory #sink区 a1.sinks.k1.type = logger a1.sinks.k2.type = file_roll a1.sinks.k2.sink.directory = ../data/ #绑定区 a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2

5. 启动

flume-ng agent --conf ../conf --conf-file ../jobs/netcat-flume-doubleLogger.conf --name a1 -property flume.root.logger=INFO,console

6. 测试效果

启动netcat发送命令

> nc localhost 44444 6351 OK hello OK

一个在终端屏幕上有显示,一个在本地文件中有显示。

10自定义Source

1. 依赖

<dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency>

2. 代码

package com.flume; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource; public class MySource extends AbstractSource implements Configurable, PollableSource { private String aaa; private String bbb; /** * 定义配置文件中的变量 * @param context */ @Override public void configure(Context context) { aaa = context.getString("aaa"); //没取到值 就给默认值 bbb = context.getString("bbb", "GangZi"); } @Override public Status process(){ Status status = null; try { //创建个事件 Event e = new SimpleEvent(); String content = aaa+"->"+bbb+": "+Math.random(); e.setBody(content.getBytes("utf-8")); //把event传给channel getChannelProcessor().processEvent(e); status = Status.READY; } catch (Throwable t) { status = Status.BACKOFF; if (t instanceof Error) { throw (Error)t; } } try { //睡两秒 要不刷的太快了 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return status; } @Override public long getBackOffSleepIncrement() { return 0; } @Override public long getMaxBackOffSleepInterval() { return 0; } }

3. 写配置文件custom-flume-logger.conf

#变量区 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #source区 a1.sources.r1.type = com.flume.MySource a1.sources.r1.aaa = LiLei #a1.sources.r1.bbb = HanMeiMei #channel区 a1.channels.c1.type = memory #sink区 a1.sinks.k1.type = logger a1.sinks.k1.maxBytesToLog = 18 #绑定区 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

4. 启动命令

flume-ng agent --conf ../conf --conf-file ../jobs/custom-flume-logger.conf --name a1 -property flume.root.logger=INFO,console

5. 查看效果

 [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4C 69 4C 65 69 2D 3E 47 61 6E 67 5A 69 3A 20 30 LiLei->GangZi: 0 }

11自定义Sink

1. 依赖

<dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency>

2. 代码

package com.flume; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; public class MySink extends AbstractSink implements Configurable { private String ccc; private String ddd; @Override public void configure(Context context) { ccc = context.getString("ccc"); ddd = context.getString("ddd","xxxxxxxx"); } @Override public Status process() throws EventDeliveryException { Status status = null; Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { Event event = ch.take(); if(event == null){ txn.rollback(); status = Status.BACKOFF; }else { byte[] body = event.getBody(); String content = new String(body); System.out.println("ccc="+ccc); System.out.println("content="+content); System.out.println("ddd="+ddd); txn.commit(); status = Status.READY; } } catch (Exception e) { e.printStackTrace(); txn.rollback(); status = Status.BACKOFF; }finally { txn.close(); } return status; } }

3. flume配置文件netcat-flume-custom.conf

#变量区 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #source区 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 #channel区 a1.channels.c1.type = memory #sink区 a1.sinks.k1.type = com.flume.MySink a1.sinks.k1.ccc =--hello-- a1.sinks.k1.ddd =--moto-- #绑定区 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

4. 启动agent

flume-ng agent --conf ../conf --conf-file ../jobs/netcat-flume-custom.conf --name a1 -property flume.root.logger=INFO,console

5. 客户端发送命令

telnet localhost 44444 > 1 OK

6. flume终端打印

ccc=--hello-- content=1 ddd=--moto--

12.Flume监控工具

下面的实验,windows环境下怎么跟多个参数 不知道

1.Http监控,json形式文本

使用这种监控方式,只需要在启动flume的时候在启动参数上面加上监控配置,例如这样:

bin/flume-ng agent --conf conf --conf-file conf/flume_conf.properties --name collect -Dflume.monitoring.type=http -Dflume.monitoring.port=1234

其中-Dflume.monitoring.type=http表示使用http方式来监控,后面的-Dflume.monitoring.port=1234表示我们需要启动的监控服务的端口号为1234,这个端口号可以自己随意配置。然后启动flume之后,通过http://ip:1234/metrics就可以得到flume的一个json格式的监控数据。

2.ganglia监控,图形化界面

这种监控方式需要先安装ganglia然后启动ganglia,然后再启动flume的时候加上监控配置,例如:

bin/flume-ng agent --conf conf --conf-file conf/producer.properties --name collect -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=ip:port

其中-Dflume.monitoring.type=ganglia表示使用ganglia的方式来监控,而-Dflume.monitoring.hosts=ip:port表示ganglia安装的ip和启动的端口号。

flume监控还可以使用zabbix,但是这种方式需要在flume源码中添加监控模块,相对比较麻烦,由于不是flume自带的监控方式,这里不讨论这种方式。

因此,flume自带的监控方式其实就是http、ganglia两种,http监控只能通过一个http地址访问得到一个json格式的监控数据,而ganglia监控是拿到这个数据后用界面的方式展示出来了,相对比较直观。

原文链接:https://blog.51cto.com/u_13278546/2769541
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章