首页 文章 精选 留言 我的

精选列表

搜索[java],共10000篇文章
优秀的个人博客,低调大师

spark基本操作 java

1.map算子 private static void map() { //创建SparkConf SparkConf conf = new SparkConf() .setAppName("map") .setMaster("local"); //创建JavasparkContext JavaSparkContext sc = new JavaSparkContext(conf); //构造集合 List<Integer> numbers = Arrays.asList(1,2,3,4,5); //并行化集合,创建初始RDD JavaRDD<Integer> numberRDD = sc.parallelize(numbers); //使用map算子,将集合中的每个元素都乘以2 JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { return v1 * 2; } }); //打印新的RDD multipleNumberRDD.foreach(new VoidFunction<Integer>() { @Override public void call(Integer t) throws Exception { System.out.println(t); } }); //关闭JavasparkContext sc.close(); } 2.filter算子 private static void filter() { //创建SparkConf SparkConf conf = new SparkConf() .setAppName("filter") .setMaster("local"); //创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); //模拟集合 List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10); //并行化集合,创建初始RDD JavaRDD<Integer> numberRDD = sc.parallelize(numbers); //对集合使用filter算子,过滤出集合中的偶数 JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() { @Override public Boolean call(Integer v1) throws Exception { return v1%2==0; } }); evenNumberRDD.foreach(new VoidFunction<Integer>() { @Override public void call(Integer t) throws Exception { System.out.println(t); } }); sc.close(); } 3.flatMap算子 Spark 中 map函数会对每一条输入进行指定的操作,然后为每一条输入返回一个对象; 而flatMap函数则是两个操作的集合——正是“先映射后扁平化”: 操作1:同map函数一样:对每一条输入进行指定的操作,然后为每一条输入返回一个对象 操作2:最后将所有对象合并为一个对象 private static void flatMap() { SparkConf conf = new SparkConf() .setAppName("flatMap") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List<String> lineList = Arrays.asList("hello you","hello me","hello world"); JavaRDD<String> lines = sc.parallelize(lineList); //对RDD执行flatMap算子,将每一行文本,拆分为多个单词 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { //在这里,传入第一行,hello,you //返回的是一个Iterable<String>(hello,you) @Override public Iterable<String> call(String t) throws Exception { return Arrays.asList(t.split(" ")); } }); words.foreach(new VoidFunction<String>() { @Override public void call(String t) throws Exception { System.out.println(t); } }); sc.close(); } 4.groupByKey算子 private static void groupByKey() { SparkConf conf = new SparkConf() .setAppName("groupByKey") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List<Tuple2<String, Integer>> scoreList = Arrays.asList( new Tuple2<String, Integer>("class1", 80), new Tuple2<String, Integer>("class2", 90), new Tuple2<String, Integer>("class1", 97), new Tuple2<String, Integer>("class2", 89)); JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList); //针对scoresRDD,执行groupByKey算子,对每个班级的成绩进行分组 //相当于是,一个key join上的所有value,都放到一个Iterable里面去了 JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey(); groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() { @Override public void call(Tuple2<String, Iterable<Integer>> t) throws Exception { System.out.println("class:" + t._1); Iterator<Integer> ite = t._2.iterator(); while(ite.hasNext()) { System.out.println(ite.next()); } } }); } 5.reduceByKey算子 private static void reduceByKey() { SparkConf conf = new SparkConf() .setAppName("reduceByKey") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List<Tuple2<String, Integer>> scoreList = Arrays.asList( new Tuple2<String, Integer>("class1", 80), new Tuple2<String, Integer>("class2", 90), new Tuple2<String, Integer>("class1", 97), new Tuple2<String, Integer>("class2", 89)); JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList); //reduceByKey算法返回的RDD,还是JavaPairRDD<key,value> JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); totalScores.foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> t) throws Exception { System.out.println(t._1 + ":" + t._2); } }); sc.close(); } 6.sortByKey算子 private static void sortByKey() { SparkConf conf = new SparkConf() .setAppName("sortByKey") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List<Tuple2<Integer, String>> scoreList = Arrays.asList( new Tuple2<Integer, String>(78, "marry"), new Tuple2<Integer, String>(89, "tom"), new Tuple2<Integer, String>(72, "jack"), new Tuple2<Integer, String>(86, "leo")); JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList); JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(); sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() { @Override public void call(Tuple2<Integer, String> t) throws Exception { System.out.println(t._1 + ":" + t._2); } }); sc.close(); } 7.join算子 join算子用于关联两个RDD,join以后,会根据key进行join,并返回JavaPairRDD。JavaPairRDD的第一个泛型类型是之前两个JavaPairRDD的key类型,因为通过key进行join的。第二个泛型类型,是Tuple2<v1, v2>的类型,Tuple2的两个泛型分别为原始RDD的value的类型 private static void join() { SparkConf conf = new SparkConf() .setAppName("join") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List<Tuple2<Integer, String>> studentList = Arrays.asList( new Tuple2<Integer, String>(1, "tom"), new Tuple2<Integer, String>(2, "jack"), new Tuple2<Integer, String>(3, "marry"), new Tuple2<Integer, String>(4, "leo")); List<Tuple2<Integer, Integer>> scoreList = Arrays.asList( new Tuple2<Integer, Integer>(1, 78), new Tuple2<Integer, Integer>(2, 87), new Tuple2<Integer, Integer>(3, 89), new Tuple2<Integer, Integer>(4, 98)); //并行化两个RDD JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);; JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList); //使用join算子关联两个RDD //join以后,会根据key进行join,并返回JavaPairRDD //JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key类型,因为通过key进行join的 //第二个泛型类型,是Tuple2<v1, v2>的类型,Tuple2的两个泛型分别为原始RDD的value的类型 JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores); //打印 studentScores.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() { @Override public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception { System.out.println("student id:" + t._1); System.out.println("student name:" + t._2._1); System.out.println("student score:" + t._2._2); System.out.println("=========================="); } }); sc.close(); } 更深的方法参见:http://blog.csdn.net/liulingyuan6/article/details/53397780http://blog.csdn.net/liulingyuan6/article/details/53410832https://www.2cto.com/net/201608/543044.html 转自:http://blog.csdn.net/sunhaoning/article/details/70505718 本文转自whk66668888 51CTO博客,原文链接:http://blog.51cto.com/12597095/2063767

优秀的个人博客,低调大师

JAVA性能测试初体验

序言:做自动化测试的时候,一直没想过要去做性能,等到现在做性能的时候,才明白这本身就是一个必须都要经历的过程,就像编程一样,编写小型软件的时候,我们不用过多关注架构和性能,但是等成长到一定时候,就会需要关注软件的可复用性(这是由开发成本决定,这点可以在软件架构上去改善,常说的自动化框架也是为了增强脚本的可复用性和可维护性)、性能瓶颈(这是由系统资源成本决定,空间和时间的调配)、可测试行(这能大大提高测试人员的测试效率,很多时候我们要求开发提供一种测试的接口来方便测试人员进行测试)、可部署性(利用make、ant或者maven,能够大大提高软件发布效率,这也是持续集成中的一种手段)等,因此,测试中的发展其实可以有很多的,不仅关注测试手段,还要关注如何在更多的途径上提高测试效率。下面是对本次性能测试项目至今的一些简单总结,欢迎指正。 一、性能测试项目的背景 性能测试缘起于产品存在大量背景数据时,程序响应时间过慢,而且在特定的情况下有可能会造成一些数据上报丢失,所以需要定位。 产品为C/S架构,采用的协议是snmp协议,运行在jvm上。 二、性能测试的策略 1、测试目的的确定 1)系统监控,包括cpu、内存、线程使用情况,在大数据情况下,发现问题,帮助修正代码结构,系统结构,提高系统的运行效率。 2)确定软件运行资源需求指标。 2、性能测试指标确定 1)确定指标来源,主要包括:产品规格、行业标准、客户需求与故障场景等 2)确定测试特性,例如:系统容量、及时性、稳定性、抗压性、资源利用性等,这些特性可以根据行业性能测试特性以及产品的相关特性来决定。 3)确定具体指标,包括数目和单位。 3、性能测试技术储备 其实性能测试可以算得上是自动化测试的一种大数据测试 1)测试场景准备:准备测试场景,可以理解为对背景数据的构造,其实可以将这种构造理解为另类的接口测试,例如:我们的软件服务器是应用SNMP协议进行通信,设备端有一个agent,专门用来与软件服务器端通信,那么可以虚拟出这么一个agent,保存相应的设备信息,虚拟过程可以通过对在网的实际设备进行录制,然后生成。 互联网中,客户端与服务器的交涉是基于http接口协议,其一般的性能测试都是发送大量的http请求,其实这种过程有一个问题就是无法模拟真实的背景数据,因为报文过于单一,而印象很深的是新浪一位朋友开发的tcpcopy工具,在传输层,将线上数据复制到测试场景下,从而成功模拟了真实场景环境,这是一种很好的测试方法。 (还有一种准备工作就是对测试服务器的选型,包括操作系统类型、CPU内核数目、内存数目等) 2)测试数据准备:这其实就是接口数据,在互联网中,这方面的模拟比较简单,用很多工具,例如LR、jmeter、soaupi等都可以成功构造模拟http报文,从而查看服务器的响应。因为我们采用的是snmp协议,所以业内没有这样的snmp接口工具,所以就自己基于snmp协议包开发了其snmp报文模拟工具。 3)性能测试监控:性能测试过程中,对软件系统服务器的监控是关键,例如:web测试中,往往会对web服务器和数据库服务器、操作系统的指标性能进行监控,因为我们的软件是运行在jvm上,所以直接采用jconsole或者jprofiler监控服务器的内存使用、cpu使用、各个线程使用情况,还有对数据库和操作系统的监控等。 4、性能测试方法 1)基于指标,进行测试数据构造测试,查看系统是否工作正常以及监测是否没有问题。 2)基于指标,在基于测试数据测试的同时,由测试人员参与进行操作,测试在特定环境下的系统工作情况。 3)客户场景模拟测试。 4)随机测试,利用算法进行大量随机数据构造。 三、性能测试调优 1、性能测试是一个不断探索和不断完善的一个测试过程。 调优步骤:衡量系统现状、设定调优目标、寻找性能瓶颈、性能调优、衡量是否到达目标(如果未到达目标,需重新寻找性能瓶颈)、性能调优结束 2、衡量现状,系统性能主要存在问题 1)内存泄露 2)内存占用过大,响应速率慢 3)线程数不断增加,出现死锁或空闲线程 4)某些类实例化数目过多,占用多余的内存空间 3、内存泄露 1)检验方式:内存泄露需要进行时长测试,既将监控界面及系统界面全部打开,进行长时间运行(如12小时),观察系统类的增长情况。 2)问题定位:若出现JVM的Heap持续增长或者Memory views经过时长测试,出现较大规模的红色部分(增长部分),且无法GC。 4、系统内存占用大 1)检验方式:进行某些特定的操作,系统进行大量内存占用或者数据读写操作。 2)问题定位:若系统内存数突发性的增长,且之后不回落,说明某些模块在持续性的占用系统资源。或者出现JVM的Heap有增长,虽不是持续增长,但一直无法回落。 5、线程数目过多或死锁 1)检验方式:进行某些特定的操作,可以使系统产生大量线程操作。 2)问题定位:若系统线程数突发性的增长或持续增长,且之后不回落,说明某些模块在持续性的占用线程。或者观察是否有许多线程来自同一个模块、长期处于waiting或block状态 6、性能调优原则 调优过程中,充分而不过分使用硬件资源、不要一遇到问题就去改善资源环境,然后,合理调整JVM,需要重点掌握一些JVM的参数,并且要善于分析系统架构和JVM的底层工作机制。 总结:性能测试是一个很漫长的过程,不管是做JVM性能测试、WEB架构方面的性能测试,其实道理是相通的,个人觉得,要做好性能测试,不仅要对测试理解,更要对软件架构和底层的服务器工作机制特别理解,不然,往往,你只能去简单做一些所谓的性能测试操作,但是却无法针对很多场景提供有效的测试策略和调优建议。好的性能测试工程师应该是能够快速搭建场景定位问题、提供指标,并且能够对软件系统架构提出有效建议。共勉之 ====================================分割线================================ 最新内容请见作者的GitHub页:http://qaseven.github.io/

优秀的个人博客,低调大师

Java代码调用MaxCompute

MaxCompute的客户端的工具odpscmd是好东西,什么都能干。但是它不方便在自己的代码做很好的集成,毕竟它是个Shell脚本。那有什么办法把MaxCompute的作业、设置和自己的代码做无缝集成呢,MaxComput SDK就能干这个。本文就实际的工作中最常见的几个场景,做一些示例。详细的使用可以在Maven上下到SDK的文档说明。其实这里面的很多写法都是文档里有的,或者在帮助文档里有写过类似的例子。这里就做算是做个整理吧。 对象操作 其实官方的SDK文档里,对这方面的介绍是最多的了,可以参考这里。这里我再针对实际场景里的比较多的创建表和分区做个例子,相信看完这些后对这方面就没有疑惑了 String access_id = "your access id"; String access_key =

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册