Hive UDF Development
Hive allows users to process data with UDFs (user defined functions). You can list the available functions with `show functions` and display a function's documentation with `describe function <function-name>`:

```
hive> show functions;
OK
!
!=
......
Time taken: 0.275 seconds
hive> desc function substr;
OK
substr(str, pos[, len]) - returns the substring of str that starts at pos and is of length len or substr(bin, pos[, len]) - returns the slice of byte array that starts at pos and is of length len
Time taken: 0.095 seconds
```

Hive's built-in functions fall into the following categories:

1. Relational operators: =, <>, <=, >=, etc.
2. Arithmetic operators: +, -, *, /, etc.
3. Logical operators: AND, &&, OR, ||, etc.
4. Complex type constructors: map, struct, create_union, etc.
5. Complex type operators: A[n], Map[key], S.x
6. Mathematical functions: ln(double a), sqrt(double a), etc.
7. Collection functions: size(Array<T>), sort_array(Array<T>), etc.
8. Type conversion functions: binary(string|binary), cast(expr as <type>)
9. Date functions: from_unixtime(bigint unixtime[, string format]), unix_timestamp(), etc.
10. Conditional functions: if(boolean testCondition, T valueTrue, T valueFalseOrNull), etc.
11. String functions: concat(string|binary A, string|binary B...), ascii(string str), etc.
12. Miscellaneous: xpath, get_json_object, etc.

There are two ways to write a Hive UDF:

1. Extend UDF and override the evaluate method.
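The first route works because Hive resolves a matching `evaluate` overload by reflection at query time rather than through a fixed interface. The following self-contained sketch (plain Java, no Hive dependencies; the `MyUpper` and `ReflectDemo` names are made up for illustration, and this is not Hive's actual bridge code) shows the idea of locating and invoking `evaluate` by argument type:

```java
import java.lang.reflect.Method;

// Illustrative sketch only: a simple "extends UDF"-style class exposes
// evaluate() methods that an engine can find by reflection.
class MyUpper {
    public String evaluate(String s) {
        return s == null ? null : s.toUpperCase();
    }
}

public class ReflectDemo {
    public static void main(String[] args) throws Exception {
        MyUpper udf = new MyUpper();
        // Resolve an evaluate overload that matches the argument type...
        Method m = udf.getClass().getMethod("evaluate", String.class);
        // ...then invoke it on each input value.
        System.out.println(m.invoke(udf, "hello")); // prints "HELLO"
    }
}
```

This is also why a single UDF class may define several `evaluate` overloads: Hive picks the one whose parameter types match the call site.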
2. Extend GenericUDF and override the initialize, getDisplayString, and evaluate methods.

Example UDF code (for more examples, see https://svn.apache.org/repos/asf/hive/tags/release-0.8.1/ql/src/java/org/apache/hadoop/hive/ql/udf/):

Function: convert uppercase to lowercase.

ToLowerCase.java:

```java
package test.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class ToLowerCase extends UDF {
    public Text evaluate(final Text s) {
        if (s == null) {
            return null;
        }
        return new Text(s.toString().toLowerCase());
    }
}
```

Function: count the number of distinct elements in an array.

UDFArrayUniqElementNumber.java:

```java
package test.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;

/**
 * UDF:
 * Get number of objects with duplicate elements eliminated
 * @author xiaomin.zhou
 */
@Description(name = "array_uniq_element_number", value = "_FUNC_(array) - Returns number of objects with duplicate elements eliminated.", extended = "Example:\n"
        + "  > SELECT _FUNC_(array(1, 2, 2, 3, 3)) FROM src LIMIT 1;\n" + "  3")
public class UDFArrayUniqElementNumber extends GenericUDF {

    private static final int ARRAY_IDX = 0;
    private static final int ARG_COUNT = 1; // Number of arguments to this UDF
    private static final String FUNC_NAME = "ARRAY_UNIQ_ELEMENT_NUMBER"; // External name

    private ListObjectInspector arrayOI;
    private ObjectInspector arrayElementOI;
    private final IntWritable result = new IntWritable(-1);

    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentException {
        // Check the number of arguments
        if (arguments.length != ARG_COUNT) {
            throw new UDFArgumentException("The function " + FUNC_NAME
                    + " accepts " + ARG_COUNT + " arguments.");
        }

        // Check that the ARRAY_IDX argument is of category LIST
        if (!arguments[ARRAY_IDX].getCategory().equals(Category.LIST)) {
            throw new UDFArgumentTypeException(ARRAY_IDX, "\""
                    + org.apache.hadoop.hive.serde.Constants.LIST_TYPE_NAME
                    + "\" expected at function " + FUNC_NAME + ", but \""
                    + arguments[ARRAY_IDX].getTypeName() + "\" is found");
        }

        arrayOI = (ListObjectInspector) arguments[ARRAY_IDX];
        arrayElementOI = arrayOI.getListElementObjectInspector();

        return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
    }

    public IntWritable evaluate(DeferredObject[] arguments)
            throws HiveException {
        result.set(0);

        Object array = arguments[ARRAY_IDX].get();
        int arrayLength = arrayOI.getListLength(array);
        if (arrayLength <= 1) {
            result.set(arrayLength);
            return result;
        }

        // Element-by-element compare; algorithm complexity: O(N^2)
        int num = 1;
        int i, j;
        for (i = 1; i < arrayLength; i++) {
            Object listElement = arrayOI.getListElement(array, i);
            for (j = i - 1; j >= 0; j--) {
                if (listElement != null) {
                    Object tmp = arrayOI.getListElement(array, j);
                    if (ObjectInspectorUtils.compare(tmp, arrayElementOI,
                            listElement, arrayElementOI) == 0) {
                        break;
                    }
                }
            }
            if (-1 == j) {
                num++;
            }
        }

        result.set(num);
        return result;
    }

    public String getDisplayString(String[] children) {
        assert (children.length == ARG_COUNT);
        return "array_uniq_element_number(" + children[ARRAY_IDX] + ")";
    }
}
```

Build udf.jar. There are three ways to use a custom UDF in Hive.

1. Add the UDF temporarily:

```
hive> select * from test;
OK
Hello
wORLD
ZXM
ljz
Time taken: 13.76 seconds
hive> add jar /home/work/udf.jar;
Added /home/work/udf.jar to class path
Added resource: /home/work/udf.jar
hive> create temporary function mytest as 'test.udf.ToLowerCase';
OK
Time taken: 0.103 seconds
hive> show functions;
......
mytest
......
hive> select mytest(test.name) from test;
......
OK
hello
world
zxm
ljz
Time taken: 38.218 seconds
```

With this approach the function is destroyed when the session ends, so every new session has to run `add jar` and `create temporary function` again.
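The evaluate method of UDFArrayUniqElementNumber counts distinct elements with an O(N^2) pairwise scan. The core logic can be sketched and checked outside Hive with plain Java collections (the `UniqCount` class and method names below are illustrative, not part of the UDF): `uniqCountQuadratic` mirrors the UDF's nested loop, while a HashSet gives the same answer in O(N) for element types with a consistent hashCode.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class UniqCount {
    // Mirrors the UDF's O(N^2) nested loop: element i counts as new
    // only if no earlier element j compares equal to it.
    public static int uniqCountQuadratic(List<Integer> arr) {
        if (arr.size() <= 1) {
            return arr.size();
        }
        int num = 1;
        for (int i = 1; i < arr.size(); i++) {
            int j;
            for (j = i - 1; j >= 0; j--) {
                if (arr.get(i) != null && arr.get(i).equals(arr.get(j))) {
                    break; // duplicate found among earlier elements
                }
            }
            if (j == -1) {
                num++;
            }
        }
        return num;
    }

    // O(N) alternative: a HashSet deduplicates in a single pass.
    public static int uniqCountLinear(List<Integer> arr) {
        return new HashSet<>(arr).size();
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 2, 2, 3, 3);
        System.out.println(uniqCountQuadratic(a)); // prints 3
        System.out.println(uniqCountLinear(a));    // prints 3
    }
}
```

The UDF uses the pairwise scan because Hive array elements are compared through ObjectInspectorUtils.compare rather than equals/hashCode; for large arrays, a hash-based approach over standardized copies of the elements would scale better.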
2. Create the function automatically before entering a session, using hive -i to run an initialization script on startup:

```
$ cat hive_init
add jar /home/work/udf.jar;
create temporary function mytest as 'test.udf.ToLowerCase';
$ hive -i hive_init
Logging initialized using configuration in file:/home/work/hive/hive-0.8.1/conf/hive-log4j.properties
Hive history file=/tmp/work/hive_job_log_work_201209200147_1951517527.txt
hive> show functions;
......
mytest
......
hive> select mytest(test.name) from test;
......
OK
hello
world
zxm
ljz
```

Method 2 is essentially the same as method 1; the difference is that method 2 performs the setup automatically when the session is initialized.

3. Register the custom UDF as a Hive built-in function. For details, see "hive利器自定义UDF+重编译hive" (custom UDFs plus recompiling Hive).

Compared with the first two approaches, the third registers the user-defined function directly as a built-in, which makes it very convenient to use afterwards. But it is also dangerous: if the function has a bug, the consequences affect every query. So unless a function is genuinely general-purpose and has stabilized, the first two approaches are the more reliable choice.

This article was reposted from the yntmdr 51CTO blog. Original link: http://blog.51cto.com/yntmdr/1716940. Please contact the original author for permission to reprint.