maxCompute的UDAF demo,实现累加功能

需求:将表中数据按照name聚合,并且count进行累加

name count
Jan 1
Jan 2
Feb 3
Feb 1
Mar 1
Mar 5

预期结果:

name count
Jan 3
Feb 7
Mar 13

使用idea的maxCompute studio新增UDAF

_
_
然后自动生成未实现的方法,我的字段name是string,count是bigint
所以@Resolve("string,bigint->bigint")

新增一个class用来存储字段

        private String  name;
        private Long count;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name =  in.readUTF();
            count = in.readLong();
        }
    }

这样newBuffer就可以这样写了

    public Writable newBuffer() {
        return new MyBuffer();
    }

还需要一个Map来存储key(name)和value(count),一个long类型的参数存储累加的值

    Long old_count = 0L;//存储累加值
    private LongWritable ret = new LongWritable();//存储输出值

完整代码参考:

public class UDAFTest extends Aggregator {
    private static class MyBuffer implements Writable {
        private String  name;
        private Long count;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name =  in.readUTF();
            count = in.readLong();
        }
    }



    @Override
    public Writable newBuffer() {
        return new MyBuffer();
    }
    Map<String,Long> map = new LinkedHashMap<>();
    Long old_count = 0L;
    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        String arg = String.valueOf(args[0]);
        Long cnt = Long.parseLong(String.valueOf(args[1]));
        MyBuffer buf = (MyBuffer) buffer;

        if (arg != null) {
            if(map.containsKey(arg)){
                Long newcnt = map.get(arg);
                old_count = cnt+newcnt;
                map.put(arg,old_count);
            }else {
                map.put(arg,old_count+cnt);
            }
        }
        buf.name = arg;
        buf.count = map.get(arg);

    }
    private LongWritable ret = new LongWritable();
    @Override
    public Writable terminate(Writable arg0) throws UDFException {
        MyBuffer buffer = (MyBuffer) arg0;
        ret.set(buffer.count);
        return ret;
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        MyBuffer buf = (MyBuffer) buffer;
        MyBuffer p = (MyBuffer) partial;
        buf.name = p.name;
        buf.count = p.count;
    }


}

然后通过maxCompute studio发布下
_

发布名为test20191119,这样就可以在Dataworks中调用了。

其中原表数据:
_

_

优秀的个人博客,低调大师

微信关注我们

转载内容版权归作者及来源网站所有!本站原创内容转载请注明来源!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
Mario,低调大师唯一一个Java游戏作品

Mario,低调大师唯一一个Java游戏作品

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Apache Tomcat7、8、9(Java Web服务器)

Apache Tomcat7、8、9(Java Web服务器)

Tomcat是Apache 软件基金会(Apache Software Foundation)的Jakarta 项目中的一个核心项目,由Apache、Sun 和其他一些公司及个人共同开发而成。因为Tomcat 技术先进、性能稳定,而且免费,因而深受Java 爱好者的喜爱并得到了部分软件开发商的认可,成为目前比较流行的Web 应用服务器。

Eclipse(集成开发环境)

Eclipse(集成开发环境)

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。

Java Development Kit(Java开发工具)

Java Development Kit(Java开发工具)

JDK是 Java 语言的软件开发工具包,主要用于移动设备、嵌入式设备上的java应用程序。JDK是整个java开发的核心,它包含了JAVA的运行环境(JVM+Java系统类库)和JAVA工具。