MapReduce业务 - 图片关联计算
1.概述
最近在和人交流时谈到数据相似度和数据共性问题,而刚好在业务层面有类似的需求,今天和大家分享这类问题的解决思路,分享目录如下所示:
- 业务背景
- 编码实践
- 预览截图
下面开始今天的内容分享。
2.业务背景
目前有这样一个背景,在一大堆数据中,里面存放着图片的相关信息,如下图所示:
上图只是给大家列举的一个示例数据格式,第一列表示自身图片,第二、第三......等列表示与第一列相关联的图片信息。那么我们从这堆数据中如何找出他们拥有相同图片信息的图片。
2.1 实现思路
那么,我们在明确了上述需求后,下面我们来分析它的实现思路。首先,我们通过上图所要实现的目标结果,其最终计算结果如下所示:
pic_001pic_002 pic_003,pic_004,pic_005 pic_001pic_003 pic_002,pic_005 pic_001pic_004 pic_002,pic_005 pic_001pic_005 pic_002,pic_003,pic_004 ......
结果如上所示,找出两两图片之间的共性图片,结果未列完整,只是列举了部分,具体结果大家可以参考截图预览的相关信息。
下面给大家介绍解决思路,通过观察数据,我们可以发现在上述数据当中,我们要计算图片两两的共性图片,可以从关联图片入手,在关联图片中我们可 以找到共性图片的关联信息,比如:我们要计算pic001pic002图片的共性图片,我们可以在关联图片中找到两者(pic001pic002组合)后 对应的自身图片(key),最后在将所有的key求并集即为两者的共性图片信息,具体信息如下图所示:
通过上图,我们可以知道具体的实现思路,步骤如下所示:
- 第一步:拆分数据,关联数据两两组合作为Key输出。
- 第二步:将相同Key分组,然后求并集得到计算结果。
这里使用一个MR来完成此项工作,在明白了实现思路后,我们接下来去实现对应的编码。
3.编码实践
- 拆分数据,两两组合。
public static class PictureMap extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { StringTokenizer strToken = new StringTokenizer(value.toString()); Text owner = new Text(); Set<String> set = new TreeSet<String>(); owner.set(strToken.nextToken()); while (strToken.hasMoreTokens()) { set.add(strToken.nextToken()); } String[] relations = new String[set.size()]; relations = set.toArray(relations); for (int i = 0; i < relations.length; i++) { for (int j = i + 1; j < relations.length; j++) { String outPutKey = relations[i] + relations[j]; context.write(new Text(outPutKey), owner); } } } }
- 按Key分组,求并集
public static class PictureReduce extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { String common = ""; for (Text val : values) { if (common == "") { common = val.toString(); } else { common = common + "," + val.toString(); } } context.write(key, new Text(common)); } }
- 完整示例
package cn.hadoop.hdfs.example; import java.io.IOException; import java.util.Set; import java.util.StringTokenizer; import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.hadoop.hdfs.util.HDFSUtils; import cn.hadoop.hdfs.util.SystemConfig; /** * @Date Aug 31, 2015 * * @Author dengjie * * @Note Find picture relations */ public class PictureRelations extends Configured implements Tool { private static Logger log = LoggerFactory.getLogger(PictureRelations.class); private static Configuration conf; static { String tag = SystemConfig.getProperty("dev.tag"); String[] hosts = SystemConfig.getPropertyArray(tag + ".hdfs.host", ","); conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://cluster1"); conf.set("dfs.nameservices", "cluster1"); conf.set("dfs.ha.namenodes.cluster1", "nna,nns"); conf.set("dfs.namenode.rpc-address.cluster1.nna", hosts[0]); conf.set("dfs.namenode.rpc-address.cluster1.nns", hosts[1]); conf.set("dfs.client.failover.proxy.provider.cluster1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); } public static class PictureMap extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { StringTokenizer strToken = new StringTokenizer(value.toString()); Text owner = new Text(); Set<String> set = new TreeSet<String>(); owner.set(strToken.nextToken()); while (strToken.hasMoreTokens()) { set.add(strToken.nextToken()); } String[] relations = new String[set.size()]; relations = set.toArray(relations); for (int i = 0; i < relations.length; i++) { for (int j = i + 1; j < relations.length; j++) { String outPutKey = relations[i] + relations[j]; context.write(new Text(outPutKey), owner); } } } } public static class PictureReduce extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { String common = ""; for (Text val : values) { if (common == "") { common = val.toString(); } else { common = common + "," + val.toString(); } } context.write(key, new Text(common)); } } public int run(String[] args) throws Exception { final Job job = Job.getInstance(conf); job.setJarByClass(PictureMap.class); job.setMapperClass(PictureMap.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(PictureReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, args[0]); FileOutputFormat.setOutputPath(job, new Path(args[1])); int status = job.waitForCompletion(true) ? 0 : 1; return status; } public static void main(String[] args) { try { if (args.length != 1) { log.warn("args length must be 1 and as date param"); return; } String tmpIn = SystemConfig.getProperty("hdfs.input.path.v2"); String tmpOut = SystemConfig.getProperty("hdfs.output.path.v2"); String inPath = String.format(tmpIn, "t_pic_20150801.log"); String outPath = String.format(tmpOut, "meta/" + args[0]); // bak dfs file to old HDFSUtils.bak(tmpOut, outPath, "meta/" + args[0] + "-old", conf); args = new String[] { inPath, outPath }; int res = ToolRunner.run(new Configuration(), new PictureRelations(), args); System.exit(res); } catch (Exception ex) { ex.printStackTrace(); log.error("Picture relations task has error,msg is" + ex.getMessage()); } } }
4.截图预览
关于计算结果,如下图所示:
5.总结
本篇博客只是从思路上实现了图片关联计算,在数据量大的情况下,是有待优化的,这里就不多做赘述了,后续有时间在为大家分析其中的细节。
6.结束语
这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
ElasticSearch实战-编码实践
1.概述 前面在《ElasticSearch实战-入门》中给大家分享如何搭建这样一个集群,在完成集群的搭建后,今天给大家分享如何实现对应的业务功能模块,下面是今天的分享内容,目录如下所示: 编码实践 效果预览 总结 2.编码实践 由于 ES 集群支持 Restful 接口,我们可以直接通过 Java 来调用 Restful 接口来查询我们需要的数据结果,并将查询到的结果在在我们的业务界面可视化出来。我们知道在 ES 集群的 Web 管理界面有这样一个入口,如下图所示: 我们可以在此界面的入口中拼接 JSON 字符串来查询我们想要的结果,下面,我们通过 Java 的 API 去调用 Restful 接口来查询我们想要的结果。 2.1 字符串拼接实现 接着,我们去实现要查询的核心代码,具体内容实现如下所示: public String buildQueryString(Map<String, Object> param) throws ParseException { SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd...
- 下一篇
JStorm-介绍
1.概述 JStorm 是一个类似于 Hadoop 的MapReduce的计算系统,它是由Alibaba开源的实时计算模型,它使用Java重写了原生的Storm模型(Clojure和Java混合编 写的),并且再原来的基础上做了许多改进。用户只需按照指定的接口实现一个任务,然后将这个任务提交给JStorm系统,JStorm在接受了任务指令 后,会无间断运行任务,一旦出现异常导致某个Worker发送故障,调度器立刻会分配一个新的Worker去顶替异常的Worker。下面是本次分享的目 录结构: 应用场景 基本术语 JStorm比较 JStorm架构 总结 下面开始今天的内容分享。 2.应用场景 从应用的角度来说,JStorm它是一种分布式的应用;从系统层面来说,它又类似于MapReduce这样的调度系统;而从数据方面来说,它又 是一种基于流水数据的实时处理解决方案。如今,DT时代的当下,用户和企业也不仅仅只满足于离线数据,对于数据的实时性要求也越来越高了。 在早期,Storm和JStorm未问世之前,业界有很多实时计算系统,可谓百家争鸣,自Storm和JStorm出世之后,基本这两者占据...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,CentOS8安装Elasticsearch6.8.6
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker快速安装Oracle11G,搭建oracle11g学习环境