您现在的位置是:首页 > 文章详情

SparkStreaming+Kafka 实现统计基于缓存的实时uv

日期:2018-07-05点击:354
版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.net/dkl12/article/details/80943294

我的原创地址:https://dongkelun.com/2018/06/25/KafkaUV/

前言

本文利用SparkStreaming+Kafka实现实时的统计uv,即独立访客,一个用户一天内访问多次算一次,这个看起来要对用户去重,其实只要按照WordCount的思路,最后输出key的数量即可,所以可以利用SparkStreaming+Kafka 实现基于缓存的实时wordcount程序,这里稍加改动,如果uv数量增加的话就打印uv的数量(key的数量)。

1、数据

数据是我随机在kafka里生产的几条,用户以空格区分开(因为用的之前单词统计的程序)

2、kafka topic

首先在kafka建一个程序用到topic:KafkaUV

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic KafkaUV

3、创建checkpoint的hdfs目录

我的目录为:/spark/dkl/kafka/UV_checkpoint

hadoop fs -mkdir -p /spark/dkl/kafka/UV_checkpoint

4、Spark代码

启动下面的程序

package com.dkl.leanring.spark.kafka import org.apache.spark.streaming.StreamingContext import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.Seconds import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.kafka010.KafkaUtils import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe object KafkaUV { def main(args: Array[String]): Unit = { //初始化,创建SparkSession val spark = SparkSession.builder().appName("KafkaUV").master("local[2]").enableHiveSupport().getOrCreate() //初始化,创建sparkContext val sc = spark.sparkContext //初始化,创建StreamingContext,batchDuration为5秒 val ssc = new StreamingContext(sc, Seconds(5)) //开启checkpoint机制 ssc.checkpoint("hdfs://ambari.master.com:8020/spark/dkl/kafka/UV_checkpoint") //kafka集群地址 val server = "ambari.master.com:6667" //配置消费者 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> server, //kafka集群地址 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "UpdateStateBykeyWordCount", //消费者组名 "auto.offset.reset" -> "latest", //latest自动重置偏移量为最新的偏移量 earliest 、none "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交 val topics = Array("KafkaUV") //消费主题 //基于Direct方式创建DStream val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)) //开始执行WordCount程序 //以空格为切分符切分单词,并转化为 (word,1)形式 val words = stream.flatMap(_.value().split(" ")).map((_, 1)) val wordCounts = words.updateStateByKey( //每个单词每次batch计算的时候都会调用这个函数 //第一个参数为每个key对应的新的值,可能有多个,比如(hello,1)(hello,1),那么values为(1,1) //第二个参数为这个key对应的之前的状态 (values: Seq[Int], state: Option[Int]) => { var newValue = state.getOrElse(0) values.foreach(newValue += _) Option(newValue) }) //共享变量,便于后面的比较是否用新的uv val accum = sc.longAccumulator("uv") wordCounts.foreachRDD(rdd => { //如果uv增加 if (rdd.count > accum.value) { //打印uv println(rdd.count) //将共享变量的值更新为新的uv accum.add(rdd.count - accum.value) } }) ssc.start() ssc.awaitTermination() } }

5、生产几条数据

随便写几条即可

bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic KafkaUV 

6、结果

根据结果可以看到,既做到了历史消息用户的累计,也做到了用户的去重

原文链接:https://yq.aliyun.com/articles/676190
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章