首页 文章 精选 留言 我的

精选列表

搜索[分布式锁],共10000篇文章
优秀的个人博客,低调大师

Hive、MapReduce、Spark分布式生成唯一数值型ID

在实际业务场景下,经常会遇到在Hive、MapReduce、Spark中需要生成唯一的数值型ID。 一般常用的做法有: MapReduce中使用1个Reduce来生成; Hive中使用row_number分析函数来生成,其实也是1个Reduce; 借助HBase或Redis或Zookeeper等其它框架的计数器来生成; 数据量不大的情况下,可以直接使用1和2方法来生成,但如果数据量巨大,1个Reduce处理起来就非常慢。 在数据量非常大的情况下,如果你仅仅需要唯一的数值型ID,注意:不是需要”连续的唯一的数值型ID”,那么可以考虑采用本文中介绍的方法,否则,请使用第3种方法来完成。 Spark中生成这样的非连续唯一数值型ID,非常简单,直接使用zipWithUniqueId()即可。 参考zipWithUniqueId()的方法,在MapReduce和Hive中,实现如下: 在Spark中,zipWithUniqueId是通过使用分区Index作为每个分区ID的开始值,在每个分区内,ID增长的步长为该RDD的分区数,那么在MapReduce和Hive中,也可以照此思路实现,Spark中的分区数,即为MapReduce中的Map数,Spark分区的Index,即为Map Task的ID。Map数,可以通过JobConf的getNumMapTasks(),而Map Task ID,可以通过参数mapred.task.id获取,格式如:attempt_1478926768563_0537_m_000004_0,截取m_000004_0中的4,再加1,作为该Map Task的ID起始值。注意:这两个只均需要在Job运行时才能获取。另外,从图中也可以看出,每个分区/Map Task中的数据量不是绝对一致的,因此,生成的ID不是连续的。 下面的UDF可以在Hive中直接使用: packagecom.lxw1234.hive.udf; importorg.apache.hadoop.hive.ql.exec.MapredContext; importorg.apache.hadoop.hive.ql.exec.UDFArgumentException; importorg.apache.hadoop.hive.ql.metadata.HiveException; importorg.apache.hadoop.hive.ql.udf.UDFType; importorg.apache.hadoop.hive.ql.udf.generic.GenericUDF; importorg.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; importorg.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; importorg.apache.hadoop.io.LongWritable; @UDFType(deterministic=false,stateful=true) publicclassRowSeq2extendsGenericUDF{ privatestaticLongWritableresult=newLongWritable(); privatestaticfinalcharSEPARATOR='_'; privatestaticfinalStringATTEMPT="attempt"; privatelonginitID=0l; privateintincrement=0; @Override publicvoidconfigure(MapredContextcontext){ increment=context.getJobConf().getNumMapTasks(); if(increment==0){ thrownewIllegalArgumentException("mapred.map.tasksiszero"); } initID=getInitId(context.getJobConf().get("mapred.task.id"),increment); if(initID==0l){ thrownewIllegalArgumentException("mapred.task.id"); } System.out.println("initID:"+initID+"increment:"+increment); } @Override publicObjectInspectorinitialize(ObjectInspector[]arguments) throwsUDFArgumentException{ returnPrimitiveObjectInspectorFactory.writableLongObjectInspector; } @Override publicObjectevaluate(DeferredObject[]arguments)throwsHiveException{ result.set(getValue()); increment(increment); returnresult; } @Override publicStringgetDisplayString(String[]children){ return"RowSeq-func()"; } privatesynchronizedvoidincrement(intincr){ initID+=incr; } privatesynchronizedlonggetValue(){ returninitID; } //attempt_1478926768563_0537_m_000004_0//return0+1 privatelonggetInitId(StringtaskAttemptIDstr,intnumTasks) throwsIllegalArgumentException{ try{ String[]parts=taskAttemptIDstr.split(Character.toString(SEPARATOR)); if(parts.length==6){ if(parts[0].equals(ATTEMPT)){ if(!parts[3].equals("m")&&!parts[3].equals("r")){ thrownewException(); } longresult=Long.parseLong(parts[4]); if(result>=numTasks){//iftaskid>=numtasks thrownewException("TaskAttemptIdstring:"+taskAttemptIDstr +"parseID["+result+"]>=numTasks["+numTasks+"].."); } returnresult+1; } } }catch(Exceptione){} thrownewIllegalArgumentException("TaskAttemptIdstring:"+taskAttemptIDstr +"isnotproperlyformed"); } } 有一张去重后的用户id(字符串类型)表,需要位每个用户id生成一个唯一的数值型seq: ADDjarfile:///tmp/udf.jar; CREATEtemporaryfunctionseq2as'com.lxw1234.hive.udf.RowSeq2'; hive>>desclxw_all_ids; OK idstring Timetaken:0.074seconds,Fetched:1row(s) hive>select*fromlxw_all_idslimit5; OK 01779E7A06ABF5565A4982_cookie 031E2D2408C29556420255_cookie 03371ADA0B6E405806FFCD_cookie 0517C4B701BC1256BFF6EC_cookie 05F12ADE0E880455931C1A_cookie Timetaken:0.215seconds,Fetched:5row(s) hive>selectcount(1)fromlxw_all_ids; 253402337 hive>createtablelxw_all_ids2asselectid,seq2()asseqfromlxw_all_ids; … HadoopjobinformationforStage-1:numberofmappers:27;numberofreducers:0 … 该Job使用了27个Map Task,没有使用Reduce,那么将会产生27个结果文件。 再看结果表中的数据: hive>select*fromlxw_all_ids2limit10; OK 766CA2770527B257D332AA_cookie1 5A0492DB0000C557A81383_cookie28 8C06A5770F176E58301EEF_cookie55 6498F47B0BCAFE5842B83A_cookie82 6DA33CB709A23758428A44_cookie109 B766347B0D27925842AC2D_cookie136 5794357B050C99584251AC_cookie163 81D67A7B011BEA5842776C_cookie190 9D2F8EB40AEA525792347D_cookie217 BD21077B09F9E25844D2C1_cookie244 hive>selectcount(1),count(distinctseq)fromlxw_all_ids2; 253402337253402337 limit 10只从第一个结果文件,即MapTaskId为0的结果文件中拿了10条,这个Map中,start=1,increment=27,因此生成的ID如上所示。 count(1),count(distinct seq)的值相同,说明seq没有重复值,你可以试试max(seq),结果必然大于253402337,说明seq是”非连续唯一数值型ID“. 本文作者:佚名 来源:51CTO

优秀的个人博客,低调大师

程超:手把手教你动手扩展分布式调用链

一、说在前面 微服务是当下最火的词语,现在很多公司都在推广微服务,当服务越来越多的时候,我们是否会纠结以下几个问题: 面对一笔超时的订单,究竟是哪一步处理时间超长呢? 数据由于并发莫名篡改,到底都谁有重大嫌疑呢? 处理遗漏了一笔订单,曾经是哪个环节出错把它落下了? 系统莫名的报错,究竟是哪一个服务报的错误? 每个服务那么多实例服务器,如何快速定位到是哪一个实例服务器报错的呢? 现在很多系统都要求可用性达到99.9%以上,那么我们除了增加系统健壮性减少故障的同时,我们又如何在真正发生故障的时候,快速定位和解决问题,也将是我们的重中之重。 在做微服务框架选择的时候,Spring Cloud无疑是当下最火的,但是因为Spring Cloud是近二年的后起新秀,以及在使用方式上面的差别,目前在很多中小企业还是以dubbo为主,不过遗憾的是,dubbo从官

优秀的个人博客,低调大师

Ceph分布式存储学习指南1.6 Ceph文件系统

1.6 Ceph文件系统 Ceph文件系统(也就是CephFS)是一个兼容POSIX的文件系统,它利用Ceph存储集群来保存用户数据。Linux内核驱动程序支持CephFS,这也使得CephFS高度适用于各大Linux操作系统发行版。CephFS将数据和元数据分开存储,为上层的应用程序提供较高的性能以及可靠性。 在Cpeh集群内部,Ceph文件系统库(libcephfs)运行在RADOS库(librados)之上,后者是Ceph存储集群协议,由文件、块和对象存储共用。要使用CephFS,你的集群节点上最少要配置一个Ceph元数据服务器(MDS)。然而,需要注意的是,单一的MDS服务器将成为Ceph文件系统的单点故障。MDS配置后,客户端可以采用多种方式使用CephFS。如果要把Ceph挂载成文件系统,客户端可以使用本地Linux内核的

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册