您现在的位置是:首页 > 文章详情

MapReduce优化倒序排序

日期:2021-06-01点击:506

  1. 第一次完成正常的统计总流量数据,第二步将结果进行排序

  2. context.write(总流量,手机号)

  3. SorFlowBean实现WritableComparable接口重写compareTo方法

     @Override  public int compareTo(FlowBean o) {   // 倒序排列,从大到小   return this.sumFlow > o.getSumFlow() ? -1 : 1;  }

SortFlowBean类

 package com.zyd.sortflowbean; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class SortFlowBean implements WritableComparable{ private long upFlow; private long downFlow; private long sumFlow; //反序列化需要反射调用空参构造函数 public SortFlowBean() { super(); } public SortFlowBean(long upFlow, long downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public void set(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } @Override /**  * 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致  */ public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } /**  * 序列化方法  */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } /**  * 重点  */ @Override public int compareTo(SortFlowBean o) { //倒序排序,从大到小 return this.sumFlow>o.getSumFlow()?-1:1; } }

对于mapper方法优化为一个对象,reduce方法则直接输出结果即可,驱动函数根据输入输出重写配置即可
把需要做操作的数据作为key 不需要的或者重复的累加数据作为value

package com.zyd.sortflowbean; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /**  *   * @author Administrator  *  */ public class FlowSortMapper extends Mapper{ SortFlowBean sBean = new SortFlowBean(); Text v = new Text(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 截取字段 String[] fields = line.split("\t"); //3 封装对象以及获取电话号码 long upFlow = Long.parseLong(fields[1]); long downFlow = Long.parseLong(fields[2]); sBean.set(upFlow, downFlow); v.set(fields[0]); //4 写出去 context.write(sBean, v); } }

Reducer方法

 package com.zyd.sortflowbean; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /**  * 输出的时候要求是key 是flowBean value 是Text 手机号  * @author Administrator  *  */ public class FlowSortReducer extends Reducer{ @Override protected void reduce(SortFlowBean bean, Iterablevalues,Context context) throws IOException, InterruptedException { Text v = values.iterator().next(); context.write(v, bean); } }

实现的主驱动类

 package com.zyd.sortflowbean; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.zyd.flowsum.FlowBean; public class FlowSorDriver  { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1 获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //2 获取jar的存储路径 job.setJarByClass(FlowSorDriver.class); //3 关联map和reduce的class类 job.setMapperClass(FlowSortMapper.class); job.setReducerClass(FlowSortReducer.class); //4 设置map阶段输出的key和value类型 job.setMapOutputKeyClass(SortFlowBean.class); job.setMapOutputValueClass(Text.class); //5 设置最后输出数据的key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(SortFlowBean.class); //6 设置输入数据的路径 和输出数据的路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0:1); } }

               

原文链接:https://blog.51cto.com/bigdata/2842218
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章