首页 文章 精选 留言 我的

精选列表

搜索[学习],共10000篇文章
优秀的个人博客,低调大师

一脸懵逼学习Storm的搭建--(一个开源的分布式实时计算系统)

Storm的官方网址:http://storm.apache.org/index.html 1:集群部署的基本流程(基本套路): 集群部署的流程:下载安装包、解压安装包、修改配置文件、分发安装包、启动集群; 1:安装一个zookeeper集群,之前已经部署过,这里省略,贴一下步骤; 安装配置zooekeeper集群: 1.1:解压 tar -zxvf zookeeper-3.4.5.tar.gz 1.2:修改配置 cd /home/hadoop/zookeeper-3.4.5/conf/ cp zoo_sample.cfg zoo.cfg vim zoo.cfg 修改:dataDir=/home/hadoop/zookeeper-3.4.5/tmp 在最后添加: server.1=master:2888:3888 server.2=slaver1:2888:3888 server.3=slaver2:2888:3888 保存退出 然后创建一个tmp文件夹 mkdir /home/hadoop/zookeeper-3.4.5/tmp 再创建一个空文件 touch /home/hadoop/zookeeper-3.4.5/tmp/myid 最后向该文件写入ID echo 1 > /home/hadoop/zookeeper-3.4.5/tmp/myid 1.3将配置好的zookeeper拷贝到其他节点: scp -r /home/hadoop/zookeeper-3.4.5/ slaver1:/home/hadoop/ scp -r /home/hadoop/zookeeper-3.4.5/ slaver2:/home/hadoop/ 注意:修改slaver1、slaver2对应/home/hadoop/zookeeper-3.4.5/tmp/myid内容 slaver1: echo 2 > /home/hadoop/zookeeper-3.4.5/tmp/myid slaver2: echo 3 > /home/hadoop/zookeeper-3.4.5/tmp/myid 2、上传storm的安装包,解压缩: [root@master hadoop]# tar -zxvf apache-storm-0.9.2-incubating.tar.gz 3、修改配置文件storm.yaml: 可以创建一个软连接,方便操作storm:[root@master soft]# ln -s apache-storm-0.9.2-incubating storm 修改内容如下所示,下面两个配置均可: #指定storm使用的zk集群 storm.zookeeper.servers: - "zk01" - "zk02" - "zk03" #指定storm集群中的nimbus节点所在的服务器 nimbus.host: "storm01" #指定nimbus启动JVM最大可用内存大小 nimbus.childopts: "-Xmx1024m" #指定supervisor启动JVM最大可用内存大小 supervisor.childopts: "-Xmx1024m" #指定supervisor节点上,每个worker启动JVM最大可用内存大小 worker.childopts: "-Xmx768m" #指定ui启动JVM最大可用内存大小,ui服务一般与nimbus同在一个节点上。 ui.childopts: "-Xmx768m" #指定supervisor节点上,启动worker时对应的端口号,每个端口对应槽,每个槽位对应一个worker supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 此次配置使用下面的,进行集群安装: #所使用的zookeeper集群主机 storm.zookeeper.servers: - "master" - "slaver1" - "slaver2" #nimbus所在的主机名 nimbus.host: "master"supervisor.slots.ports-6701-6702-6703-6704-6705 然后将master修改好的storm发送到slaver1,slaver2: [root@master hadoop]# scp -r apache-storm-0.9.2-incubating/ slaver1:/home/hadoop/ [root@master hadoop]# scp -r apache-storm-0.9.2-incubating/ slaver2:/home/hadoop/ 4:启动storm集群,首先启动你的Zookeeper集群,然后再启动你的storm集群哈。 启动Zookeeper集群: 然后启动Storm集群: 启动storm 在nimbus主机上,在nimbus.host所属的机器上启动 nimbus服务: nohup ./storm nimbus 1>/dev/null 2>&1 & 或者使用命令:nohup ./storm nimbus &在nimbus.host所属的机器上启动ui服务: nohup ./storm ui 1>/dev/null 2>&1 & 或者使用命令:nohup ./storm ui & 在supervisor主机上,在其它个点击上启动supervisor服务: nohup ./storm supervisor 1>/dev/null 2>&1 & 或者使用命令:nohup ./storm supervisor & 注意,解释: 1>/dev/null:代表标准输入到这个目录; 2>&1:代表标准输出也到这个目录下面; &:代表这个是后台运行; 如下启动storm方便观察,最后一行是卡住不动的哦: 查看进程如下所示: 可以启动一下storm的ui查看: 查看一下进程如: 启动ui以后可以在浏览器访问,如: 启动supervisor [root@slaver1 bin]# ./storm supervisor 然后可以启动剩下的storm: [root@slaver2 bin]# ./storm supervisor 启动以后可以查看进程jps的启动情况,然后可以去浏览器查看自己http://192.168.3.129:8080/index.html的页面各个启动情况,如supervisor等等。 5:Storm常用操作命令: 1:有许多简单且有用的命令可以用来管理拓扑,它们可以提交、杀死、禁用、再平衡拓扑。 提交任务命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】 2:bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount 杀死任务命令格式:storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间) 3:storm kill topology-name -w 10 停用任务命令格式:storm deactivte 【拓扑名称】 4:storm deactivte topology-name 5:我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。 启用任务命令格式:storm activate【拓扑名称】 storm activate topology-name 重新部署任务命令格式:storm rebalance 【拓扑名称】 storm rebalance topology-name 再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配工人,并重启拓扑。 注意使用storm运行jar包的时候是没有输入输出路径的,区别于hadoop离线分析: 1 [root@master storm]# bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.2-incubating.jar storm.starter.WordCountTopology wordcount 2 Running: /home/hadoop/soft/jdk1.7.0_65/bin/java -client -Dstorm.options= -Dstorm.home=/home/hadoop/soft/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/hadoop/soft/apache-storm-0.9.2-incubating/lib/commons-codec-1.6.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/hiccup-0.3.6.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/curator-client-2.4.0.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/clout-1.0.1.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/json-simple-1.1.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/reflectasm-1.07-shaded.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/httpclient-4.3.3.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/jgrapht-core-0.9.0.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/jline-2.11.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/ring-servlet-0.3.11.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/asm-4.0.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/clojure-1.5.1.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/joda-time-2.0.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/minlog-1.2.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/logback-classic-1.0.6.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/kryo-2.21.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/netty-3.6.3.Final.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/log4j-over-slf4j-1.6.6.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/commons-lang-2.5.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/tools.logging-0.2.3.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/commons-logging-1.1.3.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/compojure-1.1.3.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/logback-core-1.0.6.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/math.numeric-tower-0.0.1.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/ring-devel-0.3.11.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/servlet-api-2.5.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/commons-fileupload-1.2.1.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/guava-13.0.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/snakeyaml-1.11.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/jetty-6.1.26.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/disruptor-2.10.1.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/clj-time-0.4.1.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/jetty-util-6.1.26.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/httpcore-4.3.2.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/core.incubator-0.1.0.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/carbonite-1.4.0.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/commons-io-2.4.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/clj-stacktrace-0.2.4.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/slf4j-api-1.6.5.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/curator-framework-2.4.0.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/netty-3.2.2.Final.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/ring-core-1.1.5.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/chill-java-0.3.5.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/commons-exec-1.1.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/tools.macro-0.1.0.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/storm-core-0.9.2-incubating.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/ring-jetty-adapter-0.3.11.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/zookeeper-3.4.5.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/servlet-api-2.5-20081211.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/objenesis-1.2.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/lib/tools.cli-0.2.4.jar:examples/storm-starter/storm-starter-topologies-0.9.2-incubating.jar:/home/hadoop/soft/apache-storm-0.9.2-incubating/conf:/home/hadoop/soft/apache-storm-0.9.2-incubating/bin -Dstorm.jar=examples/storm-starter/storm-starter-topologies-0.9.2-incubating.jar storm.starter.WordCountTopology wordcount 3 712 [main] INFO backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar... 4 772 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar examples/storm-starter/storm-starter-topologies-0.9.2-incubating.jar to assigned location: storm-local/nimbus/inbox/stormjar-76fccf41-491e-4d61-8a98-4092c8630161.jar 5 Start uploading file 'examples/storm-starter/storm-starter-topologies-0.9.2-incubating.jar' to 'storm-local/nimbus/inbox/stormjar-76fccf41-491e-4d61-8a98-4092c8630161.jar' (2927299 bytes) 6 [==================================================] 2927299 / 2927299 7 File 'examples/storm-starter/storm-starter-topologies-0.9.2-incubating.jar' uploaded to 'storm-local/nimbus/inbox/stormjar-76fccf41-491e-4d61-8a98-4092c8630161.jar' (2927299 bytes) 8 941 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-76fccf41-491e-4d61-8a98-4092c8630161.jar 9 941 [main] INFO backtype.storm.StormSubmitter - Submitting topology wordcount in distributed mode with conf {"topology.workers":3,"topology.debug":true} 10 1431 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: wordcount 11 [root@master storm]# 使用storm自带的统计的demo启动起来以后ui如下所示: 然后点击wordCount以后如下所示,点击下面的按钮是相应的操作: 点击上面的按钮是相应的操作; 出现如下错误,这里贴一下,出错原因是:/storm/conf/storm.yaml的配置nimbus.host: "master"前面多了一个空格,删除空格即可: 1 [root@slaver1 bin]# ./storm supervisor 2 Exception in thread "main" java.lang.ExceptionInInitializerError 3 at java.lang.Class.forName0(Native Method) 4 at java.lang.Class.forName(Class.java:190) 5 at backtype.storm.config$loading__4910__auto__.invoke(config.clj:17) 6 at backtype.storm.config__init.load(Unknown Source) 7 at backtype.storm.config__init.<clinit>(Unknown Source) 8 at java.lang.Class.forName0(Native Method) 9 at java.lang.Class.forName(Class.java:270) 10 at clojure.lang.RT.loadClassForName(RT.java:2098) 11 at clojure.lang.RT.load(RT.java:430) 12 at clojure.lang.RT.load(RT.java:411) 13 at clojure.core$load$fn__5018.invoke(core.clj:5530) 14 at clojure.core$load.doInvoke(core.clj:5529) 15 at clojure.lang.RestFn.invoke(RestFn.java:408) 16 at clojure.core$load_one.invoke(core.clj:5336) 17 at clojure.core$load_lib$fn__4967.invoke(core.clj:5375) 18 at clojure.core$load_lib.doInvoke(core.clj:5374) 19 at clojure.lang.RestFn.applyTo(RestFn.java:142) 20 at clojure.core$apply.invoke(core.clj:619) 21 at clojure.core$load_libs.doInvoke(core.clj:5417) 22 at clojure.lang.RestFn.applyTo(RestFn.java:137) 23 at clojure.core$apply.invoke(core.clj:621) 24 at clojure.core$use.doInvoke(core.clj:5507) 25 at clojure.lang.RestFn.invoke(RestFn.java:408) 26 at backtype.storm.command.config_value$loading__4910__auto__.invoke(config_value.clj:16) 27 at backtype.storm.command.config_value__init.load(Unknown Source) 28 at backtype.storm.command.config_value__init.<clinit>(Unknown Source) 29 at java.lang.Class.forName0(Native Method) 30 at java.lang.Class.forName(Class.java:270) 31 at clojure.lang.RT.loadClassForName(RT.java:2098) 32 at clojure.lang.RT.load(RT.java:430) 33 at clojure.lang.RT.load(RT.java:411) 34 at clojure.core$load$fn__5018.invoke(core.clj:5530) 35 at clojure.core$load.doInvoke(core.clj:5529) 36 at clojure.lang.RestFn.invoke(RestFn.java:408) 37 at clojure.lang.Var.invoke(Var.java:415) 38 at backtype.storm.command.config_value.<clinit>(Unknown Source) 39 Caused by: while parsing a block mapping 40 in 'reader', line 18, column 1: 41 storm.zookeeper.servers: 42 ^ 43 expected <block end>, but found BlockMappingStart 44 in 'reader', line 23, column 2: 45 nimbus.host: "master" 46 ^ 47 48 at org.yaml.snakeyaml.parser.ParserImpl$ParseBlockMappingKey.produce(ParserImpl.java:570) 49 at org.yaml.snakeyaml.parser.ParserImpl.peekEvent(ParserImpl.java:158) 50 at org.yaml.snakeyaml.parser.ParserImpl.checkEvent(ParserImpl.java:143) 51 at org.yaml.snakeyaml.composer.Composer.composeMappingNode(Composer.java:230) 52 at org.yaml.snakeyaml.composer.Composer.composeNode(Composer.java:159) 53 at org.yaml.snakeyaml.composer.Composer.composeDocument(Composer.java:122) 54 at org.yaml.snakeyaml.composer.Composer.getSingleNode(Composer.java:105) 55 at org.yaml.snakeyaml.constructor.BaseConstructor.getSingleData(BaseConstructor.java:120) 56 at org.yaml.snakeyaml.Yaml.loadFromReader(Yaml.java:481) 57 at org.yaml.snakeyaml.Yaml.load(Yaml.java:424) 58 at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:141) 59 at backtype.storm.utils.Utils.readStormConfig(Utils.java:188) 60 at backtype.storm.utils.Utils.<clinit>(Utils.java:71) 61 ... 36 more 62 Exception in thread "main" java.lang.ExceptionInInitializerError 63 at java.lang.Class.forName0(Native Method) 64 at java.lang.Class.forName(Class.java:190) 65 at backtype.storm.config$loading__4910__auto__.invoke(config.clj:17) 66 at backtype.storm.config__init.load(Unknown Source) 67 at backtype.storm.config__init.<clinit>(Unknown Source) 68 at java.lang.Class.forName0(Native Method) 69 at java.lang.Class.forName(Class.java:270) 70 at clojure.lang.RT.loadClassForName(RT.java:2098) 71 at clojure.lang.RT.load(RT.java:430) 72 at clojure.lang.RT.load(RT.java:411) 73 at clojure.core$load$fn__5018.invoke(core.clj:5530) 74 at clojure.core$load.doInvoke(core.clj:5529) 75 at clojure.lang.RestFn.invoke(RestFn.java:408) 76 at clojure.core$load_one.invoke(core.clj:5336) 77 at clojure.core$load_lib$fn__4967.invoke(core.clj:5375) 78 at clojure.core$load_lib.doInvoke(core.clj:5374) 79 at clojure.lang.RestFn.applyTo(RestFn.java:142) 80 at clojure.core$apply.invoke(core.clj:619) 81 at clojure.core$load_libs.doInvoke(core.clj:5417) 82 at clojure.lang.RestFn.applyTo(RestFn.java:137) 83 at clojure.core$apply.invoke(core.clj:621) 84 at clojure.core$use.doInvoke(core.clj:5507) 85 at clojure.lang.RestFn.invoke(RestFn.java:408) 86 at backtype.storm.command.config_value$loading__4910__auto__.invoke(config_value.clj:16) 87 at backtype.storm.command.config_value__init.load(Unknown Source) 88 at backtype.storm.command.config_value__init.<clinit>(Unknown Source) 89 at java.lang.Class.forName0(Native Method) 90 at java.lang.Class.forName(Class.java:270) 91 at clojure.lang.RT.loadClassForName(RT.java:2098) 92 at clojure.lang.RT.load(RT.java:430) 93 at clojure.lang.RT.load(RT.java:411) 94 at clojure.core$load$fn__5018.invoke(core.clj:5530) 95 at clojure.core$load.doInvoke(core.clj:5529) 96 at clojure.lang.RestFn.invoke(RestFn.java:408) 97 at clojure.lang.Var.invoke(Var.java:415) 98 at backtype.storm.command.config_value.<clinit>(Unknown Source) 99 Caused by: while parsing a block mapping 100 in 'reader', line 18, column 1: 101 storm.zookeeper.servers: 102 ^ 103 expected <block end>, but found BlockMappingStart 104 in 'reader', line 23, column 2: 105 nimbus.host: "master" 106 ^ 107 108 at org.yaml.snakeyaml.parser.ParserImpl$ParseBlockMappingKey.produce(ParserImpl.java:570) 109 at org.yaml.snakeyaml.parser.ParserImpl.peekEvent(ParserImpl.java:158) 110 at org.yaml.snakeyaml.parser.ParserImpl.checkEvent(ParserImpl.java:143) 111 at org.yaml.snakeyaml.composer.Composer.composeMappingNode(Composer.java:230) 112 at org.yaml.snakeyaml.composer.Composer.composeNode(Composer.java:159) 113 at org.yaml.snakeyaml.composer.Composer.composeDocument(Composer.java:122) 114 at org.yaml.snakeyaml.composer.Composer.getSingleNode(Composer.java:105) 115 at org.yaml.snakeyaml.constructor.BaseConstructor.getSingleData(BaseConstructor.java:120) 116 at org.yaml.snakeyaml.Yaml.loadFromReader(Yaml.java:481) 117 at org.yaml.snakeyaml.Yaml.load(Yaml.java:424) 118 at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:141) 119 at backtype.storm.utils.Utils.readStormConfig(Utils.java:188) 120 at backtype.storm.utils.Utils.<clinit>(Utils.java:71) 121 ... 36 more 122 Exception in thread "main" java.lang.ExceptionInInitializerError 123 at java.lang.Class.forName0(Native Method) 124 at java.lang.Class.forName(Class.java:190) 125 at backtype.storm.config$loading__4910__auto__.invoke(config.clj:17) 126 at backtype.storm.config__init.load(Unknown Source) 127 at backtype.storm.config__init.<clinit>(Unknown Source) 128 at java.lang.Class.forName0(Native Method) 129 at java.lang.Class.forName(Class.java:270) 130 at clojure.lang.RT.loadClassForName(RT.java:2098) 131 at clojure.lang.RT.load(RT.java:430) 132 at clojure.lang.RT.load(RT.java:411) 133 at clojure.core$load$fn__5018.invoke(core.clj:5530) 134 at clojure.core$load.doInvoke(core.clj:5529) 135 at clojure.lang.RestFn.invoke(RestFn.java:408) 136 at clojure.core$load_one.invoke(core.clj:5336) 137 at clojure.core$load_lib$fn__4967.invoke(core.clj:5375) 138 at clojure.core$load_lib.doInvoke(core.clj:5374) 139 at clojure.lang.RestFn.applyTo(RestFn.java:142) 140 at clojure.core$apply.invoke(core.clj:619) 141 at clojure.core$load_libs.doInvoke(core.clj:5417) 142 at clojure.lang.RestFn.applyTo(RestFn.java:137) 143 at clojure.core$apply.invoke(core.clj:621) 144 at clojure.core$use.doInvoke(core.clj:5507) 145 at clojure.lang.RestFn.invoke(RestFn.java:408) 146 at backtype.storm.command.config_value$loading__4910__auto__.invoke(config_value.clj:16) 147 at backtype.storm.command.config_value__init.load(Unknown Source) 148 at backtype.storm.command.config_value__init.<clinit>(Unknown Source) 149 at java.lang.Class.forName0(Native Method) 150 at java.lang.Class.forName(Class.java:270) 151 at clojure.lang.RT.loadClassForName(RT.java:2098) 152 at clojure.lang.RT.load(RT.java:430) 153 at clojure.lang.RT.load(RT.java:411) 154 at clojure.core$load$fn__5018.invoke(core.clj:5530) 155 at clojure.core$load.doInvoke(core.clj:5529) 156 at clojure.lang.RestFn.invoke(RestFn.java:408) 157 at clojure.lang.Var.invoke(Var.java:415) 158 at backtype.storm.command.config_value.<clinit>(Unknown Source) 159 Caused by: while parsing a block mapping 160 in 'reader', line 18, column 1: 161 storm.zookeeper.servers: 162 ^ 163 expected <block end>, but found BlockMappingStart 164 in 'reader', line 23, column 2: 165 nimbus.host: "master" 166 ^ 167 168 at org.yaml.snakeyaml.parser.ParserImpl$ParseBlockMappingKey.produce(ParserImpl.java:570) 169 at org.yaml.snakeyaml.parser.ParserImpl.peekEvent(ParserImpl.java:158) 170 at org.yaml.snakeyaml.parser.ParserImpl.checkEvent(ParserImpl.java:143) 171 at org.yaml.snakeyaml.composer.Composer.composeMappingNode(Composer.java:230) 172 at org.yaml.snakeyaml.composer.Composer.composeNode(Composer.java:159) 173 at org.yaml.snakeyaml.composer.Composer.composeDocument(Composer.java:122) 174 at org.yaml.snakeyaml.composer.Composer.getSingleNode(Composer.java:105) 175 at org.yaml.snakeyaml.constructor.BaseConstructor.getSingleData(BaseConstructor.java:120) 176 at org.yaml.snakeyaml.Yaml.loadFromReader(Yaml.java:481) 177 at org.yaml.snakeyaml.Yaml.load(Yaml.java:424) 178 at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:141) 179 at backtype.storm.utils.Utils.readStormConfig(Utils.java:188) 180 at backtype.storm.utils.Utils.<clinit>(Utils.java:71) 181 ... 36 more 开始我用的apache-storm-0.9.2-incubating.tar.gz版本,使用如下官方demo,出现的问题是storm的ui的点击wordcount没有 Spouts (All time),Bolts (All time),Topology Visualization,Topology Configuration等等这些内容;解决方法,我是换的新版本解决问题: [root@slaver1 storm]# bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount 待续......

优秀的个人博客,低调大师

一脸懵逼学习MapReduce的原理和编程(Map局部处理,Reduce汇总)和MapReduce几种运行方式

1:MapReduce的概述: (1):MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题. (2):MapReduce由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。 (3):这两个函数的形参是key、value对,表示函数的输入信息。 2:MapReduce执行步骤: (1): map任务处理 (a):读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。 (b):写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。 (2)reduce任务处理 (a)在reduce之前,有一个shuffle的过程对多个map任务的输出进行合并、排序。 (b)写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。 (c)把reduce的输出保存到文件中。 例子:实现WordCountApp3:map、reduce键值对格式: 4:MapReduce流程: (1)代码编写 (2)作业配置 (3)提交作业 (4)初始化作业 (5)分配任务 (6)执行任务 (7)更新任务和状态 (8)完成作业 5:MapReduce介绍及wordcount和wordcount的编写和提交集群运行的案例: WcMap类进行单词的局部处理: 1 package com.mapreduce; 2 3 4 import java.io.IOException; 5 6 import org.apache.commons.lang.StringUtils; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Mapper; 10 11 /*** 12 * 13 * @author Administrator 14 * 1:4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的值 15 * KEYOUT是输入的key的类型,VALUEOUT是输入的value的值 16 * 2:map和reduce的数据输入和输出都是以key-value的形式封装的。 17 * 3:默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value 18 * 4:key-value数据是在网络中进行传递,节点和节点之间互相传递,在网络之间传输就需要序列化,但是jdk自己的序列化很冗余 19 * 所以使用hadoop自己封装的数据类型,而不要使用jdk自己封装的数据类型; 20 * Long--->LongWritable 21 * String--->Text 22 */ 23 public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{ 24 25 //重写map这个方法 26 //mapreduce框架每读一行数据就调用一次该方法 27 @Override 28 protected void map(LongWritable key, Text value, Context context) 29 throws IOException, InterruptedException { 30 //具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中key-value 31 //key是这一行数据的起始偏移量,value是这一行的文本内容 32 33 //1:切分单词,首先拿到单词value的值,转化为String类型的 34 String str = value.toString(); 35 //2:切分单词,空格隔开,返回切分开的单词 36 String[] words = StringUtils.split(str," "); 37 //3:遍历这个单词数组,输出为key-value的格式,将单词发送给reduce 38 for(String word : words){ 39 //输出的key是Text类型的,value是LongWritable类型的 40 context.write(new Text(word), new LongWritable(1)); 41 } 42 43 44 } 45 } WcReduce进行单词的计数处理: 1 package com.mapreduce; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.LongWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Reducer; 8 9 /*** 10 * 11 * @author Administrator 12 * 1:reduce的四个参数,第一个key-value是map的输出作为reduce的输入,第二个key-value是输出单词和次数,所以 13 * 是Text,LongWritable的格式; 14 */ 15 public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable>{ 16 17 //继承Reducer之后重写reduce方法 18 //第一个参数是key,第二个参数是集合。 19 //框架在map处理完成之后,将所有key-value对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法 20 //<hello,{1,1,1,1,1,1.....}> 21 @Override 22 protected void reduce(Text key, Iterable<LongWritable> values,Context context) 23 throws IOException, InterruptedException { 24 //将values进行累加操作,进行计数 25 long count = 0; 26 //遍历value的list,进行累加求和 27 for(LongWritable value : values){ 28 29 count += value.get(); 30 } 31 32 //输出这一个单词的统计结果 33 //输出放到hdfs的某一个目录上面,输入也是在hdfs的某一个目录 34 context.write(key, new LongWritable(count)); 35 } 36 37 38 } WcRunner用来描述一个特定的作业 1 package com.mapreduce; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 14 /*** 15 * 1:用来描述一个特定的作业 16 * 比如,该作业使用哪个类作为逻辑处理中的map,那个作为reduce 17 * 2:还可以指定该作业要处理的数据所在的路径 18 * 还可以指定改作业输出的结果放到哪个路径 19 * @author Administrator 20 * 21 */ 22 public class WcRunner { 23 24 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 25 //创建配置文件 26 Configuration conf = new Configuration(); 27 //获取一个作业 28 Job job = Job.getInstance(conf); 29 30 //设置整个job所用的那些类在哪个jar包 31 job.setJarByClass(WcRunner.class); 32 33 //本job使用的mapper和reducer的类 34 job.setMapperClass(WcMap.class); 35 job.setReducerClass(WcReduce.class); 36 37 //指定reduce的输出数据key-value类型 38 job.setOutputKeyClass(Text.class); 39 job.setOutputValueClass(LongWritable.class); 40 41 42 //指定mapper的输出数据key-value类型 43 job.setMapOutputKeyClass(Text.class); 44 job.setMapOutputValueClass(LongWritable.class); 45 46 //指定要处理的输入数据存放路径 47 FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/wc/srcdata")); 48 49 //指定处理结果的输出数据存放路径 50 FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/wc/output")); 51 52 //将job提交给集群运行 53 job.waitForCompletion(true); 54 } 55 56 } 书写好上面的三个类以后打成jar包上传到虚拟机上面进行运行: 然后启动你的hadoop集群:start-dfs.sh和start-yarn.sh启动集群;然后将jar分发到节点上面进行运行; 之前先造一些数据,如下所示: 内容自己随便搞吧: 然后上传到hadoop集群上面,首选创建目录,存放测试数据,将数据上传到创建的目录即可;但是输出目录不需要手动创建,会自动创建,自己创建会报错: 然后将jar分发到节点上面进行运行;命令格式如hadoop jar 自己的jar包 主类的路径 正常性运行完过后可以查看一下运行的效果: 6:MapReduce的本地模式运行如下所示(本地运行需要修改输入数据存放路径和输出数据存放路径): 1 package com.mapreduce; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 14 /*** 15 * 1:用来描述一个特定的作业 16 * 比如,该作业使用哪个类作为逻辑处理中的map,那个作为reduce 17 * 2:还可以指定该作业要处理的数据所在的路径 18 * 还可以指定改作业输出的结果放到哪个路径 19 * @author Administrator 20 * 21 */ 22 public class WcRunner { 23 24 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 25 //创建配置文件 26 Configuration conf = new Configuration(); 27 //获取一个作业 28 Job job = Job.getInstance(conf); 29 30 //设置整个job所用的那些类在哪个jar包 31 job.setJarByClass(WcRunner.class); 32 33 //本job使用的mapper和reducer的类 34 job.setMapperClass(WcMap.class); 35 job.setReducerClass(WcReduce.class); 36 37 //指定reduce的输出数据key-value类型 38 job.setOutputKeyClass(Text.class); 39 job.setOutputValueClass(LongWritable.class); 40 41 42 //指定mapper的输出数据key-value类型 43 job.setMapOutputKeyClass(Text.class); 44 job.setMapOutputValueClass(LongWritable.class); 45 46 //指定要处理的输入数据存放路径 47 //FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/wc/srcdata/")); 48 FileInputFormat.setInputPaths(job, new Path("d:/wc/srcdata/")); 49 50 51 //指定处理结果的输出数据存放路径 52 //FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/wc/output/")); 53 FileOutputFormat.setOutputPath(job, new Path("d:/wc/output/")); 54 55 56 //将job提交给集群运行 57 job.waitForCompletion(true); 58 } 59 60 } 然后去自己定义的盘里面创建文件夹即可: 然后直接运行出现下面的错误: log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).log4j:WARN Please initialize the log4j system properly.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.Exception in thread "main" java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:120) at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82) at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75) at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1255) at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1251) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) at org.apache.hadoop.mapreduce.Job.connect(Job.java:1250) at org.apache.hadoop.mapreduce.Job.submit(Job.java:1279) at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303) at com.mapreduce.WcRunner.main(WcRunner.java:57) 解决办法: 缺少Jar包:hadoop-mapreduce-client-common-2.2.0.jar 好吧,最后还是没有实现在本地运行此运行,先在这里记一下吧。下面这个错搞不定,先做下笔记吧; log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).log4j:WARN Please initialize the log4j system properly.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.Exception in thread "main" java.lang.IllegalArgumentException: Pathname /c:/wc/output from hdfs://master:9000/c:/wc/output is not a valid DFS filename. at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194) at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:102) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398) at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:145) at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458) at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282) at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303) at com.mapreduce.WcRunner.main(WcRunner.java:57) 7:MapReduce程序的几种提交运行模式: 本地模型运行1:在windows的eclipse里面直接运行main方法,就会将job提交给本地执行器localjobrunner执行 ----输入输出数据可以放在本地路径下(c:/wc/srcdata/) ----输入输出数据也可以放在hdfs中(hdfs://master:9000/wc/srcdata)2:在linux的eclipse里面直接运行main方法,但是不要添加yarn相关的配置,也会提交给localjobrunner执行 ----输入输出数据可以放在本地路径下(/home/hadoop/wc/srcdata/) ----输入输出数据也可以放在hdfs中(hdfs://master:9000/wc/srcdata) 集群模式运行1:将工程打成jar包,上传到服务器,然后用hadoop命令提交 hadoop jar wc.jar cn.itcast.hadoop.mr.wordcount.WCRunner2:在linux的eclipse中直接运行main方法,也可以提交到集群中去运行,但是,必须采取以下措施: ----在工程src目录下加入 mapred-site.xml 和 yarn-site.xml ----将工程打成jar包(wc.jar),同时在main方法中添加一个conf的配置参数 conf.set("mapreduce.job.jar","wc.jar"); 3:在windows的eclipse中直接运行main方法,也可以提交给集群中运行,但是因为平台不兼容,需要做很多的设置修改 ----要在windows中存放一份hadoop的安装包(解压好的) ----要将其中的lib和bin目录替换成根据你的windows版本重新编译出的文件 ----再要配置系统环境变量 HADOOP_HOME 和 PATH ----修改YarnRunner这个类的源码

优秀的个人博客,低调大师

徐立:1200层神经网络夺冠ImageNet,深度学习越深越好?| 新智元 AI 领军人物专访

人工智能领域的创业浪潮中,计算机视觉技术(CV)可以说是一个较为火热的方向,呈遍地开花之势。在这片江湖中,有四家公司特别突出,有CV领域的“一桶筐汤”之称,可以看成是具有巨大潜力的“四小龙”。 其中,“汤”在这里指的就是创建于2014年的商汤科技开发有限公司。其他三家“一”指创立于2016年的依图科技。“桶”指的是2014年的“格灵深瞳”,“筐”指的是成立于2016年的旷世科技(Face++)。这四家公司最新公布的融资都都超过了数千万美元,其中有两家融资额超过1亿美元,有公司已经走到C轮。 2017年1月,新智元启动“寻找AI独角兽”创业大赛,比赛评出了2016年的年度创业家,凭借2016年在融资、技术和商业化应用上的亮眼表现,商汤科技CEO 徐立获得专家评审的一致赞赏,高票当选。在即将于2017年3月27日举办的“2017新智

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

用户登录
用户注册