您现在的位置是:首页 > 文章详情

Struct复杂数据类型的UDF编写、GenericUDF编写

日期:2019-12-19点击:249

一、背景介绍:
MaxCompute 2.0版本升级后,Java UDF支持的数据类型从原来的BIGINT、STRING、DOUBLE、BOOLEAN扩展了更多基本的数据类型,同时还扩展支持了ARRAY、MAP、STRUCT等复杂类型,以及Writable参数。Java UDF使用复杂数据类型的方法,STRUCT对应com.aliyun.odps.data.Struct。com.aliyun.odps.data.Struct从反射看不出Field Name和Field Type,所以需要用@Resolve注解来辅助。即如果需要在UDF中使用STRUCT,要求在UDF Class上也标注上@Resolve注解。但是当我们Struct类型中的field有很多字段的时候,这个时候需要我们去手动的添加@Resolve注解就不是那么的友好。针对这一个问题,我们可以使用Hive 中的GenericUDF去实现。MaxCompute 2.0支持Hive风格的UDF,部分Hive UDF、UDTF可以直接在MaxCompute上使用。
二、复杂数据类型UDF示例
示例定义了一个有三个复杂数据类型的UDF,其中第一个用ARRAY作为参数,第二个用MAP作为参数,第三个用STRUCT作为参数。由于第三个Overloads用了STRUCT作为参数或者返回值,因此要求必须对UDF Class添加@Resolve注解,指定STRUCT的具体类型。
1.代码编写

@Resolve("struct<a:bigint>,string->string") public class UdfArray extends UDF { public String evaluate(List<String> vals, Long len) { return vals.get(len.intValue()); } public String evaluate(Map<String,String> map, String key) { return map.get(key); } public String evaluate(Struct struct, String key) { return struct.getFieldValue("a") + key; } }

2.打jar包添加资源

add jar UdfArray.jar 

3.创建函数

create function my_index as 'UdfArray' using 'UdfArray.jar';

4.使用UDF函数

select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from colors;

三、使用Hive的GenericUDF
这里我们使用Struct复杂数据类型作为示例,主要处理的逻辑是当我们结构体中两个字段前后没有差异时不返回,如果前后有差异将新的字段及其值组成新的结构体返回。示例中Struct的Field为3个。使用GenericUDF方式可以解决需要手动添加@Resolve注解。
1.创建一个MaxCompute表

CREATE TABLE IF NOT EXISTS `tmp_ab_struct_type_1` ( `a1` struct<a:STRING,b:STRING,c:string>, `b1` struct<a:STRING,b:STRING,c:string> ); 

2.表中数据结构如下

insert into table tmp_ab_struct_type_1 SELECT named_struct('a',1,'b',3,'c','2019-12-17 16:27:00'), named_struct('a',5,'b',6,'c','2019-12-18 16:30:00'); 

查询数据如下所示:

1576811346298_FEB20147-DD74-4a10-8D6E-780D91DCBC93.png

3.编写GenericUDF处理逻辑
(1)QSC_DEMOO类

package com.aliyun.udf.struct; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import java.util.ArrayList; import java.util.List; /** * Created by ljw on 2019-12-17 * Description: */ @SuppressWarnings("Duplicates") public class QSC_DEMOO extends GenericUDF { StructObjectInspector soi1; StructObjectInspector soi2; /** * 避免频繁Struct对象 */ private PubSimpleStruct resultStruct = new PubSimpleStruct(); private List<? extends StructField> allStructFieldRefs; //1. 这个方法只调用一次,并且在evaluate()方法之前调用。该方法接受的参数是一个arguments数组。该方法检查接受正确的参数类型和参数个数。 //2. 输出类型的定义 @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { String error = ""; //检验参数个数是否正确 if (arguments.length != 2) { throw new UDFArgumentException("需要两个参数"); } //判断参数类型是否正确-struct ObjectInspector.Category arg1 = arguments[0].getCategory(); ObjectInspector.Category arg2 = arguments[1].getCategory(); if (!(arg1.equals(ObjectInspector.Category.STRUCT))) { error += arguments[0].getClass().getSimpleName(); throw new UDFArgumentTypeException(0, "\"array\" expected at function STRUCT_CONTAINS, but \"" + arg1.name() + "\" " + "is found" + "\n" + error); } if (!(arg2.equals(ObjectInspector.Category.STRUCT))) { error += arguments[1].getClass().getSimpleName(); throw new UDFArgumentTypeException(0, "\"array\" expected at function STRUCT_CONTAINS, but \"" + arg2.name() + "\" " + "is found" + "\n" + error); } //输出结构体定义 ArrayList<String> structFieldNames = new ArrayList(); ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList(); soi1 = (StructObjectInspector) arguments[0]; soi2 = (StructObjectInspector) arguments[1]; StructObjectInspector toValid = null; if (soi1 == null) toValid = soi2; else toValid = soi1; //设置返回类型 allStructFieldRefs = toValid.getAllStructFieldRefs(); for (StructField structField : allStructFieldRefs) { structFieldNames.add(structField.getFieldName()); structFieldObjectInspectors.add(structField.getFieldObjectInspector()); } return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors); } //这个方法类似UDF的evaluate()方法。它处理真实的参数,并返回最终结果。 @Override public Object evaluate(DeferredObject[] deferredObjects) throws HiveException { //将hive中的struct类型转换成com.aliyun.odps.data.Struct, 如果有错误,请调试,查看deferredObjects的数据是什么样子的 //然后自己进行重新封装 !!! ArrayList list1 = (ArrayList) deferredObjects[0].get(); ArrayList list2 = (ArrayList) deferredObjects[1].get(); int len = list1.size(); ArrayList fieldNames = new ArrayList<>(); ArrayList fieldValues = new ArrayList<>(); for (int i = 0; i < len ; i++) { if (!list1.get(i).equals(list2.get(i))) { fieldNames.add(allStructFieldRefs.get(i).getFieldName()); fieldValues.add(list2.get(i)); } } if (fieldValues.size() == 0) return null; return fieldValues; } //这个方法用于当实现的GenericUDF出错的时候,打印出提示信息。而提示信息就是你实现该方法最后返回的字符串。 @Override public String getDisplayString(String[] strings) { return "Usage:" + this.getClass().getName() + "(" + strings[0] + ")"; } }

(2)PubSimpleStruct类

package com.aliyun.udf.struct; import com.aliyun.odps.data.Struct; import com.aliyun.odps.type.StructTypeInfo; import com.aliyun.odps.type.TypeInfo; import java.util.List; public class PubSimpleStruct implements Struct { private StructTypeInfo typeInfo; private List<Object> fieldValues; public StructTypeInfo getTypeInfo() { return typeInfo; } public void setTypeInfo(StructTypeInfo typeInfo) { this.typeInfo = typeInfo; } public void setFieldValues(List<Object> fieldValues) { this.fieldValues = fieldValues; } public int getFieldCount() { return fieldValues.size(); } public String getFieldName(int index) { return typeInfo.getFieldNames().get(index); } public TypeInfo getFieldTypeInfo(int index) { return typeInfo.getFieldTypeInfos().get(index); } public Object getFieldValue(int index) { return fieldValues.get(index); } public TypeInfo getFieldTypeInfo(String fieldName) { for (int i = 0; i < typeInfo.getFieldCount(); ++i) { if (typeInfo.getFieldNames().get(i).equalsIgnoreCase(fieldName)) { return typeInfo.getFieldTypeInfos().get(i); } } return null; } public Object getFieldValue(String fieldName) { for (int i = 0; i < typeInfo.getFieldCount(); ++i) { if (typeInfo.getFieldNames().get(i).equalsIgnoreCase(fieldName)) { return fieldValues.get(i); } } return null; } public List<Object> getFieldValues() { return fieldValues; } @Override public String toString() { return "PubSimpleStruct{" + "typeInfo=" + typeInfo + ", fieldValues=" + fieldValues + '}'; } } 

3、打jar包,添加资源

add jar test.jar; 

4、创建函数

CREATE FUNCTION UDF_DEMO as 'com.aliyun.udf.test.UDF_DEMOO' using 'test.jar'; 

5、测试使用UDF函数

set odps.sql.hive.compatible=true; select UDF_DEMO(a1,b1) from tmp_ab_struct_type_1;

查询结果如下所示:

1576811361785_5BC15482-A394-4353-9E17-D6A53AB54960.png


注意:
(1)在使用兼容的Hive UDF的时候,需要在SQL前加set odps.sql.hive.compatible=true;语句,set语句和SQL语句一起提交执行。

(2)目前支持兼容的Hive版本为2.1.0,对应Hadoop版本为2.7.2。如果UDF是在其他版本的Hive/Hadoop开发的,则可能需要使用此Hive/Hadoop版本重新编译。
有疑问可以咨询阿里云MaxCompute技术支持:刘建伟

 <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.2</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.1.0</version> </dependency> 

欢迎加入“MaxCompute开发者社区2群”,点击链接申请加入或扫描二维码
https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
6766293bc74543c99e7c493dc15cd39b.png

原文链接:https://yq.aliyun.com/articles/740002
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章