
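MaxCompute UDAF demo: implementing a running total

Date: 2019-11-18

Requirement: aggregate the rows of the table by name and accumulate count, so that each name's result also includes the totals of the names before it.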

name count
Jan 1
Jan 2
Feb 3
Feb 1
Mar 1
Mar 5

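Expected result (each row is the group's own sum plus the total of the preceding groups: Jan = 1 + 2 = 3, Feb = 3 + 3 + 1 = 7, Mar = 7 + 1 + 5 = 13):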

name count
Jan 3
Feb 7
Mar 13
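
To make the expected semantics concrete, here is a minimal plain-Java sketch of the same calculation outside MaxCompute. The class name `RunningTotalSketch` and the method `runningTotals` are hypothetical names chosen for illustration; the sketch assumes the names should be reported in the order they first appear in the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; it does not use the MaxCompute SDK.
class RunningTotalSketch {

    /**
     * Sums count per name, then adds the totals of all preceding names.
     * Names are reported in first-seen order (an assumption of this sketch).
     */
    static Map<String, Long> runningTotals(String[] names, long[] counts) {
        // First pass: per-name sum, keeping first-seen order.
        Map<String, Long> perName = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            perName.merge(names[i], counts[i], Long::sum);
        }
        // Second pass: carry the total forward from one name to the next.
        Map<String, Long> result = new LinkedHashMap<>();
        long running = 0L;
        for (Map.Entry<String, Long> entry : perName.entrySet()) {
            running += entry.getValue();
            result.put(entry.getKey(), running);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] names = {"Jan", "Jan", "Feb", "Feb", "Mar", "Mar"};
        long[] counts = {1, 2, 3, 1, 1, 5};
        System.out.println(runningTotals(names, counts)); // prints {Jan=3, Feb=7, Mar=13}
    }
}
```

The two passes keep the per-name sum separate from the carried total, which makes it easy to see where the 3, 7 and 13 in the expected result come from.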

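Create a new UDAF with MaxCompute Studio in IntelliJ IDEA.

Then let the IDE generate the unimplemented methods. My name column is a string and count is a bigint, so the annotation is @Resolve("string,bigint->bigint").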

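Add a class to hold the fields:

    private static class MyBuffer implements Writable {
        // Defaults avoid a NullPointerException if the buffer is
        // serialized before any row has been seen.
        private String name = "";
        private Long count = 0L;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name = in.readUTF();
            count = in.readLong();
        }
    }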
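
The write/readFields pair has to be symmetric: fields must be read back in the order they were written. The following sketch checks that with a round trip through an in-memory byte array. `DemoBuffer` is a hypothetical stand-in for MyBuffer that does not implement the MaxCompute `Writable` interface, so the example runs with the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class BufferRoundTripSketch {

    // Hypothetical stand-in for MyBuffer, with the same two fields.
    static class DemoBuffer {
        String name = "";
        long count = 0L;

        void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        void readFields(DataInput in) throws IOException {
            // Same order as write(): name first, then count.
            name = in.readUTF();
            count = in.readLong();
        }
    }

    // Serializes the buffer to bytes and reads it back into a fresh instance.
    static DemoBuffer roundTrip(DemoBuffer source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                source.write(out);
            }
            DemoBuffer copy = new DemoBuffer();
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        DemoBuffer original = new DemoBuffer();
        original.name = "Jan";
        original.count = 3L;
        DemoBuffer copy = roundTrip(original);
        System.out.println(copy.name + " " + copy.count); // prints Jan 3
    }
}
```

Note that `writeUTF` throws a NullPointerException when given null, which is one reason to give the name field a non-null default.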

With that in place, newBuffer can be written like this:

    @Override
    public Writable newBuffer() {
        return new MyBuffer();
    }

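We also need a Map to store the key (name) and value (count), and a Long field to hold the running total:

    Map<String, Long> map = new LinkedHashMap<>();   // name -> accumulated count
    Long old_count = 0L;                             // running total
    private LongWritable ret = new LongWritable();   // output value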

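Complete code for reference:

    import com.aliyun.odps.io.LongWritable;
    import com.aliyun.odps.io.Writable;
    import com.aliyun.odps.udf.Aggregator;
    import com.aliyun.odps.udf.UDFException;
    import com.aliyun.odps.udf.annotation.Resolve;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Note: the running total lives in instance fields, so the result depends on
    // one instance processing all rows, group by group, in table order.
    @Resolve("string,bigint->bigint")
    public class UDAFTest extends Aggregator {

        private static class MyBuffer implements Writable {
            private String name = "";
            private Long count = 0L;

            @Override
            public void write(DataOutput out) throws IOException {
                out.writeUTF(name);
                out.writeLong(count);
            }

            @Override
            public void readFields(DataInput in) throws IOException {
                name = in.readUTF();
                count = in.readLong();
            }
        }

        @Override
        public Writable newBuffer() {
            return new MyBuffer();
        }

        Map<String, Long> map = new LinkedHashMap<>();   // name -> accumulated count
        Long old_count = 0L;                             // running total

        @Override
        public void iterate(Writable buffer, Writable[] args) throws UDFException {
            // String.valueOf never returns null, so check the arguments themselves.
            if (args[0] == null || args[1] == null) {
                return;
            }
            String arg = String.valueOf(args[0]);
            long cnt = Long.parseLong(String.valueOf(args[1]));
            MyBuffer buf = (MyBuffer) buffer;
            if (map.containsKey(arg)) {
                // Name seen before: add this row to its accumulated value.
                old_count = cnt + map.get(arg);
            } else {
                // First row of a new name: continue from the running total.
                // Updating old_count here keeps single-row groups in the total.
                old_count = old_count + cnt;
            }
            map.put(arg, old_count);
            buf.name = arg;
            buf.count = map.get(arg);
        }

        private LongWritable ret = new LongWritable();

        @Override
        public Writable terminate(Writable arg0) throws UDFException {
            MyBuffer buffer = (MyBuffer) arg0;
            ret.set(buffer.count);
            return ret;
        }

        @Override
        public void merge(Writable buffer, Writable partial) throws UDFException {
            // Takes over the partial buffer's values as they are.
            MyBuffer buf = (MyBuffer) buffer;
            MyBuffer p = (MyBuffer) partial;
            buf.name = p.name;
            buf.count = p.count;
        }
    }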
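
The order in which the four callbacks run is easier to follow when simulated outside MaxCompute. The sketch below is a plain-Java stand-in: its method names mirror newBuffer, iterate, merge and terminate, but `AggregatorLifecycleSketch` and `SumBuffer` are hypothetical and nothing here uses the MaxCompute SDK. It computes an ordinary per-group sum, assuming the rows of one group may be split across two workers. Unlike the UDAF above, its merge adds the partial sums instead of overwriting the buffer, and it does not carry a total across groups.

```java
// Hypothetical simulation of the aggregator callbacks for a single group.
class AggregatorLifecycleSketch {

    // Stand-in for a per-group buffer holding a partial sum.
    static class SumBuffer {
        long sum = 0L;
    }

    // newBuffer: create an empty buffer.
    static SumBuffer newBuffer() {
        return new SumBuffer();
    }

    // iterate: fold one input row into the buffer.
    static void iterate(SumBuffer buffer, long count) {
        buffer.sum += count;
    }

    // merge: combine a partial result from another worker into the buffer.
    static void merge(SumBuffer buffer, SumBuffer partial) {
        buffer.sum += partial.sum;
    }

    // terminate: turn the final buffer into the output value.
    static long terminate(SumBuffer buffer) {
        return buffer.sum;
    }

    // Simulates two workers, each seeing a slice of one group's rows.
    static long aggregate(long[] sliceA, long[] sliceB) {
        SumBuffer a = newBuffer();
        for (long c : sliceA) {
            iterate(a, c);
        }
        SumBuffer b = newBuffer();
        for (long c : sliceB) {
            iterate(b, c);
        }
        SumBuffer merged = newBuffer();
        merge(merged, a);
        merge(merged, b);
        return terminate(merged);
    }

    public static void main(String[] args) {
        // The two Feb rows (3 and 1) handled by different workers.
        System.out.println(aggregate(new long[]{3}, new long[]{1})); // prints 4
    }
}
```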

Then publish it with MaxCompute Studio.

It is published under the name test20191119, so it can now be called from DataWorks.

The source table data:

[screenshot: source table]

Original article: https://yq.aliyun.com/articles/727522