1.程序需要的材料
文件中各个字段的含义,其中第6,7,8,9是要统计的流量相关的字段.
![]()
文件内容:
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200
13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200
二.程序:
1 package mapreducejob;
2
3 /**
4 * 老师给的元数据信息如下:
5 * 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
6 * 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
7 * 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
8 * 第六个字段是上行数据包数.
9 * 第七个字段是下行数据包数
10 * 第八个是上行总流量
11 * 第九个是下行总流量
12 */
13
14 import java.io.DataInput;
15 import java.io.DataOutput;
16 import java.io.IOException;
17
18 import org.apache.hadoop.conf.Configuration;
19 import org.apache.hadoop.fs.Path;
20 import org.apache.hadoop.io.LongWritable;
21 import org.apache.hadoop.io.Text;
22 import org.apache.hadoop.io.Writable;
23 import org.apache.hadoop.mapreduce.Job;
24 import org.apache.hadoop.mapreduce.Mapper;
25 import org.apache.hadoop.mapreduce.Reducer;
26 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
27 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
28
29 public class TrafficApp {
30
31 public static void main(String[] args) throws Exception {
32 Job job = Job.getInstance(new Configuration(),
33 TrafficApp.class.getSimpleName());
34 job.setJarByClass(TrafficApp.class);// 通过jar包运行.
35
36 FileInputFormat.setInputPaths(job, args[0]);// 数据输入,指定数据源
37
38 job.setMapperClass(MyMapper.class);// 给job设置map
39 job.setMapOutputKeyClass(Text.class);
40 job.setMapOutputValueClass(TrafficWritable.class);
41
42 job.setReducerClass(MyReducer.class);
43 job.setOutputKeyClass(Text.class);
44 job.setOutputValueClass(TrafficWritable.class);
45
46 FileOutputFormat.setOutputPath(job, new Path(args[1]));
47
48 job.waitForCompletion(true);// 在集群中运行
49 }
50
51 public static class MyMapper extends
52 Mapper<LongWritable, Text, Text, TrafficWritable> {
53 // 这四个参数分别是<k1,v1>和<k2,v2>
54 // k1代表的是字节的偏移量,v1是原始数据. k2是手机号.v2是每一次的通话流量
55 Text k2 = new Text();// new 一个作为k2手机号.
56 TrafficWritable v2 = new TrafficWritable();// new 一个作为v2
57
58 @Override
59 protected void map(
60 LongWritable key,
61 Text value,
62 Mapper<LongWritable, Text, Text, TrafficWritable>.Context context)
63 throws IOException, InterruptedException {
64 String line = value.toString();
65 String[] splited = line.split("\t");// 以制表符作为拆分符得到一个字节数组.
66 // 通过原始数据文件可以看到这个里面有11个字段,所以这个拆分的数组长度为11
67 k2.set(splited[1]);// k2是手机号,在这个数组中是第二个.
68 v2.set(splited[6], splited[7], splited[8], splited[9]);// v2是代表四个流量
69 // 对应这个被拆分数组的第6,7,8,9个.
70 context.write(k2, v2);
71 }
72
73 }
74
75 public static class MyReducer extends
76 Reducer<Text, TrafficWritable, Text, TrafficWritable> {
77 // 四个参数分别是<k2,v2> <k3,v3>
78 // k2是手机号,v2是流量TrafficWritable k3是手机号,v3是流量汇总.
79 TrafficWritable v3 = new TrafficWritable();
80
81 @Override
82 protected void reduce(
83 Text k2,
84 Iterable<TrafficWritable> v2s,
85 Reducer<Text, TrafficWritable, Text, TrafficWritable>.Context context)
86 throws IOException, InterruptedException {
87 // reduce方法的第一个形参是k2,第二个形参是v2s,第三个形参是一个context上下文
88 // v2s是流量集合.我们在reduce方法中要做的就是把v2汇总起来变成v3.
89 long t1 = 0L;
90 long t2 = 0L;
91 long t3 = 0L;
92 long t4 = 0L;
93 for (TrafficWritable v2 : v2s) {
94 t1 += v2.t1;
95 t2 += v2.t2;
96 t3 += v2.t3;
97 t4 += v2.t4;
98 }
99 v3.set(t1, t2, t3, t4);//构造v3
100 context.write(k2, v3);
101 }
102
103 }
104
105 /**
106 * 针对流量设置一个流量类. 第六个字段是上行数据包数. 第七个字段是下行数据包数. 第八个是上行总流量. 第九个是下行总流量
107 *
108 */
109 static class TrafficWritable implements Writable {
110 // 这个类是流量统计类,这个类包含了该手机号的上传和下载的流量
111 // 在MapReduce中的键值对中代表的是v3,有四列组成.
112 long t1;
113 long t2;
114 long t3;
115 long t4;
116
117 // 再搞一个无产的构造函数,否则容易出错
118 public TrafficWritable() {
119 }
120
121 public void set(long t1, long t2, long t3, long t4) {
122 // 赋值的方法,这个地方是传入的long类型.
123 this.t1 = t1;
124 this.t2 = t2;
125 this.t3 = t3;
126 this.t4 = t4;
127 }
128
129 public void set(String t1, String t2, String t3, String t4) {
130 // 赋值的方法,这个地方是传入的String类型.
131 this.t1 = Long.parseLong(t1);
132 this.t2 = Long.parseLong(t2);
133 this.t3 = Long.parseLong(t3);
134 this.t4 = Long.parseLong(t4);
135 }
136
137 public void readFields(DataInput in) throws IOException {
138 // 四列都通过in.readLong()读进来.
139 this.t1 = in.readLong();
140 this.t2 = in.readLong();
141 this.t3 = in.readLong();
142 this.t4 = in.readLong();
143 }
144
145 public void write(DataOutput out) throws IOException {
146 // 这个对象有四列,必须要把四列都给写出去.
147 out.writeLong(t1);
148 out.writeLong(t2);
149 out.writeLong(t3);
150 out.writeLong(t4);
151 }
152
153 public String toString() {
154 // 在Reduce阶段会用到这个方法,否则输出的是哈希编码
155 return this.t1 + "\t" + this.t2 + "\t" + this.t3 + "\t" + this.t4;
156 }
157 }
158
159 }
//===============================================================
代码二:
1 package mapreduce;
2
3 import java.io.DataInput;
4 import java.io.DataOutput;
5 import java.io.IOException;
6
7 import org.apache.hadoop.conf.Configuration;
8 import org.apache.hadoop.fs.Path;
9 import org.apache.hadoop.io.LongWritable;
10 import org.apache.hadoop.io.Text;
11 import org.apache.hadoop.io.Writable;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Partitioner;
15 import org.apache.hadoop.mapreduce.Reducer;
16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
18
19 public class TrafficApp {
20 public static void main(String[] args) throws Exception {
21 Job job = Job.getInstance(new Configuration(), TrafficApp.class.getSimpleName());
22 job.setJarByClass(TrafficApp.class);
23
24 FileInputFormat.setInputPaths(job, args[0]);
25
26 job.setMapperClass(TrafficMapper.class);
27 job.setMapOutputKeyClass(Text.class);
28 job.setMapOutputValueClass(TrafficWritable.class);
29
30 job.setNumReduceTasks(2);//设定Reduce的数量为2
31 job.setPartitionerClass(TrafficPartitioner.class);//设定一个Partitioner的类.
32 /*
33 *Partitioner是如何实现不同的Map输出分配到不同的Reduce中?
34 *在不适用指定的Partitioner时,有 一个默认的Partitioner.
35 *就是HashPartitioner.
36 *其只有一行代码,其意思就是过来的key,不管是什么,模numberReduceTasks之后 返回值就是reduce任务的编号.
37 *numberReduceTasks的默认值是1. 任何一个数模1(取余数)都是0.
38 *这个地方0就是取编号为0的Reduce.(Reduce从0开始编号.)
39 */
40
41 job.setReducerClass(TrafficReducer.class);
42 job.setOutputKeyClass(Text.class);
43 job.setOutputValueClass(TrafficWritable.class);
44
45 FileOutputFormat.setOutputPath(job, new Path(args[1]));
46 job.waitForCompletion(true);
47 }
48
49 public static class TrafficPartitioner extends Partitioner<Text,TrafficWritable>{//k2,v2
50
51 @Override
52 public int getPartition(Text key, TrafficWritable value,int numPartitions) {
53 long phoneNumber = Long.parseLong(key.toString());
54 return (int)(phoneNumber%numPartitions);
55 }
56
57 }
58
59
60 /**
61 * 第一个参数是LongWritable类型是文本一行数据开头的字节数
62 * 第二个参数是文本中的一行数据 Text类型
63 * 第三个参数是要输出的手机号 Text类型
64 * 第四个参数是需要我们自定义的流量类型TrafficWritable
65 * @author ABC
66 *
67 */
68 public static class TrafficMapper extends Mapper<LongWritable, Text, Text, TrafficWritable>{
69 Text k2 = new Text();
70 TrafficWritable v2 = null;
71 @Override
72 protected void map(LongWritable key,Text value, Mapper<LongWritable, Text, Text, TrafficWritable>.Context context)
73 throws IOException, InterruptedException {
74 String line = value.toString();
75 String[] splited = line.split("\t");
76
77 k2.set(splited[1]);//这个值对应的是手机号码
78 v2 = new TrafficWritable(splited[6], splited[7], splited[8], splited[9]);
79 context.write(k2, v2);
80 }
81
82 }
83
84 public static class TrafficReducer extends Reducer <Text, TrafficWritable, Text, TrafficWritable>{
85 @Override
86 protected void reduce(Text k2,Iterable<TrafficWritable> v2s,
87 Reducer<Text, TrafficWritable, Text, TrafficWritable>.Context context)
88 throws IOException, InterruptedException {
89 //遍历v2s 流量都这个集合里面
90 long t1 = 0L;
91 long t2 = 0L;
92 long t3 = 0L;
93 long t4 = 0L;
94
95 for (TrafficWritable v2 : v2s) {
96 t1 += v2.getT1();
97 t2 += v2.getT2();
98 t3 += v2.getT3();
99 t4 += v2.getT4();
100 }
101 TrafficWritable v3 = new TrafficWritable(t1, t2, t3, t4);
102 context.write(k2, v3);
103 }
104 }
105
106 public static class TrafficWritable implements Writable{
107 private long t1;
108 private long t2;
109 private long t3;
110 private long t4;
111 //写两个构造方法,一个是有参数的构造方法,一个是无参数的构造方法.
112 //必须要有 一个无参数的构造方法,否则程序运行会报错.
113
114 public TrafficWritable(){
115 super();
116 }
117
118 public TrafficWritable(long t1, long t2, long t3, long t4) {
119 super();
120 this.t1 = t1;
121 this.t2 = t2;
122 this.t3 = t3;
123 this.t4 = t4;
124 }
125 //在程序中读取文本穿过来的都是字符串,所以再搞一个字符串类型的构造方法
126 public TrafficWritable(String t1, String t2, String t3, String t4) {
127 super();
128 this.t1 = Long.parseLong(t1);
129 this.t2 = Long.parseLong(t2);
130 this.t3 = Long.parseLong(t3);
131 this.t4 = Long.parseLong(t4);
132 }
133
134 public void write(DataOutput out) throws IOException {
135 //对各个成员变量进行序列化
136 out.writeLong(t1);
137 out.writeLong(t2);
138 out.writeLong(t3);
139 out.writeLong(t4);
140 }
141
142 public void readFields(DataInput in) throws IOException {
143 //对成员变量进行反序列化
144 this.t1 = in.readLong();
145 this.t2 = in.readLong();
146 this.t3 = in.readLong();
147 this.t4 = in.readLong();
148 }
149
150 public long getT1() {
151 return t1;
152 }
153
154 public void setT1(long t1) {
155 this.t1 = t1;
156 }
157
158 public long getT2() {
159 return t2;
160 }
161
162 public void setT2(long t2) {
163 this.t2 = t2;
164 }
165
166 public long getT3() {
167 return t3;
168 }
169
170 public void setT3(long t3) {
171 this.t3 = t3;
172 }
173
174 public long getT4() {
175 return t4;
176 }
177
178 public void setT4(long t4) {
179 this.t4 = t4;
180 }
181
182 @Override
183 public String toString() {
184 return t1 + "\t" + t2 + "\t" + t3 + "\t" + t4 ;
185 }
186
187 }
188 }
本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/6260491.html,如需转载请自行联系原作者