首页 文章 精选 留言 我的

精选列表

搜索[整理],共9145篇文章
优秀的个人博客,低调大师

linux常用基本命令整理小结

linux系统遵循的基本原则 由目标单一的小程序组成,组合小程序完成复杂任务; 一切皆文件; 尽量避免捕捉用户接口; 配置文件保存为纯文本文件; Linux命令行常识 命令格式 命令+选项+参数 选项: 短选项:- 多个短选项可以结合:-a -b = -ab 长选项:-- 参数:命令的作用对象; 命令类型 内置命令:shell内部设置,内建; 外部命令:在文件系统的某个路径下有一个与命令名称相应的可执行文件; 检测是内外部命令的方式:使用type命令; type命令使用方法: type+要检查的命令 环境变量 定义:命令的内存空间(方便命令的使用) 多命令:利用“;”隔开; man手册使用 man手册分章 1:用户命令(/bin, /usr/bin, /usr/local/bin) 2:系统调用 3:库用户 4:特殊文件(设备文件) 5:文件格式(配置文件的语法) 6:游戏 7:杂项(Miscellaneous) 8: 管理命令(/sbin, /usr/sbin, /usr/local/sbin) 命令解读: <>:必选 []:可选(可省略) ...:可以出现多次 |:多选一 {}:分组 MAN: NAME:命令名称及功能简要说明 SYNOPSIS:用法说明,包括可用的选项 DESCRIPTION:命令功能的详尽说明,可能包括每一个选项的意义 OPTIONS:说明每一个选项的意义 FILES:此命令相关的配置文件 BUGS: EXAMPLES:使用示例 SEE ALSO:另外参照 操作方法: 翻屏: 向后翻一屏:SPACE 向前翻一屏:b 向后翻一行:ENTER 向前翻一行:k 查找: /KEYWORD: 向后 n: 下一个 N:前一个 ?KEYWORD:向前 n: 下一个 N:前一个 q: 退出 文件系统操作相关命令 文件系统目录结构 /boot: 系统启动相关的文件,如内核、initrd,以及grub (bootloader) /dev: 设备文件 设备文件: 块设备:随机访问,数据块 字符设备:线性访问,按字符为单位 设备号:主设备号(major)和次设备号(minor) /etc:配置文件 (Linux哲学:将配置文件保存成纯文本文件) /home:用户的家目录,每一个用户的家目录通常默认为/home/ USERNAME /root:管理员的家目录; /lib:库文件 静态库, .a 动态库, .dll, .so (shared object) /lib/modules:内核模块文件 /media:挂载点目录,移动设备(挂载:将设备关联到该文件系统 的某个目录上) /mnt:挂载点目录,额外的临时文件系统 /opt:可选目录,第三方程序的安装目录(optional 可选的) /proc:伪文件系统,内核映射文件(内核的根本功能属性 如:各 种协议的支撑 路由功能的支撑等) /sys:伪文件系统,跟硬件设备相关的属性映射文件 /tmp:临时文件, /var/tmp(一个月后自动删除其内容 任何用户 都可以创建 但只能删除自己的内容) /var:可变化的文件 /bin: 可执行文件, 用户命令 /sbin:管理命令 注意: 1) /bin /sbin 存放和系统启动相关的命令 2) /usr:shared, read-only (全局共享只读文件) /usr/bin /usr/sbin /usr/lib (系统启动后 提供基本功能所使用的相关命令和可执行程序) 3) /usr/local:(第3方软件 非关键性程序) /usr/local/bin /usr/local/sbin /usr/local/lib 文件系统的基本命令 1)文件管理命令 touch命令 语法: # touch(选项)(参数) 功能: 修改文件的时间戳;创建新的空文件; 选项: -a:改变访问时间 -m:改变修改时间 -t:改变为指定时间 e.g. # touch -mt 199301101123 -c:不创建任何文件 事实上,stat命令显示的是文件i节点的信息。Linux文件系统以块为单位存储信息,为了找到某一个文件所在存储空间的位置,用i节点对每个文件进行索引。 stat命令 语法: # stat (选项)(参数) 功能: 用于显示文件或文件系统的详细信息 选项: -f:不显示文件本身的信息,显示文件所在文件系统的信息 -L:显示符号链接 -t:只显示摘要信息 file命令 语法: file(选项)(参数) 功能: 显示文件的文件类型; 选项: -L:直接显示符号链接所指向的文件类型 参数: 要确定的文件类型的文件列表,多个文件之间用空格隔开,可以使用shell通配符匹配多个文件 rm命令 语法: # rm (选项)(参数) 功能: 删除文件 选项: -i:删除文件前询问用户 -f:强行 -r:递归 注意:rm -rf / (一定要小心 这是致命的命令) cp命令 语法: #cp (选项)(SRC DEST) 功能: 拷贝文件或目录 选项: -r:递归 -i:执行命令前询问用户 -f:强行 -p:保留原文件或目录的属性 -a:归档复制,常用于备份 注意: 多个文件到一个目录 #cp /etc/{passwd,inittab,rc.d/rc.sysinit} /tmp/ ( "{}"表示命令行展开) mv命令 语法: # mv (选项)(SRC DEST) 功能: 1)当目标文件为目录文件时,源文件移动到该目录下,且文件名不变;其次,源文件可以为多个,所有源文件都被移动个该目录下且文件名不变。 2)当目标文件不是目录文件时,源文件和目标文件在同一目录下,则是在修改目标文件的文件名,反之,则是源文件(只能为一个)覆盖目标件。 选项: -b:覆盖前先备份 -f:强行 -i:执行前询问用户 -t:移动多个源文件 cat/tac命令 语法: # cat/tac (选项)(参数) 功能: 1)一次显示整个文件: cat filename 2)从键盘创建文件:cat > filename 3)将几个文件合并为一个文件:cat file1 file2 > file 选项: -b:对非空输出行号 -E:在每行结束处显示$ -n:对输出的所有行进行编号,由1开始 more命令 语法: # more (选项)(参数) 功能: 按页显示文件内容 选项: +n:从笫n行开始显示 -n:定义屏幕大小为n行 -p:通过清除窗口而不是滚屏来对文件进行换页,与-c选项相似 操作: Enter 向下n行,需要定义。默认为1行 Ctrl+F 向下滚动一屏 空格键 向下滚动一屏 Ctrl+B 返回上一屏 = 输出当前行的行号 :f 输出文件名和当前行的行号 V 调用vi编辑器 !命令 调用Shell,并执行命令 q 退出more less命令 语法: # less (选项)(参数) 功能: less 与 more 类似,但使用 less 可以随意浏览文件,而 more 仅能向前移动,却不能向后移动,而且 less 在查看之前不会加载整个文件。 选项: -b <缓冲区大小> 设置缓冲区的大小 -e 当文件显示结束后,自动离开 -f 强迫打开特殊文件,例如外围设备代号、目录和二进制文件 -g 只标志最后搜索的关键词 -i 忽略搜索时的大小写 -m 显示类似more命令的百分比 -N 显示每行的行号 -o <文件名> 将less 输出的内容在指定文件中保存起来 -Q 不使用警告音 -s 显示连续空行为一行 -S 行过长时间将超出部分舍弃 -x <数字> 将“tab”键显示为规定的数字空格 操作: /字符串:向下搜索“字符串”的功能 ?字符串:向上搜索“字符串”的功能 n:重复前一个搜索(与 / 或 ? 有关) N:反向重复前一个搜索(与 / 或 ? 有关) b 向后翻一页 d 向后翻半页 h 显示帮助界面 Q 退出less 命令 u 向前滚动半页 y 向前滚动一行 空格键 滚动一行 回车键 滚动一页 [pagedown]: 向下翻动一页 [pageup]: 向上翻动一页 head命令 语法: # head (选项)(参数) 功能: head 用来显示档案的开头至标准输出中,默认head命令打印其相应文件的开头10行。 选项: -q 隐藏文件名 -v 显示文件名 -c<字节> 显示字节数 -n<行数> 显示的行数 tail命令 语法: # head (选项)(参数) 功能: 用于显示指定文件末尾内容,不指定文件时,作为输入信息进行处理。常用查看日志文件。 选项: -f 循环读取 -q 不显示处理信息 -v 显示详细的处理信息 -c<数目> 显示的字节数 -n<行数> 显示行数 cut命令 语法: # cut(选项)(参数) 功能: 1)显示文件内容; 2)连接两个或多个文件:cut f1 f2 > f3 选项: -b:仅显示行中指定直接范围的内容; -c:仅显示行中指定范围的字符; -d:指定字段的分隔符,默认的字段分隔符为“TAB”; -f:显示指定字段的内容; -n:与“-b”选项连用,不分割多字节字符; --complement:补足被选择的字节、字符或字段; --out-delimiter=<字段分隔符>:指定输出内容是的字段分割符; --help:显示指令的帮助信息; --version:显示指令的版本信息。 join命令 语法: # join (选项)(参数) 功能: “将两个文件里指定栏位同样的行连接起来”,即依照两个文件里共有的某一列,将相应的行拼接成一行。 选项: -a FILENUM:除了显示匹配好的行另外将指定序号(1或2)文件里部匹配的行显示出来 -e EMPTY:将须要显示可是文件里不存在的域用此选项指定的字符取代 -i :忽略大写和小写 -j FIELD :等同于 -1 FIELD -2 FIELD,-j指定一个域作为匹配字段 -o FORMAT:以指定格式输出 -t CHAR :以指定字符作为输入输出的分隔符join 默认以空白字符做分隔符(空格和\t),能够使用 join -t $'\t'来指定使用tab做分隔符 -v FILENUM:与-a相似 但值显示文件里没匹配上的行 -1 FIELD:以file1中FIELD字段进行匹配 -2 FIELD:以file2中FIELD字段进行匹配 --help :打印命令帮助文件 样例: 文件 file1.txt aa 1 2 bb 2 3 cc 4 6 dd 3 3 文件file2.txt aa 2 1 bb 8 2 ff 2 4 cc 4 4 dd 5 5 # join file1.txt file2.txt 输出:aa 1 2 2 1 bb 2 3 8 2 2)目录管理命令 ls 语法: ls (选项)(参数) 功能: 显示目录列表 选项: -l:长格式 文件类型: -:普通文件 (f) d: 目录文件 b: 块设备文件 (block) c: 字符设备文件 (character) l: 符号链接文件(symbolic link file) p: 命令管道文件(pipe) s: 套接字文件(socket) 文件权限:9位,每3位一组,每一组:rwx(读,写,执行), r-- 文件硬链接的次数 文件的属主(owner) 文件的属组(group) 文件大小(size),单位是字节 时间戳(timestamp):最近一次被修改的时间 访问:access 修改:modify,文件内容发生了改变 改变:change,metadata,元数据(文件的属性) -h:做单位转换 -a: 显示以.开头的隐藏文件 . 表示当前目录 .. 表示父目录 -A 不包含 . 和 ..文件 -d: 显示目录自身属性 -i: index node, inode -r: 逆序显示 -R: 递归(recursive)显示 cd 语法: cd (选项)(参数) 功能: 切换工作目录; 参数: cd ~USERNAME: 进入指定用户的家目录 cd -:在当前目录和前一次所在的目录之间来回切 mkdir 语法: mkdir (选项)(参数) 功能: 创建目录 选项: -m<目标属性>或--mode<目标属性>:建立目录的同时设置目录的权限; -p或--parents: 若所要建立目录的上层目录目前尚未建立,则会一并建立上层目录; 3)排序命令 sort 语法: sort (选项)(参数) 功能: 将文件进行排序,并将排序结果标准输出。sort命令既可以从特定的文件,也可以从stdin中获取输入。 选项: -n:数值排序 -r: 降序 -t: 字段分隔符 -k: 以哪个字段为关键字进行排序 -u: 排序后相同的行只显示一次 -f: 排序时忽略字符大小写 uniq 语法: uniq(选项)(参数) 功能: 报告或删除文件中重复的行 选项: -c: 显示文件中行重复的次数 -d: 只显示重复的行 4)统计命令 wc 语法:(word count) wc (选项)(参数) 功能: 用来计算数字。利用wc指令我们可以计算文件的Byte数、字数或是列数,若不指定文件名称,或是所给予的文件名为“-”,则wc指令会从标准输入设备读取数据。 选项: -c或--bytes或——chars:只显示Bytes数; -l或——lines:只显示列数; -w或——words:只显示字数。 5)字符处理命令 tr 语法: tr(选项)(字符集1 字符集2) 功能: 对来自标准输入的字符进行替换、压缩和删除。 选项: -c或——complerment:取代所有不属于第一字符集的字符; -d或——delete:删除所有属于第一字符集的字符; -s或--squeeze-repeats:把连续重复的字符以单独一个字符表示; -t或--truncate-set1:先删除第一字符集较第二字符集多出的字符。 参数: 字符集1:指定要转换或删除的原字符集。当执行转换操作时,必须使用参数“字符集2”指定转换的目标字符集。但执行删除操作时,不需要参数“字符集2”; 字符集2:指定要转换成的目标字符集。 详解见链接: https://www.cnblogs.com/ginvip/p/6354440.html 6)日期时间管理命令 date 语法: date (选项)(参数) 功能: 显示或设置系统时间与日期。 选项: -d<字符串>:显示字符串所指的日期与时间。字符串前后必须加上双引号; -s<字符串>:根据字符串来设置日期与时间。字符串前后必须加上双引号; -u:显示GMT; --help:在线帮助; --version:显示版本信息。 详细操作见链接: http://man.linuxde.net/date clock/hwclock 语法: clock (选项)(参数) 功能: 设置修改时间 详细操作见链接: http://blog.csdn.net/YuYunTan/article/details/52589019

优秀的个人博客,低调大师

hadoop常用进程和web端口整理

应用场景 Hadoop部署完分布式集群后,运行了一些组件,会产生很多进程,和web可以访问的端口,容易混淆,这里把一些常见的进程,进程的作用和端口进行归纳,总结,方便区分。 操作步骤 1. HDFS进程及端口 HDFS启动后包含的进程有:NameNode,DataNode,SecondaryNameNode 启动方法,在主节点: # cd /opt/hadoop2.6.0/sbin/ # ./start-dfs.sh 启动完后访问端口为50070,通过主节点IP加上端口即可访问: 2. YARN进程及端口 YARN启动后包含的进程有:Resourcemanager,NodeManager 启动方法,在主节点: # cd /opt/hadoop2.6.0/sbin/ # ./start-yarn.sh 启动完后访问端口为8088,通过主节点IP加上端口即可访问: 3. ZooKeeper进程 Zookeeper启动后包含的进程是:QuorumPeerMain 注: HQuorumPeer进程是hbase管理的zookeeper QuorumPeerMain进程就是zookeeper独立的进程 也就是说,如果您用的是hbase的zookeeper,那么就是HQuorumPeer进程 启动方法,在需要启动ZooKeeper的节点上: # cd /opt/zookeeper3.4.10/bin/ # zkServer.sh start 4. HBASE进程及端口 Hbase启动后包含的进程有 : HMaster,HRegionServer 启动方法,在主节点: # cd /opt/hbase1.2.6/bin # ./start-hbase.sh 启动完后访问端口为16010,通过主节点IP加上端口即可访问: 5. Spark进程及端口 Spark启动后包含的进程有 : Master Worker 启动方法,在主节点: # cd /opt/spark1.6.1/sbin # ./start-all.sh 启动完后访问端口为8080,通过主节点IP加上端口即可访问:【资源监控地址master】 启动后4040任务监控端口并没有开启,需要spark-shell –master spark://hadoop0:7077,执行:【任务监控地址Drive】 注意: spark-shell 【local模式,在4040端口监控任务】 spark-shell --master spark://hadoop0:7077 【standalone模式,在8080端口监控任务】 spark-shell --master yarn-client 【yarn-client模式,在yarn 8088端口监控任务】 spark-shell --master yarn-cluster 【yarn-cluster模式,在yarn 8088端口监控任务】

优秀的个人博客,低调大师

SQL知识整理二:锁、游标、索引

锁的模式 锁模式 描述 共享(S) 用于不更改或不更新数据(只读操作),如SELECT语句 更新(U) 用于可更新的资源中。防止当多个会话在读取、锁定以及随后可能进行的资源更新时发生常见形式的死锁。 排它(X) 用于数据修改操作,例如INSERT、UPDATE或DELETE。确保不会同时对同一资源进行多重更新 意向 SQL Server有在资源的低层获得共享锁或排它锁的意向意向锁的类型为:意向共享(IS)、意向排它(IX)以及意向排它共享(SIX) 架构 在执行依赖于表架构的操作时使用。架构锁的类型为:架构修改(Sch-M)和架构稳定(Sch-S) 大容量更新(BU) 向表中大容量复制数据并指定了TABLOCK提示时使用 死锁 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。 死锁的四个必要条件 互斥条件(Mutual exclusion):资源不能被共享,只能由一个进程使用。 请求与保持条件(Hold and wait):已经得到资源的进程可以再次申请新的资源。 非剥夺条件(No pre-emption):已经分配的资源不能从相应的进程中被强制地剥夺。 循环等待条件(Circular wait):系统中若干进程组成环路,该环路中每个进程都在等待相邻进程正占用的资源 死锁的处理方法 查看那个spid处于wait状态,然后用kill spid来干掉(即破坏死锁的第四个必要条件:循环等待)。 使用SET LOCK_TIMEOUT timeout_period(单位为毫秒)来设定锁请求超时。默认情况下,数据库没有超时期限(timeout_period值为-1,可以用SELECT @@LOCK_TIMEOUT来查看该值,即无限期等待)。 SQL Server内部有一个锁监视器线程执行死锁检查,锁监视器对特定线程启动死锁搜索,检测到死锁后,数据库引擎选择运行回滚开销最小的事务的会话作为死锁牺牲品回滚死锁牺牲品的事务并释放该事务持有的所有锁,使其他线程的事务可以请求资源并继续运行。 游标 游标定义: 可以对一个select的结果集进行处理,或是不需要全部处理,就会返回一个对记录集进行处理之后的结果。 游标实际上是一种能从多条数据记录的结果集中每次提取一条记录的机制。游标可以完成: # 允许定位到结果集中的特定行 # 从结果集的当前位置检索一行或多行数据 # 支持对结果集中当前位置的进行修改 由于游标是将记录集进行一条条的操作,所以这样给服务器增加负担,一般在操作复杂的结果集的情况下,才使用游标。SQL Server 2005有三种游标:T-SQL游标、API游标、客户端游标。 游标的基本操作 游标的基本操作有定义游标、打开游标、循环读取游标、关闭游标、删除游标。 A、 定义游标 declare cursor_name --游标名称 cursor [local | global] --全局、局部 [forward only | scroll] --游标滚动方式 [read_only | scroll_locks | optimistic] --读取方式 for select_statements --查询语句 [for update | of column_name ...] --修改字段 参数: forward only | scroll:前一个参数,游标只能向后移动;后一个参数,游标可以随意移动 read_only:只读游标 scroll_locks:游标锁定,游标在读取时,数据库会将该记录锁定,以便游标完成对记录的操作 optimistic:该参数不会锁定游标;此时,如果记录被读入游标后,对游标进行更新或删除不会超过 B、 打开游标 open cursor_name; 游标打开后,可以使用全局变量@@cursor_rows显示读取记录条数 C、 检索游标 fetch cursor_name; 检索方式如下: fetch first; 读取第一行 fetch next; 读取下一行 fetch prior; 读取上一行 fetch last; 读取最后一行 fetch absolute n; 读取某一行 如果n为正整数,则读取第n条记录 如果n为负数,则倒数提取第n条记录 如果n为,则不读取任何记录 fetch pelative n 如果n为正整数,则读取上次读取记录之后第n条记录 如果n为负数,则读取上次读取记录之前第n条记录 如果n为,则读取上次读取的记录 D、 关闭游标 close cursor_name; E、 删除游标 deallocate cursor_name; 游标操作示例 --创建一个游标 declare cursor_stu cursor scroll for select id, name, age from student; --打开游标 open cursor_stu; --存储读取的值 declare @id int, @name nvarchar(20), @age varchar(20); --读取第一条记录 fetch first from cursor_stu into @id, @name, @age; --循环读取游标记录 print '读取的数据如下:'; --全局变量 while (@@fetch_status = 0) begin print '编号:' + convert(char(5), @id) + ', 名称:' + @name + ', 类型:' + @age; --继续读取下一条记录 fetch next from cursor_stu into @id, @name, @age; end --关闭游标 close area_cursor; --删除游标 --deallocate area_cursor; 索引 聚集索引定义: 聚簇索引即建立在聚簇上的索引,创建聚簇索引时,需要对已有表数据重新进行排序(若表中已有数据),即删除原始的表数据后再将排序结果按物理顺序插回,故聚簇索引建立完毕后,建立聚簇索引的列中的数据已经全部按序排列。 一个表中只能包含一个聚簇索引,但该索引可以包含多个列。 B-树索引中,聚簇索引的叶层就是数据页。 聚集索引最佳实践: 首先创建聚集索引 聚集索引键最好是唯一值 聚集索引上的列需要足够短,检索一定范围和预先排序数据时使用,因为聚集索引的叶子与数据页面相同,索引顺序也是数据物理顺序,读取数据时,磁头是按照顺序读取,而不是随机定位读取数据 在频繁更新的列上不要设计聚集索引,他将导致所有的非聚集所有的更新,阻塞非聚集索引的查询 不要使用太长的关键字,因为非聚集索引实际包含了聚集索引值 不要在太多并发度高的顺序插入,这将导致页面分割,设置合理的填充因子是个不错的选择 聚集索引示例: CREATE CLUSTERED INDEX IX_tb_heap_test_id ON dbo.tb_heap_test (id) WITH (ONLINE=ON) 非聚集索引定义: 非聚簇索引类似书本索引,索引与数据存放在不同的物理区域,建立非聚簇索引时数据本身不进行排序。一个表中科含多个非聚簇索引。 B-树索引中,非聚簇索引的叶层仍是索引页,其以指针指向数据页实际存储位置。 非聚集索引最佳实践 频繁更新的列,不适合做聚集索引,但可以做非聚集索引 宽关键字,例如很宽的一列或者一组列,不适合做聚集索引的列可作非聚集索引列 检索大量的行不宜做非聚集索引,但是可以使用覆盖索引来消除这种影响 非聚集索引示例: CREATE INDEX IX_tb_clustered_update_ID ON dbo.tb_clustered_update (ID) WITH (ONLINE=ON) 非聚集与聚集用法之比较 检索的数据行 一般地,检索数据量大的一般使用聚集索引,因为聚集索引的叶子页面与数据页面在相同。相反,检索少量的数据可能非聚集索引更有利,但注意书签查找消耗资源的力度,不过可考虑覆盖索引解决这个问题。 数据是否排序 如果数据需要预先排序,需要使用聚集索引,若不需要预先排序就那就选择聚集索引。 索引键的宽度 索引键如果太宽,不仅会影响数据查询性能,还影响非聚集索引,因此,若索引键比较小,可以作为聚集索引,如果索引键够大,考虑非聚集索引,如果很大的话,可以用INCLUDE创建覆盖索引 列更新的频度 列更新频率高的话,应该避免考虑所用非聚集索引,否则可考虑聚集索引。 书签查找开销 如果书签查找开销较大,应该考虑聚集索引,否则可使用非聚集索引,更佳是使用覆盖索引,不过得根据具体的查询语句而看 覆盖索引 覆盖索引可显著减少查询的逻辑读次数,使用INCLUDE语句添加列的方式更容易实现,他不仅减小索引中索引列的数据,还可以减少索引键的大小,原因是包含列只保存在索引的叶子级别上,而不是索引的叶子页面。覆盖索引充当一个伪的聚集索引。覆盖索引还能够有效的减少阻塞和死锁的发生,与聚集索引类似,因为聚集索引值发生一次锁,非覆盖索引可能发生两次,一次锁数据,一次锁索引,以确保数据的一致性。覆盖索引相当于数据的一个拷贝,与数据页面隔离,因此也只发生一次锁。 覆盖索引示例: CREATE INDEX IX_IX_tb_booklookup_name_type_other ON dbo.tb_booklookup (name) INCLUDE ([type],other) WITH ( ONLINE=ON ) 本文转自程兴亮博客园博客,原文链接: http://www.cnblogs.com/chengxingliang/p/3307473.html ,如需转载请自行联系原作者

优秀的个人博客,低调大师

Spark-Spark Streaming例子整理(一)

(摘自王家林) 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流;既然是数据流处理,就会想到数据的流入、数据的加工、数据的流出。 日常工作、生活中数据来源很多不同的地方。例如:工业时代的汽车制造、监控设备、工业设备会产生很多源数据;信息时代的电商网站、日志服务器、社交网络、金融交易系统、黑客攻击、垃圾邮件、交通监控等;通信时代的手机、平板、智能设备、物联网等会产生很多实时数据,数据流无处不在。 在大数据时代SparkStreaming能做什么? 平时用户都有网上购物的经历,用户在网站上进行的各种操作通过Spark Streaming流处理技术可以被监控,用户的购买爱好、关注度、交易等可以进行行为分析。在金融领域,通过Spark Streaming流处理技术可以对交易量很大的账号进行监控,防止罪犯洗钱、财产转移、防欺诈等。在网络安全性方面,黑客攻击时有发生,通过Spark Streaming流处理技术可以将某类可疑IP进行监控并结合机器学习训练模型匹配出当前请求是否属于黑客攻击。其他方面,如:垃圾邮件监控过滤、交通监控、网络监控、工业设备监控的背后都是Spark Streaming发挥强大流处理的地方。 大数据时代,数据价值一般怎么定义? 所有没经过流处理的数据都是无效数据或没有价值的数据;数据产生之后立即处理产生的价值是最大的,数据放置越久或越滞后其使用价值越低。以前绝大多数电商网站盈利走的是网络流量(即用户的访问量),如今,电商网站不仅仅需要关注流量、交易量,更重要的是要通过数据流技术让电商网站的各种数据流动起来,通过实时流动的数据及时分析、挖掘出各种有价值的数据;比如:对不同交易量的用户指定用户画像,从而提供不同服务质量;准对用户访问电商网站板块爱好及时推荐相关的信息。 SparkStreaming VSHadoopMR: Spark Streaming是一个准实时流处理框架,而Hadoop MR是一个离线、批处理框架;很显然,在数据的价值性角度,Spark Streaming完胜于Hadoop MR。 SparkStreaming VS Storm: Spark Streaming是一个准实时流处理框架,处理响应时间一般以分钟为单位,也就是说处理实时数据的延迟时间是秒级别的;Storm是一个实时流处理框架,处理响应是毫秒级的。所以在流框架选型方面要看具体业务场景。需要澄清的是现在很多人认为Spark Streaming流处理运行不稳定、数据丢失、事务性支持不好等等,那是因为很多人不会驾驭Spark Streaming及Spark本身。在Spark Streaming流处理的延迟时间方面,Spark定制版本,会将Spark Streaming的延迟从秒级别推进到100毫秒之内甚至更少。 SparkStreaming优点: 1、提供了丰富的API,企业中能快速实现各种复杂的业务逻辑。 2、流入Spark Streaming的数据流通过和机器学习算法结合,完成机器模拟和图计算。 3、Spark Streaming基于Spark优秀的血统。 SparkStreaming能不能像Storm一样,一条一条处理数据? Storm处理数据的方式是以条为单位来一条一条处理的,而Spark Streaming基于单位时间处理数据的,SparkStreaming能不能像Storm一样呢?答案是:可以的。 业界一般的做法是Spark Streaming和Kafka搭档即可达到这种效果,入下图: Kafka业界认同最主流的分布式消息框架,此框架即符合消息广播模式又符合消息队列模式。 Kafka内部使用的技术: 1、 Cache 2、 Interface 3、 Persistence(默认最大持久化一周) 4、 Zero-Copy技术让Kafka每秒吞吐量几百兆,而且数据只需要加载一次到内核提供其他应用程序使用 外部各种源数据推进(Push)Kafka,然后再通过Spark Streaming抓取(Pull)数据,抓取的数据量可以根据自己的实际情况确定每一秒中要处理多少数据。 通过Spark Streaming动手实战wordCount实例 这里是运行一个Spark Streaming的程序:统计这个时间段内流进来的单词出现的次数. 它计算的是:他规定的时间段内每个单词出现了多少次。 1、先启动下Spark集群: 我们从集群里面打开下官方网站 接受这个数据进行加工,就是流处理的过程,刚才那个WordCount就是以1s做一个单位。 刚才运行的时候,为什么没有结果呢?因为需要数据源。 2、获取数据源: 新开一个命令终端,然后输入: $nc-lk9999 现在我们拷贝数据源进入运行: 然后按回车运行 DStream和RDD关系: 没有输入数据会打印的是空结果: 但是实际上,Job的执行是Spark Streaming框架帮我们产生的和开发者自己写的Spark代码业务逻辑没有关系,而且Spark Streaming框架的执行时间间隔可以手动配置,如:每隔一秒钟就会产生一次Job的调用。所以在开发者编写好的Spark代码时(如:flatmap、map、collect),不会导致job的运行,job运行是Spark Streaming框架产生的,可以配置成每隔一秒中都会产生一次job调用。Spark Streaming流进来的数据是DStream,但Spark Core框架只认RDD,这就产生矛盾了? Spark Streaming框架中,作业实例的产生都是基于rdd实例来产生,你写的代码是作业的模板,即rdd是作业的模板,模板一运行rdd就会被执行,此时action必须处理数据。RDD的模板就是DStream离散流,RDD之间存在依赖关系,DStream就有了依赖关系,也就构成了DStream 有向无环图。这个DAG图,是模板。Spark Streaming只不过是在附在RDD上面一层薄薄的封装而已。你写的代码不能产生Job,只有框架才能产生Job. 如果一秒内计算不完数据,就只能调优了. 总结: 使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。 Scala和Java二种方式实战Spark Streaming开发 一、Java方式开发 1、开发前准备:假定您以搭建好了Spark集群。 2、开发环境采用eclipse maven工程,需要添加Spark Streaming依赖。 3、Spark streaming 基于Spark Core进行计算,需要注意事项: 设置本地master,如果指定local的话,必须配置至少二条线程,也可通过sparkconf来设置,因为Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据,并且至少有一条线程用于处理接收的数据(否则的话无法有线程用于处理数据),随着时间的推移,内存和磁盘都会不堪重负)。 温馨提示: 对于集群而言,每隔exccutor一般肯定不只一个Thread,那对于处理Spark Streaming应用程序而言,每个executor一般分配多少core比较合适?根据我们过去的经验,5个左右的core是最佳的(段子:分配为奇数个core的表现最佳,例如:分配3个、5个、7个core等) 接下来,让我们开始动手写写Java代码吧! 第一步:创建SparkConf对象 第二步:创建SparkStreamingContext 我们采用基于配置文件的方式创建SparkStreamingContext对象: 第三步,创建Spark Streaming输入数据来源: 我们将数据来源配置为本地端口9999(注意端口要求没有被占用): 第四步:我们就像对RDD编程一样,基于DStream进行编程,原因是DStream是RDD产生的模板,在Spark Streaming发生计算前,其实质是把每个Batch的DStream的操作翻译成为了RDD操作。 1、flatMap操作: 2、 mapToPair操作: 3、reduceByKey操作: 4、print等操作: 温馨提示: 除了print()方法将处理后的数据输出之外,还有其他的方法也非常重要,在开发中需要重点掌握,比如SaveAsTextFile,SaveAsHadoopFile等,最为重要的是foreachRDD方法,这个方法可以将数据写入Redis,DB,DashBoard等,甚至可以随意的定义数据放在哪里,功能非常强大。 一、Scala方式开发 第一步,接收数据源: 第二步,flatMap操作: 第三步,map操作: 第四步,reduce操作: 第五步,print()等操作: 第六步:awaitTermination操作 总结: 使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。 StreamingContext、DStream、Receiver深度剖析 一、StreamingContext功能及源码剖析: 1、 通过SparkStreaming对象jssc,创建应用程序主入口,并连上Driver上的接收数据服务端口9999写入源数据: 2、 Spark Streaming的主要功能有: 主程序的入口; 提供了各种创建DStream的方法接收各种流入的数据源(例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等); 通过构造函数实例化Spark Streaming对象时,可以指定master URL、appName、或者传入SparkConf配置对象、或者已经创建的SparkContext对象; 将接收的数据流传入DStreams对象中; 通过Spark Streaming对象实例的start方法启动当前应用程序的流计算框架或通过stop方法结束当前应用程序的流计算框架; 二、DStream功能及源码剖析: 1、 DStream是RDD的模板,DStream是抽象的,RDD也是抽象 2、 DStream的具体实现子类如下图所示: 3、 以StreamingContext实例的socketTextSteam方法为例,其执行完的结果返回DStream对象实例,其源码调用过程如下图: socket.getInputStream获取数据,while循环来存储储蓄数据(内存、磁盘) 三、Receiver功能及源码剖析: 1、Receiver代表数据的输入,接收外部输入的数据,如从Kafka上抓取数据; 2、Receiver运行在Worker节点上; 3、Receiver在Worker节点上抓取Kafka分布式消息框架上的数据时,具体实现类是KafkaReceiver; 4、Receiver是抽象类,其抓取数据的实现子类如下图所示: 5、 如果上述实现类都满足不了您的要求,您自己可以定义Receiver类,只需要继承Receiver抽象类来实现自己子类的业务需求。 四、StreamingContext、DStream、Receiver结合流程分析: (1)inputStream代表了数据输入流(如:Socket、Kafka、Flume等) (2)Transformation代表了对数据的一系列操作,如flatMap、map等 (3)outputStream代表了数据的输出,例如wordCount中的println方法: 数据数据在流进来之后最终会生成Job,最终还是基于Spark Core的RDD进行执行:在处理流进来的数据时是DStream进行Transformation由于是StreamingContext所以根本不会去运行,StreamingContext会根据Transformation生成”DStream的链条”及DStreamGraph,而DStreamGraph就是DAG的模板,这个模板是被框架托管的。当我们指定时间间隔的时候,Driver端就会根据这个时间间隔来触发Job而触发Job的方法就是根据OutputDStream中指定的具体的function,例如wordcount中print,这个函数一定会传给ForEachDStream,它会把函数交给最后一个DStream产生的RDD,也就是RDD的print操作,而这个操作就是RDD触发Action。 总结: 使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。 基于HDFS的SparkStreaming案例实战 一:Spark集群开发环境准备 启动HDFS,如下图所示: 通过web端查看节点正常启动,如下图所示: 2.启动Spark集群,如下图所示: 通过web端查看集群启动正常,如下图所示: 3.启动start-history-server.sh,如下图所示: 二:HDFS的SparkStreaming案例实战(代码部分) package com.dt.spark.SparkApps.sparkstreaming;import org.apache.spark.SparkConf;import org.apache.spark.SparkContext;import org.apache.spark.api.Java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.*;import scala.Tuple2;import java.util.Arrays;/** * Created by Jonson on 2016/4/17. */public class SparkStreamingOnHDFS { public static void main(String[] args){/** *第一步:配置SparkConf * 1. 至少两条线程: * 因为Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据, * 并且至少有一条线程用于处理接收的数据(否则的话无法有线程用于处理数据,随着时间的推移,内存和磁盘都不堪重负) * 2. 对于集群而言,每个Executor一般而言肯定不止一个线程,对于处理Spark Streaming的应用程序而言,每个Executor一般 * 分配多少个Core合适呢?根据我们过去的经验,5个左右的core是最佳的(分配为奇数个Core为最佳)。 */ final SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("SparkOnStreamingOnHDFS");/** *第二步:创建SparkStreamingContext,这个是Spark Streaming应用程序所有功能的起始点和程序调度的核心 * 1,SparkStreamingContext的构建可以基于SparkConf参数,也可以基于持久化SparkStreamingContext的内容 * 来恢复过来(典型的场景是Driver崩溃后重新启动,由于Spark Streaming具有连续7*24小时不间断运行的特征, * 所有需要在Driver重新启动后继续上一次的状态,此时状态的恢复需要基于曾经的checkpoint) * 2,在一个Spark Streaming应用程序中可以创建若干个SparkStreamingContext对象,使用下一个SparkStreamingContext * 之前需要把前面正在运行的SparkStreamingContext对象关闭掉,由此,我们获得一个重大启发:SparkStreamingContext * 是Spark core上的一个应用程序而已,只不过Spark Streaming框架箱运行的话需要Spark工程师写业务逻辑 */// JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));//Durations.seconds(5)设置每隔5秒 final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/Checkpoint_Data"; JavaStreamingContextFactory factory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { returncreateContext(checkpointDirectory,conf); } };/** *可以从失败中恢复Driver,不过还需要制定Driver这个进程运行在Cluster,并且提交应用程序的时候 * 指定 --supervise; */ JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);/** *现在是监控一个文件系统的目录 * 此处没有Receiver,Spark Streaming应用程序只是按照时间间隔监控目录下每个Batch新增的内容(把新增的) * 作为RDD的数据来源生成原始的RDD */ //指定从HDFS中监控的目录 JavaDStream lines = jsc.textFileStream("hdfs://Master:9000/library/SparkStreaming/Data");/** *第四步:接下来就像对于RDD编程一样基于DStreaming进行编程!!! * 原因是: * DStreaming是RDD产生的模板(或者说类)。 * 在Spark Streaming具体发生计算前其实质是把每个batch的DStream的操作翻译成对RDD的操作!! * 对初始的DStream进行Transformation级别的处理,例如Map,filter等高阶函数的编程,来进行具体的数据计算。 * 第4.1步:将每一行的字符串拆分成单个单词 */ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,String>() { public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } });/** *第4.2步:对初始的JavaRDD进行Transformation级别的处理,例如map,filter等高阶函数等的编程,来进行具体的数据计算 * 在4.1的基础上,在单词拆分的基础上对每个单词实例计数为1,也就是word => (word,1) */ JavaPairDStream<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String,Integer>(word,1); } });/** *第4.3步:在每个单词实例计数的基础上统计每个单词在文件中出现的总次数 */ JavaPairDStream<String,Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } });/** *此处的print并不会直接触发Job的执行,因为现在的一切都是在Spark Streaming框架控制下的,对于Spark而言具体是否 * 触发真正的Job运行是基于设置的Duration时间间隔的 * 一定要注意的是:Spark Streaming应用程序要想执行具体的Job,对DStream就必须有output Stream操作, * output Stream有很多类型的函数触发,例如:print,saveAsTextFile,saveAsHadoopFiles等,其实最为重要的一个方法是 * foraeachRDD,因为Spark Streaming处理的结果一般都会放在Redis,DB,DashBoard等上面,foreachRDD主要就是用来完成这些 * 功能的,而且可以随意的自定义具体数据到底存放在哪里!!! */ wordscount.print(); /** * Spark Streaming执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的。 * 当然其内部有消息循环体用于接收应用程序本身或者Executor的消息; */ jsc.start(); jsc.awaitTermination(); jsc.close(); } /** *工厂化模式构建JavaStreamingContext */ private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf){ System.out.println("Creating new context"); SparkConf = conf; JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,Durations.seconds(5)); ssc.checkpoint(checkpointDirectory); return ssc; }} 代码打包在集群中运行 创建目录 2.脚本运行 脚本内容如下: 此时Spark Streaming会每隔5秒执行一次,不断的扫描监控目录下是否有新的文件。 3.上传文件到HDFS中的Data目录下 4.输出结果 三:Spark Streaming on HDFS源码解密 JavaStreamingContextFactory的create方法可以创建JavaStreamingContext 而我们在具体实现的时候覆写了该方法,内部就是调用createContext方法来具体实现。上述实战案例中我们实现了createContext方法。 /*** Factory interface for creating a new JavaStreamingContext*/trait JavaStreamingContextFactory { def create(): JavaStreamingContext} 3.checkpoint: 一方面:保持容错 一方面保持状态 在开始和结束的时候每个batch都会进行checkpoint **Sets the context to periodically checkpoint the DStream operations for master * fault-tolerance. The graph will be checkpointed every batch interval.* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored*/def checkpoint(directory: String) { ssc.checkpoint(directory)}4.remember: 流式处理中过一段时间数据就会被清理掉,但是可以通过remember可以延长数据在程序中的生命周期,另外延长RDD更长的时间。 应用场景: 假设数据流进来,进行ML或者Graphx的时候有时需要很长时间,但是bacth定时定条件的清除RDD,所以就可以通过remember使得数据可以延长更长时间。/** * Sets each DStreams in this context to remember RDDs it generated in the last given duration.* DStreams remember RDDs only for a limited duration of duration and releases them for garbage* collection. This method allows the developer to specify how long to remember the RDDs (* if the developer wishes to query old data outside the DStream computation).* @param duration Minimum duration that each DStream should remember its RDDs*/def remember(duration: Duration) { ssc.remember(duration)}5.在JavaStreamingContext中,getOrCreate方法源码如下: 如果设置了checkpoint ,重启程序的时候,getOrCreate()会重新从checkpoint目录中初始化出StreamingContext。 /** Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be* recreated from the checkpoint data. If the data does not exist, then the provided factory* will be used to create a JavaStreamingContext.** @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program* @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext* @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor.*/@deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0")def getOrCreate( checkpointPath: String, factory: JavaStreamingContextFactory ): JavaStreamingContext = { val ssc = StreamingContext.getOrCreate(checkpointPath, () => { factory.create.ssc }) new JavaStreamingContext(ssc)}异常问题思考: 为啥会报错? Streaming会定期的进行checkpoint。 重新启动程序的时候,他会从曾经checkpoint的目录中,如果没有做额外配置的时候,所有的信息都会放在checkpoint的目录中(包括曾经应用程序信息),因此下次再次启动的时候就会报错,无法初始化ShuffleDStream。 总结: 使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。 SparkStreaming数据源Flume实际案例 一、什么是Flume? flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。 flume的特点: flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。 flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。flume的可靠性 当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Besteffort(数据发送到接收方后,不会进行确认)。flume的可恢复性: 还是靠Channel。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。flume的一些核心概念: Agent 使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。 Client 生产数据,运行在一个独立的线程。 Source 从Client收集数据,传递给Channel。 Sink 从Channel收集数据,运行在一个独立线程。 Channel 连接 sources 和 sinks ,这个有点像一个队列。 Events 可以是日志记录、 avro 对象等。 Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成,如下图: 值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,这也正是NB之处。如下图所示: 二、Flume+Kafka+SparkStreaming应用场景: 1、Flume集群采集外部系统的业务信息,将采集后的信息发生到Kafka集群,最终提供Spark Streaming流框架计算处理,流处理完成后再将最终结果发送给Kafka存储,架构如下图: 2、Flume集群采集外部系统的业务信息,将采集后的信息发生到Kafka集群,最终提供Spark Streaming流框架计算处理,流处理完成后再将最终结果发送给Kafka存储,同时将最终结果通过Ganglia监控工具进行图形化展示,架构如下图: 3、我们要做:Spark streaming 交互式的360度的可视化,Spark streaming 交互式3D可视化UI;Flume集群采集外部系统的业务信息,将采集后的信息发生到Kafka集群,最终提供Spark Streaming流框架计算处理,流处理完成后再将最终结果发送给Kafka存储,将最终结果同时存储在数据库(MySQL)、内存中间件(Redis、MemSQL)中,同时将最终结果通过Ganglia监控工具进行图形化展示,架构如下图: 三、Kafka数据写入Spark Streaming有二种方式: 一种是Receivers,这个方法使用了Receivers来接收数据,Receivers的实现使用到Kafka高层次的消费者API,对于所有的Receivers,接收到的数据将会保存在Spark分布式的executors中,然后由Spark Streaming启动的Job来处理这些数据;然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,你可以在Spark Streaming中使用WAL日志功能,这使得我们可以将接收到的数据保存到WAL中(WAL日志可以存储在HDFS上),所以在失败的时候,我们可以从WAL中恢复,而不至于丢失数据。 另一种是DirectAPI,产生数据和处理数据的时候是在两台机器上?其实是在同一台数据上,由于在一台机器上有Driver和Executor,所以这台机器要足够强悍。 Flume集群将采集的数据放到Kafka集群中,Spark Streaming会实时在线的从Kafka集群中通过DirectAPI拿数据,可以通过Kafka中的topic+partition查询最新的偏移量(offset)来读取每个batch的数据,即使读取失败也可再根据偏移量来读取失败的数据,保证应用运行的稳定性和数据可靠性。 温馨提示: 1、Flume集群数据写入Kafka集群时可能会导致数据存放不均衡,即有些Kafka节点数据量很大、有些不大,后续会对分发数据进行自定义算法来解决数据存放不均衡问题。 2、个人强烈推荐在生产环境下用DirectAPI,但是我们的发行版,会对DirectAPI进行优化,降低其延迟。 总结: 实际生产环境下,搜集分布式的日志以Kafka为核心。 使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。 Spark Streaming on Kafka解析和安装实战 本课分2部分讲解: 第一部分,讲解Kafka的概念、架构和用例场景; 第二部分,讲解Kafka的安装和实战。 由于时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功。后续课程会接着讲解如何集成SparkStreaming和Kafka。 一、Kafka的概念、架构和用例场景 http://kafka.apache.org/documentation.html#introdution 1、Kafka的概念 Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。zookeeper和kafka和大数据是不只能用于大数据的,集群启不启动,它都可以使用,也可以用于Javaserver普通的企业级平台上。 什么是消息组件: 以帅哥和美女聊天为例,帅哥如何和美女交流呢?这中间通常想到的是微信、QQ、电话、邮件等通信媒介,这些通信媒介就是消息组件,帅哥把聊天信息发送给消息组件、消息组件将消息推送给美女,这就是常说的生产者、消费者模型,kafka不仅仅说是生产者消费者模式中广播的概念,也可以实现队列的方式,kafka的消费者中有一个group的概念,group中可以有很多实体,也可以只有一个实体,group中只有一个实体的话,就是队列的方式,所以从消息驱动的角度讲,它是广播的方式和队列的方式的完美结合体。而且在发送信息时可以将内容进行分类,即所谓的Topic主题。Kafka就是这样的通信组件,将不同对象组件粘合起来的纽带, 且是解耦合方式传递数据。 完善的流处理系统的特点: 1)能在线的以非常低的延迟,来处理数据,而且是稳定可靠的 2)能对流进来的数据进行非常复杂的分析,而不是简单的仅仅统计的分析 3)不仅能处理当前在线的数据,也能处理过去一天,一周,一个月甚至一年的数据 Apache Kafka与传统消息系统相比,有以下不同的特点: 分布式系统,易于向外扩展; 在线低延迟,同时为发布和订阅提供高吞吐量; 流进来的数据一般处理完后就消失了,也可以将消息存储到磁盘,因此可以处理1天甚至1周前内容,所以kafka不仅是一个消息中间件,还是一个存储系统 2、Kafka的架构 Kafka既然具备消息系统的基本功能,那么就必然会有组成消息系统的组件: Topic,Producer和Consumer。Kafka还有其特殊的Kafka Cluster组件。 Topic主题: 代表一种数据的类别或类型,工作、娱乐、生活有不同的Topic,生产者需要说明把说明数据分别放在那些Topic中,里面就是一个个小对象,并将数据数据推到Kafka,消费者获取数据是pull的过程。一组相同类型的消息数据流。这些消息在Kafka会被分区存放,并且有多个副本,以防数据丢失。每个分区的消息是顺序写入的,并且不可改写。 - Producer(生产者):把数据推到Kafka系统的任何对象。 - Kafka Cluster(Kafka集群):把推到Kafka系统的消息保存起来的一组服务器,也叫Broker。因为Kafka集群用到了Zookeeper作为底层支持框架,所以由一个选出的服务器作为Leader来处理所有消息的读和写的请求,其他服务器作为Follower接受Leader的广播同步备份数据,以备灾难恢复时用。 - Consumer(消费者):从Kafka系统订阅消息的任何对象。 消费者可以有多个,并且某些消费者还可以组成Consumer Group。多个Consumer Group之间组成消息广播的关系,所以各个Group可以拉相同的消息数据。在Consumer Group内部,各消费者之间对Consumer Group拉出来的消息数据是队列先进先出的关系,某个消息数据只能给该Group的一个消费者使用,同一个Group中的实体是互斥的,对一个消息,这样是避免重复消费。如果有多个group,每个group中只有一个实体,这就是队列的方式了,因为它是互斥的。如果不是一个实体,则是广播模式,如下图所示,广播只能广播给一个group中的一个消费实体 kafka的数据传输是基于kernel(内核)级别的(传输速度接近0拷贝-ZeroCopy)、没有用户空间的参与。Linux本身是软件,软件启动时第一个启动进程叫init,在init进程启动后会进入用户空间;kafka是用java写的,是基于jvm虚拟机的。例如:在分布式系统中,机器A上的应用程序需要读取机器B上的Java服务数据,由于Java程序对应的JVM是用户空间级别而且数据在磁盘上,A上应用程序读取数据时会首先进入机器B上的内核空间再进入机器B的用户空间,读取用户空间的数据后,数据再经过B机器上的内核空间分发到网络中(之所以要再经过B的内核,因为要通过网络通信,不通过内核,哪里来的网络通信),机器A网卡接收到传输过来的数据后再将数据写入A机器的内核空间,从而最终将数据传输给A的用户空间进行处理。如下图:网络本身是一种硬件,磁盘只是硬件的一种。 正常情况下,外部系统从Java程序中读取数据,传输给内核空间并依赖网卡将数据写入到网络中,从而把数据传输出去。其实Java本身是内核的一层外衣,Java Socket编程,操作的各种数据都是在JVM的用户空间中进行的。而Kafka操作数据是放在内核空间的,通常内核空间处理数据的速度比用户空间快上万倍,因为没用用户态和内核态的切换,所以通过kafka可以实现高速读、写数据。只要磁盘空间足够大,可以无限量的存储数据,kafka的数据就是存储在磁盘中的,不是存在内核中的。而很多消息组件是把数据存内存中的。kafka用zookeeperg管理元数据,而且按顺序写数据,比随机写要快很多。又有副本! 3、Kafka的用例场景 类似微信,手机和邮箱等等这样大家熟悉的消息组件,Kafka也可以: - 支持文字/图片 - 可以存储内容 - 分门别类 从内容消费的角度,Kafka把邮箱中的邮件类型看成是Topic。 二、Kafka的安装和实战 http://kafka.apache.org/documentation.html#quickstart 1、安装和配置Zookeeper Kafka集群模式需要提前安装好Zookeeper。 - 提示:Kafka单例模式不需要安装额外的Zookeeper,可以使用内置的Zookeeper。 - Kafka集群模式需要至少3台服务器。本课实战用到的服务器Hostname:master,slave1,slave2。 - 本课中用到的Zookeeper版本是Zookeeper-3.4.6。 1) 下载Zookeeper 进入http://www.apache.org/dyn/closer.cgi/zookeeper/,你可以选择其他镜像网址去下载,用官网推荐的镜像:http://mirror.bit.edu.cn/apache/zookeeper/。提示:可以直接下载群里的Zookeeper安装文件。 下载zookeeper-3.4.6.tar.gz 1) 安装Zookeeper 提示:下面的步骤发生在master服务器。 以ubuntu14.04举例,把下载好的文件放到/root目录,用下面的命令解压: cd /root tar -zxvf zookeeper-3.4.6.tar.gz 解压后在/root目录会多出一个zookeeper-3.4.6的新目录,用下面的命令把它剪切到指定目录即安装好Zookeeper了: cd /root mv zookeeper-3.4.6 /usr/local/spark 之后在/usr/local/spark目录会多出一个zookeeper-3.4.6的新目录。下面我们讲如何配置安装好的Zookeeper。 2) 配置Zookeeper 提示:下面的步骤发生在master服务器。 配置.bashrc - 打开文件:vi /root/.bashrc - 在PATH配置行前添加: export ZOOKEEPER_HOME=/usr/local/spark/zookeeper-3.4.6 - 最后修改PATH: export PATH=${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${HIVE_HOME}/bin:${KAFKA_HOME}/bin:$PATH - 使配置的环境变量立即生效:source /root/.bashrc 创建data目录 - cd $ZOOKEEPER_HOME - mkdir data 创建并打开zoo.cfg文件 - cd $ZOOKEEPER_HOME/conf - cp zoo_sample.cfg zoo.cfg - vi zoo.cfg 配置zoo.cfg # 配置Zookeeper的日志和服务器身份证号等数据存放的目录。 # 千万不要用默认的/tmp/zookeeper目录,因为/tmp目录的数据容易被意外删除。 dataDir=../data # Zookeeper与客户端连接的端口 clientPort=2181 # 在文件最后新增3行配置每个服务器的2个重要端口:Leader端口和选举端口 # server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器; # B 是这个服务器的hostname或ip地址; # C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口; # D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举, # 选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。 # 如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信 # 端口号不能一样,所以要给它们分配不同的端口号。 server.1=master:2888:3888 server.2=slave1:2888:3888 server.3=slave2:2888:3888 改成如下方式: dataDir=/usr/local/spark/zookeeper-3.4.6/data dataLogDir=/usr/local/spark/zookeeper-3.4.6/logs clientPort=2181 server.0=master1:2888:3888 server.1=work1:2888:3888 server.2=work2:2888:3888 创建并打开myid文件 - cd $ZOOKEEPER_HOME/data - touch myid - vi myid 配置myid 按照zoo.cfg的配置,myid的内容就是1。要写成0,和上面zoo.cfg里面的配置server.0,server.1,server.2一致,所以下面work1中myid内容为1,work2中myid内容为2 3) 同步master的安装和配置到slave1和slave2 - 在master服务器上运行下面的命令 cd /root scp ./.bashrc root@slave1:/root scp ./.bashrc root@slave2:/root cd /usr/local/spark scp -r ./zookeeper-3.4.6 root@slave1:/usr/local/spark scp -r ./zookeeper-3.4.6 root@slave2:/usr/local/spark - 在slave1服务器上运行下面的命令 vi $ZOOKEEPER_HOME/data/myid 按照zoo.cfg的配置,myid的内容就是1。 - 在slave2服务器上运行下面的命令 vi $ZOOKEEPER_HOME/data/myid 按照zoo.cfg的配置,myid的内容就是2。 4) 启动Zookeeper服务 - 在master服务器上运行下面的命令 zkServer.sh start - 在slave1服务器上运行下面的命令 source /root/.bashrc zkServer.sh start - 在slave1服务器上运行下面的命令 source /root/.bashrc zkServer.sh start 5) 验证Zookeeper是否安装和启动成功 - 在master服务器上运行命令:jps和zkServer.sh status root@master:/usr/local/spark/zookeeper-3.4.6/bin# jps 3844 QuorumPeerMain 4790 Jps zkServer.sh status root@master:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status JMX enabled by default Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: follower - 在slave1服务器上运行命令:jps和zkServer.sh status source /root/.bashrc root@slave1:/usr/local/spark/zookeeper-3.4.6/bin# jps 3462 QuorumPeerMain 4313 Jps root@slave1:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status JMX enabled by default Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: follower - 在slave2服务器上运行命令:jps和zkServer.sh status root@slave2:/usr/local/spark/zookeeper-3.4.6/bin# jps 4073 Jps 3277 QuorumPeerMain root@slave2:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status JMX enabled by default Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: leader 至此,代表Zookeeper已经安装和配置成功。 2、安装和配置Kafka 本课中用到的Kafka版本是Kafka-2.10-0.9.0.1。 1) 下载Kafka 进入http://kafka.apache.org/downloads.html,左键单击kafka_2.10-0.9.0.1.tgz。提示:可以直接下载群里的Kafka安装文件。 下载kafka_2.10-0.9.0.1.tgz 1) 安装Kafka 提示:下面的步骤发生在master服务器。 以ubuntu14.04举例,把下载好的文件放到/root目录,用下面的命令解压: cd /root tar -zxvf kafka_2.10-0.9.0.1.tgz 解压后在/root目录会多出一个kafka_2.10-0.9.0.1的新目录,用下面的命令把它剪切到指定目录即安装好Kafka了: cd /root mv kafka_2.10-0.9.0.1 /usr/local 之后在/usr/local目录会多出一个kafka_2.10-0.9.0.1的新目录。下面我们讲如何配置安装好的Kafka。 2) 配置Kafka 提示:下面的步骤发生在master服务器。 配置.bashrc - 打开文件:vi /root/.bashrc - 在PATH配置行前添加: export KAFKA_HOME=/usr/local/kafka_2.10-0.9.0.1 - 最后修改PATH: export PATH=${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${HIVE_HOME}/bin:${KAFKA_HOME}/bin:$PATH - 使配置的环境变量立即生效:source /root/.bashrc 打开server.properties - cd $ZOOKEEPER_HOME/config - vi server.properties 配置server.properties broker.id=0 port=9092 zookeeper.connect=master:2181,slave1:2181,slave2:2181 3) 同步master的安装和配置到slave1和slave2 - 在master服务器上运行下面的命令 cd /root scp ./.bashrc root@slave1:/root scp ./.bashrc root@slave2:/root cd /usr/local scp -r ./kafka_2.10-0.9.0.1 root@slave1:/usr/local scp -r ./kafka_2.10-0.9.0.1 root@slave2:/usr/local - 在slave1服务器上运行下面的命令 vi $KAFKA_HOME/config/server.properties 修改broker.id=1。 - 在slave2服务器上运行下面的命令 vi $KAFKA_HOME/config/server.properties 修改broker.id=2。 4) 启动Kafka服务 - 在master服务器上运行下面的命令,nohup,在集群上终端不输出启动日志 cd $KAFKA_HOME/bin nohup ./kafka-server-start.sh ../config/server.properties & - 在slave1服务器上运行下面的命令 source /root/.bashrc cd $KAFKA_HOME/bin nohup ./kafka-server-start.sh ../config/server.properties & - 在slave2服务器上运行下面的命令 source /root/.bashrc cd $KAFKA_HOME/bin kafka-server-start.sh ../config/server.properties & 5) 验证Kafka是否安装和启动成功 - 在任意服务器上运行命令创建Topic“HelloKafka”: kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 1 --topic HelloKafka - 在任意服务器上运行命令为创建的Topic“HelloKafka”生产一些消息: kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic HelloKafka 输入下面的消息内容: This is DT_Spark! I’m Rocky! Life is short, you need Spark! - 在任意服务器上运行命令从指定的Topic“HelloKafka”上消费(拉取)消息: kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --from-beginning --topic HelloKafka 过一会儿,你会看到打印的消息内容: This is DT_Spark! I’m Rocky! Life is short, you need Spark! - 在任意服务器上运行命令查看所有的Topic名字: kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181 - 在任意服务器上运行命令查看指定Topic的概况: kafka-topics.sh --describe --zookeepermaster:2181,slave1:2181,slave2:2181 --topic HelloKafka 至此,代表Kafka已经安装和配置成功。 总结: 使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。

优秀的个人博客,低调大师

Spark-Spark Streaming例子整理(三)

一、解密SparkStreaming另类在线实验 二、瞬间理解SparkStreaming本质 Spark源码定制,自己动手改进Spark源码,通常在电信、金融、教育、医疗、互联网等领域都有自己不同的业务,如果Sprak官方版本没有你需要的业务功能,你自己可以定制、扩展Spark的功能,满足公司的业务需要。 选择SparkStreaming框架源码研究、二次开发的原因 1、Spark起初只有Spark Core基础框架没有其他的子框架(Spark SQL、Spark Streaming、Spark ML、Spark Graphx、Spark R等),在其后加入了各种子框架来满足不同的需求。而分析这些子框架发现,选择Spark Streaming框架来研究,是最明智的选择,通过研究Spark Core上的Spark Streaming子框架,是迈向精通Spark力量源泉和解决所有问题之道。 2、Spark有很多子框架,我们选择Spark Streaming而为什么不用其他框架? Spark SQL涉及了很多SQL语法细节的解析和优化,当然分析其解析、优化从而集中精力去研究Spark而言是一件重要的事情,但不是最重要的事情,所以Spark SQL不太适合作为具体的子框架值得我们去研究。 目前Spark R现在不成熟,支撑功能有限。 图计算,从各版本演进而言Graphx几乎没有改进,这种趋势,Graphx是不是已经发展基本到尽头了;另外图计算而言有很多数学级别的算法,而要把Spark做到极致,数学对我们来说重要,但对于研究而言不是最重要的。 Mechine Learning在封装了Vector向量、Metrics构建了众多的算法库,从而涉及了太多的数学知识,所有选择ML其实也不是太好的选择。 最后筛选出SparkStreaming子框架才是最佳的研究切入黄金点。 Spark Streaming是流式计算框架,一切数据如果和流处理不相关的话都是无效的数据。流式处理才是我们真正对大数据的初步印象,数据流进来它立即会给我们一个反馈,而不是数据挖掘、图计算。Spark强悍地方是使用流处理可以完美的结合使用机器学习的成果、图计算的成果、Spark sql或者说spark R的成果。这得益于Spark的一体化、多元化的技术架构的设计,也就是说通过一个技术堆栈可以调用技术堆栈中所有的东西,根本不需要任何的设置,这是Spark无可匹敌之处也是SparkStreaming一统天下的根源。这个时代的流处理单打独斗是不行了,SparkStreaming和多个兄弟框架联合起来,无可匹敌。如果你精通SparkStreaming的话,恭喜你,因为SparkStreaming以及背后的几个兄弟框架正好展示了Spark大数据无穷的魅力。 整个Spark的所有应用程序,哪些程序容易出问题?肯定是SparkStreaming程序是最容易出问题的,因为数据是不断流入的,ss要动态的控制数据的流入、作业的切分、数据的处理,所以最容易出问题,但最容易出问题的地方同时也是最受关注的地方也是展示大数据最神奇魅力的地方。这些特色结合的话,也是最需要技术人才的地方。关注SparkStreaming在Spark的发展,你会很清晰知道,ss跟其他自框架不同之处,ss很象SparkCore上的一个应用程序。 正如世界万物发展一样,任何技术都有其关键点或转折点,SparkStreaming运行在SparkCore上,所以很多性能调优都是建立在SparkCore上的;Spark是大数据的龙脉,SparkStreaming是龙脉的穴位。 接下来感受一下龙脉和穴位 研究SparkStreaming时,有困惑你的东西,SparkStreaming数据不断流进来,根据batchInterval时间片不断生成Job,并将Job提交集群处理,如果能清晰的看到数据的流入和数据的处理,你心里会很很踏实。 如何能清晰的看到数据的处理过程呢?只需要一个小技巧:就是把SparkStreaming中的batchInterval放的足够大,例如说从30秒调整为1分钟一次batch,或者5分钟一次batch,你会很清晰的看到整个流程序的运行过程。 以广告点击在线黑名单的过滤为例 调整时间维度: 我们把时间从30秒调至300秒: 读取SparkStreaming Socket端口的数据: 打包程序发布至集群部署环境: 检查集群进程: 通过webui检查HDFS启动情况: 启动history-server监控进程及其对应的webui监控界面: 至此整个集群环境启动一切OK。 开始启动运行SparkStreaming应用程序 启动外部请求SparkStreaming服务端口的客户端: 输入待处理的数据流: 看结果如下: 看webui控制台: 点击链接进入后产生了0~4个Job: 有意思的是SparkStreaming应用程序启动实际执行的是一个Job,但真正执行的是5个Job,其分别是Receiver Job,Output Job,Output Job,Output Job,Start Job. 第 0 个Job是不是我们逻辑中的代码?不是的,不是reduceByKey的执行结果Job,如下图: SparkStreaming在启动的过程中会自动启动一些Job,如start操作: SparkStreaming最像一个应用程序,就算是算一次,也执行了好几个Job,就像spark应用程序一样,可以启动不同的Job完成不同的功能。 继续看Job1: 通过Job告诉你内幕:通过追踪Receiver发现其会产生makeRDD,实际上作为整个Job独立的一个stage,只在一台机器上执行,而且执行了1.5分钟,刚才启动SparkStreaming,没有任务执行1.5分钟的,如下图: 思考一下什么东西执行了1.5分钟,而整个Job只运行了2分钟? 答案就是ReceiverTracker接收器运行的,它需要接收流入的数据。这个Job就是Receiver,并且执行了1.5分钟,而启动的Receiver就是一个Job。 结论: SparkStreaming启动Receiver的是一个Job,在具体的Cluster的Worker上的executor中,启动Receiver是通过Job启动的。通过作业的运行时间看出,整个SparkStreaming运行的时间是2分钟,其中有个Job运行了1.5分钟,这个Job就是Receiver,其实指的是Receiver启动运行的时间,Receiver是在executor中运行的,也就是说SparkStreaming框架在启动Recevier是通过Job启动的。而且Receiver(可以启动多个receiver接收数据)就是在一个executor中运行且通过一个Task去接收我们的数据: 从这个角度讲Receiver接收数据和普通job有什么区别?没有区别。转过来给我们启发:在一个Spark application中可以启动很多的job,这些job之间可以相互配合。例如:SparkStreaming框架默认启动job给你接收数据,然后为后续的处理做准备,为你写复杂的应用程序奠定了一个良好的基础。这就是你写非常复杂的Spark应用程序的黄金切入点,复杂的程序一般都是有多个job构成的。 上图的Process_local即内存节点,SparkStreaming在默认情况下接收数据是memory_and_disk_ser_2的方式,也就是说接收的数据量比较少内存能存下的话默认情况下是不会存储磁盘的,在这里直接使用内存中。 看下第0个job: 在4个worker上启动4个executor,是在最大化的使用计算资源,通过第1个job 不断接收数据。 这里处理数据有shuffle read,shuffle write,通过socketTextStream即rdd,这里叫blockRdd,而且blockrdd来自于socketTextStream的方法: 其实是inputStream帮我们在固定时间间隔内会产生固定的rdd,接收数据是在一个executor的task中接收的,但现在处理数据是transform操作发生在executor里面的发生在4个executor,这个结果告诉我们在一台机器上接收数据,但实际上是在四台机器上处理数据的。最大化利用集群资源处理数据。SparkStreaming程序执行时就是一个batch级别的Job,里面做了很多事情。整个处理,其实只有一个Job真正在执行,但产生很多Job相互协调来完成复杂的业务处理,这个情况告诉我们SparkStreaming并不是网络、博客、书籍、官网上讲的那么简单。 SparkStreaming本身是随着流进来的数据按照时间为单位生成job,然后触发job在Cluster上执行的流式处理的引擎,它本身是加上以时间为维度的批处理,实例中以300秒为会产生一批数据,基于这一批数据会生成rdd,基于rdd会触发job,rdd的生成、job的触发,都是SparkStreaming框架去做的。SparkStreaming中有个至关只要的东西叫DStream,我们每隔一定时间都会生成rdd,产生rdd的依赖或触发job具体的执行。每隔时间,所以弄了一个DStream,DStream代表时空的概念,时间为维度,随着时间的推进不断产生rdd,实际上DStream就是rdd的集合,只不过是有时间的先后顺序;空间维度实际上是DStream的处理层面,我们对DStream进行处理实际上是对DStream里面的每个rdd的处理。整个时空是一个很大的概念,时间固定的话,可以锁定对空间的操作,操作其实就是transform,对DStream的操作会构建DStream Graph。 总结: 随着时间为维度有个DStream Graph,同时在时间维度下有个空间维度,空间维度就是操作,空间维度确定的情况下时间不断推进的时候他就不断把空间维度的DStream Graph实例化成rdd的graph,然后触发具体的job进行执行。 一、解密SparkStreaming运行机制 二、解密SparkStreaming架构 SparkStreaming运行时更像SparkCore上的应用程序,SparkStreaming程序启动后会启动很多job,每个batchIntval、windowByKey的job、框架运行启动的job。例如,Receiver启动时也启动了job,此job为其他job服务,所以需要做复杂的Spark程序,往往多个job之间互相配合。SparkStreaming是最复杂的应用程序,如果对SparkStreaming了如指掌的话,做其他的Spark应用程序没有任何问题。看下官网:Spark sql,SparkStreaming,Sparkml,Sparkgraphx子框架都是后面开发出来的,我们要洞悉Spark Core 的话,SparkStreaming是最好的切入方式。 进入Spark官网,可以看到SparkCore和其他子框架的关系: SparkStreaming启动后,数据不断通过inputStream流进来,根据时间划分成不同的job、就是batchs of input data,每个job有一序列rdd的依赖。Rdd的依赖有输入的数据,所以这里就是不同的rdd依赖构成的batch,这些batch是不同的job,根据spark引擎来得出一个个结果。DStream是逻辑级别的,而RDD是物理级别的。DStream是随着时间的流动内部将集合封装RDD。对DStream的操作,转过来是对其内部的RDD操作。 我是使用SparkCore 编程都是基于rdd编程,rdd间有依赖关系,如下图右侧的依赖关系图,SparkStreaming运行时,根据时间为维度不断的运行。Rdd的dag依赖是空间维度,而DStream在rdd的基础上加上了时间维度,所以构成了SparkStreaming的时空维度。 SparkStreaming在rdd的基础上增加了时间维度,运行时可以清晰看到jobscheduler、mappartitionrdd、shuffledrdd、blockmaanager等等,这些都是SparkCore的内容,而DStream、jobgenerator、socketInputDstream等等都是SparkStreaming的内容,如下图运行过程可以很清晰的看到: 现在通过SparkStreaming的时空维度来细致说明SparkStreaming运行机制 时间维度:按照固定时间间隔不断地产生job对象,并在集群上运行: 包含有batch interval,窗口长度,窗口滑动时间等 空间维度:代表的是RDD的依赖关系构成的具体的处理逻辑的步骤,是用DStream来表示的: 1、需要RDD,DAG的生成模板 2、TimeLine的job控制器、 3、InputStream和outputstream代表的数据输入输出 4、具体Job运行在Spark Cluster之上,此时系统容错就至关重要 5、事务处理,在处理出现奔溃的情况下保证Exactly once的事务语义一致性 随着时间的流动,基于DStream Graph不断生成RDD Graph,也就是DAG的方式生成job,并通过Job Scheduler的线程池的方式提交给Spark Cluster不断的执行, 由上图可知,RDD 与 DStream之间的关系如下: 1、RDD是物理级别的,而 DStream 是逻辑级别的; 2、DStream是RDD的封装模板类,是RDD进一步的抽象; 3、DStream要依赖RDD进行具体的数据计算; Spark Streaming源码解析 1、StreamingContext方法中调用JobScheduler的start方法: val ssc = new StreamingContext(conf, Seconds(5)) val lines = ssc.socketTextStream("Master", 9999) ......//业务处理代码略 ssc.start() ssc.awaitTermination() 我们进入JobScheduler start方法的内部继续分析: 1、JobScheduler 通过onReceive方法接收各种消息并存入enventLoop消息循环体中。 2、通过rateController对流入SparkStreaming的数据进行限流控制。 3、在JobScheduler的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法。 ReceiverTacker的启动方法: 1、ReceiverTracker启动后会创建ReceiverTrackerEndpoint这个消息循环体,来接收运行在Executor上的Receiver发送过来的消息。 2、ReceiverTracker启动后会在Spark Cluster中启动executor中的Receivers。 JobGenerator的启动方法: 1、JobGenerator启动后会启动以batchInterval时间间隔发送GenerateJobs消息的定时器 a.SparkStreaming Job架构和运行机制 b. Spark Streaming Job 容错架构和运行机制 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解。 上节回顾: 上节课谈到Spark Streaming是基于DStream编程。DStream是逻辑级别的,而RDD是物理级别的。DStream是随着时间的流动内部将集合封装RDD。对DStream的操作,归根结底还是对其RDD进行的操作。 如果将Spark Streaming放在坐标系中,并以Y轴表示对RDD的操作,RDD的依赖关系构成了整个job的逻辑应用,以X轴作为时间。随着时间的流逝,以固定的时间间隔(Batch Interval)产生一个个job实例,进而在集群中运行。 同时也为大家详细总结并揭秘 Spark Streaming五大核心特征:特征1:逻辑管理、特征2:时间管理、特征3:流式输入和输出、特征4:高容错、特征5:事务处理。最后结合Spark Streaming源码做了进一步解析。 ** 开讲 ** 由上一讲可以得知,以固定的时间间隔(Batch Interval)产生一个个job实例。那么在时间维度和空间维度组成的时空维度的Spark Streaming中,Job的架构和运行机制、及其容错架构和运行机制是怎样的呢? 那我们从爱因斯坦的相对时空讲起吧: a、时间和空间是紧密联系的统一体,也称为时空连续体。 b、时空是相对的,不同的观察者看到的时间,长度,质量都可以不一样。 c、对于两个没有联系的事件,没有绝对的先后顺序。但是因果关系可以确定事件的先后,比如Job的实例产生并运行在集群中,那么Job实例的产生事件必然发生在Job运行集群中之前。 就是说Job的实例产生和单向流动的时间之间,没有必然的联系;在这里时间只是一种假象。 怎么更好的理解这句话呢?那我们就得从以下方面为大家逐步解答。 什么是Spark Streaming Job 架构和运行机制 ? 对于一般的Spark应用程序来说,是RDD的action操作触发了Job的运行。那对于SparkStreaming来说,Job是怎么样运行的呢?我们在编写SparkStreaming程序的时候,设置了BatchDuration,Job每隔BatchDuration时间会自动触发,这个功能是Spark Streaming框架提供了一个定时器,时间一到就将编写的程序提交给Spark,并以Spark job的方式运行。 通过案例透视Job架构和运行机制 案例代码如下: 将上述代码打成JAR包,再上传到集群中运行 集群中运行结果如下 运行过程总图如下 案例详情解析 a、 首先通过StreamingContext调用start方法,其内部再启动JobScheduler的Start方法,进行消息循环; (StreamingContext.scala,610行代码) (JobScheduler.scala,83行代码) b、 在JobScheduler的start内部会构造JobGenerator和ReceiverTacker; (JobScheduler.scala,82、83行代码) c、 然后调用JobGenerator和ReceiverTacker的start方法执行以下操作: (JobScheduler.scala,79、98行代码) (ReceiverTacker.scala,149、157行代码) JobGenerator启动后会不断的根据batchDuration生成一个个的Job ; (JobScheduler.scala,208行代码) ReceiverTracker的作用主要是两点: 1.对Receiver的运行进行管理,ReceiverTracker启动时会调用lanuchReceivers()方法,进而会使用rpc通信启动Receiver(实际代码中,Receiver外面还有一层包装ReceiverSupervisor实现高可用) (ReceiverTracker.scala,423行代码) 2.管理Receiver的元数据,供Job对数据进行索引,元数据的核心结构是receivedBlockTracker (ReceiverTracker.scala,106~112行代码) d、 在Receiver收到数据后会通过ReceiverSupervisor存储到Executor的BlockManager中 ; e、 同时把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker内部会通过ReceivedBlockTracker来管理接受到的元数据信息; 这里面涉及到两个Job的概念: 每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个单独的线程来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行) 为什么使用线程池呢? a 、作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙; b 、有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持; Spark Streaming Job 容错架构和运行机制 Spark Streaming是基于DStream的容错机制,DStream是随着时间流逝不断的产生RDD,也就是说DStream是在固定的时间上操作RDD,容错会划分到每一次所形成的RDD。 Spark Streaming的容错包括 Executor 与 Driver两方面的容错机制 : a、 Executor 容错: 1. 数据接收:分布式方式、wal方式,先写日志再保存数据到Executor 2. 任务执行安全性 Job基于RDD容错 : b、Driver容错 : checkpoint 。 基于RDD的特性,它的容错机制主要就是两种: 1. 基于checkpoint; 在stage之间,是宽依赖,产生了shuffle操作,lineage链条过于复杂和冗长,这时候就需要做checkpoint。 2. 基于lineage(血统)的容错: 一般而言,spark选择血统容错,因为对于大规模的数据集,做检查点的成本很高。 考虑到RDD的依赖关系,每个stage内部都是窄依赖,此时一般基于lineage容错,方便高效。 总结: stage内部做lineage,stage之间做checkpoint。

优秀的个人博客,低调大师

Spark MLlib知识点学习整理

MLlib的设计原理:把数据以RDD的形式表示,然后在分布式数据集上调用各种算法。MLlib就是RDD上一系列可供调用的函数的集合。 操作步骤: 1、用字符串RDD来表示信息。 2、运行MLlib中的一个特征提取算法来吧文本数据转换为数值的特征。给操作会返回一个向量RDD。 3、对向量RDD调用分类算法,返回一个模型对象,可以使用该对象对新的数据点进行分类。 4、使用MLlib的评估函数在测试数据集上评估模型。 机器学习基础: 机器学习算法尝试根据 训练数据 使得表示算法行为的数学目标最大化,并以此来进行预测或作出决定。包括分类、回归、聚类,每种都有不一样的目标。 所有的学习算法都需要定义每个数据点的特征集,也就是传给学习函数的值。 更重要的在于如何去正确的定义特征。 例如: 在产品推荐的任务中,仅仅机上一个额外的特征(推荐给用户的书籍也可能取决于用户看过的电影),就有可能极大地改进结果。 当数据已成为特征向量的形式后,大多数机器学习算法会根据这些向量优化一个定义好的数学模型。 然后算法会再运行结束时返回一个代表学习决定的模型。 MLlib数据类型 1、Vector 一个数学向量。MLlib既支持稠密向量也支持稀疏向量。前者表示向量的每一位都存储下来,后者则存储非零位以节省空间。 稠密向量:把所有唯独的值存放在一个浮点整数组中。 稀疏向量只把各维度中的非零值存储下来。当最多只有10%的元素为非零元素时,通常更倾向于使用稀疏向量。 spark中创建向量的方式有 import org.apache.spark.mllib.linalg.Vectors //创建稠密向量<1.0,2.0,3.0>;Vectors.dense接收一串值或一个数组 val denseVec1 = Vectors.dense(1.0,2.0,3.0)) val denseVec2 = Vectors.dense(Array(1.0,2.0,3.0)) //创建稀疏向量<1.0,0.0,2.0,0.0> 向量的维度(4) 以及非零位的位置和对应的值 val sparseVec1 = Vectors.sparse(4,Array(0,2),Array(1.0,2.0)) 2、LabeledPoint 诸如分类和回归的算法这样的监督学习算法中,LabeledPoint用来表示带标签的数据点。它包含一个特征向量与一个标签(由一个浮点数表示),位置在mllib.regression包中。 3、Rating 用户对一个产品的评分,在mllib.recomendation包中,用于产品推荐。 4、各种Model类 每个Model都是训练算法的结果,一般有一个predict()方法可以用来对新的数据点或数据点组成的RDD应用该模型进行预测。 特征转化: TF-IDF:词频,逆文档频率是一种用来从文本文档中生成特征向量的简单方法。它为文档中的每个词计算两个统计值:一个是词频(TF),也就是每个词在文档中出现的次数,另一个是逆文档频率(IDF),用来衡量一个词语特定文档的相关度。 MLlib有两个算法可以用来计算TF-IDF:HashTF和TF HashTF从一个文档中计算出给定大小的词频向量。为了将词和向量顺序对应起来,所以使用了哈希。HashingTF使用每个单词对所需向量的长度S取模得出的哈希值,把所有单词映射到一个0到S-1之间的数字上。由此可以保证生成一个S维的向量。随后当构建好词频向量后,使用IDF来计算逆文档频率,然后将它们与词频相乘计算TF-IDF。 MLlib统计 1、Statistics.colStats(rdd) 计算由向量组成的RDD的统计性综述,保存着向量集合中每列的最大值、最小值、平均值和方差。 2、statistics.corr(rdd,method_ 计算由向量组成的RDD中的列间的相关矩阵,使用皮卡森相关或斯皮尔曼相关中的一种。 3、statistics.corr(rdd1,rdd2,method) 计算两个由浮点值组成的RDD的相关矩阵。 4、Statistics.chiSqTest(rdd) 计算由LabeledPoint对象组成的RDD中每个特征与标签的皮卡森独立性测试结果。返回一个ChiSqTestResult对象,其中有p值、测试统计及每个特征的自由度。 分类与回归 监督试学习指算法尝试使用有标签的训练数据(已知结果的数据点)根据对象的特征预测的结果。在分类中,预测出的变量是离散的(就是一个在有限集中的值,叫做类别) 。比如,分类可能是将邮件文卫垃圾邮件和非垃圾邮件,也有可能是文本所使用的语言。在回归中,预测出的是变量是连续的(根据年龄和体重预测一个人的身高) 线性回归: 1、numIteratrions 要运行的迭代次数(默认值:100) 2、stepSize 梯度下降的步长(默认值:1.0) 3、intercept 是否给数据加上一个干扰特征或者偏差特征--也就是一个值始终未1的特征(默认值:false) 4、regParam Lasso和ridge的正规化参数(默认值:1.0) import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.regression.LinearRegressionWithSGD val points: RDD[LabeledPoint] = //.. val lr = new LinearRegressiionWithSGD().setNumIterations(200).setIntercept(true) val model = lr.run(points) println("weight: %s, intercept: %s".format(model.weights, model.intercept)) 逻辑回归 用来寻找一个分割阴性和阳性示例的线性分割平面。在MLlib中,接收一组标签为0或1的LabeledPoint,返回可以预测新点的分类的LogisticRegressionModel对象。 决策树与随机深林 决策树是一个灵活的模型,可以用来进行分类,也可以用来进行回归。决策树以节点树的形式表示,每个节点基于数据的特征作出一个二元决定(比如这个人的年龄是否大于20?),而树的每个叶节点则包含一种预测结果(例如,这个人是不是会买一个产品?)决策树的吸引力在于模型本身容易检查,而且决策树既支持分类的特征,也支持连续的特征。 参考于:《Spark快速大数据分析》

优秀的个人博客,低调大师

spark RDD transformation与action函数整理

1.创建RDD val lines = sc.parallelize(List("pandas","i like pandas")) 2.加载本地文件到RDD val linesRDD = sc.textFile("yangsy.txt") 3.过滤 filter 需要注意的是 filter并不会在原有RDD上过滤,而是根据filter的内容重新创建了一个RDD val spark = linesRDD.filter(line => line.contains("damowang")) 4.count() 也是aciton操作 由于spark为懒加载 之前的语句不管对错其实都没执行 只有到调用action 如count() first() foreach()等操作的时候 才会真正去执行 spark.count() 5.foreach(println) 输出查看数据 (使用take可获取少量数据,如果工程项目中为DataFrame,可以调用show(1)) 这里提到一个东西,就是调用collect()函数 这个函数会将所有数据加载到driver端,一般数据量巨大的时候还是不要调用collect函数()否则会撑爆dirver服务器 虽然我们项目中暂时的确是用collect()把4000多万数据加载到dirver上了- =) spark.take(1).foreach(println) 6.常见的转化操作和行动操作 常见的转化操作如map()和filter() 比如计算RDD中各值的平方: val input = sc.parallelize(List(1,2,3,4)) val result = input.map(x => x*x) println(result.collect().mkString(",")) 7.flatMap() 与map类似,不过返回的是一个返回值序列的迭代器。得到的是一个包含各种迭代器可访问的所有元素的RDD。简单的用途比如把一个字符串切分成单词 val lines = sc.parallelize(List("xiaojingjing is my love","damowang","kings_landing")) val words = lines.flatMap(line => line.split(",")) //调用first()返回第一个值 words.first() 归类总结RDD的transformation操作: 对一个数据集(1,2,3,3)的RDD进行基本的RDD转化操作 map: 将函数应用于RDD中的每个元素,将返回值构成一个新的RDD eg: rdd.map(x => x+1) result: {2,3,4,4) flatmap:将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD,通常用来拆分 eg:rdd.flatMap(x => x.split(",")) .take(1).foreach(println) result: 1 flter:返回一个由通过传给filter的函数的元素组成的RDD eg:rdd.filter(x => x != 1) result: {2,3,3} distinct:用来去重 eg:rdd.distinct() {1,2,3} 对数据分别为{1,2,3}和{3,4,5}的RDD进行针对两个RDD的转换操作 union: 生成一个包含所有两个RDD中所有元素的RDD eg: rdd.union(other) result:{1,2,3,3,4,5} intersection:求两个元素中的共同的元素 eg:rdd.intersection(ohter) result:{3} substract() 移除RDD中的内容 eg:rdd.substract(other) result:{1,2} cartesian() 与另一个RDD的笛卡尔积 eg:rdd.cartesian(other) result:{(1,3),(1,4),(1,5)....(3,5)} 以上皆为transformation操作,下来action操作 9.reduce 并行整合RDD中所有数据 val lines1 = sc.parallelize(List(1,2,3,3)) lines1.reduce((x,y) => x + y) 10.reducebykey 最简单的就是实现wordcount的 统计出现的数目,原理在于map函数将rdd转化为一个二元组,再通过reduceByKey进行元祖的归约。 val linesRDD = sc.textFile("yangsy.txt")val count = linesRDD.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_).collect() 11.aggregate函数 与reduce相似,不过返回的是不同类型的函数 val result = input.aggregate((0,0))(acc.value) => (acc._1+value,acc._2+1),(acc1,acc2) =>(acc1._1 + acc2._1 , acc1._2 + acc2._2)) 还有很多比如count(),take(num)等就不一一练习了 12.collect函数还有foreach函数 其实刚才已经用到了,这里也不多说了~ 归纳总结RDD的action操作: 对一个数据为{1,2,3,3}的RDD的操作 collect: 返回RDD中的所有元素 rdd.collect() count: RDD中的元素的个数 countByValue: 返回各元素在RDD中出现的次数 : eg:rdd.countByValue() [(1,1),(2,1),(3,2)....] take(num): 从RDD中返回num个元素 top(num) : 从RDD中返回最前面的num个元素 takeSample(withReplacement,num,[seed]) : 从RDD中返回任意一些元素 eg: rdd.takeSample(false,1) reduce(func): 并行整合RDD中所有的数据 rdd.reduce(x,y) => x + y) foreach(func):对RDD中的每个元素使用给定的函数 在调用persist()函数将数据缓存如内存 想删除的话可以调用unpersist()函数 Pari RDD的转化操作 由于Pair RDD中包含二元组,所以需要传递的函数应当操作二元组而不是独立的元素 12.reduceByKey(fuc) 其实刚才wordcount应经用过 就是将相同的key的value进行合并 val lines1 = sc.parallelize(List((1,2),(3,4),(3,6)))val lines = lines1.reduceByKey((x,y) => x + y)lines.take(2).foreach(println) 13.groupByKey 将相同键的值进行分组 val lines1 = sc.parallelize(List((1,2),(3,4),(3,6))) lines1.groupByKey() lines.take(3).foreach(println) 14.mapValues 对pair RDD中的每个值应用一个函数而不改变键 val lines1 = sc.parallelize(List((1,2),(3,4),(3,6))) val lines = lines1.mapValues(x => x+1) lines.take(3).foreach(println) 15.sortByKey 返回一个根据键排序的RDD val lines1 = sc.parallelize(List((1,2),(4,3),(3,6))) val lines = lines1.sortByKey() lines.take(3).foreach(println) 针对两个不同的pair RDD的转化操作 16.subtractByKey 删掉RDD中键与其他RDD中的键相同的元素 val lines1 = sc.parallelize(List((1,2),(4,3),(3,6))) val lines2 = sc.parallelize(List((1,3),(5,3),(7,6))) val lines = lines1.subtractByKey(lines2) lines.take(3).foreach(println) 17.join 对两个RDD具有相同键的进行合并 val lines1 = sc.parallelize(List((1,2),(4,3),(3,6))) val lines2 = sc.parallelize(List((1,3),(5,3),(7,6))) val lines = lines1.join(lines2) lines.take(3).foreach(println) 18.rightOuterJoin 对两个RDD进行连接操作,确保第一个RDD的键必须存在 相反的为leftOuterJoin val lines1 = sc.parallelize(List((1,2),(4,3),(3,6))) val lines2 = sc.parallelize(List((1,3),(5,3),(7,6))) val lines = lines1.rightOuterJoin(lines2) lines.take(3).foreach(println) 19.cogroup 将两个RDD中拥有相同键的数据分组 val lines1 = sc.parallelize(List((1,2),(4,3),(3,6))) val lines2 = sc.parallelize(List((1,3),(5,3),(7,6))) val lines = lines1.cogroup(lines2) lines.take(3).foreach(println) 20. 用Scala对第二个元素进行筛选 val lines1 = sc.parallelize(List((1,2),(4,3),(3,6))) val result = lines1.filter{case(key,value) => value < 3} result.take(3).foreach(println) 聚合操作 21.在scala中使用reduceByKey()和mapValues()计算每个值对应的平均值 这个过程是这样的 首先通过mapValues函数,将value转化为了(2,1),(3,1),(6,1),(4,1) 然后通过reduceByKey合并相同键的结果 (其实就是mapreduce) val lines1 = sc.parallelize(List(("panda",2),("pink",3),("panda",6),("pirate",4))) val lines = lines1.mapValues(x =>(x,1)).reduceByKey((x,y) => (x._1 + y._1 , x._2 + y._2))lines.take(3).foreach(println) 22.countByValue 其实原理跟reduceByKey一样 另一半wordCount val linesRDD = sc.textFile("yangsy.txt") val count = linesRDD.flatMap(line => line.split(" ")).countByValue() 22.并行度问题 在执行聚合操作或者分组操作的时候,可以要求Spark使用给定的分区数,Spark始终尝试根据集群的大小推出一个有意义的默认值,但是有时候可能要对并行度进行调优来获取更好的性能。 (重要)比如wordcount,多加一个参数代表需要执行的partition的size val linesRDD = sc.textFile("yangsy.txt") val count = linesRDD.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_,10) 读取HDFS中csv文件 import java.io.StringReader import au.com.bytecode.opencsv.CSVReader val input = sc.textFile("test.csv") val result = input.map{line => val reader = new CSVReader(new StringReader(line)); reader.readNext()} result.collect()

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。