首页 文章 精选 留言 我的

精选列表

搜索[学习],共10000篇文章
优秀的个人博客,低调大师

大数据学习路线分享大数据之基础语法

1.2.1 计算机理论介绍 1.2.2 编程基础--进制分类、进制转换进制 就是进位制。指的是我们来表示一个数字的时候进位的制度。 进制分类 计算机中,常用的进制有以下几种: 进制 描述 示例 备注 二进制 使用0和1来描述所有的自然数 0, 1, 10, 11, 100, 101 使用0b开头 八进制 使用0-7来描述所有的自然数 4, 5, 6, 7, 10, 11, 12 使用0开头 十进制 使用0-9来描述所有的自然数 6, 7, 8, 9, 10, 11, 12 十六进制 使用0-9, a-f来描述所有的自然数 9, A, B, C, D, E, F, 10 使用0x开头 进制转换 十进制转其他进制 辗转相除法,用数字除进制,再用商除进制,一直到商为零结束,最后将每一步得到的余数倒着连接其他进制转十进制 每一位的数字乘进制的位数-1次方,再将所有的结果累加到一起二进制与八进制之间的相互转换 每一个八进制位可以等价替换成三个二进制位每一个十六进制位可以等价替换成四个二进制位原码、反码、补码 正数 负数 原码 由数字直接计算出的二进制表示形式 最高位表示符号位: 0代表正数, 1代表负数 反码 与原码相同 符号位不变, 其他位按位取反 补码 与原码、反码相同 反码 + 1 注意事项: 数据的存储与运算都是以补码的形式进行的补码与补码运算的结果还是补码对补码再求一次补, 可以得到原码 1.2.3 什么是JavaJava 是一种面向对象的程序设计语言,可以使用这种语言编写程序,实现我们想要的一些功能。主要分三块: J2SE: Java的标准版,用于标准的应用程序开发。 J2ME: Java的微型版,常用于移动端的开发。 J2EE: Java的企业版,用于企业级的应用服务开发。 Java程序的运行,需要先将.java源文件编译成.class字节码文件,然后由jvm虚拟机将这些字节码文件翻译成机器语言,然后执行相应的操作。 常用名词解释 JDK: Java Development Kit, Java开发框架, 开发Java程序需要用到的各种工具包。 JRE: Java Runtime Envrioment, Java运行时环境。 JVM: Java Virtual Mechine, Java虚拟机, 能够运行Java程序 1.2.4 常用DOS命令cd: 切换到指定的路径dir: 列举当前目录下的所有文件和文件夹mkdir: 创建文件夹rmdir: 删除文件夹 1.2.5 JDK的安装与环境变量的配置JDK的安装 找到对应的版本号, 直接安装即可安装完成后, 在 jdk 的 bin 目录下, 有编译运行Java程序所需要的指令环境变量的配置 为什么要配置环境变量 因为我们需要使用bin目录下的javac和java指令来编译和运行程序, 而使用这两个程序的话, 就需要先用cd切到指定路径下才能执行, 并且参数需要写java文件所在的目录, 因此在编译和运行程序的时候非常不方便。因此需要将bin目录添加到环境变量, 这样我们就可以在任意的路径下使用javac和java指令,来对我们的程序进行编译和运行。怎么配置环境变量 我的电脑 -> 右键 -> 属性 -> 高级系统设置 -> 环境变量选择系统变量 -> Path -> 编辑 win10: 新建一个变量,将jdk下面的bin目录粘贴进去,并上移到最上方win7/8: 将jdk下面的bin目录的路径粘贴到所有路径的最前方,然后添加一个路径分隔符;怎么验证环境变量是否配置成功 新建一个DOS窗口, 输入指令 javac, 如果没有错误提示就是是成功 1.2.6 注释注释是对代码的描述, 是开发者写给自己或者别人看的, 相当于我们记录的一些笔记, 或者备忘录。 注释部分的内容不会被编译, 因此没有语法上的要求。 注释可以分为三类: 单行注释: 以两个 // 开头, 后面的一行内容会被注释多行注释: 以一个 / 开头, 以一个 / 结尾。中间所有的内容都会被注释文档注释: 以 /* 开头, / 结尾。文档注释中可以添加一些标签,更方便的记录程序的信息 1.2.7 数据类型在程序中, 我们需要操作各种各样的数据, 这些数据都有各自不同的类型。 Java中, 所有的数据可以分为两大类: 基本数据类型 和 引用数据类型 引用数据类型, 又叫做引用类型。在后面课程中具体说明。 基本数据类型, 又叫做值类型, 在Java中有以下分类: 整型 就是整数, 按照占用空间大小, 分为四种字节型: byte, 1byte, [-128, 127]短整型: short, 2byte, [-2^15^, 2^15^-1]整型: int, 4byte, [-2^31^, 2^31^-1]长整型: long, 8byte, [-2^63^, 2^63^-1]浮点型 就是小数, 按照占用空间大小, 分为两种单精度浮点型: float, 4byte双精度浮点型: double, 8byte双精度浮点型可以比单精度浮点型精确更多的小数点后面的位数布尔型 用来描述程序中不是对的就是错的, 不是真的就是假的数据boolean, 1byte只有两个值: true / false字符型 用来描述组成一个文本的最小单元char, 2byte字符型的数据, 需要用单引号括起来, 单引号中只能有一个字符, 不能多也不能少 1.2.8 标识符由若干个字符组成的一个有序的序列, 用来描述程序中的一个数据 命名规则 只能由字母、数字、下划线和$符号组成不能以数字作为开头不能与关键字和保留字同名 关键字: 系统占用的, 已经被赋予了特殊含义的字符序列保留字: 系统占用的, 暂时还没有特殊含义, 但是后续可能会用到的字符序列命名规范 望文知意: 应该可以从标识符的命名中看出想表述的数据含义遵循驼峰命名法 大驼峰命名法: 所有的单词首字母都大写小驼峰命名法: 首单词除外, 从第二个单词开始, 每个单词的首字母都大写 1.2.9 变量和常量变量: 在程序运行的过程中, 数值可以发生改变的数据 常量: 在程序运行的过程中, 数值不可以发生改变的数据 在程序中的声明 变量 数据类型 标识符;数据类型 标识符 = 初始值;数据类型 标识符1, 标识符2, ...;数据类型 标识符1 = 初始值, 标识符2, 标识符3 = 初始值, ...;常量 final 数据类型 标识符 = 初始值;final 数据类型 标识符; 标识符 = 初始值;Java允许在声明常量的时候不赋值初始值, 可以延迟赋值。但是赋值只能进行一次。 1.2.10 转义字符一个特殊的字符, 主要有两个作用 可以将某些具有特殊含义的字符转成普通字符 单引号, 用来匹配一个字符的开始和结尾, 转义字符可以使其成为一个普通的单引号双引号, 用来匹配一个字符串的开始和结尾, 转义字符可以使其成为一个普通的双引号可以配合某些普通字符使用, 使其没有特殊含义 n本身是一个普通字符, 配合转义字符使用: n 表示换行t本身是一个普通字符, 配合转义字符使用: t 表示tabr本身是一个普通字符, 配合转义字符使用: r 表示return 1.2.11 数据类型转换一个变量声明完成后, 在内存中已经开辟好了空间, 此时是不允许调整空间大小的, 也就是说这个变量的数据类型是不允许改变的。这里说的数据类型转换, 指的是声明一个新的指定类型的变量, 将原来变量中的值拷贝到新的变量中。 数据类型转换可以分为两种: 自动类型转换 又叫做隐式转换, 一般是由取值范围小的数据类型, 向取值范围大的数据类型转换转换过程不需要任何额外操作转换后, 没有任何精度丢失情况强制类型转换 又叫做显式转换, 一般是由取值范围大的数据类型, 向取值范围小的数据类型转换转换过程需要强制操作, 不能自动完成转换后, 可能会出现精度丢失的情况byte a = 10;int b = a; // 由 byte 类型转型为 int 类型, 自动完成, 不需要任何额外操作 int c = 128;byte d = (byte)c; // 由 int 类型转型为 byte 类型, 强制操作, 会存在精度丢失额外说明 byte, short, char 类型的数据在进行运算的时候, 会自动的转型为int类型浮点型转整型, 会舍去小数点后面所有的内容, 只保留整数部分 1.2.12 常用运算符算术运算符 用来做基础的算术计算, + - * / % ++ -- 其中: % 和数学计算中没有区别两个整型的数据进行除法, 结果还是整型, 会将计算的浮点结果强转成整型 自增运算符++, 用在变量前, 表示是前取变量值, 后对这个变量进行+1操作自增运算符++, 用在变量后, 表示是先对这个变量进行+1操作, 然后再取变量值自减同自增int a = 10; int b = a++; // b的值是10 int c = ++b; // c的值是11 赋值运算符 =: 将等号右边的值, 给左边的变量进行赋值 +=, -=, *=, /=, %=: 组合运算符, 对一个变量进行运算 a += 10; 等价于 a = a + 10关系运算符 < >= <= == != 逻辑运算符 & : 与and | : 或or ! : 非not ^ : 异或xor, 计算逻辑: 两个相同为false, 两个不同为true && : 短路与, 结果与逻辑与相同. 区别在于: 如果前面的结果可以决定整体的运算结果, 后面的表达式不参与运算 || : 短路或, 结果与逻辑或相同. 区别在于: 如果前面的结果可以决定整体的运算结果, 后面的表达式不参与运算 位运算 位运算操作的是两个整型的数字, 计算的逻辑就是将两个整型的数字求出补码, 再对补码的每一位做类似于逻辑运算的操作, 其中 1 相当于 true, 0 相当于 false 三目运算符 condition ? expression1 : expression2 condition是一个boolean类型的变量, 或者一个boolean结果的表达式. 如果condition为true, 最终整体的结果取expression1, 否则, 结果取expression2

优秀的个人博客,低调大师

Spark 大规模机器学习官方文档 - 中文翻译

Spark官方文档 - 中文翻译 Spark版本:1.6.0 转载请注明出处:http://www.cnblogs.com/BYRans/ 1 概述(Overview) 2 引入Spark(Linking with Spark) 3 初始化Spark(Initializing Spark) 3.1 使用Spark Shell(Using the Shell) 4 弹性分布式数据集(RDDs) 4.4.1 如何选择存储级别(Which Storage Level to Choose?) 4.4.2 移除数据(Removing Data) 4.3.1 基础(Basics) 4.3.2 把函数传递到Spark(Passing Functions to Spark) 4.3.3 理解闭包(Understanding closures) 4.3.4 操作键值对(Working with Key-Value Pairs) 4.3.5 Transformations 4.3.6 Actions 4.3.7 Shuffle操作(Shuffle operations) 4.3.3.1 示例(Example) 4.3.3.2 本地模式 VS 集群模式(Local vs. cluster modes) 4.3.3.3 打印RDD的元素(Printing elements of an RDD) 4.3.7.1 背景(Background) 4.3.7.2 性能影响(Performance Impact) 4.1 并行集合(Parallelized Collections) 4.2 外部数据库(External Datasets) 4.3 RDD操作(RDD Operations) 4.4 RDD持久化(RDD Persistence) 5 共享变量(Shared Variables) 5.1 广播变量(broadcast variables) 5.2 累加器(Accumulators) 6 将应用提交到集群(Deploying to a Cluster) 7 Java/Scala中启动Spark作业(Launching Spark jobs from Java / Scala) 8 单元测试(Unit Testing) 9 从Spark1.0之前的版本迁移(Migrating from pre-1.0 Versions of Spark) 10 下一步(Where to Go from Here) 1 概述(Overview) 总体来讲,每一个Spark驱动程序应用都由一个驱动程序组成,该驱动程序包含一个由用户编写的main方法,该方法会在集群上并行执行一些列并行计算操作。Spark最重要的一个概念是弹性分布式数据集,简称RDD(resilient distributed dataset )。RDD是一个数据容器,它将分布在集群上各个节点上的数据抽象为一个数据集,并且RDD能够进行一系列的并行计算操作。可以将RDD理解为一个分布式的List,该List的数据为分布在各个节点上的数据。RDD通过读取Hadoop文件系统中的一个文件进行创建,也可以由一个RDD经过转换得到。用户也可以将RDD缓存至内存,从而高效的处理RDD,提高计算效率。另外,RDD有良好的容错机制。 Spark另外一个重要的概念是共享变量(shared variables)。在并行计算时,可以方便的使用共享变量。在默认情况下,执行Spark任务时会在多个节点上并行执行多个task,Spark将每个变量的副本分发给各个task。在一些场景下,需要一个能够在各个task间共享的变量。Spark支持两种类型的共享变量: 广播变量(broadcast variables):将一个只读变量缓存到集群的每个节点上。例如,将一份数据的只读缓存分发到每个节点。 累加变量(accumulators):只允许add操作,用于计数、求和。 2 引入Spark(Linking with Spark) 在Spark 1.6.0上编写应用程序,支持使用Scala 2.10.X、Java 7+、Python 2.6+、R 3.1+。如果使用Java 8,支持lambda表达式(lambda expressions)。在编写Spark应用时,需要在Maven依赖中添加Spark,Spark的Maven Central为: groupId=org.apache.sparkartifactId=spark-core_2.10version=1.6.0 另外,如果Spark应用中需要访问HDFS集群,则需要在hadoop-client中添加对应版本的HDFS依赖: groupId=org.apache.hadoopartifactId=hadoop-clientversion=<your-hdfs-version> 最后,需要在程序中添加Spark类。代码如下: importorg.apache.spark.SparkContextimportorg.apache.spark.SparkConf (在Spark 1.3.0之前的版本,使用Scala语言编写Spark应用程序时,需要添加import org.apache.spark.SparkContext._来启用必要的隐式转换) 3 初始化Spark(Initializing Spark) 使用Scala编写Spark程序的需要做的第一件事就是创建一个SparkContext对象(使用Java语言时创建JavaSparkContext)。SparkContext对象指定了Spark应用访问集群的方式。创建SparkContext需要先创建一个SparkConf对象,SparkConf对象包含了Spark应用的一些列信息。代码如下: Scala valconf=newSparkConf().setAppName(appName).setMaster(master)newSparkContext(conf) java SparkConfconf=newSparkConf().setAppName(appName).setMaster(master); JavaSparkContextsc=newJavaSparkContext(conf); appName参数为应用程序在集群的UI上显示的名字。master为Spark、Mesos、YARN URL或local。使用local值时,表示在本地模式下运行程序。应用程序的执行模型也可以在使用spark-submit命令提交任务时进行指定。 3.1 使用Spark Shell(Using the Shell) 在Spark Shell下,一个特殊的SparkContext对象已经帮用户创建好,变量为sc。使用参数--master设置master参数值,使用参数--jars设置依赖包,多个jar包使用逗号分隔。可以使用--packages参数指定Maven坐标来添加依赖包,多个坐标使用逗号分隔。可以使用参数--repositories添加外部的repository。示例如下: 本地模式下,使用4个核运行Spark程序: $./bin/spark-shell--masterlocal[4] 将code.jar包添加到classpath: $./bin/spark-shell--masterlocal[4]--jarscode.jar 使用Maven坐标添加一个依赖: $./bin/spark-shell--masterlocal[4]--packages"org.example:example:0.1" 详细的Spark Shell参数描述请执行命令spark-shell --help。更多的spark-submit脚本请见spark-submit script。 4 弹性分布式数据集(RDDs) Spark最重要的一个概念就是RDD,RDD是一个有容错机制的元素容器,它可以进行并行运算操作。得到RDD的方式有两个: 通过并行化驱动程序中已有的一个集合而获得 通过外部存储系统(例如共享的文件系统、HDFS、HBase等)的数据集进行创建 4.1 并行集合(Parallelized Collections) 在驱动程序中,在一个已经存在的集合上(例如一个Scala的Seq)调用SparkContext的parallelize方法可以创建一个并行集合。集合里的元素将被复制到一个可被并行操作的分布式数据集中。下面为并行化一个保存数字1到5的集合示例: Scala valdata=Array(1,2,3,4,5)valdistData=sc.parallelize(data) Java List<Integer>data=Arrays.asList(1,2,3,4,5); JavaRDD<Integer>distData=sc.parallelize(data); 当分布式数据集创建之后,就可以进行并行操作。例如,可以调用方法distData.reduce((a,b) => a + b)求数组内元素的和。Spark支持的分布式数据集上的操作将在后面章节中详细描述。 并行集合的一个重要的参数是表示将数据划分为几个分区(partition)的分区数。Spark将在集群上每个数据分区上启动一个task。通常情况下,你可以在集群上为每个CPU设置2-4个分区。一般情况下,Spark基于集群自动设置分区数目。也可以手动进行设置,设置该参数需要将参数值作为第二参数传给parallelize方法,例如:sc.parallelize(data, 10)。注意:在代码中,部分位置使用术语slices(而不是partition),这么做的原因是为了保持版本的向后兼容性。 4.2 外部数据库(External Datasets) Spark可以通过Hadoop支持的外部数据源创建分布式数据集,Hadoop支持的数据源有本地文件系统、HDFS、Cassandra、HBase、Amazon S3、Spark支持的文本文件、SequenceFiles、HadoopInputFormat。 SparkContext的testFile方法可以创建文本文件RDD。使用这个方法需要传递文本文件的URI,URI可以为本机文件路径、hdfs://、s3n://等。该方法读取文本文件的每一行至容器中。示例如下: Scala scala>valdistFile=sc.textFile("data.txt") distFile:RDD[String]=MappedRDD@1d4cee08 Java JavaRDD<String>distFile=sc.textFile("data.txt"); 创建之后,distFile就可以进行数据集的通用操作。例如,使用map和reduce操作计算所有行的长度的总和:distFile.map(s => s.length).reduce((a, b) => a + b)。使用Spark读取文件需要注意一下几点: 程序中如果使用到本地文件路径,在其它worker节点上该文件必须在同一目录,并有访问权限。在这种情况下,可以将文件复制到所有的worker节点,也可以使用网络内的共享文件系统。 Spark所有的基于文件输入的方法(包括textFile),都支持文件夹、压缩文件、通配符。例如:textFile("/my/directory")、textFile("/my/directory/*.txt")、textFile("/my/directory/*.gz")。 textFile方法提供了一个可选的第二参数,用于控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(块使用HDFS的默认值64MB),通过设置这个第二参数可以修改这个默认值。需要注意的是,分区数不能小于块数。 除了文本文件之外,Spark还支持其它的数据格式: SparkContext.wholeTextFiles能够读取指定目录下的许多小文本文件,返回(filename,content)对。而textFile只能读取一个文本文件,返回该文本文件的每一行。 对于SequenceFiles可以使用SparkContext的sequenceFile[K,V]方法,其中K是文件中key和value的类型。它们必须为像IntWritable和Text那样,是Hadoop的Writable接口的子类。另外,对于通用的Writable,Spark允许用户指定原生类型。例如,sequenceFile[Int,String]将自动读取IntWritable和Text。 对于其他Hadoop InputFormat,可以使用SparkContext.hadoopRDD方法,该方法接收任意类型的JobConf和输入格式类、键类型和值类型。可以像设置Hadoop job那样设置输入源。对于InputFormat还可以使用基于新版本MapReduce API(org.apache.hadoop.mapreduce)的SparkContext.newAPIHadoopRDD。(老版本接口为:SparkContext.newHadoopRDD) RDD.saveAsObjectFile和SparkContext.objectFile能够保存包含简单的序列化Java对象的RDD。但是这个方法不如Avro高效,Avro能够方便的保存任何RDD。 4.3 RDD操作(RDD Operations) RDD支持两种类型的操作: transformation:从一个RDD转换为一个新的RDD。 action:基于一个数据集进行运算,并返回RDD。 例如,map是一个transformation操作,map将数据集的每一个元素按指定的函数转换为一个RDD返回。reduce是一个action操作,reduce将RDD的所有元素按指定的函数进行聚合并返回结果给驱动程序(还有一个并行的reduceByKey能够返回一个分布式的数据集)。 Spark的所有transformation操作都是懒执行,它们并不立马执行,而是先记录对数据集的一系列transformation操作。在执行一个需要执行一个action操作时,会执行该数据集上所有的transformation操作,然后返回结果。这种设计让Spark的运算更加高效,例如,对一个数据集map操作之后使用reduce只返回结果,而不返回庞大的map运算的结果集。 默认情况下,每个转换的RDD在执行action操作时都会重新计算。即使两个action操作会使用同一个转换的RDD,该RDD也会重新计算。在这种情况下,可以使用persist方法或cache方法将RDD缓存到内存,这样在下次使用这个RDD时将会提高计算效率。在这里,也支持将RDD持久化到磁盘,或在多个节点上复制。 4.3.1 基础(Basics) 参考下面的程序,了解RDD的基本轮廓: Scala vallines=sc.textFile("data.txt")vallineLengths=lines.map(s=>s.length)valtotalLength=lineLengths.reduce((a,b)=>a+b) Java JavaRDD<String>lines=sc.textFile("data.txt"); JavaRDD<Integer>lineLengths=lines.map(s->s.length());inttotalLength=lineLengths.reduce((a,b)->a+b); 第一行通过读取一个文件创建了一个基本的RDD。这个数据集没有加载到内存,也没有进行其他的操作,变量lines仅仅是一个指向文件的指针。第二行为transformation操作map的结果。此时lineLengths也没有进行运算,因为map操作为懒执行。最后,执行action操作reduce。此时Spark将运算分隔成多个任务分发给多个机器,每个机器执行各自部分的map并进行本地reduce,最后返回运行结果给驱动程序。 如果在后面的运算中仍会用到lineLengths,可以将其缓存,在reduce操作之前添加如下代码,该persist操作将在lineLengths第一次被计算得到后将其缓存到内存: Scala lineLengths.persist() Java lineLengths.persist(StorageLevel.MEMORY_ONLY()); 4.3.2 把函数传递到Spark(Passing Functions to Spark) ScalaSpark的API,在很大程度上依赖于把驱动程序中的函数传递到集群上运行。这有两种推荐的实现方式: 使用匿名函数的语法,这可以让代码更加简洁。 使用全局单例对象的静态方法。比如,你可以定义函数对象objectMyFunctions,然后将该对象的MyFunction.func1方法传递给Spark,如下所示: objectMyFunctions{deffunc1(s:String):String={...} } myRdd.map(MyFunctions.func1) 注意:由于可能传递的是一个类实例方法的引用(而不是一个单例对象),在传递方法的时候,应该同时传递包含该方法的类对象。举个例子: classMyClass{deffunc1(s:String):String={...}defdoStuff(rdd:RDD[String]):RDD[String]={rdd.map(func1)} } 上面示例中,如果我们创建了一个类实例new MyClass,并且调用了实例的doStuff方法,该方法中的map操作调用了这个MyClass实例的func1方法,所以需要将整个对象传递到集群中。类似于写成:rdd.map(x=>this.func1(x))。 类似地,访问外部对象的字段时将引用整个对象: classMyClass{valfield="Hello" defdoStuff(rdd:RDD[String]):RDD[String]={rdd.map(x=>field+x)} } 等同于写成rdd.map(x=>this.field+x),引用了整个this。为了避免这种问题,最简单的方式是把field拷贝到本地变量,而不是去外部访问它: defdoStuff(rdd:RDD[String]):RDD[String]={valfield_=this.field rdd.map(x=>field_+x) } JavaSpark的API,在很大程度上依赖于把驱动程序中的函数传递到集群上运行。在Java中,函数由那些实现了org.apache.spark.api.java.function包中的接口的类表示。有两种创建这样的函数的方式: 在你自己的类中实现Function接口,可以是匿名内部类,或者命名类,并且传递类的一个实例到Spark。 在Java8中,使用lambda表达式来简明地定义函数的实现。 为了保持简洁性,本指南中大量使用了lambda语法,这在长格式中很容易使用所有相同的APIs。比如,我们可以把上面的代码写成: JavaRDD<String>lines=sc.textFile("data.txt"); JavaRDDlineLengths=lines.map(newFunctionInteger>(){publicIntegercall(Strings){returns.length();} });inttotalLength=lineLengths.reduce(newFunction2Integer,Integer>(){publicIntegercall(Integera,Integerb){returna+b;} }); 同样的功能,使用内联式的实现显得更为笨重繁琐,代码如下: classGetLengthimplementsFunctionInteger>{publicIntegercall(Strings){returns.length();} }classSumimplementsFunction2Integer,Integer>{publicIntegercall(Integera,Integerb){returna+b;} } JavaRDDlines=sc.textFile("data.txt"); JavaRDDlineLengths=lines.map(newGetLength());inttotalLength=lineLengths.reduce(newSum()); 注意,java中的内部匿名类,只要带有final关键字,就可以访问类范围内的变量。Spark也会把变量复制到每一个worker节点。 4.3.3 理解闭包(Understanding closures) 使用Spark的一个难点为:理解程序在集群中执行时变量和方法的生命周期。RDD操作可以在变量范围之外修改变量,这是一个经常导致迷惑的地方。比如下面的例子,使用foreach()方法增加计数器(counter)的值(类似的情况,在其他的RDD操作中经常出现)。 4.3.3.1 示例(Example) 参考下面简单的RDD元素求和示例,求和运算是否在同一个JVM中执行,其复杂度也不同。Spark可以在local模式下(--master = local[n])执行应用,也可以将该Spark应用提交到集群上执行(例如通过spark-submit提交到YARN): Scala varcounter=0varrdd=sc.parallelize(data)//Wrong:Don'tdothis!!rdd.foreach(x=>counter+=x)println("Countervalue:"+counter) Java intcounter=0; JavaRDD<Integer>rdd=sc.parallelize(data);//Wrong:Don'tdothis!!rdd.foreach(x->counter+=x);println("Countervalue:"+counter); 4.3.3.2 本地模式 VS 集群模式(Local vs. cluster modes) 在本地模式下仅有一个JVM,上面的代码将直接计算RDD中元素和,并存储到counter中。此时RDD和变量counter都在driver节点的同一内存空间中。 然而,在集群模式下,情况会变得复杂,上面的代码并不会按照预期的方式执行。为了执行这个job,Spark把处理RDD的操作分割成多个任务,每个任务将被一个executor处理。在执行之前,Spark首先计算闭包(closure)。闭包是必须对executor可见的变量和方法,在对RDD进行运算时将会用到这些变量和方法(在本例子中指foreach())。这个闭包会被序列化,并发送给每个executor。在local模式下,只有一个executor,所以所有的变量和方法都使用同一个闭包。在其他模式下情况跟local模式不一样,每个executor在不同的worker节点上运行,每个executor都有一个单独的闭包。 在这里,发送给每个executor的闭包内的变量是当前变量的副本,因此当counter在foreach中被引用时,已经不是在driver节点上的counter了。在driver节点的内存中仍然有一个counter,但这个counter对executors不可见。executor只能操作序列化的闭包中的counter副本。因此,最终counter的值仍然是0,因为所有对counter的操作都是在序列化的闭包内的counter上进行的。 在类似这种场景下,为了保证良好的行为确保,应该使用累加器。Spark中的累加器专门为在集群中多个节点间更新变量提供了一种安全机制。在本手册的累加器部分将对累加器进行详细介绍。 一般情况下,像环或本地定义方法这样的闭包结构,不应该用于更改全局状态。Spark不定义也不保证来自闭包外引用导致的对象变化行为。有些情况下,在local模式下可以正常运行的代码,在分布式模式下也许并不会像预期那样执行。在分布式下运行时,建议使用累加器定义一些全局集合。 4.3.3.3 打印RDD的元素(Printing elements of an RDD) 打印一个RDD的元素也是一个常用的语法,带引RDD元素可以使用方法rdd.foreach(println)或rdd.map(println)。在本地模式下,该方法将生成预期的输出并打印RDD所有的元素。然而,在集群模式下各个executor调用stdout,将结果打印到executor的stdout中。因为不是打印到driver节点上,所以在driver节点的stdout上不会看到这些输出。如果想将RDD的元素打印到driver节点上,可以使用collect()方法将RDD发送到driver节点上,然后再打印该RDD:rdd.collect().foreach(println)。这个操作可能会导致driver节点内存不足,因为collect()方法将RDD全部的数据都发送到一台节点上。如果仅仅打印RDD的部分元素,一个安全的方法是使用take()方法:rdd.take(100).foreach(println)。 4.3.4 操作键值对(Working with Key-Value Pairs) Spark大部分的RDD操作都是对任意类型的对象的,但是,有部分特殊的操作仅支持对键值对的RDD进行操作。最常用的是分布式“shuffle”操作,比如按照key将RDD的元素进行分组或聚集操作。 Scala在Scala中,包含Tuple2对象在内的RDD键值对操作,都是可以自动可用的(Tuple2对象是Scala语言内置的元组类型,可以通过简单的编写进行(a,b)创建)。键值对操作接口在PairRDDFunctions类中,该类中的接口自动使用RDD的元组。例如,在下面的代码中使用reduceByKey操作对键值对进行计数,计算每行的文本出现的次数: vallines=sc.textFile("data.txt")valpairs=lines.map(s=>(s,1))valcounts=pairs.reduceByKey((a,b)=>a+b) Java在Java中,键值对使用的是scala.Tuple2类。用户可以使用特定的map操作将JavaRDDs转换为JavaPairRDDs,例如mapToPair和flatMapToPair。JavaPairRDD拥有标准RDD和特殊键值对的方法。例如,在下面的代码中使用reduceByKey操作对键值对进行计数,计算每行的文本出现的次数: JavaRDD<String>lines=sc.textFile("data.txt"); JavaPairRDD<String,Integer>pairs=lines.mapToPair(s->newTuple2(s,1)); JavaPairRDD<String,Integer>counts=pairs.reduceByKey((a,b)->a+b); 我们还可以使用counts.sortByKey()按照字母顺序将键值对排序,使用counts.collect()将结果以一个数组的形式发送给driver节点。 注意,当在键值对操作中使用自定义对象作为key时,你必须保证自定义的equals()方法有一个对应的hashCode()方法。详细的细节,请阅读Object.hashCode() documentation。 4.3.5 Transformations 下面列出了Spark常用的transformation操作。详细的细节请参考RDD API文档(Scala、Java、Python、R)和键值对RDD方法文档(Scala、Java)。 map(func)将原来RDD的每个数据项,使用map中用户自定义的函数func进行映射,转变为一个新的元素,并返回一个新的RDD。 filter(func)使用函数func对原RDD中数据项进行过滤,将符合func中条件的数据项组成新的RDD返回。 flatMap(func)类似于map,但是输入数据项可以被映射到0个或多个输出数据集合中,所以函数func的返回值是一个数据项集合而不是一个单一的数据项。 mapPartitions(func)类似于map,但是该操作是在每个分区上分别执行,所以当操作一个类型为T的RDD时func的格式必须是Iterator<T> => Iterator<U>。即mapPartitions需要获取到每个分区的迭代器,在函数中通过这个分区的迭代器对整个分区的元素进行操作。 mapPartitionsWithIndex(func)类似于mapPartitions,但是需要提供给func一个整型值,这个整型值是分区的索引,所以当处理T类型的RDD时,func的格式必须为(Int, Iterator<T>) => Iterator<U>。 sample(withReplacement, fraction, seed)对数据采样。用户可以设定是否有放回(withReplacement)、采样的百分比(fraction)、随机种子(seed)。 union(otherDataset)返回原数据集和参数指定的数据集合并后的数据集。使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同。该操作不进行去重操作,返回的结果会保存所有元素。如果想去重,可以使用distinct()。 intersection(otherDataset)返回两个数据集的交集。 distinct([numTasks]))将RDD中的元素进行去重操作。 groupByKey([numTasks])操作(K,V)格式的数据集,返回 (K, Iterable)格式的数据集。注意,如果分组是为了按key进行聚合操作(例如,计算sum、average),此时使用reduceByKey或aggregateByKey计算效率会更高。注意,默认情况下,并行情况取决于父RDD的分区数,但可以通过参数numTasks来设置任务数。 reduceByKey(func, [numTasks])使用给定的func,将(K,V)对格式的数据集中key相同的值进行聚集,其中func的格式必须为(V,V) => V。可选参数numTasks可以指定reduce任务的数目。 aggregateByKey(zeroValue)(seqOp, combOp,[numTasks])对(K,V)格式的数据按key进行聚合操作,聚合时使用给定的合并函数和一个初试值,返回一个(K,U)对格式数据。需要指定的三个参数:zeroValue为在每个分区中,对key值第一次读取V类型的值时,使用的U类型的初始变量;seqOp用于在每个分区中,相同的key中V类型的值合并到zeroValue创建的U类型的变量中。combOp是对重新分区后两个分区中传入的U类型数据的合并函数。 sortByKey([ascending], [numTasks])(K,V)格式的数据集,其中K已实现了Ordered,经过sortByKey操作返回排序后的数据集。指定布尔值参数ascending来指定升序或降序排列。 join(otherDataset, [numTasks])用于操作两个键值对格式的数据集,操作两个数据集(K,V)和(K,W)返回(K, (V, W))格式的数据集。通过leftOuterJoin、rightOuterJoin、fullOuterJoin完成外连接操作。 cogroup(otherDataset, [numTasks])用于操作两个键值对格式数据集(K,V)和(K,W),返回数据集格式为 (K,(Iterable, Iterable)) 。这个操作也称为groupWith。对在两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集合,并且返回两个RDD中对应Key的元素集合的迭代器。 cartesian(otherDataset)对类型为T和U的两个数据集进行操作,返回包含两个数据集所有元素对的(T,U)格式的数据集。即对两个RDD内的所有元素进行笛卡尔积操作。 pipe(command, [envVars])以管道(pipe)方式将 RDD的各个分区(partition)使用 shell命令处理(比如一个 Perl或 bash脚本)。 RDD的元素会被写入进程的标准输入(stdin),将进程返回的一个字符串型 RDD(RDD of strings),以一行文本的形式写入进程的标准输出(stdout)中。 coalesce(numPartitions)把RDD的分区数降低到通过参数numPartitions指定的值。在得到的更大一些数据集上执行操作,会更加高效。 repartition(numPartitions)随机地对RDD的数据重新洗牌(Reshuffle),从而创建更多或更少的分区,以平衡数据。总是对网络上的所有数据进行洗牌(shuffles)。 repartitionAndSortWithinPartitions(partitioner)根据给定的分区器对RDD进行重新分区,在每个结果分区中,按照key值对记录排序。这在每个分区中比先调用repartition再排序效率更高,因为它可以将排序过程在shuffle操作的机器上进行。 4.3.6 Actions 下面列出了Spark支持的常用的action操作。详细请参考RDD API文档(Scala、Java、Python、R)和键值对RDD方法文档(Scala、Java)。 reduce(func)使用函数func聚集数据集中的元素,这个函数func输入为两个元素,返回为一个元素。这个函数应该符合结合律和交换了,这样才能保证数据集中各个元素计算的正确性。 collect()在驱动程序中,以数组的形式返回数据集的所有元素。通常用于filter或其它产生了大量小数据集的情况。 count()返回数据集中元素的个数。 first()返回数据集中的第一个元素(类似于take(1))。 take(n)返回数据集中的前n个元素。 takeSample(withReplacement,num, [seed])对一个数据集随机抽样,返回一个包含num个随机抽样元素的数组,参数withReplacement指定是否有放回抽样,参数seed指定生成随机数的种子。 takeOrdered(n, [ordering])返回RDD按自然顺序或自定义顺序排序后的前n个元素。 saveAsTextFile(path)将数据集中的元素以文本文件(或文本文件集合)的形式保存到指定的本地文件系统、HDFS或其它Hadoop支持的文件系统中。Spark将在每个元素上调用toString方法,将数据元素转换为文本文件中的一行记录。 saveAsSequenceFile(path) (Java and Scala)将数据集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系统、HDFS或其它Hadoop支持的文件系统中。该操作只支持对实现了Hadoop的Writable接口的键值对RDD进行操作。在Scala中,还支持隐式转换为Writable的类型(Spark包括了基本类型的转换,例如Int、Double、String等等)。 saveAsObjectFile(path) (Java and Scala)将数据集中的元素以简单的Java序列化的格式写入指定的路径。这些保存该数据的文件,可以使用SparkContext.objectFile()进行加载。 countByKey()仅支持对(K,V)格式的键值对类型的RDD进行操作。返回(K,Int)格式的Hashmap,(K,Int)为每个key值对应的记录数目。 foreach(func)对数据集中每个元素使用函数func进行处理。该操作通常用于更新一个累加器(Accumulator)或与外部数据源进行交互。注意:在foreach()之外修改累加器变量可能引起不确定的后果。详细介绍请阅读Understanding closures部分。 4.3.7 Shuffle操作(Shuffle operations) Spark内的一个操作将会触发shuffle事件。shuffle是Spark将多个分区的数据重新分组重新分布数据的机制。shuffle是一个复杂且代价较高的操作,它需要完成将数据在executor和机器节点之间进行复制的工作。 4.3.7.1 背景(Background) 通过reduceByKey操作的例子,来理解shuffle过程。reduceByKey操作生成了一个新的RDD,原始数据中相同key的所有记录的聚合值合并为一个元组,这个元组中的key对应的值为执行reduce函数之后的结果。这个操作的挑战是,key相同的所有记录不在同一各分区种,甚至不在同一台机器上,但是该操作必须将这些记录联合运算。 在Spark中,通常一条数据不会垮分区分布,除非为了一个特殊的操作在必要的地方才会跨分区分布。在计算过程中,一个分区由一个task进行处理。因此,为了组织所有的数据让一个reduceByKey任务执行,Spark需要进行一个all-to-all操作。all-to-all操作需要读取所有分区上的数据的所有的key,以及key对应的所有的值,然后将多个分区上的数据进行汇总,并将每个key对应的多个分区的数据进行计算得出最终的结果,这个过程称为shuffle。 虽然每个分区中新shuffle后的数据元素是确定的,分区间的顺序也是确定的,但是所有的元素是无序的。如果想在shuffle操作后将数据按指定规则进行排序,可以使用下面的方法: 使用mapPartitions操作在每个分区上进行排序,排序可以使用.sorted等方法。 使用repartitionAndSortWithinPartitions操作在重新分区的同时高效的对分区进行排序。 使用sortBy将RDD进行排序。 会引起shuffle过程的操作有: repartition操作,例如:repartition、coalesce ByKey操作(除了counting相关操作),例如:groupByKey、reduceByKey join操作,例如:cogroup、join 4.3.7.2 性能影响(Performance Impact) shuffle是一个代价比较高的操作,它涉及磁盘IO、数据序列化、网络IO。为了准备shuffle操作的数据,Spark启动了一系列的map任务和reduce任务,map任务完成数据的处理工作,reduce完成map任务处理后的数据的收集工作。这里的map、reduce来自MapReduce,跟Spark的map操作和reduce操作没有关系。 在内部,一个map任务的所有结果数据会保存在内存,直到内存不能全部存储为止。然后,这些数据将基于目标分区进行排序并写入一个单独的文件中。在reduce时,任务将读取相关的已排序的数据块。 某些shuffle操作会大量消耗堆内存空间,因为shuffle操作在数据转换前后,需要在使用内存中的数据结构对数据进行组织。需要特别说明的是,reduceByKey和aggregateByKey在map时会创建这些数据结构,ByKey操作在reduce时创建这些数据结构。当内存满的时候,Spark会把溢出的数据存到磁盘上,这将导致额外的磁盘IO开销和垃圾回收开销的增加。 shuffle操作还会在磁盘上生成大量的中间文件。在Spark 1.3中,这些文件将会保留至对应的RDD不在使用并被垃圾回收为止。这么做的好处是,如果在Spark重新计算RDD的血统关系(lineage)时,shuffle操作产生的这些中间文件不需要重新创建。如果Spark应用长期保持对RDD的引用,或者垃圾回收不频繁,这将导致垃圾回收的周期比较长。这意味着,长期运行Spark任务可能会消耗大量的磁盘空间。临时数据存储路径可以通过SparkContext中设置参数spark.local.dir进行配置。 shuffle操作的行为可以通过调节多个参数进行设置。详细的说明请看Configuration Guide中的“Shuffle Behavior”部分。 4.4 RDD持久化(RDD Persistence) Spark中一个很重要的能力是将数据持久化(或称为缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个RDD时,每个节点会将本节点计算的数据块存储到内存,在该数据上的其他action操作将直接使用内存中的数据。这样会让以后的action操作计算速度加快(通常运行速度会加速10倍)。缓存是迭代算法和快速的交互式使用的重要工具。 RDD可以使用persist()方法或cache()方法进行持久化。数据将会在第一次action操作时进行计算,并在各个节点的内存中缓存。Spark的缓存具有容错机制,如果一个缓存的RDD的某个分区丢失了,Spark将按照原来的计算过程,自动重新计算并进行缓存。 另外,每个持久化的RDD可以使用不同的存储级别进行缓存,例如,持久化到磁盘、已序列化的Java对象形式持久化到内存(可以节省空间)、跨节点间复制、以off-heap的方式存储在 Tachyon。这些存储级别通过传递一个StorageLevel对象(Scala、Java、Python)给persist()方法进行设置。cache()方法是使用默认存储级别的快捷设置方法,默认的存储级别是StorageLevel.MEMORY_ONLY(将反序列化的对象存储到内存中)。详细的存储级别介绍如下: MEMORY_ONLY:将RDD以反序列化Java对象的形式存储在JVM中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。 MEMORY_AND_DISK:将RDD以反序列化Java对象的形式存储在JVM中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。 MEMORY_ONLY_SER:将RDD以序列化的Java对象的形式进行存储(每个分区为一个byte数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用fast serializer时会节省更多的空间,但是在读取时会增加CPU的计算负担。 MEMORY_AND_DISK_SER:类似于MEMORY_ONLY_SER,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。 DISK_ONLY:只在磁盘上缓存RDD。 MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等:与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。 OFF_HEAP (实验中):以序列化的格式 (serialized format) 将 RDD存储到Tachyon。相比于MEMORY_ONLY_SER, OFF_HEAP 降低了垃圾收集(garbage collection)的开销,使得 executors变得更小,而且共享了内存池,在使用大堆(heaps)和多应用并行的环境下有更好的表现。此外,由于 RDD存储在Tachyon中, executor的崩溃不会导致内存中缓存数据的丢失。在这种模式下, Tachyon中的内存是可丢弃的。因此,Tachyon不会尝试重建一个在内存中被清除的分块。如果你打算使用Tachyon进行off heap级别的缓存,Spark与Tachyon当前可用的版本相兼容。详细的版本配对使用建议请参考Tachyon的说明。 注意,在Python中,缓存的对象总是使用Pickle进行序列化,所以在Python中不关心你选择的是哪一种序列化级别。 在shuffle操作中(例如reduceByKey),即便是用户没有调用persist方法,Spark也会自动缓存部分中间数据。这么做的目的是,在shuffle的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个RDD,强烈推荐在该RDD上调用persist方法。 4.4.1 如何选择存储级别(Which Storage Level to Choose?) Spark的存储级别的选择,核心问题是在内存使用率和CPU效率之间进行权衡。建议按下面的过程进行存储级别的选择: 如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的RDD没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高CPU的效率,可以使在RDD上的操作以最快的速度运行。 如果内存不能全部存储RDD,那么使用MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。 如果想快速还原故障,建议使用多副本存储界别(例如,使用Spark作为web应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。 在高内存消耗或者多任务的环境下,还处于实验性的OFF_HEAP模式有下列几个优势: 它支持多个executor使用Tachyon中的同一个内存池。 它显著减少了内存回收的代价。 如果个别executor崩溃掉,缓存的数据不会丢失。 4.4.2 移除数据(Removing Data) Spark自动监控各个节点上的缓存使用率,并以最近最少使用的方式(LRU)将旧数据块移除内存。如果想手动移除一个RDD,而不是等待该RDD被Spark自动移除,可以使用RDD.unpersist()方法。 5 共享变量(Shared Variables) 通常情况下,一个传递给Spark操作(例如map或reduce)的方法是在远程集群上的节点执行的。方法在多个节点执行过程中使用的变量,是同一份变量的多个副本。这些变量的以副本的方式拷贝到每个机器上,各个远程机器上变量的更新并不会传回driver程序。然而,为了满足两种常见的使用场景,Spark提供了两种特定类型的共享变量:广播变量(broadcast variables)和累加器(accumulators)。 5.1 广播变量(broadcast variables) 广播变量允许编程者将一个只读变量缓存到每台机器上,而不是给每个任务传递一个副本。例如,广播变量可以用一种高效的方式给每个节点传递一份比较大的数据集副本。在使用广播变量时,Spark也尝试使用高效广播算法分发变量,以降低通信成本。 Spark的action操作是通过一些列的阶段(stage)进行执行的,这些阶段(stage)是通过分布式的shuffle操作进行切分的。Spark自动广播在每个阶段内任务需要的公共数据。这种情况下广播的数据使用序列化的形式进行缓存,并在每个任务在运行前进行反序列化。这明确说明了,只有在跨越多个阶段的多个任务任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的情况下,使用广播变量会有比较好的效果。 广播变量通过在一个变量v上调用SparkContext.broadcast(v)方法进行创建。广播变量是v的一个封装器,可以通过value方法访问v的值。代码示例如下: Scala scala>valbroadcastVar=sc.broadcast(Array(1,2,3)) broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]]=Broadcast(0) scala>broadcastVar.valueres0:Array[Int]=Array(1,2,3) Java Broadcast<int[]>broadcastVar=sc.broadcast(newint[]{1,2,3}); broadcastVar.value();//returns[1,2,3] 广播变量创建之后,在集群上执行的所有的函数中,应该使用该广播变量代替原来的v值。所以,每个节点上的v最多分发一次。另外,对象v在广播后不应该再被修改,以保证分发到所有的节点上的广播变量有同样的值(例如,在分发广播变量之后,又对广播变量进行了修改,然后又需要将广播变量分发到新的节点)。 5.2 累加器(Accumulators) 累加器只允许关联操作进行"added"操作,因此在并行计算中可以支持特定的计算。累加器可以用于实现计数(类似在MapReduce中那样)或者求和。原生Spark支持数值型的累加器,编程者可以添加新的支持类型。创建累加器并命名之后,在Spark的UI界面上将会显示该累加器。这样可以帮助理解正在运行的阶段的运行情况(注意,在Python中还不支持)。 一个累加器可以通过在原始值v上调用SparkContext.accumulator(v)。然后,集群上正在运行的任务就可以使用add方法或+=操作对该累加器进行累加操作。只有driver程序可以读取累加器的值,读取累加器的值使用value方法。下面代码将数组中的元素进行求和: Scala scala>valaccum=sc.accumulator(0,"MyAccumulator") accum:spark.Accumulator[Int]=0scala>sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x) ...10/09/2918:41:08INFOSparkContext:Tasksfinishedin0.317106s scala>accum.valueres2:Int=10 Java Accumulator<Integer>accum=sc.accumulator(0); sc.parallelize(Arrays.asList(1,2,3,4)).foreach(x->accum.add(x));//...//10/09/2918:41:08INFOSparkContext:Tasksfinishedin0.317106saccum.value();//returns10 上面的代码示例使用的是Spark内置的Int类型的累加器,开发者可以通过集成AccumulatorParam类创建新的累加器类型。AccumulatorParam接口有两个方法:zero方法和addInPlace方法。zero方法给数据类型提供了一个0值,addInPlace方法能够将两个值进行累加。例如,假设我们有一个表示数学上向量的Vector类,我们可以写成: Scala objectVectorAccumulatorParamextendsAccumulatorParam[Vector]{defzero(initialValue:Vector):Vector={Vector.zeros(initialValue.size) }defaddInPlace(v1:Vector,v2:Vector):Vector={ v1+=v2 } }//Then,createanAccumulatorofthistype:valvecAccum=sc.accumulator(newVector(...))(VectorAccumulatorParam) Java classVectorAccumulatorParamimplementsAccumulatorParam<Vector>{publicVectorzero(VectorinitialValue){returnVector.zeros(initialValue.size()); }publicVectoraddInPlace(Vectorv1,Vectorv2){ v1.addInPlace(v2);returnv1; } }//Then,createanAccumulatorofthistype:Accumulator<Vector>vecAccum=sc.accumulator(newVector(...),newVectorAccumulatorParam()); Spark也支持使用更通用的 Accumulable接口去累加数据,其结果数据的类型和累加的元素类型不同(例如,通过收集数据元素创建一个list)。在Scala中,SparkContext.accumulableCollection方法可用于累加常用的Scala集合类型。 累加器的更新只发生在action操作中,Spark保证每个任务只能更新累加器一次,例如重新启动一个任务,该重启的任务不允许更新累加器的值。在transformation用户需要注意的是,如果任务过job的阶段重新执行,每个任务的更新操作将会执行多次。 累加器没有改变Spark懒执行的模式。如果累加器在RDD中的一个操作中进行更新,该累加器的值只在该RDD进行action操作时进行更新。因此,在一个像map()这样的转换操作中,累加器的更新并没有执行。下面的代码片段证明了这个特性: Scala valaccum=sc.accumulator(0) data.map{x=>accum+=x;f(x)}//Here,accumisstill0becausenoactionshavecausedthe<code>map</code>tobecomputed. Java Accumulator<Integer>accum=sc.accumulator(0); data.map(x->{accum.add(x);returnf(x);});//Here,accumisstill0becausenoactionshavecausedthe`map`tobecomputed. 6 将应用提交到集群(Deploying to a Cluster) 应用提交手册描述了如何将应用提交到集群。简单的说,当你将你的应用打包成一个JAR(Java/Scala)或者一组.py或.zip文件(Python)后,就可以通过bin/spark-submit脚本将脚本提交到集群支持的管理器中。 7 Java/Scala中启动Spark作业(Launching Spark jobs from Java / Scala) 使用org.apache.spark.launcher包提供的简单的Java API,可以将Spark作业以该包中提供的类的子类的形式启动。 8 单元测试(Unit Testing) Spark可以友好的使用流行的单元测试框架进行单元测试。在test中简单的创建一个SparkContext,master的URL设置为local,运行几个操作,然后调用SparkContext.stop()将该作业停止。因为Spark不支持在同一个程序中运行两个context,所以需要请确保使用finally块或者测试框架的tearDown方法将context停止。 9 从Spark1.0之前的版本迁移(Migrating from pre-1.0 Versions of Spark) Spark 1.0冻结了1.X系列的Spark核的API,因此,当前没有标记为"experimental"或者“developer API”的API都将在未来的版本中进行支持。 Scala的变化 对于Scala的变化是,分组操作(例如groupByKey、cogroup和join)的返回类型由(Key,Seq[Value])变为(Key,Iterable[Value])。 Java API的变化 1.0中org.apache.spark.api.java.function类中的Function类变成了接口,这意味着旧的代码中extends Function应该改为implement Function。 增加了新的map型操作,例如mapToPair和mapToDouble,增加的这些操作可用于创建特殊类型的RDD。 分组操作(例如groupByKey、cogroup和join)的返回类型由(Key,Seq[Value])变为(Key,Iterable[Value])。 这些迁移指导对Spark Streaming、MLlib和GraphX同样有效。 10 下一步(Where to Go from Here) 你可以在Spark网站看一些Spark编程示例。另外,Spark在examples目录下包含了许多例子(Scala、Java、Python、R)。运行Java和Scala例子,可以通过将例子的类名传给Spark的bin/run-example脚本进行启动。例如: ./bin/run-exampleSparkPi Python示例,使用spark-submit命令提交: ./bin/spark-submitexamples/src/main/python/pi.py R示例,使用spark-submit命令提交: ./bin/spark-submitexamples/src/main/r/dataframe.R 在configuration和tuning手册中,有许多优化程序的实践。这些优化建议,能够确保你的数据以高效的格式存储在内存中。对于部署的帮助信息,请阅读cluster mode overview,该文档描述了分布式操作和支持集群管理器的组件。 最后,完整的API文档请查阅Scala、Java、Python、R。

优秀的个人博客,低调大师

go 学习笔记之解读什么是defer延迟函数

Go 语言中有个 defer 关键字,常用于实现延迟函数来保证关键代码的最终执行,常言道: "未雨绸缪方可有备无患". 延迟函数就是这么一种机制,无论程序是正常返回还是异常报错,只要存在延迟函数都能保证这部分关键逻辑最终执行,所以用来做些资源清理等操作再合适不过了. 出入成双有始有终 日常开发编程中,有些操作总是成双成对出现的,有开始就有结束,有打开就要关闭,还有一些连续依赖关系等等. 一般来说,我们需要控制结束语句,在合适的位置和时机控制结束语句,手动保证整个程序有始有终,不遗漏清理收尾操作. 最常见的拷贝文件操作大致流程如下: 打开源文件 srcFile, err := os.Open("fib.txt") if err != nil { t.Error(err) return } 创建目标文件 dstFile, err := os.Create("fib.txt.bak") if err != nil { t.Error(err) return } 拷贝源文件到目标文件 io.Copy(dstFile, srcFile) 关闭目标文件 dstFile.Close() srcFile.Close() 关闭源文件 srcFile.Close() 值得注意的是: 这种拷贝文件的操作需要特别注意操作顺序而且也不要忘记释放资源,比如先打开再关闭等等! func TestCopyFileWithoutDefer(t *testing.T) { srcFile, err := os.Open("fib.txt") if err != nil { t.Error(err) return } dstFile, err := os.Create("fib.txt.bak") if err != nil { t.Error(err) return } io.Copy(dstFile, srcFile) dstFile.Close() srcFile.Close() } > 「雪之梦技术驿站」: 上述代码逻辑还是清晰简单的,可能不会忘记释放资源也能保证操作顺序,但是如果逻辑代码比较复杂的情况,这时候就有一定的实现难度了! 可能是为了简化类似代码的逻辑,Go 语言引入了 defer 关键字,创造了"延迟函数"的概念. 无 defer 的文件拷贝 func TestCopyFileWithoutDefer(t *testing.T) { if srcFile, err := os.Open("fib.txt"); err != nil { t.Error(err) return } else { if dstFile,err := os.Create("fib.txt.bak");err != nil{ t.Error(err) return }else{ io.Copy(dstFile,srcFile) dstFile.Close() srcFile.Close() } } } 有 defer 的文件拷贝 func TestCopyFileWithDefer(t *testing.T) { if srcFile, err := os.Open("fib.txt"); err != nil { t.Error(err) return } else { defer srcFile.Close() if dstFile, err := os.Create("fib.txt.bak"); err != nil { t.Error(err) return } else { defer dstFile.Close() io.Copy(dstFile, srcFile) } } } 上述示例代码简单展示了 defer 关键字的基本使用方式,显著的好处在于 Open/Close 是一对操作,不会因为写到最后而忘记 Close 操作,而且连续依赖时也能正常保证延迟时机. 简而言之,如果函数内部存在连续依赖关系,也就是说创建顺序是 A-&gt;B-&gt;C 而销毁顺序是 C-&gt;B-&gt;A.这时候使用 defer 关键字最合适不过. 懒人福音延迟函数 > 官方文档相关表述见 Defer statements 如果没有 defer 延迟函数前,普通函数正常运行: func TestFuncWithoutDefer(t *testing.T) { // 「雪之梦技术驿站」: 正常顺序 t.Log("「雪之梦技术驿站」: 正常顺序") // 1 2 t.Log(1) t.Log(2) } 当添加 defer 关键字实现延迟后,原来的 1 被推迟到 2 后面而不是之前的 1 2 顺序. func TestFuncWithDefer(t *testing.T) { // 「雪之梦技术驿站」: 正常顺序执行完毕后才执行 defer 代码 t.Log(" 「雪之梦技术驿站」: 正常顺序执行完毕后才执行 defer 代码") // 2 1 defer t.Log(1) t.Log(2) } 如果存在多个 defer 关键字,执行顺序可想而知,越往后的越先执行,这样才能保证按照依赖顺序依次释放资源. func TestFuncWithMultipleDefer(t *testing.T) { // 「雪之梦技术驿站」: 猜测 defer 底层实现数据结构可能是栈,先进后出. t.Log(" 「雪之梦技术驿站」: 猜测 defer 底层实现数据结构可能是栈,先进后出.") // 3 2 1 defer t.Log(1) defer t.Log(2) t.Log(3) } 相信你已经明白了多个 defer 语句的执行顺序,那就测试一下吧! func TestFuncWithMultipleDeferOrder(t *testing.T) { // 「雪之梦技术驿站」: defer 底层实现数据结构类似于栈结构,依次倒叙执行多个 defer 语句 t.Log(" 「雪之梦技术驿站」: defer 底层实现数据结构类似于栈结构,依次倒叙执行多个 defer 语句") // 2 3 1 defer t.Log(1) t.Log(2) defer t.Log(3) } 初步认识了 defer 延迟函数的使用情况后,我们再结合文档详细解读一下相关定义. 英文原版文档 > A "defer" statement invokes a function whose execution is deferred to the moment the surrounding function returns,either because the surrounding function executed a return statement,reached the end of its function body,or because the corresponding goroutine is panicking. 中文翻译文档 > "defer"语句调用一个函数,该函数的执行被推迟到周围函数返回的那一刻,这是因为周围函数执行了一个return语句,到达了函数体的末尾,或者是因为相应的协程正在惊慌. 具体来说,延迟函数的执行时机大概分为三种情况: 周围函数执行return > because the surrounding function executed a return statement return 后面的 t.Log(4) 语句自然是不会运行的,程序最终输出结果为 3 2 1 说明了 defer 语句会在周围函数执行 return 前依次逆序执行. func funcWithMultipleDeferAndReturn() { defer fmt.Println(1) defer fmt.Println(2) fmt.Println(3) return fmt.Println(4) } func TestFuncWithMultipleDeferAndReturn(t *testing.T) { // 「雪之梦技术驿站」: defer 延迟函数会在包围函数正常return之前逆序执行. t.Log(" 「雪之梦技术驿站」: defer 延迟函数会在包围函数正常return之前逆序执行.") // 3 2 1 funcWithMultipleDeferAndReturn() } 周围函数到达函数体 > reached the end of its function body 周围函数的函数体运行到结尾前逆序执行多个 defer 语句,即先输出 3 后依次输出 2 1. 最终函数的输出结果是 3 2 1 ,也就说是没有 return 声明也能保证结束前执行完 defer 延迟函数. func funcWithMultipleDeferAndEnd() { defer fmt.Println(1) defer fmt.Println(2) fmt.Println(3) } func TestFuncWithMultipleDeferAndEnd(t *testing.T) { // 「雪之梦技术驿站」: defer 延迟函数会在包围函数到达函数体结尾之前逆序执行. t.Log(" 「雪之梦技术驿站」: defer 延迟函数会在包围函数到达函数体结尾之前逆序执行.") // 3 2 1 funcWithMultipleDeferAndEnd() } 当前协程正惊慌失措 > because the corresponding goroutine is panicking 周围函数万一发生 panic 时也会先运行前面已经定义好的 defer 语句,而 panic 后续代码因为没有特殊处理,所以程序崩溃了也就无法运行. 函数的最终输出结果是 3 2 1 panic ,如此看来 defer 延迟函数还是非常尽忠职守的,虽然心里很慌但还是能保证老弱病残先行撤退! func funcWithMultipleDeferAndPanic() { defer fmt.Println(1) defer fmt.Println(2) fmt.Println(3) panic("panic") fmt.Println(4) } func TestFuncWithMultipleDeferAndPanic(t *testing.T) { // 「雪之梦技术驿站」: defer 延迟函数会在包围函数panic惊慌失措之前逆序执行. t.Log(" 「雪之梦技术驿站」: defer 延迟函数会在包围函数panic惊慌失措之前逆序执行.") // 3 2 1 funcWithMultipleDeferAndPanic() } 通过解读 defer 延迟函数的定义以及相关示例,相信已经讲清楚什么是 defer 延迟函数了吧? 简单地说,延迟函数就是一种未雨绸缪的规划机制,帮助开发者编程程序时及时做好收尾善后工作,提前做好预案以准备随时应对各种情况. 当周围函数正常执行到到达函数体结尾时,如果发现存在延迟函数自然会逆序执行延迟函数. 当周围函数正常执行遇到return语句准备返回给调用者时,存在延迟函数时也会执行,同样满足善后清理的需求. 当周围函数异常运行不小心 panic 惊慌失措时,程序存在延迟函数也不会忘记执行,提前做好预案发挥了作用. 所以不论是正常运行还是异常运行,提前做好预案总是没错的,基本上可以保证万无一失,所以不妨考虑考虑 defer 延迟函数? 延迟函数应用场景 基本上成双成对的操作都可以使用延迟函数,尤其是申请的资源前后存在依赖关系时更应该使用 defer 关键字来简化处理逻辑. 下面举两个常见例子来说明延迟函数的应用场景. Open/Close 文件操作一般会涉及到打开和开闭操作,尤其是文件之间拷贝操作更是有着严格的顺序,只需要按照申请资源的顺序紧跟着defer 就可以满足资源释放操作. func readFileWithDefer(filename string) ([]byte, error) { f, err := os.Open(filename) if err != nil { return nil, err } defer f.Close() return ioutil.ReadAll(f) } Lock/Unlock 锁的申请和释放是保证同步的一种重要机制,需要申请多个锁资源时可能存在依赖关系,不妨尝试一下延迟函数! var mu sync.Mutex var m = make(map[string]int) func lookupWithDefer(key string) int { mu.Lock() defer mu.Unlock() return m[key] } 总结以及下节预告 defer 延迟函数是保障关键逻辑正常运行的一种机制,如果存在多个延迟函数的话,一般会按照逆序的顺序运行,类似于栈结构. 延迟函数的运行时机一般有三种情况: 周围函数遇到返回时 func funcWithMultipleDeferAndReturn() { defer fmt.Println(1) defer fmt.Println(2) fmt.Println(3) return fmt.Println(4) } 周围函数函数体结尾处 func funcWithMultipleDeferAndEnd() { defer fmt.Println(1) defer fmt.Println(2) fmt.Println(3) } 当前协程惊慌失措中 func funcWithMultipleDeferAndPanic() { defer fmt.Println(1) defer fmt.Println(2) fmt.Println(3) panic("panic") fmt.Println(4) } 本文主要介绍了什么是 defer 延迟函数,通过解读官方文档并配套相关代码认识了延迟函数,但是延迟函数中存在一些可能令人比较迷惑的地方. 读者不妨看一下下面的代码,将心里的猜想和实际运行结果比较一下,我们下次再接着分享,感谢你的阅读. func deferFuncWithAnonymousReturnValue() int { var retVal int defer func() { retVal++ }() return 0 } func deferFuncWithNamedReturnValue() (retVal int) { defer func() { retVal++ }() return 0 } 延伸阅读参考文档 Defer_statements go语言的defer语句 Go defer实现原理剖析 go语言 defer 你不知道的秘密! Go语言中defer的一些坑 go defer (go延迟函数) > 如果本文对你有所帮助,不用赞赏直接点赞就是最大的鼓励,顺便关注下微信公众号「 雪之梦技术驿站 」那就更好啦!

优秀的个人博客,低调大师

操作系统学习(二)--进程描述和执行

这是操作系统系列第 2 篇。 如果你想知道操作系统每天都在做些什么,那就打开你的资源监视器: 资源监视器截图,Windows 10 单独通过这一张图,我们就能够总结出操作系统的几个重要功能: 进程管理 线程管理 内存管理 I/O 管理(包含了磁盘调度) 文件管理,这一功能在图里没有表现出来,但我相信每个使用计算机的人都知道它。 为什么我要从进程开始讲起呢? 原因很简单,我们每天使用计算机,包括手机和电脑,本质上是使用运行在其操作系统上的应用程序。对于我们来讲,操作系统最为直观的功能就是进程管理,所以,让我们从进程管理入手,由表及里,一步步深挖操作系统的本质。 进程是什么? 我在第一篇文章里简单提到了进程这一概念,这里再详细讲一下,加深理解。 操作系统的设计从根本上来说是为了迎合用户需求,对个人用户来说,需求就是在一台计算机上运行多个应用程序,以满足生活和工作的需要。但应用程序这么多,不可能让每一个程序占用一个 CPU 核心啊,因为 CPU 核心是有限的,人的需求是无限的。 所以操作系统就需要将无限(夸张一下)的应用程序,分配到有限的 CPU 上去。 当我们打开一堆程序时,这些程序就会被加载到内存上,为了让这些运行的程序与没有打开的程序作区分,我们创造了进程(Process)这个名词。所以,进程就是对运行的程序的一种抽象,具有动态性。进程管理其实就是操作系统通过某种方式,管理我们已经打开的程序。 注:为了简化后面的讨论,我们假设所说的计算机是单核的。 进程的状态有哪几种? 讲到进程,我们必然需要了解进程状态,想要了解进程的状态,我们就得从进程的角度,看一看进程的一生会发生什么。 首先,用户打开某个应用程序,这个程序就处于新建态(New),这个时候操作系统还没有为这个程序做好准备工作,这个进程自身还没有进入内存,可能还留在磁盘里。 等到这个进程被加载进内存,就代表它已经准备好运行了,但因为 CPU 资源正被别的进程占用,它只能等待操作系统为它分配 CPU。这个状态称为就绪态(Ready)。 在就绪态一段时间后,总会分配到 CPU 资源,一旦进程开始执行,它就进入了运行态(Running)。 有的进程可能会执行某些阻塞操作,就拿 I/O 操作来举例子,执行操作后,进程需要等待 I/O 操作完成,第一篇文章讲过,进程在这段时间内是无法使用 CPU 的,如果让它继续占用 CPU,就造成了资源浪费。所以操作系统会剥夺它的 CPU 使用权,并把它放在阻塞态。等到 I/O 操作结束后,再将其放入就绪态。 还有最后一种状态——退出态,顾名思义,进程终止后,就会进入退出态,这个进程可能还没有从内存中清理出去。等到进程完全退出内存,进程的一生就彻底结束了。 综合以上的讨论,我们得到了进程的 5 种状态: 细心的读者可能会发现,运行态到阻塞态,以及阻塞态到就绪态之间的箭头是单向的。为什么? 先来看看运行态和阻塞态。回忆一下,一个进程处于阻塞态代表什么?(希望你看到这确实停下来思考了) 一个进程处于阻塞态,代表进程执行了某个阻塞操作,正在等待操作的结果。也就是说,处于阻塞态的进程没有使用 CPU 的能力,所以即使给了它 CPU 它也没办法运行,自然无法进入执行状态。所以从阻塞态是无法直接跳到运行态的。 再来说说就绪态和阻塞态。如果一个进程位于就绪态,说明它现在没有使用 CPU,所以更不可能执行阻塞操作。因此从就绪态也不能直接跳转到阻塞态。 结合刚刚的解释,我们来看看一个简单的进程的排队模型: 图中的 ABCDEF 代表了进程 要注意的是,就绪态,运行态和阻塞态提供了一种描述进程行为的系统方法,指导了操作系统的实现,许多实际的操作系统都是按照这样的三种状态进行具体构造的。但这不代表就没有其他状态的立足之地了,在一些实现中(其实是主流实现,但由于牵扯到虚拟内存的概念,所以留到以后讲解),还有挂起态等状态。但不管哪种状态,他们都是为了操作系统能够最大化利用计算机资源而抽象出来的。 留几个小问题做思考: 为什么只有从运行态才能转换到退出态? 你能看懂上图排队模型吗? PCB 是什么东西? 操作系统在管理和控制进程的时候,首先必须知道进程的位置(即进程被加载到哪一块内存了),其次,它还需要知道进程的属性,如进程 ID,进程状态等,所以我们就得有一个结构能够保存这些信息。 进程控制块(Process Control Block)就是这么一个结构。进程控制块会在程序启动时就被创建出来。 进程控制块的主要内容及内存映像(字丑勿怪) 我们可以看到,PCB 中存储着进程 ID,寄存器状态,栈指针等重要信息,这些信息现在看来非常陌生,但以后随着你对操作系统理解的加深,你就会理解这些信息的含义和作用。 图中还有一个信息,那就是 PCB 存储在内核空间——表明只有操作系统有权利更改 PCB 里面的内容。因为 PCB 太重要了,如果其内部信息被恶意修改,将造成进程意外终止,甚至可能导致操作系统的崩溃。 总结 让我们来串一下今天的内容: 如果一个程序开始运行,那么操作系统就会为其创建一个进程控制块,并将其加载到内存中,进程控制块内的「进程状态」信息会更改为就绪态,并将进程放入就绪队列等待分配 CPU。一旦分得 CPU,进程就进入运行态,根据实际情况,还可能因为执行阻塞操作而进入阻塞态,等到程序运行完毕,进程就被操作系统清出内存,然后删除其进程控制块。 如果看完上一段,对黑体字的概念还很模糊,那我建议你再慢慢看一遍文章,而且一定要带上自己的思考,没有经过思考的阅读的效率是很低的。 希望你在看完文章之后有所收获。感谢你的阅读,我们后会有期! 声明:原创文章,未经授权,禁止转载

优秀的个人博客,低调大师

MySQL5.7特性:JSON数据类型学习

概述 MySQL5.7的发行声明中,官方称之为里程碑式的版本,除了运行速度大幅度提升之外,还添加了之前版本没有的功能,如本文所述的原生JSON数据类型功能。在此版本之前,MySQL所有的JSON数据类型,全部是使用text等文本类型来实现的,数据的处理只能在应用代码级来实现,十分不方便。 什么是JSON类型 作为DBA,可能会对这个概念稍微有点陌生,但是对于开发者来说,这是一个十分熟悉的事物。 JSON(JavaScript Object Notation, JS 对象简谱) 是一种轻量级的数据交换格式。它基于 ECMAScript (欧洲计算机协会制定的js规范)的一个子集,采用完全独立于编程语言的文本格式来存储和表示数据。简洁和清晰的层次结构使得 JSON 成为理想的数据交换语言。 易于人阅读和编写,同时也易于机器解析和生成,并有效地提升网络传输效率。 MySQL原始JSON类型的优势在哪? 原生的JSON优势如下: 存储上类似text,可以存非常大的数据。 存储在JSON列中的JSON文档的自动验证 。无效的文档会产生错误。 优化的存储格式。存储在JSON列中的JSON文档将 转换为内部格式,以允许对文档元素进行快速读取访问。 相比于传统形式,不需要遍历所有字符串才能找到数据。 支持索引:通过虚拟列的功能可以对JSON中部分的数据进行索引。 MySQL的JSON类型 创建JSON类型表 创建一个基础的员工表,除了工号字段外,还有一个个人基础信息字段和一个个人能力信息字段 MySQL [test]> CREATE TABLE employee ( -> -> `empno` int(10) unsigned NOT NULL AUTO_INCREMENT, -> -> `basic_info` JSON NOT NULL, -> -> `skill_info` JSON NOT NULL, -> -> PRIMARY KEY (`empno`) -> -> ); Query OK, 0 rows affected (0.02 sec) 表的基础信息,其中JSON类型的字段,是不可以有默认值的,这点需要注意 MySQL [test]> desc employee; +------------+------------------+------+-----+---------+----------------+ | Field | Type | Null | Key | Default | Extra | +------------+------------------+------+-----+---------+----------------+ | empno | int(10) unsigned | NO | PRI | NULL | auto_increment | | basic_info | json | NO | | NULL | | | skill_info | json | NO | | NULL | | +------------+------------------+------+-----+---------+----------------+ 3 rows in set (0.00 sec) 试着插入几条数据 我们手动插入几条数据进这张表中,在前两条数据中,在个人能力信息上使用的是数组的方式,,后面两条则是使用对象的形式。在MySQL5.7.8版本后的JSON类型中,这两种都是可以的 INSERT INTO `employee` VALUES (1,'{"name": "wangyiyi", "age": "23" ,"from": "hangzhou"}', '["java", "go", "python"]'); INSERT INTO `employee` VALUES (2,'{"name": "linxue", "age": 24 ,"from": "shanghai"}', '["mysql", "oracle", "python"]'); INSERT INTO `employee` VALUES (3,'{"name": "zhaoqing", "age": 24 ,"from": "shanghai"}', '{"system": "linux","database": "mysql", "language": "python"}'); INSERT INTO `employee` VALUES (4,'{"name": "zhouxixi", "age": 30 ,"from": "nanjing"}', '{"system": ["linux","windows"],"database": ["mysql","oracle","postgresql"], "language": ["python","java","go"]}'); 插入多个数据后,表中内容为如下 MySQL [test]> select * from employee; +-------+-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+ | empno | basic_info | skill_info | +-------+-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+ | 1 | {"age": "23", "from": "hangzhou", "name": "wangyiyi"} | ["java", "go", "python"] | | 2 | {"age": 24, "from": "shanghai", "name": "linxue"} | ["mysql", "oracle", "python"] | | 3 | {"age": 24, "from": "shanghai", "name": "zhaoqing"} | {"system": "linux", "database": "mysql", "language": "python"} | | 4 | {"age": 30, "from": "nanjing", "name": "zhouxixi"} | {"system": ["linux", "windows"], "database": ["mysql", "oracle", "postgresql"], "language": ["python", "java", "go"]} | +-------+-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+ 4 rows in set (0.00 sec) json数据查询方式 在插入了json类型的数据之后,可以针对JSON类型做一些特定的查询,如查询年龄大于20的记录在SQL的语句中使用 字段->.键名 就可以查询出所对应的键值 MySQL [test]> select * from employee WHERE basic_info->'$.age'> 20; +-------+-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+ | empno | basic_info | skill_info | +-------+-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+ | 1 | {"age": "23", "from": "hangzhou", "name": "wangyiyi"} | ["java", "go", "python"] | | 2 | {"age": 24, "from": "shanghai", "name": "linxue"} | ["mysql", "oracle", "python"] | | 3 | {"age": 28, "from": "shanghai", "name": "zhaoqing"} | {"system": "linux", "database": "mysql", "language": "go"} | | 4 | {"age": 30, "from": "nanjing", "name": "zhouxixi"} | {"system": ["linux", "windows"], "database": ["mysql", "oracle", "postgresql"], "language": ["python", "java", "go"]} | +-------+-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+ 4 rows in set (0.00 sec) MySQL [test]> select * from employee WHERE basic_info->'$.age'< 20; Empty set (0.00 sec) 除了使用上述方式外,也可使用 提取json值的 函数 json_extract (使用函数的方式) MySQL [test]> select * from employee where json_extract(basic_info,'$.age') = 24; +-------+-----------------------------------------------------+----------------------------------------------------------------+ | empno | basic_info | skill_info | +-------+-----------------------------------------------------+----------------------------------------------------------------+ | 2 | {"age": 24, "from": "shanghai", "name": "linxue"} | ["mysql", "oracle", "python"] | | 3 | {"age": 24, "from": "shanghai", "name": "zhaoqing"} | {"system": "linux", "database": "mysql", "language": "python"} | +-------+-----------------------------------------------------+----------------------------------------------------------------+ 2 rows in set (0.00 sec) 对于数值查询也可做一个范围内查询,如下: MySQL [test]> select * from employee WHERE basic_info->'$.age' in (23, 24); +-------+-------------------------------------------------------+----------------------------------------------------------------+ | empno | basic_info | skill_info | +-------+-------------------------------------------------------+----------------------------------------------------------------+ | 1 | {"age": "23", "from": "hangzhou", "name": "wangyiyi"} | ["java", "go", "python"] | | 2 | {"age": 24, "from": "shanghai", "name": "linxue"} | ["mysql", "oracle", "python"] | | 3 | {"age": 24, "from": "shanghai", "name": "zhaoqing"} | {"system": "linux", "database": "mysql", "language": "python"} | +-------+-------------------------------------------------------+----------------------------------------------------------------+ 3 rows in set, 1 warning (0.00 sec) 因为 JSON 不同于字符串,所以如果用字符串和 JSON 字段比较,是不会相等的: 如下,直接使用字符串查询,查询不出来内容 MySQL [test]> select * from employee where basic_info = '{"age": 24, "from": "shanghai", "name": "linxue"}'; Empty set (0.00 sec) 可以通过 CAST 将字符串转成 JSON 的形式,如下: MySQL [test]> select * from employee where basic_info = CAST('{"age": 24, "from": "shanghai", "name": "linxue"}' AS JSON); +-------+---------------------------------------------------+-------------------------------+ | empno | basic_info | skill_info | +-------+---------------------------------------------------+-------------------------------+ | 2 | {"age": 24, "from": "shanghai", "name": "linxue"} | ["mysql", "oracle", "python"] | +-------+---------------------------------------------------+-------------------------------+ 1 row in set (0.00 sec) 查看单纯数组类型的函数JSON_CONTAINS MySQL [test]> select * from employee where JSON_CONTAINS (skill_info,'"mysql"'); +-------+---------------------------------------------------+-------------------------------+ | empno | basic_info | skill_info | +-------+---------------------------------------------------+-------------------------------+ | 2 | {"age": 24, "from": "shanghai", "name": "linxue"} | ["mysql", "oracle", "python"] | +-------+---------------------------------------------------+-------------------------------+ 1 row in set (0.00 sec) JSON_PRETTY函数: 以易于阅读的格式打印出JSON值便于在一些外部应用引用数据时,更方便的使用它 MySQL [test]> select JSON_PRETTY(basic_info) from employee; +---------------------------------------------------------------+ | JSON_PRETTY(basic_info) | +---------------------------------------------------------------+ | { "age": "23", "from": "hangzhou", "name": "wangyiyi" } | | { "age": 24, "from": "shanghai", "name": "linxue" } | | { "age": 28, "from": "shanghai", "name": "zhaoqing" } | | { "age": 30, "from": "nanjing", "name": "zhouxixi" } | +---------------------------------------------------------------+ 4 rows in set (0.00 sec) MySQL 5.7.22中添加了此功能,此函数返回用于存储JSON文档的二进制表示的字节数,用于查看当前JSON字段的存储大小 MySQL [test]> select skill_info,JSON_STORAGE_SIZE(skill_info) AS Size from employee; +-----------------------------------------------------------------------------------------------------------------------+------+ | skill_info | Size | +-----------------------------------------------------------------------------------------------------------------------+------+ | ["java", "go", "python"] | 29 | | ["mysql", "oracle", "python"] | 34 | | {"system": "linux", "database": "mysql", "language": "go"} | 63 | | {"system": ["linux", "windows"], "database": ["mysql", "oracle", "postgresql"], "language": ["python", "java", "go"]} | 137 | +-----------------------------------------------------------------------------------------------------------------------+------+ 4 rows in set (0.00 sec) 查询JSON字段的长度 MySQL [test]> select JSON_LENGTH(basic_info) from employee; +-------------------------+ | JSON_LENGTH(basic_info) | +-------------------------+ | 3 | | 3 | | 3 | | 3 | +-------------------------+ 4 rows in set (0.00 sec) 查看数据的类型:可以是对象,数组或标量类型 MySQL [test]> select JSON_TYPE(skill_info) from employee; +-----------------------+ | JSON_TYPE(skill_info) | +-----------------------+ | ARRAY | | ARRAY | | OBJECT | | OBJECT | +-----------------------+ 4 rows in set (0.00 sec) json数据修改方式 如果是整个 json 更新的话,和一般类型插入是一样的 json_array_insert是在指定下标插入,这是插入一般数组类型时的操作 MySQL [test]> SELECT json_array_insert(skill_info, '$[1]', 'php') from employee; +-----------------------------------------------------------------------------------------------------------------------+ | json_array_insert(skill_info, '$[1]', 'php') | +-----------------------------------------------------------------------------------------------------------------------+ | ["java", "php", "go", "python"] | | ["mysql", "php", "oracle", "python"] | | {"system": "linux", "database": "mysql", "language": "python"} | | {"system": ["linux", "windows"], "database": ["mysql", "oracle", "postgresql"], "language": ["python", "java", "go"]} | +-----------------------------------------------------------------------------------------------------------------------+ 4 rows in set (0.00 sec) 替换操作,也就是修改update操作,使用的是 json_replace 函数json_replace:只替换已经存在的旧值,不存在则忽略; MySQL [test]> update employee set skill_info = json_replace(skill_info, "$.language", "go") where empno = 3; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 MySQL [test]> select * from employee where empno = 3; +-------+-----------------------------------------------------+------------------------------------------------------------+ | empno | basic_info | skill_info | +-------+-----------------------------------------------------+------------------------------------------------------------+ | 3 | {"age": 24, "from": "shanghai", "name": "zhaoqing"} | {"system": "linux", "database": "mysql", "language": "go"} | +-------+-----------------------------------------------------+------------------------------------------------------------+ 1 row in set (0.00 sec) json_set:替换旧值,并插入不存在的新值; MySQL [test]> update employee set basic_info = json_set(basic_info, "$.age", 28,"$.sex" ,"man") where empno = 3; Query OK, 1 row affected (0.01 sec) Rows matched: 1 Changed: 1 Warnings: 0 MySQL [test]> MySQL [test]> select * from employee where empno = 3; +-------+-------------------------------------------------------------------+------------------------------------------------------------+ | empno | basic_info | skill_info | +-------+-------------------------------------------------------------------+------------------------------------------------------------+ | 3 | {"age": 28, "sex": "man", "from": "shanghai", "name": "zhaoqing"} | {"system": "linux", "database": "mysql", "language": "go"} | +-------+-------------------------------------------------------------------+------------------------------------------------------------+ 1 row in set (0.00 sec) json_insert:插入新值,但不替换已经存在的旧值; MySQL [test]> update employee set basic_info = json_insert (basic_info, "$.age", 30, "$.phone" ,"123456789") where empno = 3; Query OK, 1 row affected (0.01 sec) Rows matched: 1 Changed: 1 Warnings: 0 MySQL [test]> select * from employee where empno = 3; +-------+-----------------------------------------------------------------------------------------+------------------------------------------------------------+ | empno | basic_info | skill_info | +-------+-----------------------------------------------------------------------------------------+------------------------------------------------------------+ | 3 | {"age": 28, "sex": "man", "from": "shanghai", "name": "zhaoqing", "phone": "123456789"} | {"system": "linux", "database": "mysql", "language": "go"} | +-------+-----------------------------------------------------------------------------------------+------------------------------------------------------------+ 1 row in set (0.00 sec) json_remove() 删除元素函数。 MySQL [test]> update employee set basic_info = json_remove (basic_info, "$.sex", "$.phone") where empno = 3; Query OK, 1 row affected (0.01 sec) Rows matched: 1 Changed: 1 Warnings: 0 MySQL [test]> MySQL [test]> select * from employee where empno = 3; +-------+-----------------------------------------------------+------------------------------------------------------------+ | empno | basic_info | skill_info | +-------+-----------------------------------------------------+------------------------------------------------------------+ | 3 | {"age": 28, "from": "shanghai", "name": "zhaoqing"} | {"system": "linux", "database": "mysql", "language": "go"} | +-------+-----------------------------------------------------+------------------------------------------------------------+ 1 row in set (0.00 sec) 结语 JSON数据类型是一个对开发十分友好的功能,有了它,MySQL的功能才更趋于完善。经常使用,会发现还有许多便捷的JSON函数能够在特定情况下帮到我们。详细信息也可以查看 https://dev.mysql.com/doc/refman/5.7/en/json-function-reference.html 云平-20190508

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册