Spark-再接着上次的Lamda架构
日志分析
单机日志分析,适用于小数据量的。(最大10G),awk/grep/sort/join等都是日志分析的利器。
例子:
1、shell得到Nginx日志中访问量最高的前十个IP
cat access.log.10 | awk '(a[$1]++) END (for(b in a) print b"\t"a[b])' | sort -k2 -r | head -n 10
2、python 统计每个IP的地址点击数
import re import sys contents=sys.argv[1] def NginxIpHit(logfile_path): ipadd = r'\.'.join([r'\d{1,3}']*4) re_ip = re.compile(ipadd) iphitlisting = {} for line in open(contents): match = re_ip.match(line) if match: ip = match.group() iphitlisting[ip]=iphitlisting.get(ip,0)+1 print iphitlisting NginxIpHit(contents)
**大规模的日志处理,日志分析指标:
PV、UV、PUPV、漏斗模型和准化率、留存率、用户属性
最终用UI展示各个指标的信息。**
架构
- 1、实时日志处理流线
数据采集:采用Flume NG进行数据采集
数据汇总和转发:用Flume 将数据转发和汇总到实时消息系统Kafka
数据处理:采用spark streming 进行实时的数据处理
结果显示:flask作为可视化工具进行结果显示
- 2、离线日志处理流线
数据采集:通过Flume将数据转存到HDFS
数据处理:使用spark sql进行数据的预处理
结果呈现:结果汇总到mysql上,最后使用flask进行结果的展现
Lamda架构:低响应延迟的组合数据传输环境。
查询过程:一次流处理、一次批处理。对应着实时和离线处理。
项目流程
安装flume
Flume进行日志采集,web端的日志一般Nginx、IIS、Tomcat等。Tomcat的日志在var/data/log
安装jdk
安装Flume
wget http://mirrors.cnnic.cn/apache/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz tar –zxvf apache-flume-1.5.0-bin.tar.gz mv apache-flume-1.5.0 –bin apache-flume-1.5.0 ln -s apache-flume-1.5.0 fiume
环境变量配置
Vim /etc/profile Export JAVA_HOME=/usr/local/jdk Export CLASS_PATH = .:$ JAVA_HOME/lib/dt.jar: $ JAVA_HOME/lib/tools.jar Export PATH=$ PATH:$ JAVA_HOME/bin Export FlUME_HOME=/usr/local/flume Export FlUME_CONF_DIR=$ FlUME_HOME/conf Export PATH=$ PATH:$ FlUME_HOME /bin Souce /etc/profile
创建agent配置文件将数据输出到hdfs上,修改flume.conf:
a1.sources = r1 a1.sinks = k1 a1.channels =c1 描述和配置sources 第一步:配置数据源 a1.sources.r1.type =exec a1.sources.r1.channels =c1 配置需要监控的日志输出目录 a1.sources.r1.command=tail –f /va/log/data 第二步:配置数据输出 a1.sink.k1.type =hdfs a1.sink.k1.channels =c1 a1.sink.k1.hdfs.useLocalTimeStamp=true a1.sink.k1.hdfs.path =hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M a1.sink.k1.hdfs.filePrefix =cmcc a1.sink.k1.hdfs.minBlockReplicas=1 a1.sink.k1.hdfs.fileType =DataStream a1.sink.k1.hdfs.writeFormat=Text a1.sink.k1.hdfs.rollInterval =60 a1.sink.k1.hdfs.rollSize =0 a1.sink.k1.hdfs.rollCount=0 a1.sink.k1.hdfs.idleTimeout =0 配置数据通道 a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100 第四步:将三者级联 a1.souces.r1.channels =c1 a1.sinks.k1.channel =c1
启动Flume Agent
cd /usr/local/flume nohup bin/flume-ng agent –n conf -f conf/flume-conf.properties &
已经将flume整合到了hdfs中
- 整合Flume、kafka、hhdfs
#hdfs输出端 a1.sink.k1.type =hdfs a1.sink.k1.channels =c1 a1.sink.k1.hdfs.useLocalTimeStamp=true a1.sink.k1.hdfs.path =hdfs://192.168.11.174:9000/flume/events/%Y/%m/%d/%H/%M a1.sink.k1.hdfs.filePrefix =cmcc-%H a1.sink.k1.hdfs.minBlockReplicas=1 a1.sink.k1.hdfs.fileType =DataStream a1.sink.k1.hdfs.rollInterval =3600 a1.sink.k1.hdfs.rollSize =0 a1.sink.k1.hdfs.rollCount=0 a1.sink.k1.hdfs.idleTimeout =0 #kafka输出端 为了提高性能使用内存通道 a1.sink.k2.type =com.cmcc.chiwei.Kafka.CmccKafkaSink a1.sink.k2.channels =c2 a1.sink.k2.metadata.broker.List=192.168.11.174:9002;192.168.11.175:9092; 192.168.11.174:9092 a1.sink.k2.partion.key =0 a1.sink.k2.partioner.class= com.cmcc.chiwei.Kafka.Cmcc Partion a1.sink.k2.serializer.class= kafka. Serializer.StringEncoder a1.sink.k2.request.acks=0 a1.sink.k2.cmcc.encoding=UTF-8 a1.sink.k2.cmcc.topic.name=cmcc a1.sink.k2.producer.type =async a1.sink.k2.batchSize =100 a1.sources.r1.selector.type=replicating a1.sources = r1 a1.sinks = k1 k2 a1.channels =c1 c2 #c1 a1.channels.c1.type=file a1.channels.c1.checkpointDir=/home/flume/flumeCheckpoint a1.channels.c1.dataDir=/home/flume/flumeData, /home/flume/flumeDataExt a1.channels.c1.capacity=2000000 a1.channels.c1.transactionCapacity=100 #c2 a1.channels.c2.type=memory a1.channels.c2.capacity=2000000 a1.channels.c2.transactionCapacity=100
用Kafka将日志汇总
1.4 Tar –zxvf kafka_2.10-0.8.1.1.tgz 1.5 配置kafka和zookeeper文件 配置zookeeper.properties dataDir=/tmp/zookeeper client.Port=2181 maxClientCnxns = 0 initLimit = 5 syncLimit = 2 ## server.43 = 10.190.182.43:2888:3888 server.38 = 10.190.182.38:2888:3888 server.33 = 10.190.182.33:2888:3888
配置zookeeper myid
在每个服务器dataDir 创建 myid文件 写入本机id //server.43 myid 本机编号43 echo “43” > /tmp/ zookeeper/myid 配置kafka文件, config/server.properties 每个节点根据不同主机名配置 broker.id :43 host.name:10.190.172.43 zookeeper.connect=10.190.172.43:2181, 10.190.172.33:2181,10.190.172.38:2181
启动zookeeper
kafka通过zookeeper存储元数据,先启动它,提供kafka相应的连接地址
Kafka自带的zookeeper
在每个节点 bin/zookeeper-server-start.sh config/zookeeper. properties
启动Kafka
Bin/Kafka-server-start.sh
创建和查看topic
Topic和flume中的要一致,spark streming 也用的这个
Bin/ Kafka-topics.sh --create --zookeeper 10.190.172.43:2181 --replication-factor 1 -- partions 1 --topic KafkaTopic
查看下:
Bin/ Kafka-topics.sh --describe -- zookeeper 10.190.172.43:2181
整合kafka sparkstreming
Buid.sbt Spark-core Spark-streming Spark-streamng-kafka kafka
- Spark streming 实时分析
数据收集和中转已经好了,kafka给sparkstreming - Spark sql 离线分析
- Flask可视化
代码
移步: github.com/jinhang

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
[解决]java.io.IOException: Cannot obtain block length for LocatedBlock
在hadoop测试集群运行job的过程中发现部分运行失败,有Cannot obtain block length for LocatedBlock,使用hdfs dfs -cat ${文件}的时候也报这个错,看过代码后发现是文件未被正常关闭(flume传到hdfs上的,可能由于Namenode长时间gc或者切换或者重启会留下很多.tmp结尾的文件),为此,使用hadoop命令显示处于打开状态的文件,然后删除这些文件: hadoop fsck / -openforwrite | egrep -v '^\.+$' | egrep "MISSING|OPENFORWRITE" | grep -o "/[^ ]*" | sed -e "s/:$//" | xargs -i hadoop fs -rmr {}; 然后重传这些文件到hdfs。
- 下一篇
Spark-ML-基于云平台和用户日志的推荐系统
架构: 数据收集:spark stareming从Azure Queue收集数据,通过自定义的spark stareming receiver,源源不断的消费流式数据。 数据处理: spark stareming分析用户行为日志数据,通过实时的聚集,统计报表现有的应用的运营信息,,也可以通过离线的训练模型,对实现数据进行预测和标注。 结果输出:hdfs 数据收集用到了这个东西,miner是个js可以收集用户的行为日志,前端收集和回传用户行为日志。 静态数据 动态数据 代码见:https://github.com/jinhang/jquery-behavior-miner 前台收集发送给Azure spark streaming 分析日志 spark 训练ALS spark使用ALS进行推荐 协同过滤 协同过滤(Collaborative Filtering, 简称CF),wiki上的定义是:简单来说是利用某兴趣相投、拥有共同经验之群体的喜好来推荐使用者感兴趣的资讯,个人透过合作的机制给予资讯相当程度的回应(如评分)并记录下来以达到过滤的目的进而帮助别人筛选资讯,回应不一定局限于特别感兴...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合Redis,开启缓存,提高访问速度
- Windows10,CentOS7,CentOS8安装Nodejs环境
- MySQL8.0.19开启GTID主从同步CentOS8
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Hadoop3单机部署,实现最简伪集群
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- 设置Eclipse缩进为4个空格,增强代码规范