Flume学习笔记
Flume [fluːm] (引水槽、运河)
王洪亮 2021/05/07 15:05
1. 简介
大数据技术解决的三个问题,海量数据的传输、存储和计算。Flume属于传输框架。专门传输日志的,多媒体不行。
2.安装
官网 http://flume.apache.org/ 下载apache-flume-1.9.0-bin.tar.gz,下载后解压即可,类似tomcat中间件,需要改配置文件 , 把apache-flume-1.9.0-bin/conf/flume-env.sh.template改成flume-env.sh,里面配置jdk地址,安装完毕。
export JAVA_HOME=D:\sde\Java\jdk1.8.0_131
3.Hello Flume样例
flume是按任务启动的,这个任务在flume这里叫agent,每个agent分为三块,source(对接数据来源)、channel(中间缓冲区)、sink(数据去向)。任务过程简述就是从一个地方采集数据后送到其他地方。
此次案例source数据采集自端口,通过channel再经过sink将数据输出到终端屏幕上。
开始编写Hello Flume样例(win环境)
- 编写配置文件,参考官网例子
习惯上把任务的配置文件和conf区分开,这里新建jobs目录下新建netcat-flume-logger.conf,命名格式最好区分出source和sink的类型
#完整的conf分几块,变量声明区、source区、interceptor区、channelSelector区、channel区、sink区、关系绑定区
#变量声明区(必须) a1指的是agent的变量名,启动时指定
#分别声明 sources、channels、sinks、sinkgroups等
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source区(必须) 指定source类型 具体类型需要看官网文档source 部分
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#interceptor区(可选),代码自写,配合multiplexing channelSelector使用
#channelSelector区(可选),不写默认是replicating,把数据发给每个sink
a1.sources.r1.selector.type=replicating
#channel区(必须) 指定channel类型 具体类型需要看官网文档channel部分
a1.channels.c1.type = memory
#sink处理器区,声明sink组,选择处理器是故障转移还是负载均衡,不写就是默认
#sink区(必须) 指定sink类型 具体类型需要看官网文档sink部分
a1.sinks.k1.type = logger
#关系绑定区(必须)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 启动flume的agent
#win环境启动命令 进入到bin目录 cmd
flume-ng agent --conf ../conf --conf-file ../jobs/netcat-flume-logger.conf --name a1 -property flume.root.logger=INFO,console
#简写
flume-ng agent -c ../conf -f ../jobs/netcat-flume-logger.conf -n a1 -property flume.root.logger=INFO,console
#linux环境启动命令 那个logger不一样
#-Dflume.root.logger=INFO,console
- 启动结果显示端口已监听
[INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
- 通过netcat工具向端口发送数据
没有netcat telnet 也可以
nc localhost 44444
> Hello Flume
OK
- 查看flume的日志输出
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 Hello Flume }
4.Source种类
拉出几个常用的。其他的查看官方文档
1.Avro Source
Flume集群使用,多个Fulme连接需要使用Avro的source和sink连接,自身相当于服务端,需要优先启动
#相关source区 写法
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4141
2. Exec Source
用命令监控某个文件的变化,将变化的内容传送走,适用于监控一个实时变化的日志文件,缺点是不能断点续传,容易数据丢失或是数据重复
#相关source区 写法
a1.sources.r1.type=exec
#后跟 监控文件的命令
a1.sources.r1.command=tail -F /var/log/secure
3. Spooling Directory Source
监控本地文件夹,文件夹中新增文件时,会将整个文件内容取走,并加后缀做标记(.COMPLETED),表示已经取过了,标记后的文件不在处理,内容变化也不管。适用于批量上传,缺点是不能适应动态变化的文件
#相关source区 写法
a1.sources.r1.type=spooldir
#后跟监控文件的目录
a1.sources.r1.spoolDir=/var/log/apache/flumeSpool
4. Taildir Source
可以监控不同文件夹下的不同文件。支持断点续传
#相关source区 写法
a1.sources.r1.type = TAILDIR
#指定保存断点续传的文件
a1.sources.r1.positionFile = /var/log/flume/position.json
#指定文件组 可以多个
a1.sources.r1.filegroups = f1 f2
#指定 单个文件
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
#指定多个文件 * 不能出现在第一位
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
5. NetCat TCP Source
监听TCP端口用的,还有个UDP Source 就type不一样
#相关source区 写法 udp 的type是netcatudp
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=6666
5.Channel种类
常用的Memory Channel和File Channel
1. Memory Channel
将数据缓存在内存中,快,闪断丢数据
#channel区
a1.channels.c1.type=memory
2. File Channel
将数据缓存在本地文件中,慢点,稳定
#channel区
a1.channels.c1.type=file
#数据存放的目录,可以有多个目录,逗号分割
a1.channels.c1.dataDirs=/mnt/flume/data
6.Sink种类
常用的拉出
1. Avro Sink
Flume集群收尾连接,相对source 而言这个算客户端,去绑定服务端IP和端口
#sink 区
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
2. Logger Sink
日志打印在终端屏幕上,需要agent启动的时候搭配对应命令
问题:中文没打出来;maxBytesToLog这玩意不好使
#sink区
#-property flume.root.logger=INFO,console
#-Dflume.root.logger=INFO,console
a1.sinks.k1.type=logger
3. File Roll Sink
日志输出到本地文件,问题是就算没有接收到数据,也会不停的生成新的空文件
#sink区
a1.sinks.k2.type = file_roll
#指定生成文件夹路径,文件名会自动生成
a1.sinks.k2.sink.directory = ../data/
#指定新文件生成频率,30s
a1.sinks.k2.sink.rollInterval = 30
7.Channel Selector
这个就俩,算自定义的三个
1. Replicating Channel Selector (default)
默认的,可以不写,把数据发给所有sink
#channel selector 区
a1.sources.r1.selector.type=replicating
2. Multiplexing Channel Selector
根据拦截器规则,选择发送到哪个sink
#channel selector 区
a1.sources.r1.selector.type = multiplexing
#这个关键变量,拦截器里写的,写什么是什么,不一定是state
a1.sources.r1.selector.header = state
#根据这个state的值选择sink,如果header中的state=CZ就发个c1
a1.sources.r1.selector.mapping.CZ = c1
# 等于US就给c2 c3
a1.sources.r1.selector.mapping.US = c2 c3
8.Sink Processor
目前算默认的三种,默认的数据给谁,谁就发走
1. Failover Sink Processor
故障转移,根据优先级,可着最高优先级的用,挂了再用优先级低的
#sink处理器区
#这个算声明,可以拿到声明区
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
#选择处理器类型为 故障转移
a1.sinkgroups.g1.processor.type = failover
#指定sink组成员优先级
a1.sinkgroups.g1.processor.priority.k1 = 5
#最高等级k2一直使用,直到挂了为止,才能轮到k1
a1.sinkgroups.g1.processor.priority.k2 = 10
2. Load balancing Sink Processor
负载均衡,可选随机或是均衡发送
#sink处理器区
#这个算声明,可以拿到声明区
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
#选择处理器类型为 负载均衡
a1.sinkgroups.g1.processor.type = load_balance
#指定模式为随机发送,还可以选round_robin均衡发送
a1.sinkgroups.g1.processor.selector = random
9.自定义Inteceptor
idea新建maven项目开整
\> hello
1. 引入依赖
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
2. 写自己的拦截器
package com.flume;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MyInterceptor implements Interceptor {
@Override
public void initialize() {
System.out.println("---------initialize---------");
}
@Override
public Event intercept(Event event) {
String body = new String(event.getBody());
Map<String, String> map = new HashMap<>();
if(body.contains("hello")){
map.put("myStatus","up");
event.setHeaders(map);
}else{
map.put("myStatus","down");
event.setHeaders(map);
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
return null;
}
@Override
public void close() {}
public static class MyBuilder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {}
}
}
3. 项目打个jar包放到Flume的lib目录下
4. 编写Flume配置文件
netcat-flume-doubleLogger.conf,本来想两个sink都发到logger的,后来发现终端上分不出哪个是哪个,把其中一个sink改到落到本地文件了
#变量区
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
#source区
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#intercetor区
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.flume.MyInterceptor$MyBuilder
#channel selector 区
a1.sources.r1.selector.type = multiplexing
#我代码里定义的是 myStatus
a1.sources.r1.selector.header = myStatus
#myStatus=up就发个c1
a1.sources.r1.selector.mapping.up = c1
# 等于down就给c2
a1.sources.r1.selector.mapping.down = c2
#channel区
a1.channels.c1.type = memory
a1.channels.c2.type = memory
#sink区
a1.sinks.k1.type = logger
a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = ../data/
#绑定区
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
5. 启动
flume-ng agent --conf ../conf --conf-file ../jobs/netcat-flume-doubleLogger.conf --name a1 -property flume.root.logger=INFO,console
6. 测试效果
启动netcat发送命令
> nc localhost 44444
6351
OK
hello
OK
一个在终端屏幕上有显示,一个在本地文件中有显示。
10自定义Source
1. 依赖
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
2. 代码
package com.flume;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
public class MySource extends AbstractSource implements Configurable, PollableSource {
private String aaa;
private String bbb;
/**
* 定义配置文件中的变量
* @param context
*/
@Override
public void configure(Context context) {
aaa = context.getString("aaa");
//没取到值 就给默认值
bbb = context.getString("bbb", "GangZi");
}
@Override
public Status process(){
Status status = null;
try {
//创建个事件
Event e = new SimpleEvent();
String content = aaa+"->"+bbb+": "+Math.random();
e.setBody(content.getBytes("utf-8"));
//把event传给channel
getChannelProcessor().processEvent(e);
status = Status.READY;
} catch (Throwable t) {
status = Status.BACKOFF;
if (t instanceof Error) {
throw (Error)t;
}
}
try {
//睡两秒 要不刷的太快了
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return status;
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}
3. 写配置文件custom-flume-logger.conf
#变量区
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source区
a1.sources.r1.type = com.flume.MySource
a1.sources.r1.aaa = LiLei
#a1.sources.r1.bbb = HanMeiMei
#channel区
a1.channels.c1.type = memory
#sink区
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 18
#绑定区
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4. 启动命令
flume-ng agent --conf ../conf --conf-file ../jobs/custom-flume-logger.conf --name a1 -property flume.root.logger=INFO,console
5. 查看效果
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4C 69 4C 65 69 2D 3E 47 61 6E 67 5A 69 3A 20 30 LiLei->GangZi: 0 }
11自定义Sink
1. 依赖
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
2. 代码
package com.flume;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
public class MySink extends AbstractSink implements Configurable {
private String ccc;
private String ddd;
@Override
public void configure(Context context) {
ccc = context.getString("ccc");
ddd = context.getString("ddd","xxxxxxxx");
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
Event event = ch.take();
if(event == null){
txn.rollback();
status = Status.BACKOFF;
}else {
byte[] body = event.getBody();
String content = new String(body);
System.out.println("ccc="+ccc);
System.out.println("content="+content);
System.out.println("ddd="+ddd);
txn.commit();
status = Status.READY;
}
} catch (Exception e) {
e.printStackTrace();
txn.rollback();
status = Status.BACKOFF;
}finally {
txn.close();
}
return status;
}
}
3. flume配置文件netcat-flume-custom.conf
#变量区
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source区
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#channel区
a1.channels.c1.type = memory
#sink区
a1.sinks.k1.type = com.flume.MySink
a1.sinks.k1.ccc =--hello--
a1.sinks.k1.ddd =--moto--
#绑定区
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4. 启动agent
flume-ng agent --conf ../conf --conf-file ../jobs/netcat-flume-custom.conf --name a1 -property flume.root.logger=INFO,console
5. 客户端发送命令
telnet localhost 44444
> 1
OK
6. flume终端打印
ccc=--hello--
content=1
ddd=--moto--
12.Flume监控工具
下面的实验,windows环境下怎么跟多个参数 不知道
1.Http监控,json形式文本
使用这种监控方式,只需要在启动flume的时候在启动参数上面加上监控配置,例如这样:
bin/flume-ng agent --conf conf --conf-file conf/flume_conf.properties --name collect -Dflume.monitoring.type=http -Dflume.monitoring.port=1234
其中-Dflume.monitoring.type=http表示使用http方式来监控,后面的-Dflume.monitoring.port=1234表示我们需要启动的监控服务的端口号为1234,这个端口号可以自己随意配置。然后启动flume之后,通过http://ip:1234/metrics就可以得到flume的一个json格式的监控数据。
2.ganglia监控,图形化界面
这种监控方式需要先安装ganglia然后启动ganglia,然后再启动flume的时候加上监控配置,例如:
bin/flume-ng agent --conf conf --conf-file conf/producer.properties --name collect -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=ip:port
其中-Dflume.monitoring.type=ganglia表示使用ganglia的方式来监控,而-Dflume.monitoring.hosts=ip:port表示ganglia安装的ip和启动的端口号。
flume监控还可以使用zabbix,但是这种方式需要在flume源码中添加监控模块,相对比较麻烦,由于不是flume自带的监控方式,这里不讨论这种方式。
因此,flume自带的监控方式其实就是http、ganglia两种,http监控只能通过一个http地址访问得到一个json格式的监控数据,而ganglia监控是拿到这个数据后用界面的方式展示出来了,相对比较直观。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
数栈技术文章分享:你居然是这样的initialValue
先说一下写着篇文章的契机,是因为回显,复杂表单的回显,让我觉得我对initialValue这个属性是有误解的。 一、initialValue的出处和定义 initialValue的出处: AntDesign/Form表单件/getFieldDecorator(id,options)装饰器函数/第二个参数options/options.initialValue。 链接地址:https://ant.design/components/form-cn/#getFieldDecorator(id,-options)-%E5%8F%82%E6%95%B0 关于属性initialValue,官方的解释如下: 关键字是“子节点的初始值”,初始值也就是默认值,比如Form中有一个城市的选择器,默认选择“杭州”,那么initialValue就是杭州对应的value。 所以其实我一直以为initialValue是defaultValue一样的存在。 二、initialValue和defaultValue的区别 1. defaultValue的例子 importReact,{Component,Fragme...
-
下一篇
Furion 让开发者重新认识了 .NET,v2.4.0 发布
让 .NET 开发更简单,更通用,更流行。 庆祝 5K 说点 自 2020年09月01日 发布以来,Furion 一直高速发展,Stars 趋势图也勾勒出了指数增长的线条美,截至今日,诞生 7个月12天。 今天,Furion 项目在 Gitee 平台突破了 5K Stars,QQ 交流群成员达 6200 +,Nuget 下载破 260K。或许5K Stars 对 Java 项目来说只是个小目标,但对国内.NET 开源项目来说,无疑是梦想中的目标。 当然 Stars 的多少并不能决定项目优秀与否,但是从侧面也能反映出 .NET 正在崛起。 作者贡献度: https://gitee.com/monksoul 项目概况图: https://gitee.com/dotnetchina/Furion Stars 趋势图: https://whnb.wang/dotnetchina/Furion?e=43200 贡献者画像: https://giteye.net/chart/ZS49EPL6 框架文档: https://dotnetchina.gitee.io/furion/ 本期更新 新特性 [...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- MySQL数据库在高并发下的优化方案
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- CentOS8编译安装MySQL8.0.19
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2全家桶,快速入门学习开发网站教程
- Docker快速安装Oracle11G,搭建oracle11g学习环境