【转载】MaxCompute full outer join改写left anti join实践
背景
ods层数据同步时经常会遇到增全量合并的模型,即T-1天增量表 + T-2全量表 = T-1全量表
。可以通过full outer join脚本来完成合并,但是数据量很大时非常消耗资源。
insert overwrite table tb_test partition(ds='${bizdate}') select case when a.id is not null then a.id esle b.id end as id ,if(a.name is not null, a.name, b.name) as name ,coalesce(a.age, b.age) as age --这3种写法一样,都是优先取delta表的字段 from ( select * from tb_test_delta where ds='${bizdate}' ) a full outer join ( select * from tb_test where ds='${bizdate-1}' ) b on a.id =b.id;
这种写法可实现新增和更新操作:
- 新增是指增量表中新出现的数据,而全量表中没有;
- 更新是指增量表和全量表中都有的数据,但优先取增量表的数据,覆盖历史表的数据。
如下图所示,R2_1是增量表当天去重后增量数据,M3是全量表前一天的数据,而J4_2_3则是full outer join的执行图。
将J4_2_3展开会发现里面将增量和全量进行了merge join,当数据量很大(1288亿条)时会产生很大的shuffle开销。此时优化方案就是将full outer join改成 union all,从而避免join shuffle。
优化模型
结论:full outer join改成hash cluster + left join +union all可以有效地降低计算成本,且有两种应用场景。先将模型进行抽象,假设有a和b两个表,a是增量表,b是全量表:
with a as ( select * from values (1,'111') ,(2,'two') ,(7,'777') as (id,name) ) --增量 ,b as ( select * from values (1,'') ,(2,'222') ,(3,'333') ,(4,'444') as (id,name) ) --全量
场景1:只合并新增数据到全量表
left anti join相当于not in,增量not in全量,过滤后只剩下完全新增的id,对全量中已有的id不修改:
--查询完全新增的id select * from a left anti join b on a.id=b.id ; --结果如下 +------------+------+ | id | name | +------------+------+ | 7 | 777 | +------------+------+
--完全新增的合并全量表 select * from a --增量表 left anti join b on a.id=b.id union all select * from b --全量表 --结果如下 +------------+------+ | id | name | +------------+------+ | 1 | | | 2 | 222 | | 3 | 333 | | 4 | 444 | | 7 | 777 | +------------+------+
场景2:合并新增数据到全量表,且更新历史数据
全量not in增量,过滤后只剩下历史的id,然后union all增量,既新增也修改
--查询历史全量数据 select * from b left anti join a on a.id=b.id; --结果如下 +------------+------+ | id | name | +------------+------+ | 3 | 333 | | 4 | 444 | +------------+------+
--合并新增数据到全量表,且更新历史数据 select * from b --全量表 left anti join a on a.id=b.id union all select * from a ; --增量表 --结果如下 +------------+------+ | id | name | +------------+------+ | 1 | 111 | | 2 | two | | 7 | 777 | | 3 | 333 | | 4 | 444 | +------------+------+
优化实践
步骤1:表属性修改
表、作业属性修改,对原来的表、作业进行属性优化,可以提升优化效果。
set odps.sql.reducer.instances=3072; --可选。默认最大1111个reducer,1111哈希桶。 alter table table_name clustered by(contact_id) sorted by(contact_id) into 3072 buckets;--必选
步骤2:按照上述模型的场景1 或者 场景2进行代码改造。
这里先给出代码改造后的资源消耗对比:
原来的full outer jion | left anti join初始化 | 原来的full outer jion | left anti join第二天以后 | |
---|---|---|---|---|
时间消耗 | 8h30min38s | 1h4min48s | 7h32min30s | 32min30s |
cpu消耗 | 29666.02 Core * Min | 65705.30 Core * Min | 31126.86 Core * Min | 30589.29 Core * Min |
mem消耗 | 109640.80 GB * Min | 133922.25 GB * Min | 114764.80 GB * Min | 65509.28 GB * Min |
可以发现hash cluster分桶操作在初始化有额外的开销,主要是按主键进行散列和排序,但是这是值得的,可一劳永逸,后续的读取速度非常快。以前每天跑需要8小时,现在除了分桶初始化需要1小时,以后每天实际只需要30分钟。
初始化执行图
图1:
- M2是读全量表。
-
M4是读取增量表,在场景2的模型中增量表被读取了两次,其中:
- R5_4是对主键去重(row_number)后用于后面的union all,里面包含了所有的增量数据;
- R1_4是对主键去重(row_number)后用于left anti join,里面只包含了主键。
- J3_1_2是left anti join,将它展开后看到这里还是有mergJoin,但是这只是初始化的操作,后面每天就不会有了。展开后如图2。
- R6_3_5是将增量和全量进行union all,展开后如图3。
- R7_6则是将索引信息写入元数据,如图3的MetaCollector1会在R7_6中sink。
因此:图1中除了R5_4和R1_4是去重必须的,有shuffle。还有J3_1_2和R6_3_5这两个地方有shuffle。
图2:
图3:
第二天以后的执行图
图1:
同上,图1中的R3_2和R1_2是对增量去重必要对操作,有shuffle,这里忽略。
初始化执行图的J3_1_2和R6_3_5已经被合并到了M4_1_3,将其展开后如图2。即left anti join 和 union all这两步操作在一个阶段完成了,且这个阶段是Map 任务(M4_1_3),而不是Join任务或Reduce任务。而且全量表不在单独占用一个Map任务,也被合并到了M4_1_3,因此整个过程下来没有shuffle操作,速度提升非常明显。也就是说只需要一个M4_1_3就能完成所有到操作,直接sink到表。
R5_4则是将索引信息写入元数据,如图2的MetaCollector1会在R5_4中sink。
图2:
原创:阿里菜鸟-数据 鹤方

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
运维大杀器来了,未来云上服务器或将实现无人值守
云原生时代下,企业的IT运维面临架构复杂化、业务需求多样化和运维数据海量化等挑战,如何能够实现精准告警、异常智能诊断、根因定位、异常预测和异常自动修复,已成为企业数字化转型的急迫需求。 9月26日,阿里巴巴高级技术专家滕圣波在《GOPS全球运维大会》上发表了题为《云上服务器无人值守与自助服务实战》的主题演讲,分享了阿里云弹性计算团队如何利用人工智能技术赋能运维自动化,实现云上服务器无人值守,帮助用户降低云服务器实例管理的复杂性,来保障实例服务的稳定和高效运行。本文根据滕圣波的演讲整理。 图:阿里巴巴高级技术专家滕圣波 本文内容架构:1、云上服务器为什么需要无人值守?2、阿里云无人值守的自服务实战3、无人值守背后的数据和AI 1、云上服务器为什么需要无人值守? 运维是一种服务,既包含基础设施软件服务、也包含人力服务,服务的对象是企业中使用基础设施的业务团队,而云计算IaaS是一种运维服务,服务的对象已发展为使用云服务的开发人员和运维团队。随着云计算的广泛落地,大部分企业已经上云,当前就有100万多家用户的业务运行在阿里云平台上,阿里云平台服务的用户也越来越多。 随着平台用户规模的扩大,我...
- 下一篇
基于 Flink + Hive 构建流批一体准实时数仓
基于 Hive 的离线数仓往往是企业大数据生产系统中不可缺少的一环。Hive 数仓有很高的成熟度和稳定性,但由于它是离线的,延时很大。在一些对延时要求比较高的场景,需要另外搭建基于 Flink 的实时数仓,将链路延时降低到秒级。但是一套离线数仓加一套实时数仓的架构会带来超过两倍的资源消耗,甚至导致重复开发。 想要搭建流式链路就必须得抛弃现有的 Hive 数仓吗?并不是,借助 Flink 可以实现已有的 Hive 离线数仓准实时化。本文整理自 Apache Flink Committer、阿里巴巴技术专家李劲松的分享,文章将分析当前离线数仓实时化的难点,详解 Flink 如何解决 Hive 流批一体准实时数仓的难题,实现更高效、合理的资源配置。文章大纲如下: 离线数仓实时化的难点 Flink 在流批一体的探索 构建流批一体准实时数仓应用实践 离线数仓实时化的难点 离线数仓 上图是一个典型的离线数仓,假设现在公司有一个需求,目前公司的数据量很大,需要每天出一个报表且输出到业务数据库中。首先是刚入库的业务数据,大致分为两种,一种是 MySQL 的 binlog,另外一种是业务系统中的业务打点...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7安装Docker,走上虚拟化容器引擎之路
- SpringBoot2全家桶,快速入门学习开发网站教程
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- 设置Eclipse缩进为4个空格,增强代码规范
- SpringBoot2整合Thymeleaf,官方推荐html解决方案