自定义输入格式,将明星微博数据排序后按粉丝数 关注数 微博数 分别输出到不同文件中。
![]()
![]()
![]()
![]()
![]()
![]()
![]()
![]()
![]()
代码
1 package zhouls.bigdata.myMapReduce.ScoreCount;
2
3 import java.io.DataInput;
4 import java.io.DataOutput;
5 import java.io.IOException;
6 import org.apache.hadoop.io.WritableComparable;
7 /**
8 * 学习成绩读写类
9 * 数据格式参考:19020090017 小讲 90 99 100 89 95
10 * @author Bertron
11 * 需要自定义一个 ScoreWritable 类实现 WritableComparable 接口,将学生各门成绩封装起来。
12 */
13 public class ScoreWritable implements WritableComparable< Object > {//其实这里,跟TVPlayData一样的
14 // 注意: Hadoop通过Writable接口实现的序列化机制,不过没有提供比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable。(自定义比较)
15 // Writable接口提供两个方法(write和readFields)。
16
17
18 private float Chinese;
19 private float Math;
20 private float English;
21 private float Physics;
22 private float Chemistry;
23
24
25 // 问:这里我们自己编程时,是一定要创建一个带有参的构造方法,为什么还要显式的写出来一个带无参的构造方法呢?
26 // 答:构造器其实就是构造对象实例的方法,无参数的构造方法是默认的,但是如果你创造了一个带有参数的构造方法,那么无参的构造方法必须显式的写出来,否则会编译失败。
27
28 public ScoreWritable(){}//java里的无参构造函数,是用来在创建对象时初始化对象
29 //在hadoop的每个自定义类型代码里,好比,现在的ScoreWritable,都必须要写无参构造函数。
30
31
32 //问:为什么我们在编程的时候,需要创建一个带有参的构造方法?
33 //答:就是能让赋值更灵活。构造一般就是初始化数值,你不想别人用你这个类的时候每次实例化都能用另一个构造动态初始化一些信息么(当然没有需要额外赋值就用默认的)。
34
35 public ScoreWritable(float Chinese,float Math,float English,float Physics,float Chemistry){//java里的有参构造函数,是用来在创建对象时初始化对象
36 this.Chinese = Chinese;
37 this.Math = Math;
38 this.English = English;
39 this.Physics = Physics;
40 this.Chemistry = Chemistry;
41 }
42
43 //问:其实set和get方法,这两个方法只是类中的setxxx和getxxx方法的总称,
44 // 那么,为什么在编程时,有set和set***两个,只有get***一个呢?
45
46 public void set(float Chinese,float Math,float English,float Physics,float Chemistry){
47 this.Chinese = Chinese;//即float Chinese赋值给private float Chinese;
48 this.Math = Math;
49 this.English = English;
50 this.Physics = Physics;
51 this.Chemistry = Chemistry;
52 }
53 // public float get(float Chinese,float Math,float English,float Physics,float Chemistry){因为这是错误的,所以对于set可以分开,get只能是get***
54 // return Chinese;
55 // return Math;
56 // return English;
57 // return Physics;
58 // return Chemistry;
59 // }
60
61
62 public float getChinese() {//拿值,得返回,所以需有返回类型float
63 return Chinese;
64 }
65 public void setChinese(float Chinese){//设值,不需,所以空返回类型
66 this.Chinese = Chinese;
67 }
68 public float getMath() {//拿值
69 return Math;
70 }
71 public void setMath(float Math){//设值
72 this.Math = Math;
73 }
74 public float getEnglish() {//拿值
75 return English;
76 }
77 public void setEnglish(float English){//设值
78 this.English = English;
79 }
80 public float getPhysics() {//拿值
81 return Physics;
82 }
83 public void setPhysics(float Physics){//设值
84 this.Physics = Physics;
85 }
86 public float getChemistry() {//拿值
87 return Chemistry;
88 }
89 public void setChemistry(float Chemistry) {//拿值
90 this.Chemistry = Chemistry;
91 }
92
93 // 实现WritableComparable的readFields()方法
94 // 对象不能传输的,需要转化成字节流!
95 // 将对象转换为字节流并写入到输出流out中是序列化,write 的过程(最好记!!!)
96 // 从输入流in中读取字节流反序列化为对象 是反序列化,readFields的过程(最好记!!!)
97 public void readFields(DataInput in) throws IOException {//拿代码来说的话,对象就是比如Chinese、Math。。。。
98 Chinese = in.readFloat();//因为,我们这里的对象是float类型,所以是readFloat()
99 Math = in.readFloat();
100 English = in.readFloat();//注意:反序列化里,需要生成对象对吧,所以,是用到的是get那边对象
101 Physics = in.readFloat();
102 Chemistry = in.readFloat();
103 // in.readByte()
104 // in.readChar()
105 // in.readDouble()
106 // in.readLine()
107 // in.readFloat()
108 // in.readLong()
109 // in.readShort()
110 }
111
112 // 实现WritableComparable的write()方法,以便该数据能被序列化后完成网络传输或文件输出
113 // 将对象转换为字节流并写入到输出流out中是序列化,write 的过程(最好记!!!)
114 // 从输入流in中读取字节流反序列化为对象 是反序列化,readFields的过程(最好记!!!)
115 public void write(DataOutput out) throws IOException {//拿代码来说的话,对象就是比如Chinese、Math。。。。
116 out.writeFloat(Chinese);//因为,我们这里的对象是float类型,所以是writeFloat()
117 out.writeFloat(Math);
118 out.writeFloat(English);//注意:序列化里,需要对象对吧,所以,用到的是set那边的对象
119 out.writeFloat(Physics);
120 out.writeFloat(Chemistry);
121 // out.writeByte()
122 // out.writeChar()
123 // out.writeDouble()
124 // out.writeFloat()
125 // out.writeLong()
126 // out.writeShort()
127 // out.writeUTF()
128 }
129
130 public int compareTo(Object o) {//java里的比较,Java String.compareTo()
131 return 0;
132 }
133
134
135 // Hadoop中定义了两个序列化相关的接口:Writable 接口和 Comparable 接口,这两个接口可以合并成一个接口 WritableComparable。
136 // Writable 接口中定义了两个方法,分别为write(DataOutput out)和readFields(DataInput in)
137 // 所有实现了Comparable接口的对象都可以和自身相同类型的对象比较大小
138
139
140 // Hadoop中定义了两个序列化相关的接口:Writable 接口和 Comparable 接口,这两个接口可以合并成一个接口 WritableComparable。
141 // Writable 接口中定义了两个方法,分别为write(DataOutput out)和readFields(DataInput in)
142 // 所有实现了Comparable接口的对象都可以和自身相同类型的对象比较大小
143
144
145 // 源码是
146 // package java.lang;
147 // import java.util.*;
148 // public interface Comparable {
149 // /**
150 // * 将this对象和对象o进行比较,约定:返回负数为小于,零为大于,整数为大于
151 // */
152 // public int compareTo(T o);
153 // }
154
155 }
1 package zhouls.bigdata.myMapReduce.WeiboCount;
2
3 import java.io.IOException;
4
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.fs.FSDataInputStream;
7 import org.apache.hadoop.fs.FileSystem;
8 import org.apache.hadoop.fs.Path;
9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.InputSplit;
11 import org.apache.hadoop.mapreduce.JobContext;
12 import org.apache.hadoop.mapreduce.RecordReader;
13 import org.apache.hadoop.mapreduce.TaskAttemptContext;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
16 import org.apache.hadoop.util.LineReader;
17 19
20
21 //其实这个程序,就是在实现InputFormat接口,TVPlayInputFormat是InputFormat接口的实现类
22 //比如 WeiboInputFormat extends FileInputFormat implements InputFormat。
23
24 //问:自定义输入格式 WeiboInputFormat 类,首先继承 FileInputFormat,然后分别重写 isSplitable() 方法和 createRecordReader() 方法。
25
26
27 public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{
28
29 // 线路是: boolean isSplitable() -> RecordReader<Text,WeiBo> createRecordReader() -> WeiboRecordReader extends RecordReader<Text, WeiBo >
30
31
32 @Override
33 protected boolean isSplitable(JobContext context, Path filename) {//这是InputFormat的isSplitable方法
34 //isSplitable方法就是是否要切分文件,这个方法显示如果是压缩文件就不切分,非压缩文件就切分。
35 // 如果不允许分割,则isSplitable==false,则将第一个block、文件目录、开始位置为0,长度为整个文件的长度封装到一个InputSplit,加入splits中
36 // 如果文件长度不为0且支持分割,则isSplitable==true,获取block大小,默认是64MB
37 return false; //整个文件封装到一个InputSplit
38 //要么就是return true; //切分64MB大小的一块一块,再封装到InputSplit
39 }
40
41
42
43
44 @Override
45 public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0,TaskAttemptContext arg1) throws IOException, InterruptedException{
46 // RecordReader<k1, v1>是返回类型,返回的RecordReader对象的封装
47 // createRecordReader是方法,在这里是,WeiboInputFormat.createRecordReader。WeiboInputFormat是InputFormat类的实例
48 // InputSplit input和TaskAttemptContext context是传入参数
49
50 // isSplitable(),如果是压缩文件就不切分,整个文件封装到一个InputSplit
51 // isSplitable(),如果是非压缩文件就切,切分64MB大小的一块一块,再封装到InputSplit
52
53 //这里默认是系统实现的的RecordReader,按行读取,下面我们自定义这个类WeiboRecordReader。
54 //类似与Excel、WeiBo、TVPlayData代码写法
55 return new WeiboRecordReader();//新建一个ScoreRecordReader实例,所有才有了上面RecordReader<Text,ScoreWritable>,所以才如下ScoreRecordReader,写我们自己的
56 }
57
58
59
60 public class WeiboRecordReader extends RecordReader<Text, WeiBo>{
61 //LineReader in是1,行号。
62 //Text line; 俞灏明 俞灏明 10591367 206 558,每行的相关记录
63 public LineReader in;//行读取器
64 public Text line;//每行数据类型
65 public Text lineKey;//自定义key类型,即k1
66 public WeiBo lineValue;//自定义value类型,即v1
67
68
69 @Override
70 public void close() throws IOException {//关闭输入流
71 if(in !=null){
72 in.close();
73 }
74 }
75 @Override
76 public Text getCurrentKey() throws IOException, InterruptedException {//获取当前的key,即CurrentKey
77 return lineKey;//返回类型是Text,即Text lineKey
78 }
79 @Override
80 public WeiBo getCurrentValue() throws IOException,InterruptedException {//获取当前的Value,即CurrentValue
81 return lineValue;//返回类型是WeiBo,即WeiBo lineValue
82 }
83 @Override
84 public float getProgress() throws IOException, InterruptedException {//获取进程,即Progress
85 return 0;//返回类型是float,即float 0
86 }
87
88
89
90 @Override
91 public void initialize(InputSplit input, TaskAttemptContext context)throws IOException, InterruptedException{//初始化,都是模板
92 FileSplit split=(FileSplit)input;//获取split
93 Configuration job=context.getConfiguration();
94 Path file=split.getPath();//得到文件路径
95 FileSystem fs=file.getFileSystem(job);
96
97 FSDataInputStream filein=fs.open(file);//打开文件
98 in=new LineReader(filein,job); //输入流in
99 line=new Text();//每行数据类型
100 lineKey=new Text();//自定义key类型,即k1。//新建一个Text实例作为自定义格式输入的key
101 lineValue = new WeiBo();//自定义value类型,即v1。//新建一个TVPlayData实例作为自定义格式输入的value
102 }
103
104
105 //此方法读取每行数据,完成自定义的key和value
106 @Override
107 public boolean nextKeyValue() throws IOException, InterruptedException{//这里面,才是篡改的重点
108 int linesize=in.readLine(line); //line是每行数据,我们这里用到的是in.readLine(str)这个构造函数,默认读完读到文件末尾。其实这里有三种。
109
110 // 是SplitLineReader.readLine -> SplitLineReader extends LineReader -> org.apache.hadoop.util.LineReader
111
112 // in.readLine(str)//这个构造方法执行时,会首先将value原来的值清空。默认读完读到文件末尾
113 // in.readLine(str, maxLineLength)//只读到maxLineLength行
114 // in.readLine(str, maxLineLength, maxBytesToConsume)//这个构造方法来实现不清空,前面读取的行的值
115
116
117 if(linesize==0) return false;
118
119 //通过分隔符'\t',将每行的数据解析成数组 pieces
120 String[] pieces = line.toString().split("\t");
121 //因为,我们这里是。默认读完读到文件末尾。line是Text类型。pieces是String[],即String数组。
122
123 if(pieces.length != 5){
124 throw new IOException("Invalid record received");
125 }
126
127 int a,b,c;
128
129 try{
130 a = Integer.parseInt(pieces[2].trim());//粉丝,//将String类型,如pieces[2]转换成,float类型,给a
131 b = Integer.parseInt(pieces[3].trim());//关注
132 c = Integer.parseInt(pieces[4].trim());//微博数
133 }catch(NumberFormatException nfe)
134 {
135 throw new IOException("Error parsing floating poing value in record");
136 }
137
138
139 //自定义key和value值
140 lineKey.set(pieces[0]); //完成自定义key数据
141 lineValue.set(b, a, c);//完成自定义value数据
142 // 或者写
143 // lineValue.set(Integer.parseInt(pieces[2].trim()),Integer.parseInt(pieces[3].trim()),Integer.parseInt(pieces[4].trim()));
144
145
146 // pieces[0] pieces[1] pieces[2] ... pieces[4]
147 // 俞灏明 俞灏明 10591367 206 558
148 // 李敏镐 李敏镐 22898071 11 268
149 // 大自然保护协会-马云 大自然保护协会-马云 15616866 0 39
150 // 林心如 林心如 57488649 214 5940
151 // 时尚小编Anne 时尚小编Anne 10064227 136 2103
152 // 黄晓明 黄晓明 22616497 506 2011
153 // 张靓颖 张靓颖 27878708 238 3846
154 // 张成龙2012 张成龙2012 9813621 199 744
155 // 吳君如大美女 吳君如大美女 18490338 190 412
156 // 李娜 李娜 23309493 81 631
157 // 徐小平 徐小平 11659926 1929 13795
158 // 唐嫣 唐嫣 24301532 200 2391
159 // 有斐君 有斐君 8779383 577 4251
160
161
162 return true;
163 }
164
165
166
167 }
168 }
1 package zhouls.bigdata.myMapReduce.WeiboCount;
2
3 import java.io.IOException;
4 import java.util.Arrays;
5 import java.util.Comparator;
6 import java.util.HashMap;
7 import java.util.Set;
8 import java.util.Map;
9
10 import org.apache.hadoop.conf.Configuration;
11 import org.apache.hadoop.conf.Configured;
12 import org.apache.hadoop.fs.FileSystem;
13 import org.apache.hadoop.fs.Path;
14
15 import org.apache.hadoop.io.IntWritable;
16 import org.apache.hadoop.io.Text;
17 import org.apache.hadoop.mapreduce.Job;
18 import org.apache.hadoop.mapreduce.Mapper;
19 import org.apache.hadoop.mapreduce.Reducer;
20 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
22 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
23 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
24 import org.apache.hadoop.util.Tool;
25 import org.apache.hadoop.util.ToolRunner;
26
27 public class WeiboCount extends Configured implements Tool{
28 public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text>{
29 @Override
30 protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException{
31 context.write(new Text("follower"),new Text(key.toString() + "\t" + value.getFollowers()));
32 context.write(new Text("friend"),new Text(key.toString() + "\t" + value.getFriends()));
33 context.write(new Text("statuses"),new Text(key.toString() + "\t" + value.getStatuses()));
34 }
35 }
36
37 public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> {
38 private MultipleOutputs<Text, IntWritable> mos;
39
40 protected void setup(Context context) throws IOException,InterruptedException{
41 mos = new MultipleOutputs<Text, IntWritable>(context);
42 }
43
44 private Text text = new Text();
45
46 protected void reduce(Text Key, Iterable<Text> Values,Context context) throws IOException, InterruptedException{
47 int N = context.getConfiguration().getInt("reduceHasMaxLength", Integer.MAX_VALUE);
48 Map<String,Integer> m = new HashMap<String,Integer>();
49 for(Text value:Values){//星型for循环,意思是把Values的值传给Text value
50 //value=名称+(粉丝数 或 关注数 或 微博数)
51 String[] records = value.toString().split("\t");
52 m.put(records[0],Integer.parseInt(records[1].toString()));
53 }
54
55 //对Map内的数据进行排序
56 Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(m);
57 for(int i = 0; i< N && i< entries.length;i++){
58 if(Key.toString().equals("follower")){
59 mos.write("follower",entries[i].getKey(), entries[i].getValue());
60 }else if(Key.toString().equals("friend")){
61 mos.write("friend", entries[i].getKey(), entries[i].getValue());
62 }else if(Key.toString().equals("status")){
63 mos.write("statuses", entries[i].getKey(), entries[i].getValue());
64 }
65 }
66 }
67
68 protected void cleanup(Context context) throws IOException,InterruptedException {
69 mos.close();
70 }
71 }
72
73
74 public int run(String[] args) throws Exception{
75 Configuration conf = new Configuration();// 配置文件对象
76 Path mypath = new Path(args[1]);
77 FileSystem hdfs = mypath.getFileSystem(conf);// 创建输出路径
78 if (hdfs.isDirectory(mypath)){
79 hdfs.delete(mypath, true);
80 }
81
82 Job job = new Job(conf, "weibo");// 构造任务
83 job.setJarByClass(WeiboCount.class);// 主类
84
85 job.setMapperClass(WeiBoMapper.class);// Mapper
86 job.setMapOutputKeyClass(Text.class);// Mapper key输出类型
87 job.setMapOutputValueClass(Text.class);// Mapper value输出类型
88
89 job.setReducerClass(WeiBoReducer.class);// Reducer
90 job.setOutputKeyClass(Text.class);
91 job.setOutputValueClass(IntWritable.class);
92 FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
93 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
94 job.setInputFormatClass(WeiboInputFormat.class);// 自定义输入格式
95 //自定义文件输出类别
96 MultipleOutputs.addNamedOutput(job, "follower", TextOutputFormat.class,Text.class, IntWritable.class);
97 MultipleOutputs.addNamedOutput(job, "friend", TextOutputFormat.class,Text.class, IntWritable.class);
98 MultipleOutputs.addNamedOutput(job, "status", TextOutputFormat.class,Text.class, IntWritable.class);
99 job.waitForCompletion(true);
100 return 0;
101 }
102
103
104 //对Map内的数据进行排序(只适合小数据量)
105 public static Map.Entry[] getSortedHashtableByValue(Map h){
106 Set set = h.entrySet();
107 Map.Entry[] entries = (Map.Entry[]) set.toArray(new Map.Entry[set.size()]);
108 Arrays.sort(entries, new Comparator(){
109 public int compare(Object arg0, Object arg1){
110 Long key1 = Long.valueOf(((Map.Entry) arg0).getValue().toString());
111 Long key2 = Long.valueOf(((Map.Entry) arg1).getValue().toString());
112 return key2.compareTo(key1);
113 } });
114 return entries;
115 }
116
117 public static void main(String[] args) throws Exception{
118 // String[] args0 = { "hdfs://HadoopMaster:9000/weibo/weibo.txt",
119 // "hdfs://HadoopMaster:9000/out/weibo/" };
120
121 String[] args0 = { "./data/weibo/weibo.txt",
122 "./out/weibo/" };
123
124 int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0);
125 System.exit(ec);
126 }
127 }
128
本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6164435.html,如需转载请自行联系原作者