[Flume]安装,部署与应用案例
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/52585478 1. 官网 http://flume.apache.org/ 2. 下载 http://flume.apache.org/download.html 3. 安装 3.1 将下载的flume包,解压到/opt目录中 3.2 创建flume-env.sh 配置文件 xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/conf$ sudo cp flume-env.sh.template flume-env.sh 3.3 修改 flume-env.sh 配置文件,主要是JAVA_HOME变量设置 # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced # during Flume startup. # Enviroment variables can be set here. # export JAVA_HOME=/usr/lib/jvm/java-6-sun export JAVA_HOME=/opt/jdk1.8.0_91 # Give Flume more memory and pre-allocate, enable remote monitoring via JMX # export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote" # Note that the Flume conf directory is always included in the classpath. #FLUME_CLASSPATH="" 3.4 验证是否安装成功 xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng version Flume 1.6.0 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080 Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015 From source with checksum b29e416802ce9ece3269d34233baf43f 出现上面信息,表示安装成功了。 4. 案例 4.1 案例一 Avro Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制。 4.1.1 创建agent 配置文件 根据模板文件创建配置文件: xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/conf$ sudo cp flume-conf.properties.template flume.conf 4.1.2 配置agent配置文件 当你运行一个agent的时候,需要通过-f 选项来告诉Flume使用哪个配置文件。让我们看一个基本的例子,复制下面代码并粘贴到conf/flume.conf文件中。 # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'agent' agent1.sources = avro-source1 agent1.channels = ch1 agent1.sinks = logger-sink1 # sources agent1.sources.avro-source1.type = avro agent1.sources.avro-source1.channels = ch1 agent1.sources.avro-source1.bind = 0.0.0.0 agent1.sources.avro-source1.port = 4141 # sink agent1.sinks.logger-sink1.type = logger agent1.sinks.logger-sink1.channel = ch1 # channel agent1.channels.ch1.type = memory agent1.channels.ch1.capacity = 1000 agent1.channels.ch1.transactionCapacity = 100 4.1.3 启动flume agent agent1 xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng agent -c . -f ../conf/flume.conf -n agent1 -Dflume.root.logger=INFO,console 4.1.4 创建指定文件 xiaosi@Qunar:/opt/apache-flume-1.6.0-bin$ sudo touch log.00 xiaosi@Qunar:/opt/apache-flume-1.6.0-bin$ sudo vim log.00 4.1.5 使用avro-client发送文件 xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng avro-client -c . -H 0.0.0.0 -p 4141 -F ../log.00 4.1.6 查看信息 在启动agent的控制窗口,可以看到一下信息,注意最后一行: xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng agent -c . -f ../conf/flume.conf -n agent1 -Dflume.root.logger=INFO,console Info: Including Hadoop libraries found via (/opt/hadoop-2.7.2/bin/hadoop) for HDFS access Info: Excluding /opt/hadoop-2.7.2/share/hadoop/common/lib/slf4j-api-1.7.10.jar from classpath ... SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/apache-flume-1.6.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apache-hive-2.0.0-bin/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apache-hive-2.0.0-bin/lib/hive-jdbc-2.0.0-standalone.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 16/09/19 10:29:27 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 16/09/19 10:29:27 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:../conf/flume.conf 16/09/19 10:29:27 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 10:29:27 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 10:29:27 INFO conf.FlumeConfiguration: Added sinks: logger-sink1 Agent: agent1 16/09/19 10:29:27 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent1] 16/09/19 10:29:27 INFO node.AbstractConfigurationProvider: Creating channels 16/09/19 10:29:27 INFO channel.DefaultChannelFactory: Creating instance of channel ch1 type memory 16/09/19 10:29:27 INFO node.AbstractConfigurationProvider: Created channel ch1 16/09/19 10:29:27 INFO source.DefaultSourceFactory: Creating instance of source avro-source1, type avro 16/09/19 10:29:27 INFO sink.DefaultSinkFactory: Creating instance of sink: logger-sink1, type: logger 16/09/19 10:29:27 INFO node.AbstractConfigurationProvider: Channel ch1 connected to [avro-source1, logger-sink1] 16/09/19 10:29:27 INFO node.Application: Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:Avro source avro-source1: { bindAddress: 0.0.0.0, port: 4141 } }} sinkRunners:{logger-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@453e9d5e counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} } 16/09/19 10:29:27 INFO node.Application: Starting Channel ch1 16/09/19 10:29:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean. 16/09/19 10:29:27 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: ch1 started 16/09/19 10:29:27 INFO node.Application: Starting Sink logger-sink1 16/09/19 10:29:27 INFO node.Application: Starting Source avro-source1 16/09/19 10:29:27 INFO source.AvroSource: Starting Avro source avro-source1: { bindAddress: 0.0.0.0, port: 4141 }... 16/09/19 10:29:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: avro-source1: Successfully registered new MBean. 16/09/19 10:29:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: avro-source1 started 16/09/19 10:29:27 INFO source.AvroSource: Avro source avro-source1 started. 16/09/19 10:36:32 INFO ipc.NettyServer: [id: 0x072a068a, /127.0.0.1:42708 => /127.0.0.1:4141] OPEN 16/09/19 10:36:32 INFO ipc.NettyServer: [id: 0x072a068a, /127.0.0.1:42708 => /127.0.0.1:4141] BOUND: /127.0.0.1:4141 16/09/19 10:36:32 INFO ipc.NettyServer: [id: 0x072a068a, /127.0.0.1:42708 => /127.0.0.1:4141] CONNECTED: /127.0.0.1:42708 16/09/19 10:36:33 INFO ipc.NettyServer: [id: 0x072a068a, /127.0.0.1:42708 :> /127.0.0.1:4141] DISCONNECTED 16/09/19 10:36:33 INFO ipc.NettyServer: [id: 0x072a068a, /127.0.0.1:42708 :> /127.0.0.1:4141] UNBOUND 16/09/19 10:36:33 INFO ipc.NettyServer: [id: 0x072a068a, /127.0.0.1:42708 :> /127.0.0.1:4141] CLOSED 16/09/19 10:36:33 INFO ipc.NettyServer: Connection to /127.0.0.1:42708 disconnected. 16/09/19 10:36:37 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 Hello Flume } 4.2 案例二 Spool Spool监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点: (1)拷贝到spool目录下的文件不可以再打开编辑。 (2)spool目录下不可包含相应的子目录 4.2.1 创建配置文件flume-spool.conf xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/conf$ sudo cp flume.conf flume-spool.conf 进行一下配置: # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'agent' agent1.sources = avro-source1 agent1.channels = ch1 agent1.sinks = logger-sink1 # sources agent1.sources.avro-source1.type = spooldir agent1.sources.avro-source1.channels = ch1 agent1.sources.avro-source1.spoolDir = /home/xiaosi/logs/ agent1.sources.avro-source1.fileHeader = true agent1.sources.avro-source1.bind = 0.0.0.0 agent1.sources.avro-source1.port = 4141 # sink agent1.sinks.logger-sink1.type = logger agent1.sinks.logger-sink1.channel = ch1 # channel agent1.channels.ch1.type = memory agent1.channels.ch1.capacity = 1000 agent1.channels.ch1.transactionCapacity = 100 对/home/xiaosi/logs目录进行监控。 4.2.2 启动Flume agent agent1 xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng agent -c . -f ../conf/flume-spool.conf -n agent1 -Dflume.root.logger=INFO,console 4.2.3 追加文件到监控目录 xiaosi@Qunar:~$ echo "Hello Flume first" > /home/xiaosi/logs/flume-log-1.log xiaosi@Qunar:~$ echo "Hello Flume second" > /home/xiaosi/logs/flume-log-2.log 4.2.4 查看信息 在启动agent的控制窗口,可以看到一下信息,注意最后两行: xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng agent -c . -f ../conf/flume-spool.conf -n agent1 -Dflume.root.logger=INFO,console Info: Including Hadoop libraries found via (/opt/hadoop-2.7.2/bin/hadoop) for HDFS access Info: Excluding /opt/hadoop-2.7.2/share/hadoop/common/lib/slf4j-api-1.7.10.jar from classpath Info: Excluding /opt/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar from classpath Info: Including Hive libraries found via (/opt/apache-hive-2.0.0-bin) for Hive access ... org.apache.flume.node.Application -f ../conf/flume-spool.conf -n agent1 SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/apache-flume-1.6.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apache-hive-2.0.0-bin/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apache-hive-2.0.0-bin/lib/hive-jdbc-2.0.0-standalone.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 16/09/19 11:29:52 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 16/09/19 11:29:52 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:../conf/flume-spool.conf 16/09/19 11:29:52 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 11:29:52 INFO conf.FlumeConfiguration: Added sinks: logger-sink1 Agent: agent1 16/09/19 11:29:52 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 11:29:52 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent1] 16/09/19 11:29:52 INFO node.AbstractConfigurationProvider: Creating channels 16/09/19 11:29:52 INFO channel.DefaultChannelFactory: Creating instance of channel ch1 type memory 16/09/19 11:29:52 INFO node.AbstractConfigurationProvider: Created channel ch1 16/09/19 11:29:52 INFO source.DefaultSourceFactory: Creating instance of source avro-source1, type spooldir 16/09/19 11:29:52 INFO sink.DefaultSinkFactory: Creating instance of sink: logger-sink1, type: logger 16/09/19 11:29:52 INFO node.AbstractConfigurationProvider: Channel ch1 connected to [avro-source1, logger-sink1] 16/09/19 11:29:52 INFO node.Application: Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:Spool Directory source avro-source1: { spoolDir: /home/xiaosi/logs/ } }} sinkRunners:{logger-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4f5f731e counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} } 16/09/19 11:29:52 INFO node.Application: Starting Channel ch1 16/09/19 11:29:52 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean. 16/09/19 11:29:52 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: ch1 started 16/09/19 11:29:52 INFO node.Application: Starting Sink logger-sink1 16/09/19 11:29:52 INFO node.Application: Starting Source avro-source1 16/09/19 11:29:52 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/xiaosi/logs/ 16/09/19 11:29:52 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: avro-source1: Successfully registered new MBean. 16/09/19 11:29:52 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: avro-source1 started 16/09/19 11:30:06 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one. 16/09/19 11:30:06 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/xiaosi/logs/flume-log-1.log to /home/xiaosi/logs/flume-log-1.log.COMPLETED 16/09/19 11:30:07 INFO sink.LoggerSink: Event: { headers:{file=/home/xiaosi/logs/flume-log-1.log} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 20 66 69 72 73 Hello Flume firs } 16/09/19 11:30:21 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one. 16/09/19 11:30:21 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/xiaosi/logs/flume-log-2.log to /home/xiaosi/logs/flume-log-2.log.COMPLETED 16/09/19 11:30:22 INFO sink.LoggerSink: Event: { headers:{file=/home/xiaosi/logs/flume-log-2.log} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 20 73 65 63 6F Hello Flume seco } 4.3 案例三 Exec EXEC执行一个给定的命令获得输出的源,如果要使用tail命令,必选使得file足够大才能看到输出内容 4.3.1 创建配置文件flume-exec.conf xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/conf$ sudo cp flume.conf flume-exec.conf 进行如下修改: # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'agent' agent1.sources = avro-source1 agent1.channels = ch1 agent1.sinks = logger-sink1 # sources agent1.sources.avro-source1.type = exec agent1.sources.avro-source1.channels = ch1 agent1.sources.avro-source1.command = tail -F /home/xiaosi/logs/flume-log-exec.log agent1.sources.avro-source1.bind = 0.0.0.0 agent1.sources.avro-source1.port = 4141 # sink agent1.sinks.logger-sink1.type = logger agent1.sinks.logger-sink1.channel = ch1 # channel agent1.channels.ch1.type = memory agent1.channels.ch1.capacity = 1000 agent1.channels.ch1.transactionCapacity = 100 4.2.2 启动Flume agent agent1 xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng agent -c . -f ../conf/flume-exec.conf -n agent1 -Dflume.root.logger=INFO,console 4.2.3 执行tail命令 向文件中进行追加数据,生成足够多的数据: #! /bin/sh for index in {1..100} do echo "Hello Flume $index" >> /home/xiaosi/logs/flume-log-exec.log done 同时对文件使用tail 命令操作: xiaosi@Qunar:~$ tail -F /home/xiaosi/logs/flume-log-exec.log Hello Flume 1 Hello Flume 2 Hello Flume 3 Hello Flume 4 ... 4.3.4 查看信息 在启动agent的控制窗口,可以看到一下信息: xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng agent -c . -f ../conf/flume-exec.conf -n agent1 -Dflume.root.logger=INFO,console Info: Including Hadoop libraries found via (/opt/hadoop-2.7.2/bin/hadoop) for HDFS access Info: Excluding /opt/hadoop-2.7.2/share/hadoop/common/lib/slf4j-api-1.7.10.jar from classpath Info: Excluding /opt/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar from classpath ... SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/apache-flume-1.6.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apache-hive-2.0.0-bin/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apache-hive-2.0.0-bin/lib/hive-jdbc-2.0.0-standalone.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 16/09/19 12:01:28 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 16/09/19 12:01:28 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:../conf/flume-exec.conf 16/09/19 12:01:28 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 12:01:28 INFO conf.FlumeConfiguration: Added sinks: logger-sink1 Agent: agent1 16/09/19 12:01:28 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 12:01:28 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent1] 16/09/19 12:01:28 INFO node.AbstractConfigurationProvider: Creating channels 16/09/19 12:01:28 INFO channel.DefaultChannelFactory: Creating instance of channel ch1 type memory 16/09/19 12:01:28 INFO node.AbstractConfigurationProvider: Created channel ch1 16/09/19 12:01:28 INFO source.DefaultSourceFactory: Creating instance of source avro-source1, type exec 16/09/19 12:01:28 INFO sink.DefaultSinkFactory: Creating instance of sink: logger-sink1, type: logger 16/09/19 12:01:28 INFO node.AbstractConfigurationProvider: Channel ch1 connected to [avro-source1, logger-sink1] 16/09/19 12:01:28 INFO node.Application: Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:avro-source1,state:IDLE} }} sinkRunners:{logger-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@242d6c8b counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} } 16/09/19 12:01:28 INFO node.Application: Starting Channel ch1 16/09/19 12:01:28 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean. 16/09/19 12:01:28 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: ch1 started 16/09/19 12:01:28 INFO node.Application: Starting Sink logger-sink1 16/09/19 12:01:28 INFO node.Application: Starting Source avro-source1 16/09/19 12:01:28 INFO source.ExecSource: Exec source starting with command:tail -F /home/xiaosi/logs/flume-log-exec.log 16/09/19 12:01:28 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: avro-source1: Successfully registered new MBean. 16/09/19 12:01:28 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: avro-source1 started 16/09/19 12:01:58 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 20 31 Hello Flume 1 } 16/09/19 12:01:58 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 20 32 Hello Flume 2 } 16/09/19 12:01:58 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 20 33 Hello Flume 3 } ... 16/09/19 12:01:58 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 20 39 38 Hello Flume 98 } 16/09/19 12:01:58 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 20 39 39 Hello Flume 99 } 16/09/19 12:01:58 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 20 31 30 30 Hello Flume 100 } 4.4 案例四 Syslogtcp Syslogtcp监听TCP的端口做为数据源 4.4.1 创建配置文件flume-tcp.conf xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/conf$ sudo cp flume.conf flume-tcp.conf 进行如下修改: # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'agent' agent1.sources = avro-source1 agent1.channels = ch1 agent1.sinks = logger-sink1 # sources agent1.sources.avro-source1.type = syslogtcp agent1.sources.avro-source1.channels = ch1 agent1.sources.avro-source1.host = localhost #agent1.sources.avro-source1.bind = 0.0.0.0 agent1.sources.avro-source1.port = 5140 # sink agent1.sinks.logger-sink1.type = logger agent1.sinks.logger-sink1.channel = ch1 # channel agent1.channels.ch1.type = memory agent1.channels.ch1.capacity = 1000 agent1.channels.ch1.transactionCapacity = 100 4.4.2 启动Flume agent agent1 xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng agent -c . -f ../conf/flume-tcp.conf -n agent1 -Dflume.root.logger=INFO,console 4.4.3 测试产生syslog xiaosi@Qunar:~$ echo "hello flume tcp" | nc localhost 5140 4.4.4 查看信息 在启动agent的控制窗口,可以看到一下信息: xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng agent -c . -f ../conf/flume-tcp.conf -n agent1 -Dflume.root.logger=INFO,console Info: Including Hadoop libraries found via (/opt/hadoop-2.7.2/bin/hadoop) for HDFS access Info: Excluding /opt/hadoop-2.7.2/share/hadoop/common/lib/slf4j-api-1.7.10.jar from classpath Info: Excluding /opt/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar from classpath ... SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/apache-flume-1.6.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apache-hive-2.0.0-bin/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apache-hive-2.0.0-bin/lib/hive-jdbc-2.0.0-standalone.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 16/09/19 12:10:15 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 16/09/19 12:10:15 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:../conf/flume-tcp.conf 16/09/19 12:10:15 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 12:10:15 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 12:10:15 INFO conf.FlumeConfiguration: Added sinks: logger-sink1 Agent: agent1 16/09/19 12:10:15 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent1] 16/09/19 12:10:15 INFO node.AbstractConfigurationProvider: Creating channels 16/09/19 12:10:15 INFO channel.DefaultChannelFactory: Creating instance of channel ch1 type memory 16/09/19 12:10:15 INFO node.AbstractConfigurationProvider: Created channel ch1 16/09/19 12:10:15 INFO source.DefaultSourceFactory: Creating instance of source avro-source1, type syslogtcp 16/09/19 12:10:15 INFO sink.DefaultSinkFactory: Creating instance of sink: logger-sink1, type: logger 16/09/19 12:10:15 INFO node.AbstractConfigurationProvider: Channel ch1 connected to [avro-source1, logger-sink1] 16/09/19 12:10:15 INFO node.Application: Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:avro-source1,state:IDLE} }} sinkRunners:{logger-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@38aab021 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} } 16/09/19 12:10:15 INFO node.Application: Starting Channel ch1 16/09/19 12:10:16 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean. 16/09/19 12:10:16 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: ch1 started 16/09/19 12:10:16 INFO node.Application: Starting Sink logger-sink1 16/09/19 12:10:16 INFO node.Application: Starting Source avro-source1 16/09/19 12:10:16 INFO source.SyslogTcpSource: Syslog TCP Source starting... 16/09/19 12:10:50 WARN source.SyslogUtils: Event created from Invalid Syslog data. 16/09/19 12:10:54 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65 20 74 63 70 hello flume tcp } 4.5 案例五 JSONHandler 4.5.1 创建配置文件flume-json.conf xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/conf$ sudo cp flume.conf flume-json.conf 进行如下修改: # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'agent' agent1.sources = avro-source1 agent1.channels = ch1 agent1.sinks = logger-sink1 # sources agent1.sources.avro-source1.type = org.apache.flume.source.http.HTTPSource agent1.sources.avro-source1.channels = ch1 agent1.sources.avro-source1.port = 8888 # sink agent1.sinks.logger-sink1.type = logger agent1.sinks.logger-sink1.channel = ch1 # channel agent1.channels.ch1.type = memory agent1.channels.ch1.capacity = 1000 agent1.channels.ch1.transactionCapacity = 100 4.5.2 启动Flume agent agent1 xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng agent -c . -f ../conf/flume-json.conf -n agent1 -Dflume.root.logger=INFO,console 4.5.3 生成JSON 格式的POST request xiaosi@Qunar:/opt/apache-flume-1.6.0-bin$ curl -X POST -d '[{ "headers" :{"a":"a1", "b":"b1"}, "body":"flume_json_boy"}]' http://localhost:8888 4.5.4 查看信息 在启动agent的控制窗口,可以看到一下信息: xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng agent -c . -f ../conf/flume-json.conf -n agent1 -Dflume.root.logger=INFO,console Info: Including Hadoop libraries found via (/opt/hadoop-2.7.2/bin/hadoop) for HDFS access Info: Excluding /opt/hadoop-2.7.2/share/hadoop/common/lib/slf4j-api-1.7.10.jar from classpath Info: Excluding /opt/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar from classpath ... SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/apache-flume-1.6.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apache-hive-2.0.0-bin/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apache-hive-2.0.0-bin/lib/hive-jdbc-2.0.0-standalone.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 16/09/19 13:21:28 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 16/09/19 13:21:28 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:../conf/flume-json.conf 16/09/19 13:21:28 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 13:21:28 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 13:21:28 INFO conf.FlumeConfiguration: Added sinks: logger-sink1 Agent: agent1 16/09/19 13:21:28 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent1] 16/09/19 13:21:28 INFO node.AbstractConfigurationProvider: Creating channels 16/09/19 13:21:28 INFO channel.DefaultChannelFactory: Creating instance of channel ch1 type memory 16/09/19 13:21:28 INFO node.AbstractConfigurationProvider: Created channel ch1 16/09/19 13:21:28 INFO source.DefaultSourceFactory: Creating instance of source avro-source1, type org.apache.flume.source.http.HTTPSource 16/09/19 13:21:28 INFO sink.DefaultSinkFactory: Creating instance of sink: logger-sink1, type: logger 16/09/19 13:21:28 INFO node.AbstractConfigurationProvider: Channel ch1 connected to [avro-source1, logger-sink1] 16/09/19 13:21:28 INFO node.Application: Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:org.apache.flume.source.http.HTTPSource{name:avro-source1,state:IDLE} }} sinkRunners:{logger-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@136bcdd0 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} } 16/09/19 13:21:28 INFO node.Application: Starting Channel ch1 16/09/19 13:21:28 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean. 16/09/19 13:21:28 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: ch1 started 16/09/19 13:21:28 INFO node.Application: Starting Sink logger-sink1 16/09/19 13:21:28 INFO node.Application: Starting Source avro-source1 16/09/19 13:21:28 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog 16/09/19 13:21:28 INFO mortbay.log: jetty-6.1.26 16/09/19 13:21:28 INFO mortbay.log: Started SelectChannelConnector@0.0.0.0:8888 16/09/19 13:21:28 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: avro-source1: Successfully registered new MBean. 16/09/19 13:21:28 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: avro-source1 started 16/09/19 13:21:32 INFO sink.LoggerSink: Event: { headers:{a=a1, b=b1} body: 66 6C 75 6D 65 5F 6A 73 6F 6E 5F 62 6F 79 flume_json_boy } 4.6 案例六 Hadoop Sink Syslogtcp监听TCP的端口做为数据源,并将监听的数据存储在HDFS中 4.6.1 创建配置文件flume-hadoop.conf xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/conf$ sudo cp flume.conf flume-hadoop.conf 进行如下修改: # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'agent' agent1.sources = avro-source1 agent1.channels = ch1 agent1.sinks = logger-sink1 # sources agent1.sources.avro-source1.type = syslogtcp agent1.sources.avro-source1.channels = ch1 agent1.sources.avro-source1.host = localhost agent1.sources.avro-source1.port = 5140 # sink agent1.sinks.logger-sink1.type = hdfs agent1.sinks.logger-sink1.channel = ch1 agent1.sinks.logger-sink1.hdfs.path = hdfs://localhost:9000/user/xiaosi/data agent1.sinks.logger-sink1.hdfs.filePrefix = SysLog agent1.sinks.logger-sink1.hdfs.round = true agent1.sinks.logger-sink1.hdfs.roundValue = 10 agent1.sinks.logger-sink1.hdfs.roundUnit = minute # channel agent1.channels.ch1.type = memory agent1.channels.ch1.capacity = 1000 agent1.channels.ch1.transactionCapacity = 100 4.6.2 启动Flume agent agent1 xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng agent -c . -f ../conf/flume-hadoop.conf -n agent1 -Dflume.root.logger=INFO,console 4.6.3 测试产生syslog xiaosi@Qunar:/opt/apache-flume-1.6.0-bin$ echo "Hello Flume -> Hadoop one" | nc localhost 5140 4.6.4 查看信息 在启动agent的控制窗口,可以看到一下信息: xiaosi@Qunar:/opt/apache-flume-1.6.0-bin/bin$ flume-ng agent -c . -f ../conf/flume-hadoop.conf -n agent1 -Dflume.root.logger=INFO,console Info: Including Hadoop libraries found via (/opt/hadoop-2.7.2/bin/hadoop) for HDFS access Info: Excluding /opt/hadoop-2.7.2/share/hadoop/common/lib/slf4j-api-1.7.10.jar from classpath Info: Excluding /opt/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar from classpath ... SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/apache-flume-1.6.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apache-hive-2.0.0-bin/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apache-hive-2.0.0-bin/lib/hive-jdbc-2.0.0-standalone.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 16/09/19 13:34:58 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 16/09/19 13:34:58 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:../conf/flume-hadoop.conf 16/09/19 13:34:58 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 13:34:58 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 13:34:58 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 13:34:58 INFO conf.FlumeConfiguration: Added sinks: logger-sink1 Agent: agent1 16/09/19 13:34:58 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 13:34:58 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 13:34:58 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 13:34:58 INFO conf.FlumeConfiguration: Processing:logger-sink1 16/09/19 13:34:58 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent1] 16/09/19 13:34:58 INFO node.AbstractConfigurationProvider: Creating channels 16/09/19 13:34:58 INFO channel.DefaultChannelFactory: Creating instance of channel ch1 type memory 16/09/19 13:34:58 INFO node.AbstractConfigurationProvider: Created channel ch1 16/09/19 13:34:58 INFO source.DefaultSourceFactory: Creating instance of source avro-source1, type syslogtcp 16/09/19 13:34:58 INFO sink.DefaultSinkFactory: Creating instance of sink: logger-sink1, type: hdfs 16/09/19 13:34:58 INFO node.AbstractConfigurationProvider: Channel ch1 connected to [avro-source1, logger-sink1] 16/09/19 13:34:58 INFO node.Application: Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:avro-source1,state:IDLE} }} sinkRunners:{logger-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@569671b3 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} } 16/09/19 13:34:58 INFO node.Application: Starting Channel ch1 16/09/19 13:34:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean. 16/09/19 13:34:58 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: ch1 started 16/09/19 13:34:58 INFO node.Application: Starting Sink logger-sink1 16/09/19 13:34:58 INFO node.Application: Starting Source avro-source1 16/09/19 13:34:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: logger-sink1: Successfully registered new MBean. 16/09/19 13:34:58 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: logger-sink1 started 16/09/19 13:34:58 INFO source.SyslogTcpSource: Syslog TCP Source starting... 16/09/19 13:35:06 WARN source.SyslogUtils: Event created from Invalid Syslog data. 16/09/19 13:35:07 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false 16/09/19 13:35:07 INFO hdfs.BucketWriter: Creating hdfs://localhost:9000/user/xiaosi/data/SysLog.1474263307767.tmp 4.6.5 查看HDFS xiaosi@Qunar:/opt/hadoop-2.7.2/sbin$ hadoop fs -ls /user/xiaosi/data Found 3 items -rw-r--r-- 1 xiaosi supergroup 141 2016-09-19 13:35 /user/xiaosi/data/SysLog.1474263307767 -rw-r--r-- 1 xiaosi supergroup 1350 2016-07-28 14:10 /user/xiaosi/data/mysql-result.txt -rw-r--r-- 3 xiaosi supergroup 26 2016-07-30 22:47 /user/xiaosi/data/num.txt xiaosi@Qunar:/opt/hadoop-2.7.2/sbin$ hadoop fs -text /user/xiaosi/data/SysLog.1474263307767 1474263309104 48 65 6c 6c 6f 20 46 6c 75 6d 65 20 2d 3e 20 48 61 64 6f 6f 70 20 20 6f 6e 65 xiaosi@Qunar:/opt/hadoop-2.7.2/sbin$ hadoop fs -cat /user/xiaosi/data/SysLog.1474263307767 SEQ