SparkStreaming+Kafka 实现统计基于缓存的实时uv
我的原创地址:https://dongkelun.com/2018/06/25/KafkaUV/
前言
本文利用SparkStreaming+Kafka实现实时的统计uv,即独立访客,一个用户一天内访问多次算一次,这个看起来要对用户去重,其实只要按照WordCount的思路,最后输出key的数量即可,所以可以利用SparkStreaming+Kafka 实现基于缓存的实时wordcount程序,这里稍加改动,如果uv数量增加的话就打印uv的数量(key的数量)。
1、数据
数据是我随机在kafka里生产的几条,用户以空格区分开(因为用的之前单词统计的程序)
2、kafka topic
首先在kafka建一个程序用到topic:KafkaUV
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic KafkaUV
3、创建checkpoint的hdfs目录
我的目录为:/spark/dkl/kafka/UV_checkpoint
hadoop fs -mkdir -p /spark/dkl/kafka/UV_checkpoint
4、Spark代码
启动下面的程序
package com.dkl.leanring.spark.kafka
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.Seconds
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object KafkaUV {
def main(args: Array[String]): Unit = {
//初始化,创建SparkSession
val spark = SparkSession.builder().appName("KafkaUV").master("local[2]").enableHiveSupport().getOrCreate()
//初始化,创建sparkContext
val sc = spark.sparkContext
//初始化,创建StreamingContext,batchDuration为5秒
val ssc = new StreamingContext(sc, Seconds(5))
//开启checkpoint机制
ssc.checkpoint("hdfs://ambari.master.com:8020/spark/dkl/kafka/UV_checkpoint")
//kafka集群地址
val server = "ambari.master.com:6667"
//配置消费者
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> server, //kafka集群地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "UpdateStateBykeyWordCount", //消费者组名
"auto.offset.reset" -> "latest", //latest自动重置偏移量为最新的偏移量 earliest 、none
"enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
val topics = Array("KafkaUV") //消费主题
//基于Direct方式创建DStream
val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
//开始执行WordCount程序
//以空格为切分符切分单词,并转化为 (word,1)形式
val words = stream.flatMap(_.value().split(" ")).map((_, 1))
val wordCounts = words.updateStateByKey(
//每个单词每次batch计算的时候都会调用这个函数
//第一个参数为每个key对应的新的值,可能有多个,比如(hello,1)(hello,1),那么values为(1,1)
//第二个参数为这个key对应的之前的状态
(values: Seq[Int], state: Option[Int]) => {
var newValue = state.getOrElse(0)
values.foreach(newValue += _)
Option(newValue)
})
//共享变量,便于后面的比较是否用新的uv
val accum = sc.longAccumulator("uv")
wordCounts.foreachRDD(rdd => {
//如果uv增加
if (rdd.count > accum.value) {
//打印uv
println(rdd.count)
//将共享变量的值更新为新的uv
accum.add(rdd.count - accum.value)
}
})
ssc.start()
ssc.awaitTermination()
}
}
5、生产几条数据
随便写几条即可
bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic KafkaUV
6、结果
根据结果可以看到,既做到了历史消息用户的累计,也做到了用户的去重

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
Spark Streamming+Kafka提交offset实现有且仅有一次
版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.net/dkl12/article/details/80943217 我的原创地址:https://dongkelun.com/2018/06/20/sparkStreamingOffsetOnlyOnce/ 前言 本文讲Spark Streamming使用Direct方式读取Kafka,并在输出(存储)操作之后提交offset到Kafka里实现程序读写操作有且仅有一次,即程序重启之后之前消费并且输出过的数据不再重复消费,接着上次消费的位置继续消费Kafka里的数据。 Spark Streamming+Kafka官方文档:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html 1、提交offset的程序 根据官方文档可知,在spark代码里可以获取对应的offset信息,并且可以提交offset存储到kafka中。 代码: pack...
-
下一篇
Spark获取当前分区的partitionId
版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.net/dkl12/article/details/80943341 我的原创地址:https://dongkelun.com/2018/06/28/sparkGetPartitionId/ 前言 本文讲解Spark如何获取当前分区的partitionId,这是一位群友提出的问题,其实只要通过TaskContext.get.partitionId(我是在官网上看到的),下面给出一些示例。 1、代码 下面的代码主要测试SparkSession,SparkContext创建的rdd和df是否都支持。 package com.dkl.leanring.partition import org.apache.spark.sql.SparkSession import org.apache.spark.TaskContext /** * 获取当前分区的partitionId */ object GetPartitionIdDemo ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- MySQL数据库在高并发下的优化方案
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2全家桶,快速入门学习开发网站教程
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2配置默认Tomcat设置,开启更多高级功能