Intellij idea配置Spark开发环境,统计哈姆雷特词频(2)
idea 新建maven 项目 输入maven坐标 maven 坐标 编辑maven文件 Spark 体系 中间层Spark,即核心模块Spark Core,必须在maven中引用。 编译Spark还要声明java8编译工具。 <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> </plugins> </build> idea自动加载引用,在窗口左侧Project导航栏-->External Libraries中看到引用org.apache.spark中spark-core_2.11-2.1.0.jar文件。 idea Externel Libraries 注:Spark Streaming是流式计算框架、SparkSQL数据库工具、Mlib机器学习框架、GraphX图计算工具。 Java 8 lambda函数风格的wordCount //定义单词总数累加器、和停用词累加器 Accumulator countTotal = jsc.accumulator(0); Accumulator stopTotal = jsc.accumulator(0); // 文件初始化RDD JavaRDD<String> stopword = jsc.textFile("data/text/stopword.txt"); JavaRDD<String> rdd = jsc.textFile("data/text/Hamlet.txt"); // RDD 转换为List List<String> stopWordList = stopword.collect(); // Broadcast 广播变量,task共享executor的变量 Broadcast<List<String>> broadcastedStopWordSet = jsc.broadcast(stopWordList); rdd.filter(l->l.length()>0) .flatMap(l-> Arrays.asList(l.trim().split(" ")).iterator()) // 将line分割展成词向量,词向量在连接,返回Rdd<String> .map(v->v.replaceAll("['.,:;?!-]", "").toLowerCase()) // 特殊字符处理, Rdd<String> .filter(v->{ boolean isStop = false; countTotal.add(1); if(broadcastedStopWordSet.value().contains(v)){ stopTotal.add(1); isStop = true; } return !isStop; }) //遍历总数计数、停用词计数,过滤停止词, Rdd<String> .mapToPair(v-> new Tuple2<>(v,1)) .reduceByKey((v1,v2)->v1+v2) //统计个数 .mapToPair(p-> new Tuple2<>(p._2,p._1)) .sortByKey(false) //排序 .take(10).forEach(e->{ System.out.println(e._2+":"+e._1); }); 将line分割展成词向量,词向量连接,flatmap返回Rdd<String> 特殊字符处理,返回 Rdd<String> 遍历总数计数、停用词计数,过滤停止词, 返回Rdd<String> Reduce Rdd<String,1>,返回Rdd<String,total> 排序 SortByKey,返回 Rdd<String,total> 后期有更多案例介绍Java 8 lambda风格的RDD开发