首页 文章 精选 留言 我的

精选列表

搜索[快速入门],共10000篇文章
优秀的个人博客,低调大师

写给小白的 kubectl 入门

前言 意外又看到不少正在学 Kubernetes 新手。想想本人写过各种自己懂或不懂、信或不信的原理、机制、方法和工具等等各种东西,唯独没写过 kubectl,其实这东西也是值得一写的——比如说去年我才从一线同学的操作里学会用 -A 代替 --all-namespaces。理顺 kubectl 的用法,也会对 Kubernetes 的知识体系以及运维工作有很大的帮助。 对 Kubernetes 稍有了解的读者应该都知道声明式 API 的说法,kubectl 就是一个这种 API 的客户端,所以 kubectl 的主要功能就是用来操作对象的。 开局两张图 下图是个常见的使用方式: 其实本来想写主谓宾定状补的,后来想想还得复习一下,算了算了。 一般的 kubectl 使用都是这么个顺序,参数是可以调整位置的,暂且如此就可以了。 用一个思维导图来归纳一下: 动作 在 kubectl 中被称为 command 也就是命令。使用 kubectl --help 能看到可用的命令列表: $ kubectl --helpkubectl controls the Kubernetes cluster manager. Find more information at: https://kubernetes.io/docs/reference/kubectl/overview/Basic Commands (Beginner): create Create a resource from a file or from stdin.... run 在集群中运行一个指定的镜像...Basic Commands (Intermediate): explain 查看资源的文档 get 显示一个或更多 resources...Deploy Commands: rollout Manage the rollout of a resource... 可以看到 kubectl 的命令行帮助非常不错,不仅有功能说明、分类,还有难度标识,甚至有部分的中文说明,kubectl 的每个命令都可以用 --help 查看进一步的帮助说明。 这里列出了很多可用的命令,按照操作能力,主流命令基本可以分为增删改查(CRUD)四种。 C 新建命令用于在集群中创建对象,最常用的新建命令应该是 create、run 了,create 能够创建多种对象,而 run 则主要用来创建 Pod。这两个命令都需要在命令行中使用参数的方式来表达待创建的对象的字段内容,其表达力非常粗糙和有限,并且带有明显的命令式 API 风味,在我的日常工作中已经很少用到这样的命令了。 但是这种命令往往有个妙用,--dry-run=client(旧版本中是 --dry-run),可以在不产生实际操作的情况下,测试命令的输出,加上 -o yaml,可以帮助输出 YAML 文档。 R get 是最常用的查询指令,用于获取对象列表和基本信息,而 describe 则用于获取一个对象的详细信息。另外一个常用的读取指令就是 Debug 常用的日志查看指令:kubectl logs。 U 最重要的更新命令可以说是 apply,edit 了,patch、label、annotation、scale 等命令也算常用。 apply 是把 yaml 提交给 Kubernetes 集群的最常用方式,而 edit patch 都是用于修改线上负载的常用手段。label 和 annotation 命令则是用于修改对象元数据的,例如标签和注解。 D 这个没什么好说——delete 获取帮助 kubectl 的所有命令、子命令都支持 --help 参数,可以用这种方式获取帮助。 kubectl options 命令能够获取 kubectl 的所有全局参数。 常用参数 -f:很多指令(不只是 apply 和 create)都可以用 -f <文件名> 的方式进行输入,如果使用管道操作,则可以用参数 -f - 接收 STDIN 的输入。 -l:可以使用各种对象上的标签对操作范围进行过滤,例如 -l app=hello -o:指定输出格式,这个参数相对复杂,最常用的是 yaml 或者 json 用于输出机器报文,还可以用 JSON Path 或者 Go Template 对结果进行处理。 对象 对象通常是类型+名称的一个组合,可以用 kubectl 获得当前集群支持的对象类型: 如上图,输出内容包含几个列:名称、简称、API 群组、是否归属命名空间以及对象的 Kind 属性。例如常用的 Deployment: 名称:Deployment 简称:Deploy API 群组:apps 归属命名空间:是 Kind:Deployment 使用命令 kubectl get deploy,就能获得当前命名空间中的 Deployment 对象列表,如果在尾巴上加入 Deployment 的名称,就能得到符合名称要求的 Deployment 对象, Schema 前面提到的 -f 参数,或者是 get -o yaml,都要用到具体的对象数据结构,这个结构到底是哪里规定的呢?基本结构可以分为三个部分,以一个 Namespace 为例: apiVersion: v1kind: Namespacemetadata: name: defaultspec: finalizers: - kubernetes 一般会分为四个基础字段:apiVersion、kind、metadata、status 以及 spec。 apiVersion:格式为 <apiGroup>/<apiVersion>,一个对象的 API Group,可以用前文提到的 api-resources 命令查到,而版本则可以通过 kubectl api-versions 查询得到。 kind:对应 api-resources 命令输出的字段。 metadata:元数据,其中包括标签、注解、名称等字段,如果对象是属于命名空间的,也会把命名空间写在这里。 status:这个字段的内容通常是由 Kubenretes 自动填写的。经常会被省略掉。 spec:具体的对象内容,可以由几个途径获取其定义结构 部分资源可以使用 kubectl explain <对象类别> 获得解释 如果该资源在集群中有对象存在,可以使用 kubectl get <对象类别> <对象名称> -o yaml 的方式获得原文,向其致敬。 如果前两种方法都没有,就需要去查看 Kubernetes 或者第三方的 API Reference 了。 最后 看了上面的解释,是不是对 Kubernetes 的控制台操作有点底了? 文章转载自伪架构师。点击这里阅读原文了解更多。 CNCF概况(幻灯片) 扫描二维码联系我们! CNCF (Cloud Native Computing Foundation)成立于2015年12月,隶属于Linux Foundation,是非营利性组织。 CNCF(云原生计算基金会)致力于培育和维护一个厂商中立的开源生态系统,来推广云原生技术。我们通过将最前沿的模式民主化,让这些创新为大众所用。请长按以下二维码进行关注。 本文分享自微信公众号 - CNCF(lf_cncf)。如有侵权,请联系 support@oschina.cn 删除。本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

优秀的个人博客,低调大师

Redis Lua脚本完全入门

1. 前言 Redis是高性能的KV内存数据库,除了做缓存中间件的基本作用外还有很多用途,比如胖哥以前分享的Redis GEO地理位置信息计算。Redis提供了丰富的命令来供我们使用以实现一些计算。Redis的单个命令都是原子性的,有时候我们希望能够组合多个Redis命令,并让这个组合也能够原子性的执行,甚至可以重复使用,在软件热更新中也有一席之地。Redis开发者意识到这种场景还是很普遍的,就在2.6版本中引入了一个特性来解决这个问题,这就是Redis执行Lua脚本。 2. Lua Lua也算一门古老的语言了,玩魔兽世界的玩家应该对它不陌生,WOW的插件就是用Lua脚本编写的。在高并发的网络游戏中Lua大放异彩被广泛使用。 Lua广泛作为其它语言的嵌入脚本,尤其是C/C++,语法简单,小巧,源码一共才200多K,这可能也是Redis官方选择它的原因。 另一款明星软件Nginx也支持Lua,利用Lua也可以实现很多有用的功能。 3. Lua并不难 Redis 官方指南也指出不要在Lua脚本中编写过于复杂的逻辑。 为了实现一个功能就要学习一门语言,这看起来就让人有打退堂鼓的感觉。其实Lua并不难学,而且作为本文的场景来说我们不需要去学习Lua的完全特性,要在Redis中轻量级使用Lua语言。这对掌握了Java这种重量级语言的你来说根本不算难事。这里胖哥只对Redis中的涉及到的基本语法说一说。 Lua 的简单语法 Lua在Redis脚本中我个人建议只需要使用下面这几种类型: nil 空 boolean 布尔值 number 数字 string 字符串 table 表 声明类型 声明类型非常简单,不用携带类型。 --- 全局变量 name = 'felord.cn' --- 局部变量 local age = 18 Redis脚本在实践中不要使用全局变量,局部变量效率更高。 table 类型 前面四种非常好理解,第五种table需要简单说一下,它既是数组又类似Java中的HashMap(字典),它是Lua中仅有的数据结构。 数组不分具体类型,演示如下 Lua 5.1.5 Copyright (C) 1994-2012 Lua.org, PUC-Rio > arr_table = {'felord.cn','Felordcn',1} > print(arr_table[1]) felord.cn > print(arr_table[3]) 1 > print(#arr_table) 3 作为字典: Lua 5.1.5 Copyright (C) 1994-2012 Lua.org, PUC-Rio > arr_table = {name = 'felord.cn', age = 18} > print(arr_table['name']) felord.cn > print(arr_table.name) felord.cn > print(arr_table[1]) nil > print(arr_table['age']) 18 > print(#arr_table) 0 混合模式: Lua 5.1.5 Copyright (C) 1994-2012 Lua.org, PUC-Rio > arr_table = {'felord.cn','Felordcn',1,age = 18,nil} > print(arr_table[1]) felord.cn > print(arr_table[4]) nil > print(arr_table['age']) 18 > print(#arr_table) 3 ❗ # 取table的长度不一定精准,慎用。同时在Redis脚本中避免使用混合模式的table,同时元素应该避免包含空值nil。在不确定元素的情况下应该使用循环来计算真实的长度。 判断 判断非常简单,格式为: local a = 10 if a < 10 then print('a小于10') elseif a < 20 then print('a小于20,大于等于10') else print('a大于等于20') end 数组循环 local arr = {1,2,name='felord.cn'} for i, v in ipairs(arr) do print('i = '..i) print('v = '.. v) end print('-------------------') for i, v in pairs(arr) do print('p i = '..i) print('p v = '.. v) end 打印结果: i = 1 v = 1 i = 2 v = 2 ----------------------- p i = 1 p v = 1 p i = 2 p v = 2 p i = name p v = felord.cn 返回值 像Python一样,Lua也可以返回多个返回值。不过在Redis的Lua脚本中不建议使用此特性,如果有此需求请封装为数组结构。在Spring Data Redis中支持脚本的返回值规则可以从这里分析: public static ReturnType fromJavaType(@Nullable Class<?> javaType) { if (javaType == null) { return ReturnType.STATUS; } if (javaType.isAssignableFrom(List.class)) { return ReturnType.MULTI; } if (javaType.isAssignableFrom(Boolean.class)) { return ReturnType.BOOLEAN; } if (javaType.isAssignableFrom(Long.class)) { return ReturnType.INTEGER; } return ReturnType.VALUE; } 胖哥在实践中会使用 List、Boolean、Long三种,避免出现幺蛾子。 到此为止Redis Lua脚本所需要知识点就完了,其它的函数、协程等特性也不应该在Redis Lua脚本中出现,用到内置函数的话搜索查询一下就行了。 在接触一门新的技术时先要中规中矩的使用,如果你想玩花活就意味着更高的学习成本。 4. Redis中的Lua 接下来就是Redis Lua脚本的实际操作了。 EVAL命令 Redis中使用EVAL命令来直接执行指定的Lua脚本。 EVAL luascript numkeys key [key ...] arg [arg ...] EVAL 命令的关键字。 luascript Lua 脚本。 numkeys 指定的Lua脚本需要处理键的数量,其实就是 key数组的长度。 key 传递给Lua脚本零到多个键,空格隔开,在Lua 脚本中通过 KEYS[INDEX]来获取对应的值,其中1 <= INDEX <= numkeys。 arg是传递给脚本的零到多个附加参数,空格隔开,在Lua脚本中通过ARGV[INDEX]来获取对应的值,其中1 <= INDEX <= numkeys。 接下来我简单来演示获取键hello的值得简单脚本: 127.0.0.1:6379> set hello world OK 127.0.0.1:6379> get hello "world" 127.0.0.1:6379> EVAL "return redis.call('GET',KEYS[1])" 1 hello "world" 127.0.0.1:6379> EVAL "return redis.call('GET','hello')" (error) ERR wrong number of arguments for 'eval' command 127.0.0.1:6379> EVAL "return redis.call('GET','hello')" 0 "world" 从上面的演示代码中发现,KEYS[1]可以直接替换为hello,但是Redis官方文档指出这种是不建议的,目的是在命令执行前会对命令进行分析,以确保Redis Cluster可以将命令转发到适当的集群节点。 numkeys无论什么情况下都是必须的命令参数。 call函数和pcall函数 在上面的例子中我们通过redis.call()来执行了一个SET命令,其实我们也可以替换为redis.pcall()。它们唯一的区别就在于处理错误的方式,前者执行命令错误时会向调用者直接返回一个错误;而后者则会将错误包装为一个我们上面讲的table表格: 127.0.0.1:6379> EVAL "return redis.call('no_command')" 0 (error) ERR Error running script (call to f_1e6efd00ab50dd564a9f13e5775e27b966c2141e): @user_script:1: @user_script: 1: Unknown Redis command called from Lua script 127.0.0.1:6379> EVAL "return redis.pcall('no_command')" 0 (error) @user_script: 1: Unknown Redis command called from Lua script 这就像Java遇到一个异常,前者会直接抛出一个异常;后者会把异常处理成JSON返回。 值转换 由于在Redis中存在Redis和Lua两种不同的运行环境,在Redis和Lua互相传递数据时必然发生对应的转换操作,这种转换操作是我们在实践中不能忽略的。例如如果Lua脚本向Redis返回小数,那么会损失小数精度;如果转换为字符串则是安全的。 127.0.0.1:6379> EVAL "return 3.14" 0 (integer) 3 127.0.0.1:6379> EVAL "return tostring(3.14)" 0 "3.14" 根据胖哥经验传递字符串、整数是安全的,其它需要你去仔细查看官方文档并进行实际验证。 原子执行 Lua脚本在Redis中是以原子方式执行的,在Redis服务器执行EVAL命令时,在命令执行完毕并向调用者返回结果之前,只会执行当前命令指定的Lua脚本包含的所有逻辑,其它客户端发送的命令将被阻塞,直到EVAL命令执行完毕为止。因此LUA脚本不宜编写一些过于复杂了逻辑,必须尽量保证Lua脚本的效率,否则会影响其它客户端。 脚本管理 SCRIPT LOAD 加载脚本到缓存以达到重复使用,避免多次加载浪费带宽,每一个脚本都会通过SHA校验返回唯一字符串标识。需要配合EVALSHA命令来执行缓存后的脚本。 127.0.0.1:6379> SCRIPT LOAD "return 'hello'" "1b936e3fe509bcbc9cd0664897bbe8fd0cac101b" 127.0.0.1:6379> EVALSHA 1b936e3fe509bcbc9cd0664897bbe8fd0cac101b 0 "hello" SCRIPT FLUSH 既然有缓存就有清除缓存,但是遗憾的是并没有根据SHA来删除脚本缓存,而是清除所有的脚本缓存,所以在生产中一般不会再生产过程中使用该命令。 SCRIPT EXISTS 以SHA标识为参数检查一个或者多个缓存是否存在。 127.0.0.1:6379> SCRIPT EXISTS 1b936e3fe509bcbc9cd0664897bbe8fd0cac101b 1b936e3fe509bcbc9cd0664897bbe8fd0cac1012 1) (integer) 1 2) (integer) 0 SCRIPT KILL 终止正在执行的脚本。但是为了数据的完整性此命令并不能保证一定能终止成功。如果当一个脚本执行了一部分写的逻辑而需要被终止时,该命令是不凑效的。需要执行SHUTDOWN nosave在不对数据执行持久化的情况下终止服务器来完成终止脚本。 其它一些要点 了解了上面这些知识基本上可以满足开发一些简单的Lua脚本了。但是实际开发中还是有一些要点的。 务必对Lua脚本进行全面测试以保证其逻辑的健壮性,当Lua脚本遇到异常时,已经执行过的逻辑是不会回滚的。 尽量不使用Lua提供的具有随机性的函数,参见相关官方文档。 在Lua脚本中不要编写function函数,整个脚本作为一个函数的函数体。 在脚本编写中声明的变量全部使用local关键字。 在集群中使用Lua脚本要确保逻辑中所有的key分到相同机器,也就是同一个插槽(slot)中,可采用Redis Hash Tag技术。 再次重申Lua脚本一定不要包含过于耗时、过于复杂的逻辑。 5. 总结 本文对Redis Lua脚本的场景以及Redis Lua脚本所需要的Lua编程语法进行了详细的讲解和演示,也对Redis Lua脚本在实际开发中需要注意的一些要点进行了分享。希望能够帮助你掌握此技术。今天的分享就到这里,下次我将分享如何在实际Redis开发中使用Lua脚本,所以这一篇一定要进行掌握。多多关注:码农小胖哥 获取更多编程知识干货。 关注公众号:Felordcn获取更多资讯 个人博客:https://felord.cn

优秀的个人博客,低调大师

Flink从入门到入土

和其他所有的计算框架一样,flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分 1.Environment Flink Job在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。而这个环境对象的获取方式相对比较简单 //批处理环境valenv=ExecutionEnvironment.getExecutionEnvironment//流式数据处理环境valenv=StreamExecutionEnvironment.getExecutionEnvironment 2.Source Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源. 2.1.从集合读取数据 一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的 importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:从集合读取数据*/objectSourceList{defmain(args:Array[String]):Unit={//1.创建执行的环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment//2.从集合中读取数据valsensorDS:DataStream[WaterSensor]=env.fromCollection(//List(1,2,3,4,5)List(WaterSensor("ws_001",1577844001,45.0),WaterSensor("ws_002",1577844015,43.0),WaterSensor("ws_003",1577844020,42.0)))//3.打印sensorDS.print()//4.执行env.execute("sensor")}/***定义样例类:水位传感器:用于接收空高数据**@paramid传感器编号*@paramts时间戳*@paramvc空高*/caseclassWaterSensor(id:String,ts:Long,vc:Double)} 2.2从文件中读取数据 通常情况下,我们会从存储介质中获取数据,比较常见的就是将日志文件作为数据源 importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:从文件读取数据*/objectSourceFile{defmain(args:Array[String]):Unit={//1.创建执行的环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment//2.从指定路径获取数据valfileDS:DataStream[String]=env.readTextFile("input/data.log")//3.打印fileDS.print()//4.执行env.execute("sensor")}}/***在读取文件时,文件路径可以是目录也可以是单一文件。如果采用相对文件路径,会从当前系统参数user.dir中获取路径*System.getProperty("user.dir")*//***如果在IDEA中执行代码,那么系统参数user.dir自动指向项目根目录,*如果是standalone集群环境,默认为集群节点根目录,当然除了相对路径以外,*也可以将路径设置为分布式文件系统路径,如HDFSvalfileDS:DataStream[String]=env.readTextFile("hdfs://hadoop02:9000/test/1.txt")*/ 如果是standalone集群环境, 默认为集群节点根目录,当然除了相对路径以外,也可以将路径设置为分布式文件系统路径,如HDFS valfileDS:DataStream[String]=env.readTextFile("hdfs://hadoop02:9000/test/1.txt") 默认读取时,flink的依赖关系中是不包含Hadoop依赖关系的,所以执行上面代码时,会出现错误。 解决方法就是增加相关依赖jar包就可以了 2.3 kafka读取数据 Kafka作为消息传输队列,是一个分布式的,高吞吐量,易于扩展地基于主题发布/订阅的消息系统。在现今企业级开发中,Kafka 和 Flink成为构建一个实时的数据处理系统的首选 2.3.1 引入kafka连接器的依赖 <!--https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.10.0</version></dependency> 2.3.2 代码实现参考 importjava.util.Propertiesimportorg.apache.flink.streaming.api.scala._importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011importorg.apache.flink.streaming.util.serialization.SimpleStringSchema/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:从kafka读取数据*/objectSourceKafka{defmain(args:Array[String]):Unit={valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironmentvalproperties=newProperties()properties.setProperty("bootstrap.servers","hadoop02:9092")properties.setProperty("group.id","consumer-group")properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset","latest")valkafkaDS:DataStream[String]=env.addSource(newFlinkKafkaConsumer011[String]("sensor",newSimpleStringSchema(),properties))kafkaDS.print()env.execute("sensor")}} 2.4自定义数据源 大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式 2.4.1 创建自定义数据源 importcom.atyang.day01.Source.SourceList.WaterSensorimportorg.apache.flink.streaming.api.functions.source.SourceFunctionimportscala.util.Random/***description:ss*date:2020/8/2820:36*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:自定义数据源*/classMySensorSourceextendsSourceFunction[WaterSensor]{varflg=trueoverridedefrun(ctx:SourceFunction.SourceContext[WaterSensor]):Unit={while(flg){//采集数据ctx.collect(WaterSensor("sensor_"+newRandom().nextInt(3),1577844001,newRandom().nextInt(5)+40))Thread.sleep(100)}}overridedefcancel():Unit={flg=false;}} 3.Transform 在Spark中,算子分为转换算子和行动算子,转换算子的作用可以通过算子方法的调用将一个RDD转换另外一个RDD,Flink中也存在同样的操作,可以将一个数据流转换为其他的数据流。 转换过程中,数据流的类型也会发生变化,那么到底Flink支持什么样的数据类型呢,其实我们常用的数据类型,Flink都是支持的。比如:Long, String, Integer, Int, 元组,样例类,List, Map等。 3.1 map 映射:将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素 参数:Scala匿名函数或MapFunction 返回:DataStream importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:从集合读取数据*/objectTransfrom_map{defmain(args:Array[String]):Unit={//1.创建执行的环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment//2.从集合中读取数据valsensorDS:DataStream[WaterSensor]=env.fromCollection(//List(1,2,3,4,5)List(WaterSensor("ws_001",1577844001,45.0),WaterSensor("ws_002",1577844015,43.0),WaterSensor("ws_003",1577844020,42.0)))valsensorDSMap=sensorDS.map(x=>(x.id+"_1",x.ts+"_1",x.vc+1))//3.打印sensorDSMap.print()//4.执行env.execute("sensor")}/***定义样例类:水位传感器:用于接收空高数据**@paramid传感器编号*@paramts时间戳*@paramvc空高*/caseclassWaterSensor(id:String,ts:Long,vc:Double)} 3.1.1 MapFunction Flink为每一个算子的参数都至少提供了Scala匿名函数和函数类两种的方式,其中如果使用函数类作为参数的话,需要让自定义函数继承指定的父类或实现特定的接口。例如:MapFunction sensor-data.log 文件数据 sensor_1,1549044122,10sensor_1,1549044123,20sensor_1,1549044124,30sensor_2,1549044125,40sensor_1,1549044126,50sensor_2,1549044127,60sensor_1,1549044128,70sensor_3,1549044129,80sensor_3,1549044130,90sensor_3,1549044130,100 importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:从文件读取数据*/objectSourceFileMap{defmain(args:Array[String]):Unit={//1.创建执行的环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment//2.从指定路径获取数据valfileDS:DataStream[String]=env.readTextFile("input/sensor-data.log")valMapDS=fileDS.map(lines=>{//更加逗号切割获取每个元素valdatas:Array[String]=lines.split(",")WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)})//3.打印MapDS.print()//4.执行env.execute("map")}/***定义样例类:水位传感器:用于接收空高数据**@paramid传感器编号*@paramts时间戳*@paramvc空高*/caseclassWaterSensor(id:String,ts:Long,vc:Double)} importorg.apache.flink.api.common.functions.MapFunctionimportorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:从文件读取数据*/objectTransform_MapFunction{defmain(args:Array[String]):Unit={//1.创建执行的环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment//2.从指定路径获取数据valsensorDS:DataStream[String]=env.readTextFile("input/sensor-data.log")sensorDS.map()//3.打印//MapDS.print()//4.执行env.execute("map")}/***自定义继承MapFunction*MapFunction[T,O]*自定义输入和输出**/classMyMapFunctionextendsMapFunction[String,WaterSensor]{overridedefmap(t:String):WaterSensor={valdatas:Array[String]=t.split(",")WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)}}/***定义样例类:水位传感器:用于接收空高数据**@paramid传感器编号*@paramts时间戳*@paramvc空高*/caseclassWaterSensor(id:String,ts:Long,vc:Double)} 3.1.2 RichMapFunction 所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能。例如:RichMapFunction sensor-data.log 文件数据 同上一致 importorg.apache.flink.api.common.functions.{MapFunction,RichMapFunction}importorg.apache.flink.configuration.Configurationimportorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:从文件读取数据*/objectTransform_RichMapFunction{defmain(args:Array[String]):Unit={//1.创建执行的环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment//2.从指定路径获取数据valsensorDS:DataStream[String]=env.readTextFile("input/sensor-data.log")valmyMapDS:DataStream[WaterSensor]=sensorDS.map(newMyRichMapFunction)//3.打印myMapDS.print()//4.执行env.execute("map")}/***自定义继承MapFunction*MapFunction[T,O]*自定义输入和输出**/classMyRichMapFunctionextendsRichMapFunction[String,WaterSensor]{overridedefmap(value:String):WaterSensor={valdatas:Array[String]=value.split(",")//WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)WaterSensor(getRuntimeContext.getTaskName,datas(1).toLong,datas(2).toInt)}//富函数提供了生命周期方法overridedefopen(parameters:Configuration):Unit={}overridedefclose():Unit={}}/***定义样例类:水位传感器:用于接收空高数据**@paramid传感器编号*@paramts时间戳*@paramvc空高*/caseclassWaterSensor(id:String,ts:Long,vc:Double)} Rich Function有一个生命周期的概念。典型的生命周期方法有: open()方法是rich function的初始化方法,当一个算子例如map或者filter被调 用之前open()会被调用 close()方法是生命周期中的最后一个调用的方法,做一些清理工作 getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行 的并行度,任务的名字,以及state状态 3.1.3 flatMap 扁平映射:将数据流中的整体拆分成一个一个的个体使用,消费一个元素并产生零到多个元素 参数:Scala匿名函数或FlatMapFunction 返回:DataStream importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:FlatMap*/objectTransform_FlatMap{defmain(args:Array[String]):Unit={//1.创建执行环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//2.读取数据vallistDS:DataStream[List[Int]]=env.fromCollection(List(List(1,2,3,4),List(5,6,7,1,1,1)))valresultDS:DataStream[Int]=listDS.flatMap(list=>list)resultDS.print()//4.执行env.execute()}} 3.2. filter 过滤:根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃 参数:Scala匿名函数或FilterFunction 返回:DataStream importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:Filter*/objectTransform_Filter{defmain(args:Array[String]):Unit={//1.创建执行环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//2.读取数据vallistDS:DataStream[List[Int]]=env.fromCollection(List(List(1,2,3,4,1,2,3,4),List(5,6,7,1,1,1,1,2,3,4,1,2,3,4),List(1,2,3,4),List(5,6,7,1,1,1),List(1,2,3,4),List(5,6,7,1,1,1)))//true就留下,false就抛弃listDS.filter(num=>{num.size>5}).print("filter")//4.执行env.execute()}} 3.3 keyBy 在Spark中有一个GroupBy的算子,用于根据指定的规则将数据进行分组,在flink中也有类似的功能,那就是keyBy,根据指定的key对数据进行分流 分流:根据指定的Key将元素发送到不同的分区,相同的Key会被分到一个分区(这里分区指的就是下游算子多个并行节点的其中一个)。keyBy()是通过哈希来分区的 参数:Scala匿名函数或POJO属性或元组索引,不能使用数组 返回:KeyedStream importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:FlatMap*/objectTransform_KeyBy{defmain(args:Array[String]):Unit={//1.创建执行环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//2.读取数据valsensorDS:DataStream[String]=env.readTextFile("input/sensor-data.log")//3.转换为样例类valmapDS=sensorDS.map(lines=>{valdatas=lines.split(",")WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)})//4.使用keyby进行分组// TODO 关于返回的key的类型://1.如果是位置索引或字段名称,程序无法推断出key的类型,所以给一个java的Tuple类型//2.如果是匿名函数或函数类的方式,可以推断出key的类型,比较推荐使用//***分组的概念:分组只是逻辑上进行分组,打上了记号(标签),跟并行度没有绝对的关系//同一个分组的数据在一起(不离不弃)//同一个分区里可以有多个不同的组//valsensorKS:KeyedStream[WaterSensor,Tuple]=mapDS.keyBy(0)//valsensorKS:KeyedStream[WaterSensor,Tuple]=mapDS.keyBy("id")valsensorKS:KeyedStream[WaterSensor,String]=mapDS.keyBy(_.id)//valsensorKS:KeyedStream[WaterSensor,String]=mapDS.keyBy(//newKeySelector[WaterSensor,String]{//overridedefgetKey(value:WaterSensor):String={//value.id//}//}//)sensorKS.print().setParallelism(5)//4.执行env.execute()}/***定义样例类:水位传感器:用于接收空高数据**@paramid传感器编号*@paramts时间戳*@paramvc空高*/caseclassWaterSensor(id:String,ts:Long,vc:Double)} 3.4 shuffle 打乱重组(洗牌):将数据按照均匀分布打散到下游 参数:无 返回:DataStream importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:FlatMap*/objectTransform_Shuffle{defmain(args:Array[String]):Unit={//1.创建执行环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//2.读取数据valsensorDS:DataStream[String]=env.readTextFile("input/sensor-data.log")valshuffleDS=sensorDS.shufflesensorDS.print("data")shuffleDS.print("shuffle")//4.执行env.execute()}} 3.5. split 在某些情况下,我们需要将数据流根据某些特征拆分成两个或者多个数据流,给不同数据流增加标记以便于从流中取出。 需求:将水位传感器数据按照空高高低(以40cm,30cm为界),拆分成三个流 importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:FlatMap*/objectTransform_Split{defmain(args:Array[String]):Unit={//1.创建执行环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//2.读取数据valsensorDS:DataStream[String]=env.readTextFile("input/sensor-data.log")//3.转换成样例类valmapDS:DataStream[WaterSensor]=sensorDS.map(lines=>{valdatas:Array[String]=lines.split(",")WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)})valsplitSS:SplitStream[WaterSensor]=mapDS.split(sensor=>{if(sensor.vc<40){Seq("normal")}elseif(sensor.vc<80){Seq("Warn")}else{Seq("alarm")}})//4.执行env.execute()}/***定义样例类:水位传感器:用于接收空高数据**@paramid传感器编号*@paramts时间戳*@paramvc空高*/caseclassWaterSensor(id:String,ts:Long,vc:Double)} 3.6 select 将数据流进行切分后,如何从流中将不同的标记取出呢,这时就需要使用select算子了。 importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:FlatMap*/objectTransform_Split{defmain(args:Array[String]):Unit={//1.创建执行环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//2.读取数据valsensorDS:DataStream[String]=env.readTextFile("input/sensor-data.log")//3.转换成样例类valmapDS:DataStream[WaterSensor]=sensorDS.map(lines=>{valdatas:Array[String]=lines.split(",")WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)})valsplitDS:SplitStream[WaterSensor]=mapDS.split(sensor=>{if(sensor.vc<40){Seq("info")}elseif(sensor.vc<80){Seq("warn")}else{Seq("error")}})valerrorDS:DataStream[WaterSensor]=splitDS.select("error")valwarnDS:DataStream[WaterSensor]=splitDS.select("warn")valinfoDS:DataStream[WaterSensor]=splitDS.select("info")infoDS.print("info")warnDS.print("warn")errorDS.print("error")//4.执行env.execute()}/***定义样例类:水位传感器:用于接收空高数据**@paramid传感器编号*@paramts时间戳*@paramvc空高*/caseclassWaterSensor(id:String,ts:Long,vc:Double)} 3.7 connect 在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。 Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。 importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:FlatMap*/objectTransform_Connect{defmain(args:Array[String]):Unit={//1.创建执行环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//2.读取数据valsensorDS:DataStream[String]=env.readTextFile("input/sensor-data.log")//3.转换成样例类valmapDS:DataStream[WaterSensor]=sensorDS.map(lines=>{valdatas:Array[String]=lines.split(",")WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)})//4.从集合中再读取一条流valnumDS:DataStream[Int]=env.fromCollection(List(1,2,3,4,5,6))valresultCS:ConnectedStreams[WaterSensor,Int]=mapDS.connect(numDS)//coMap表示连接流调用的map,各自都需要一个functionresultCS.map(sensor=>sensor.id,num=>num+1).print()//4.执行env.execute()}/***定义样例类:水位传感器:用于接收空高数据**@paramid传感器编号*@paramts时间戳*@paramvc空高*/caseclassWaterSensor(id:String,ts:Long,vc:Double)} 3.8 union 对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream connect与 union 区别: union之前两个流的类型必须是一样,connect可以不一样 connect只能操作两个流,union可以操作多个。 importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:FlatMap*/objectTransform_Union{defmain(args:Array[String]):Unit={//1.创建执行环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//2.从集合中读取流valnum1DS:DataStream[Int]=env.fromCollection(List(1,2,3,4))valnum2DS:DataStream[Int]=env.fromCollection(List(7,8,9,10))valnum3DS:DataStream[Int]=env.fromCollection(List(17,18,19,110))//TODOunion真正将多条流合并成一条流//合并的流,类型必须一致//可以合并多条流,只要类型一致num1DS.union(num2DS).union(num3DS).print()//4.执行env.execute()}/***定义样例类:水位传感器:用于接收空高数据**@paramid传感器编号*@paramts时间戳*@paramvc空高*/caseclassWaterSensor(id:String,ts:Long,vc:Double)} 3.9 Operator Flink作为计算框架,主要应用于数据计算处理上, 所以在keyBy对数据进行分流后,可以对数据进行相应的统计分析 3.9.1 滚动聚合算子(Rolling Aggregation) 这些算子可以针对KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream sum() min() max() 3.9.2 reduce 一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。 importorg.apache.flink.streaming.api.scala._/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:Reduce*/objectTransform_Reduce{defmain(args:Array[String]):Unit={//1.创建执行环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//2.读取数据valsensorDS:DataStream[String]=env.readTextFile("input/sensor-data.log")//3.转换成样例类valmapDS:DataStream[WaterSensor]=sensorDS.map(lines=>{valdatas:Array[String]=lines.split(",")WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)})valsensorKS:KeyedStream[WaterSensor,String]=mapDS.keyBy(_.id)//输入的类型一样,输出类型和输出类型也要一样//组内的第一条数据,不进入reduce计算valreduceDS:DataStream[WaterSensor]=sensorKS.reduce((ws1,ws2)=>{println(ws1+"<===>"+ws2)WaterSensor(ws1.id,System.currentTimeMillis(),ws1.vc+ws2.vc)})reduceDS.print("reduce")//4.执行env.execute()}/***定义样例类:水位传感器:用于接收空高数据**@paramid传感器编号*@paramts时间戳*@paramvc空高*/caseclassWaterSensor(id:String,ts:Long,vc:Double)} 3.9.3process Flink在数据流通过keyBy进行分流处理后,如果想要处理过程中获取环境相关信息,可以采用process算子自定义实现 1)继承KeyedProcessFunction抽象类,并定义泛型:[KEY, IN, OUT] classMyKeyedProcessFunctionextendsKeyedProcessFunction[String,WaterSensor,String]{} 重写方法 //自定义KeyedProcessFunction,是一个特殊的富函数// 1.实现KeyedProcessFunction,指定泛型:K - key的类型, I -上游数据的类型, O -输出的数据类型//2.重写processElement方法,定义每条数据来的时候的处理逻辑/***处理逻辑:来一条处理一条**@paramvalue一条数据*@paramctx上下文对象*@paramout 采集器:收集数据,并输出*/overridedefprocessElement(value:WaterSensor,ctx:KeyedProcessFunction[String,WaterSensor,String]#Context,out:Collector[String]):Unit={out.collect("我来到process啦,分组的key是="+ctx.getCurrentKey+",数据="+value)//如果key是tuple,即keyby的时候,使用的是位置索引或字段名称,那么key获取到是一个tuple//ctx.getCurrentKey.asInstanceOf[Tuple1].f0//Tuple1需要手动引入Java的Tuple} 完整代码: importorg.apache.flink.streaming.api.functions.KeyedProcessFunctionimportorg.apache.flink.streaming.api.scala._importorg.apache.flink.util.Collector/***description:SourceList*date:2020/8/2819:02*version:1.0**@author阳斌*邮箱:1692207904@qq.com*类的说明:Reduce*/objectTransform_Process{defmain(args:Array[String]):Unit={//1.创建执行环境valenv:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//2.读取数据valsensorDS:DataStream[String]=env.readTextFile("input/sensor-data.log")//3.转换成样例类valmapDS:DataStream[WaterSensor]=sensorDS.map(lines=>{valdatas:Array[String]=lines.split(",")WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)})//按照ID进行分组valsensorKS:KeyedStream[WaterSensor,String]=mapDS.keyBy(_.id)sensorKS.process(newMyKeyedProcessFunction)//4.执行env.execute()}//自定义KeyedProcessFunction,是一个特殊的富函数// 1.实现KeyedProcessFunction,指定泛型:K - key的类型, I -上游数据的类型, O -输出的数据类型//2.重写processElement方法,定义每条数据来的时候的处理逻辑classMyKeyedProcessFunctionextendsKeyedProcessFunction[String,WaterSensor,String]{/***处理逻辑:来一条处理一条**@paramvalue一条数据*@paramctx上下文对象*@paramout 采集器:收集数据,并输出*/overridedefprocessElement(value:WaterSensor,ctx:KeyedProcessFunction[String,WaterSensor,String]#Context,out:Collector[String]):Unit={out.collect("我来到process啦,分组的key是="+ctx.getCurrentKey+",数据="+value)//如果key是tuple,即keyby的时候,使用的是位置索引或字段名称,那么key获取到是一个tuple//ctx.getCurrentKey.asInstanceOf[Tuple1].f0//Tuple1需要手动引入Java的Tuple}}/***定义样例类:水位传感器:用于接收空高数据**@paramid传感器编号*@paramts时间戳*@paramvc空高*/caseclassWaterSensor(id:String,ts:Long,vc:Double)} 4.Sink Sink有下沉的意思,在Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作 之前我们一直在使用的print方法其实就是一种Sink。 @PublicEvolvingpublicDataStreamSink<T>print(StringsinkIdentifier){PrintSinkFunction<T>printFunction=newPrintSinkFunction(sinkIdentifier,false);returnthis.addSink(printFunction).name("PrinttoStd.Out");} 官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink 另 琐碎时间想看一些技术文章,可以去公众号菜单栏翻一翻我分类好的内容,应该对部分童鞋有帮助。同时看的过程中发现问题欢迎留言指出,不胜感谢~。另外,有想多了解哪些方面内容的可以留言(什么时候,哪篇文章下留言都行),附菜单栏截图(PS:很多人不知道公众号菜单栏是什么) END 我知道你 “在看” 本文分享自微信公众号 - Java知音(Java_friends)。如有侵权,请联系 support@oschina.cn 删除。本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

优秀的个人博客,低调大师

Scala语言入门三(集合)

一、数组 Scala 语言中提供的数组是用来存储固定大小的同类型元素,数组对于每一门编辑应语言来说都是重要的数据结构之一。数组的第一个元素索引为0,最后一个元素的索引为元素总数减1 1、声明数组 不可变数组 /** * @author Gjing **/ object ArrayApp { def main(args: Array[String]): Unit = { // 声明一个长度为2的不可变数组 val arr = new Array[String](2) // 赋值 arr(0) = "java" arr(1) = "scala" // 使用另外一种方式声明数组 val arr2 = Array("java","scala") } } 不可变数组 /** * @author Gjing **/ object ArrayApp { def main(args: Array[String]): Unit = { val buffer = ArrayBuffer[String]() // 追加一个值 buffer += "java" // 追加一个数组 buffer ++= Array("scala", "vue") } } 2、数组的处理 列出常见的一些操作,其余的自行查阅文档 循环输出 /** * @author Gjing **/ object ArrayApp { def main(args: Array[String]): Unit = { val arr = Array("java", "scala") for (x <- arr) { println(x) } } } 合并数组 /** * @author Gjing **/ object ArrayApp { def main(args: Array[String]): Unit = { val arr = Array("java", "scala") val arr2 = Array("vue", "go") // 这种方式需要引入 import Array._ val arr3 = concat(arr, arr2) // 直接调用 val arr4 = arr.concat(arr2) } } 二、List List的特征是其元素以线性方式存储,集合中可以存放重复对象。 1、创建List 定长list /** * @author Gjing **/ object ListApp { def main(args: Array[String]): Unit = { // 数字列表 var l1 = List(1, 2, 3, 4) var l2 = 1 :: 2 :: 3 :: Nil // 生成空列表 var l3 = Nil } } 构造列表的两个基本单位是 Nil 和 :: Nil 也可以表示为一个空列表。 不定长 /** * @author Gjing **/ object ListApp { def main(args: Array[String]): Unit = { val l = ListBuffer[Int]() // 加一个元素 l += 2 // 加入多个元素 l += (3,4) // 加入一个集合 l ++= List(5,6) // 打印 2,3,4,5,6 println(l.mkString(",")) } } 2、列表的操作 基本操作(返回第一个元素、返回第一个元素以外的列表、判断列表是否为空) /** * @author Gjing **/ object ListApp { def main(args: Array[String]): Unit = { var l = List(1,2,3) // 打印 1 println(l.head) // 打印 List(2,3) println(l.tail) // 打印false println(l.isEmpty) } } 拼接 /** * @author Gjing **/ object ListApp { def main(args: Array[String]): Unit = { val l1 = List(1, 2, 3) val l2 = List(3,4) val l3 = l1 ::: l2 // 较第一种是后者加进去后再列表前面 val l4 = l1.:::(l2) val l5 = List.concat(l1,l2) println(l3.mkString(",")) println(l4.mkString(",")) println(l5.mkString(",")) } } 列表反转 /** * @author Gjing **/ object ListApp { def main(args: Array[String]): Unit = { val l1 = List(1, 2, 3) // 打印 3,2,1 println(l1.reverse.mkString(",")) } } 创建一个指定重复数量的元素列表 /** * @author Gjing **/ object ListApp { def main(args: Array[String]): Unit = { val l1 = List.fill(2)(1) // 打印 1,1 println(l1.mkString(",")) } } 三、Set Set是最简单的一种集合。集合中的对象不按特定的方式排序,并且没有重复对象, 用法和list差不多,这里不过多介绍 四、Map Map 是一种把键对象和值对象映射的集合,它的每一个元素都包含一对键对象和值对象。Map 有两种类型,可变与不可变,区别在于可变对象可以修改它,而不可变对象不可以。默认情况下 Scala 使用不可变 Map。如果你需要使用可变集合,你需要显式的引入import scala.collection.mutable.Map 类在 Scala 中 你可以同时使用可变与不可变 Map,不可变的直接使用 Map,可变的使用 mutable.Map 1、创建Map /** * @author Gjing **/ object MapApp { def main(args: Array[String]): Unit = { // 空map val m1 = Map() // 存在键值对 var m2 = Map("名字" -> "张三", "年龄" -> 12) println(m2.mkString(",")) // 加一个新的键值对进去 m2 += ("性别" -> "男") println(m2.mkString(",")) } } 2、基本操作 /** * @author Gjing **/ object MapApp { def main(args: Array[String]): Unit = { var m2 = Map("名字" -> "张三", "年龄" -> 12) // 返回所有key println(m2.keys) // 返回所有value println(m2.values) // 判断是否为空 println(m2.isEmpty) } } 3、合并 你可以使用 ++ 运算符或 Map.++() 方法来连接两个 Map,Map 合并时会移除重复的 key /** * @author Gjing **/ object MapApp { def main(args: Array[String]): Unit = { val map1 = Map("年龄" -> "12") val map2 = Map("性别" -> "男") println(map1 ++ map2) println(map1.++(map2)) } } 4、查看 Map 中是否存在指定的 Key object MapApp { def main(args: Array[String]): Unit = { var m1 = Map("名字" -> "张三", "年龄" -> 12) // 打印 true println(m1.contains("名字")) } } 四、元组 元组是不同类型的值的集合,与列表一样,元组也是不可变的,但与列表不同的是元组可以包含不同类型的元素 1、定义元组 /** * @author Gjing **/ object TupleApp { def main(args: Array[String]): Unit = { val t1 = (1, "呢", 1.1) val t2 = new Tuple3(1, "呢", 1.1) } } 元组的实际类型取决于它的元素的类型,比如 (99, "runoob") 是 Tuple2[Int, String], 目前 Scala 支持的元组最大长度为 22。对于更大长度你可以使用集合,或者扩展元组 2、访问元组 /** * @author Gjing **/ object TupleApp { def main(args: Array[String]): Unit = { val t1 = (1, "呢", 1.1) // 获取元组第一个内容 println(t1._1) // 获取元组第二个内容 println(t1._2) } } 3、元组的迭代 /** * @author Gjing **/ object TupleApp { def main(args: Array[String]): Unit = { val t1 = (1, "呢", 1.1) t1.productIterator.foreach(e => println(e)) } } 4、元组转字符串 /** * @author Gjing **/ object TupleApp { def main(args: Array[String]): Unit = { val t1 = (1, "呢", 1.1) println(t1.toString()) } }

优秀的个人博客,低调大师

梯度下降极简入门

导 语 梯度下降及其变体被用作训练过程的关键部分在机器学习中广泛使用。梯度下降中的“梯度”是指单变量导数的推广形式,即多元变量求导。 梯度下降法是解决“优化问题”的迭代方法,其中优化问题是指围绕寻找函数的全局最小值或最大值而展开的数学问题。我们将很快看到,对于简单的优化问题可以不用梯度下降。当事情变得复杂时,我们则需要用诸如梯度下降之类的迭代法,当然和神经网络相关的优化问题确实足够复杂。 01 重新回顾优化问题 假设你已经购买了200米的铁丝网。您想使用此围栏为羊群创建一个矩形牧场。如何确定使牧场内部面积最大化时,牧场对应的长度和宽度? 使用标准的分析方法来解决这个问题,我们首先要写一个方程式来表示我们的问题。首先,我们知道两件事: area(面积)=length(长度)*width(宽度)(2 * length)+(2 * width)= 200 但是我们想用一个变量而不是两个变量来表示面积,所以我们可以求解这两个方程中的第二个变量的宽度: 2 * width = 200–2 * lengthwidth = [200-2 * length] / 2width = 100 - length 现在我们可以用100- length代替width,则: area= length * [100–length]area = f(length)=100 * length- length² 取面积的导数: area’=100-2*length;length=50 现在,我们找到了函数的“临界点”,即一阶导数等于零时对应的长度,当然函数在该点处的斜率也为零。我们关心这种值,因为它们是唯一能对应函数最小值或函数最大值的数。在临界点处,该点两侧的值可能都小于或都大于临界点处的值。因为函数中该点外的斜率都不为零,这意味着临界点上一边的点对应的面积将小于该临界点对应的面积,而另一边的点对应的面积将大于该临界点对应的面积。也就是说,该函数在非临界点处正在增加或减少,因此不能为最大值或最小值。 area’=100-2*length=0-->length=50 这表明如果存在全局最大值或最小值,则必须在长度= 50时发生。也就是说,如果矩形的长度存在最佳选择,则为50米。可能存在临界点既不是最小值也不是最大值,所以应该检测已发现的临界点确实是最值。在微积分课程中,有相关的检验方法(比如二阶导数检验),现在用更简单的方法(类似梯度下降法)来解决该问题,比如在临界点长度= 50的左右两侧测试2个点。 f(49)= 4900–2401 = 2499f(50)= 5000–2500 = 2500f(51)= 5100–2601 = 2499 这并不能确认矩阵面积最大时对应长度值就是50,或者某些时候有比50更好的点,不过这种近似的方法最终将应用于训练模型。对于这个简单的问题,通过绘制函数了解该方法和实际的误差: 02 梯度下降—迭代式猜测 那么这与梯度下降方法有什么关系呢?梯度下降方法比较粗糙,就像刚才所做的那些不精确的猜测一样。看上去它是比随机猜测稍强一点的显得杂乱的体系,不过它将最终成为我们解决围栏问题的方法: 猜测围栏的最佳长度 计算此时的梯度值 根据梯度的值,调整猜测 重复猜测直到满足条件则终止 假设我们随机选取57当作长度的第一个猜测值,在57处,面积函数的导数为: f(57)=100*57 - 57*57 = 2451f'(57)=100–2*(57)=-14 ‍ length = 57处的斜率不为0,因此不是临界点。此外斜率为-14表示如果将长度增加 1,则面积将减小 14(假设函数的斜率不变)。梯度下降使用此值作为进行下一个猜测的指导。比如要增加面积的值,因为斜率是负数,所以应该57基础上减小长度,以此增加面积的值。 然而即使知道当前应该减少length的值,也没有先验的方法可以确切地知道“合适”的数来调整我们的当前的猜测(length=57)。在像TensorFlow这样的软件包中,我们用来调整当前length的快慢将由一个称为“学习率”的超参数控制,可以在训练时设置该参数。提高学习率将导致梯度下降对当前当前length调整很快;减小学习率会导致梯度下降对当前当前length调整较慢。现在假设将当前length减少3,虽然有点武断,但实际上效果可以接受。 57–3 = 54f(54)= 5400–(54²)-->f(54)= 5400–2916 -->f(54)= 2484 2484大于2451,说明找到的面积比上次大。再次使用导数来检查是否处于临界点,如果不是,则调整猜测值: f'(54)= 100–2 * 54 = 100–108 = -8 我们的当前猜测值(54)仍然很大,斜率-8低于-14,应该适当减缓对当前猜测值的更改幅度,比如在54基础上减少2,则: f(52)= 5200–52² --> f(52)= 5200–2704 -->f(52)= 2496 我们将重复此过程,直到找到一个临界点。或者直到f'(length)的值非常接近零,才能认为length已足够接近临界点。手工完成此过程是重复且繁琐的,但是计算机善于解决重复繁琐的任务。通俗点说,将像在爬山和滚球,一直沿着抛物线上升,直到无法进一步上升为止。 请注意导数(蓝色),它在最大值的右边是负数,在最大值的左边是正数。在这种情况下,很容易找到最大值。 由于试图找到面积的最大值,上面的示例称之为“梯度上升”更好。在梯度下降中,每次计算梯度值时都将其相反数。 马上将提到的梯度下降和在神经网络里面用到的梯度下降有两个不一样的地方。首先,神经网络对应的函数比f(length)更复杂。我们的神经网络具有成千上万的可调参数,而其中f(length)只有一个。我们使用诸如梯度下降之类的迭代方法主要是因为神经网络模型非常复杂,而不是选用之前查找临界点时的分析过程。计算当前如此庞大规模的神经网络所表示的一般性的函数导数根本不可行。第二个区别是,在此示例中,有一个充当标准结果的实函数,并且发现了该函数的最佳值。在神经网络中,没有显式的函数;取而代之的是,我们尝试创建一个不存在标准结果的函数。 03 用于函数查找的梯度下降 在前面的示例中,使用梯度下降方法沿该曲线可以找到最佳值。在机器学习中,对于训练集中的数据,我们希望创建一条曲线来更好地拟合这些数据。回顾一下刚刚研究的围栏问题,将它转化为我们更倾向于用机器学习解决的问题。 我们从围栏数据库中收集了很多数据样本。我们数据集中的每个数据点的背景都是来自用100米围栏材料来建造矩形牧场。每个数据点有2个参数:围栏一侧的长度和围栏的面积。在这个问题中,不是尝试找到围栏的最佳长度,而是试图找到一个函数,该函数可以根据给定的边长来预测矩形围栏的面积。我们的输入数据如下所示: 看起来数据中有一个模式,我们可以使用机器学习来定义模式吗? 机器学习解决此问题的方法是认为:“看起来有点像某种数学函数,想知道到底是哪个函数?”,更进一步,我们猜测该函数是某种抛物线函数,则该函数的一般表达式是: F(x)=ax²+ bx 在数据集中已知x值和y值,现在我们将使用梯度下降法来找到a和b的最佳值。为此,我们引入了另一个函数--损失函数,并运行梯度下降以最小化损失函数。将F(x)当作“正在进行训练的函数”,其中x仍然是围栏一侧的长度,y是围栏的真实面积。单次预测的绝对误差为: L(x)= | F(x)- y | 我们需要给误差加上绝对值。如果不这样做且连续做两次预测,假设一次错了1000,另一次错了-1000,则两次错误刚好抵消,实际上这样一共错了两次。 最终我们会用这样的一个损失函数:平均绝对误差,它是数据集中所有的点对应L(x)的平均值。使用诸如均方误差之类的指标更为常见,但是不同的损失函数对不同的数据集。 假设我们随机选择权重的起始值:a = -2和b = 30。在对应的F(x)变为: L(x)= | -2x²+ 30x-y | 接下来的内容略带技巧性。我们想要调整a和b的值以最小化损失函数。因此我们需要计算损失函数对a和b的梯度(偏导数)。以前的函数中,x的值可以不断变化。但是这个问题里面,情况并非如此。x的值始终只是我们数据集中的某个固定值。因此单个数据点对应的损失函数为:L(a, b) = ax² + bx – y,其中x、y是常数。 使用平均绝对误差的损失函数为: L(a,b)=1/m*SUM(|F(a,b)—yi|)L(a,b)=(1 / m)* SUM(|axi²+ bxi — yi |) 其中xi和yi代表我们数据集中的单个数据点,而m是我们数据集中点的总数。这里有两个细节上的问题:一个是我们必须使用链式规则来计算涉及绝对值的梯度;另一个是绝对值函数在预测值恰好等于真实值的点上是不可求导的,并且。 如果平均绝对误差为0,可以通过停止计算梯度来解决不可导的问题。这是有实际意义的。如果我们在这个数据点上的预测值与实际值都符,那么就不需要用改善绝对误差的方式来调整模型: d | x |/ dx = 1,如果x为正d | x |/ dx = -1,如果x为负则损失函数对a、b求偏导数可以写成:L(a, b) = (1/m) * SUM( | axi² + bxi — yi | )L’_a(a, b) = (1 / m) * SUM( 1 * xi² ); F(xi) > yiL’_a(a, b) = (1 / m) * SUM( -1 * xi² ); F(xi) < yiL’_b(a, b) = (1 / m) * SUM( 1 * x ); F(xi) > yL’_b(a, b) = (1 / m) * SUM( -1 * x );F(xi) < yi 这些公式实际上为我们提供了一个非常简单的更新规则:如果我们的预测太小,则将a和b都增大。如果我们的预测太大,则将a和b都减小。 深刻地理解梯度下降以及优化损失函数的思路可以更好地理解过拟合之类的问题,同时它还可以帮助我们更好地了解神经网络训练的过程。 1 END 1 长 按 关 注 获取最新AI资讯与实战案例 实用AI客栈 小编微信号 : langu86 本文分享自微信公众号 - 实用AI客栈(gh_0b0b5e56231f)。如有侵权,请联系 support@oschina.cn 删除。本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

优秀的个人博客,低调大师

Charles 从入门到精通

内容清单 Charles 的简介 安装 Charles Charles 初始化设置 过滤网络请求 截取HTTP/HTTPS数据 模拟弱网环境 修改网络请求 修改服务器返回内容 服务器压力测试 反向代理 解决与翻墙软件的冲突 Charles 的简介 Charles 是目前最主流的网络调试工具(Charles、Fiddler、Wireshark...)之一,对于一个开发者来说与网络打交道是日常需求,因此很多时候我们需要调试参数、返回的数据结构、查看网络请求的各种头信息、协议、响应时间等等。所以了解 Charles 并使用它 Charles 通过将自己设置为系统的网络访问代理服务器,这样所有的网络请求都会通过它,从而实现了网路请求的截获和分析。 Chareles 不仅可以分析电脑本机的网络请求(HTTP 和 HTTPS),还可以分析移动端设备的网络请求。 Charles 是收费软件,作者开发出这样一个方便开发者使用的伟大工具,我们鼓励使用正版软件,但是对于一些囊中羞涩或者学生来说,有破解版的更好,别担心,这些我都准备好了,下一个 section 会讲解如何下载安装。 安装 Charles 方式1: Charles 官网地址,根据你的电脑操作系统选择合适的下载方式。此时下载下来的是需要收费的,不差钱的同学当然可以直接购买。购买链接 方式2:按照方式1的方式去官网下载,然后下载相应 JAR 包(链接:https://pan.baidu.com/s/1utD_gS4DwSefgzcfp6mWPg 密码:nv3m)。这里以 MAC 为例,打 Finder,选择应用程序,选中 Charles,右击并选择“显示包内容”,看到 Contents 目录,点击进去选择 Java 文件夹,将下载下来的 JAR包 拖进去替换。至此,完成了 Charles 的破解。 Charles 初始化设置 Charles 的工作原理是将自身设置为系统的代理服务器来捕获所有的网络请求。所以使用 Charles ,我们必须设置 Charles 为系统的代理服务器。 打开 Charles,当第一次启动的时候如果没有购买或者没有破解,会有倒计时,之后会看到软件的主界面,然后会请求你赋予它为系统代理的权限。点击授权会让你输入当前系统用户的密码。当然你也可以忽略或者拒绝该请求,然后等想要抓包的时候将它设置为系统的代理服务器。步骤:**选择菜单中的“Proxy” -> "Mac OS X Proxy"。**如下图: 之后你的电脑上的任何网络请求都可以在 Charles 的请求面板中看到 看看 Charles 的主界面 图上红色圈1:这里代表所有网络请求的展示方式。分别名为 “Structure” 和 “Sequence”。 Structure 将所有的网络请求按照域名划分并展示 Sequence 将所有的网络请求按照时间排序并展示 图上红色圈2:一些的网络请求设置比如 HTTPS 以及端口等信息都在这个菜单栏设置 图上红色圈2:证书设置都在这里进行 过滤网络请求 由于 Charles 可以将电脑或者设置过的手机的所有网络请求捕获到,而且我们分析网络传输应该是针对某个特定的网络下的抓包分析,为了清楚明显地看到我们感兴趣的网络请求通常会用到 Charles 的**“过滤网络请求的功能”**。 方法1:在 Charles 主面板的左侧所有网络请求的下方可以看到看到一个 ”Filter“ 输入栏,在这里你可以输入关键词来筛选出自己感兴趣的网络请求。比如我想分析的网络请求来自于”www.baidu.com" 下,你可以在下面输入"baidu"即可。 方法2:在 Charles 菜单栏的顶部会看到 “Proxy” 的选项,点击菜单栏选择 “Proxy” -> "Recording Settings" 。选择 “include”。看到面板上面有一个 “Add” 按钮,点击后在弹出的面板里面设置好我们需要分析的网络请求的协议、主机名、端口、路径、参数,当然你也可以只设置一些主要的信息,比如协议和主机名的组合。 方法3:一般打开 Charles 并设置好配置信息后(比如电脑本机或者设置过代理的手机)所有的网络请求都将在 Charles 的面板上显示,同时我们感兴趣的网络请求如果也在面板上显示的话,“Structure”模式下可以选中需要分析的网络请求,鼠标右击选择**“Focus”。“Sequence”模式下可以在面板的网络请求显示面板的右下角看到一个Focus**按钮,点击勾选后 Charles 只会显示你感兴趣的网络请求。 截取HTTP/HTTPS数据 截取 HTTP 请求 Charles 的主要目的是抓取捕获网络请求,这里以 iPhone 的抓包为例讲解。 Charles 的设置 要截获 iPhone 的网络请求就需要为 Charles 开启代理功能。在菜单栏选择**“Proxy” ->"Proxy Settings"。填写代理的端口号并将“Enable transparent HTTP proxying”**勾选上。 iPhone 上的设置 在电脑“系统偏好设置”中心打开网络查看本机 IP 地址,打开手机“设置”->“无线局域网”,进入当前使用的网络,点击进入当前 WIFI 的详情页(可以看到当前 WIFI 的基本信息,包括子网掩码、端口、IP地址、路由器),在最下角可以看到**“DNS”和“HTTP代理”2个section。我们点击“配置代理”**,设置 HTTP 代理选中“手动”。服务器处填写电脑ip地址,端口写8888。设置好后,我们打开 iPhone 上的任意需要网络请求的应用,就可以看到 Charles 弹出请求的确认菜单,单击"Allow"按钮,即可完成设置。 截取 HTTPS 请求 如果你需要捕获 HTTPS 协议的网络请求,那么则需要安装 Charles 的 CA 证书。步骤如下; 首先需要在 MAC 上安装证书。点击 Charles 顶部的菜单栏,选择 “Help” -> "SSL Proxying" -> "Install Charles Root Certificate"。 在 keychain 处将新安装的证书设置为永久信任 即使安装了 CA 证书,Charles 默认是不捕获 HTTPS 协议的网络请求,所以我们需要对某个主机下的网络请求抓包分析的话,选中该网络请求右击选中 “SSL Proxying Enabled”。这样就可以看到我们感兴趣的HTTPS 网络请求了。 如果你需要捕获移动设备的 HTTPS 网络请求,则需要在移动设备上安装证书并作简单的设置 选择 Charles 顶部菜单栏选择 “Help” ->"Install Charles Root Certificate on a Mobile Device or Remote Browser"。然后就可以看到 Charles 弹出的安装说明了。 在手机设置好 Charles 代理的情况下,在手机浏览器输入 “chls.pro/ssl”。安装提示下载好CA证书。 验证刚刚安装的 CA证书 iPhone 打开设置 -> 通用 -> 关于本机 -> 证书信任设置 -> 开启开关 在 Charles 菜单栏 Proxy -> SSL Proxying Setting -> 点击 Add 按钮 -> 在弹出的对对话框设置需要监听的 HTTPS 域(*:代表通配符) 设置完毕,尽情抓取你想要的 HTTPS 网络请求吧。 模拟弱网环境 在平时开发的时候我们经常需要模拟弱网环境,并作弱网环境下的适配工作。Charles 为我们提供了这个服务。 在 Charles 菜单栏选择 “Proxy” -> "Throttle Settings"。在弹出的面板上设置网络请求的参数(上行,下行带宽、利用率、可靠性等等信息)。如下图所示。 如果你想对指定主机进行弱网环境下的测试,可以点击上图的“Add”按钮,在弹出的面板上设置协议、主机、端口来对指定的主机进行弱网设置。 修改网络请求 对于捕获的网络请求,我们经常需要修改网络请求的cookie、Headers、Url等信息。Charles 提供了对网络请求的编辑和重发功能。只需要选中需要修改编辑的网络请求,在对应的右上角看到有一个“钢笔”的按钮,点击后就可以对选中的网络请求进行编辑了,编辑好后可以在右下角看到 Execute 按钮。这样我们编辑后的网络请求就可以被执行了。 修改服务器返回内容 很多时候为了方便调试代码,我们会有这种需求,修改接口返回的数据节点或者内容、甚至是状态码。比如数据为空、数据异常、请求失败、多页数据的情况。 Charles 为我们提供了超实用的功能,“Map(Map Local、Map Remote)功能”、Rewrite功能、Breakpoints功能 ,都可以实现修改服务端返回数据的功能。但是有区别和适用场景: Map 功能适合长期地将某一请求重定向到另一个指定的网络地址或者本地 JSON 文件 Rewrite 功能适合对网络请求进行一些正则替换 Breakpoints 功能适合对网络请求进行一些临时性的修改(类似于我们开发的断点作用) Map 功能 Map 功能分为 Map Local(将某个网络请求重定向到本地 JSON 文件) 和 Map Remote 功能(将网络请求重定向到另一个网络接口)。 在 Charles 菜单栏选择 “Tools” -> "Map Remote" 或 “Map Local” 即可进入相应的功能模块。 Map Remote 功能 适合于切换线上到本地、测试服务到正式服务的场景。比如下图从正式服务切换到测试服务 Map Local 功能 我们需要填写重定向的原地址信息和本地目标文件。我们可以先将某个接口的响应内容保存下来(选择对应的网络请求,右击点击 Save Response )成为 data.json 文件。然后我们编辑里面的 status 、message、data 等信息为我们想要的目标映射文件。 如下所示,我将一个网络请求的内容映射到我本地的一个 JSON 文件。之后这个请求的内容都从网络变为返回我本地的数据了。 Map Local 可能会存在一个小缺陷,其返回的 HTTP Response Header 与正常的网络请求不一样,如果程序设置了校验 Header 信息,此时 Map Local 就会失败,解决办法是同时使用 Rewrite功能将相关的HTTP 头部信息 rewrite 成我们需要的信息 Rewrite 功能 Rewrite 适合对某个网络请求进行正则替换,以达到修改结果的目的。 假如我的 App 的界面上的显示的功能模块及其点击事件是根据接口来完成的,我想实现替换功能模块的名称的目的。步骤:点击顶部菜单栏的**“Tools” -> "Rewrite"**。在弹出的面板上勾选 “Enable Rewrite”。点击左下角的 Add按钮,在右上角的 **Name:**处写好本次配置的名称(如果有多个 Rewrite,为了后期容易区分)。 可以针对特定的网络请求进行 Rewrite。可以点击右上角 Location 面板下面的 Add按钮。在弹出的面板上设置网络请求配置信息。注意此时需要同时设置 Protocol、Port、Host、Path信息(我测试加了 Protocol、Host、Port这3个是无效的) 然后对指定的 Type 和 Action 进行 Rewrite。 Type 主要有 Add Header、Modify Header、Remove Header、Host、Path等等。 Where 可以选择 Request 和 Response。指的是下面的修改是针对 Request 还是 Response 完成设置后点击 Apply 按钮,即可生效。下次继续请求该网络,返回的内容就是我们刚刚设置的内容。比如当前的“政策法规”要变成“哈哈哈,我是假的政策法规”。这时候就可以使用 Rewrite 功能 Breakpoints 功能 Breakpoints 相比于其他几个修改网络请求的特点是只是针对当前的网络请求,Breakpoints 只存在于设置过的当前的网络请求,Charles 关闭后下次打开 Breakpoints 消失了。想要修改网络请求 Breakpoints 步骤最简单,跟我们调试工具里面设置的断点一样方便。 对于我们设置了 Breakpoints 的网络请求, Charles 会在下次继续访问该请求的时候停止掉,就跟 debug 一样。此时我们可以 Edit Request,修改过 Request 之后点击右下角的 Execute 按钮。然后等到服务端返回的时候继续是断点状态,此时可以 Edit Response。步骤: 选中某个网络请求 -> 右击 -> 点击“Breakpoints”。 如下图:对该接口设置了 Breakpoints。请求网络后 Edit Response,点击 execute 后服务端返回的结果就是我们编辑的内容了。 服务器压力测试 我们可以使用 Charles 的 Repeat 功能地对服务器进行并发访问进行压力测试。步骤:**选中某个网络请求 -> 右击 -> Repeat Advanced -> 在弹出的面板里面设置总共的迭代次数(Iterations)、并发数(Concurrency) -> 点击“OK” 。**开始执行可以看到以设置的并发数的规模,进行总共达设置的总共迭代次数的访问。(专业的压力测试工具:Load Runner) 反向代理 Charles 的反向代理功能允许我们将本地指定端口的请求映射到远程的另一个端口上。设置:点击顶部菜单栏 Proxy -> 点击 Reverse Proxies。 如下所示,我将本地的 8080 端口映射到远程的 80 端口上,点击 OK 生效后,当我继续访问本地的 80 端口,实际返回的就是远程 80 端口的提供的内容了。 解决与翻墙软件的冲突 Charles 的工作原理是把自己设置为系统的代理服务器,但是我们开发者经常会利用 VPN 翻墙访问谷歌查找资料(这些翻墙软件的工作原理也是把自己设置成为系统的代理服务器),为了2者和平共处。我们可以在 Charles 的 External Proxy Settings 中将翻墙的代理端口等信息填写。同时我们需要关闭翻墙软件的自动设置,更改为**“手动模式”**。(使其不主动修改系统代理) 总结 Charles 功能强大、界面简洁,读完这篇文章并做出练习,相信你能很快掌握它,“工欲善其事,必先利其器” ,掌握了它,相信可以为你大大提高开发中调试网络的效率。Enjoy yourself 参考链接 唐巧的博客

优秀的个人博客,低调大师

Tablestore入门手册--局部事务

局部事务介绍 表格存储提供的局部事务也可以称为是分区键事务:可以指定某个分区键下的操作是原子的,要么全部成功要么全部失败,并且所提供的隔离级别为串行化。也就是说表格存储的局部事务可以防止以下问题 脏读:事务之外的操作读到了尚未提交的写入 脏写:事务之外的写入覆盖了本事务尚未提交的写入 不可重复读:在事务中的多次对同一行数据的读操作读到了不同的值 更新丢失:本事务提交已提交之后被其他并行执行的事务所覆盖(与脏写不同,脏写是两个事务都没有提交时发生的) 局部事务基本使用流程如下图所示 Tablestore的局部事务在启动事务时或首先获取到分区键下的锁,所有后续对该分区键的写操作与启动事务操作都会被阻塞至原事务提交或者超时以保证操作的隔离性,有如下的一些特性: 在事务提交或者中止之前,不能有另外一个事务在同分区键下启动事务 在事务提交或中止之前,非本事务的写入将被阻塞或超时失败 在事务提交和中止之前,非本事务的读取操作无法读取到事务中未提交的写入,而本事务的读操作可以获取到本事务中的写入 局部事务的使用 事务的启动、提交与中止 启动事务 // 局部事务需要指定一个分区键(第一列主键) PrimaryKey transactionPK = new PrimaryKey(Collections.singletonList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)) )); StartLocalTransactionRequest startTransactionRequest = new StartLocalTransactionRequest(TABLE_NAME, transactionPK); StartLocalTransactionResponse startTransactionResponse = syncClient.startLocalTransaction(startTransactionRequest); // 启动事务成功后会返回一个transactionId,任意实现了TxnRequest抽象类的请求都可以通过setTransactionId方法指定事务 final String transactionId = startTransactionResponse.getTransactionID(); 提交事务 // 提交事务,所有与该transactionId相关的写入操作将被永久地写入到Tablestore中 syncClient.commitTransaction(new CommitTransactionRequest(transactionId)); 中止事务 // 中止事务,所有与改transactionId相关的写入操作将被回滚 syncClient.abortTransaction(new AbortTransactionRequest(transactionId)); 基本用法 分别用同一个transactionId两次写入数据并提交,要么全部失败,要么全部失败写入第一行数据 PutRowRequest typeAPutRequest = new PutRowRequest(new RowPutChange( TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA)) )) ).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_a"))); // 设置事务ID,由启动事务返回 typeAPutRequest.setTransactionId(transactionId); syncClient.putRow(typeAPutRequest); 通过事务ID读取上面写入的数据 GetRowRequest getRowRequest = new GetRowRequest(); SingleRowQueryCriteria singleRowQueryCriteria = new SingleRowQueryCriteria( TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA)) )) ); singleRowQueryCriteria.setMaxVersions(1); getRowRequest.setRowQueryCriteria(singleRowQueryCriteria); // 这边需要设置事务ID getRowRequest.setTransactionId(transactionId); GetRowResponse getRowResponse = syncClient.getRow(getRowRequest); // 可以正常获取到数据 Assert.assertNotNull(getRowResponse.getRow()); 不通过事务ID读取 GetRowRequest getRowRequest = new GetRowRequest(); SingleRowQueryCriteria singleRowQueryCriteria = new SingleRowQueryCriteria( TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA)) )) ); singleRowQueryCriteria.setMaxVersions(1); getRowRequest.setRowQueryCriteria(singleRowQueryCriteria); GetRowResponse getRowResponse = syncClient.getRow(getRowRequest); // 不设置事务ID获取,由于Tablestore提供的隔离级别为串行化,这边不能读取到未提交的写入 Assert.assertNull(getRowResponse.getRow()); 写入第二行数据 PutRowRequest typeBPutRequest = new PutRowRequest(new RowPutChange( TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeB)) )) ).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_b"))); typeBPutRequest.setTransactionId(transactionId); syncClient.putRow(typeBPutRequest); 提交 // 提交事务,提交事务后,其他读操作就可以获取到之前的写入 syncClient.commitTransaction(new CommitTransactionRequest(transactionId)); 中止事务 // 中止事务,所有与改transactionId相关的写入操作将被回滚 syncClient.abortTransaction(new AbortTransactionRequest(transactionId)); 启动事务后使用后其他操作非本事务的操作尝试写入 本示例展示的是一个在事务执行期间有另外一个同分区键的写入时的场景,由于在分区键下启动事务会直接锁定分区键下所有的写操作,在事务执行期间任何向同分区下的写入操作将被阻塞至事务提交或超时。下面的流程图展示了两个线程在通过事务写入的一些情景: 写入第一行 // put row A with transactionID PutRowRequest typeAPutRequest = new PutRowRequest(new RowPutChange( TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA)) )) ).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_a"))); // set transactionId in startTransactionResponse typeAPutRequest.setTransactionId(transactionId); syncClient.putRow(typeAPutRequest); 写入第二行时不带事务ID,模拟非本事务的操作尝试写入,写入失败,返回的错误码为OTSRowOperationConflict // put row B without transactionID PutRowRequest typeBPutRequest = new PutRowRequest(new RowPutChange( TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeB)) )) ).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_b"))); // put without transactionId, due to PK_USE_ID is locked by another transaction, this action will fail try { syncClient.putRow(typeBPutRequest); Assert.fail(); } catch (TableStoreException e) { // ok Assert.assertEquals("OTSRowOperationConflict", e.getErrorCode()); } Batch写入 使用batch写入第一行和第二行 BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest(); batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA)) ))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_a"))); batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeB)) ))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_b"))); batchWriteRowRequest.setTransactionId(transactionId); syncClient.batchWriteRow(batchWriteRowRequest); 使用batch写入第三行和第四行 BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest(); batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeC)) ))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_c"))); batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeD)) ))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_d"))); batchWriteRowRequest.setTransactionId(transactionId); syncClient.batchWriteRow(batchWriteRowRequest);

优秀的个人博客,低调大师

Nginx Ingress Controller 入门实践

Ingress是什么? 在 Kubernetes 集群中,Ingress是授权入站连接到达集群服务的规则集合,提供七层负载均衡能力。可以给 Ingress 配置提供外部可访问的 URL、负载均衡、SSL、基于名称的虚拟主机等。简单来说Ingress说一些规则集合,这些规则可以实现url到Kubernetes中的service这一层的路由功能。既然Ingress只是规则,那么这些规则的具体实现是怎么样的呢?是有Ingress-Controller来实现的,目前比较常见使用较多的是Nginx-Ingress-Controller。 可以这样理解一下,nginx-ingress-controller是一个nginx应用,它能干什么呢?它能代理后端的service,它能根据Ingress的配置,将对应的配置翻译成nginx应用的配置,来实现七层路由的功能。既然nginx-ingress-controller是作为一个类似网关的这么一个应用,那么我的nginx-ingress-controller这个应用本身就是需要在集群外能够访问到的,那么我是需要对外暴露nginx-ingress-controller这个应用的,在k8s中是通过创建一个LoadBalancer的Service:nginx-ingress-lb来暴露nginx-ingress-controller这个应用的。对应的,我们也就知道了nginx-ingress-controller这个应用从外部访问是通过nginx-ingress-lb这个service关联的SLB(负载均衡产品)来进行的。至于对应的slb的相关配置策略可以参考一下之前的文章,关于服务实现的内容。 简单的请求链路如下: 客户端 --> slb --> nginx-ingress-lb service --> nginx-ingress-controller pod --> app service --> app pod 我们使用Ingress来暴露服务,那么需要创建对应的资源,要想功能正常,nginx-ingress-controller的pod需要正常运行,nginx-ingress-lb service和slb监听配置正常才行,Ingress要关联的后端应用服务也是需要配置正确,包括应用pod运行正常,应用的service配置正确。 然后我们需要创建对应的Ingress来实现我们的需求。我们先创建一个简单的Ingress来大体看下具体的功能是什么样子的。我们这条Ingress的目的要实现:请求域名:ingress.test.com,实际请求会到后端的tomcat应用中。 相关配置: apiVersion: apps/v1beta2 kind: Deployment metadata: labels: app: tomcat name: tomcat namespace: default spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: app: tomcat strategy: rollingUpdate: maxSurge: 25% maxUnavailable: 25% type: RollingUpdate template: metadata: labels: app: tomcat spec: containers: - image: 'tomcat:latest' imagePullPolicy: Always name: tomcat resources: requests: cpu: 100m memory: 200Mi terminationMessagePath: /dev/termination-log terminationMessagePolicy: File dnsPolicy: ClusterFirst restartPolicy: Always schedulerName: default-scheduler securityContext: {} terminationGracePeriodSeconds: 30 --- apiVersion: v1 kind: Service metadata: name: tomcat-svc namespace: default spec: clusterIP: 172.21.6.143 ports: - port: 8080 protocol: TCP targetPort: 8080 #service常见配置错误的地方,targetPort必须是pod暴露的端口,不能是其他的 selector: app: tomcat sessionAffinity: None type: ClusterIP --- apiVersion: extensions/v1beta1 kind: Ingress metadata: name: tomcat namespace: default spec: rules: - host: ingress.test.com http: paths: - backend: serviceName: tomcat-svc servicePort: 8080 path: / 在Ingress创建成功后会自动生成一个端点IP,我们应该将域名:ingress.test.com做A记录解析到这个端点IP。这样我们访问域名:ingress.test.com,实际请求会请求到我们的tomcat应用。 测试结果: #curl http://端点IP -H "host:ingress.test.com" -I HTTP/1.1 200 Date: Thu, 26 Sep 2019 04:55:39 GMT Content-Type: text/html;charset=UTF-8 Connection: keep-alive Vary: Accept-Encoding nginx-ingress-controller 配置分析 yaml如下: apiVersion: apps/v1beta2 kind: Deployment metadata: labels: app: ingress-nginx name: nginx-ingress-controller namespace: kube-system spec: progressDeadlineSeconds: 600 replicas: 2 revisionHistoryLimit: 10 selector: matchLabels: app: ingress-nginx strategy: rollingUpdate: maxSurge: 25% maxUnavailable: 25% type: RollingUpdate template: metadata: annotations: prometheus.io/port: '10254' prometheus.io/scrape: 'true' labels: app: ingress-nginx spec: affinity: podAntiAffinity: preferredDuringSchedulingIgnoredDuringExecution: - podAffinityTerm: labelSelector: matchExpressions: - key: app operator: In values: - ingress-nginx topologyKey: kubernetes.io/hostname weight: 100 containers: - args: - /nginx-ingress-controller - '--configmap=$(POD_NAMESPACE)/nginx-configuration' - '--tcp-services-configmap=$(POD_NAMESPACE)/tcp-services' - '--udp-services-configmap=$(POD_NAMESPACE)/udp-services' - '--annotations-prefix=nginx.ingress.kubernetes.io' - '--publish-service=$(POD_NAMESPACE)/nginx-ingress-lb' - '--v=2' env: - name: POD_NAME valueFrom: fieldRef: apiVersion: v1 fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: apiVersion: v1 fieldPath: metadata.namespace image: >- registry-vpc.cn-shenzhen.aliyuncs.com/acs/aliyun-ingress-controller:v0.22.0.5-552e0db-aliyun imagePullPolicy: IfNotPresent livenessProbe: failureThreshold: 3 httpGet: path: /healthz port: 10254 scheme: HTTP initialDelaySeconds: 10 periodSeconds: 10 successThreshold: 1 timeoutSeconds: 1 name: nginx-ingress-controller ports: - containerPort: 80 name: http protocol: TCP - containerPort: 443 name: https protocol: TCP readinessProbe: failureThreshold: 3 httpGet: path: /healthz port: 10254 scheme: HTTP periodSeconds: 10 successThreshold: 1 timeoutSeconds: 1 resources: {} securityContext: capabilities: add: - NET_BIND_SERVICE drop: - ALL procMount: Default runAsUser: 33 terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /etc/localtime name: localtime readOnly: true dnsPolicy: ClusterFirst initContainers: - command: - /bin/sh - '-c' - | sysctl -w net.core.somaxconn=65535 sysctl -w net.ipv4.ip_local_port_range="1024 65535" sysctl -w fs.file-max=1048576 sysctl -w fs.inotify.max_user_instances=16384 sysctl -w fs.inotify.max_user_watches=524288 sysctl -w fs.inotify.max_queued_events=16384 image: 'registry-vpc.cn-shenzhen.aliyuncs.com/acs/busybox:latest' imagePullPolicy: Always name: init-sysctl resources: {} securityContext: privileged: true procMount: Default terminationMessagePath: /dev/termination-log terminationMessagePolicy: File nodeSelector: beta.kubernetes.io/os: linux restartPolicy: Always schedulerName: default-scheduler securityContext: {} serviceAccount: nginx-ingress-controller serviceAccountName: nginx-ingress-controller terminationGracePeriodSeconds: 30 volumes: - hostPath: path: /etc/localtime type: File name: localtime --- apiVersion: v1 kind: Service metadata: labels: app: nginx-ingress-lb name: nginx-ingress-lb namespace: kube-system spec: clusterIP: 172.21.11.181 externalTrafficPolicy: Local healthCheckNodePort: 32435 ports: - name: http nodePort: 31184 port: 80 protocol: TCP targetPort: 80 - name: https nodePort: 31972 port: 443 protocol: TCP targetPort: 443 selector: app: ingress-nginx sessionAffinity: None type: LoadBalancer 其中需要特别注意的配置是容器的args配置: --configmap=$(POD_NAMESPACE)/nginx-configuration 表明nginx-ingress-controller使用是哪个namespace下configmap来读取nginx-ingress-controller的nginx配置。默认是使用kube-system/nginx-configuration这个configmap。 --publish-service=$(POD_NAMESPACE)/nginx-ingress-lb 表明选择使用nginx-ingress-controller的Ingress的端点地址是使用的哪一个LoadBalancer的Service的扩展IP。默认是使用kube-system/nginx-ingress-lb这个Service。 --ingress-class=INGRESS_CLASS 这个配置是对nginx-ingress-controller自身的一个标识,表示我是谁,没有配置就是默认的“nginx”。这个配置有什么用呢?是用来让Ingress选择我要使用的ingress-controller是谁,Ingress通过注解:kubernetes.io/ingress.class: ""决定选择哪一个ingress-controller,如果没有配置那么就是选择--ingress-class=“nginx”这个ingress-controller。 如何在一个阿里云kubernetes集群里面部署多套Nginx Ingress Controller参考文档: https://yq.aliyun.com/articles/645856 之前我们也说了Ingress是规则,会下发到ingress-controller实现相关功能,那我们就来看下下发到ingress-controller的配置究竟是什么样。我们可以进入到nginx-ingress-controller的pod内来查看一下nginx配置。在pod内的/etc/nginx/nginx.conf里面,除了一些公共的配置外,上面的Ingress生成了如下的nginx.conf配置: ## start server ingress.test.com server { server_name ingress.test.com ; listen 80; set $proxy_upstream_name "-"; location / { set $namespace "default"; set $ingress_name "tomcat"; set $service_name "tomcat-svc"; set $service_port "8080"; set $location_path "/"; rewrite_by_lua_block { balancer.rewrite() } access_by_lua_block { balancer.access() } header_filter_by_lua_block { } body_filter_by_lua_block { } log_by_lua_block { balancer.log() monitor.call() } port_in_redirect off; set $proxy_upstream_name "default-tomcat-svc-8080"; set $proxy_host $proxy_upstream_name; client_max_body_size 100m; proxy_set_header Host $best_http_host; # Pass the extracted client certificate to the backend # Allow websocket connections proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; proxy_set_header X-Request-ID $req_id; proxy_set_header X-Real-IP $the_real_ip; proxy_set_header X-Forwarded-For $the_real_ip; proxy_set_header X-Forwarded-Host $best_http_host; proxy_set_header X-Forwarded-Port $pass_port; proxy_set_header X-Forwarded-Proto $pass_access_scheme; proxy_set_header X-Original-URI $request_uri; proxy_set_header X-Scheme $pass_access_scheme; # Pass the original X-Forwarded-For proxy_set_header X-Original-Forwarded-For $http_x_forwarded_for; # mitigate HTTPoxy Vulnerability # https://www.nginx.com/blog/mitigating-the-httpoxy-vulnerability-with-nginx/ proxy_set_header Proxy ""; # Custom headers to proxied server proxy_connect_timeout 10s; proxy_send_timeout 60s; proxy_read_timeout 60s; proxy_buffering off; proxy_buffer_size 4k; proxy_buffers 4 4k; proxy_request_buffering on; proxy_http_version 1.1; proxy_cookie_domain off; proxy_cookie_path off; # In case of errors try the next upstream server before returning an error proxy_next_upstream error timeout; proxy_next_upstream_tries 3; proxy_pass http://upstream_balancer; proxy_redirect off; } } ## end server ingress.test.com Nginx.conf里面的配置比较多,这里就不在一一详解,后续我们再针对一些常见的功能配置进行说明。 同时最新版本的nginx-ingress-controller已经默认开启了Upstream的动态更新,可以在nginx-ingress-controller的pod内请求:curl http://127.0.0.1:18080/configuration/backends 查看。具体内容如下: [{"name":"default-tomcat-svc-8080","service":{"metadata":{"creationTimestamp":null},"spec":{"ports":[{"protocol":"TCP","port":8080,"targetPort":8080}],"selector":{"app":"tomcat"},"clusterIP":"172.21.6.143","type":"ClusterIP","sessionAffinity":"None"},"status":{"loadBalancer":{}}},"port":8080,"secureCACert":{"secret":"","caFilename":"","pemSha":""},"sslPassthrough":false,"endpoints":[{"address":"172.20.2.141","port":"8080"}],"sessionAffinityConfig":{"name":"","cookieSessionAffinity":{"name":"","hash":""}},"upstreamHashByConfig":{"upstream-hash-by-subset-size":3},"noServer":false,"trafficShapingPolicy":{"weight":0,"header":"","cookie":""}},{"name":"upstream-default-backend","port":0,"secureCACert":{"secret":"","caFilename":"","pemSha":""},"sslPassthrough":false,"endpoints":[{"address":"127.0.0.1","port":"8181"}],"sessionAffinityConfig":{"name":"","cookieSessionAffinity":{"name":"","hash":""}},"upstreamHashByConfig":{},"noServer":false,"trafficShapingPolicy":{"weight":0,"header":"","cookie":""}}] 我们可以看到这里有ingress关联的service及其endpoint的映射关系,这样可以就可以请求到具体的pod的业务了。 路由配置的动态更新可以参考文档了解一下:https://yq.aliyun.com/articles/692732 在后续的文章我们再详细讲一些常见的使用场景。

优秀的个人博客,低调大师

Docker入门-Dockerfile的使用

使用Dockerfile定制镜像 镜像的定制实际上就是定制每一层所添加的配置、文件。我们可以把每一层修改、安装、构建、操作的命令都写入一个脚本,这个脚本就是Dockerfile。 Dockerfile是一个文本文件,其内包含了一条条的指令,每一条指令构建一层,因此每一条指令的内容,就是描述该层应当如何构建。 接下来我们以官方nginx镜像为例,使用Dockerfile来定制。 在一个空白目录中,建立一个文本文件,并命名为Dockerfile: mkdir mynginx cd mynginx touch Dockerfile 其内容为: FROM nginx RUN echo '<h1> Hello,Docker!</h1>' >/usr/share/nginx/html/index.html 这个Dockerfile很简单,一共就两行。涉及到了两条指令,FROM和RUN。 FROM指定基础镜像 所谓定制镜像,一定是以一个镜像为基础,在其上进行定制。基础镜像是必须指定的,而FROM就是指定基础镜像,因此一个Dockerfile中FROM是必备的指令,并且必须是第一条指令。在Docker Hub上有非常多的高质量的官方镜像,有可以直接拿来使用的服务类的镜像,如nginx、redis、mysql、tomcat等;可以在其中寻找一个最符合我们最终目标的镜像为基础镜像进行定制。 如果没有找到对应服务的镜像,官方镜像中还提供了一些更为基础的操作系统镜像,如ubuntu、debian、centos、alpine等,这些操作系统的软件库为我们提供了更广阔的扩展空间。 除了选择现有镜像为基础镜像外,Docker还存在一个特殊的镜像,名为scratch。这个镜像是虚拟的概念,并不实际存在,它表示一个空白的镜像。 FROM scratch ... 如果你以scratch为基础镜像的话,意味着你不以任何镜像为基础,接下来所写的指令将作为镜像第一层开始存在。 对于Linux下静态编译的程序来说,并不需要有操作系统提供运行时支持,所需的一切库都已经在可执行文件里了,因此直接FROM scratch会让镜像体积更加小巧。使用Go语言开发的应用很多会使用这种方式来制作镜像,这也是为什么有人认为Go是特别适应容器微服务架构的语言的原因之一。 RUN执行命令 RUN指令是用来执行命令行命令的。由于命令行的强大能力,RUN指令在定制镜像时是最常用的指令之一。其格式有两种: shell格式:RUN <命令> RUN echo '<h1>Hello,Docker~</h1>' > /usr/share/nginx/html/index.html exec格式: RUN ["可执行文件",“参数1”,“参数2”] RUN tar -xzf redis.tar.gz -C /usr/src/redis --strip-components=1 RUN make -C /usr/src/redis RUN make -C /usr/src/redis install 上面我们利用Dockerfile定制了nginx镜像,现在我们明白了这个Dockerfile的内容,接下来我们来构建这个镜像。 在Dockerfile文件所在目录执行: docker build -t nginx:v3 . 从命令的输出结果中,我们可以清晰的看到镜像的构建过程。在Step2中,RUN指令启动了一个容器782d25b7c611,执行了所要示的命令,并最后提交了这一层ba38ff665f57,随后删除了所用到的这个容器782d25b7c611。 启动构建的Nginx docker run --name nginx-test -p 8081:80 -d nginx:v3 如图所示 Dockerfile指令详解 COPY复制文件 格式: COPY <源路径>...<目标路径> COPY ["<源路径1>",..."<目标路径>"] COPY指令将从构建上下文目录中<源路径>的文件/目录复制到新的一层的镜像内的<目标路径>位置。比如: COPY package.json /usr/src/app/ <源路径>可以是多个,甚至可以是通配符,如: COPY hom* /mydir/ COPY hom?.txt /mydir/ ADD更高级的复制文件 ADD指令和COPY的格式和性质基本一致。但是在COPY基础上增加了一些功能。比如<源路径>可以是一个URL,这种情况下,Docker引擎会试图去下载这个链接的文件放到<目标路径>去。 在Docker官方的Dockerfile最佳实践文档中要求,尽可能的使用COPY,因此COPY的语义很明确,就是复制文件而已,而ADD则包含了更复杂的功能,其行为也不一定很清晰。最适合使用ADD的场合,就是所提及的需要自动解压缩的场合。 因此在COPY和ADD指令中选择的时候,可以遵循这样的原则,所有的文件复制均使用COPY指令,仅在需要自动解压缩的场合使用ADD。 CMD容器启动命令 CMD指令的格式和RUN相似,也是两种格式: shell格式:CMD <命令> exec格式:CMD ["可执行文件",“参数1”,“参数2”] 参数列表格式:CMD [“参数1”,“参数2”...]。在指定了ENTRYPOINT指令后,用CMD指定具体参数。 Docker不是虚拟机,容器就是进程。既然是进程,那么在启动容器的时候,需要指定所运行的程序及参数。CMD指令就是用于指定默认的容器主进程启动命令的。 ENTRYPOINT入口点 ENTRYPOINT的目的和CMD一样,都是在指定容器启动程序及参数。ENTRYPOINT在运行也可以替代,不过比CMD要略显繁琐,需要通过docker run的参数 --entrypoint来指定。 当指定了ENTRYPOINT后,CMD的含义就发生了改变,不再是直接的运行其命令,而是将CMD的内容作为参数传给ENTRYPOINT指令,换句话说实际执行时,将变为: <ENTRYPOINT>"<CMD>" ENV设置环境变量 格式有两种: ENV ENV ==... 这个指令很简单,就是设置环境变量而已,无论是后面的其它指令,如RUN,还是运行时的应用,都可以直接使用这里定义的环境变量。 ENV VERSION=1.0 DEBUG=on NAME="Happy Feet" $VERSION #使用环境变量 下列指令可以支持环境变量展开:ADD、COPY、ENV、EXPOSE、LABEL、USER、WORKDIR、VOLUME、STOPSIGNAL、ONBUILD。 ARG构建参数 格式: ARG <参数名>[=<默认值>]构建参数和ENV的效果一样,都是设置环境变量。所不同的是,ARG所设置的构建环境的环境变量,在将来容器运行时是不会存在这些环境变量的。但是不要因此就使用ARG保存密码之类的信息,因此docker history还是可以看到所有值的。 Dockerfile中的ARG指令是定义参数名称,以及定义其默认值。该默认值可以在构建命令docker build中用 --build-arg <参数名>=<值>来覆盖。 VOLUME定义匿名卷 格式为: VOLUME ["<路径1>","[路径2]"...] VOLUME <路径> 容器运行时应该尽量保持容器存储层不发生写操作,对于数据库需要保存动态数据的应用,其数据库文件应该保存于卷(volume)中,为了防止运行时用户忘记将动态文件所保存目录挂载为卷,在Dockerfile中,我们可以事先指定某些目录挂载为匿名卷,这样在运行时如果用户不指定挂载,其应用也可以正常运行,不会向容器存储层写入大量数据 VOLUME /data 这里的/data目录就会在运行时自动挂载为匿名卷,任何向/data中写入的信息都不会记录进容器存储层,从而保证了容器存储层的无状态化。当然,运行时可以覆盖这个挂载设置。 比如: docker run -d -v mydata:/data xxxx 在这行命令中,就使用了mydata这个命名卷挂载到了/data这个位置,替代了Dockerfile中定义的匿名卷的挂载配置。 EXPOSE声明端口 格式为EXPOSE <端口1>[<端口2>...]。 EXPOSE指令是声明运行时容器提供服务端口,这只是一个声明,在运行时并不会因为这个声明应该就会开启这个端口的服务。 在Dockerfile中写入这样的声明有两个好处: 是帮助镜像使用者理解这个镜像服务的守护端口,以方便配置映射; 在运行是使随机端口映射时,也就是docker run -P时,会自动随机映射EXPOSE端口。 WORKDIR指定工作目录 格式为WORKDIR <工作目录路径> 使用WORKDIR指令可以来指定工作目录(或者称为当前目录),以后各层的当前目录就被改为指定的目录,如该目录不存在,WORKDIR会帮你建立目录。 之前提到一些初学者常犯的错误是把Dockerfile等同于Shell脚本来书写,这种错误的理解还可能会导致出现下面这样的错误: RUN cd /app RUN echo "hello">world.txt 如果将这个Dockerfile进行构建镜像运行后,会发现找不到 /app/world.txt文件。 原因 在Shell中,连续两行是同一个进程执行环境,因此前一个命令修改的内存状态,会直接影响后一个命令。 而在Dockerfile中,这两行RUN命令的执行环境根本不同,是两个完全不同的容器。这就是对Dockerfile构建分层存储的概念不了解导致的错误。 每一个RUN都是启动一个容器、执行命令、然后提交存储层文件变量。第一层RUN cd /app的执行仅仅是当前进程的工作目录变量,一个内存上的变化而已,其结果不会造成任何文件变更。而到第二层的时候,启动的是一个全新的容器,跟第一层的容器更完全没关系,自然不可能继承前一层构建过程中的内存变化。 因此如果需要改变以后各层的工作目录的位置,那么应该使用WORKIDR指令。 USER指定当前用户 格式:USER <用户名> USER指令和WORKDIR相似,都是改变环境状态并影响以后的层。WORKDIR是改变工作目录,USER则是改变之后层的执行RUN,CMD以及ENTRYPOINT这类命令的身份。 当前,和WORKDIR一样,USER只是帮助你切换到指定用户而已,这个用户必须是事先建立好的,否则无法切换。 RUN groupadd -r redis && useradd -r -g redis redis USER redis RUN ["redis-server"] HEALTHCHECK健康检查 格式: HEALTHCHECK [选项] CMD <命令>:设置检查容器健康状况的命令 HEALTHCHECK NONE:如果基础镜像有健康检查指令,可以屏蔽掉其健康检查指令 HEALTHCHECK指令是告诉Docker应该如何进行判断容器的状态是否正常,这是Docker1.12引入的新指令。通过该指令指定一行命令,用这行命令来判断容器主进程的服务状态是否还正常,从而比较真实的反应容器实际状态。 一个镜像指令HEALTHCHECK指令后,用其启动容器,初始状态会为starting,在执行健康检查成功后变为healthy,如果连续一定次数失败,则会变为unhealthy。 HEALTHCHECK支持下列选项: --interval=<间隔>:两次健康检查的间隔,默认为30秒; --timeout=<时长>:健康检查命令运行超时时间,如果超过这个时间,本次健康检查就被视为失败,默认30秒; --retries=<次数>:当连续失败指定次数后,则将容器状态视为unhealthy,默认3次。 为了帮助排障,健康检查命令的输出(包括stdout以及stderr)都会被存储于健康状态里,可以用docker inspect来查看。 ONBUILD为他人做嫁衣裳 格式: ONBUILD <其它指令>ONBUILD是一个特殊的指令,它后面跟的是其它指令,比如RUN,COPY等,而这些指令,在当前镜像构建时并不会被执行。只有当以前镜像为基础镜像,去构建下一级镜像的时候才会被执行。 Dockerfile中的其它指令都是为了定制当前镜像而准备的,唯有ONBUILD是为了帮助别人定制自己而准备的。 其他制作镜像方式 docker save和docker load Docker还提供了docker load和docker save命令,用以将镜像保存为一个tar文件,然后传输到另一个位置上,再加载进来。这是在没有Docker Registry时的做法,现在已经不推荐,镜像迁移应该直接使用Docker Registry,无论是直接使用Docker Hub还是使用内网私有Registry都可以。 例如:保存nginx镜像 docker save nginx|gzip > nginx-latest.tar.gz 然后我们将nginx-latest.tar.gz文件复制到了另一个机器上,再次加载镜像: docker load -i nginx-latest.tar.gz

资源下载

更多资源
Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

WebStorm

WebStorm

WebStorm 是jetbrains公司旗下一款JavaScript 开发工具。目前已经被广大中国JS开发者誉为“Web前端开发神器”、“最强大的HTML5编辑器”、“最智能的JavaScript IDE”等。与IntelliJ IDEA同源,继承了IntelliJ IDEA强大的JS部分的功能。

用户登录
用户注册