Copyright notice: this article was first published by Dong Kelun at https://dongkelun.com. Non-commercial reposts must credit the author and the original source; for commercial reposts please contact the author. https://blog.csdn.net/dkl12/article/details/80943294
Original post: https://dongkelun.com/2018/06/25/KafkaUV/
Preface
This article uses Spark Streaming + Kafka to compute UV (unique visitors) in real time: a user who visits several times within a day is counted only once. At first glance this seems to require deduplicating users, but it really just follows the WordCount pattern and outputs the number of keys at the end. So we can reuse the stateful, cache-based real-time WordCount program built with Spark Streaming + Kafka, with one small change: whenever the UV count grows, print it (i.e. the number of keys).
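In other words, compared with a stateful WordCount only the output step changes: instead of printing per-key counts, print how many keys there are. A minimal sketch of that idea, assuming a hypothetical DStream of user IDs called userStream (the complete program is in section 4):

// Sketch only: keep one state entry per user, then UV = the number of keys
val counts = userStream.map((_, 1)).updateStateByKey(
  (values: Seq[Int], state: Option[Int]) => Option(state.getOrElse(0) + values.sum))
counts.foreachRDD(rdd => println(s"uv = ${rdd.count()}"))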
1. Data
The data is just a few messages I produced into Kafka at random, with user IDs separated by spaces (because the program reuses the earlier word-count code).
2. Kafka topic
First create the topic the program uses in Kafka: KafkaUV
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic KafkaUV
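Optionally, you can verify the topic was created (assuming the same Kafka version, which still takes the --zookeeper flag):

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic KafkaUV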
3. Create the HDFS checkpoint directory
My directory is /spark/dkl/kafka/UV_checkpoint
hadoop fs -mkdir -p /spark/dkl/kafka/UV_checkpoint
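You can confirm the directory exists with:

hadoop fs -ls /spark/dkl/kafka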
4. Spark code
Start the program below.
package com.dkl.leanring.spark.kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaUV {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("KafkaUV").master("local[2]").enableHiveSupport().getOrCreate()
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(5))
    // updateStateByKey requires a checkpoint directory
    ssc.checkpoint("hdfs://ambari.master.com:8020/spark/dkl/kafka/UV_checkpoint")

    val server = "ambari.master.com:6667"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> server,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "UpdateStateBykeyWordCount",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
    val topics = Array("KafkaUV")
    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // Split each message into user IDs and pair each user with a count of 1
    val words = stream.flatMap(_.value().split(" ")).map((_, 1))
    // Accumulate the per-user count across batches; the state keeps one entry per user
    val wordCounts = words.updateStateByKey(
      (values: Seq[Int], state: Option[Int]) => {
        var newValue = state.getOrElse(0)
        values.foreach(newValue += _)
        Option(newValue)
      })

    // The accumulator remembers the last printed UV; the number of keys in the
    // state RDD is the number of distinct users, so print it only when it grows
    val accum = sc.longAccumulator("uv")
    wordCounts.foreachRDD(rdd => {
      if (rdd.count > accum.value) {
        println(rdd.count)
        accum.add(rdd.count - accum.value)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
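The kafka010 imports come from the spark-streaming-kafka-0-10 integration, so that artifact has to be on the classpath. A sketch of the sbt dependencies, assuming Spark 2.x built for Scala 2.11 (the versions here are placeholders; match them to your cluster):

// Hypothetical versions; adjust to your Spark installation
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0")

When submitting to a cluster instead of running with master("local[2]"), the Kafka integration jar also needs to be shipped along, for example via spark-submit --packages or an assembly jar.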
5. Produce a few messages
Any few lines will do.
bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic KafkaUV
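For example, you could type lines like the following (hypothetical user IDs, several per line, separated by spaces to match the split(" ") in the code):

user1 user2 user3
user1 user4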
6. Results
The results show that users from historical messages are accumulated across batches and, at the same time, each user is counted only once.
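For instance, with the hypothetical input above, the first batch prints 3 (user1, user2, user3) and the next batch prints 4: user1 appears again but is not counted twice, while the new user4 raises the UV by one.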