首页 文章 精选 留言 我的

精选列表

搜索[学习],共10000篇文章
优秀的个人博客,低调大师

Hbase 学习(三)Coprocessors

Coprocessors 之前我们的filter都是在客户端定义,然后传到服务端去执行的,这个Coprocessors是在服务端定义,在客户端调用,然后在服务端执行,他有点儿想我们熟悉的存储过程,传一些参数进去,然后进行我们事先定义好的操作,我们常常用它来做一些比如二次索引啊,统计函数什么的,它也和自定义filter一样,需要事先定好,然后在hbase-env.sh中的HBASE_CLASSPATH中指明,就像我的上一篇中的写的那样。 Coprocessors分两种,observer和endpoint。 (1)observer就像触发器一样,当某个事件发生的时候,它就出发。 已经有一些内置的接口让我们去实现,RegionObserver、MasterObserver、WALObserver,看名字就大概知道他们是干嘛的。 (2)endpoint可以认为是自定义函数,可以把这个理解为关系数据库的存储过程。 所有的Coprocessor都是实现自Coprocessor 接口,它分SYSTEM和USER,前者的优先级比后者的优先级高,先执行。 它有两个方法,start和stop方法,两个方法都有一个相同的上下文对象CoprocessorEnvironment。 void start(CoprocessorEnvironment env) throws IOException; void stop(CoprocessorEnvironment env) throws IOException; 这是CoprocessorEnvironment的方法。 Working with Tables 对表进行操作的时候,必须先调用getTable方法活得HTable,不可以自己定义一个HTable,目前貌似没有禁止,但是将来会禁止。 并且在对表操作的时候,不能对行加锁。 Coprocessor Loading Coprocessor加载需要在配置文件里面全局加载,比如在hbase-site.xml中设置。 <property> <name>hbase.coprocessor.region.classes</name> <value>coprocessor.RegionObserverExample,coprocessor.AnotherCoprocessor</value> </property> <property> <name>hbase.coprocessor.master.classes</name> <value>coprocessor.MasterObserverExample</value> </property> <property> <name>hbase.coprocessor.wal.classes</name> <value>coprocessor.WALObserverExample,bar.foo.MyWALObserver</value> </property> 我们自定义的时间可以注册到三个配置项上,分别是hbase.coprocessor.region.classes,hbase.coprocessor.master.classes, hbase.coprocessor.wal.classes上,他们分别负责region,master,wal,注册到region的要特别注意小心,因为它是针对所有表的。 <property> <name>hbase.coprocessor.region.classes</name> <value>coprocessor.RegionObserverExample</value> </property> 注册到这三个触发器上,可以监控到几乎所有我们的操作上面,非常恐怖。。可以说是想要什么就有什么,详细的代码大家自己去摸索。 EndPoint的可以用来定义聚合函数,我们可以调用CoprocessorProtocol中的方法来实现我们的需求。 调用coprocessorProxy() 传一个单独的row key,这是在单独一个region上操作的。 要在所有region上面操作,我们要调用coprocessorExec()方法 传一个开始row key 和结束row key。 Demo 说了那么多废话,我都不好意思再说了,来个例子吧,统计行数的。 public interface RowCountProtocol extends CoprocessorProtocol { long getRowCount() throws IOException; long getRowCount(Filter filter) throws IOException; long getKeyValueCount() throws IOException; } public class RowCountEndpoint extends BaseEndpointCoprocessor implements RowCountProtocol { private long getCount(Filter filter, boolean countKeyValues) throws IOException { Scan scan = new Scan(); scan.setMaxVersions(1); if (filter != null) { scan.setFilter(filter); } RegionCoprocessorEnvironment environment = (RegionCoprocessorEnvironment) getEnvironment(); // use an internal scanner to perform scanning. InternalScanner scanner = environment.getRegion().getScanner(scan); int result = 0; try { List<KeyValue> curVals = new ArrayList<KeyValue>(); boolean done = false; do { curVals.clear(); done = scanner.next(curVals); result += countKeyValues ? curVals.size() : 1; } while (done); } finally { scanner.close(); } return result; } @Override public long getRowCount() throws IOException { return getRowCount(new FirstKeyOnlyFilter()); } @Override public long getRowCount(Filter filter) throws IOException { return getCount(filter, false); } @Override public long getKeyValueCount() throws IOException { return getCount(null, true); } }写完之后,注册一下吧。 <property> <name>hbase.coprocessor.region.classes</name> <value>coprocessor.RowCountEndpoint</value> </property> JAVA 客户端调用 在服务端定义之后,我们怎么在客户端用java代码调用呢,看下面的例子你就明白啦! public class EndPointExample { public static void main(String[] args) throws IOException { Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "testtable"); try { Map<byte[], Long> results = table.coprocessorExec( RowCountProtocol.class, null, null, new Batch.Call<RowCountProtocol, Long>() { @Override public Long call(RowCountProtocol counter) throws IOException { return counter.getRowCount(); } }); long total = 0; for (Map.Entry<byte[], Long> entry : results.entrySet()) { total += entry.getValue().longValue(); System.out.println("Region: " + Bytes.toString(entry.getKey()) + ", Count: " + entry.getValue()); } System.out.println("Total Count: " + total); } catch (Throwable throwable) { throwable.printStackTrace(); } } } 通过table的coprocessorExec方法调用,然后调用RowCountProtocol接口的getRowCount()方法。 然后遍历每个Region返回的结果,合起来就是最终的结果,打印结果如下。 Region: testtable,,1303417572005.51f9e2251c29ccb2...cbcb0c66858f., Count: 2 Region: testtable,row3,1303417572005.7f3df4dcba3f...dbc99fce5d87., Count: 3 Total Count: 5在上面的例子当中,我们是用Batch.Call()方法来调用接口当中的方法,我们可以用另外一个方法来简化上述代码,来看例子。 Batch.Call call =Batch.forMethod(RowCountProtocol.class,"getKeyValueCount"); Map<byte[], Long> results = table.coprocessorExec(RowCountProtocol.class, null, null, call); 采用Batch.Call方法调用同时调用多个方法 Map<byte[], Pair<Long, Long>> results =table.coprocessorExec( RowCountProtocol.class, null, null, new Batch.Call<RowCountProtocol, Pair<Long, Long>>() { public Pair<Long, Long> call(RowCountProtocol counter) throws IOException { return new Pair(counter.getRowCount(),counter.getKeyValueCount()); } }); long totalRows = 0; long totalKeyValues = 0; for (Map.Entry<byte[], Pair<Long, Long>> entry :results.entrySet()) { totalRows += entry.getValue().getFirst().longValue(); totalKeyValues +=entry.getValue().getSecond().longValue(); System.out.println("Region: " +Bytes.toString(entry.getKey()) +", Count: " + entry.getValue()); } System.out.println("Total Row Count: " + totalRows); System.out.println("Total KeyValue Count: " +totalKeyValues); 调用coprocessorProxy()在单个region上执行 RowCountProtocol protocol = table.coprocessorProxy(RowCountProtocol.class, Bytes.toBytes("row4")); long rowsInRegion = protocol.getRowCount(); System.out.println("Region Row Count: " +rowsInRegion);上面这个例子是查找row4行所在region的数据条数,这个可以帮助我们统计每个region上面的数据分布。

优秀的个人博客,低调大师

Sqoop基础学习(1)

1. Sqoop的导入过程 在开始导入之前,Sqoop会通过JDBC来获得所需要的数据库元数据 1.导入表的列名、数据类型等; 2.接着这些数据库的数据类型(varchar、number等)会把映射成Java的数据类型(String、int等),根据这些信息,Sqoop会生成一个与表名同名的类用来完成反序列的工作,保持表中的每一行记录; 3.Sqoop启动MapReduce作业 4.启动的作业在input的过程中,会通过JDBC读取数据库表中的内容; 5.这是会使用Sqoop生成的类进行反序列话 6.最后再将这些记录写到HDFS中,在写入HDFS的过程中,同样会使用Sqoop生成的类进行序列化。 2. Sqoop的导出过程 1/2 Sqoop根据目标表的结构会生成一个Java类 3.该类作用为序列化和反序列化 4.接着启动一个MapReduce作业 5.在作业中会生成的Java类从HDFS中读取数据 6.并生成一批INSERT语句,每条语句都会向MySQL的目标表中插入多条记录

优秀的个人博客,低调大师

MapReduce设计模式学习

一:概要模式 1:简介 概要设计模式更接近简单的MR应用,因为基于键将数据分组是MR范型的核心功能,所有的键将被分组汇入reducer中 本章涉及的概要模式有数值概要(numerical summarization),倒排索引(inverted index),计数器计数(counting with counter) 2:概要设计模式包含 2.1:关于Combiner和paritioner combiner:reducer之前调用reducer函数,对数据进行聚合,极大的减少通过网络传输到reducer端的key/value数量,适用的条件是你可以任意的改变值的顺序,并且可以随意的将计算进行分组,同时需要注意的是一个combiner函数只对一个map函数有作用 partitioner:许多概要模式通过定制partitioner函数实现更优的将键值对分发到n个reducer中,着这样的需求场景会比较少,但如果任务的执行时间要求很高,数据量非常大,且存在数据倾斜的情况,定制partitioner将是非常有效的解决方案 源码分析请点击 编程实例请点击 2.2:数值概要模式 2.2.1:数值概要模式:计算数据聚合统计的一般性模式 2.2.2:数值概要应用的场景需要满足以下亮点: 1:要处理的数据是数值数据或者计数 2:数据可以按照某些特定的字段分组 2.2.3:适用场景: 1:单词计数 (可以使用combiner) 2:最大值/最小值/计数 (可以使用combiner) 3:平均值 (可以使用combiner,但必须做相应的处理,即迂回算法,举例如下) 给定用户的评论列表,按天计算每小时的评论长度 Map:context.write(1,tuple(1,1小时的平均长度)) reducer: 处理:sum += tulpe.gethour * tuple.getavrg count += tuple.gethour 输出: key=1 value=sum/count 4:中位数/标准差 2.3:倒排索引概要 适用场景:通常用在需要快速搜索查询响应的场景,可以对一个查询结果进行预处理并存储在一个数据库中 倒排索引实战1 倒排索引实战2 2.4:计数器计数 已知应用 统计记录数:简单的对指定时间段的记录数进行统计是很常见的,统计小数量级的唯一实例计数 汇总:用来执行对数据的某些字段进行汇总 二:过滤模式 1:简介 过滤模式也可以被认为是一种搜索形式,如果你对找出所有具备特定信息的记录感兴趣,就可以过滤掉不匹配搜索条件的其他记录,与大多数基础模式类似,过滤作为一种抽象模式为其他模式服务,过滤简单的对某一条记录进行评估,并基于某个条件作出判断,以确定当前这条记录是保留还是丢弃 2:适用场景 2.1:过滤, 使用过滤的唯一必要条件是数据可以被解析成记录,并可以通过非常特定的准则来确定它们是否需要保留,不需要reducer函数 近距离观察数据:准备一个特定的子集,子集中的记录有某些共同属性或者具备某些有趣的特性,需要进一步深入的分析。 跟踪某个事件的线索:从一个较大数据集中抽取一个连续事件作为线索来做案例研究。 分布式grep:通过一个正则表达式匹配每一行,输出满足条件的行 数据清理:数据有时是畸形的,不完整的 或者是格式错误的,过滤可以用于验证每一条数据是否满足记录,将不满足的数据删除 简单随机抽样:可以使用随机返回True or False的评估函数做过滤,可以通过调小true返回的概率实现对结果集合大小的控制 移除低分值数据:将不满足某个特定阀值的记录过滤出去 2.2:布隆过滤, 对每一条记录,抽取其中一个特征,如果抽取的特性是布隆过滤中所表示的值的集合成员,则保留记录 移除大多数不受监视的值:最直接的使用案例是清楚不感兴趣的值 对成本很高的集合成员资格检查做数据的预先过滤: 2.3:Top10,不管输入数据的大小是多少,你都可以精确的知道输出的结果的记录数 异类分析: 选取感兴趣的数据: 引人注目的指标面板: 2.4:去重,过滤掉数据集中的相似数据,找出唯一的集合 数据去重:代码举例 抽取重复值: 规避内连接的数据膨胀: 三:数据组织模式 1:分层结构模式 分层模式是从数据中创造出不同于原结构的新纪录 适用场景:数据源被外部链接,数据是结构化的并且是基于行的 <MultipleInputs类:用于指定多个Mapper任务进行不同格式文件的输入> 2:分区和分箱模式 分区:将记录进行分类(即分片,分区或者分箱),但他并不关心记录的顺序,目地是将数据集中相似的记录分成不同的,更小的数据集,在该模式下数据是通过自定义Map的分区器进行分区的。 分箱:是在不考虑记录顺序的情况下对记录进行分类,目的是将数据集中每条记录归档到一个或者多个举例 两者的不同之处在于分箱是在Map阶段对数据进行拆分,其好处是减少reduce的工作量,通常使资源分布更有效,缺点是每个mapper将为每个可能输出的箱子创建文件,对后续的分析十分不利 3:全排序和混排模式 全排序:关注的是数据从记录到记录的顺序,目的是能够按照指定的键进行并行排序。适用的范围是排序的键必须具有可比性只有这样数据才能被排序 混排序:关注记录在数据集中的顺序,目的是将一个给定的记录完全随机化 4:数据生成模式 四:连接模式 SQL连接模式包括内连接和外连接 eg:A表 B表 内连接:只连接两个表中都用的外键连接(eg 以ID作为连接键,只连接有相同ID) 外连接: 1:做外连接 以用户ID为外键的A+B做外连接 以A表为基准,A表数据全部显示,B表中不在A表中的ID显示为null 2:右外连接 和做外连接相反 3:全外连接 左外连接和右外连接的合并,有相同ID 的显示,没有相同ID的显示为NULL 反连接:全外连接减去内连接的结果 1:reduce端连接: 相当其他连接模式来讲用时最长,但是也是实现简单并且支持所有不同类型的操作 适用场景:1:多个大数据需要按一个外键做链接操作,如果除了一个数据集以外,其他所有的数据集都可以放入内存,可以尝试使用复制连接 2:你需要灵活的执行任意类型的连接操作 等效的SQL:Select user.id,user.location,comment.uprotes from user [inner | left | right] join comments on user.id=comments.userid 优化方案:可以使用布隆过滤器执行reduce端的连接 2:复制连接: 是一种特殊类型的连接操作,是在一个打的数据集和许多小的数据集之间通过MAP端执行的连接的操作,该模式完全消除了混排数据到reduce的需求 适用场景: 1:要执行的连接类型是由内连接或者左外连接,且大的输入数据集在连接操作符的“左边”时 2:除一个大的数据集外,所有的数据集都可以存入每个Map任务的内存中 性能分析:因为不需要reduce,因此在所有连接模式是最快的一种,代价是数据量,数据要能完全的储存在JVM中,这极大的受限于你愿意为每个Map和reduce分配多少内存 3:组合连接: 是一种非常特殊的连接操作,他可以在map端对许多非常大的格式化输入做连接,需要预先组织好的或者是使用特定的方式预处理过的,即在使用这个类型的连接操作之前,必须按照外键对数据集进行排序个分区,并以一种非常特殊的方式读入数据集 Hadoop通过CompositeInputFormat来支持组合连接方式 仅适用于内连接和全外连,每一个mapper的输入都需要按照指定的方式做分区和排序,对于每一个输入数据集都要分成相同数目的分区,此外,对应于某个特定的外链所做的所有记录必须处于同一分区中 通常情况下这发生在几个作业的输出有相同数量的reducer和相同的外键,并且输出文件是不可拆分的即不大于一个hdfs文件快的大小或是gzip压缩的 适用场景: 1:需要执行的是内连接或者全外连接 2:所有的数据集都足够大 3:所有的数据集都可以用相同的外键当mapper的输入键读取 4:所有的数据集有相同的数据的分区 5:数据集不会经常改变 6:每一个分区都是按照外键排序的,并且所有的外键都出现在关联分区的每个数据集中 4:笛卡尔积: 是一种有效的将多个输入源的灭一个记录跟所有其他记录配对的方式 适用场景: 1:需要分析各个记录的所有配对之间的关系 2:没有其他方法可以解决这个问题 3:对执行时间没有限制 等效的SQL:SELECT * FROM t1,t2 等效的PIG:CROSS a,b; 五:元模式 关于模式的模式 1:作业链 针对MapReduce处理小的文件时,优化的办法是可以在作业中始终执行CombineFileInputFormat加载间歇性的输出,在进入mapper处理之前,CombineFileInputFormat会将小的块组合在一起形成较大的输入split当执行做个作业的作业链时,可以使用job.submit方法代替job.waitForCompletion()来并行的启动多个作业,调用submit方法后会立即返回至当前线程,而作业在后台运行,该方法允许一次执行多个任务,job.isComplete()是检查一个作业是否完成的非阻塞方法,该方法可以通过不断轮询的方式判断所有作业是否完成如果检测到一个依赖的作业失败了,此时你应该退出整个作业链,而不是试图让他继续示例:(1)基本作业(2)并行作业链(3)关于Shelll脚本(4)关于JobControl 2:链折叠 链折叠是应用于MapReduce作业链的一种优化方法,基本上他是一个经验法则,即每一条记录都可以提交至多个mapper或者一个reducer,然后再交给一个mapper这种合并处理能够减少很多读取文件和传输数据的时间,作业链的这种结构使得这种方法是可行的,因为map阶段是完全无法共享的,因此map并不关心数据的组织形式和或者数据有没有分组,在构建大的作业链时,通过将作业链折叠,使得map阶段合并起来带来很大的性能提升链折叠的主要优点是减少mapreduce管道中移动的数据量作业链中有许多模式,可以通过下面介绍的这些方法来查找和确认哪些可以折叠(1)看看作业链的map阶段,如果多个map阶段是相邻的,将他们合并到一个阶段(2)如果作业链是以map阶段结束,将这个阶段移动到前一个reducer里边,他除去了写临时数据的IO操作,然后在reduce中执行只有map的作业,这同一也能减少任务启动的开销(3)注意,作业链的第一个map阶段无法 从下一个优化中获益,尽可能的在减少数据量(如过滤)的操作和增加数据量(如丰富)的操作之间拆分每个map阶段(合并或者其他)注意:(1)合并阶段需要大量的内存,例如将5个复制连接合并在一起可能不是一个好的选择,因为他将可能超过任务可用的总内存,在这些情况下,最好将这些操作分开(2)不管一个作业是不是作业链,都要尽早尽可能的去过滤掉更多的数据,mr作业开销最大的部分通常都是管道推送数据:加载数据,混排/排序阶段,以及存储数据实现折叠链有两种主要方法:(1)手动裁剪然后将代码粘贴在一起(2)使用特殊类ChainMapper和ChainReducer(特殊类介绍参考:http://www.iteye.com/topic/1134144) 3:作业归并 和作业链折叠一样,作业归并是另一种减少MR管道IO管道的优化方法,通过作业归并可以使得加载同一份数据的两个不相关作业共享MR管道,作业归并最主要的优点是数据只需要加载和解析一次。先决条件是:两个作业必须有相同的中间键和输出格式,因为他们将共享管道,因而需要使用相同的数据类型,如果这的确是一个问题的话,可以使用序列化或者多态,但会增加复制度作业归并步骤如下:(1)将两个mapper代码放在一起(2)在mapper中生成键和值时,需要用标签加以标记,以区别map源(3)在reducer中,在解析出标签后使用if语句切换到相应的reducer代码中去执行(4)使用multipleOutputs将作业的输出分来 六:输入输出模式 自定义输入与输出 在hadoop自定义输入和输出 Hadoop允许用户修改从磁盘加载数据的方式,修改方式有两种: 1:配置如何根据HDFS的块生成连续的输入分块,配置记录在map阶段如何实现。 为此将要用到的两个类即,RecordReader和InputFormat 2:hadoop也允许用户通过类似的方式修改数据的存储形式 通过OutputFormat和RecordWriter实现 1:生成数据 这个模式下是只有Map的 (1)InputFormat凭空创建split (2)RecordReader读入虚的split并根据他生成随机记录 (3)某些情况下,能够在split中赋予一些信息,告诉recordreader生成什么 (4)通常情况下,IdentityMap仅仅将读入的数据原样输出 2:外部源输出 外部源输出详细描述:在作业提交之前,OutputFormat将验证作业配置中指定的输出规范。RecordReader负责将所有的键值对写入外部源 性能分析:必须小心数据接收者能否处理并行连接。有1000个任务将数据写入到单个SQL数据库中,者=这工作起来并不好,为避免这种情况你可能不得不让每个reducer多处理一些数据以减少写入到数据接收者的并行度,如果数据接收者支持并行写入,那么这未必是个问题。 3:外部源输入 在MapReduce中数据是以并行的方式加载而不是以串行的方式,为了能够大规模的读取数据,源需要有定义良好的边界 MR实现该模式的瓶颈将是数据源或网络,数据源对于多连接可能不具很好的扩展性,同时给定的数据源可能与MR集群的网络不在同一个网络环境下 4:分区裁剪 分区裁剪模式将通过配置决定框架如何选取输入split以及如何基于文件名过滤加载到MR作业的文件 描述:分区裁剪模式是在InputFormat中实现的,其中getsplit方法是我们需要特别注意的,因为他确定了要创建的输入split,进而确定map任务的个数, RecordReader的实现依赖于数据是如何存储的

优秀的个人博客,低调大师

【整理】Hadoop学习资料

Hadoop视频资料: http://pan.baidu.com/s/1o7YPdTC 注:有些视频是转载的,没有解压密码!so sorry!!! Hadoop运维全纪录: http://blog.51cto.com/zt/505 加入群【clouderahadoop】:258669058不定期发布资料的word文档、个人在工作中的遇见的bug和心得! Hadoop 的调优和运维对于 Hadoop 来说是很重要的一个环节。对于大规模数据集来说更是如此。 性能调优对于 Hadoop 来说无异于打通“任督二脉”,对于 Hadoop 的计算能力会有质的提升, 而运维之于 Hadoop 来说,就好像"金钟罩,铁布衫"一般,有了稳定的运维, Hadoop 才能在海量数据中大展拳脚,两者相辅相成,缺一不可。

优秀的个人博客,低调大师

storm系统架构学习

Storm架构如下图所示: 1、主控节点(Master Node) 运行Storm nimbus后台服务的节点(Nimbus),它是storm系统的中心,负责接收用户提交的作业(如同spark submit一样 即为jar包形式保存的topology代码),通过Zookeeper向每个工作节点分配处理任务(有进程级的也有线程级别的) 2、工作节点(Work Node) 运行Storm supervisor后台服务的节点。用来监听nimbus分配的任务并下载作业副本,启动、暂停或撤销任务的工作进程及其线程。其中工作进程执行指定topology的子集,而同一个topology可以由多个工作进程完成;一个工作进程由多个工作线程组成,工作线程是spout/bolt的运行时实例,数量是由spout/bolt的数目及其配置确定。 3、控制台节点(Web console Node) 运行storm UI后台服务的节点。实际上是一个Web服务器,在指定端口提供页面服务。用户可以通过使用浏览器访问控制台节点的Web页面,提交、暂停和撤销作业,也可以以只读的形式获取系统配置、作业及各个组件的运行时状态。(如果需要实现作业的管理,Storm UI须和Storm nimbus部署在同一台机器上,UI进程会检查本机是否存在nimbus的连接,若不存在可导致UI部分功能无法正常工作.) 4、协调节点(Coordinate Node) 运行Zookeeper进程的节点,numbus和supervisor之间所有的协调,包括分布式状态维护和分布式配置管理,都是通过该协调节点实现的。 作业提交: 1、首先,如同spark-submit执行一样,将作业达成jar包,通过Storm的客户端命令或者控制台节点的Web接口,提交至Storm系统的主控节点。 2、主控节点根据系统的全局配置和作业中的局部配置,将接受的代码分发至调度的工作节点。 3、工作节点下载来自主控节点的代码包,并根据主控节点的调度生成相关的工作进程和线程。

优秀的个人博客,低调大师

Hadoop MapReduce编程学习

一直在搞spark,也没时间弄hadoop,不过Hadoop基本的编程我觉得我还是要会吧,看到一篇不错的文章,不过应该应用于hadoop2.0以前,因为代码中有 conf.set("mapred.job.tracker","192.168.1.2:9001");新框架中已改为 Yarn-site.xml 中的 resouceManager 及 nodeManager 具体配置项,新框架中历史 job 的查询已从 Job tracker 剥离,归入单独的mapreduce.jobtracker.jobhistory 相关配置.mapred.job.tracker的主要用途在于合并map之后的中间文件,就如同spark的repatition函数吧,为了防止接下来shuffle所造成的RDD过多,合并下~ 转自:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html 1、数据去重 "数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选。统计大数据集上的数据种类个数、从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重。下面就进入这个实例的MapReduce程序设计。 1.1 实例描述 对数据文件中的数据进行去重。数据文件中的每行都是一个数据。 样例输入如下所示: 1)file1: 2012-3-1 a 2012-3-2 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-7 c 2012-3-3 c 2)file2: 2012-3-1 b 2012-3-2 a 2012-3-3 b 2012-3-4 d 2012-3-5 a 2012-3-6 c 2012-3-7 d 2012-3-3 c 样例输出如下所示: 2012-3-1 a 2012-3-1 b 2012-3-2 a 2012-3-2 b 2012-3-3 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-6 c 2012-3-7 c 2012-3-7 d 1.2 设计思路 数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。我们自然而然会想到将同一个数据的所有记录都交给一台reduce机器,无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是reduce的输入应该以数据作为key,而对value-list则没有要求。当reduce接收到一个<key,value-list>时就直接将key复制到输出的key中,并将value设置成空值。 在MapReduce流程中,map的输出<key,value>经过shuffle过程聚集成<key,value-list>后会交给reduce。所以从设计好的reduce输入可以反推出map的输出key应为数据,value任意。继续反推,map输出数据的key为数据,而在这个实例中每个数据代表输入文件中的一行内容,所以map阶段要完成的任务就是在采用Hadoop默认的作业输入方式之后,将value设置为key,并直接输出(输出中的value任意)。map中的结果经过shuffle过程之后交给reduce。reduce阶段不会管每个key有多少个value,它直接将输入的key复制为输出的key,并输出就可以了(输出中的value被设置成空了)。 1.3 程序代码 程序代码如下所示: packagecom.hebut.mr; importjava.io.IOException; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.IntWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Job; importorg.apache.hadoop.mapreduce.Mapper; importorg.apache.hadoop.mapreduce.Reducer; importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; importorg.apache.hadoop.util.GenericOptionsParser; publicclassDedup { //map将输入中的value复制到输出数据的key上,并直接输出 publicstaticclassMapextendsMapper<Object,Text,Text,Text>{ privatestaticTextline=newText();//每行数据 //实现map函数 publicvoidmap(Object key,Text value,Context context) throwsIOException,InterruptedException{ line=value; context.write(line,newText("")); } } //reduce将输入中的key复制到输出数据的key上,并直接输出 publicstaticclassReduceextendsReducer<Text,Text,Text,Text>{ //实现reduce函数 publicvoidreduce(Text key,Iterable<Text> values,Context context) throwsIOException,InterruptedException{ context.write(key,newText("")); } } publicstaticvoidmain(String[] args)throwsException{ Configuration conf =newConfiguration(); //这句话很关键 conf.set("mapred.job.tracker","192.168.1.2:9001"); String[] ioArgs=newString[]{"dedup_in","dedup_out"}; String[] otherArgs =newGenericOptionsParser(conf, ioArgs).getRemainingArgs(); if(otherArgs.length!= 2) { System.err.println("Usage: Data Deduplication <in> <out>"); System.exit(2); } Job job =newJob(conf,"Data Deduplication"); job.setJarByClass(Dedup.class); //设置Map、Combine和Reduce处理类 job.setMapperClass(Map.class); job.setCombinerClass(Reduce.class); job.setReducerClass(Reduce.class); //设置输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置输入和输出目录 FileInputFormat.addInputPath(job,newPath(otherArgs[0])); FileOutputFormat.setOutputPath(job,newPath(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 1.4 代码结果 1)准备测试数据 通过Eclipse下面的"DFS Locations"在"/user/hadoop"目录下创建输入文件"dedup_in"文件夹(备注:"dedup_out"不需要创建。)如图1.4-1所示,已经成功创建。 图1.4-1 创建"dedup_in" 图1.4.2 上传"file*.txt" 然后在本地建立两个txt文件,通过Eclipse上传到"/user/hadoop/dedup_in"文件夹中,两个txt文件的内容如"实例描述"那两个文件一样。如图1.4-2所示,成功上传之后。 从SecureCRT远处查看"Master.Hadoop"的也能证实我们上传的两个文件。 查看两个文件的内容如图1.4-3所示: 图1.4-3 文件"file*.txt"内容 2)查看运行结果 这时我们右击Eclipse的"DFS Locations"中"/user/hadoop"文件夹进行刷新,这时会发现多出一个"dedup_out"文件夹,且里面有3个文件,然后打开双其"part-r-00000"文件,会在Eclipse中间把内容显示出来。如图1.4-4所示。 图1.4-4 运行结果 此时,你可以对比一下和我们之前预期的结果是否一致。 2、数据排序 "数据排序"是许多实际任务执行时要完成的第一项工作,比如学生成绩评比、数据建立索引等。这个实例和数据去重类似,都是先对原始数据进行初步处理,为进一步的数据操作打好基础。下面进入这个示例。 2.1 实例描述 对输入文件中数据进行排序。输入文件中的每行内容均为一个数字,即一个数据。要求在输出中每行有两个间隔的数字,其中,第一个代表原始数据在原始数据集中的位次,第二个代表原始数据。 样例输入: 1)file1: 2 32 654 32 15 756 65223 2)file2: 5956 22 650 92 3)file3: 26 54 6 样例输出: 1 2 2 6 3 15 4 22 5 26 6 32 7 32 8 54 9 92 10 650 11 654 12 756 13 5956 14 65223 2.2 设计思路 这个实例仅仅要求对输入数据进行排序,熟悉MapReduce过程的读者会很快想到在MapReduce过程中就有排序,是否可以利用这个默认的排序,而不需要自己再实现具体的排序呢?答案是肯定的。 但是在使用之前首先需要了解它的默认排序规则。它是按照key值进行排序的,如果key为封装int的IntWritable类型,那么MapReduce按照数字大小对key排序,如果key为封装为String的Text类型,那么MapReduce按照字典顺序对字符串排序。 了解了这个细节,我们就知道应该使用封装int的IntWritable型数据结构了。也就是在map中将读入的数据转化成IntWritable型,然后作为key值输出(value任意)。reduce拿到<key,value-list>之后,将输入的key作为value输出,并根据value-list中元素的个数决定输出的次数。输出的key(即代码中的linenum)是一个全局变量,它统计当前key的位次。需要注意的是这个程序中没有配置Combiner,也就是在MapReduce过程中不使用Combiner。这主要是因为使用map和reduce就已经能够完成任务了。 2.3 程序代码 程序代码如下所示: packagecom.hebut.mr; importjava.io.IOException; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.IntWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Job; importorg.apache.hadoop.mapreduce.Mapper; importorg.apache.hadoop.mapreduce.Reducer; importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; importorg.apache.hadoop.util.GenericOptionsParser; publicclassSort { //map将输入中的value化成IntWritable类型,作为输出的key publicstaticclassMapextends Mapper<Object,Text,IntWritable,IntWritable>{ privatestaticIntWritabledata=newIntWritable(); //实现map函数 publicvoidmap(Object key,Text value,Context context) throwsIOException,InterruptedException{ String line=value.toString(); data.set(Integer.parseInt(line)); context.write(data,newIntWritable(1)); } } //reduce将输入中的key复制到输出数据的key上, //然后根据输入的value-list中元素的个数决定key的输出次数 //用全局linenum来代表key的位次 publicstaticclassReduceextends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{ privatestaticIntWritablelinenum=newIntWritable(1); //实现reduce函数 publicvoidreduce(IntWritable key,Iterable<IntWritable> values,Context context) throwsIOException,InterruptedException{ for(IntWritableval:values){ context.write(linenum, key); linenum=newIntWritable(linenum.get()+1); } } } publicstaticvoidmain(String[] args)throwsException{ Configuration conf =newConfiguration(); //这句话很关键 conf.set("mapred.job.tracker","192.168.1.2:9001"); String[] ioArgs=newString[]{"sort_in","sort_out"}; String[] otherArgs =newGenericOptionsParser(conf, ioArgs).getRemainingArgs(); if(otherArgs.length!= 2) { System.err.println("Usage: Data Sort <in> <out>"); System.exit(2); } Job job =newJob(conf,"Data Sort"); job.setJarByClass(Sort.class); //设置Map和Reduce处理类 job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); //设置输出类型 job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); //设置输入和输出目录 FileInputFormat.addInputPath(job,newPath(otherArgs[0])); FileOutputFormat.setOutputPath(job,newPath(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 2.4 代码结果 1)准备测试数据 通过Eclipse下面的"DFS Locations"在"/user/hadoop"目录下创建输入文件"sort_in"文件夹(备注:"sort_out"不需要创建。)如图2.4-1所示,已经成功创建。 图2.4-1 创建"sort_in" 图2.4.2 上传"file*.txt" 然后在本地建立三个txt文件,通过Eclipse上传到"/user/hadoop/sort_in"文件夹中,三个txt文件的内容如"实例描述"那三个文件一样。如图2.4-2所示,成功上传之后。 从SecureCRT远处查看"Master.Hadoop"的也能证实我们上传的三个文件。 查看两个文件的内容如图2.4-3所示: 图2.4-3 文件"file*.txt"内容 2)查看运行结果 这时我们右击Eclipse的"DFS Locations"中"/user/hadoop"文件夹进行刷新,这时会发现多出一个"sort_out"文件夹,且里面有3个文件,然后打开双其"part-r-00000"文件,会在Eclipse中间把内容显示出来。如图2.4-4所示。 图2.4-4 运行结果 3、平均成绩 "平均成绩"主要目的还是在重温经典"WordCount"例子,可以说是在基础上的微变化版,该实例主要就是实现一个计算学生平均成绩的例子。 3.1 实例描述 对输入文件中数据进行就算学生平均成绩。输入文件中的每行内容均为一个学生的姓名和他相应的成绩,如果有多门学科,则每门学科为一个文件。要求在输出中每行有两个间隔的数据,其中,第一个代表学生的姓名,第二个代表其平均成绩。 样本输入: 1)math: 张三 88 李四 99 王五 66 赵六 77 2)china: 张三 78 李四 89 王五 96 赵六 67 3)english: 张三 80 李四 82 王五 84 赵六 86 样本输出: 张三 82 李四 90 王五 82 赵六 76 3.2 设计思路 计算学生平均成绩是一个仿"WordCount"例子,用来重温一下开发MapReduce程序的流程。程序包括两部分的内容:Map部分和Reduce部分,分别实现了map和reduce的功能。 Map处理的是一个纯文本文件,文件中存放的数据时每一行表示一个学生的姓名和他相应一科成绩。Mapper处理的数据是由InputFormat分解过的数据集,其中InputFormat的作用是将数据集切割成小数据集InputSplit,每一个InputSlit将由一个Mapper负责处理。此外,InputFormat中还提供了一个RecordReader的实现,并将一个InputSplit解析成<key,value>对提供给了map函数。InputFormat的默认值是TextInputFormat,它针对文本文件,按行将文本切割成InputSlit,并用LineRecordReader将InputSplit解析成<key,value>对,key是行在文本中的位置,value是文件中的一行。 Map的结果会通过partion分发到Reducer,Reducer做完Reduce操作后,将通过以格式OutputFormat输出。 Mapper最终处理的结果对<key,value>,会送到Reducer中进行合并,合并的时候,有相同key的键/值对则送到同一个Reducer上。Reducer是所有用户定制Reducer类地基础,它的输入是key和这个key对应的所有value的一个迭代器,同时还有Reducer的上下文。Reduce的结果由Reducer.Context的write方法输出到文件中。 3.3 程序代码 程序代码如下所示: packagecom.hebut.mr; importjava.io.IOException; importjava.util.Iterator; importjava.util.StringTokenizer; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.IntWritable; importorg.apache.hadoop.io.LongWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Job; importorg.apache.hadoop.mapreduce.Mapper; importorg.apache.hadoop.mapreduce.Reducer; importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat; importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat; importorg.apache.hadoop.util.GenericOptionsParser; publicclassScore { publicstaticclassMapextends Mapper<LongWritable, Text, Text, IntWritable> { //实现map函数 publicvoidmap(LongWritablekey, Text value, Context context) throwsIOException, InterruptedException { //将输入的纯文本文件的数据转化成String String line = value.toString(); //将输入的数据首先按行进行分割 StringTokenizer tokenizerArticle =newStringTokenizer(line,"\n"); //分别对每一行进行处理 while(tokenizerArticle.hasMoreElements()) { //每行按空格划分 StringTokenizer tokenizerLine =newStringTokenizer(tokenizerArticle.nextToken()); String strName = tokenizerLine.nextToken();//学生姓名部分 String strScore = tokenizerLine.nextToken();//成绩部分 Text name =newText(strName); intscoreInt = Integer.parseInt(strScore); //输出姓名和成绩 context.write(name,newIntWritable(scoreInt)); } } } publicstaticclassReduceextends Reducer<Text, IntWritable, Text, IntWritable> { //实现reduce函数 publicvoidreduce(Text key, Iterable<IntWritable> values, Context context)throwsIOException, InterruptedException { intsum = 0; intcount = 0; Iterator<IntWritable> iterator = values.iterator(); while(iterator.hasNext()) { sum += iterator.next().get();//计算总分 count++;//统计总的科目数 } intaverage = (int) sum / count;//计算平均成绩 context.write(key,newIntWritable(average)); } } publicstaticvoidmain(String[] args)throwsException { Configuration conf =newConfiguration(); //这句话很关键 conf.set("mapred.job.tracker","192.168.1.2:9001"); String[] ioArgs =newString[] {"score_in","score_out"}; String[] otherArgs =newGenericOptionsParser(conf, ioArgs).getRemainingArgs(); if(otherArgs.length!= 2) { System.err.println("Usage: Score Average <in> <out>"); System.exit(2); } Job job =newJob(conf,"Score Average"); job.setJarByClass(Score.class); //设置Map、Combine和Reduce处理类 job.setMapperClass(Map.class); job.setCombinerClass(Reduce.class); job.setReducerClass(Reduce.class); //设置输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //将输入的数据集分割成小数据块splites,提供一个RecordReder的实现 job.setInputFormatClass(TextInputFormat.class); //提供一个RecordWriter的实现,负责数据输出 job.setOutputFormatClass(TextOutputFormat.class); //设置输入和输出目录 FileInputFormat.addInputPath(job,newPath(otherArgs[0])); FileOutputFormat.setOutputPath(job,newPath(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 3.4 代码结果 1)准备测试数据 通过Eclipse下面的"DFS Locations"在"/user/hadoop"目录下创建输入文件"score_in"文件夹(备注:"score_out"不需要创建。)如图3.4-1所示,已经成功创建。 图3.4-1 创建"score_in" 图3.4.2 上传三门分数 然后在本地建立三个txt文件,通过Eclipse上传到"/user/hadoop/score_in"文件夹中,三个txt文件的内容如"实例描述"那三个文件一样。如图3.4-2所示,成功上传之后。 备注:文本文件的编码为"UTF-8",默认为"ANSI",可以另存为时选择,不然中文会出现乱码。 从SecureCRT远处查看"Master.Hadoop"的也能证实我们上传的三个文件。 查看三个文件的内容如图3.4-3所示: 图3.4.3 三门成绩的内容 2)查看运行结果 这时我们右击Eclipse的"DFS Locations"中"/user/hadoop"文件夹进行刷新,这时会发现多出一个"score_out"文件夹,且里面有3个文件,然后打开双其"part-r-00000"文件,会在Eclipse中间把内容显示出来。如图3.4-4所示。 图3.4-4 运行结果 4、单表关联 前面的实例都是在数据上进行一些简单的处理,为进一步的操作打基础。"单表关联"这个实例要求从给出的数据中寻找所关心的数据,它是对原始数据所包含信息的挖掘。下面进入这个实例。 4.1 实例描述 实例中给出child-parent(孩子——父母)表,要求输出grandchild-grandparent(孙子——爷奶)表。 样例输入如下所示。 file: child parent Tom Lucy Tom Jack Jone Lucy Jone Jack Lucy Mary Lucy Ben Jack Alice Jack Jesse Terry Alice Terry Jesse Philip Terry Philip Alma Mark Terry Mark Alma 家族树状关系谱: 图4.2-1 家族谱 样例输出如下所示。 file: grandchild grandparent Tom Alice Tom Jesse Jone Alice Jone Jesse Tom Mary Tom Ben Jone Mary Jone Ben Philip Alice Philip Jesse Mark Alice Mark Jesse 4.2 设计思路 分析这个实例,显然需要进行单表连接,连接的是左表的parent列和右表的child列,且左表和右表是同一个表。 连接结果中除去连接的两列就是所需要的结果——"grandchild--grandparent"表。要用MapReduce解决这个实例,首先应该考虑如何实现表的自连接;其次就是连接列的设置;最后是结果的整理。 考虑到MapReduce的shuffle过程会将相同的key会连接在一起,所以可以将map结果的key设置成待连接的列,然后列中相同的值就自然会连接在一起了。再与最开始的分析联系起来: 要连接的是左表的parent列和右表的child列,且左表和右表是同一个表,所以在map阶段将读入数据分割成child和parent之后,会将parent设置成key,child设置成value进行输出,并作为左表;再将同一对child和parent中的child设置成key,parent设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。reduce接收到连接的结果,其中每个key的value-list就包含了"grandchild--grandparent"关系。取出每个key的value-list进行解析,将左表中的child放入一个数组,右表中的parent放入一个数组,然后对两个数组求笛卡尔积就是最后的结果了。 4.3 程序代码 程序代码如下所示。 packagecom.hebut.mr; importjava.io.IOException; importjava.util.*; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.IntWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Job; importorg.apache.hadoop.mapreduce.Mapper; importorg.apache.hadoop.mapreduce.Reducer; importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; importorg.apache.hadoop.util.GenericOptionsParser; publicclassSTjoin { publicstaticinttime= 0; /* * map将输出分割child和parent,然后正序输出一次作为右表, *反序输出一次作为左表,需要注意的是在输出的value中必须 *加上左右表的区别标识。 */ publicstaticclassMapextendsMapper<Object, Text, Text, Text> { //实现map函数 publicvoidmap(Object key, Text value, Context context) throwsIOException, InterruptedException { String childname =newString();//孩子名称 String parentname =newString();//父母名称 String relationtype =newString();//左右表标识 //输入的一行预处理文本 StringTokenizer itr=newStringTokenizer(value.toString()); String[] values=newString[2]; inti=0; while(itr.hasMoreTokens()){ values[i]=itr.nextToken(); i++; } if(values[0].compareTo("child") != 0) { childname = values[0]; parentname = values[1]; //输出左表 relationtype ="1"; context.write(newText(values[1]),newText(relationtype + "+"+ childname +"+"+ parentname)); //输出右表 relationtype ="2"; context.write(newText(values[0]),newText(relationtype + "+"+ childname +"+"+ parentname)); } } } publicstaticclassReduceextendsReducer<Text, Text, Text, Text> { //实现reduce函数 publicvoidreduce(Text key, Iterable<Text> values, Context context) throwsIOException, InterruptedException { //输出表头 if(0 ==time) { context.write(newText("grandchild"),newText("grandparent")); time++; } intgrandchildnum = 0; String[] grandchild =newString[10]; intgrandparentnum = 0; String[] grandparent =newString[10]; Iteratorite = values.iterator(); while(ite.hasNext()) { String record = ite.next().toString(); intlen = record.length(); inti = 2; if(0 == len) { continue; } //取得左右表标识 charrelationtype = record.charAt(0); //定义孩子和父母变量 String childname =newString(); String parentname =newString(); //获取value-list中value的child while(record.charAt(i) !='+') { childname += record.charAt(i); i++; } i = i + 1; //获取value-list中value的parent while(i < len) { parentname += record.charAt(i); i++; } //左表,取出child放入grandchildren if('1'== relationtype) { grandchild[grandchildnum] = childname; grandchildnum++; } //右表,取出parent放入grandparent if('2'== relationtype) { grandparent[grandparentnum] = parentname; grandparentnum++; } } //grandchild和grandparent数组求笛卡尔儿积 if(0 != grandchildnum && 0 != grandparentnum) { for(intm = 0; m < grandchildnum; m++) { for(intn = 0; n < grandparentnum; n++) { //输出结果 context.write(newText(grandchild[m]),newText(grandparent[n])); } } } } } publicstaticvoidmain(String[] args)throwsException { Configuration conf =newConfiguration(); //这句话很关键 conf.set("mapred.job.tracker","192.168.1.2:9001"); String[] ioArgs =newString[] {"STjoin_in","STjoin_out"}; String[] otherArgs =newGenericOptionsParser(conf, ioArgs).getRemainingArgs(); if(otherArgs.length!= 2) { System.err.println("Usage: Single Table Join <in> <out>"); System.exit(2); } Job job =newJob(conf,"Single Table Join"); job.setJarByClass(STjoin.class); //设置Map和Reduce处理类 job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); //设置输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置输入和输出目录 FileInputFormat.addInputPath(job,newPath(otherArgs[0])); FileOutputFormat.setOutputPath(job,newPath(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 4.4 代码结果 1)准备测试数据 通过Eclipse下面的"DFS Locations"在"/user/hadoop"目录下创建输入文件"STjoin_in"文件夹(备注:"STjoin_out"不需要创建。)如图4.4-1所示,已经成功创建。 图4.4-1 创建"STjoin_in" 图4.4.2 上传"child-parent"表 然后在本地建立一个txt文件,通过Eclipse上传到"/user/hadoop/STjoin_in"文件夹中,一个txt文件的内容如"实例描述"那个文件一样。如图4.4-2所示,成功上传之后。 从SecureCRT远处查看"Master.Hadoop"的也能证实我们上传的文件,显示其内容如图4.4-3所示: 图4.4-3 表"child-parent"内容 2)运行详解 (1)Map处理: map函数输出结果如下所示。 child parentàà 忽略此行 Tom Lucyàà <Lucy,1+Tom+Lucy> <Tom,2+Tom+Lucy > Tom Jackàà <Jack,1+Tom+Jack> <Tom,2+Tom+Jack> Jone Lucy àà <Lucy,1+Jone+Lucy> <Jone,2+Jone+Lucy> Jone Jackàà <Jack,1+Jone+Jack> <Jone,2+Jone+Jack> Lucy Maryàà <Mary,1+Lucy+Mary> <Lucy,2+Lucy+Mary> Lucy Benàà <Ben,1+Lucy+Ben> <Lucy,2+Lucy+Ben> Jack Aliceàà <Alice,1+Jack+Alice> <Jack,2+Jack+Alice> Jack Jesseàà <Jesse,1+Jack+Jesse> <Jack,2+Jack+Jesse> Terry Aliceàà <Alice,1+Terry+Alice> <Terry,2+Terry+Alice> Terry Jesseàà <Jesse,1+Terry+Jesse> <Terry,2+Terry+Jesse> Philip Terryàà <Terry,1+Philip+Terry> <Philip,2+Philip+Terry> Philip Almaàà <Alma,1+Philip+Alma> <Philip,2+Philip+Alma> Mark Terryàà <Terry,1+Mark+Terry> <Mark,2+Mark+Terry> Mark Alma àà <Alma,1+Mark+Alma> <Mark,2+Mark+Alma> (2)Shuffle处理 在shuffle过程中完成连接。 map函数输出 排序结果 shuffle连接 <Lucy,1+Tom+Lucy> <Tom,2+Tom+Lucy> <Jack,1+Tom+Jack> <Tom,2+Tom+Jack> <Lucy,1+Jone+Lucy> <Jone,2+Jone+Lucy> <Jack,1+Jone+Jack> <Jone,2+Jone+Jack> <Mary,1+Lucy+Mary> <Lucy,2+Lucy+Mary> <Ben,1+Lucy+Ben> <Lucy,2+Lucy+Ben> <Alice,1+Jack+Alice> <Jack,2+Jack+Alice> <Jesse,1+Jack+Jesse> <Jack,2+Jack+Jesse> <Alice,1+Terry+Alice> <Terry,2+Terry+Alice> <Jesse,1+Terry+Jesse> <Terry,2+Terry+Jesse> <Terry,1+Philip+Terry> <Philip,2+Philip+Terry> <Alma,1+Philip+Alma> <Philip,2+Philip+Alma> <Terry,1+Mark+Terry> <Mark,2+Mark+Terry> <Alma,1+Mark+Alma> <Mark,2+Mark+Alma> <Alice,1+Jack+Alice> <Alice,1+Terry+Alice> <Alma,1+Philip+Alma> <Alma,1+Mark+Alma> <Ben,1+Lucy+Ben> <Jack,1+Tom+Jack> <Jack,1+Jone+Jack> <Jack,2+Jack+Alice> <Jack,2+Jack+Jesse> <Jesse,1+Jack+Jesse> <Jesse,1+Terry+Jesse> <Jone,2+Jone+Lucy> <Jone,2+Jone+Jack> <Lucy,1+Tom+Lucy> <Lucy,1+Jone+Lucy> <Lucy,2+Lucy+Mary> <Lucy,2+Lucy+Ben> <Mary,1+Lucy+Mary> <Mark,2+Mark+Terry> <Mark,2+Mark+Alma> <Philip,2+Philip+Terry> <Philip,2+Philip+Alma> <Terry,2+Terry+Alice> <Terry,2+Terry+Jesse> <Terry,1+Philip+Terry> <Terry,1+Mark+Terry> <Tom,2+Tom+Lucy> <Tom,2+Tom+Jack> <Alice,1+Jack+Alice, 1+Terry+Alice, 1+Philip+Alma, 1+Mark+Alma > <Ben,1+Lucy+Ben> <Jack,1+Tom+Jack, 1+Jone+Jack, 2+Jack+Alice, 2+Jack+Jesse > <Jesse,1+Jack+Jesse, 1+Terry+Jesse > <Jone,2+Jone+Lucy, 2+Jone+Jack> <Lucy,1+Tom+Lucy, 1+Jone+Lucy, 2+Lucy+Mary, 2+Lucy+Ben> <Mary,1+Lucy+Mary, 2+Mark+Terry, 2+Mark+Alma> <Philip,2+Philip+Terry, 2+Philip+Alma> <Terry,2+Terry+Alice, 2+Terry+Jesse, 1+Philip+Terry, 1+Mark+Terry> <Tom,2+Tom+Lucy, 2+Tom+Jack> (3)Reduce处理 首先由语句"0 != grandchildnum && 0 != grandparentnum"得知,只要在"value-list"中没有左表或者右表,则不会做处理,可以根据这条规则去除无效的shuffle连接。 无效的shuffle连接 有效的shuffle连接 <Alice,1+Jack+Alice, 1+Terry+Alice, 1+Philip+Alma, 1+Mark+Alma > <Ben,1+Lucy+Ben> <Jesse,1+Jack+Jesse, 1+Terry+Jesse > <Jone,2+Jone+Lucy, 2+Jone+Jack> <Mary,1+Lucy+Mary, 2+Mark+Terry, 2+Mark+Alma> <Philip,2+Philip+Terry, 2+Philip+Alma> <Tom,2+Tom+Lucy, 2+Tom+Jack> <Jack,1+Tom+Jack, 1+Jone+Jack, 2+Jack+Alice, 2+Jack+Jesse > <Lucy,1+Tom+Lucy, 1+Jone+Lucy, 2+Lucy+Mary, 2+Lucy+Ben> <Terry,2+Terry+Alice, 2+Terry+Jesse, 1+Philip+Terry, 1+Mark+Terry> 然后根据下面语句进一步对有效的shuffle连接做处理。 // 左表,取出child放入grandchildren if ('1' == relationtype) { grandchild[grandchildnum] = childname; grandchildnum++; } // 右表,取出parent放入grandparent if ('2' == relationtype) { grandparent[grandparentnum] = parentname; grandparentnum++; } 针对一条数据进行分析: <Jack,1+Tom+Jack, 1+Jone+Jack, 2+Jack+Alice, 2+Jack+Jesse > 分析结果:左表用"字符1"表示,右表用"字符2"表示,上面的<key,value-list>中的"key"表示左表与右表的连接键。而"value-list"表示以"key"连接的左表与右表的相关数据。 根据上面针对左表与右表不同的处理规则,取得两个数组的数据如下所示: grandchild Tom、Jone(grandchild[grandchildnum] = childname;) grandparent Alice、Jesse(grandparent[grandparentnum] = parentname;) 然后根据下面语句进行处理。 for (int m = 0; m < grandchildnum; m++) { for (int n = 0; n < grandparentnum; n++) { context.write(new Text(grandchild[m]), new Text(grandparent[n])); } } 处理结果如下面所示: Tom Jesse Tom Alice Jone Jesse Jone Alice 其他的有效shuffle连接处理都是如此。 3)查看运行结果 这时我们右击Eclipse的"DFS Locations"中"/user/hadoop"文件夹进行刷新,这时会发现多出一个"STjoin_out"文件夹,且里面有3个文件,然后打开双其"part-r-00000"文件,会在Eclipse中间把内容显示出来。如图4.4-4所示。 图4.4-4 运行结果 5、多表关联 多表关联和单表关联类似,它也是通过对原始数据进行一定的处理,从其中挖掘出关心的信息。下面进入这个实例。 5.1 实例描述 输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出"工厂名——地址名"表。 样例输入如下所示。 1)factory: factoryname addressed Beijing Red Star 1 Shenzhen Thunder 3 Guangzhou Honda 2 Beijing Rising 1 Guangzhou Development Bank2 Tencent 3 Back of Beijing 1 2)address: addressID addressname 1 Beijing 2 Guangzhou 3 Shenzhen 4 Xian 样例输出如下所示。 factoryname addressname Back of Beijing Beijing Beijing Red Star Beijing Beijing Rising Beijing Guangzhou Development BankGuangzhou Guangzhou Honda Guangzhou Shenzhen Thunder Shenzhen Tencent Shenzhen 5.2 设计思路 多表关联和单表关联相似,都类似于数据库中的自然连接。相比单表关联,多表关联的左右表和连接列更加清楚。所以可以采用和单表关联的相同的处理方式,map识别出输入的行属于哪个表之后,对其进行分割,将连接的列值保存在key中,另一列和左右表标识保存在value中,然后输出。reduce拿到连接结果之后,解析value内容,根据标志将左右表内容分开存放,然后求笛卡尔积,最后直接输出。 这个实例的具体分析参考单表关联实例。下面给出代码。 5.3 程序代码 程序代码如下所示: packagecom.hebut.mr; importjava.io.IOException; importjava.util.*; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.IntWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Job; importorg.apache.hadoop.mapreduce.Mapper; importorg.apache.hadoop.mapreduce.Reducer; importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; importorg.apache.hadoop.util.GenericOptionsParser; publicclassMTjoin { publicstaticinttime= 0; /* *在map中先区分输入行属于左表还是右表,然后对两列值进行分割, *保存连接列在key值,剩余列和左右表标志在value中,最后输出 */ publicstaticclassMapextendsMapper<Object, Text, Text, Text> { //实现map函数 publicvoidmap(Object key, Text value, Context context) throwsIOException, InterruptedException { String line = value.toString();//每行文件 String relationtype =newString();//左右表标识 //输入文件首行,不处理 if(line.contains("factoryname") ==true || line.contains("addressed") ==true) { return; } //输入的一行预处理文本 StringTokenizer itr =newStringTokenizer(line); String mapkey =newString(); String mapvalue =newString(); inti = 0; while(itr.hasMoreTokens()) { //先读取一个单词 String token = itr.nextToken(); //判断该地址ID就把存到"values[0]" if(token.charAt(0) >='0'&& token.charAt(0) <='9') { mapkey = token; if(i > 0) { relationtype ="1"; }else{ relationtype ="2"; } continue; } //存工厂名 mapvalue += token +" "; i++; } //输出左右表 context.write(newText(mapkey),newText(relationtype +"+"+ mapvalue)); } } /* * reduce解析map输出,将value中数据按照左右表分别保存, *然后求出笛卡尔积,并输出。 */ publicstaticclassReduceextendsReducer<Text, Text, Text, Text> { //实现reduce函数 publicvoidreduce(Text key, Iterable<Text> values, Context context) throwsIOException, InterruptedException { //输出表头 if(0 ==time) { context.write(newText("factoryname"),newText("addressname")); time++; } intfactorynum = 0; String[] factory =newString[10]; intaddressnum = 0; String[]address=newString[10]; Iteratorite = values.iterator(); while(ite.hasNext()) { String record = ite.next().toString(); intlen = record.length(); inti = 2; if(0 == len) { continue; } //取得左右表标识 charrelationtype = record.charAt(0); //左表 if('1'== relationtype) { factory[factorynum] = record.substring(i); factorynum++; } //右表 if('2'== relationtype) { address[addressnum] = record.substring(i); addressnum++; } } //求笛卡尔积 if(0 != factorynum && 0 != addressnum) { for(intm = 0; m < factorynum; m++) { for(intn = 0; n < addressnum; n++) { //输出结果 context.write(newText(factory[m]), newText(address[n])); } } } } } publicstaticvoidmain(String[] args)throwsException { Configuration conf =newConfiguration(); //这句话很关键 conf.set("mapred.job.tracker","192.168.1.2:9001"); String[] ioArgs =newString[] {"MTjoin_in","MTjoin_out"}; String[] otherArgs =newGenericOptionsParser(conf, ioArgs).getRemainingArgs(); if(otherArgs.length!= 2) { System.err.println("Usage: Multiple Table Join <in> <out>"); System.exit(2); } Job job =newJob(conf,"Multiple Table Join"); job.setJarByClass(MTjoin.class); //设置Map和Reduce处理类 job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); //设置输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置输入和输出目录 FileInputFormat.addInputPath(job,newPath(otherArgs[0])); FileOutputFormat.setOutputPath(job,newPath(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 5.4 代码结果 1)准备测试数据 通过Eclipse下面的"DFS Locations"在"/user/hadoop"目录下创建输入文件"MTjoin_in"文件夹(备注:"MTjoin_out"不需要创建。)如图5.4-1所示,已经成功创建。 图5.4-1 创建"MTjoin_in" 图5.4.2 上传两个数据表 然后在本地建立两个txt文件,通过Eclipse上传到"/user/hadoop/MTjoin_in"文件夹中,两个txt文件的内容如"实例描述"那两个文件一样。如图5.4-2所示,成功上传之后。 从SecureCRT远处查看"Master.Hadoop"的也能证实我们上传的两个文件。 图5.4.3 两个数据表的内容 2)查看运行结果 这时我们右击Eclipse的"DFS Locations"中"/user/hadoop"文件夹进行刷新,这时会发现多出一个"MTjoin_out"文件夹,且里面有3个文件,然后打开双其"part-r-00000"文件,会在Eclipse中间把内容显示出来。如图5.4-4所示。 图5.4-4 运行结果 6、倒排索引 "倒排索引"是文档检索系统中最常用的数据结构,被广泛地应用于全文搜索引擎。它主要是用来存储某个单词(或词组)在一个文档或一组文档中的存储位置的映射,即提供了一种根据内容来查找文档的方式。由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引(Inverted Index)。 6.1 实例描述 通常情况下,倒排索引由一个单词(或词组)以及相关的文档列表组成,文档列表中的文档或者是标识文档的ID号,或者是指文档所在位置的URL,如图6.1-1所示。 图6.1-1 倒排索引结构 从图6.1-1可以看出,单词1出现在{文档1,文档4,文档13,……}中,单词2出现在{文档3,文档5,文档15,……}中,而单词3出现在{文档1,文档8,文档20,……}中。在实际应用中,还需要给每个文档添加一个权值,用来指出每个文档与搜索内容的相关度,如图6.1-2所示。 图6.1-2 添加权重的倒排索引 最常用的是使用词频作为权重,即记录单词在文档中出现的次数。以英文为例,如图6.1-3所示,索引文件中的"MapReduce"一行表示:"MapReduce"这个单词在文本T0中出现过1次,T1中出现过1次,T2中出现过2次。当搜索条件为"MapReduce"、"is"、"Simple"时,对应的集合为:{T0,T1,T2}∩{T0,T1}∩{T0,T1}={T0,T1},即文档T0和T1包含了所要索引的单词,而且只有T0是连续的。 图6.1-3 倒排索引示例 更复杂的权重还可能要记录单词在多少个文档中出现过,以实现TF-IDF(Term Frequency-Inverse Document Frequency)算法,或者考虑单词在文档中的位置信息(单词是否出现在标题中,反映了单词在文档中的重要性)等。 样例输入如下所示。 1)file1: MapReduce is simple 2)file2: MapReduce is powerful is simple 3)file3: Hello MapReduce bye MapReduce 样例输出如下所示。 MapReduce file1.txt:1;file2.txt:1;file3.txt:2; is file1.txt:1;file2.txt:2; simple file1.txt:1;file2.txt:1; powerful file2.txt:1; Hello file3.txt:1; bye file3.txt:1; 6.2 设计思路 实现"倒排索引"只要关注的信息为:单词、文档URL及词频,如图3-11所示。但是在实现过程中,索引文件的格式与图6.1-3会略有所不同,以避免重写OutPutFormat类。下面根据MapReduce的处理过程给出倒排索引的设计思路。 1)Map过程 首先使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容。显然,Map过程首先必须分析输入的<key,value>对,得到倒排索引中需要的三个信息:单词、文档URL和词频,如图6.2-1所示。 图6.2-1 Map过程输入/输出 这里存在两个问题:第一,<key,value>对只能有两个值,在不使用Hadoop自定义数据类型的情况下,需要根据情况将其中两个值合并成一个值,作为key或value值;第二,通过一个Reduce过程无法同时完成词频统计和生成文档列表,所以必须增加一个Combine过程完成词频统计。 这里讲单词和URL组成key值(如"MapReduce:file1.txt"),将词频作为value,这样做的好处是可以利用MapReduce框架自带的Map端排序,将同一文档的相同单词的词频组成列表,传递给Combine过程,实现类似于WordCount的功能。 2)Combine过程 经过map方法处理后,Combine过程将key值相同的value值累加,得到一个单词在文档在文档中的词频,如图6.2-2所示。如果直接将图6.2-2所示的输出作为Reduce过程的输入,在Shuffle过程时将面临一个问题:所有具有相同单词的记录(由单词、URL和词频组成)应该交由同一个Reducer处理,但当前的key值无法保证这一点,所以必须修改key值和value值。这次将单词作为key值,URL和词频组成value值(如"file1.txt:1")。这样做的好处是可以利用MapReduce框架默认的HashPartitioner类完成Shuffle过程,将相同单词的所有记录发送给同一个Reducer进行处理。 图6.2-2 Combine过程输入/输出 3)Reduce过程 经过上述两个过程后,Reduce过程只需将相同key值的value值组合成倒排索引文件所需的格式即可,剩下的事情就可以直接交给MapReduce框架进行处理了。如图6.2-3所示。索引文件的内容除分隔符外与图6.1-3解释相同。 4)需要解决的问题 本实例设计的倒排索引在文件数目上没有限制,但是单词文件不宜过大(具体值与默认HDFS块大小及相关配置有关),要保证每个文件对应一个split。否则,由于Reduce过程没有进一步统计词频,最终结果可能会出现词频未统计完全的单词。可以通过重写InputFormat类将每个文件为一个split,避免上述情况。或者执行两次MapReduce,第一次MapReduce用于统计词频,第二次MapReduce用于生成倒排索引。除此之外,还可以利用复合键值对等实现包含更多信息的倒排索引。 图6.2-3 Reduce过程输入/输出 6.3 程序代码 程序代码如下所示: packagecom.hebut.mr; importjava.io.IOException; importjava.util.StringTokenizer; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.IntWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Job; importorg.apache.hadoop.mapreduce.Mapper; importorg.apache.hadoop.mapreduce.Reducer; importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; importorg.apache.hadoop.mapreduce.lib.input.FileSplit; importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; importorg.apache.hadoop.util.GenericOptionsParser; publicclassInvertedIndex { publicstaticclassMapextendsMapper<Object, Text, Text, Text> { privateTextkeyInfo=newText();//存储单词和URL组合 privateTextvalueInfo=newText();//存储词频 privateFileSplitsplit;//存储Split对象 //实现map函数 publicvoidmap(Object key, Text value, Context context) throwsIOException, InterruptedException { //获得<key,value>对所属的FileSplit对象 split= (FileSplit) context.getInputSplit(); StringTokenizer itr =newStringTokenizer(value.toString()); while(itr.hasMoreTokens()) { // key值由单词和URL组成,如"MapReduce:file1.txt" //获取文件的完整路径 //keyInfo.set(itr.nextToken()+":"+split.getPath().toString()); //这里为了好看,只获取文件的名称。 intsplitIndex =split.getPath().toString().indexOf("file"); keyInfo.set(itr.nextToken() +":" +split.getPath().toString().substring(splitIndex)); //词频初始化为1 valueInfo.set("1"); context.write(keyInfo,valueInfo); } } } publicstaticclassCombineextendsReducer<Text, Text, Text, Text> { privateTextinfo=newText(); //实现reduce函数 publicvoidreduce(Text key, Iterable<Text> values, Context context) throwsIOException, InterruptedException { //统计词频 intsum = 0; for(Text value : values) { sum += Integer.parseInt(value.toString()); } intsplitIndex = key.toString().indexOf(":"); //重新设置value值由URL和词频组成 info.set(key.toString().substring(splitIndex + 1) +":"+ sum); //重新设置key值为单词 key.set(key.toString().substring(0, splitIndex)); context.write(key,info); } } publicstaticclassReduceextendsReducer<Text, Text, Text, Text> { privateTextresult=newText(); //实现reduce函数 publicvoidreduce(Text key, Iterable<Text> values, Context context) throwsIOException, InterruptedException { //生成文档列表 String fileList =newString(); for(Text value : values) { fileList += value.toString() +";"; } result.set(fileList); context.write(key,result); } } publicstaticvoidmain(String[] args)throwsException { Configuration conf =newConfiguration(); //这句话很关键 conf.set("mapred.job.tracker","192.168.1.2:9001"); String[] ioArgs =newString[] {"index_in","index_out"}; String[] otherArgs =newGenericOptionsParser(conf, ioArgs) .getRemainingArgs(); if(otherArgs.length!= 2) { System.err.println("Usage: Inverted Index <in> <out>"); System.exit(2); } Job job =newJob(conf,"Inverted Index"); job.setJarByClass(InvertedIndex.class); //设置Map、Combine和Reduce处理类 job.setMapperClass(Map.class); job.setCombinerClass(Combine.class); job.setReducerClass(Reduce.class); //设置Map输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置Reduce输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置输入和输出目录 FileInputFormat.addInputPath(job,newPath(otherArgs[0])); FileOutputFormat.setOutputPath(job,newPath(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 6.4 代码结果 1)准备测试数据 通过Eclipse下面的"DFS Locations"在"/user/hadoop"目录下创建输入文件"index_in"文件夹(备注:"index_out"不需要创建。)如图6.4-1所示,已经成功创建。 然后在本地建立三个txt文件,通过Eclipse上传到"/user/hadoop/index_in"文件夹中,三个txt文件的内容如"实例描述"那三个文件一样。如图6.4-2所示,成功上传之后。 从SecureCRT远处查看"Master.Hadoop"的也能证实我们上传的三个文件。 图6.4.3 三个"file*.txt"的内容 2)查看运行结果 这时我们右击Eclipse的"DFS Locations"中"/user/hadoop"文件夹进行刷新,这时会发现多出一个"index_out"文件夹,且里面有3个文件,然后打开双其"part-r-00000"文件,会在Eclipse中间把内容显示出来。如图6.4-4所示。

优秀的个人博客,低调大师

2013年学习方向

1. CMMI项目管理实战 http://product.china-pub.com/61307 2. UML团队开发流程与管理(第2版) http://product.china-pub.com/60715 3.敏捷回顾:团队从优秀到卓越之道http://product.china-pub.com/60869 4. 需求工程:基础、原理和技术 http://product.china-pub.com/3683783 5. IT服务管理标准理解与实施GB/T 24405.1(IDT ISO/IEC 20000-1)实用指南 http://product.china-pub.com/58196 6.写给大家看的项目管理书(第2版)http://product.china-pub.com/197903 7 面向服务的行业解决方案:原理、方法与实践 http://product.china-pub.com/55814 8. ITSS系列培训IT服务项目经理(预订中,估价) http://product.china-pub.com/3660729 9.云计算与分布式系统:从并行处理到物联网 http://product.china-pub.com/3022004 10.精益创业实战:第2版http://product.china-pub.com/3021882 11.移动云计算应用开发入门经典http://product.china-pub.com/3021774 12., 产品经理的五项修炼 http://product.china-pub.com/3021496 13. 移动金融:创建移动互联网时代新金融模式(国内移动金融领域第一本专著) http://product.china-pub.com/3021133 14.敏捷职业发展IBM的经验与方法 http://product.china-pub.com/61032 15; 拨得云开见日出——解构一个典型的云计算系统 http://product.china-pub.com/61025 16.SharePoint 2010云计算解决方案 http://product.china-pub.com/3684029 17. IT产品销售与服务 http://product.china-pub.com/3683336 18. 下一站-用户体验-修订版 http://product.china-pub.com/3725892 19.云计算架构:解决方案设计手册 http://product.china-pub.com/3663270 20. AWS云端企业实战圣经:亚马逊如何构造云端计算 http://product.china-pub.com/3682599 21.云计算与数据中心自动化(思科一线技术专家组精心编写 首次提出了构建和管理云的完整解决方案)http://product.china-pub.com/60421 22. 产品经理修炼之道(Pmcaff创始人费杰力作,国内外20余家大型互联网资深产品经理与技术专家联袂推荐) http://product.china-pub.com/3682533 23/ 云计算实战 http://product.china-pub.com/3682477 24. 大数据-正在到来的数据革命.以及它如何改变政府.商业与我们的生活 http://product.china-pub.com/3712338 25. 情报战争-移动互联时代企业成功密码 http://product.china-pub.com/3709701 26.云计算揭秘-企业实施云计算的核心问题http://product.china-pub.com/3700553 27. 物联网安全技术 http://product.china-pub.com/3661737 28. 移动支付改变生活——电信运营商的移动支付创新与实践 http://product.china-pub.com/3661539 29. 人人都是产品经理Version 1.1(china-pub首发) http://product.china-pub.com/972643 30/.创新的神话(第二版)(中文版)http://product.china-pub.com/199564 31.他山之石:TechStars孵化器中的创业真经(天使投资人谈创业真经)(china-pub首发)http://product.china-pub.com/199568 32. 电子商务安全管理 http://product.china-pub.com/60089 33.苹果帝国风云录http://product.china-pub.com/60001 34. Web 3.0营销实战:巧夺先机(游击营销之父Jay Conrad Levinson倾情作序) http://product.china-pub.com/199269 35/ 修炼之道:互联网产品从设计到运营 http://product.china-pub.com/59998 36/网站转换率优化之道(数字化市场营销的启蒙书)http://product.china-pub.com/199255 37. 中国SOA最佳应用及云计算融合实践 http://product.china-pub.com/199211 38. 硅谷中关村人脉网络(柳传志、胡昭广、张景安、许成钢 倾力推荐!) 39. 上市公司激励约束机制——设计、比较与选择 http://product.china-pub.com/972133 40.转型中的企业云服务 http://product.china-pub.com/59721 41. 电子商务和电子政务安全 http://product.china-pub.com/1192254 42.虚拟化技术实战http://product.china-pub.com/199056 43.构建云应用:概念、模式和实践http://product.china-pub.com/199058 44. 电子商务模拟运作-(第二版)-(含光盘) http://product.china-pub.com/1164555 45. 云计算:应用开发实践(附CD光盘1张) http://product.china-pub.com/199009 46. 基于服务的动态电子商务交互与应用 http://product.china-pub.com/1151893 47.Gamestorming:创新、变革&非凡思维训练http://product.china-pub.com/194691 48. 云那些事儿 http://product.china-pub.com/194669 49/. 500万人的大迁徙—IT业转型路口的思考 http://product.china-pub.com/58708 50. 大话云计算(平生不知云计算,便称精英也扯淡) http://product.china-pub.com/58656 51. TABLES:改变中国互联网未来的六大力量 http://product.china-pub.com/1131546 52. 演讲之禅:一位技术演讲家的自白(原书第2版)(献给需要参加公共演讲的程序员精英们) http://product.china-pub.com/198779 53/互联网创业启示录(StackOverflow.com创办人、《软件随想录》作者Joel Spolsky作序)(china-pub首发) 54.信息安全工程 55. O2O:移动互联网时代的商业革命 http://product.china-pub.com/3766497#ml

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

用户登录
用户注册