首页 文章 精选 留言 我的

精选列表

搜索[官方镜像],共10000篇文章
优秀的个人博客,低调大师

《Spark官方文档》Spark Streaming编程指南(一)

Spark Streaming编程指南 概览 Spark Streaming是对核心Spark API的一个扩展,它能够实现对实时数据流的流式处理,并具有很好的可扩展性、高吞吐量和容错性。Spark Streaming支持从多种数据源提取数据,如:Kafka、Flume、Twitter、ZeroMQ、Kinesis以及TCP套接字,并且可以提供一些高级API来表达复杂的处理算法,如:map、reduce、join和window等。最后,Spark Streaming支持将处理完的数据推送到文件系统、数据库或者实时仪表盘中展示。实际上,你完全可以将Spark的机器学习(machine learning)和 图计算(graph processing)的算法应用于Spark Streaming的数据流当中。 下图展示了Spark Streaming的内部工作原理。Spark Streaming从实时数据流接入数据,再将其划分为一个个小批量供后续Spark engine处理,所以实际上,Spark Streaming是按一个个小批量来处理数据流的。 Spark Streaming为这种持续的数据流提供了的一个高级抽象,即:discretized stream(离散数据流)或者叫DStream。DStream既可以从输入数据源创建得来,如:Kafka、Flume或者Kinesis,也可以从其他DStream经一些算子操作得到。其实在内部,一个DStream就是包含了一系列RDDs。 本文档将向你展示如何用DStream进行Spark Streaming编程。Spark Streaming支持Scala、Java和Python(始于Spark 1.2),本文档的示例包括这三种语言。 注意:对Python来说,有一部分API尚不支持,或者是和Scala、Java不同。本文档中会用高亮形式来注明这部分Python API。 一个小栗子 在深入Spark Streaming编程细节之前,我们先来看看一个简单的小栗子以便有个感性认识。假设我们在一个TCP端口上监听一个数据服务器的数据,并对收到的文本数据中的单词计数。以下你所需的全部工作: Scala Java Python 首先,我们需要导入Spark Streaming的相关class的一些包,以及一些支持StreamingContext隐式转换的包(这些隐式转换能给DStream之类的class增加一些有用的方法)。StreamingContext是Spark Streaming的入口。我们将会创建一个本地 StreamingContext对象,包含两个执行线程,并将批次间隔设为1秒。 import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ // 从Spark 1.3之后这行就可以不需要了 // 创建一个local StreamingContext,包含2个工作线程,并将批次间隔设为1秒 // master至少需要2个CPU核,以避免出现任务饿死的情况 val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) 利用这个上下文对象(StreamingContext),我们可以创建一个DStream,该DStream代表从前面的TCP数据源流入的数据流,同时TCP数据源是由主机名(如:hostnam)和端口(如:9999)来描述的。 // 创建一个连接到hostname:port的DStream,如:localhost:9999 val lines = ssc.socketTextStream("localhost", 9999) 这里的 lines 就是从数据server接收到的数据流。其中每一条记录都是一行文本。接下来,我们就需要把这些文本行按空格分割成单词。 // 将每一行分割成多个单词 val words = lines.flatMap(_.split(" ")) flatMap 是一种 “一到多”(one-to-many)的映射算子,它可以将源DStream中每一条记录映射成多条记录,从而产生一个新的DStream对象。在本例中,lines中的每一行都会被flatMap映射为多个单词,从而生成新的words DStream对象。然后,我们就能对这些单词进行计数了。 import org.apache.spark.streaming.StreamingContext._ // Spark 1.3之后不再需要这行 // 对每一批次中的单词进行计数 val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // 将该DStream产生的RDD的头十个元素打印到控制台上 wordCounts.print() words这个DStream对象经过map算子(一到一的映射)转换为一个包含(word, 1)键值对的DStream对象pairs,再对pairs使用reduce算子,得到每个批次中各个单词的出现频率。最后,wordCounts.print() 将会每秒(前面设定的批次间隔)打印一些单词计数到控制台上。 注意,执行以上代码后,Spark Streaming只是将计算逻辑设置好,此时并未真正的开始处理数据。要启动之前的处理逻辑,我们还需要如下调用: ssc.start() // 启动流式计算 ssc.awaitTermination() // 等待直到计算终止 完整的代码可以在Spark Streaming的例子NetworkWordCount中找到。 如果你已经有一个Spark包(下载在这里downloaded,自定义构建在这里built),就可以执行按如下步骤运行这个例子。 首先,你需要运行netcat(Unix-like系统都会有这个小工具),将其作为data server $ nc -lk 9999 然后,在另一个终端,按如下指令执行这个例子 Scala Java Python $ ./bin/run-example streaming.NetworkWordCount localhost 9999 好了,现在你尝试可以在运行netcat的终端里敲几个单词,你会发现这些单词以及相应的计数会出现在启动Spark Streaming例子的终端屏幕上。看上去应该和下面这个示意图类似: # TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world ... # TERMINAL 2: RUNNING NetworkWordCount $ ./bin/run-example streaming.NetworkWordCount localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ... 基本概念 下面,我们在之前的小栗子基础上,继续深入了解一下Spark Streaming的一些基本概念。 链接依赖项 和Spark类似,Spark Streaming也能在Maven库中找到。如果你需要编写Spark Streaming程序,你就需要将以下依赖加入到你的SBT或Maven工程依赖中。 Maven SBT <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.1</version> </dependency> 还有,对于从Kafka、Flume以及Kinesis这类数据源提取数据的流式应用来说,还需要额外增加相应的依赖项,下表列出了各种数据源对应的额外依赖项: 数据源 Maven工件 Kafka spark-streaming-kafka_2.10 Flume spark-streaming-flume_2.10 Kinesis spark-streaming-kinesis-asl_2.10 [Amazon Software License] Twitter spark-streaming-twitter_2.10 ZeroMQ spark-streaming-zeromq_2.10 MQTT spark-streaming-mqtt_2.10 最新的依赖项信息(包括源代码和Maven工件)请参考Maven repository。 初始化StreamingContext 要初始化任何一个Spark Streaming程序,都需要在入口代码中创建一个StreamingContext对象。 Scala Java Python AStreamingContextobject can be created from aSparkConfobject. 而StreamingContext对象需要一个SparkConf对象作为其构造参数。 import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1)) 上面代码中的 appName 是你给该应用起的名字,这个名字会展示在Spark集群的web UI上。而 master 是Spark, Mesos or YARN cluster URL,如果支持本地测试,你也可以用”local[*]”为其赋值。通常在实际工作中,你不应该将master参数硬编码到代码里,而是应用通过spark-submit的参数来传递master的值(launch the application withspark-submit)。不过对本地测试来说,”local[*]”足够了(该值传给master后,Spark Streaming将在本地进程中,启动n个线程运行,n与本地系统CPU core数相同)。注意,StreamingContext在内部会创建一个SparkContext对象(SparkContext是所有Spark应用的入口,在StreamingContext对象中可以这样访问:ssc.sparkContext)。 StreamingContext还有另一个构造参数,即:批次间隔,这个值的大小需要根据应用的具体需求和可用的集群资源来确定。详见Spark性能调优(Performance Tuning)。 StreamingContext对象也可以通过已有的SparkContext对象来创建,示例如下: import org.apache.spark.streaming._ val sc = ... // 已有的SparkContext val ssc = new StreamingContext(sc, Seconds(1)) context对象创建后,你还需要如下步骤: 创建DStream对象,并定义好输入数据源。 基于数据源DStream定义好计算逻辑和输出。 调用streamingContext.start() 启动接收并处理数据。 调用streamingContext.awaitTermination() 等待流式处理结束(不管是手动结束,还是发生异常错误) 你可以主动调用 streamingContext.stop() 来手动停止处理流程。 需要关注的重点: 一旦streamingContext启动,就不能再对其计算逻辑进行添加或修改。 一旦streamingContext被stop掉,就不能restart。 单个JVM虚机同一时间只能包含一个active的StreamingContext。 StreamingContext.stop() 也会把关联的SparkContext对象stop掉,如果不想把SparkContext对象也stop掉,可以将StreamingContext.stop的可选参数 stopSparkContext 设为false。 一个SparkContext对象可以和多个StreamingContext对象关联,只要先对前一个StreamingContext.stop(sparkContext=false),然后再创建新的StreamingContext对象即可。 离散数据流 (DStreams) 离散数据流(DStream)是Spark Streaming最基本的抽象。它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream内部是由一系列连续的RDD组成的,每个RDD都是不可变、分布式的数据集(详见Spark编程指南 –Spark Programming Guide)。每个RDD都包含了特定时间间隔内的一批数据,如下图所示: 任何作用于DStream的算子,其实都会被转化为对其内部RDD的操作。例如,在前面的例子中,我们将 lines 这个DStream转成words DStream对象,其实作用于lines上的flatMap算子,会施加于lines中的每个RDD上,并生成新的对应的RDD,而这些新生成的RDD对象就组成了words这个DStream对象。其过程如下图所示: 底层的RDD转换仍然是由Spark引擎来计算。DStream的算子将这些细节隐藏了起来,并为开发者提供了更为方便的高级API。后续会详细讨论这些高级算子。 输入DStream和接收器 输入DStream代表从某种流式数据源流入的数据流。在之前的例子里,lines 对象就是输入DStream,它代表从netcat server收到的数据流。每个输入DStream(除文件数据流外)都和一个接收器(Receiver –Scala doc,Java doc)相关联,而接收器则是专门从数据源拉取数据到内存中的对象。 Spark Streaming主要提供两种内建的流式数据源: 基础数据源(Basic sources): 在StreamingContext API 中可直接使用的源,如:文件系统,套接字连接或者Akka actor。 高级数据源(Advanced sources): 需要依赖额外工具类的源,如:Kafka、Flume、Kinesis、Twitter等数据源。这些数据源都需要增加额外的依赖,详见依赖链接(linking)这一节。 本节中,我们将会从每种数据源中挑几个继续深入讨论。 注意,如果你需要同时从多个数据源拉取数据,那么你就需要创建多个DStream对象(详见后续的性能调优这一小节)。多个DStream对象其实也就同时创建了多个数据流接收器。但是请注意,Spark的worker/executor 都是长期运行的,因此它们都会各自占用一个分配给Spark Streaming应用的CPU。所以,在运行Spark Streaming应用的时候,需要注意分配足够的CPU core(本地运行时,需要足够的线程)来处理接收到的数据,同时还要足够的CPU core来运行这些接收器。 要点 如果本地运行Spark Streaming应用,记得不能将master设为”local” 或 “local[1]”。这两个值都只会在本地启动一个线程。而如果此时你使用一个包含接收器(如:套接字、Kafka、Flume等)的输入DStream,那么这一个线程只能用于运行这个接收器,而处理数据的逻辑就没有线程来执行了。因此,本地运行时,一定要将master设为”local[n]”,其中 n > 接收器的个数(有关master的详情请参考Spark Properties)。 将Spark Streaming应用置于集群中运行时,同样,分配给该应用的CPU core数必须大于接收器的总数。否则,该应用就只会接收数据,而不会处理数据。 基础数据源 前面的小栗子中,我们已经看到,使用ssc.socketTextStream(…) 可以从一个TCP连接中接收文本数据。而除了TCP套接字外,StreamingContext API 还支持从文件或者Akka actor中拉取数据。 文件数据流(File Streams):可以从任何兼容HDFS API(包括:HDFS、S3、NFS等)的文件系统,创建方式如下: Scala Java Python streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) Spark Streaming将监视该dataDirectory目录,并处理该目录下任何新建的文件(目前还不支持嵌套目录)。注意: 各个文件数据格式必须一致。 dataDirectory中的文件必须通过moving或者renaming来创建。 一旦文件move进dataDirectory之后,就不能再改动。所以如果这个文件后续还有写入,这些新写入的数据不会被读取。 对于简单的文本文件,更简单的方式是调用 streamingContext.textFileStream(dataDirectory)。 另外,文件数据流不是基于接收器的,所以不需要为其单独分配一个CPU core。 Python APIfileStream目前暂时不可用,Python目前只支持textFileStream。 基于自定义Actor的数据流(Streams based on Custom Actors):DStream可以由Akka actor创建得到,只需调用 streamingContext.actorStream(actorProps, actor-name)。详见自定义接收器(Custom Receiver Guide)。actorStream暂时不支持Python API。 RDD队列数据流(Queue of RDDs as a Stream):如果需要测试Spark Streaming应用,你可以创建一个基于一批RDD的DStream对象,只需调用 streamingContext.queueStream(queueOfRDDs)。RDD会被一个个依次推入队列,而DStream则会依次以数据流形式处理这些RDD的数据。 关于套接字、文件以及Akka actor数据流更详细信息,请参考相关文档:StreamingContextfor Scala,JavaStreamingContextfor Java, andStreamingContextfor Python。 高级数据源 Python API自 Spark 1.6.1 起,Kafka、Kinesis、Flume和MQTT这些数据源将支持Python。 使用这类数据源需要依赖一些额外的代码库,有些依赖还挺复杂的(如:Kafka、Flume)。因此为了减少依赖项版本冲突问题,各个数据源DStream的相关功能被分割到不同的代码包中,只有用到的时候才需要链接打包进来。例如,如果你需要使用Twitter的tweets作为数据源,你需要以下步骤: Linking: 将spark-streaming-twitter_2.10工件加入到SBT/Maven项目依赖中。 Programming: 导入TwitterUtils class,然后调用 TwitterUtils.createStream 创建一个DStream,具体代码见下放。 Deploying: 生成一个uber Jar包,并包含其所有依赖项(包括 spark-streaming-twitter_2.10及其自身的依赖树),再部署这个Jar包。部署详情请参考部署这一节(Deploying section)。 Scala Java import org.apache.spark.streaming.twitter._ TwitterUtils.createStream(ssc, None) 注意,高级数据源在spark-shell中不可用,因此不能用spark-shell来测试基于高级数据源的应用。如果真有需要的话,你需要自行下载相应数据源的Maven工件及其依赖项,并将这些Jar包部署到spark-shell的classpath中。 下面列举了一些高级数据源: Kafka:Spark Streaming 1.6.1 可兼容 Kafka 0.8.2.1。详见Kafka Integration Guide。 Flume:Spark Streaming 1.6.1 可兼容 Flume 1.6.0 。详见Flume Integration Guide。 Kinesis:Spark Streaming 1.6.1 可兼容 Kinesis Client Library 1.2.1。详见Kinesis Integration Guide。 Twitter:Spark Streaming TwitterUtils 使用Twitter4j 通过Twitter’s Streaming API拉取公开tweets数据流。认证信息可以用任何Twitter4j所支持的方法(methods)。你可以获取所有的公开数据流,当然也可以基于某些关键词进行过滤。示例可以参考TwitterPopularTags和TwitterAlgebirdCMS。 自定义数据源 Python API自定义数据源目前还不支持Python。 输入DStream也可以用自定义的方式创建。你需要做的只是实现一个自定义的接收器(receiver),以便从自定义的数据源接收数据,然后将数据推入Spark中。详情请参考自定义接收器指南(Custom Receiver Guide)。 接收器可靠性 从可靠性角度来划分,大致有两种数据源。其中,像Kafka、Flume这样的数据源,它们支持对所传输的数据进行确认。系统收到这类可靠数据源过来的数据,然后发出确认信息,这样就能够确保任何失败情况下,都不会丢数据。因此我们可以将接收器也相应地分为两类: 可靠接收器(Reliable Receiver)– 可靠接收器会在成功接收并保存好Spark数据副本后,向可靠数据源发送确认信息。 不可靠接收器(Unreliable Receiver)– 不可靠接收器不会发送任何确认信息。不过这种接收器常用语于不支持确认的数据源,或者不想引入数据确认的复杂性的数据源。 自定义接收器指南(Custom Receiver Guide)中详细讨论了如何写一个可靠接收器。 DStream支持的transformation算子 和RDD类似,DStream也支持从输入DStream经过各种transformation算子映射成新的DStream。DStream支持很多RDD上常见的transformation算子,一些常用的见下表: Transformation算子 用途 map(func) 返回会一个新的DStream,并将源DStream中每个元素通过func映射为新的元素 flatMap(func) 和map类似,不过每个输入元素不再是映射为一个输出,而是映射为0到多个输出 filter(func) 返回一个新的DStream,并包含源DStream中被func选中(func返回true)的元素 repartition(numPartitions) 更改DStream的并行度(增加或减少分区数) union(otherStream) 返回新的DStream,包含源DStream和otherDStream元素的并集 count() 返回一个包含单元素RDDs的DStream,其中每个元素是源DStream中各个RDD中的元素个数 reduce(func) 返回一个包含单元素RDDs的DStream,其中每个元素是通过源RDD中各个RDD的元素经func(func输入两个参数并返回一个同类型结果数据)聚合得到的结果。func必须满足结合律,以便支持并行计算。 countByValue() 如果源DStream包含的元素类型为K,那么该算子返回新的DStream包含元素为(K, Long)键值对,其中K为源DStream各个元素,而Long为该元素出现的次数。 reduceByKey(func, [numTasks]) 如果源DStream 包含的元素为 (K, V) 键值对,则该算子返回一个新的也包含(K, V)键值对的DStream,其中V是由func聚合得到的。注意:默认情况下,该算子使用Spark的默认并发任务数(本地模式为2,集群模式下由spark.default.parallelism 决定)。你可以通过可选参数numTasks来指定并发任务个数。 join(otherStream, [numTasks]) 如果源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中源DStream和otherDStream中每个K都对应一个 (K, (V, W))键值对元素。 cogroup(otherStream, [numTasks]) 如果源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中每个元素类型为包含(K, Seq[V], Seq[W])的tuple。 transform(func) 返回一个新的DStream,其包含的RDD为源RDD经过func操作后得到的结果。利用该算子可以对DStream施加任意的操作。 updateStateByKey(func) 返回一个包含新”状态”的DStream。源DStream中每个key及其对应的values会作为func的输入,而func可以用于对每个key的“状态”数据作任意的更新操作。 下面我们会挑几个transformation算子深入讨论一下。 updateStateByKey算子 updateStateByKey 算子支持维护一个任意的状态。要实现这一点,只需要两步: 定义状态 – 状态数据可以是任意类型。 定义状态更新函数 – 定义好一个函数,其输入为数据流之前的状态和新的数据流数据,且可其更新步骤1中定义的输入数据流的状态。 在每一个批次数据到达后,Spark都会调用状态更新函数,来更新所有已有key(不管key是否存在于本批次中)的状态。如果状态更新函数返回None,则对应的键值对会被删除。 举例如下。假设你需要维护一个流式应用,统计数据流中每个单词的出现次数。这里将各个单词的出现次数这个整型数定义为状态。我们接下来定义状态更新函数如下: Scala Java Python def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... // 将新的计数值和之前的状态值相加,得到新的计数值 Some(newCount) } 该状态更新函数可以作用于一个包括(word, 1) 键值对的DStream上(见本文开头的小栗子)。 val runningCounts = pairs.updateStateByKey[Int](updateFunction _) 该状态更新函数会为每个单词调用一次,且相应的newValues是一个包含很多个”1″的数组(这些1来自于(word,1)键值对),而runningCount包含之前该单词的计数。本例的完整代码请参考StatefulNetworkWordCount.scala。 注意,调用updateStateByKey前需要配置检查点目录,后续对此有详细的讨论,见检查点(checkpointing)这节。 transform算子 transform算子(及其变体transformWith)可以支持任意的RDD到RDD的映射操作。也就是说,你可以用tranform算子来包装任何DStream API所不支持的RDD算子。例如,将DStream每个批次中的RDD和另一个Dataset进行关联(join)操作,这个功能DStream API并没有直接支持。不过你可以用transform来实现这个功能,可见transform其实为DStream提供了非常强大的功能支持。比如说,你可以用事先算好的垃圾信息,对DStream进行实时过滤。 Scala Java Python val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // 包含垃圾信息的RDD val cleanedDStream = wordCounts.transform(rdd => { rdd.join(spamInfoRDD).filter(...) // 将DStream中的RDD和spamInfoRDD关联,并实时过滤垃圾数据 ... }) 注意,这里transform包含的算子,其调用时间间隔和批次间隔是相同的。所以你可以基于时间改变对RDD的操作,如:在不同批次,调用不同的RDD算子,设置不同的RDD分区或者广播变量等。 基于窗口(window)的算子 Spark Streaming同样也提供基于时间窗口的计算,也就是说,你可以对某一个滑动时间窗内的数据施加特定tranformation算子。如下图所示: 如上图所示,每次窗口滑动时,源DStream中落入窗口的RDDs就会被合并成新的windowed DStream。在上图的例子中,这个操作会施加于3个RDD单元,而滑动距离是2个RDD单元。由此可以得出任何窗口相关操作都需要指定一下两个参数: (窗口长度)window length– 窗口覆盖的时间长度(上图中为3) (滑动距离)sliding interval– 窗口启动的时间间隔(上图中为2) 注意,这两个参数都必须是DStream批次间隔(上图中为1)的整数倍. 下面咱们举个栗子。假设,你需要扩展前面的那个小栗子,你需要每隔10秒统计一下前30秒内的单词计数。为此,我们需要在包含(word, 1)键值对的DStream上,对最近30秒的数据调用reduceByKey算子。不过这些都可以简单地用一个 reduceByKeyAndWindow搞定。 Scala Java Python // 每隔10秒归约一次最近30秒的数据 val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10)) 以下列出了常用的窗口算子。所有这些算子都有前面提到的那两个参数 – 窗口长度 和 滑动距离。 Transformation窗口算子 用途 window(windowLength,slideInterval) 将源DStream窗口化,并返回转化后的DStream countByWindow(windowLength,slideInterval) 返回数据流在一个滑动窗口内的元素个数 reduceByWindow(func,windowLength,slideInterval) 基于数据流在一个滑动窗口内的元素,用func做聚合,返回一个单元素数据流。func必须满足结合律,以便支持并行计算。 reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks]) 基于(K, V)键值对DStream,将一个滑动窗口内的数据进行聚合,返回一个新的包含(K,V)键值对的DStream,其中每个value都是各个key经过func聚合后的结果。 注意:如果不指定numTasks,其值将使用Spark的默认并行任务数(本地模式下为2,集群模式下由 spark.default.parallelism决定)。当然,你也可以通过numTasks来指定任务个数。 reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval, [numTasks]) 和前面的reduceByKeyAndWindow() 类似,只是这个版本会用之前滑动窗口计算结果,递增地计算每个窗口的归约结果。当新的数据进入窗口时,这些values会被输入func做归约计算,而这些数据离开窗口时,对应的这些values又会被输入 invFunc 做”反归约”计算。举个简单的例子,就是把新进入窗口数据中各个单词个数“增加”到各个单词统计结果上,同时把离开窗口数据中各个单词的统计个数从相应的统计结果中“减掉”。不过,你的自己定义好”反归约”函数,即:该算子不仅有归约函数(见参数func),还得有一个对应的”反归约”函数(见参数中的 invFunc)。和前面的reduceByKeyAndWindow() 类似,该算子也有一个可选参数numTasks来指定并行任务数。注意,这个算子需要配置好检查点(checkpointing)才能用。 countByValueAndWindow(windowLength,slideInterval, [numTasks]) 基于包含(K, V)键值对的DStream,返回新的包含(K, Long)键值对的DStream。其中的Long value都是滑动窗口内key出现次数的计数。 和前面的reduceByKeyAndWindow() 类似,该算子也有一个可选参数numTasks来指定并行任务数。 Join相关算子 最后,值得一提的是,你在Spark Streaming中做各种关联(join)操作非常简单。 流-流(Stream-stream)关联 一个数据流可以和另一个数据流直接关联。 Scala Java Python val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2) 上面代码中,stream1的每个批次中的RDD会和stream2相应批次中的RDD进行join。同样,你可以类似地使用 leftOuterJoin, rightOuterJoin, fullOuterJoin 等。此外,你还可以基于窗口来join不同的数据流,其实现也很简单,如下;) Scala Java Python val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2) 流-数据集(stream-dataset)关联 其实这种情况已经在前面的DStream.transform算子中介绍过了,这里再举个基于滑动窗口的例子。 Scala Java Python val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) } 实际上,在上面代码里,你可以动态地该表join的数据集(dataset)。传给tranform算子的操作函数会在每个批次重新求值,所以每次该函数都会用最新的dataset值,所以不同批次间你可以改变dataset的值。 完整的DStream transformation算子列表见API文档。Scala请参考DStream和PairDStreamFunctions. Java请参考JavaDStream和JavaPairDStream. Python见DStream。 DStream输出算子 输出算子可以将DStream的数据推送到外部系统,如:数据库或者文件系统。因为输出算子会将最终完成转换的数据输出到外部系统,因此只有输出算子调用时,才会真正触发DStream transformation算子的真正执行(这一点类似于RDD 的action算子)。目前所支持的输出算子如下表: 输出算子 用途 print() 在驱动器(driver)节点上打印DStream每个批次中的头十个元素。 Python API对应的Python API为pprint() saveAsTextFiles(prefix, [suffix]) 将DStream的内容保存到文本文件。 每个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]” saveAsObjectFiles(prefix, [suffix]) 将DStream内容以序列化Java对象的形式保存到顺序文件中。 每个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”Python API暂不支持Python saveAsHadoopFiles(prefix, [suffix]) 将DStream内容保存到Hadoop文件中。 每个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”Python API暂不支持Python foreachRDD(func) 这是最通用的输出算子了,该算子接收一个函数func,func将作用于DStream的每个RDD上。 func应该实现将每个RDD的数据推到外部系统中,比如:保存到文件或者写到数据库中。 注意,func函数是在streaming应用的驱动器进程中执行的,所以如果其中包含RDD的action算子,就会触发对DStream中RDDs的实际计算过程。 使用foreachRDD的设计模式 DStream.foreachRDD是一个非常强大的原生工具函数,用户可以基于此算子将DStream数据推送到外部系统中。不过用户需要了解如何正确而高效地使用这个工具。以下列举了一些常见的错误。 通常,对外部系统写入数据需要一些连接对象(如:远程server的TCP连接),以便发送数据给远程系统。因此,开发人员可能会不经意地在Spark驱动器(driver)进程中创建一个连接对象,然后又试图在Spark worker节点上使用这个连接。如下例所示: Scala Python dstream.foreachRDD { rdd => val connection = createNewConnection() // 这行在驱动器(driver)进程执行 rdd.foreach { record => connection.send(record) // 而这行将在worker节点上执行 } } 这段代码是错误的,因为它需要把连接对象序列化,再从驱动器节点发送到worker节点。而这些连接对象通常都是不能跨节点(机器)传递的。比如,连接对象通常都不能序列化,或者在另一个进程中反序列化后再次初始化(连接对象通常都需要初始化,因此从驱动节点发到worker节点后可能需要重新初始化)等。解决此类错误的办法就是在worker节点上创建连接对象。 然而,有些开发人员可能会走到另一个极端 – 为每条记录都创建一个连接对象,例如: Scala Python dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } } 一般来说,连接对象是有时间和资源开销限制的。因此,对每条记录都进行一次连接对象的创建和销毁会增加很多不必要的开销,同时也大大减小了系统的吞吐量。一个比较好的解决方案是使用 rdd.foreachPartition – 为RDD的每个分区创建一个单独的连接对象,示例如下: Scala Python dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } } 这样一来,连接对象的创建开销就摊到很多条记录上了。 最后,还有一个更优化的办法,就是在多个RDD批次之间复用连接对象。开发者可以维护一个静态连接池来保存连接对象,以便在不同批次的多个RDD之间共享同一组连接对象,示例如下: Scala Python dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool 是一个静态的、懒惰初始化的连接池 val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // 将连接返还给连接池,以便后续复用之 } } 注意,连接池中的连接应该是懒惰创建的,并且有确定的超时时间,超时后自动销毁。这个实现应该是目前发送数据最高效的实现方式。 其他要点: DStream的转化执行也是懒惰的,需要输出算子来触发,这一点和RDD的懒惰执行由action算子触发很类似。特别地,DStream输出算子中包含的RDD action算子会强制触发对所接收数据的处理。因此,如果你的Streaming应用中没有输出算子,或者你用了dstream.foreachRDD(func)却没有在func中调用RDD action算子,那么这个应用只会接收数据,而不会处理数据,接收到的数据最后只是被简单地丢弃掉了。 默认地,输出算子只能一次执行一个,且按照它们在应用程序代码中定义的顺序执行。 转载自 并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》机器学习库(MLlib)指南

机器学习库(MLlib)指南 MLlib是Spark的机器学习(ML)库。旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。 MLllib目前分为两个代码包: spark.mllib包含基于RDD的原始算法API。 spark.ml则提供了基于DataFrames高层次的API,可以用来构建机器学习管道。 我们推荐您使用spark.ml,因为基于DataFrames的API更加的通用而且灵活。不过我们也会继续支持spark.mllib包。用户可以放心使用,spark.mllib还会持续地增加新的功能。不过开发者需要注意,如果新的算法能够适用于机器学习管道的概念,就应该将其放到spark.ml包中,如:特征提取器和转换器。 下面的列表列出了两个包的主要功能。 spark.mllib: 数据类型,算法以及工具 Data types(数据类型) Basic statistics(基础统计) summary statistics(摘要统计) correlations(相关性) stratified sampling(分层抽样) hypothesis testing(假设检验) streaming significance testing random data generation(随机数据生成) Classification and regression(分类和回归) linear models (SVMs, logistic regression, linear regression)(线性模型(SVM,逻辑回归,线性回归)) naive Bayes(朴素贝叶斯) decision trees(决策树) ensembles of trees (Random Forests and Gradient-Boosted Trees)(树套装(随机森林和梯度提升决策树)) isotonic regression(保序回归) Collaborative filtering(协同过滤) alternating least squares (ALS)(交替最小二乘(ALS)) Clustering(聚类) k-means(K-均值) Gaussian mixture(高斯混合) power iteration clustering (PIC)(幂迭代聚类(PIC)) latent Dirichlet allocation (LDA)(隐含狄利克雷分配) bisecting k-means(平分K-均值) streaming k-means(流式K-均值) Dimensionality reduction(降维) singular value decomposition (SVD)(奇异值分解(SVD)) principal component analysis (PCA)(主成分分析(PCA)) Feature extraction and transformation(特征抽取和转换) Frequent pattern mining(频繁模式挖掘) FP-growth(FP-增长) association rules(关联规则) PrefixSpan(PrefixSpan) Evaluation metrics(评价指标) PMML model export(PMML模型导出) Optimization (developer)(优化(开发者)) stochastic gradient descent(随机梯度下降) limited-memory BFGS (L-BFGS)(有限的记忆BFGS(L-BFGS)) spark.ml: 机器学习管道高级API Overview: estimators, transformers and pipelines(概览:评估器,转换器和管道) Extracting, transforming and selecting features(抽取,转换和选取特征) Classification and regression(分类和回归) Clustering(聚类) Advanced topics(高级主题) 虽然还有些降维技术在spark.ml中尚不可用,不过用户可以将spark.mllib中的的相关实现和spark.ml中的算法无缝地结合起来。 依赖项 MLlib使用的线性代数代码包是Breeze,而Breeze又依赖于netlib-java优化的数值处理。如果在运行时环境中这些原生库不可用,你将会收到一条警告,而后spark会使用纯JVM实现来替代之。 由于许可限制的原因,spark在默认情况下不会包含netlib-java的原生代理库。如果需要配置netlib-java/Breeze使用其系统优化库,你需要添加依赖项:com.github.fommil.netlib:all:1.1.2(或者在编译时加上参数:-Pnetlib-lgpl),然后再看一看netlib-java相应的安装文档。 要使用MLlib的Python接口,你需要安装NumPy1.4以上的版本。 迁移指南 MLlib目前还在积极的开发当中。所以标记为 Experimental / DeveloperApi 的接口可能在未来发生变化,下面的迁移指南说明了版本升级后的变化。 从1.5升级到1.6 从1.5到1.6,spark.mllib 和 spark.ml 包中并没有重大的API变化,不过有一些行为不再支持或者发生变化。 已经废弃: SPARK-11358: spark.mllib.clustering.KMeans 的runs参数已经废弃 SPARK-10592: spark.ml.classification.LogisticRegressionModel和spark.ml.regresion.LinearRegressionModel 中,weights字段改名为coefficients。这一变动有助于消除歧义,可以和输入给算法的实例(行)权重(weights)区分开来。 行为有变: SPARK-7770:spark.mllib.tree.GradientBoostedTrees:validationTol的语义在1.6中有变。原先其代表误差变化绝对值的一个阈值,而现在它类似于GradientDescent中的convergenceTol:对于较大的误差,使用相对误差(相对于上一次);而对于较小的误差(<0.01),使用绝对误差。 SPARK-11069:spark.ml.feature.RegexTokenizer:以前,在分词之前不会讲字符串转小写。现在的实现是,默认会将字符串转小写,不过有选项可以设为不转。这中实现和Tokenizertransformer的行为相匹配。 Spark老版本 以前版本的迁移指南归档在这里:on this page 要了解更多有关系统优化的好处和背景资料,可以看看Sam Halliday关于ScalaX的演讲:High Performance Linear Algebra in Scala 转载自并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark官方文档》Spark Streaming编程指南(二)

累加器和广播变量 首先需要注意的是,累加器(Accumulators)和广播变量(Broadcast variables)是无法从Spark Streaming的检查点中恢复回来的。所以如果你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,因为这样累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化。代码示例如下: Scala Java Python object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } object DroppedWordsCounter { @volatile private var instance: Accumulator[Long] = null def getInstance(sc: SparkContext): Accumulator[Long] = { if (instance == null) { synchronized { if (instance == null) { instance = sc.accumulator(0L, "WordsInBlacklistCounter") } } } instance } } wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { // 获取现有或注册新的blacklist广播变量 val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // 获取现有或注册新的 droppedWordsCounter 累加器 val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // 基于blacklist来过滤词,并将过滤掉的词的个数累加到 droppedWordsCounter 中 val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter += count false } else { true } }.collect() val output = "Counts at time " + time + " " + counts }) 这里有完整代码:source code。 DataFrame和SQL相关算子 在Streaming应用中可以调用DataFrames and SQL来处理流式数据。开发者可以用通过StreamingContext中的SparkContext对象来创建一个SQLContext,并且,开发者需要确保一旦驱动器(driver)故障恢复后,该SQLContext对象能重新创建出来。同样,你还是可以使用懒惰创建的单例模式来实例化SQLContext,如下面的代码所示,这里我们将最开始的那个小栗子做了一些修改,使用DataFrame和SQL来统计单词计数。其实就是,将每个RDD都转化成一个DataFrame,然后注册成临时表,再用SQL查询这些临时表。 Scala Java Python /** streaming应用中调用DataFrame算子 */ val words: DStream[String] = ... words.foreachRDD { rdd => // 获得SQLContext单例 val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) import sqlContext.implicits._ // 将RDD[String] 转为 DataFrame val wordsDataFrame = rdd.toDF("word") // DataFrame注册为临时表 wordsDataFrame.registerTempTable("words") // 再用SQL语句查询,并打印出来 val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() } See the fullsource code. 这里有完整代码:source code。 你也可以在其他线程里执行SQL查询(异步查询,即:执行SQL查询的线程和运行StreamingContext的线程不同)。不过这种情况下,你需要确保查询的时候 StreamingContext 没有把所需的数据丢弃掉,否则StreamingContext有可能已将老的RDD数据丢弃掉了,那么异步查询的SQL语句也可能无法得到查询结果。举个栗子,如果你需要查询上一个批次的数据,但是你的SQL查询可能要执行5分钟,那么你就需要StreamingContext至少保留最近5分钟的数据:streamingContext.remember(Minutes(5)) (这是Scala为例,其他语言差不多) 更多DataFrame和SQL的文档见这里:DataFrames and SQL MLlib算子 MLlib提供了很多机器学习算法。首先,你需要关注的是流式计算相关的机器学习算法(如:Streaming Linear Regression,Streaming KMeans),这些流式算法可以在流式数据上一边学习训练模型,一边用最新的模型处理数据。除此以外,对更多的机器学习算法而言,你需要离线训练这些模型,然后将训练好的模型用于在线的流式数据。详见MLlib。 缓存/持久化 和RDD类似,DStream也支持将数据持久化到内存中。只需要调用 DStream的persist() 方法,该方法内部会自动调用DStream中每个RDD的persist方法进而将数据持久化到内存中。这对于可能需要计算很多次的DStream非常有用(例如:对于同一个批数据调用多个算子)。对于基于滑动窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或者有状态的算子,如:updateStateByKey,数据持久化就更重要了。因此,滑动窗口算子产生的DStream对象默认会自动持久化到内存中(不需要开发者调用persist)。 对于从网络接收数据的输入数据流(如:Kafka、Flume、socket等),默认的持久化级别会将数据持久化到两个不同的节点上互为备份副本,以便支持容错。 注意,与RDD不同的是,DStream的默认持久化级别是将数据序列化到内存中。进一步的讨论见性能调优这一小节。关于持久化级别(或者存储级别)的更详细说明见Spark编程指南(Spark Programming Guide)。 检查点 一般来说Streaming 应用都需要7*24小时长期运行,所以必须对一些与业务逻辑无关的故障有很好的容错(如:系统故障、JVM崩溃等)。对于这些可能性,Spark Streaming 必须在检查点保存足够的信息到一些可容错的外部存储系统中,以便能够随时从故障中恢复回来。所以,检查点需要保存以下两种数据: 元数据检查点(Metadata checkpointing)– 保存流式计算逻辑的定义信息到外部可容错存储系统(如:HDFS)。主要用途是用于在故障后回复应用程序本身(后续详谈)。元数包括: Configuration– 创建Streaming应用程序的配置信息。 DStream operations– 定义流式处理逻辑的DStream操作信息。 Incomplete batches– 已经排队但未处理完的批次信息。 数据检查点(Data checkpointing)– 将生成的RDD保存到可靠的存储中。这对一些需要跨批次组合数据或者有状态的算子来说很有必要。在这种转换算子中,往往新生成的RDD是依赖于前几个批次的RDD,因此随着时间的推移,有可能产生很长的依赖链条。为了避免在恢复数据的时候需要恢复整个依赖链条上所有的数据,检查点需要周期性地保存一些中间RDD状态信息,以斩断无限制增长的依赖链条和恢复时间。 总之,元数据检查点主要是为了恢复驱动器节点上的故障,而数据或RDD检查点是为了支持对有状态转换操作的恢复。 何时启用检查点 如果有以下情况出现,你就必须启用检查点了: 使用了有状态的转换算子(Usage of stateful transformations)– 不管是用了 updateStateByKey 还是用了 reduceByKeyAndWindow(有”反归约”函数的那个版本),你都必须配置检查点目录来周期性地保存RDD检查点。 支持驱动器故障中恢复(Recovering from failures of the driver running the application)– 这时候需要元数据检查点以便恢复流式处理的进度信息。 注意,一些简单的流式应用,如果没有用到前面所说的有状态转换算子,则完全可以不开启检查点。不过这样的话,驱动器(driver)故障恢复后,有可能会丢失部分数据(有些已经接收但还未处理的数据可能会丢失)。不过通常这点丢失时可接受的,很多Spark Streaming应用也是这样运行的。对非Hadoop环境的支持未来还会继续改进。 如何配置检查点 检查点的启用,只需要设置好保存检查点信息的检查点目录即可,一般会会将这个目录设为一些可容错的、可靠性较高的文件系统(如:HDFS、S3等)。开发者只需要调用 streamingContext.checkpoint(checkpointDirectory)。设置好检查点,你就可以使用前面提到的有状态转换算子了。另外,如果你需要你的应用能够支持从驱动器故障中恢复,你可能需要重写部分代码,实现以下行为: 如果程序是首次启动,就需要new一个新的StreamingContext,并定义好所有的数据流处理,然后调用StreamingContext.start()。 如果程序是故障后重启,就需要从检查点目录中的数据中重新构建StreamingContext对象。 Scala Java Python 不过这个行为可以用StreamingContext.getOrCreate来实现,示例如下: // 首次创建StreamingContext并定义好数据流处理逻辑 def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // 新建一个StreamingContext对象 val lines = ssc.socketTextStream(...) // 创建DStreams ... ssc.checkpoint(checkpointDirectory) // 设置好检查点目录 ssc } // 创建新的StreamingContext对象,或者从检查点构造一个 val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // 无论是否是首次启动都需要设置的工作在这里 context. ... // 启动StreamingContext对象 context.start() context.awaitTermination() 如果 checkpointDirectory 目录存在,则context对象会从检查点数据重新构建出来。如果该目录不存在(如:首次运行),则 functionToCreateContext 函数会被调用,创建一个新的StreamingContext对象并定义好DStream数据流。完整的示例请参见RecoverableNetworkWordCount,这个例子会将网络数据中的单词计数统计结果添加到一个文件中。 除了使用getOrCreate之外,开发者还需要确保驱动器进程能在故障后重启。这一点只能由应用的部署环境基础设施来保证。进一步的讨论见部署(Deployment)这一节。 另外需要注意的是,RDD检查点会增加额外的保存数据的开销。这可能会导致数据流的处理时间变长。因此,你必须仔细的调整检查点间隔时间。如果批次间隔太小(比如:1秒),那么对每个批次保存检查点数据将大大减小吞吐量。另一方面,检查点保存过于频繁又会导致血统信息和任务个数的增加,这同样会影响系统性能。对于需要RDD检查点的有状态转换算子,默认的间隔是批次间隔的整数倍,且最小10秒。开发人员可以这样来自定义这个间隔:dstream.checkpoint(checkpointInterval)。一般推荐设为批次间隔时间的5~10倍。 部署应用 本节中将主要讨论一下如何部署Spark Streaming应用。 前提条件 要运行一个Spark Streaming 应用,你首先需要具备以下条件: 集群以及集群管理器 – 这是一般Spark应用的基本要求,详见deployment guide。 给Spark应用打个JAR包 – 你需要将你的应用打成一个JAR包。如果使用spark-submit提交应用,那么你不需要提供Spark和Spark Streaming的相关JAR包。但是,如果你使用了高级数据源(advanced sources– 如:Kafka、Flume、Twitter等),那么你需要将这些高级数据源相关的JAR包及其依赖一起打包并部署。例如,如果你使用了TwitterUtils,那么就必须将spark-streaming-twitter_2.10及其相关依赖都打到应用的JAR包中。 为执行器(executor)预留足够的内存 – 执行器必须配置预留好足够的内存,因为接受到的数据都得存在内存里。注意,如果某些窗口长度达到10分钟,那也就是说你的系统必须知道保留10分钟的数据在内存里。可见,到底预留多少内存是取决于你的应用处理逻辑的。 配置检查点 – 如果你的流式应用需要检查点,那么你需要配置一个Hadoop API兼容的可容错存储目录作为检查点目录,流式应用的信息会写入这个目录,故障恢复时会用到这个目录下的数据。详见前面的检查点小节。 配置驱动程序自动重启 – 流式应用自动恢复的前提就是,部署基础设施能够监控驱动器进程,并且能够在其故障时,自动重启之。不同的集群管理器有不同的工具来实现这一功能: Spark独立部署 – Spark独立部署集群可以支持将Spark应用的驱动器提交到集群的某个worker节点上运行。同时,Spark的集群管理器可以对该驱动器进程进行监控,一旦驱动器退出且返回非0值,或者因worker节点原始失败,Spark集群管理器将自动重启这个驱动器。详见Spark独立部署指南(Spark Standalone guide)。 YARN – YARN支持和独立部署类似的重启机制。详细请参考YARN的文档。 Mesos – Mesos上需要用Marathon来实现这一功能。 配置WAL(write ahead log)- 从Spark 1.2起,我们引入了write ahead log来提高容错性。如果启用这个功能,则所有接收到的数据都会以write ahead log形式写入配置好的检查点目录中。这样就能确保数据零丢失(容错语义有详细的讨论)。用户只需将 spark.streaming.receiver.writeAheadLog 设为true。不过,这同样可能会导致接收器的吞吐量下降。不过你可以启动多个接收器并行接收数据,从而提升整体的吞吐量(more receivers in parallel)。另外,建议在启用WAL后禁用掉接收数据多副本功能,因为WAL其实已经是存储在一个多副本存储系统中了。你只需要把存储级别设为 StorageLevel.MEMORY_AND_DISK_SER。如果是使用S3(或者其他不支持flushing的文件系统)存储WAL,一定要记得启用这两个标识:spark.streaming.driver.writeAheadLog.closeFileAfterWrite 和 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。更详细请参考:Spark Streaming Configuration。 设置好最大接收速率 – 如果集群可用资源不足以跟上接收数据的速度,那么可以在接收器设置一下最大接收速率,即:每秒接收记录的条数。相关的主要配置有:spark.streaming.receiver.maxRate,如果使用Kafka Direct API 还需要设置 spark.streaming.kafka.maxRatePerPartition。从Spark 1.5起,我们引入了backpressure的概念来动态地根据集群处理速度,评估并调整该接收速率。用户只需将 spark.streaming.backpressure.enabled设为true即可启用该功能。 升级应用代码 升级Spark Streaming应用程序代码,可以使用以下两种方式: 新的Streaming程序和老的并行跑一段时间,新程序完成初始化以后,再关闭老的。注意,这种方式适用于能同时发送数据到多个目标的数据源(即:数据源同时将数据发给新老两个Streaming应用程序)。 老程序能够优雅地退出(参考StreamingContext.stop(...)orJavaStreamingContext.stop(...)),即:确保所收到的数据都已经处理完毕后再退出。然后再启动新的Streaming程序,而新程序将接着在老程序退出点上继续拉取数据。注意,这种方式需要数据源支持数据缓存(或者叫数据堆积,如:Kafka、Flume),因为在新旧程序交接的这个空档时间,数据需要在数据源处缓存。目前还不能支持从检查点重启,因为检查点存储的信息包含老程序中的序列化对象信息,在新程序中将其反序列化可能会出错。这种情况下,只能要么指定一个新的检查点目录,要么删除老的检查点目录。 应用监控 除了Spark自身的监控能力(monitoring capabilities)之外,对Spark Streaming还有一些额外的监控功能可用。如果实例化了StreamingContext,那么你可以在Spark web UI上看到多出了一个Streaming tab页,上面显示了正在运行的接收器(是否活跃,接收记录的条数,失败信息等)和处理完的批次信息(批次处理时间,查询延时等)。这些信息都可以用来监控streaming应用。 web UI上有两个度量特别重要: 批次处理耗时(Processing Time)– 处理单个批次耗时 批次调度延时(Scheduling Delay)-各批次在队列中等待时间(等待上一个批次处理完) 如果批次处理耗时一直比批次间隔时间大,或者批次调度延时持续上升,就意味着系统处理速度跟不上数据接收速度。这时候你就得考虑一下怎么把批次处理时间降下来(reducing)。 Spark Streaming程序的处理进度可以用StreamingListener接口来监听,这个接口可以监听到接收器的状态和处理时间。不过需要注意的是,这是一个developer API接口,换句话说这个接口未来很可能会变动(可能会增加更多度量信息)。 性能调优 要获得Spark Streaming应用的最佳性能需要一点点调优工作。本节将深入解释一些能够改进Streaming应用性能的配置和参数。总体上来说,你需要考虑这两方面的事情: 提高集群资源利用率,减少单批次处理耗时。 设置合适的批次大小,以便使数据处理速度能跟上数据接收速度。 减少批次处理时间 有不少优化手段都可以减少Spark对每个批次的处理时间。细节将在优化指南(Tuning Guide)中详谈。这里仅列举一些最重要的。 数据接收并发度 跨网络接收数据(如:从Kafka、Flume、socket等接收数据)需要在Spark中序列化并存储数据。 如果接收数据的过程是系统瓶颈,那么可以考虑增加数据接收的并行度。注意,每个输入DStream只包含一个单独的接收器(receiver,运行约worker节点),每个接收器单独接收一路数据流。所以,配置多个输入DStream就能从数据源的不同分区分别接收多个数据流。例如,可以将从Kafka拉取两个topic的数据流分成两个Kafka输入数据流,每个数据流拉取其中一个topic的数据,这样一来会同时有两个接收器并行地接收数据,因而增加了总体的吞吐量。同时,另一方面我们又可以把这些DStream数据流合并成一个,然后可以在合并后的DStream上使用任何可用的transformation算子。示例代码如下: Scala Java Python val numStreams = 5 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } val unifiedStream = streamingContext.union(kafkaStreams) unifiedStream.print() 另一个可以考虑优化的参数就是接收器的阻塞间隔,该参数由配置参数(configuration parameter)spark.streaming.blockInterval决定。大多数接收器都会将数据合并成一个个数据块,然后再保存到spark内存中。对于map类算子来说,每个批次中数据块的个数将会决定处理这批数据并行任务的个数,每个接收器每批次数据处理任务数约等于 (批次间隔 / 数据块间隔)。例如,对于2秒的批次间隔,如果数据块间隔为200ms,则创建的并发任务数为10。如果任务数太少(少于单机cpu core个数),则资源利用不够充分。如需增加这个任务数,对于给定的批次间隔来说,只需要减少数据块间隔即可。不过,我们还是建议数据块间隔至少要50ms,否则任务的启动开销占比就太高了。 另一个切分接收数据流的方法是,显示地将输入数据流划分为多个分区(使用 inputStream.repartition(<number of partitions>))。该操作会在处理前,将数据散开重新分发到集群中多个节点上。 数据处理并发度 在计算各个阶段(stage)中,任何一个阶段的并发任务数不足都有可能造成集群资源利用率低。例如,对于reduce类的算子,如:reduceByKey 和 reduceByKeyAndWindow,其默认的并发任务数是由 spark.default.parallelism 决定的。你既可以修改这个默认值(spark.default.parallelism),也可以通过参数指定这个并发数量(见PairDStreamFunctions)。 数据序列化 调整数据的序列化格式可以大大减少数据序列化的开销。在spark Streaming中主要有两种类型的数据需要序列化: 输入数据: 默认地,接收器收到的数据是以StorageLevel.MEMORY_AND_DISK_SER_2的存储级别存储到执行器(executor)内存中的。也就是说,收到的数据会被序列化以减少GC开销,同时保存两个副本以容错。同时,数据会优先保存在内存里,当内存不足时才吐出到磁盘上。很明显,这个过程中会有数据序列化的开销 – 接收器首先将收到的数据反序列化,然后再以spark所配置指定的格式来序列化数据。 Streaming算子所生产的持久化的RDDs: Streaming计算所生成的RDD可能会持久化到内存中。例如,基于窗口的算子会将数据持久化到内存,因为窗口数据可能会多次处理。所不同的是,spark core默认用StorageLevel.MEMORY_ONLY级别持久化RDD数据,而spark streaming默认使用StorageLevel.MEMORY_ONLY_SER级别持久化接收到的数据,以便尽量减少GC开销。 不管是上面哪一种数据,都可以使用Kryo序列化来减少CPU和内存开销,详见Spark Tuning Guide。另,对于Kryo,你可以考虑这些优化:注册自定义类型,禁用对象引用跟踪(详见Configuration Guide)。 在一些特定的场景下,如果数据量不是很大,那么你可以考虑不用序列化格式,不过你需要注意的是取消序列化是否会导致大量的GC开销。例如,如果你的批次间隔比较短(几秒)并且没有使用基于窗口的算子,这种情况下你可以考虑禁用序列化格式。这样可以减少序列化的CPU开销以优化性能,同时GC的增长也不多。 任务启动开销 如果每秒启动的任务数过多(比如每秒50个以上),那么将任务发送给slave节点的开销会明显增加,那么你也就很难达到亚秒级(sub-second)的延迟。不过以下两个方法可以减少任务的启动开销: 任务序列化(Task Serialization): 使用Kryo来序列化任务,以减少任务本身的大小,从而提高发送任务的速度。任务的序列化格式是由 spark.closure.serializer 属性决定的。不过,目前还不支持闭包序列化,未来的版本可能会增加对此的支持。 执行模式(Execution mode): Spark独立部署或者Mesos粗粒度模式下任务的启动时间比Mesos细粒度模式下的任务启动时间要短。详见Running on Mesos guide。 这些调整有可能能够减少100ms的批次处理时间,这也使得亚秒级的批次间隔成为可能。 设置合适的批次间隔 要想streaming应用在集群上稳定运行,那么系统处理数据的速度必须能跟上其接收数据的速度。换句话说,批次数据的处理速度应该和其生成速度一样快。对于特定的应用来说,可以从其对应的监控(monitoring)页面上观察验证,页面上显示的处理耗时应该要小于批次间隔时间。 根据spark streaming计算的性质,在一定的集群资源限制下,批次间隔的值会极大地影响系统的数据处理能力。例如,在WordCountNetwork示例中,对于特定的数据速率,一个系统可能能够在批次间隔为2秒时跟上数据接收速度,但如果把批次间隔改为500毫秒系统可能就处理不过来了。所以,批次间隔需要谨慎设置,以确保生产系统能够处理得过来。 要找出适合的批次间隔,你可以从一个比较保守的批次间隔值(如5~10秒)开始测试。要验证系统是否能跟上当前的数据接收速率,你可能需要检查一下端到端的批次处理延迟(可以看看Spark驱动器log4j日志中的Total delay,也可以用StreamingListener接口来检测)。如果这个延迟能保持和批次间隔差不多,那么系统基本就是稳定的。否则,如果这个延迟持久在增长,也就是说系统跟不上数据接收速度,那也就意味着系统不稳定。一旦系统文档下来后,你就可以尝试提高数据接收速度,或者减少批次间隔值。不过需要注意,瞬间的延迟增长可以只是暂时的,只要这个延迟后续会自动降下来就没有问题(如:降到小于批次间隔值) 内存调优 Spark应用内存占用和GC调优已经在调优指南(Tuning Guide)中有详细的讨论。墙裂建议你读一读那篇文档。本节中,我们只是讨论一下几个专门用于Spark Streaming的调优参数。 Spark Streaming应用在集群中占用的内存量严重依赖于具体所使用的tranformation算子。例如,如果想要用一个窗口算子操纵最近10分钟的数据,那么你的集群至少需要在内存里保留10分钟的数据;另一个例子是updateStateByKey,如果key很多的话,相对应的保存的key的state也会很多,而这些都需要占用内存。而如果你的应用只是做一个简单的 “映射-过滤-存储”(map-filter-store)操作的话,那需要的内存就很少了。 一般情况下,streaming接收器接收到的数据会以StorageLevel.MEMORY_AND_DISK_SER_2 这个存储级别存到spark中,也就是说,如果内存装不下,数据将被吐到磁盘上。数据吐到磁盘上会大大降低streaming应用的性能,因此还是建议根据你的应用处理的数据量,提供充足的内存。最好就是,一边小规模地放大内存,再观察评估,然后再放大,再评估。 另一个内存调优的方向就是垃圾回收。因为streaming应用往往都需要低延迟,所以肯定不希望出现大量的或耗时较长的JVM垃圾回收暂停。 以下是一些能够帮助你减少内存占用和GC开销的参数或手段: DStream持久化级别(Persistence Level of DStreams): 前面数据序列化(Data Serialization)这小节已经提到过,默认streaming的输入RDD会被持久化成序列化的字节流。相对于非序列化数据,这样可以减少内存占用和GC开销。如果启用Kryo序列化,还能进一步减少序列化数据大小和内存占用量。如果你还需要进一步减少内存占用的话,可以开启数据压缩(通过spark.rdd.compress这个配置设定),只不过数据压缩会增加CPU消耗。 清除老数据(Clearing old data): 默认情况下,所有的输入数据以及DStream的transformation算子产生的持久化RDD都是自动清理的。Spark Streaming会根据所使用的transformation算子来清理老数据。例如,你用了一个窗口操作处理最近10分钟的数据,那么Spark Streaming会保留至少10分钟的数据,并且会主动把更早的数据都删掉。当然,你可以设置 streamingContext.remember 以保留更长时间段的数据(比如:你可能会需要交互式地查询更老的数据)。 CMS垃圾回收器(CMS Garbage Collector): 为了尽量减少GC暂停的时间,我们墙裂建议使用CMS垃圾回收器(concurrent mark-and-sweep GC)。虽然CMS GC会稍微降低系统的总体吞吐量,但我们仍建议使用它,因为CMS GC能使批次处理的时间保持在一个比较恒定的水平上。最后,你需要确保在驱动器(通过spark-submit中的–driver-java-options设置)和执行器(使用spark.executor.extraJavaOptions配置参数)上都设置了CMS GC。 其他提示: 如果还想进一步减少GC开销,以下是更进一步的可以尝试的手段: 配合Tachyon使用堆外内存来持久化RDD。详见Spark编程指南(Spark Programming Guide) 使用更多但是更小的执行器进程。这样GC压力就会分散到更多的JVM堆中。 容错语义 本节中,我们将讨论Spark Streaming应用在出现失败时的具体行为。 背景 要理解Spark Streaming所提供的容错语义,我们首先需要回忆一下Spark RDD所提供的基本容错语义。 RDD是不可变的,可重算的,分布式数据集。每个RDD都记录了其创建算子的血统信息,其中每个算子都以可容错的数据集作为输入数据。 如果RDD的某个分区因为节点失效而丢失,则该分区可以根据RDD的血统信息以及相应的原始输入数据集重新计算出来。 假定所有RDD transformation算子计算过程都是确定性的,那么通过这些算子得到的最终RDD总是包含相同的数据,而与Spark集群的是否故障无关。 Spark主要操作一些可容错文件系统的数据,如:HDFS或S3。因此,所有从这些可容错数据源产生的RDD也是可容错的。然而,对于Spark Streaming并非如此,因为多数情况下Streaming需要从网络远端接收数据,这回导致Streaming的数据源并不可靠(尤其是对于使用了fileStream的应用)。要实现RDD相同的容错属性,数据接收就必须用多个不同worker节点上的Spark执行器来实现(默认副本因子是2)。因此一旦出现故障,系统需要恢复两种数据: 接收并保存了副本的数据– 数据不会因为单个worker节点故障而丢失,因为有副本! 接收但尚未保存副本数据– 因为数据并没有副本,所以一旦故障,只能从数据源重新获取。 此外,还有两种可能的故障类型需要考虑: Worker节点故障– 任何运行执行器的worker节点一旦故障,节点上内存中的数据都会丢失。如果这些节点上有接收器在运行,那么其包含的缓存数据也会丢失。 Driver节点故障– 如果Spark Streaming的驱动节点故障,那么很显然SparkContext对象就没了,所有执行器及其内存数据也会丢失。 有了以上这些基本知识,下面我们就进一步了解一下Spark Streaming的容错语义。 定义 流式系统的可靠度语义可以据此来分类:单条记录在系统中被处理的次数保证。一个流式系统可能提供保证必定是以下三种之一(不管系统是否出现故障): 至多一次(At most once): 每条记录要么被处理一次,要么就没有处理。 至少一次(At least once): 每条记录至少被处理过一次(一次或多次)。这种保证能确保没有数据丢失,比“至多一次”要强。但有可能出现数据重复。 精确一次(Exactly once): 每条记录都精确地只被处理一次 – 也就是说,既没有数据丢失,也不会出现数据重复。这是三种保证中最强的一种。 基础语义 任何流式处理系统一般都会包含以下三个数据处理步骤: 数据接收(Receiving the data): 从数据源拉取数据。 数据转换(Transforming the data): 将接收到的数据进行转换(使用DStream和RDD transformation算子)。 数据推送(Pushing out the data): 将转换后最终数据推送到外部文件系统,数据库或其他展示系统。 如果Streaming应用需要做到端到端的“精确一次”的保证,那么就必须在以上三个步骤中各自都保证精确一次:即,每条记录必须,只接收一次、处理一次、推送一次。下面让我们在Spark Streaming的上下文环境中来理解一下这三个步骤的语义: 数据接收: 不同数据源提供的保证不同,下一节再详细讨论。 数据转换: 所有的数据都会被“精确一次”处理,这要归功于RDD提供的保障。即使出现故障,只要数据源还能访问,最终所转换得到的RDD总是包含相同的内容。 数据推送: 输出操作默认保证“至少一次”的语义,是否能“精确一次”还要看所使用的输出算子(是否幂等)以及下游系统(是否支持事务)。不过用户也可以开发自己的事务机制来实现“精确一次”语义。这个后续会有详细讨论。 接收数据语义 不同的输入源提供不同的数据可靠性级别,从“至少一次”到“精确一次”。 从文件接收数据 如果所有的输入数据都来源于可容错的文件系统,如HDFS,那么Spark Streaming就能在任何故障中恢复并处理所有的数据。这种情况下就能保证精确一次语义,也就是说不管出现什么故障,所有的数据总是精确地只处理一次,不多也不少。 基于接收器接收数据 对于基于接收器的输入源,容错语义将同时依赖于故障场景和接收器类型。前面也已经提到过,spark Streaming主要有两种类型的接收器: 可靠接收器– 这类接收器会在数据接收并保存好副本后,向可靠数据源发送确认信息。这类接收器故障时,是不会给缓存的(已接收但尚未保存副本)数据发送确认信息。因此,一旦接收器重启,没有收到确认的数据,会重新从数据源再获取一遍,所以即使有故障也不会丢数据。 不可靠接收器– 这类接收器不会发送确认信息,因此一旦worker和driver出现故障,就有可能会丢失数据。 对于不同的接收器,我们可以获得如下不同的语义。如果一个worker节点故障了,对于可靠接收器来书,不会有数据丢失。而对于不可靠接收器,缓存的(接收但尚未保存副本)数据可能会丢失。如果driver节点故障了,除了接收到的数据之外,其他的已经接收且已经保存了内存副本的数据都会丢失,这将会影响有状态算子的计算结果。 为了避免丢失已经收到且保存副本的数,从 spark 1.2 开始引入了WAL(write ahead logs),以便将这些数据写入到可容错的存储中。只要你使用可靠接收器,同时启用WAL(write ahead logs enabled),那么久再也不用为数据丢失而担心了。并且这时候,还能提供“至少一次”的语义保证。 下表总结了故障情况下的各种语义: 部署场景 Worker 故障 Driver 故障 Spark 1.1及以前版本 或者 Spark 1.2及以后版本,且未开启WAL 若使用不可靠接收器,则可能丢失缓存(已接收但尚未保存副本)数据; 若使用可靠接收器,则没有数据丢失,且提供至少一次处理语义 若使用不可靠接收器,则缓存数据和已保存数据都可能丢失; 若使用可靠接收器,则没有缓存数据丢失,但已保存数据可能丢失,且不提供语义保证 Spark 1.2及以后版本,并启用WAL 若使用可靠接收器,则没有数据丢失,且提供至少一次语义保证 若使用可靠接收器和文件,则无数据丢失,且提供至少一次语义保证 从Kafka Direct API接收数据 从Spark 1.3开始,我们引入Kafka Direct API,该API能为Kafka数据源提供“精确一次”语义保证。有了这个输入API,再加上输出算子的“精确一次”保证,你就能真正实现端到端的“精确一次”语义保证。(改功能截止Spark 1.6.1还是实验性的)更详细的说明见:Kafka Integration Guide。 输出算子的语义 输出算子(如 foreachRDD)提供“至少一次”语义保证,也就是说,如果worker故障,单条输出数据可能会被多次写入外部实体中。不过这对于文件系统来说是可以接受的(使用saveAs***Files 多次保存文件会覆盖之前的),所以我们需要一些额外的工作来实现“精确一次”语义。主要有两种实现方式: 幂等更新(Idempotent updates): 就是说多次操作,产生的结果相同。例如,多次调用saveAs***Files保存的文件总是包含相同的数据。 事务更新(Transactional updates): 所有的更新都是事务性的,这样一来就能保证更新的原子性。以下是一种实现方式: 用批次时间(在foreachRDD中可用)和分区索引创建一个唯一标识,该标识代表流式应用中唯一的一个数据块。 基于这个标识建立更新事务,并使用数据块数据更新外部系统。也就是说,如果该标识未被提交,则原子地将标识代表的数据更新到外部系统。否则,就认为该标识已经被提交,直接忽略之。dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // 使用uniqueId作为事务的唯一标识,基于uniqueId实现partitionIterator所指向数据的原子事务提交 } } 迁移指南 – 从0.9.1及以下升级到1.x 在Spark 0.9.1和Spark 1.0之间,有一些API接口变更,变更目的是为了保障未来版本API的稳定。本节将详细说明一下从已有版本迁移升级到1.0所需的工作。 输入DStream(Input DStreams): 所有创建输入流的算子(如:StreamingContext.socketStream, FlumeUtils.createStream 等)的返回值不再是DStream(对Java来说是JavaDStream),而是InputDStream/ReceiverInputDStream(对Java来说是JavaInputDStream/JavaPairInputDStream/JavaReceiverInputDStream/JavaPairReceiverInputDStream)。这样才能确保特定输入流的功能能够在未来持续增加到这些class中,而不会打破二进制兼容性。注意,已有的Spark Streaming应用应该不需要任何代码修改(新的返回类型都是DStream的子类),只不过需要基于Spark 1.0重新编译一把。 定制网络接收器(Custom Network Receivers): 自从Spark Streaming发布以来,Scala就能基于NetworkReceiver来定制网络接收器。但由于错误处理和汇报API方便的限制,该类型不能在Java中使用。所以Spark 1.0开始,用Receiver来替换掉这个NetworkReceiver,主要的好处如下: 该类型新增了stop和restart方法,便于控制接收器的生命周期。详见custom receiver guide。 定制接收器用Scala和Java都能实现。 为了将已有的基于NetworkReceiver的自定义接收器迁移到Receiver上来,你需要如下工作: 首先你的自定义接收器类型需要从org.apache.spark.streaming.receiver.Receiver继承,而不再是org.apache.spark.streaming.dstream.NetworkReceiver。 原先,我们需要在自定义接收器中创建一个BlockGenerator来保存接收到的数据。你必须显示的实现onStart() 和 onStop() 方法。而在新的Receiver class中,这些都不需要了,你只需要调用它的store系列的方法就能将数据保存到Spark中。所以你接下来需要做的迁移工作就是,删除BlockGenerator对象(这个类型在Spark 1.0之后也没有了~),然后用store(…)方法来保存接收到的数据。 基于Actor的接收器(Actor-based Receivers): 从actor class继承后,并实org.apache.spark.streaming.receiver.Receiver后,即可从Akka Actors中获取数据。获取数据的类被重命名为org.apache.spark.streaming.receiver.ActorHelper,而保存数据的pushBlocks(…)方法也被重命名为 store(…)。其他org.apache.spark.streaming.receivers包中的工具类也被移到org.apache.spark.streaming.receiver包下并重命名,新的类名应该比之前更加清晰。 下一步 其他相关参考文档 Kafka Integration Guide Flume Integration Guide Kinesis Integration Guide Custom Receiver Guide API文档 Scala 文档 StreamingContext和DStream KafkaUtils,FlumeUtils,KinesisUtils,TwitterUtils,ZeroMQUtils, 以及MQTTUtils Java 文档 JavaStreamingContext,JavaDStream以及JavaPairDStream KafkaUtils,FlumeUtils,KinesisUtilsTwitterUtils,ZeroMQUtils, 以及MQTTUtils Python 文档 StreamingContext和DStream KafkaUtils 其他示例:Scala,Java以及Python Spark Streaming相关的Paper和video。 转载自 并发编程网 - ifeve.com

优秀的个人博客,低调大师

《Spark 官方文档》在YARN上运行Spark

在YARN上运行Spark 对YARN (Hadoop NextGen)的支持是从Spark-0.6.0开始的,后续的版本也一直持续在改进。 在YARN上启动 首先确保 HADOOP_CONF_DIR 或者 YARN_CONF_DIR 变量指向一个包含Hadoop集群客户端配置文件的目录。这些配置用于读写HDFS和连接YARN资源管理器(ResourceManager)。这些配置应该发布到YARN集群上所有的节点,这样所有的YARN容器才能使用同样的配置。如果这些配置引用了Java系统属性或者其他不属于YARN管理的环境变量,那么这些属性和变量也应该在Spark应用的配置中设置(包括驱动器、执行器,以及其AM【运行于client模式时的YARN Application Master】) 在YARN上启动Spark应用有两种模式。在cluster模式下,Spark驱动器(driver)在YARNApplication Master中运行(运行于集群中),因此客户端可以在Spark应用启动之后关闭退出。而client模式下,Spark驱动器在客户端进程中,这时的YARNApplication Master只用于向YARN申请资源。 与独立部署(Spark standalone)或 在Mesos集群中不同,YARN的master地址不是在–master参数中指定的,而是在Hadoop配置文件中设置。因此,这种情况下,–master只需设置为yarn。 以下用cluster模式启动一个Spark应用: $ ./bin/spark-submit --class path.to.your.Class \ --master yarn \ --deploy-mode cluster \ [options] \ <app jar> [app options] 例如: $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ --driver-memory 4g \ --executor-memory 2g \ --executor-cores 1 \ --queue thequeue \ lib/spark-examples*.jar 10 以上例子中,启动了一个YARN客户端程序,使用默认的Application Master。而后SparkPi在Application Master中的子线程中运行。客户端会周期性的把Application Master的状态信息拉取下来,并更新到控制台。客户端会在你的应用程序结束后退出。参考“调试你的应用”,这一节说明了如何查看驱动器和执行器的日志。 要以client模式启动一个spark应用,只需在上面的例子中把cluster换成client。下面这个例子就是以client模式启动spark-shell: $ ./bin/spark-shell --master yarn --deploy-mode client 增加其他JAR包 在cluster模式下,驱动器不在客户端机器上运行,所以SparkContext.addJar添加客户端本地文件就不好使了。要使客户端上本地文件能够用SparkContext.addJar来添加,可以用–jars选项: $ ./bin/spark-submit --class my.main.Class \ --master yarn \ --deploy-mode cluster \ --jars my-other-jar.jar,my-other-other-jar.jar my-main-jar.jar app_arg1 app_arg2 准备 在YARN上运行Spark需要其二进制发布包构建的时候增加YARN支持。二进制发布包可以在这里下载:downloads page。 想要自己编译,参考这里:Building Spark 配置 大多数配置,对于YARN或其他集群模式下,都是一样的。详细请参考这里:configuration page。 以下是YARN上专有的配置项。 调试你的应用 在YARN术语集中,执行器和Application Master在容器(container)中运行。YARN在一个应用程序结束后,有两种处理容器日志的模式。如果开启了日志聚合(yarn.log-aggregation-enable),那么容器日志将被复制到HDFS,并删除本地日志。而后这些日志可以在集群任何节点上用yarn logs命令查看: yarn logs -applicationId <app ID> 以上命令,将会打印出指定应用的所有日志文件的内容。你也可以直接在HDFS上查看这些日志(HDFS shell或者HDFS API)。这些目录可以在你的YARN配置中指定(yarn.nodemanager.remote-app-log-dir和yarn.nodemanager-remote-app-log-dir-suffix)。这些日志同样还可以在Spark Web UI上Executors tab页查看。当然,你需要启动Spark history server和 MapReduce history server,再在 yarn-site.xml 中配置好yarn.log.server.url。Spark history server UI 将把你重定向到MapReduce history server 以查看这些聚合日志。 如果日志聚合没有开启,那么日志文件将在每台机器上的 YARN_APP_LOGS_DIR 目录保留,通常这个目录指向 /tmp/logs 或者 $HADOOP_HOME/log/userlogs(这取决于Hadoop版本和安全方式)。查看日志的话,需要到每台机器上查看这些目录。子目录是按 application ID 和 container ID来组织的。这些日志同样可以在 Spark Web UI 上 Executors tab 页查看,而且这时你不需要运行MapReduce history server。 如果需要检查各个容器的启动环境,可以先把 yarn.nodemanager.delete.debug-delay-sec 增大(如:36000),然后访问应用缓存目录yarn.nodemanager.local-dirs,这时容器的启动目录。这里包含了启动脚本、jar包以及容器启动所用的所有环境变量。这对调试 classpath 相关问题尤其有用。(注意,启用这个需要管理员权限,并重启所有的node managers,因此,对托管集群不适用) 要自定义Application Master或执行器的 log4j 配置,有如下方法: 通过spark-submit –files 上传一个自定义的 log4j.properties 文件。 在 spark.driver.extraJavaOptions(对Spark驱动器)或者 spark.executor.extraJavaOptions(对Spark执行器)增加 -Dlog4j.configuration=<location of configuration file>。注意,如果使用文件,那么 file: 协议头必须显式写上,且文件必须在所节点上都存在。 更新${SPARK_CONF_DIR}/log4j.properties 文件以及其他配置。注意,如果在多个地方都配置了log4j,那么上面其他两种方法的配置优先级比本方法要高。 注意,第一种方法中,执行器和Application Master共享同一个log4j配置,在有些环境下(AM和执行器在同一个节点上运行)可能会有问题(例如,AM和执行器日志都写入到同一个日志文件) 如果你需要引用YARN放置日志文件的路径,以便YARN可以正确地展示和聚合日志,请在log4j.properties文件中使用spark.yarn.app.container.log.dir。例如,log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log 。对于流式应用,可以配置RollingFileAppender,并将文件路径设置为YARN日志目录,以避免磁盘打满,而且这些日志还可以利用YARN的日志工具访问和查看。 Spark属性 Property Name Default Meaning spark.yarn.am.memory 512m YARN Application Master在client模式下, 使用内存总量,与JVM内存设置格式相同(如:512m,2g)。 如果是cluster模式下,请设置 spark.driver.memory。 注意使用小写的后缀, 如:k、m、g、t、p,分别代表 kibi-, mebi, gibi-, tebi- 以及pebibytes。 spark.driver.cores 1 YARN cluster模式下,驱动器使用的CPU core个数。 在cluster模式下,驱动器(driver)和YARN AM(application master)使用相同的JVM,所以这个属性也可以用来控制YARN AM。 如果是client模式下,请使用spark.yarn.am.cores来控制YARN AM的CPU core个数。 spark.yarn.am.cores 1 client模式下,用来控制YARN AM的CPU core个数。 cluster模式下,请使用 spark.driver.cores。 spark.yarn.am.waitTime 100s 在cluster模式下,该属性表示YARN AM等待SparkContext初始化的时间。 在client模式下,该属性表示YARN AM等待驱动器连接的时间。 spark.yarn.submit.file .replication 默认的HDFS副本数(通常是3) HDFS文件副本数。包括Spark jar,app jar以及其他分布式缓存文件和存档。 spark.yarn.preserve .staging.files false 设为true以保存stage相关文件(stage相关的jar包和缓存)到作业结束,而不是立即删除。 spark.yarn.scheduler .heartbeat.interval-ms 3000 Spark AM发送给YARN资源管理器心跳的间隔(ms)。 这个值最多不能超过YARN配置的超时间隔的一半。(yarn.am.liveness-monitor.expiry-interval-ms) spark.yarn.scheduler .initial-allocation.interval 200ms Spark AM的初始带外心跳间隔(有待定的资源申请时)。 其值不应该大于 spark.yarn.scheduler.heartbeat.interval-ms。 该资源分配间隔会在每次带外心跳成功后但仍有待定资源申请时倍增, 直至达到 spark.yarn.scheduler.heartbeat.interval-ms 所设定的值。 spark.yarn.max.executor .failures 执行器个数*2且不小于3 Spark应用最大容忍执行器失败次数。 spark.yarn.historyServer .address (none) Spark history server地址,如:host.com:18080 。 这个地址不要包含协议头(http://)。 默认不设置,因为history server是可选的。 应用程序结束以后,YARN资源管理器web UI通过这个地址链接到Spark history server UI。 对于这属性,可以使用YARN属性变量,且这些变量是Spark在运行时组装的。 例如,如果Spark history server和YARN资源管理器(ResourceManager)部署在同一台机器上运行, 那么这个属性可以设置为 ${hadoopconf-yarn.resourcemanager.hostname}:18080 spark.yarn.dist.archives (none) 逗号分隔的文档列表,其指向的文档将被提取到每个执行器的工作目录下。 spark.yarn.dist.files (none) 逗号分隔的文件列表,其指向的文件将被复制到每个执行器的工作目录下。 spark.executor.instances 2 执行器个数。注意,这个属性和 spark.dynamicAllocation.enabled是不兼容的。 如果同时设置了spark.dynamicAllocation.enabled,那么动态分配将被关闭,并使用 spark.executor.instances 所设置的值。 spark.yarn.executor .memoryOverhead 执行器内存 * 0.10或者 384MB中较大者 每个执行器所分配的堆外内存(MB)总量。这些内存将被用于存储VM开销、字符串常量,以及其他原生开销等。这会使执行器所需内存增加(典型情况,增加6%~10%) spark.yarn.driver .memoryOverhead 驱动器内存 * 0.10或者 384MB中较大者 每个驱动器所分配的堆外内存(MB)总量。 这些内存将被用于存储VM开销、字符串常量,以及其他原生开销等。 这会使执行器所需内存增加(典型情况,增加6%~10%) spark.yarn.am .memoryOverhead Application Master 内存 * 0.10或者 384MB中较大者 与 spark.yarn.driver.memoryOverhead 相同,只是仅用于YARN AM client模式下。 spark.yarn.am.port (random) YARN AM所监听的端口。 在YARN client模式下,用于Spark驱动器(driver)和YARN AM通信。 而在YARN cluster模式下,这个端口将被用于动态执行器特性,这个特性会处理调度器后台杀死执行器的请求。 spark.yarn.queue default Spark应用提交到哪个yarn队列。 spark.yarn.jar (none) Spark jar文件位置,如果需要覆盖默认位置,请设定这个值。 默认的,Spark on YARN会使用本地的Spark jar包,但Spark jar包同样可以使用整个集群可读的HDFS文件位置。 这使YARN可以在各节点上缓存Spark jar包,而不需要每次运行一个应用的时候都要分发。 使用 hdfs:///some/path 来指定HDFS上jar包文件路径。 spark.yarn.access .namenodes (none) 逗号分隔的HDFS namenodes。 例如spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032。 Spark应用必须有这些机器的访问权限,并且需要配置好 kerberos(可以在同一个域或者信任的域)。 Spark需要每个namenode的安全token,以便访问集群中HDFS。 spark.yarn.appMasterEnv .[EnvironmentVariableName] (none) 增加EnvironmentVariableName所指定的环境变量到YARN AM的进程中。 用户可以指定多个环境变量。在cluster模式下,这个可以控制Spark驱动器的环境变量; 而在client模式下,只控制执行器启动器的环境变量。 spark.yarn .containerLauncherMaxThreads 25 YARN AM 启动执行器的容器最多包含多少线程数。 spark.yarn.am .extraJavaOptions (none) 在client模式下,传给YARN AM 的JVM参数。 在cluster模式下,请使用spark.driver.extraJavaOptions spark.yarn.am .extraLibraryPath (none) client模式下传给YARN AM额外依赖库。 spark.yarn.maxAppAttempts yarn .resourcemanager.am.max-attemptsin YARN 提交应用最大尝试次数。不应大于YARN全局配置的最大尝试次数。 spark.yarn.am .attemptFailuresValidityInterval (none) 定义AM失败跟踪校验间隔。 AM运行了至少要运行这么多时间后,其失败计数才被重置。 这个特性只有配置其值后才会生效,且只支持Hadoop-2.6+ spark.yarn.submit .waitAppCompletion true 在YARN cluster模式下,控制是否客户端等到Spark应用结束后再退出。 如果设为true,客户端进程将一直等待,并持续报告应用状态。 否则,客户端会在提交完成后退出。 spark.yarn.am .nodeLabelExpression (none) 一个YARN节点标签表达式(node label expression),以此来限制AM可以被调度到哪些节点上执行。 只有Hadoop 2.6+才能支持节点标签表达式,所以如果用其他版本运行,这个属性将被忽略。 spark.yarn.executor .nodeLabelExpression (none) 一个YARN节点标签表达式(node label expression),以此来限制执行器可以被调度到哪些节点上启动。 只有Hadoop 2.6+才能支持节点标签表达式,所以如果在其他版本上运行时,这个属性将被忽略。 spark.yarn.tags (none) 逗号分隔的字符串,传递YARN应用tags。 其值将出现在YARN Application Reports中,可以用来过滤和查询YARN 应用。 spark.yarn.keytab (none) 认证文件keytab的全路径。 这个文件将被复制到访问Secure Distributed Cache的YARN 应用节点上,并且周期性的刷新登陆的ticket和代理token(本地模式下也能work) spark.yarn.principal (none) 登陆KDC的认证,secure HDFS需要(local模式下也能用) spark.yarn.config .gatewayPath (none) 某些路径,可能在网关主机上能正常访问(Spark应用启动的地方),而在其他节点上的访问方式(路径)可能不同。 对于这样的路径,需要本属性配合 spark.yarn.config.replacementPath组合使用,对于支持异构配置的集群,必须配置好这两个值,Spark才能正确地启动远程进程。 replacement path 通常包含一些YARN导出的环境变量(因此,对Spark containers可见)。 例如,如果网关节点上Hadoop库安装在 /disk1/hadoop,并且其导出环境变量为 HADOOP_HOME, 就需要将spark.yarn.config.gatewayPath 设置为 /disk1/hadoop 并将 replacement path设为 $HADOOP_HOME, 这样才能在远程节点上以正确的环境变量启动进程。 spark.yarn.config .replacementPath (none) 见 spark.yarn.config.getewayPath spark.yarn.security .tokens.${service}.enabled true 在启用安全设置的情况下,控制是否对non-HDFS服务,获取代理token。 默认地,所有支持的服务,都启用;但你也可以在某些有冲突的情况下,对某些服务禁用。 目前支持的服务有:hive,hbase 重要提示 对CPU资源的请求是否满足,取决于调度器如何配置和使用。 cluster模式下,Spark执行器(executor)和驱动器(driver)的local目录都由YARN配置决定(yarn.nodemanager.local-dirs);如果用户指定了spark.local.dir,这时候将被忽略。在client模式下,Spark执行器(executor)的local目录由YARN决定,而驱动器(driver)的local目录由spark.local.dir决定,因为这时候,驱动器不在YARN上运行。 选项参数 –files和 –archives中井号(#)用法类似于Hadoop。例如,你可以指定 –files localtest.txt#appSees.txt,这将会把localtest.txt文件上传到HDFS上,并重命名为 appSees.txt,而你的程序应用用 appSees.txt来引用这个文件。 当你在cluster模式下使用本地文件时,使用选项–jar 才能让SparkContext.addJar正常工作,而不必使用 HDFS,HTTP,HTTPS或者FTP上的文件。 转载自 并发编程网 - ifeve.com

优秀的个人博客,低调大师

SparkSql官方文档中文翻译(java版本)

1 概述(Overview) 2 DataFrames 2.1 入口:SQLContext(Starting Point: SQLContext) 2.2 创建DataFrames(Creating DataFrames) 2.3 DataFrame操作(DataFrame Operations) 2.4 运行SQL查询程序(Running SQL Queries Programmatically) 2.5 DataFrames与RDDs的相互转换(Interoperating with RDDs) 2.5.1 使用反射获取Schema(Inferring the Schema Using Reflection) 2.5.2 通过编程接口指定Schema(Programmatically Specifying the Schema) 3 数据源(Data Source) 3.1 一般Load/Save方法 3.1.1 手动指定选项(Manually Specifying Options) 3.1.2 存储模式(Save Modes) 3.1.3 持久化到表(Saving to Persistent Tables) 3.2 Parquet文件 3.2.1 读取Parquet文件(Loading Data Programmatically) 3.2.2 解析分区信息(Partition Discovery) 3.2.3 Schema合并(Schema Merging) 3.2.4 Hive metastore Parquet表转换(Hive metastore Parquet table conversion) 3.2.4.1 Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation) 3.2.4.2 元数据刷新(Metadata Refreshing) 3.2.5 配置(Configuration) 3.3 JSON数据集 3.4 Hive表 3.4.1 访问不同版本的Hive Metastore(Interacting with Different Versions of Hive Metastore) 3.5 JDBC To Other Databases 3.6 故障排除(Troubleshooting) 4 性能调优 4.1 缓存数据至内存(Caching Data In Memory) 4.2 调优参数(Other Configuration Options) 5 分布式SQL引擎 5.1 运行Thrift JDBC/ODBC服务 5.2 运行Spark SQL CLI 6 Migration Guide 6.1 与Hive的兼容(Compatibility with Apache Hive 6.1.1 在Hive warehouse中部署Spark SQL 6.1.2 Spark SQL支持的Hive特性 6.1.3 不支持的Hive功能 7 Reference 7.1 Data Types 7.2 NaN 语义 1 概述(Overview) Spark SQL是Spark的一个组件,用于结构化数据的计算。Spark SQL提供了一个称为DataFrames的编程抽象,DataFrames可以充当分布式SQL查询引擎。 2 DataFrames DataFrame是一个分布式的数据集合,该数据集合以命名列的方式进行整合。DataFrame可以理解为关系数据库中的一张表,也可以理解为R/Python中的一个data frame。DataFrames可以通过多种数据构造,例如:结构化的数据文件、hive中的表、外部数据库、Spark计算过程中生成的RDD等。DataFrame的API支持4种语言:Scala、Java、Python、R。 2.1 入口:SQLContext(Starting Point: SQLContext) Spark SQL程序的主入口是SQLContext类或它的子类。创建一个基本的SQLContext,你只需要SparkContext,创建代码示例如下: Scala val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) Java JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); 除了基本的SQLContext,也可以创建HiveContext。SQLContext和HiveContext区别与联系为: SQLContext现在只支持SQL语法解析器(SQL-92语法) HiveContext现在支持SQL语法解析器和HiveSQL语法解析器,默认为HiveSQL语法解析器,用户可以通过配置切换成SQL语法解析器,来运行HiveSQL不支持的语法。 使用HiveContext可以使用Hive的UDF,读写Hive表数据等Hive操作。SQLContext不可以对Hive进行操作。 Spark SQL未来的版本会不断丰富SQLContext的功能,做到SQLContext和HiveContext的功能容和,最终可能两者会统一成一个Context HiveContext包装了Hive的依赖包,把HiveContext单独拿出来,可以在部署基本的Spark的时候就不需要Hive的依赖包,需要使用HiveContext时再把Hive的各种依赖包加进来。 SQL的解析器可以通过配置spark.sql.dialect参数进行配置。在SQLContext中只能使用Spark SQL提供的”sql“解析器。在HiveContext中默认解析器为”hiveql“,也支持”sql“解析器。 2.2 创建DataFrames(Creating DataFrames) 使用SQLContext,spark应用程序(Application)可以通过RDD、Hive表、JSON格式数据等数据源创建DataFrames。下面是基于JSON文件创建DataFrame的示例: Scala val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() Java JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Displays the content of the DataFrame to stdout df.show(); 2.3 DataFrame操作(DataFrame Operations) DataFrames支持Scala、Java和Python的操作接口。下面是Scala和Java的几个操作示例: Scala val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create the DataFrame val df = sqlContext.read.json("examples/src/main/resources/people.json") // Show the content of the DataFrame df.show() // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df("age") > 21).show() // age name // 30 Andy // Count people by age df.groupBy("age").count().show() // age count // null 1 // 19 1 // 30 1 Java JavaSparkContext sc // An existing SparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create the DataFrame DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Show the content of the DataFrame df.show(); // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show(); // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df.col("name"), df.col("age").plus(1)).show(); // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df.col("age").gt(21)).show(); // age name // 30 Andy // Count people by age df.groupBy("age").count().show(); // age count // null 1 // 19 1 // 30 1 详细的DataFrame API请参考API Documentation。 除了简单列引用和表达式,DataFrames还有丰富的library,功能包括string操作、date操作、常见数学操作等。详细内容请参考DataFrame Function Reference。 2.4 运行SQL查询程序(Running SQL Queries Programmatically) Spark Application可以使用SQLContext的sql()方法执行SQL查询操作,sql()方法返回的查询结果为DataFrame格式。代码如下: Scala val sqlContext = ... // An existing SQLContext val df = sqlContext.sql("SELECT * FROM table") Java SQLContext sqlContext = ... // An existing SQLContext DataFrame df = sqlContext.sql("SELECT * FROM table") 2.5 DataFrames与RDDs的相互转换(Interoperating with RDDs) Spark SQL支持两种RDDs转换为DataFrames的方式: 使用反射获取RDD内的Schema 当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。 通过编程接口指定Schema 通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。 这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema 2.5.1 使用反射获取Schema(Inferring the Schema Using Reflection) Spark SQL支持将JavaBean的RDD自动转换成DataFrame。通过反射获取Bean的基本信息,依据Bean的信息定义Schema。当前Spark SQL版本(Spark 1.5.2)不支持嵌套的JavaBeans和复杂数据类型(如:List、Array)。创建一个实现Serializable接口包含所有属性getters和setters的类来创建一个JavaBean。通过调用createDataFrame并提供JavaBean的Class object,指定一个Schema给一个RDD。示例如下: public static class Person implements Serializable { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } } // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // Load a text file and convert each line to a JavaBean. JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map( new Function<String, Person>() { public Person call(String line) throws Exception { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; } }); // Apply a schema to an RDD of JavaBeans and register it as a table. DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class); schemaPeople.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); 2.5.2 通过编程接口指定Schema(Programmatically Specifying the Schema) 当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步: 从原来的RDD创建一个Row格式的RDD 创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema 通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema 示例如下: import org.apache.spark.api.java.function.Function; // Import factory methods provided by DataTypes. import org.apache.spark.sql.types.DataTypes; // Import StructType and StructField import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructField; // Import Row. import org.apache.spark.sql.Row; // Import RowFactory. import org.apache.spark.sql.RowFactory; // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // Load a text file and convert each line to a JavaBean. JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt"); // The schema is encoded in a string String schemaString = "name age"; // Generate the schema based on the string of schema List<StructField> fields = new ArrayList<StructField>(); for (String fieldName: schemaString.split(" ")) { fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true)); } StructType schema = DataTypes.createStructType(fields); // Convert records of the RDD (people) to Rows. JavaRDD<Row> rowRDD = people.map( new Function<String, Row>() { public Row call(String record) throws Exception { String[] fields = record.split(","); return RowFactory.create(fields[0], fields[1].trim()); } }); // Apply the schema to the RDD. DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema); // Register the DataFrame as a table. peopleDataFrame.registerTempTable("people"); // SQL can be run over RDDs that have been registered as tables. DataFrame results = sqlContext.sql("SELECT name FROM people"); // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> names = results.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); 3 数据源(Data Source) Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。Data Sources这部分首先描述了对Spark的数据源执行加载和保存的常用方法,然后对内置数据源进行深入介绍。 3.1 一般Load/Save方法 Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作。修改配置项spark.sql.sources.default,可修改默认数据源格式。读取Parquet文件示例如下: Scala val df = sqlContext.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") Java DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet"); df.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); 3.1.1 手动指定选项(Manually Specifying Options) 当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果数据源格式为内置格式,则只需要指定简称(json,parquet,jdbc)。通过指定的数据源格式名,可以对DataFrames进行类型转换操作。示例如下: Scala val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") df.select("name", "age").write.format("parquet").save("namesAndAges.parquet") Java DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json"); df.select("name", "age").write().format("parquet").save("namesAndAges.parquet"); 3.1.2 存储模式(Save Modes) 可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。SaveMode详细介绍如下表: 3.1.3 持久化到表(Saving to Persistent Tables) 当使用HiveContext时,可以通过saveAsTable方法将DataFrames存储到表中。与registerTempTable方法不同的是,saveAsTable将DataFrame中的内容持久化到表中,并在HiveMetastore中存储元数据。存储一个DataFrame,可以使用SQLContext的table方法。table先创建一个表,方法参数为要创建的表的表名,然后将DataFrame持久化到这个表中。 默认的saveAsTable方法将创建一个“managed table”,表示数据的位置可以通过metastore获得。当存储数据的表被删除时,managed table也将自动删除。 3.2 Parquet文件 Parquet是一种支持多种数据处理系统的柱状的数据格式,Parquet文件中保留了原始数据的模式。Spark SQL提供了Parquet文件的读写功能。 3.2.1 读取Parquet文件(Loading Data Programmatically) 读取Parquet文件示例如下: Scala // sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. // The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet. people.write.parquet("people.parquet") // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a Parquet file is also a DataFrame. val parquetFile = sqlContext.read.parquet("people.parquet") //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile") val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println) Java // sqlContext from the previous example is used in this example. DataFrame schemaPeople = ... // The DataFrame from the previous example. // DataFrames can be saved as Parquet files, maintaining the schema information. schemaPeople.write().parquet("people.parquet"); // Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a DataFrame. DataFrame parquetFile = sqlContext.read().parquet("people.parquet"); // Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile"); DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); 3.2.2 解析分区信息(Partition Discovery) 对表进行分区是对数据进行优化的方式之一。在分区的表内,数据通过分区列将数据存储在不同的目录下。Parquet数据源现在能够自动发现并解析分区信息。例如,对人口数据进行分区存储,分区列为gender和country,使用下面的目录结构: path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ... 通过传递path/to/table给 SQLContext.read.parquet或SQLContext.read.load,Spark SQL将自动解析分区信息。返回的DataFrame的Schema如下: root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true) 需要注意的是,数据的分区列的数据类型是自动解析的。当前,支持数值类型和字符串类型。自动解析分区类型的参数为:spark.sql.sources.partitionColumnTypeInference.enabled,默认值为true。如果想关闭该功能,直接将该参数设置为disabled。此时,分区列数据格式将被默认设置为string类型,不再进行类型解析。 3.2.3 Schema合并(Schema Merging) 像ProtocolBuffer、Avro和Thrift那样,Parquet也支持Schema evolution(Schema演变)。用户可以先定义一个简单的Schema,然后逐渐的向Schema中增加列描述。通过这种方式,用户可以获取多个有不同Schema但相互兼容的Parquet文件。现在Parquet数据源能自动检测这种情况,并合并这些文件的schemas。 因为Schema合并是一个高消耗的操作,在大多数情况下并不需要,所以Spark SQL从1.5.0开始默认关闭了该功能。可以通过下面两种方式开启该功能: 当数据源为Parquet文件时,将数据源选项mergeSchema设置为true 设置全局SQL选项spark.sql.parquet.mergeSchema为true 示例如下: Scala // sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Create a simple DataFrame, stored into a partition directory val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") df1.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") // Read the partitioned table val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths. // root // |-- single: int (nullable = true) // |-- double: int (nullable = true) // |-- triple: int (nullable = true) // |-- key : int (nullable = true) 3.2.4 Hive metastore Parquet表转换(Hive metastore Parquet table conversion) 当向Hive metastore中读写Parquet表时,Spark SQL将使用Spark SQL自带的Parquet SerDe(SerDe:Serialize/Deserilize的简称,目的是用于序列化和反序列化),而不是用Hive的SerDe,Spark SQL自带的SerDe拥有更好的性能。这个优化的配置参数为spark.sql.hive.convertMetastoreParquet,默认值为开启。 3.2.4.1 Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation) 从表Schema处理的角度对比Hive和Parquet,有两个区别: Hive区分大小写,Parquet不区分大小写 hive允许所有的列为空,而Parquet不允许所有的列全为空 由于这两个区别,当将Hive metastore Parquet表转换为Spark SQL Parquet表时,需要将Hive metastore schema和Parquet schema进行一致化。一致化规则如下: 这两个schema中的同名字段必须具有相同的数据类型。一致化后的字段必须为Parquet的字段类型。这个规则同时也解决了空值的问题。 一致化后的schema只包含Hive metastore中出现的字段。 忽略只出现在Parquet schema中的字段 只在Hive metastore schema中出现的字段设为nullable字段,并加到一致化后的schema中 3.2.4.2 元数据刷新(Metadata Refreshing) Spark SQL缓存了Parquet元数据以达到良好的性能。当Hive metastore Parquet表转换为enabled时,表修改后缓存的元数据并不能刷新。所以,当表被Hive或其它工具修改时,则必须手动刷新元数据,以保证元数据的一致性。示例如下: Scala // sqlContext is an existing HiveContext sqlContext.refreshTable("my_table") Java // sqlContext is an existing HiveContext sqlContext.refreshTable("my_table") 3.2.5 配置(Configuration) 配置Parquet可以使用SQLContext的setConf方法或使用SQL执行SET key=value命令。详细参数说明如下: 3.3 JSON数据集 Spark SQL能自动解析JSON数据集的Schema,读取JSON数据集为DataFrame格式。读取JSON数据集方法为SQLContext.read().json()。该方法将String格式的RDD或JSON文件转换为DataFrame。 需要注意的是,这里的JSON文件不是常规的JSON格式。JSON文件每一行必须包含一个独立的、自满足有效的JSON对象。如果用多行描述一个JSON对象,会导致读取出错。读取JSON数据集示例如下: Scala // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path) // The inferred schema can be visualized using the printSchema() method. people.printSchema() // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val anotherPeople = sqlContext.read.json(anotherPeopleRDD) Java // sc is an existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. DataFrame people = sqlContext.read().json("examples/src/main/resources/people.json"); // The inferred schema can be visualized using the printSchema() method. people.printSchema(); // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people"); // SQL statements can be run by using the sql methods provided by sqlContext. DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. List<String> jsonData = Arrays.asList( "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData); DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD); 3.4 Hive表 Spark SQL支持对Hive的读写操作。需要注意的是,Hive所依赖的包,没有包含在Spark assembly包中。增加Hive时,需要在Spark的build中添加 -Phive 和 -Phivethriftserver配置。这两个配置将build一个新的assembly包,这个assembly包含了Hive的依赖包。注意,必须上这个心的assembly包到所有的worker节点上。因为worker节点在访问Hive中数据时,会调用Hive的 serialization and deserialization libraries(SerDes),此时将用到Hive的依赖包。 Hive的配置文件为conf/目录下的hive-site.xml文件。在YARN上执行查询命令之前,lib_managed/jars目录下的datanucleus包和conf/目录下的hive-site.xml必须可以被driverhe和所有的executors所访问。确保被访问,最方便的方式就是在spark-submit命令中通过--jars选项和--file选项指定。 操作Hive时,必须创建一个HiveContext对象,HiveContext继承了SQLContext,并增加了对MetaStore和HiveQL的支持。除了sql方法,HiveContext还提供了一个hql方法,hql方法可以执行HiveQL语法的查询语句。示例如下: Scala // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println) Java // sc is an existing JavaSparkContext. HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc); sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL. Row[] results = sqlContext.sql("FROM src SELECT key, value").collect(); 3.4.1 访问不同版本的Hive Metastore(Interacting with Different Versions of Hive Metastore) Spark SQL经常需要访问Hive metastore,Spark SQL可以通过Hive metastore获取Hive表的元数据。从Spark 1.4.0开始,Spark SQL只需简单的配置,就支持各版本Hive metastore的访问。注意,涉及到metastore时Spar SQL忽略了Hive的版本。Spark SQL内部将Hive反编译至Hive 1.2.1版本,Spark SQL的内部操作(serdes, UDFs, UDAFs, etc)都调用Hive 1.2.1版本的class。版本配置项见下面表格: 3.5 JDBC To Other Databases Spark SQL支持使用JDBC访问其他数据库。当时用JDBC访问其它数据库时,最好使用JdbcRDD。使用JdbcRDD时,Spark SQL操作返回的DataFrame会很方便,也会很方便的添加其他数据源数据。JDBC数据源因为不需要用户提供ClassTag,所以很适合使用Java或Python进行操作。使用JDBC访问数据源,需要在spark classpath添加JDBC driver配置。例如,从Spark Shell连接postgres的配置为: SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell 远程数据库的表,可用DataFrame或Spark SQL临时表的方式调用数据源API。支持的参数有: 代码示例如下: Scala val jdbcDF = sqlContext.read.format("jdbc").options( Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load() Java Map<String, String> options = new HashMap<String, String>(); options.put("url", "jdbc:postgresql:dbserver"); options.put("dbtable", "schema.tablename"); DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load(); 3.6 故障排除(Troubleshooting) 在客户端session和所有的executors上,JDBC driver必须对启动类加载器(primordial class loader)设置为visible。因为当创建一个connection时,Java的DriverManager类会执行安全验证,安全验证将忽略所有对启动类加载器为非visible的driver。一个很方便的解决方法是,修改所有worker节点上的compute_classpath.sh脚本,将driver JARs添加至脚本。 有些数据库(例:H2)将所有的名字转换为大写,所以在这些数据库中,Spark SQL也需要将名字全部大写。 4 性能调优 4.1 缓存数据至内存(Caching Data In Memory) Spark SQL可以通过调用sqlContext.cacheTable("tableName") 或者dataFrame.cache(),将表用一种柱状格式( an in­memory columnar format)缓存至内存中。然后Spark SQL在执行查询任务时,只需扫描必需的列,从而以减少扫描数据量、提高性能。通过缓存数据,Spark SQL还可以自动调节压缩,从而达到最小化内存使用率和降低GC压力的目的。调用sqlContext.uncacheTable("tableName")可将缓存的数据移出内存。 可通过两种配置方式开启缓存数据功能: 使用SQLContext的setConf方法 执行SQL命令 SET key=value 4.2 调优参数(Other Configuration Options) 可以通过配置下表中的参数调节Spark SQL的性能。在后续的Spark版本中将逐渐增强自动调优功能,下表中的参数在后续的版本中或许将不再需要配置。 5 分布式SQL引擎 使用Spark SQL的JDBC/ODBC或者CLI,可以将Spark SQL作为一个分布式查询引擎。终端用户或应用不需要编写额外的代码,可以直接使用Spark SQL执行SQL查询。 5.1 运行Thrift JDBC/ODBC服务 这里运行的Thrift JDBC/ODBC服务与Hive 1.2.1中的HiveServer2一致。可以在Spark目录下执行如下命令来启动JDBC/ODBC服务: ./sbin/start-thriftserver.sh 这个命令接收所有bin/spark-submit命令行参数,添加一个--hiveconf参数来指定Hive的属性。详细的参数说明请执行命令./sbin/start-thriftserver.sh --help。服务默认监听端口为localhost:10000。有两种方式修改默认监听端口: 修改环境变量: export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ... 修改系统属性 ./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ... 使用beeline来测试Thrift JDBC/ODBC服务: ./bin/beeline 连接到Thrift JDBC/ODBC服务 beeline> !connect jdbc:hive2://localhost:10000 在非安全模式下,只需要输入机器上的一个用户名即可,无需密码。在安全模式下,beeline会要求输入用户名和密码。安全模式下的详细要求,请阅读beeline documentation的说明。 配置Hive需要替换conf/目录下的hive-site.xml。 Thrift JDBC服务也支持通过HTTP传输发送thrift RPC messages。开启HTTP模式需要将下面的配参数配置到系统属性或conf/:下的hive-site.xml中 hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001 hive.server2.http.endpoint - HTTP endpoint; default is cliservice 测试http模式,可以使用beeline链接JDBC/ODBC服务: beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint> 5.2 运行Spark SQL CLI Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。需要注意的是,Spark SQL CLI不能与Thrift JDBC服务交互。在Spark目录下执行如下命令启动Spark SQL CLI: ./bin/spark-sql 配置Hive需要替换conf/下的hive-site.xml。执行./bin/spark-sql --help可查看详细的参数说明 。 6 Migration Guide 6.1 与Hive的兼容(Compatibility with Apache Hive) Spark SQL与Hive Metastore、SerDes、UDFs相兼容。Spark SQL兼容Hive Metastore从0.12到1.2.1的所有版本。Spark SQL也与Hive SerDes和UDFs相兼容,当前SerDes和UDFs是基于Hive 1.2.1。 6.1.1 在Hive warehouse中部署Spark SQL Spark SQL Thrift JDBC服务与Hive相兼容,在已存在的Hive上部署Spark SQL Thrift服务不需要对已存在的Hive Metastore做任何修改,也不需要对数据做任何改动。 6.1.2 Spark SQL支持的Hive特性 Spark SQL支持多部分的Hive特性,例如: Hive查询语句,包括: SELECT GROUP BY ORDER BY CLUSTER BY SORT BY 所有Hive运算符,包括 比较操作符(=, ⇔, ==, <>, <, >, >=, <=, etc) 算术运算符(+, -, *, /, %, etc) 逻辑运算符(AND, &&, OR, ||, etc) 复杂类型构造器 数学函数(sign,ln,cos,etc) 字符串函数(instr,length,printf,etc) 用户自定义函数(UDF) 用户自定义聚合函数(UDAF) 用户自定义序列化格式器(SerDes) 窗口函数 Joins JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN Unions 子查询 SELECT col FROM ( SELECT a + b AS col from t1) t2 Sampling Explain 表分区,包括动态分区插入 视图 所有的Hive DDL函数,包括: CREATE TABLE CREATE TABLE AS SELECT ALTER TABLE 大部分的Hive数据类型,包括: TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMP DATE ARRAY<> MAP<> STRUCT<> 6.1.3 不支持的Hive功能 下面是当前不支持的Hive特性,其中大部分特性在实际的Hive使用中很少用到。 Major Hive Features Tables with buckets:bucket是在一个Hive表分区内进行hash分区。Spark SQL当前不支持。 Esoteric Hive Features UNION type Unique join Column statistics collecting:当期Spark SQL不智齿列信息统计,只支持填充Hive Metastore的sizeInBytes列。 Hive Input/Output Formats File format for CLI: 这个功能用于在CLI显示返回结果,Spark SQL只支持TextOutputFormat Hadoop archive Hive优化部分Hive优化还没有添加到Spark中。没有添加的Hive优化(比如索引)对Spark SQL这种in-memory计算模型来说不是特别重要。下列Hive优化将在后续Spark SQL版本中慢慢添加。 块级别位图索引和虚拟列(用于建立索引) 自动检测joins和groupbys的reducer数量:当前Spark SQL中需要使用“SET spark.sql.shuffle.partitions=[num_tasks];”控制post-shuffle的并行度,不能自动检测。 仅元数据查询:对于可以通过仅使用元数据就能完成的查询,当前Spark SQL还是需要启动任务来计算结果。 数据倾斜标记:当前Spark SQL不遵循Hive中的数据倾斜标记 jion中STREAMTABLE提示:当前Spark SQL不遵循STREAMTABLE提示 查询结果为多个小文件时合并小文件:如果查询结果包含多个小文件,Hive能合并小文件为几个大文件,避免HDFS metadata溢出。当前Spark SQL不支持这个功能。 7 Reference 7.1 Data Types Spark SQL和DataFrames支持的数据格式如下: 数值类型 ByteType: 代表1字节有符号整数. 数值范围: -128 到 127. ShortType: 代表2字节有符号整数. 数值范围: -32768 到 32767. IntegerType: 代表4字节有符号整数. 数值范围: -2147483648 t到 2147483647. LongType: 代表8字节有符号整数. 数值范围: -9223372036854775808 到 9223372036854775807. FloatType: 代表4字节单精度浮点数。 DoubleType: 代表8字节双精度浮点数。 DecimalType: 表示任意精度的有符号十进制数。内部使用java.math.BigDecimal.A实现。 BigDecimal由一个任意精度的整数非标度值和一个32位的整数组成。 String类型 StringType: 表示字符串值。 Binary类型 BinaryType: 代表字节序列值。 Boolean类型 BooleanType: 代表布尔值。 Datetime类型 TimestampType: 代表包含的年、月、日、时、分和秒的时间值 DateType: 代表包含的年、月、日的日期值 复杂类型 ArrayType(elementType, containsNull): 代表包含一系列类型为elementType的元素。如果在一个将ArrayType值的元素可以为空值,containsNull指示是否允许为空。 MapType(keyType, valueType, valueContainsNull): 代表一系列键值对的集合。key不允许为空,valueContainsNull指示value是否允许为空 StructType(fields): 代表带有一个StructFields(列)描述结构数据。 StructField(name, dataType, nullable): 表示StructType中的一个字段。name表示列名、dataType表示数据类型、nullable指示是否允许为空。 Spark SQL所有的数据类型在org.apache.spark.sql.types包内。不同语言访问或创建数据类型方法不一样: Scala代码中添加import org.apache.spark.sql.types._,再进行数据类型访问或创建操作。 Java可以使用org.apache.spark.sql.types.DataTypes中的工厂方法,如下表: 7.2 NaN 语义 当处理float或double类型时,如果类型不符合标准的浮点语义,则使用专门的处理方式NaN。需要注意的是: NaN = NaN 返回 true 可以对NaN值进行聚合操作 在join操作中,key为NaN时,NaN值与普通的数值处理逻辑相同 NaN值大于所有的数值型数据,在升序排序中排在最后 转自:http://www.cnblogs.com/BYRans/

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。