c#扩展出MapReduce方法
MapReduce方法主体:
1 public static IDictionary<TKey, TResult> MapReduce<TInput, TKey, TValue, TResult>(this IList<TInput> inputList, 2 Func<MapReduceData<TInput>, KeyValueClass<TKey, TValue>> map, Func<TKey, IList<TValue>, TResult> reduce) 3 { 4 object locker = new object(); 5 ConcurrentDictionary<TKey, TResult> result = new ConcurrentDictionary<TKey, TResult>(); 6 //保存map出来的结果 7 ConcurrentDictionary<TKey, IList<TValue>> mapDic = new ConcurrentDictionary<TKey, IList<TValue>>(); 8 var parallelOptions = new ParallelOptions(); 9 parallelOptions.MaxDegreeOfParallelism = Environment.ProcessorCount; 10 //并行map 11 Parallel.For(0, inputList.Count(), parallelOptions, t => 12 { 13 MapReduceData<TInput> data = new MapReduceData<TInput> 14 { 15 Data = inputList[t], 16 Index = t, 17 List = inputList, 18 }; 19 var pair = map(data); 20 if (pair != null && pair.Valid) 21 { 22 //锁住防止并发操作list造成数据缺失 23 lock (locker) 24 { 25 //将匹配出来的结果加入结果集放入字典 26 IList<TValue> list = null; 27 if (mapDic.ContainsKey(pair.Key)) 28 { 29 list = mapDic[pair.Key]; 30 } 31 else 32 { 33 list = new List<TValue>(); 34 mapDic[pair.Key] = list; 35 } 36 list.Add(pair.Value); 37 } 38 } 39 }); 40 41 //并行reduce 42 Parallel.For(0, mapDic.Keys.Count, parallelOptions, t => 43 { 44 KeyValuePair<TKey, IList<TValue>> pair = mapDic.ElementAt(t); 45 result[pair.Key] = reduce(pair.Key, pair.Value); 46 }); 47 return result; 48 }
KeyValueClass定义:
1 public class KeyValueClass<K, V> 2 { 3 public KeyValueClass(K key, V value) 4 { 5 Key = key; 6 Value = value; 7 } 8 9 public KeyValueClass() 10 { 11 12 } 13 14 public K Key { get; set; } 15 16 public V Value { get; set; } 17 }
Console测试:
1 List<TestClass> listTestClass = new List<TestClass>(); 2 listTestClass.Add(new TestClass { a = "a", g = 1 }); 3 listTestClass.Add(new TestClass { a = "b", g = 3 }); 4 listTestClass.Add(new TestClass { a = "c", g = 4 }); 5 listTestClass.Add(new TestClass { a = "d", g = 2 }); 6 listTestClass.Add(new TestClass { a = "e", g = 1 }); 7 listTestClass.Add(new TestClass { a = "f", g = 2 }); 8 listTestClass.Add(new TestClass { a = "g", g = 5 }); 9 listTestClass.Add(new TestClass { a = "h", g = 6 }); 10 IDictionary<int, string> dic = listTestClass.MapReduce(t => 11 { 12 if (t.g < 5) 13 { 14 return new KeyValueClass<int, string>(t.g, t.a); 15 } 16 return null; 17 }, (key, values) => 18 { 19 return string.Join(",", values); 20 });
TestClass定义:
1 public class TestClass 2 { 3 public string a { get; set; } 4 public string b { get; set; } 5 6 public string d { get; set; } 7 8 //public DateTime f { get; set; } 9 10 public int g { get; set; } 11 12 public List<TestClass> test { get; set; } 13 14 public Dictionary<string, string> dic { get; set; } 15 }
结果:
1:a,e
2:d,f
3:b
4:c
词频性能测试

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MapReduce的模式,算法以及用例
本文译自Mapreduce Patterns, Algorithms, and Use Cases 在这篇文章里总结了几种网上或者论文中常见的MapReduce模式和算法,并系统化的解释了这些技术的不同之处。所有描述性的文字和代码都使用了标准hadoop的MapReduce模型,包括Mappers, Reduces, Combiners, Partitioners,和 sorting。如下图所示。 基本MapReduce模式 计数与求和 问题陈述: 有许多文档,每个文档都有一些字段组成。需要计算出每个字段在所有文档中的出现次数或者这些字段的其他什么统计值。例如,给定一个log文件,其中的每条记录都包含一个响应时间,需要计算出平均响应时间。 解决方案: 让我们先从简单的例子入手。在下面的代码片段里,Mapper每遇到指定词就把频次记1,Reducer一个个遍历这些词的集合然后把他们的频次加和。 1 class Mapper 2 method Map(docid id, doc d) 3 for all term t in doc d do 4 Emit(term t, count 1 ...
- 下一篇
namenode磁盘满引发recover edits文件报错
前段时间公司hadoop集群宕机,发现是namenode磁盘满了, 清理出部分空间后,重启集群时,重启失败。 又发现集群Secondary namenode 服务也恰恰坏掉,导致所有的操作log持续写入edits.new 文件,等集群宕机的时候文件大小已经达到了丧心病狂的70G+..重启集群报错 加载edits文件失败。分析加载文件报错原因是磁盘不足导致最后写入的log只写入一半就宕机了。由于log不完整,hadoop再次启动加载edits文件时读取文件报错。由于edits.new 文件过大,存储了好多操作log,所以必须要对其进行修复。 尝试删除文件的最后几行,结果还是报错。于是查看源码对edits 文件结构进行分析发现是二进制格式,首行为版本号,然后是hadoop运行过程中的log记录内容,由操作码 +长度(非必须)+其他项组成。 edits文件格式分析图 解决办法 报错位置在源码中的方法为org.apache.hadoop.hdfs.server.namenode.FSEditLog.loadFSEdits(EditLogInputStreamedits)方法中读取文件最后位置时...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS关闭SELinux安全模块