MapReduce优化倒序排序
第一次完成正常的统计总流量数据,第二步将结果进行排序
context.write(总流量,手机号)
SorFlowBean实现WritableComparable接口重写compareTo方法
@Override public int compareTo(FlowBean o) { // 倒序排列,从大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; }
SortFlowBean类
package com.zyd.sortflowbean; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class SortFlowBean implements WritableComparable{ private long upFlow; private long downFlow; private long sumFlow; //反序列化需要反射调用空参构造函数 public SortFlowBean() { super(); } public SortFlowBean(long upFlow, long downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public void set(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } @Override /** * 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致 */ public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } /** * 序列化方法 */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } /** * 重点 */ @Override public int compareTo(SortFlowBean o) { //倒序排序,从大到小 return this.sumFlow>o.getSumFlow()?-1:1; } }
对于mapper方法优化为一个对象,reduce方法则直接输出结果即可,驱动函数根据输入输出重写配置即可
把需要做操作的数据作为key 不需要的或者重复的累加数据作为value
package com.zyd.sortflowbean; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * * @author Administrator * */ public class FlowSortMapper extends Mapper{ SortFlowBean sBean = new SortFlowBean(); Text v = new Text(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 截取字段 String[] fields = line.split("\t"); //3 封装对象以及获取电话号码 long upFlow = Long.parseLong(fields[1]); long downFlow = Long.parseLong(fields[2]); sBean.set(upFlow, downFlow); v.set(fields[0]); //4 写出去 context.write(sBean, v); } }
Reducer方法
package com.zyd.sortflowbean; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 输出的时候要求是key 是flowBean value 是Text 手机号 * @author Administrator * */ public class FlowSortReducer extends Reducer{ @Override protected void reduce(SortFlowBean bean, Iterablevalues,Context context) throws IOException, InterruptedException { Text v = values.iterator().next(); context.write(v, bean); } }
实现的主驱动类
package com.zyd.sortflowbean; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.zyd.flowsum.FlowBean; public class FlowSorDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1 获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //2 获取jar的存储路径 job.setJarByClass(FlowSorDriver.class); //3 关联map和reduce的class类 job.setMapperClass(FlowSortMapper.class); job.setReducerClass(FlowSortReducer.class); //4 设置map阶段输出的key和value类型 job.setMapOutputKeyClass(SortFlowBean.class); job.setMapOutputValueClass(Text.class); //5 设置最后输出数据的key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(SortFlowBean.class); //6 设置输入数据的路径 和输出数据的路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0:1); } }

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
安卓开发_自定义控件_界面的简单侧滑
主界面 package com.itheima.news; import android.os.Bundle; import android.app.Activity; import android.view.Menu; import android.view.Window; public class NewsHomeActivity extends Activity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); //去掉头信息 requestWindowFeature(Window.FEATURE_NO_TITLE); setContentView(R.layout.activity_news_home); } } <RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android" xmlns:tools="http://schemas.and...
- 下一篇
快速入门Kafka系列(6)——Kafka的JavaAPI操作
作为快速入门Kafka系列的第六篇博客,本篇为大家带来的是Kafka的JavaAPI操作~ 码字不易,先赞后看! 文章目录 1. 创建Maven工程并添加jar包 2. 生产者代码 4. Kafka Streams API开发 3.1 自动提交offset 3.2 手动提交offset 3.3 消费完每个分区之后手动提交offset 3.4 指定分区数据进行消费 3.5 重复消费与数据丢失 1. 使用生产者,生产数据 2. kafka当中的数据分区 3. 消费者代码 4.1 创建一个Topic 4.2 开发StreamsAPI Kafka的JavaAPI操作 Kafka的JavaAPI操作 1. 创建Maven工程并添加jar包 首先在IDEA中我们创建一个maven工程,并添加以下依赖的jar包的坐标到pom.xml <dependencies> <dependency> <groupId>org.apache.kafkagroupId> <artifactId>kafka-clientsartifactId> <v...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7设置SWAP分区,小内存服务器的救世主
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装Nodejs环境
- 设置Eclipse缩进为4个空格,增强代码规范
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19