首页 文章 精选 留言 我的

精选列表

搜索[官方],共10007篇文章
优秀的个人博客,低调大师

《Spark 官方文档》机器学习库(MLlib)指南

机器学习库(MLlib)指南 MLlib是Spark的机器学习(ML)库。旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。 MLllib目前分为两个代码包: spark.mllib包含基于RDD的原始算法API。 spark.ml则提供了基于DataFrames高层次的API,可以用来构建机器学习管道。 我们推荐您使用spark.ml,因为基于DataFrames的API更加的通用而且灵活。不过我们也会继续支持spark.mllib包。用户可以放心使用,spark.mllib还会持续地增加新的功能。不过开发者需要注意,如果新的算法能够适用于机器学习管道的概念,就应该将其放到spark.ml包中,如:特征提取器和转换器。 下面的列表列出了两个包的主要功能。 spark.mllib: 数据类型,算法以及工具 Data types(数据类型) Basic statistics(基础统计) summary statistics(摘要统计) correlations(相关性) stratified sampling(分层抽样) hypothesis testing(假设检验) streaming significance testing random data generation(随机数据生成) Classification and regression(分类和回归) linear models (SVMs, logistic regression, linear regression)(线性模型(SVM,逻辑回归,线性回归)) naive Bayes(朴素贝叶斯) decision trees(决策树) ensembles of trees (Random Forests and Gradient-Boosted Trees)(树套装(随机森林和梯度提升决策树)) isotonic regression(保序回归) Collaborative filtering(协同过滤) alternating least squares (ALS)(交替最小二乘(ALS)) Clustering(聚类) k-means(K-均值) Gaussian mixture(高斯混合) power iteration clustering (PIC)(幂迭代聚类(PIC)) latent Dirichlet allocation (LDA)(隐含狄利克雷分配) bisecting k-means(平分K-均值) streaming k-means(流式K-均值) Dimensionality reduction(降维) singular value decomposition (SVD)(奇异值分解(SVD)) principal component analysis (PCA)(主成分分析(PCA)) Feature extraction and transformation(特征抽取和转换) Frequent pattern mining(频繁模式挖掘) FP-growth(FP-增长) association rules(关联规则) PrefixSpan(PrefixSpan) Evaluation metrics(评价指标) PMML model export(PMML模型导出) Optimization (developer)(优化(开发者)) stochastic gradient descent(随机梯度下降) limited-memory BFGS (L-BFGS)(有限的记忆BFGS(L-BFGS)) spark.ml: 机器学习管道高级API Overview: estimators, transformers and pipelines(概览:评估器,转换器和管道) Extracting, transforming and selecting features(抽取,转换和选取特征) Classification and regression(分类和回归) Clustering(聚类) Advanced topics(高级主题) 虽然还有些降维技术在spark.ml中尚不可用,不过用户可以将spark.mllib中的的相关实现和spark.ml中的算法无缝地结合起来。 依赖项 MLlib使用的线性代数代码包是Breeze,而Breeze又依赖于netlib-java优化的数值处理。如果在运行时环境中这些原生库不可用,你将会收到一条警告,而后spark会使用纯JVM实现来替代之。 由于许可限制的原因,spark在默认情况下不会包含netlib-java的原生代理库。如果需要配置netlib-java/Breeze使用其系统优化库,你需要添加依赖项:com.github.fommil.netlib:all:1.1.2(或者在编译时加上参数:-Pnetlib-lgpl),然后再看一看netlib-java相应的安装文档。 要使用MLlib的Python接口,你需要安装NumPy1.4以上的版本。 迁移指南 MLlib目前还在积极的开发当中。所以标记为 Experimental / DeveloperApi 的接口可能在未来发生变化,下面的迁移指南说明了版本升级后的变化。 从1.5升级到1.6 从1.5到1.6,spark.mllib 和 spark.ml 包中并没有重大的API变化,不过有一些行为不再支持或者发生变化。 已经废弃: SPARK-11358: spark.mllib.clustering.KMeans 的runs参数已经废弃 SPARK-10592: spark.ml.classification.LogisticRegressionModel和spark.ml.regresion.LinearRegressionModel 中,weights字段改名为coefficients。这一变动有助于消除歧义,可以和输入给算法的实例(行)权重(weights)区分开来。 行为有变: SPARK-7770:spark.mllib.tree.GradientBoostedTrees:validationTol的语义在1.6中有变。原先其代表误差变化绝对值的一个阈值,而现在它类似于GradientDescent中的convergenceTol:对于较大的误差,使用相对误差(相对于上一次);而对于较小的误差(<0.01),使用绝对误差。 SPARK-11069:spark.ml.feature.RegexTokenizer:以前,在分词之前不会讲字符串转小写。现在的实现是,默认会将字符串转小写,不过有选项可以设为不转。这中实现和Tokenizertransformer的行为相匹配。 Spark老版本 以前版本的迁移指南归档在这里:on this page 要了解更多有关系统优化的好处和背景资料,可以看看Sam Halliday关于ScalaX的演讲:High Performance Linear Algebra in Scala 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark官方文档》Spark Streaming编程指南(二)

累加器和广播变量 首先需要注意的是,累加器(Accumulators)和广播变量(Broadcast variables)是无法从Spark Streaming的检查点中恢复回来的。所以如果你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,因为这样累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化。代码示例如下: Scala Java Python object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } object DroppedWordsCounter { @volatile private var instance: Accumulator[Long] = null def getInstance(sc: SparkContext): Accumulator[Long] = { if (instance == null) { synchronized { if (instance == null) { instance = sc.accumulator(0L, "WordsInBlacklistCounter") } } } instance } } wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { // 获取现有或注册新的blacklist广播变量 val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // 获取现有或注册新的 droppedWordsCounter 累加器 val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // 基于blacklist来过滤词,并将过滤掉的词的个数累加到 droppedWordsCounter 中 val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter += count false } else { true } }.collect() val output = "Counts at time " + time + " " + counts }) 这里有完整代码:source code。 DataFrame和SQL相关算子 在Streaming应用中可以调用DataFrames and SQL来处理流式数据。开发者可以用通过StreamingContext中的SparkContext对象来创建一个SQLContext,并且,开发者需要确保一旦驱动器(driver)故障恢复后,该SQLContext对象能重新创建出来。同样,你还是可以使用懒惰创建的单例模式来实例化SQLContext,如下面的代码所示,这里我们将最开始的那个小栗子做了一些修改,使用DataFrame和SQL来统计单词计数。其实就是,将每个RDD都转化成一个DataFrame,然后注册成临时表,再用SQL查询这些临时表。 Scala Java Python /** streaming应用中调用DataFrame算子 */ val words: DStream[String] = ... words.foreachRDD { rdd => // 获得SQLContext单例 val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) import sqlContext.implicits._ // 将RDD[String] 转为 DataFrame val wordsDataFrame = rdd.toDF("word") // DataFrame注册为临时表 wordsDataFrame.registerTempTable("words") // 再用SQL语句查询,并打印出来 val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() } See the fullsource code. 这里有完整代码:source code。 你也可以在其他线程里执行SQL查询(异步查询,即:执行SQL查询的线程和运行StreamingContext的线程不同)。不过这种情况下,你需要确保查询的时候 StreamingContext 没有把所需的数据丢弃掉,否则StreamingContext有可能已将老的RDD数据丢弃掉了,那么异步查询的SQL语句也可能无法得到查询结果。举个栗子,如果你需要查询上一个批次的数据,但是你的SQL查询可能要执行5分钟,那么你就需要StreamingContext至少保留最近5分钟的数据:streamingContext.remember(Minutes(5)) (这是Scala为例,其他语言差不多) 更多DataFrame和SQL的文档见这里:DataFrames and SQL MLlib算子 MLlib提供了很多机器学习算法。首先,你需要关注的是流式计算相关的机器学习算法(如:Streaming Linear Regression,Streaming KMeans),这些流式算法可以在流式数据上一边学习训练模型,一边用最新的模型处理数据。除此以外,对更多的机器学习算法而言,你需要离线训练这些模型,然后将训练好的模型用于在线的流式数据。详见MLlib。 缓存/持久化 和RDD类似,DStream也支持将数据持久化到内存中。只需要调用 DStream的persist() 方法,该方法内部会自动调用DStream中每个RDD的persist方法进而将数据持久化到内存中。这对于可能需要计算很多次的DStream非常有用(例如:对于同一个批数据调用多个算子)。对于基于滑动窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或者有状态的算子,如:updateStateByKey,数据持久化就更重要了。因此,滑动窗口算子产生的DStream对象默认会自动持久化到内存中(不需要开发者调用persist)。 对于从网络接收数据的输入数据流(如:Kafka、Flume、socket等),默认的持久化级别会将数据持久化到两个不同的节点上互为备份副本,以便支持容错。 注意,与RDD不同的是,DStream的默认持久化级别是将数据序列化到内存中。进一步的讨论见性能调优这一小节。关于持久化级别(或者存储级别)的更详细说明见Spark编程指南(Spark Programming Guide)。 检查点 一般来说Streaming 应用都需要7*24小时长期运行,所以必须对一些与业务逻辑无关的故障有很好的容错(如:系统故障、JVM崩溃等)。对于这些可能性,Spark Streaming 必须在检查点保存足够的信息到一些可容错的外部存储系统中,以便能够随时从故障中恢复回来。所以,检查点需要保存以下两种数据: 元数据检查点(Metadata checkpointing)– 保存流式计算逻辑的定义信息到外部可容错存储系统(如:HDFS)。主要用途是用于在故障后回复应用程序本身(后续详谈)。元数包括: Configuration– 创建Streaming应用程序的配置信息。 DStream operations– 定义流式处理逻辑的DStream操作信息。 Incomplete batches– 已经排队但未处理完的批次信息。 数据检查点(Data checkpointing)– 将生成的RDD保存到可靠的存储中。这对一些需要跨批次组合数据或者有状态的算子来说很有必要。在这种转换算子中,往往新生成的RDD是依赖于前几个批次的RDD,因此随着时间的推移,有可能产生很长的依赖链条。为了避免在恢复数据的时候需要恢复整个依赖链条上所有的数据,检查点需要周期性地保存一些中间RDD状态信息,以斩断无限制增长的依赖链条和恢复时间。 总之,元数据检查点主要是为了恢复驱动器节点上的故障,而数据或RDD检查点是为了支持对有状态转换操作的恢复。 何时启用检查点 如果有以下情况出现,你就必须启用检查点了: 使用了有状态的转换算子(Usage of stateful transformations)– 不管是用了 updateStateByKey 还是用了 reduceByKeyAndWindow(有”反归约”函数的那个版本),你都必须配置检查点目录来周期性地保存RDD检查点。 支持驱动器故障中恢复(Recovering from failures of the driver running the application)– 这时候需要元数据检查点以便恢复流式处理的进度信息。 注意,一些简单的流式应用,如果没有用到前面所说的有状态转换算子,则完全可以不开启检查点。不过这样的话,驱动器(driver)故障恢复后,有可能会丢失部分数据(有些已经接收但还未处理的数据可能会丢失)。不过通常这点丢失时可接受的,很多Spark Streaming应用也是这样运行的。对非Hadoop环境的支持未来还会继续改进。 如何配置检查点 检查点的启用,只需要设置好保存检查点信息的检查点目录即可,一般会会将这个目录设为一些可容错的、可靠性较高的文件系统(如:HDFS、S3等)。开发者只需要调用 streamingContext.checkpoint(checkpointDirectory)。设置好检查点,你就可以使用前面提到的有状态转换算子了。另外,如果你需要你的应用能够支持从驱动器故障中恢复,你可能需要重写部分代码,实现以下行为: 如果程序是首次启动,就需要new一个新的StreamingContext,并定义好所有的数据流处理,然后调用StreamingContext.start()。 如果程序是故障后重启,就需要从检查点目录中的数据中重新构建StreamingContext对象。 Scala Java Python 不过这个行为可以用StreamingContext.getOrCreate来实现,示例如下: // 首次创建StreamingContext并定义好数据流处理逻辑 def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // 新建一个StreamingContext对象 val lines = ssc.socketTextStream(...) // 创建DStreams ... ssc.checkpoint(checkpointDirectory) // 设置好检查点目录 ssc } // 创建新的StreamingContext对象,或者从检查点构造一个 val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // 无论是否是首次启动都需要设置的工作在这里 context. ... // 启动StreamingContext对象 context.start() context.awaitTermination() 如果 checkpointDirectory 目录存在,则context对象会从检查点数据重新构建出来。如果该目录不存在(如:首次运行),则 functionToCreateContext 函数会被调用,创建一个新的StreamingContext对象并定义好DStream数据流。完整的示例请参见RecoverableNetworkWordCount,这个例子会将网络数据中的单词计数统计结果添加到一个文件中。 除了使用getOrCreate之外,开发者还需要确保驱动器进程能在故障后重启。这一点只能由应用的部署环境基础设施来保证。进一步的讨论见部署(Deployment)这一节。 另外需要注意的是,RDD检查点会增加额外的保存数据的开销。这可能会导致数据流的处理时间变长。因此,你必须仔细的调整检查点间隔时间。如果批次间隔太小(比如:1秒),那么对每个批次保存检查点数据将大大减小吞吐量。另一方面,检查点保存过于频繁又会导致血统信息和任务个数的增加,这同样会影响系统性能。对于需要RDD检查点的有状态转换算子,默认的间隔是批次间隔的整数倍,且最小10秒。开发人员可以这样来自定义这个间隔:dstream.checkpoint(checkpointInterval)。一般推荐设为批次间隔时间的5~10倍。 部署应用 本节中将主要讨论一下如何部署Spark Streaming应用。 前提条件 要运行一个Spark Streaming 应用,你首先需要具备以下条件: 集群以及集群管理器 – 这是一般Spark应用的基本要求,详见deployment guide。 给Spark应用打个JAR包 – 你需要将你的应用打成一个JAR包。如果使用spark-submit提交应用,那么你不需要提供Spark和Spark Streaming的相关JAR包。但是,如果你使用了高级数据源(advanced sources– 如:Kafka、Flume、Twitter等),那么你需要将这些高级数据源相关的JAR包及其依赖一起打包并部署。例如,如果你使用了TwitterUtils,那么就必须将spark-streaming-twitter_2.10及其相关依赖都打到应用的JAR包中。 为执行器(executor)预留足够的内存 – 执行器必须配置预留好足够的内存,因为接受到的数据都得存在内存里。注意,如果某些窗口长度达到10分钟,那也就是说你的系统必须知道保留10分钟的数据在内存里。可见,到底预留多少内存是取决于你的应用处理逻辑的。 配置检查点 – 如果你的流式应用需要检查点,那么你需要配置一个Hadoop API兼容的可容错存储目录作为检查点目录,流式应用的信息会写入这个目录,故障恢复时会用到这个目录下的数据。详见前面的检查点小节。 配置驱动程序自动重启 – 流式应用自动恢复的前提就是,部署基础设施能够监控驱动器进程,并且能够在其故障时,自动重启之。不同的集群管理器有不同的工具来实现这一功能: Spark独立部署 – Spark独立部署集群可以支持将Spark应用的驱动器提交到集群的某个worker节点上运行。同时,Spark的集群管理器可以对该驱动器进程进行监控,一旦驱动器退出且返回非0值,或者因worker节点原始失败,Spark集群管理器将自动重启这个驱动器。详见Spark独立部署指南(Spark Standalone guide)。 YARN – YARN支持和独立部署类似的重启机制。详细请参考YARN的文档。 Mesos – Mesos上需要用Marathon来实现这一功能。 配置WAL(write ahead log)- 从Spark 1.2起,我们引入了write ahead log来提高容错性。如果启用这个功能,则所有接收到的数据都会以write ahead log形式写入配置好的检查点目录中。这样就能确保数据零丢失(容错语义有详细的讨论)。用户只需将 spark.streaming.receiver.writeAheadLog 设为true。不过,这同样可能会导致接收器的吞吐量下降。不过你可以启动多个接收器并行接收数据,从而提升整体的吞吐量(more receivers in parallel)。另外,建议在启用WAL后禁用掉接收数据多副本功能,因为WAL其实已经是存储在一个多副本存储系统中了。你只需要把存储级别设为 StorageLevel.MEMORY_AND_DISK_SER。如果是使用S3(或者其他不支持flushing的文件系统)存储WAL,一定要记得启用这两个标识:spark.streaming.driver.writeAheadLog.closeFileAfterWrite 和 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。更详细请参考:Spark Streaming Configuration。 设置好最大接收速率 – 如果集群可用资源不足以跟上接收数据的速度,那么可以在接收器设置一下最大接收速率,即:每秒接收记录的条数。相关的主要配置有:spark.streaming.receiver.maxRate,如果使用Kafka Direct API 还需要设置 spark.streaming.kafka.maxRatePerPartition。从Spark 1.5起,我们引入了backpressure的概念来动态地根据集群处理速度,评估并调整该接收速率。用户只需将 spark.streaming.backpressure.enabled设为true即可启用该功能。 升级应用代码 升级Spark Streaming应用程序代码,可以使用以下两种方式: 新的Streaming程序和老的并行跑一段时间,新程序完成初始化以后,再关闭老的。注意,这种方式适用于能同时发送数据到多个目标的数据源(即:数据源同时将数据发给新老两个Streaming应用程序)。 老程序能够优雅地退出(参考StreamingContext.stop(...)orJavaStreamingContext.stop(...)),即:确保所收到的数据都已经处理完毕后再退出。然后再启动新的Streaming程序,而新程序将接着在老程序退出点上继续拉取数据。注意,这种方式需要数据源支持数据缓存(或者叫数据堆积,如:Kafka、Flume),因为在新旧程序交接的这个空档时间,数据需要在数据源处缓存。目前还不能支持从检查点重启,因为检查点存储的信息包含老程序中的序列化对象信息,在新程序中将其反序列化可能会出错。这种情况下,只能要么指定一个新的检查点目录,要么删除老的检查点目录。 应用监控 除了Spark自身的监控能力(monitoring capabilities)之外,对Spark Streaming还有一些额外的监控功能可用。如果实例化了StreamingContext,那么你可以在Spark web UI上看到多出了一个Streaming tab页,上面显示了正在运行的接收器(是否活跃,接收记录的条数,失败信息等)和处理完的批次信息(批次处理时间,查询延时等)。这些信息都可以用来监控streaming应用。 web UI上有两个度量特别重要: 批次处理耗时(Processing Time)– 处理单个批次耗时 批次调度延时(Scheduling Delay)-各批次在队列中等待时间(等待上一个批次处理完) 如果批次处理耗时一直比批次间隔时间大,或者批次调度延时持续上升,就意味着系统处理速度跟不上数据接收速度。这时候你就得考虑一下怎么把批次处理时间降下来(reducing)。 Spark Streaming程序的处理进度可以用StreamingListener接口来监听,这个接口可以监听到接收器的状态和处理时间。不过需要注意的是,这是一个developer API接口,换句话说这个接口未来很可能会变动(可能会增加更多度量信息)。 性能调优 要获得Spark Streaming应用的最佳性能需要一点点调优工作。本节将深入解释一些能够改进Streaming应用性能的配置和参数。总体上来说,你需要考虑这两方面的事情: 提高集群资源利用率,减少单批次处理耗时。 设置合适的批次大小,以便使数据处理速度能跟上数据接收速度。 减少批次处理时间 有不少优化手段都可以减少Spark对每个批次的处理时间。细节将在优化指南(Tuning Guide)中详谈。这里仅列举一些最重要的。 数据接收并发度 跨网络接收数据(如:从Kafka、Flume、socket等接收数据)需要在Spark中序列化并存储数据。 如果接收数据的过程是系统瓶颈,那么可以考虑增加数据接收的并行度。注意,每个输入DStream只包含一个单独的接收器(receiver,运行约worker节点),每个接收器单独接收一路数据流。所以,配置多个输入DStream就能从数据源的不同分区分别接收多个数据流。例如,可以将从Kafka拉取两个topic的数据流分成两个Kafka输入数据流,每个数据流拉取其中一个topic的数据,这样一来会同时有两个接收器并行地接收数据,因而增加了总体的吞吐量。同时,另一方面我们又可以把这些DStream数据流合并成一个,然后可以在合并后的DStream上使用任何可用的transformation算子。示例代码如下: Scala Java Python val numStreams = 5 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } val unifiedStream = streamingContext.union(kafkaStreams) unifiedStream.print() 另一个可以考虑优化的参数就是接收器的阻塞间隔,该参数由配置参数(configuration parameter)spark.streaming.blockInterval决定。大多数接收器都会将数据合并成一个个数据块,然后再保存到spark内存中。对于map类算子来说,每个批次中数据块的个数将会决定处理这批数据并行任务的个数,每个接收器每批次数据处理任务数约等于 (批次间隔 / 数据块间隔)。例如,对于2秒的批次间隔,如果数据块间隔为200ms,则创建的并发任务数为10。如果任务数太少(少于单机cpu core个数),则资源利用不够充分。如需增加这个任务数,对于给定的批次间隔来说,只需要减少数据块间隔即可。不过,我们还是建议数据块间隔至少要50ms,否则任务的启动开销占比就太高了。 另一个切分接收数据流的方法是,显示地将输入数据流划分为多个分区(使用 inputStream.repartition(<number of partitions>))。该操作会在处理前,将数据散开重新分发到集群中多个节点上。 数据处理并发度 在计算各个阶段(stage)中,任何一个阶段的并发任务数不足都有可能造成集群资源利用率低。例如,对于reduce类的算子,如:reduceByKey 和 reduceByKeyAndWindow,其默认的并发任务数是由 spark.default.parallelism 决定的。你既可以修改这个默认值(spark.default.parallelism),也可以通过参数指定这个并发数量(见PairDStreamFunctions)。 数据序列化 调整数据的序列化格式可以大大减少数据序列化的开销。在spark Streaming中主要有两种类型的数据需要序列化: 输入数据: 默认地,接收器收到的数据是以StorageLevel.MEMORY_AND_DISK_SER_2的存储级别存储到执行器(executor)内存中的。也就是说,收到的数据会被序列化以减少GC开销,同时保存两个副本以容错。同时,数据会优先保存在内存里,当内存不足时才吐出到磁盘上。很明显,这个过程中会有数据序列化的开销 – 接收器首先将收到的数据反序列化,然后再以spark所配置指定的格式来序列化数据。 Streaming算子所生产的持久化的RDDs: Streaming计算所生成的RDD可能会持久化到内存中。例如,基于窗口的算子会将数据持久化到内存,因为窗口数据可能会多次处理。所不同的是,spark core默认用StorageLevel.MEMORY_ONLY级别持久化RDD数据,而spark streaming默认使用StorageLevel.MEMORY_ONLY_SER级别持久化接收到的数据,以便尽量减少GC开销。 不管是上面哪一种数据,都可以使用Kryo序列化来减少CPU和内存开销,详见Spark Tuning Guide。另,对于Kryo,你可以考虑这些优化:注册自定义类型,禁用对象引用跟踪(详见Configuration Guide)。 在一些特定的场景下,如果数据量不是很大,那么你可以考虑不用序列化格式,不过你需要注意的是取消序列化是否会导致大量的GC开销。例如,如果你的批次间隔比较短(几秒)并且没有使用基于窗口的算子,这种情况下你可以考虑禁用序列化格式。这样可以减少序列化的CPU开销以优化性能,同时GC的增长也不多。 任务启动开销 如果每秒启动的任务数过多(比如每秒50个以上),那么将任务发送给slave节点的开销会明显增加,那么你也就很难达到亚秒级(sub-second)的延迟。不过以下两个方法可以减少任务的启动开销: 任务序列化(Task Serialization): 使用Kryo来序列化任务,以减少任务本身的大小,从而提高发送任务的速度。任务的序列化格式是由 spark.closure.serializer 属性决定的。不过,目前还不支持闭包序列化,未来的版本可能会增加对此的支持。 执行模式(Execution mode): Spark独立部署或者Mesos粗粒度模式下任务的启动时间比Mesos细粒度模式下的任务启动时间要短。详见Running on Mesos guide。 这些调整有可能能够减少100ms的批次处理时间,这也使得亚秒级的批次间隔成为可能。 设置合适的批次间隔 要想streaming应用在集群上稳定运行,那么系统处理数据的速度必须能跟上其接收数据的速度。换句话说,批次数据的处理速度应该和其生成速度一样快。对于特定的应用来说,可以从其对应的监控(monitoring)页面上观察验证,页面上显示的处理耗时应该要小于批次间隔时间。 根据spark streaming计算的性质,在一定的集群资源限制下,批次间隔的值会极大地影响系统的数据处理能力。例如,在WordCountNetwork示例中,对于特定的数据速率,一个系统可能能够在批次间隔为2秒时跟上数据接收速度,但如果把批次间隔改为500毫秒系统可能就处理不过来了。所以,批次间隔需要谨慎设置,以确保生产系统能够处理得过来。 要找出适合的批次间隔,你可以从一个比较保守的批次间隔值(如5~10秒)开始测试。要验证系统是否能跟上当前的数据接收速率,你可能需要检查一下端到端的批次处理延迟(可以看看Spark驱动器log4j日志中的Total delay,也可以用StreamingListener接口来检测)。如果这个延迟能保持和批次间隔差不多,那么系统基本就是稳定的。否则,如果这个延迟持久在增长,也就是说系统跟不上数据接收速度,那也就意味着系统不稳定。一旦系统文档下来后,你就可以尝试提高数据接收速度,或者减少批次间隔值。不过需要注意,瞬间的延迟增长可以只是暂时的,只要这个延迟后续会自动降下来就没有问题(如:降到小于批次间隔值) 内存调优 Spark应用内存占用和GC调优已经在调优指南(Tuning Guide)中有详细的讨论。墙裂建议你读一读那篇文档。本节中,我们只是讨论一下几个专门用于Spark Streaming的调优参数。 Spark Streaming应用在集群中占用的内存量严重依赖于具体所使用的tranformation算子。例如,如果想要用一个窗口算子操纵最近10分钟的数据,那么你的集群至少需要在内存里保留10分钟的数据;另一个例子是updateStateByKey,如果key很多的话,相对应的保存的key的state也会很多,而这些都需要占用内存。而如果你的应用只是做一个简单的 “映射-过滤-存储”(map-filter-store)操作的话,那需要的内存就很少了。 一般情况下,streaming接收器接收到的数据会以StorageLevel.MEMORY_AND_DISK_SER_2 这个存储级别存到spark中,也就是说,如果内存装不下,数据将被吐到磁盘上。数据吐到磁盘上会大大降低streaming应用的性能,因此还是建议根据你的应用处理的数据量,提供充足的内存。最好就是,一边小规模地放大内存,再观察评估,然后再放大,再评估。 另一个内存调优的方向就是垃圾回收。因为streaming应用往往都需要低延迟,所以肯定不希望出现大量的或耗时较长的JVM垃圾回收暂停。 以下是一些能够帮助你减少内存占用和GC开销的参数或手段: DStream持久化级别(Persistence Level of DStreams): 前面数据序列化(Data Serialization)这小节已经提到过,默认streaming的输入RDD会被持久化成序列化的字节流。相对于非序列化数据,这样可以减少内存占用和GC开销。如果启用Kryo序列化,还能进一步减少序列化数据大小和内存占用量。如果你还需要进一步减少内存占用的话,可以开启数据压缩(通过spark.rdd.compress这个配置设定),只不过数据压缩会增加CPU消耗。 清除老数据(Clearing old data): 默认情况下,所有的输入数据以及DStream的transformation算子产生的持久化RDD都是自动清理的。Spark Streaming会根据所使用的transformation算子来清理老数据。例如,你用了一个窗口操作处理最近10分钟的数据,那么Spark Streaming会保留至少10分钟的数据,并且会主动把更早的数据都删掉。当然,你可以设置 streamingContext.remember 以保留更长时间段的数据(比如:你可能会需要交互式地查询更老的数据)。 CMS垃圾回收器(CMS Garbage Collector): 为了尽量减少GC暂停的时间,我们墙裂建议使用CMS垃圾回收器(concurrent mark-and-sweep GC)。虽然CMS GC会稍微降低系统的总体吞吐量,但我们仍建议使用它,因为CMS GC能使批次处理的时间保持在一个比较恒定的水平上。最后,你需要确保在驱动器(通过spark-submit中的–driver-java-options设置)和执行器(使用spark.executor.extraJavaOptions配置参数)上都设置了CMS GC。 其他提示: 如果还想进一步减少GC开销,以下是更进一步的可以尝试的手段: 配合Tachyon使用堆外内存来持久化RDD。详见Spark编程指南(Spark Programming Guide) 使用更多但是更小的执行器进程。这样GC压力就会分散到更多的JVM堆中。 容错语义 本节中,我们将讨论Spark Streaming应用在出现失败时的具体行为。 背景 要理解Spark Streaming所提供的容错语义,我们首先需要回忆一下Spark RDD所提供的基本容错语义。 RDD是不可变的,可重算的,分布式数据集。每个RDD都记录了其创建算子的血统信息,其中每个算子都以可容错的数据集作为输入数据。 如果RDD的某个分区因为节点失效而丢失,则该分区可以根据RDD的血统信息以及相应的原始输入数据集重新计算出来。 假定所有RDD transformation算子计算过程都是确定性的,那么通过这些算子得到的最终RDD总是包含相同的数据,而与Spark集群的是否故障无关。 Spark主要操作一些可容错文件系统的数据,如:HDFS或S3。因此,所有从这些可容错数据源产生的RDD也是可容错的。然而,对于Spark Streaming并非如此,因为多数情况下Streaming需要从网络远端接收数据,这回导致Streaming的数据源并不可靠(尤其是对于使用了fileStream的应用)。要实现RDD相同的容错属性,数据接收就必须用多个不同worker节点上的Spark执行器来实现(默认副本因子是2)。因此一旦出现故障,系统需要恢复两种数据: 接收并保存了副本的数据– 数据不会因为单个worker节点故障而丢失,因为有副本! 接收但尚未保存副本数据– 因为数据并没有副本,所以一旦故障,只能从数据源重新获取。 此外,还有两种可能的故障类型需要考虑: Worker节点故障– 任何运行执行器的worker节点一旦故障,节点上内存中的数据都会丢失。如果这些节点上有接收器在运行,那么其包含的缓存数据也会丢失。 Driver节点故障– 如果Spark Streaming的驱动节点故障,那么很显然SparkContext对象就没了,所有执行器及其内存数据也会丢失。 有了以上这些基本知识,下面我们就进一步了解一下Spark Streaming的容错语义。 定义 流式系统的可靠度语义可以据此来分类:单条记录在系统中被处理的次数保证。一个流式系统可能提供保证必定是以下三种之一(不管系统是否出现故障): 至多一次(At most once): 每条记录要么被处理一次,要么就没有处理。 至少一次(At least once): 每条记录至少被处理过一次(一次或多次)。这种保证能确保没有数据丢失,比“至多一次”要强。但有可能出现数据重复。 精确一次(Exactly once): 每条记录都精确地只被处理一次 – 也就是说,既没有数据丢失,也不会出现数据重复。这是三种保证中最强的一种。 基础语义 任何流式处理系统一般都会包含以下三个数据处理步骤: 数据接收(Receiving the data): 从数据源拉取数据。 数据转换(Transforming the data): 将接收到的数据进行转换(使用DStream和RDD transformation算子)。 数据推送(Pushing out the data): 将转换后最终数据推送到外部文件系统,数据库或其他展示系统。 如果Streaming应用需要做到端到端的“精确一次”的保证,那么就必须在以上三个步骤中各自都保证精确一次:即,每条记录必须,只接收一次、处理一次、推送一次。下面让我们在Spark Streaming的上下文环境中来理解一下这三个步骤的语义: 数据接收: 不同数据源提供的保证不同,下一节再详细讨论。 数据转换: 所有的数据都会被“精确一次”处理,这要归功于RDD提供的保障。即使出现故障,只要数据源还能访问,最终所转换得到的RDD总是包含相同的内容。 数据推送: 输出操作默认保证“至少一次”的语义,是否能“精确一次”还要看所使用的输出算子(是否幂等)以及下游系统(是否支持事务)。不过用户也可以开发自己的事务机制来实现“精确一次”语义。这个后续会有详细讨论。 接收数据语义 不同的输入源提供不同的数据可靠性级别,从“至少一次”到“精确一次”。 从文件接收数据 如果所有的输入数据都来源于可容错的文件系统,如HDFS,那么Spark Streaming就能在任何故障中恢复并处理所有的数据。这种情况下就能保证精确一次语义,也就是说不管出现什么故障,所有的数据总是精确地只处理一次,不多也不少。 基于接收器接收数据 对于基于接收器的输入源,容错语义将同时依赖于故障场景和接收器类型。前面也已经提到过,spark Streaming主要有两种类型的接收器: 可靠接收器– 这类接收器会在数据接收并保存好副本后,向可靠数据源发送确认信息。这类接收器故障时,是不会给缓存的(已接收但尚未保存副本)数据发送确认信息。因此,一旦接收器重启,没有收到确认的数据,会重新从数据源再获取一遍,所以即使有故障也不会丢数据。 不可靠接收器– 这类接收器不会发送确认信息,因此一旦worker和driver出现故障,就有可能会丢失数据。 对于不同的接收器,我们可以获得如下不同的语义。如果一个worker节点故障了,对于可靠接收器来书,不会有数据丢失。而对于不可靠接收器,缓存的(接收但尚未保存副本)数据可能会丢失。如果driver节点故障了,除了接收到的数据之外,其他的已经接收且已经保存了内存副本的数据都会丢失,这将会影响有状态算子的计算结果。 为了避免丢失已经收到且保存副本的数,从 spark 1.2 开始引入了WAL(write ahead logs),以便将这些数据写入到可容错的存储中。只要你使用可靠接收器,同时启用WAL(write ahead logs enabled),那么久再也不用为数据丢失而担心了。并且这时候,还能提供“至少一次”的语义保证。 下表总结了故障情况下的各种语义: 部署场景 Worker 故障 Driver 故障 Spark 1.1及以前版本 或者 Spark 1.2及以后版本,且未开启WAL 若使用不可靠接收器,则可能丢失缓存(已接收但尚未保存副本)数据; 若使用可靠接收器,则没有数据丢失,且提供至少一次处理语义 若使用不可靠接收器,则缓存数据和已保存数据都可能丢失; 若使用可靠接收器,则没有缓存数据丢失,但已保存数据可能丢失,且不提供语义保证 Spark 1.2及以后版本,并启用WAL 若使用可靠接收器,则没有数据丢失,且提供至少一次语义保证 若使用可靠接收器和文件,则无数据丢失,且提供至少一次语义保证 从Kafka Direct API接收数据 从Spark 1.3开始,我们引入Kafka Direct API,该API能为Kafka数据源提供“精确一次”语义保证。有了这个输入API,再加上输出算子的“精确一次”保证,你就能真正实现端到端的“精确一次”语义保证。(改功能截止Spark 1.6.1还是实验性的)更详细的说明见:Kafka Integration Guide。 输出算子的语义 输出算子(如 foreachRDD)提供“至少一次”语义保证,也就是说,如果worker故障,单条输出数据可能会被多次写入外部实体中。不过这对于文件系统来说是可以接受的(使用saveAs***Files 多次保存文件会覆盖之前的),所以我们需要一些额外的工作来实现“精确一次”语义。主要有两种实现方式: 幂等更新(Idempotent updates): 就是说多次操作,产生的结果相同。例如,多次调用saveAs***Files保存的文件总是包含相同的数据。 事务更新(Transactional updates): 所有的更新都是事务性的,这样一来就能保证更新的原子性。以下是一种实现方式: 用批次时间(在foreachRDD中可用)和分区索引创建一个唯一标识,该标识代表流式应用中唯一的一个数据块。 基于这个标识建立更新事务,并使用数据块数据更新外部系统。也就是说,如果该标识未被提交,则原子地将标识代表的数据更新到外部系统。否则,就认为该标识已经被提交,直接忽略之。dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // 使用uniqueId作为事务的唯一标识,基于uniqueId实现partitionIterator所指向数据的原子事务提交 } } 迁移指南 – 从0.9.1及以下升级到1.x 在Spark 0.9.1和Spark 1.0之间,有一些API接口变更,变更目的是为了保障未来版本API的稳定。本节将详细说明一下从已有版本迁移升级到1.0所需的工作。 输入DStream(Input DStreams): 所有创建输入流的算子(如:StreamingContext.socketStream, FlumeUtils.createStream 等)的返回值不再是DStream(对Java来说是JavaDStream),而是InputDStream/ReceiverInputDStream(对Java来说是JavaInputDStream/JavaPairInputDStream/JavaReceiverInputDStream/JavaPairReceiverInputDStream)。这样才能确保特定输入流的功能能够在未来持续增加到这些class中,而不会打破二进制兼容性。注意,已有的Spark Streaming应用应该不需要任何代码修改(新的返回类型都是DStream的子类),只不过需要基于Spark 1.0重新编译一把。 定制网络接收器(Custom Network Receivers): 自从Spark Streaming发布以来,Scala就能基于NetworkReceiver来定制网络接收器。但由于错误处理和汇报API方便的限制,该类型不能在Java中使用。所以Spark 1.0开始,用Receiver来替换掉这个NetworkReceiver,主要的好处如下: 该类型新增了stop和restart方法,便于控制接收器的生命周期。详见custom receiver guide。 定制接收器用Scala和Java都能实现。 为了将已有的基于NetworkReceiver的自定义接收器迁移到Receiver上来,你需要如下工作: 首先你的自定义接收器类型需要从org.apache.spark.streaming.receiver.Receiver继承,而不再是org.apache.spark.streaming.dstream.NetworkReceiver。 原先,我们需要在自定义接收器中创建一个BlockGenerator来保存接收到的数据。你必须显示的实现onStart() 和 onStop() 方法。而在新的Receiver class中,这些都不需要了,你只需要调用它的store系列的方法就能将数据保存到Spark中。所以你接下来需要做的迁移工作就是,删除BlockGenerator对象(这个类型在Spark 1.0之后也没有了~),然后用store(…)方法来保存接收到的数据。 基于Actor的接收器(Actor-based Receivers): 从actor class继承后,并实org.apache.spark.streaming.receiver.Receiver后,即可从Akka Actors中获取数据。获取数据的类被重命名为org.apache.spark.streaming.receiver.ActorHelper,而保存数据的pushBlocks(…)方法也被重命名为 store(…)。其他org.apache.spark.streaming.receivers包中的工具类也被移到org.apache.spark.streaming.receiver包下并重命名,新的类名应该比之前更加清晰。 下一步 其他相关参考文档 Kafka Integration Guide Flume Integration Guide Kinesis Integration Guide Custom Receiver Guide API文档 Scala 文档 StreamingContext和DStream KafkaUtils,FlumeUtils,KinesisUtils,TwitterUtils,ZeroMQUtils, 以及MQTTUtils Java 文档 JavaStreamingContext,JavaDStream以及JavaPairDStream KafkaUtils,FlumeUtils,KinesisUtilsTwitterUtils,ZeroMQUtils, 以及MQTTUtils Python 文档 StreamingContext和DStream KafkaUtils 其他示例:Scala,Java以及Python Spark Streaming相关的Paper和video。 转载自 并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》在YARN上运行Spark

在YARN上运行Spark 对YARN (Hadoop NextGen)的支持是从Spark-0.6.0开始的,后续的版本也一直持续在改进。 在YARN上启动 首先确保 HADOOP_CONF_DIR 或者 YARN_CONF_DIR 变量指向一个包含Hadoop集群客户端配置文件的目录。这些配置用于读写HDFS和连接YARN资源管理器(ResourceManager)。这些配置应该发布到YARN集群上所有的节点,这样所有的YARN容器才能使用同样的配置。如果这些配置引用了Java系统属性或者其他不属于YARN管理的环境变量,那么这些属性和变量也应该在Spark应用的配置中设置(包括驱动器、执行器,以及其AM【运行于client模式时的YARN Application Master】) 在YARN上启动Spark应用有两种模式。在cluster模式下,Spark驱动器(driver)在YARNApplication Master中运行(运行于集群中),因此客户端可以在Spark应用启动之后关闭退出。而client模式下,Spark驱动器在客户端进程中,这时的YARNApplication Master只用于向YARN申请资源。 与独立部署(Spark standalone)或 在Mesos集群中不同,YARN的master地址不是在–master参数中指定的,而是在Hadoop配置文件中设置。因此,这种情况下,–master只需设置为yarn。 以下用cluster模式启动一个Spark应用: $ ./bin/spark-submit --class path.to.your.Class \ --master yarn \ --deploy-mode cluster \ [options] \ <app jar> [app options] 例如: $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ --driver-memory 4g \ --executor-memory 2g \ --executor-cores 1 \ --queue thequeue \ lib/spark-examples*.jar 10 以上例子中,启动了一个YARN客户端程序,使用默认的Application Master。而后SparkPi在Application Master中的子线程中运行。客户端会周期性的把Application Master的状态信息拉取下来,并更新到控制台。客户端会在你的应用程序结束后退出。参考“调试你的应用”,这一节说明了如何查看驱动器和执行器的日志。 要以client模式启动一个spark应用,只需在上面的例子中把cluster换成client。下面这个例子就是以client模式启动spark-shell: $ ./bin/spark-shell --master yarn --deploy-mode client 增加其他JAR包 在cluster模式下,驱动器不在客户端机器上运行,所以SparkContext.addJar添加客户端本地文件就不好使了。要使客户端上本地文件能够用SparkContext.addJar来添加,可以用–jars选项: $ ./bin/spark-submit --class my.main.Class \ --master yarn \ --deploy-mode cluster \ --jars my-other-jar.jar,my-other-other-jar.jar my-main-jar.jar app_arg1 app_arg2 准备 在YARN上运行Spark需要其二进制发布包构建的时候增加YARN支持。二进制发布包可以在这里下载:downloads page。 想要自己编译,参考这里:Building Spark 配置 大多数配置,对于YARN或其他集群模式下,都是一样的。详细请参考这里:configuration page。 以下是YARN上专有的配置项。 调试你的应用 在YARN术语集中,执行器和Application Master在容器(container)中运行。YARN在一个应用程序结束后,有两种处理容器日志的模式。如果开启了日志聚合(yarn.log-aggregation-enable),那么容器日志将被复制到HDFS,并删除本地日志。而后这些日志可以在集群任何节点上用yarn logs命令查看: yarn logs -applicationId <app ID> 以上命令,将会打印出指定应用的所有日志文件的内容。你也可以直接在HDFS上查看这些日志(HDFS shell或者HDFS API)。这些目录可以在你的YARN配置中指定(yarn.nodemanager.remote-app-log-dir和yarn.nodemanager-remote-app-log-dir-suffix)。这些日志同样还可以在Spark Web UI上Executors tab页查看。当然,你需要启动Spark history server和 MapReduce history server,再在 yarn-site.xml 中配置好yarn.log.server.url。Spark history server UI 将把你重定向到MapReduce history server 以查看这些聚合日志。 如果日志聚合没有开启,那么日志文件将在每台机器上的 YARN_APP_LOGS_DIR 目录保留,通常这个目录指向 /tmp/logs 或者 $HADOOP_HOME/log/userlogs(这取决于Hadoop版本和安全方式)。查看日志的话,需要到每台机器上查看这些目录。子目录是按 application ID 和 container ID来组织的。这些日志同样可以在 Spark Web UI 上 Executors tab 页查看,而且这时你不需要运行MapReduce history server。 如果需要检查各个容器的启动环境,可以先把 yarn.nodemanager.delete.debug-delay-sec 增大(如:36000),然后访问应用缓存目录yarn.nodemanager.local-dirs,这时容器的启动目录。这里包含了启动脚本、jar包以及容器启动所用的所有环境变量。这对调试 classpath 相关问题尤其有用。(注意,启用这个需要管理员权限,并重启所有的node managers,因此,对托管集群不适用) 要自定义Application Master或执行器的 log4j 配置,有如下方法: 通过spark-submit –files 上传一个自定义的 log4j.properties 文件。 在 spark.driver.extraJavaOptions(对Spark驱动器)或者 spark.executor.extraJavaOptions(对Spark执行器)增加 -Dlog4j.configuration=<location of configuration file>。注意,如果使用文件,那么 file: 协议头必须显式写上,且文件必须在所节点上都存在。 更新${SPARK_CONF_DIR}/log4j.properties 文件以及其他配置。注意,如果在多个地方都配置了log4j,那么上面其他两种方法的配置优先级比本方法要高。 注意,第一种方法中,执行器和Application Master共享同一个log4j配置,在有些环境下(AM和执行器在同一个节点上运行)可能会有问题(例如,AM和执行器日志都写入到同一个日志文件) 如果你需要引用YARN放置日志文件的路径,以便YARN可以正确地展示和聚合日志,请在log4j.properties文件中使用spark.yarn.app.container.log.dir。例如,log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log 。对于流式应用,可以配置RollingFileAppender,并将文件路径设置为YARN日志目录,以避免磁盘打满,而且这些日志还可以利用YARN的日志工具访问和查看。 Spark属性 Property Name Default Meaning spark.yarn.am.memory 512m YARN Application Master在client模式下, 使用内存总量,与JVM内存设置格式相同(如:512m,2g)。 如果是cluster模式下,请设置 spark.driver.memory。 注意使用小写的后缀, 如:k、m、g、t、p,分别代表 kibi-, mebi, gibi-, tebi- 以及pebibytes。 spark.driver.cores 1 YARN cluster模式下,驱动器使用的CPU core个数。 在cluster模式下,驱动器(driver)和YARN AM(application master)使用相同的JVM,所以这个属性也可以用来控制YARN AM。 如果是client模式下,请使用spark.yarn.am.cores来控制YARN AM的CPU core个数。 spark.yarn.am.cores 1 client模式下,用来控制YARN AM的CPU core个数。 cluster模式下,请使用 spark.driver.cores。 spark.yarn.am.waitTime 100s 在cluster模式下,该属性表示YARN AM等待SparkContext初始化的时间。 在client模式下,该属性表示YARN AM等待驱动器连接的时间。 spark.yarn.submit.file .replication 默认的HDFS副本数(通常是3) HDFS文件副本数。包括Spark jar,app jar以及其他分布式缓存文件和存档。 spark.yarn.preserve .staging.files false 设为true以保存stage相关文件(stage相关的jar包和缓存)到作业结束,而不是立即删除。 spark.yarn.scheduler .heartbeat.interval-ms 3000 Spark AM发送给YARN资源管理器心跳的间隔(ms)。 这个值最多不能超过YARN配置的超时间隔的一半。(yarn.am.liveness-monitor.expiry-interval-ms) spark.yarn.scheduler .initial-allocation.interval 200ms Spark AM的初始带外心跳间隔(有待定的资源申请时)。 其值不应该大于 spark.yarn.scheduler.heartbeat.interval-ms。 该资源分配间隔会在每次带外心跳成功后但仍有待定资源申请时倍增, 直至达到 spark.yarn.scheduler.heartbeat.interval-ms 所设定的值。 spark.yarn.max.executor .failures 执行器个数*2且不小于3 Spark应用最大容忍执行器失败次数。 spark.yarn.historyServer .address (none) Spark history server地址,如:host.com:18080 。 这个地址不要包含协议头(http://)。 默认不设置,因为history server是可选的。 应用程序结束以后,YARN资源管理器web UI通过这个地址链接到Spark history server UI。 对于这属性,可以使用YARN属性变量,且这些变量是Spark在运行时组装的。 例如,如果Spark history server和YARN资源管理器(ResourceManager)部署在同一台机器上运行, 那么这个属性可以设置为 ${hadoopconf-yarn.resourcemanager.hostname}:18080 spark.yarn.dist.archives (none) 逗号分隔的文档列表,其指向的文档将被提取到每个执行器的工作目录下。 spark.yarn.dist.files (none) 逗号分隔的文件列表,其指向的文件将被复制到每个执行器的工作目录下。 spark.executor.instances 2 执行器个数。注意,这个属性和 spark.dynamicAllocation.enabled是不兼容的。 如果同时设置了spark.dynamicAllocation.enabled,那么动态分配将被关闭,并使用 spark.executor.instances 所设置的值。 spark.yarn.executor .memoryOverhead 执行器内存 * 0.10或者 384MB中较大者 每个执行器所分配的堆外内存(MB)总量。这些内存将被用于存储VM开销、字符串常量,以及其他原生开销等。这会使执行器所需内存增加(典型情况,增加6%~10%) spark.yarn.driver .memoryOverhead 驱动器内存 * 0.10或者 384MB中较大者 每个驱动器所分配的堆外内存(MB)总量。 这些内存将被用于存储VM开销、字符串常量,以及其他原生开销等。 这会使执行器所需内存增加(典型情况,增加6%~10%) spark.yarn.am .memoryOverhead Application Master 内存 * 0.10或者 384MB中较大者 与 spark.yarn.driver.memoryOverhead 相同,只是仅用于YARN AM client模式下。 spark.yarn.am.port (random) YARN AM所监听的端口。 在YARN client模式下,用于Spark驱动器(driver)和YARN AM通信。 而在YARN cluster模式下,这个端口将被用于动态执行器特性,这个特性会处理调度器后台杀死执行器的请求。 spark.yarn.queue default Spark应用提交到哪个yarn队列。 spark.yarn.jar (none) Spark jar文件位置,如果需要覆盖默认位置,请设定这个值。 默认的,Spark on YARN会使用本地的Spark jar包,但Spark jar包同样可以使用整个集群可读的HDFS文件位置。 这使YARN可以在各节点上缓存Spark jar包,而不需要每次运行一个应用的时候都要分发。 使用 hdfs:///some/path 来指定HDFS上jar包文件路径。 spark.yarn.access .namenodes (none) 逗号分隔的HDFS namenodes。 例如spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032。 Spark应用必须有这些机器的访问权限,并且需要配置好 kerberos(可以在同一个域或者信任的域)。 Spark需要每个namenode的安全token,以便访问集群中HDFS。 spark.yarn.appMasterEnv .[EnvironmentVariableName] (none) 增加EnvironmentVariableName所指定的环境变量到YARN AM的进程中。 用户可以指定多个环境变量。在cluster模式下,这个可以控制Spark驱动器的环境变量; 而在client模式下,只控制执行器启动器的环境变量。 spark.yarn .containerLauncherMaxThreads 25 YARN AM 启动执行器的容器最多包含多少线程数。 spark.yarn.am .extraJavaOptions (none) 在client模式下,传给YARN AM 的JVM参数。 在cluster模式下,请使用spark.driver.extraJavaOptions spark.yarn.am .extraLibraryPath (none) client模式下传给YARN AM额外依赖库。 spark.yarn.maxAppAttempts yarn .resourcemanager.am.max-attemptsin YARN 提交应用最大尝试次数。不应大于YARN全局配置的最大尝试次数。 spark.yarn.am .attemptFailuresValidityInterval (none) 定义AM失败跟踪校验间隔。 AM运行了至少要运行这么多时间后,其失败计数才被重置。 这个特性只有配置其值后才会生效,且只支持Hadoop-2.6+ spark.yarn.submit .waitAppCompletion true 在YARN cluster模式下,控制是否客户端等到Spark应用结束后再退出。 如果设为true,客户端进程将一直等待,并持续报告应用状态。 否则,客户端会在提交完成后退出。 spark.yarn.am .nodeLabelExpression (none) 一个YARN节点标签表达式(node label expression),以此来限制AM可以被调度到哪些节点上执行。 只有Hadoop 2.6+才能支持节点标签表达式,所以如果用其他版本运行,这个属性将被忽略。 spark.yarn.executor .nodeLabelExpression (none) 一个YARN节点标签表达式(node label expression),以此来限制执行器可以被调度到哪些节点上启动。 只有Hadoop 2.6+才能支持节点标签表达式,所以如果在其他版本上运行时,这个属性将被忽略。 spark.yarn.tags (none) 逗号分隔的字符串,传递YARN应用tags。 其值将出现在YARN Application Reports中,可以用来过滤和查询YARN 应用。 spark.yarn.keytab (none) 认证文件keytab的全路径。 这个文件将被复制到访问Secure Distributed Cache的YARN 应用节点上,并且周期性的刷新登陆的ticket和代理token(本地模式下也能work) spark.yarn.principal (none) 登陆KDC的认证,secure HDFS需要(local模式下也能用) spark.yarn.config .gatewayPath (none) 某些路径,可能在网关主机上能正常访问(Spark应用启动的地方),而在其他节点上的访问方式(路径)可能不同。 对于这样的路径,需要本属性配合 spark.yarn.config.replacementPath组合使用,对于支持异构配置的集群,必须配置好这两个值,Spark才能正确地启动远程进程。 replacement path 通常包含一些YARN导出的环境变量(因此,对Spark containers可见)。 例如,如果网关节点上Hadoop库安装在 /disk1/hadoop,并且其导出环境变量为 HADOOP_HOME, 就需要将spark.yarn.config.gatewayPath 设置为 /disk1/hadoop 并将 replacement path设为 $HADOOP_HOME, 这样才能在远程节点上以正确的环境变量启动进程。 spark.yarn.config .replacementPath (none) 见 spark.yarn.config.getewayPath spark.yarn.security .tokens.${service}.enabled true 在启用安全设置的情况下,控制是否对non-HDFS服务,获取代理token。 默认地,所有支持的服务,都启用;但你也可以在某些有冲突的情况下,对某些服务禁用。 目前支持的服务有:hive,hbase 重要提示 对CPU资源的请求是否满足,取决于调度器如何配置和使用。 cluster模式下,Spark执行器(executor)和驱动器(driver)的local目录都由YARN配置决定(yarn.nodemanager.local-dirs);如果用户指定了spark.local.dir,这时候将被忽略。在client模式下,Spark执行器(executor)的local目录由YARN决定,而驱动器(driver)的local目录由spark.local.dir决定,因为这时候,驱动器不在YARN上运行。 选项参数 –files和 –archives中井号(#)用法类似于Hadoop。例如,你可以指定 –files localtest.txt#appSees.txt,这将会把localtest.txt文件上传到HDFS上,并重命名为 appSees.txt,而你的程序应用用 appSees.txt来引用这个文件。 当你在cluster模式下使用本地文件时,使用选项–jar 才能让SparkContext.addJar正常工作,而不必使用 HDFS,HTTP,HTTPS或者FTP上的文件。 转载自 并发编程网 - ifeve.com

优秀的个人博客,低调大师

SparkSql官方文档中文翻译(java版本)

1 概述(Overview) 2 DataFrames 2.1 入口:SQLContext(Starting Point: SQLContext) 2.2 创建DataFrames(Creating DataFrames) 2.3 DataFrame操作(DataFrame Operations) 2.4 运行SQL查询程序(Running SQL Queries Programmatically) 2.5 DataFrames与RDDs的相互转换(Interoperating with RDDs) 2.5.1 使用反射获取Schema(Inferring the Schema Using Reflection) 2.5.2 通过编程接口指定Schema(Programmatically Specifying the Schema) 3 数据源(Data Source) 3.1 一般Load/Save方法 3.1.1 手动指定选项(Manually Specifying Options) 3.1.2 存储模式(Save Modes) 3.1.3 持久化到表(Saving to Persistent Tables) 3.2 Parquet文件 3.2.1 读取Parquet文件(Loading Data Programmatically) 3.2.2 解析分区信息(Partition Discovery) 3.2.3 Schema合并(Schema Merging) 3.2.4 Hive metastore Parquet表转换(Hive metastore Parquet table conversion) 3.2.4.1 Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation) 3.2.4.2 元数据刷新(Metadata Refreshing) 3.2.5 配置(Configuration) 3.3 JSON数据集 3.4 Hive表 3.4.1 访问不同版本的Hive Metastore(Interacting with Different Versions of Hive Metastore) 3.5 JDBC To Other Databases 3.6 故障排除(Troubleshooting) 4 性能调优 4.1 缓存数据至内存(Caching Data In Memory) 4.2 调优参数(Other Configuration Options) 5 分布式SQL引擎 5.1 运行Thrift JDBC/ODBC服务 5.2 运行Spark SQL CLI 6 Migration Guide 6.1 与Hive的兼容(Compatibility with Apache Hive 6.1.1 在Hive warehouse中部署Spark SQL 6.1.2 Spark SQL支持的Hive特性 6.1.3 不支持的Hive功能 7 Reference 7.1 Data Types 7.2 NaN 语义 1 概述(Overview) Spark SQL是Spark的一个组件,用于结构化数据的计算。Spark SQL提供了一个称为DataFrames的编程抽象,DataFrames可以充当分布式SQL查询引擎。 2 DataFrames DataFrame是一个分布式的数据集合,该数据集合以命名列的方式进行整合。DataFrame可以理解为关系数据库中的一张表,也可以理解为R/Python中的一个data frame。DataFrames可以通过多种数据构造,例如:结构化的数据文件、hive中的表、外部数据库、Spark计算过程中生成的RDD等。DataFrame的API支持4种语言:Scala、Java、Python、R。 2.1 入口:SQLContext(Starting Point: SQLContext) Spark SQL程序的主入口是SQLContext类或它的子类。创建一个基本的SQLContext,你只需要SparkContext,创建代码示例如下: Scala val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) Java JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); 除了基本的SQLContext,也可以创建HiveContext。SQLContext和HiveContext区别与联系为: SQLContext现在只支持SQL语法解析器(SQL-92语法) HiveContext现在支持SQL语法解析器和HiveSQL语法解析器,默认为HiveSQL语法解析器,用户可以通过配置切换成SQL语法解析器,来运行HiveSQL不支持的语法。 使用HiveContext可以使用Hive的UDF,读写Hive表数据等Hive操作。SQLContext不可以对Hive进行操作。 Spark SQL未来的版本会不断丰富SQLContext的功能,做到SQLContext和HiveContext的功能容和,最终可能两者会统一成一个Context HiveContext包装了Hive的依赖包,把HiveContext单独拿出来,可以在部署基本的Spark的时候就不需要Hive的依赖包,需要使用HiveContext时再把Hive的各种依赖包加进来。 SQL的解析器可以通过配置spark.sql.dialect参数进行配置。在SQLContext中只能使用Spark SQL提供的”sql“解析器。在HiveContext中默认解析器为”hiveql“,也支持”sql“解析器。 2.2 创建DataFrames(Creating DataFrames) 使用SQLContext,spark应用程序(Application)可以通过RDD、Hive表、JSON格式数据等数据源创建DataFrames。下面是基于JSON文件创建DataFrame的示例: Scala val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() Java JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Displays the content of the DataFrame to stdout df.show(); 2.3 DataFrame操作(DataFrame Operations) DataFrames支持Scala、Java和Python的操作接口。下面是Scala和Java的几个操作示例: Scala val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create the DataFrame val df = sqlContext.read.json("examples/src/main/resources/people.json") // Show the content of the DataFrame df.show() // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df("age") > 21).show() // age name // 30 Andy // Count people by age df.groupBy("age").count().show() // age count // null 1 // 19 1 // 30 1 Java JavaSparkContext sc // An existing SparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create the DataFrame DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Show the content of the DataFrame df.show(); // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show(); // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df.col("name"), df.col("age").plus(1)).show(); // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df.col("age").gt(21)).show(); // age name // 30 Andy // Count people by age df.groupBy("age").count().show(); // age count // null 1 // 19 1 // 30 1 详细的DataFrame API请参考API Documentation。 除了简单列引用和表达式,DataFrames还有丰富的library,功能包括string操作、date操作、常见数学操作等。详细内容请参考DataFrame Function Reference。 2.4 运行SQL查询程序(Running SQL Queries Programmatically) Spark Application可以使用SQLContext的sql()方法执行SQL查询操作,sql()方法返回的查询结果为DataFrame格式。代码如下: Scala val sqlContext = ... // An existing SQLContext val df = sqlContext.sql("SELECT * FROM table") Java SQLContext sqlContext = ... // An existing SQLContext DataFrame df = sqlContext.sql("SELECT * FROM table") 2.5 DataFrames与RDDs的相互转换(Interoperating with RDDs) Spark SQL支持两种RDDs转换为DataFrames的方式: 使用反射获取RDD内的Schema 当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。 通过编程接口指定Schema 通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。 这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema 2.5.1 使用反射获取Schema(Inferring the Schema Using Reflection) Spark SQL支持将JavaBean的RDD自动转换成DataFrame。通过反射获取Bean的基本信息,依据Bean的信息定义Schema。当前Spark SQL版本(Spark 1.5.2)不支持嵌套的JavaBeans和复杂数据类型(如:List、Array)。创建一个实现Serializable接口包含所有属性getters和setters的类来创建一个JavaBean。通过调用createDataFrame并提供JavaBean的Class object,指定一个Schema给一个RDD。示例如下: public static class Person implements Serializable { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } } // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // Load a text file and convert each line to a JavaBean. JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map( new Function<String, Person>() { public Person call(String line) throws Exception { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; } }); // Apply a schema to an RDD of JavaBeans and register it as a table. DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class); schemaPeople.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); 2.5.2 通过编程接口指定Schema(Programmatically Specifying the Schema) 当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步: 从原来的RDD创建一个Row格式的RDD 创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema 通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema 示例如下: import org.apache.spark.api.java.function.Function; // Import factory methods provided by DataTypes. import org.apache.spark.sql.types.DataTypes; // Import StructType and StructField import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructField; // Import Row. import org.apache.spark.sql.Row; // Import RowFactory. import org.apache.spark.sql.RowFactory; // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // Load a text file and convert each line to a JavaBean. JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt"); // The schema is encoded in a string String schemaString = "name age"; // Generate the schema based on the string of schema List<StructField> fields = new ArrayList<StructField>(); for (String fieldName: schemaString.split(" ")) { fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true)); } StructType schema = DataTypes.createStructType(fields); // Convert records of the RDD (people) to Rows. JavaRDD<Row> rowRDD = people.map( new Function<String, Row>() { public Row call(String record) throws Exception { String[] fields = record.split(","); return RowFactory.create(fields[0], fields[1].trim()); } }); // Apply the schema to the RDD. DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema); // Register the DataFrame as a table. peopleDataFrame.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame results = sqlContext.sql("SELECT name FROM people"); // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> names = results.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); 3 数据源(Data Source) Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。Data Sources这部分首先描述了对Spark的数据源执行加载和保存的常用方法,然后对内置数据源进行深入介绍。 3.1 一般Load/Save方法 Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作。修改配置项spark.sql.sources.default,可修改默认数据源格式。读取Parquet文件示例如下: Scala val df = sqlContext.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") Java DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet"); df.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); 3.1.1 手动指定选项(Manually Specifying Options) 当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果数据源格式为内置格式,则只需要指定简称(json,parquet,jdbc)。通过指定的数据源格式名,可以对DataFrames进行类型转换操作。示例如下: Scala val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") df.select("name", "age").write.format("parquet").save("namesAndAges.parquet") Java DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json"); df.select("name", "age").write().format("parquet").save("namesAndAges.parquet"); 3.1.2 存储模式(Save Modes) 可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。SaveMode详细介绍如下表: 3.1.3 持久化到表(Saving to Persistent Tables) 当使用HiveContext时,可以通过saveAsTable方法将DataFrames存储到表中。与registerTempTable方法不同的是,saveAsTable将DataFrame中的内容持久化到表中,并在HiveMetastore中存储元数据。存储一个DataFrame,可以使用SQLContext的table方法。table先创建一个表,方法参数为要创建的表的表名,然后将DataFrame持久化到这个表中。 默认的saveAsTable方法将创建一个“managed table”,表示数据的位置可以通过metastore获得。当存储数据的表被删除时,managed table也将自动删除。 3.2 Parquet文件 Parquet是一种支持多种数据处理系统的柱状的数据格式,Parquet文件中保留了原始数据的模式。Spark SQL提供了Parquet文件的读写功能。 3.2.1 读取Parquet文件(Loading Data Programmatically) 读取Parquet文件示例如下: Scala // sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. // The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet. people.write.parquet("people.parquet") // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a Parquet file is also a DataFrame. val parquetFile = sqlContext.read.parquet("people.parquet") //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile") val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println) Java // sqlContext from the previous example is used in this example. DataFrame schemaPeople = ... // The DataFrame from the previous example. // DataFrames can be saved as Parquet files, maintaining the schema information. schemaPeople.write().parquet("people.parquet"); // Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a DataFrame. DataFrame parquetFile = sqlContext.read().parquet("people.parquet"); // Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile"); DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); 3.2.2 解析分区信息(Partition Discovery) 对表进行分区是对数据进行优化的方式之一。在分区的表内,数据通过分区列将数据存储在不同的目录下。Parquet数据源现在能够自动发现并解析分区信息。例如,对人口数据进行分区存储,分区列为gender和country,使用下面的目录结构: path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ... 通过传递path/to/table给 SQLContext.read.parquet或SQLContext.read.load,Spark SQL将自动解析分区信息。返回的DataFrame的Schema如下: root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true) 需要注意的是,数据的分区列的数据类型是自动解析的。当前,支持数值类型和字符串类型。自动解析分区类型的参数为:spark.sql.sources.partitionColumnTypeInference.enabled,默认值为true。如果想关闭该功能,直接将该参数设置为disabled。此时,分区列数据格式将被默认设置为string类型,不再进行类型解析。 3.2.3 Schema合并(Schema Merging) 像ProtocolBuffer、Avro和Thrift那样,Parquet也支持Schema evolution(Schema演变)。用户可以先定义一个简单的Schema,然后逐渐的向Schema中增加列描述。通过这种方式,用户可以获取多个有不同Schema但相互兼容的Parquet文件。现在Parquet数据源能自动检测这种情况,并合并这些文件的schemas。 因为Schema合并是一个高消耗的操作,在大多数情况下并不需要,所以Spark SQL从1.5.0开始默认关闭了该功能。可以通过下面两种方式开启该功能: 当数据源为Parquet文件时,将数据源选项mergeSchema设置为true 设置全局SQL选项spark.sql.parquet.mergeSchema为true 示例如下: Scala // sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Create a simple DataFrame, stored into a partition directory val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") df1.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") // Read the partitioned table val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths. // root // |-- single: int (nullable = true) // |-- double: int (nullable = true) // |-- triple: int (nullable = true) // |-- key : int (nullable = true) 3.2.4 Hive metastore Parquet表转换(Hive metastore Parquet table conversion) 当向Hive metastore中读写Parquet表时,Spark SQL将使用Spark SQL自带的Parquet SerDe(SerDe:Serialize/Deserilize的简称,目的是用于序列化和反序列化),而不是用Hive的SerDe,Spark SQL自带的SerDe拥有更好的性能。这个优化的配置参数为spark.sql.hive.convertMetastoreParquet,默认值为开启。 3.2.4.1 Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation) 从表Schema处理的角度对比Hive和Parquet,有两个区别: Hive区分大小写,Parquet不区分大小写 hive允许所有的列为空,而Parquet不允许所有的列全为空 由于这两个区别,当将Hive metastore Parquet表转换为Spark SQL Parquet表时,需要将Hive metastore schema和Parquet schema进行一致化。一致化规则如下: 这两个schema中的同名字段必须具有相同的数据类型。一致化后的字段必须为Parquet的字段类型。这个规则同时也解决了空值的问题。 一致化后的schema只包含Hive metastore中出现的字段。 忽略只出现在Parquet schema中的字段 只在Hive metastore schema中出现的字段设为nullable字段,并加到一致化后的schema中 3.2.4.2 元数据刷新(Metadata Refreshing) Spark SQL缓存了Parquet元数据以达到良好的性能。当Hive metastore Parquet表转换为enabled时,表修改后缓存的元数据并不能刷新。所以,当表被Hive或其它工具修改时,则必须手动刷新元数据,以保证元数据的一致性。示例如下: Scala // sqlContext is an existing HiveContext sqlContext.refreshTable("my_table") Java // sqlContext is an existing HiveContext sqlContext.refreshTable("my_table") 3.2.5 配置(Configuration) 配置Parquet可以使用SQLContext的setConf方法或使用SQL执行SET key=value命令。详细参数说明如下: 3.3 JSON数据集 Spark SQL能自动解析JSON数据集的Schema,读取JSON数据集为DataFrame格式。读取JSON数据集方法为SQLContext.read().json()。该方法将String格式的RDD或JSON文件转换为DataFrame。 需要注意的是,这里的JSON文件不是常规的JSON格式。JSON文件每一行必须包含一个独立的、自满足有效的JSON对象。如果用多行描述一个JSON对象,会导致读取出错。读取JSON数据集示例如下: Scala // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path) // The inferred schema can be visualized using the printSchema() method. people.printSchema() // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val anotherPeople = sqlContext.read.json(anotherPeopleRDD) Java // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. DataFrame people = sqlContext.read().json("examples/src/main/resources/people.json"); // The inferred schema can be visualized using the printSchema() method. people.printSchema(); // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people"); // SQL statements can be run by using the sql methods provided by sqlContext. DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. List<String> jsonData = Arrays.asList( "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData); DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD); 3.4 Hive表 Spark SQL支持对Hive的读写操作。需要注意的是,Hive所依赖的包,没有包含在Spark assembly包中。增加Hive时,需要在Spark的build中添加 -Phive 和 -Phivethriftserver配置。这两个配置将build一个新的assembly包,这个assembly包含了Hive的依赖包。注意,必须上这个心的assembly包到所有的worker节点上。因为worker节点在访问Hive中数据时,会调用Hive的 serialization and deserialization libraries(SerDes),此时将用到Hive的依赖包。 Hive的配置文件为conf/目录下的hive-site.xml文件。在YARN上执行查询命令之前,lib_managed/jars目录下的datanucleus包和conf/目录下的hive-site.xml必须可以被driverhe和所有的executors所访问。确保被访问,最方便的方式就是在spark-submit命令中通过--jars选项和--file选项指定。 操作Hive时,必须创建一个HiveContext对象,HiveContext继承了SQLContext,并增加了对MetaStore和HiveQL的支持。除了sql方法,HiveContext还提供了一个hql方法,hql方法可以执行HiveQL语法的查询语句。示例如下: Scala // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println) Java // sc is an existing JavaSparkContext. HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc); sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL. Row[] results = sqlContext.sql("FROM src SELECT key, value").collect(); 3.4.1 访问不同版本的Hive Metastore(Interacting with Different Versions of Hive Metastore) Spark SQL经常需要访问Hive metastore,Spark SQL可以通过Hive metastore获取Hive表的元数据。从Spark 1.4.0开始,Spark SQL只需简单的配置,就支持各版本Hive metastore的访问。注意,涉及到metastore时Spar SQL忽略了Hive的版本。Spark SQL内部将Hive反编译至Hive 1.2.1版本,Spark SQL的内部操作(serdes, UDFs, UDAFs, etc)都调用Hive 1.2.1版本的class。版本配置项见下面表格: 3.5 JDBC To Other Databases Spark SQL支持使用JDBC访问其他数据库。当时用JDBC访问其它数据库时,最好使用JdbcRDD。使用JdbcRDD时,Spark SQL操作返回的DataFrame会很方便,也会很方便的添加其他数据源数据。JDBC数据源因为不需要用户提供ClassTag,所以很适合使用Java或Python进行操作。使用JDBC访问数据源,需要在spark classpath添加JDBC driver配置。例如,从Spark Shell连接postgres的配置为: SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell 远程数据库的表,可用DataFrame或Spark SQL临时表的方式调用数据源API。支持的参数有: 代码示例如下: Scala val jdbcDF = sqlContext.read.format("jdbc").options( Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load() Java Map<String, String> options = new HashMap<String, String>(); options.put("url", "jdbc:postgresql:dbserver"); options.put("dbtable", "schema.tablename"); DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load(); 3.6 故障排除(Troubleshooting) 在客户端session和所有的executors上,JDBC driver必须对启动类加载器(primordial class loader)设置为visible。因为当创建一个connection时,Java的DriverManager类会执行安全验证,安全验证将忽略所有对启动类加载器为非visible的driver。一个很方便的解决方法是,修改所有worker节点上的compute_classpath.sh脚本,将driver JARs添加至脚本。 有些数据库(例:H2)将所有的名字转换为大写,所以在这些数据库中,Spark SQL也需要将名字全部大写。 4 性能调优 4.1 缓存数据至内存(Caching Data In Memory) Spark SQL可以通过调用sqlContext.cacheTable("tableName") 或者dataFrame.cache(),将表用一种柱状格式( an in­memory columnar format)缓存至内存中。然后Spark SQL在执行查询任务时,只需扫描必需的列,从而以减少扫描数据量、提高性能。通过缓存数据,Spark SQL还可以自动调节压缩,从而达到最小化内存使用率和降低GC压力的目的。调用sqlContext.uncacheTable("tableName")可将缓存的数据移出内存。 可通过两种配置方式开启缓存数据功能: 使用SQLContext的setConf方法 执行SQL命令 SET key=value 4.2 调优参数(Other Configuration Options) 可以通过配置下表中的参数调节Spark SQL的性能。在后续的Spark版本中将逐渐增强自动调优功能,下表中的参数在后续的版本中或许将不再需要配置。 5 分布式SQL引擎 使用Spark SQL的JDBC/ODBC或者CLI,可以将Spark SQL作为一个分布式查询引擎。终端用户或应用不需要编写额外的代码,可以直接使用Spark SQL执行SQL查询。 5.1 运行Thrift JDBC/ODBC服务 这里运行的Thrift JDBC/ODBC服务与Hive 1.2.1中的HiveServer2一致。可以在Spark目录下执行如下命令来启动JDBC/ODBC服务: ./sbin/start-thriftserver.sh 这个命令接收所有bin/spark-submit命令行参数,添加一个--hiveconf参数来指定Hive的属性。详细的参数说明请执行命令./sbin/start-thriftserver.sh --help。服务默认监听端口为localhost:10000。有两种方式修改默认监听端口: 修改环境变量: export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ... 修改系统属性 ./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ... 使用beeline来测试Thrift JDBC/ODBC服务: ./bin/beeline 连接到Thrift JDBC/ODBC服务 beeline> !connect jdbc:hive2://localhost:10000 在非安全模式下,只需要输入机器上的一个用户名即可,无需密码。在安全模式下,beeline会要求输入用户名和密码。安全模式下的详细要求,请阅读beeline documentation的说明。 配置Hive需要替换conf/目录下的hive-site.xml。 Thrift JDBC服务也支持通过HTTP传输发送thrift RPC messages。开启HTTP模式需要将下面的配参数配置到系统属性或conf/:下的hive-site.xml中 hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001 hive.server2.http.endpoint - HTTP endpoint; default is cliservice 测试http模式,可以使用beeline链接JDBC/ODBC服务: beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint> 5.2 运行Spark SQL CLI Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。需要注意的是,Spark SQL CLI不能与Thrift JDBC服务交互。在Spark目录下执行如下命令启动Spark SQL CLI: ./bin/spark-sql 配置Hive需要替换conf/下的hive-site.xml。执行./bin/spark-sql --help可查看详细的参数说明 。 6 Migration Guide 6.1 与Hive的兼容(Compatibility with Apache Hive) Spark SQL与Hive Metastore、SerDes、UDFs相兼容。Spark SQL兼容Hive Metastore从0.12到1.2.1的所有版本。Spark SQL也与Hive SerDes和UDFs相兼容,当前SerDes和UDFs是基于Hive 1.2.1。 6.1.1 在Hive warehouse中部署Spark SQL Spark SQL Thrift JDBC服务与Hive相兼容,在已存在的Hive上部署Spark SQL Thrift服务不需要对已存在的Hive Metastore做任何修改,也不需要对数据做任何改动。 6.1.2 Spark SQL支持的Hive特性 Spark SQL支持多部分的Hive特性,例如: Hive查询语句,包括: SELECT GROUP BY ORDER BY CLUSTER BY SORT BY 所有Hive运算符,包括 比较操作符(=, ⇔, ==, <>, <, >, >=, <=, etc) 算术运算符(+, -, *, /, %, etc) 逻辑运算符(AND, &&, OR, ||, etc) 复杂类型构造器 数学函数(sign,ln,cos,etc) 字符串函数(instr,length,printf,etc) 用户自定义函数(UDF) 用户自定义聚合函数(UDAF) 用户自定义序列化格式器(SerDes) 窗口函数 Joins JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN Unions 子查询 SELECT col FROM ( SELECT a + b AS col from t1) t2 Sampling Explain 表分区,包括动态分区插入 视图 所有的Hive DDL函数,包括: CREATE TABLE CREATE TABLE AS SELECT ALTER TABLE 大部分的Hive数据类型,包括: TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMP DATE ARRAY<> MAP<> STRUCT<> 6.1.3 不支持的Hive功能 下面是当前不支持的Hive特性,其中大部分特性在实际的Hive使用中很少用到。 Major Hive Features Tables with buckets:bucket是在一个Hive表分区内进行hash分区。Spark SQL当前不支持。 Esoteric Hive Features UNION type Unique join Column statistics collecting:当期Spark SQL不智齿列信息统计,只支持填充Hive Metastore的sizeInBytes列。 Hive Input/Output Formats File format for CLI: 这个功能用于在CLI显示返回结果,Spark SQL只支持TextOutputFormat Hadoop archive Hive优化部分Hive优化还没有添加到Spark中。没有添加的Hive优化(比如索引)对Spark SQL这种in-memory计算模型来说不是特别重要。下列Hive优化将在后续Spark SQL版本中慢慢添加。 块级别位图索引和虚拟列(用于建立索引) 自动检测joins和groupbys的reducer数量:当前Spark SQL中需要使用“SET spark.sql.shuffle.partitions=[num_tasks];”控制post-shuffle的并行度,不能自动检测。 仅元数据查询:对于可以通过仅使用元数据就能完成的查询,当前Spark SQL还是需要启动任务来计算结果。 数据倾斜标记:当前Spark SQL不遵循Hive中的数据倾斜标记 jion中STREAMTABLE提示:当前Spark SQL不遵循STREAMTABLE提示 查询结果为多个小文件时合并小文件:如果查询结果包含多个小文件,Hive能合并小文件为几个大文件,避免HDFS metadata溢出。当前Spark SQL不支持这个功能。 7 Reference 7.1 Data Types Spark SQL和DataFrames支持的数据格式如下: 数值类型 ByteType: 代表1字节有符号整数. 数值范围: -128 到 127. ShortType: 代表2字节有符号整数. 数值范围: -32768 到 32767. IntegerType: 代表4字节有符号整数. 数值范围: -2147483648 t到 2147483647. LongType: 代表8字节有符号整数. 数值范围: -9223372036854775808 到 9223372036854775807. FloatType: 代表4字节单精度浮点数。 DoubleType: 代表8字节双精度浮点数。 DecimalType: 表示任意精度的有符号十进制数。内部使用java.math.BigDecimal.A实现。 BigDecimal由一个任意精度的整数非标度值和一个32位的整数组成。 String类型 StringType: 表示字符串值。 Binary类型 BinaryType: 代表字节序列值。 Boolean类型 BooleanType: 代表布尔值。 Datetime类型 TimestampType: 代表包含的年、月、日、时、分和秒的时间值 DateType: 代表包含的年、月、日的日期值 复杂类型 ArrayType(elementType, containsNull): 代表包含一系列类型为elementType的元素。如果在一个将ArrayType值的元素可以为空值,containsNull指示是否允许为空。 MapType(keyType, valueType, valueContainsNull): 代表一系列键值对的集合。key不允许为空,valueContainsNull指示value是否允许为空 StructType(fields): 代表带有一个StructFields(列)描述结构数据。 StructField(name, dataType, nullable): 表示StructType中的一个字段。name表示列名、dataType表示数据类型、nullable指示是否允许为空。 Spark SQL所有的数据类型在org.apache.spark.sql.types包内。不同语言访问或创建数据类型方法不一样: Scala代码中添加import org.apache.spark.sql.types._,再进行数据类型访问或创建操作。 Java可以使用org.apache.spark.sql.types.DataTypes中的工厂方法,如下表: 7.2 NaN 语义 当处理float或double类型时,如果类型不符合标准的浮点语义,则使用专门的处理方式NaN。需要注意的是: NaN = NaN 返回 true 可以对NaN值进行聚合操作 在join操作中,key为NaN时,NaN值与普通的数值处理逻辑相同 NaN值大于所有的数值型数据,在升序排序中排在最后 转自:http://www.cnblogs.com/BYRans/

优秀的个人博客,低调大师

Apache DolphinScheduler 官方发布 3.2.0 版本!大数据调度

今天,Apache DolphinScheduler 3.2.0 版本在万众期待中终于发布了!在之前的预告中,包括《重磅预告!Apache DolphinScheduler 3.2.0 新功能“剧透”》、《3.2.0 版本预告!Apache DolphinScheduler API 增强相关功能》、《3.2.0 版本预告!远程日志解决 Worker 故障获取不到日志的问题》,以及《3.2.0 终极预告!云原生支持新增 Spark on k8S 支持》文章汇总已经大致覆盖了 3.2.0 版本的全新功能和优化。 现在,来看看新版本的全新“样貌”吧! Release Note: https://github.com/apache/dolphinscheduler/releases/tag/3.2.0 下载地址: https://dolphinscheduler.apache.org/en-us/download/3.2.0 主要更新包括: 添加默认租户 新增多种数据源 新增任务类型 重跑任务时指定工作流向前、向后运行 增加远程日志功能 参数优化 资源中心 增强页面易用性 云原生支持新增 Spark on k8S 支持 增加了部分 Restful API 注册中心增加 ETCD、JDBC 注册中心 架构优化 添加默认租户 在之前的版本中,用户部署完毕后必须手动添加租户。3.2.0 版本中添加了默认租户,方便用户更直接地使用 Apache DolphinScheduler。 新增多种数据源 新增了多个数据源,如 Snowflake、Databend、Kyuubi、Doris、OceanBase、Dameng、AzureSQL、StarRocks、AWS Athena、,并且更新了部分数据源,如 Redshift 增加 Access key。 新增任务类型 新增了多个任务类型,包括: 通用模块中,增加Remote-shell组件、Java Task Cloud模块中,新增 Amazon DMS、Azure Datafactory、AWS Database Migration,增强与各种云的互联互通 机器学习模块中,新增Kubeflow组件(基于云原生构建的机器学习任务工具大合集) 其他模块中,增加 AmazonDatasync、Apache Linkis 并更新了部分任务,如 DataX 支持 Presto,http任务增加output 参数传递,运行批量同时 kill 多个 Yarn 任务: Dependent 支持依赖自己: 支持了 Zeppelin 鉴权; 此外,任务现在可以支持缓存; Sqoop 日志支持隐藏密码; 以及 SQL 任务支持默认切割符: 新增远程日志功能 3.2.0 版本增加了远程日志功能,并同时支持了 Google Cloud Storage、Amazon S3、阿里云 OSS 日志存储,用户可以通过编辑配置文件,把日志存储到云端,解决万一意外情况发生,Woker 日志不存在,用户无法查看日志的问题。 详情参加《3.2.0 版本预告!远程日志解决 Worker 故障获取不到日志的问题》。 参数优化 增加了项目级别参数 调整参数优先级,启动参数最高 增加了内置参数计算规则 增加了文件类型的参数 云原生相关 支持 KEDA 做 worker 自动扩缩容 支持 Terraform 部署到 AWS zk 和 pg 支持多架构 提交 Spark 任务到 Kubernetes(详情见《3.2.0 终极预告!云原生支持新增 Spark on k8S 支持》) 获取 pod 实时日志 自定义 k8s 任务标签 资源中心 增加了 Alibaba Cloud OSS 、Huawei Cloud OBS、Azure Blob Storage的支持,重构资源中心并设计默认使用本地作为存储介质,重新支持了 re-upload。 资源中心容许覆盖上传,优化文件路径,显示文件的全部路径。另外,之前版本中资源中心已经上传的同类型文件只能删除后重新上传,新版本中对本功能进行了优化,可以点击上传按钮进行上传。 支持 reupload 文件 API 增强 3.2.0 版本中,增加了部分 Restful API,包括 taskInstance、workflow state、workflowInstance、workflow and schedule、task relation,且API 触发工作流运行可以获得 instance ID,从而使得 Apache DolphinScheduler 的 API 能力得到显著增强。 详情参见:《3.2.0 版本预告!Apache DolphinScheduler API 增强相关功能》 增加页面易用性 3.2.0 增加了页面易用性和便利性,如增加 workflow instance 跳转到当前工作流、复制工作流名称、调整列宽等操作。 跳转到工作流实例 复制工作流名称 调整列表名称宽度 默认情况下会有 default 租户和本地资源中心,安装后就能使用。 默认租户 允许在 workflow instance 中重新运行任务,任务运行日志更加明确。 可以重新运行任务 json 导出可阅读性加强。 注册中心 增加了 ETCD、JDBC 注册中心。 架构 Alert 支持 HA 单线程更新 Kerberos Worker server 移除了 dao 依赖 接管 task instance 失败的任务 增加动态任务组配置 重构了逻辑任务和远程命令 资源限制(cpu 内存)从原来绝对值改成百分比 支持了 SSO 其中,支持了 SSO 后,用户可以通过 Casdoor 实现 SSO 登录。Casdoor 是基于 OAuth 2.0、OIDC、SAML 和 CAS 的面向 UI 的身份访问管理(IAM)/单点登录(SSO)平台,需要先部署 Casdoor 并获取 `Client ID` 和 `Client secret` 两个字段,再修改 dolphinscheduler-api/src/main/resources/application.yaml 文件配置 SSO。 可以通过以下步骤通过 Casdoor 为 Apache Dolphinscheduler 添加 SSO 功能: security: authentication: # Authentication types (supported types: PASSWORD,LDAP,CASDOOR_SSO) type: CASDOOR_SSO casdoor: # Your Casdoor server url endpoint: client-id: client-secret: # The certificate may be multi-line, you can use `|-` for ease certificate: # Your organization name added in Casdoor organization-name: # Your application name added in Casdoor application-name: # Doplhinscheduler login url redirect-url: http://localhost:5173/login 贡献者列表 感谢@zhongjiajie对此次发版的指导,以及下列贡献者的支持: 106umao, Abingcbc, AliceXiaoLu, BongBongBang, CallMeKingsley97, Chris-Arith, DarkAssassinator, EricGao888, EricPyZhou, FlechazoW, Gallardot, GavinGYM, IT-Kwj, LiXuemin, LucasClt, Mukvin, NoSuchField, Orange-Summer, QuantumXiecao, Radeity, Rianico, SYSU-Coder, SbloodyS, Tianqi-Dotes, TyrantLucifer, ZhongJinHacker, Zzih, ahuljh, alei1206, alextinng, amaoisnb, arlendp, baihongbin, bmk15897, boy-xiaozhang, c3Vu, caishunfeng, calvinjiang, darrkz, davidzollo, dddyszy, devosend, ediconss, eye-gu, fengjian1129, fuchanghai, guowei-su, haibingtown, hantmac, hdygxsj, hezean, hiSandog, hoey94, hstdream, huage1994, imizao, insist777, iuhoay, jackfanwan, jbampton, jieguangzhou, kezhenxu94, kingbabingge, labbomb, lenian, ly109974, lynn-illumio, moonkop, muggleChen, pandong2011, pppppjcc, qianli2022, qindongliang, qingwli, rickchengx, ruanwenjun, sandiegoe, seedscoder, shangeyao, shenyun, simsicon, sketchmind, stalary, tracehh, whhe, xdu-chenrj, xiaomin0322, xinxingi, xuchunlai, xxjingcd, yeahhhz, youzipi, zhangfane, zhangkuantian, zhaohehuhu,zhoufanglu, zhuangchong, zhutong6688, zhuxt2015, zzzhangqi 本文由 白鲸开源科技 提供发布支持!

优秀的个人博客,低调大师

腾讯官方 QQ for Linux 新版本开始公测

2019 年,腾讯低调发布了 Linux QQ 的更新,目前版本停留在2.0 Beta2。 时隔 3 年,QQ for Linux 基于 NT 技术架构迎来全新升级。今日(12 月 7 日)起,全新 Linux QQ 正式开启公测,版本号为 2.0.1。 下载链接:deb|rpm|AppImage 需要注意的是,QQ for Linux 新版本目前只支持 x64 架构,arm64 架构还在加急适配中。 根据腾讯 QQ 项目组的通告,全新的 QQ for Linux 基于 Electron 开发,因此理论上支持所有 Linux 发行版。 Electron 是跨平台的桌面应用开发工具,基于 Electron 构建的应用可同时支持 Linux、Windows 和 Mac。 一张广泛传播的 QQ 聊天截图显示,基于 Electron 构建的 QQ 还会提供支持 Windows 的版本,并将于明年对外公布。 目前已经推出的 QQ for Mac 新版本也提到了采用全新架构——“QQNT技术架构”,而 QQ for Mac 新版本正是基于 Electron 构建。 延伸阅读 腾讯悄悄发布 Linux QQ,版本 2.0 Beta QQ for Linux 复活,微信 for Linux 还远吗?

优秀的个人博客,低调大师

Spring Authorization Server 0.3.0 发布,官方文档正式上线

基于OAuth2.1的授权服务器Spring Authorization Server 0.3.0今天正式发布,在本次更新中有几大亮点。 文档正式上线 Spring Authorization Server 的文档随着本次更新正式发布了,目前已经可以在Spring官网访问。 地址是:https://spring.io/projects/spring-authorization-server 该文档目前包含了以下几个重要的模块: 项目概述:简介和功能列表。 获得帮助:示例、常见问题和issues。 入门: 系统要求、依赖和引导你开发第一个应用。 配置模型: 默认配置和自定义配置。 核心模型/组件: 核心的领域模型和组件接口介绍。 协议端点: OAuth2 和OIDC 1.0协议端点的实现。 使用指南: Spring Authorization Server 的指南。 0.3.0的重大变化 将仅包含常量的接口更改为最终类。 将 OAuth2TokenCustomizer 移动到令牌包下。 删除标记为@Deprecation的弃用功能代码。 移除 JwtEncoder 和相关的类。 删除 令牌上下文构建器中的OAuth2TokenClaimsContext.Builder.claims() 。 删除令牌自省中的claim访问器 OAuth2TokenIntrospectionClaimAccessor。 删除对OAuth2中对PKCEplain类型的code_challenge_method的支持。 更多的新特性请参考0.3.0 changelog。 依赖升级 本版本支持刚刚发布的Spring Boot 2.7.0和Spring Security 5.7.1 Update to com.squareup.okhttp3:4.9.3 Update to jackson-bom:2.13.3 Update to mockito-core:4.5.1 Update to nimbus-jose-jwt:9.22 Update to Spring Boot 2.7.0 Update to Spring Framework 5.3.20 Update to Spring Security 5.7.1 新的贡献者 在本次版本中又增加了两名新的贡献者(Contributor): @appchemist @NotFound403

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。