import org.apache.spark.storage.StorageLevel
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {
  // State update function for updateStateByKey: each element of the iterator is
  // (key, counts observed in this batch, previous running total); emit the key with its new total.
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (word, counts, state) => Some(counts.sum + state.getOrElse(0)).map(total => (word, total)) }
  }

  def main(args: Array[String]) {
    LoggerLevels.setStreamingLogLevels()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("c://ck2")
    // e.g. topics = "alog-2016-04-16,alog-2016-04-17,alog-2016-04-18" with numThreads = 2
    // yields topicMap = Map(alog-2016-04-16 -> 2, alog-2016-04-17 -> 2, alog-2016-04-18 -> 2)
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // This line was missing from the original handout code; at least one output operation (action)
    // is required, otherwise the job fails with:
    // java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
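
LoggerLevels is referenced above but not defined in this listing. A minimal sketch of such a helper, modeled on the StreamingExamples utility shipped with the Spark examples (the object name and method signature here are assumptions), could look like this:

import org.apache.log4j.{Level, Logger}

object LoggerLevels {
  def setStreamingLogLevels() {
    // Only lower the log level if the user has not configured log4j themselves
    // (i.e. the root logger has no appenders yet).
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}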
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{KafkaManager, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaWordCount {

  /* def dealLine(line: String): String = {
    val list = line.split(',').toList
    // val list = AnalysisUtil.dealString(line, ',', '"') // treat dealString as a quote-aware split
    list(0).substring(0, 10) + "-" + list(26)
  }*/

  def processRdd(rdd: RDD[(String, String)]): Unit = {
    val lines = rdd.map(_._2)
    // flatMap (not map) so that individual words are counted rather than whole arrays
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.foreach(println)
  }

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics> <groupid>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more Kafka topics to consume from
           |  <groupid> is a consumer group id
           |
        """.stripMargin)
      System.exit(1)
    }

    Logger.getLogger("org").setLevel(Level.WARN)

    val Array(brokers, topics, groupId) = args

    // Create context with a 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    sparkConf.setMaster("local[*]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a direct Kafka stream for the given brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )

    val km = new KafkaManager(kafkaParams)

    val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Process the messages first
        processRdd(rdd)
        // Then update the offsets in ZooKeeper
        km.updateZKOffsets(rdd)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
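
KafkaManager and updateZKOffsets are imported from the org.apache.spark.streaming.kafka package but are not part of the Spark distribution; they appear to be a custom wrapper whose implementation is not shown here. For reference, a sketch of the plain Spark 1.x direct-stream API that such a wrapper builds on, and of where the per-batch offsets it would persist come from, might look like the following (DirectStreamSketch, createStream and offsetsOf are illustrative names, not library APIs):

import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object DirectStreamSketch {

  // The unwrapped equivalent of km.createDirectStream: no offset bootstrapping,
  // the starting position is taken from the auto.offset.reset setting.
  def createStream(ssc: StreamingContext,
                   kafkaParams: Map[String, String],
                   topics: Set[String]): InputDStream[(String, String)] =
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

  // The offset ranges covered by one batch (valid on the RDD handed to foreachRDD,
  // before any transformation); a KafkaManager-style helper would write these to
  // ZooKeeper only after processRdd has finished without errors.
  def offsetsOf(rdd: RDD[(String, String)]): Array[OffsetRange] =
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}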