FlinkSQL 之 temporary join 开发
关于实时flinkSQL的双流join的背景知识可以先阅读以下文章:
https://www.51cto.com/article/713922.html
目前我们有一条流量日志明细的TT流A,以及一条商品标签的TT流B,在flink中对A流和B流进行双流join类似于将A流关联一个hbase维表。temporary join有以下特点:
1. 单流驱动:虽然是双流join,但数据下发只由一条流驱动。
2. 需要定义versioned table,versioned table记录了每个时刻的属性信息,双流join时被动查询。类似于银行汇率表,在货币兑换的时候需要参考兑换时刻的汇率。
3. 查询携带时间版本信息:temporary join携带由两条流的watermark触发,因此查询到的属性是对应时间内的属性。
![]()
应用场景&实例分享
当需要根据实时汇率*货币金额计算总金额,实时商品价格*成交件数计算总成交金额时,经常会使用temporary join获取实时的汇率和价格信息。在笔者的流量升级业务迭代中,我们需要获取实时的商品标签,因此需要定义商品标签的versioned table,写法如下:
CREATE TEMPORARY TABLE `tag_ri` (
`id` VARCHAR,
`tag` VARCHAR,
`time` VARCHAR,
`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),
WATERMARK FOR `ts` AS `withOffset`(`ts`, 0) --定义watermark
) WITH (
'connector' = 'tt',
'router' = '******',
'topic' = 'tag_ri',
'lineDelimiter' = '\n',
'fieldDelimiter' = '\u0001',
'encoding' = 'utf-8'
);
--定义version table
CREATE TEMPORARY VIEW `tag`
AS
SELECT `id`
, `tag`
, `time`
, `ts`
FROM ( SELECT `id`
, `tag`
, `time`
, `ts`
, ROW_NUMBER() OVER (PARTITION BY `id` --关联主键
ORDER BY `time` DESC) AS `rownum`
FROM `tag_ri`
)
WHERE `rownum` = 1;
同上我们也需要定义流量日志明细流的watermark,并进行双流join
CREATE TEMPORARY TABLE `log_ri` (
`id` VARCHAR,
`time` VARCHAR,
......
`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),
WATERMARK FOR `ts` AS `withOffset`(`ts`, 0)
) WITH (
'connector' = 'tt',
'router' = '******',
'topic' = 'log_ri',
'lineDelimiter' = '\n',
'fieldDelimiter' = '\u0001',
'encoding' = 'utf-8',
);
select `a`.`id`
,......
,`b`.`tag`
from (
SELECT *
FROM `log_ri`
) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`
结果如下:
--商品标签信息
12:00> SELECT * FROM tag_ri;
id tag(商品标签)
======= =======================
t1 A
12:30> SELECT * FROM tag_ri;
id tag(商品标签)
======= =======================
t1 B
--流量明细日志查询 t1商品共三条明细
SELECT * FROM log_ri;
id time
======= ========
t1 12:00
t1 12:15
t1 12:30
--执行temporary join
select `a`.`id`
,`a`.`time`
,`b`.`tag`
from (
SELECT *
FROM `log_ri`
) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`
id time tag(商品标签)
======= ======== =======================
t1 12:00 A
t1 12:15 A
t1 12:30 B
![]()
开发经验
▐ 稀疏数据处理
由于temporary join是由两条流的watermark触发,如果versioned table是一条稀疏的流(在一段时间内无数据流入),那么join可能存在等待不下发数据的现象,可以通过设置参数 set table.exec.source.idle-timeout = 10s ,可以让A流数据不进行等待,具体参数介绍可以参考:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout
▐ 数据延迟下发
-
问题
在实际开发中,我们发现temporay join后数据一直等待不下发,整点才会进行下发的现象。
-
原因分析
我们结合SQL语法,对TT日志进行回流分析:代码逻辑是四路source union后, join 定义的versioned table
select a.*
,b.tag
from
(
select * from source_1
union all
select * from source_2
union all
select * from source_3
union all
select * from source_4
) a
temporay join
b流
-
解法
总结
-
https://www.51cto.com/article/713922.html
-
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout
团队介绍
我们是淘天集团-业务技术-商家数据团队,专注于开发和维护生意参谋这一全渠道、全链路、一站式的数据平台,同时也负责品牌数据银行和策略中心两大产品。旨在为商家提供全面的数据服务,包括但不限于经营分析、市场洞察、客群洞察等,以帮助商家提高商业决策效率。
本文分享自微信公众号 - 大淘宝技术(AlibabaMTT)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
云原生周刊:K8s 未来三大发展方向
开源项目推荐 Beszel 轻量级高颜值的 Docker 监控平台。这是一个轻量级的服务器监控平台,包括 Docker 统计、历史数据和警报功能。它拥有友好的 Web 界面,配置简单、开箱即用,支持自动备份、多用户、OAuth 认证和 API 访问等功能。 Karate 开源的 API 自动测试框架。这是一款基于 Java 的 API 测试框架,可与 Spring Boot、Maven 等 Java 生态系统无缝集成。它整合了 API 测试自动化、模拟、性能测试和 UI 自动化等功能,支持使用类似 Cucumber 的语法编写测试用例,并提供了一个跨平台的可执行文件,即使对 Java 不熟悉也能轻松上手。 Kubernetes Goat Kubernetes 安全攻防演练平台。该项目是用于构建漏洞百出、易受攻击的集群环境,让开发者可以在真实场景中学习 K8s 攻击和防御技巧。 Envd 高效的 AI 开发环境搭建工具。这是一个为 AI/ML 项目提供可复现开发环境的命令行工具。只需简单的配置语言和命令,即可快速创建基于容器的开发环境,支持远程构建、依赖缓存和导入远程仓库等功能。 文章推...
- 下一篇
一次性奖励 100 万元!武汉发布全国首个开源体系建设方案
10月25日,武汉印发《关于促进武汉市开源体系建设的实施方案》(以下简称《实施方案》)。据了解,这也是全国城市中首个公开发布的体系化开源建设方案。 开源,早期是指将软件的源代码、相关文档等开放共享,使得任何人可按照特定规则对源代码进行修改、完善和使用。源代码对于程序、软件、游戏等,好比是盖房子的施工图、做菜的菜谱。“菜谱”一经公布供自由使用、学习、修改、分享,创造出更多新风味、新菜品,就是“开源”。 开源模式已成为全球软件技术和产业创新的主导模式,覆盖软件开发的全域场景,全球97%的软件开发者和99%的企业使用开源软件。 为加速融入基于全球开发者众研众用众创的开源生态,我国“十四五”规划首次将“开源生态”作为国家战略重点。此次武汉发布的《实施方案》,也是我国系统布局“十四五”开源生态发展后首个区域方案。 据《实施方案》,武汉将实施开源技术筑基工程、开源项目众创工程、开源企业培育工程、开源平台引源工程、开源数据开放工程、开源人才聚智工程。武汉提出目标,力争到2027年,突破10项以上开源软件技术,孵化300个以上开源项目,聚集200家以上开源创新企业,建设10个以上高水平开源社区和开源公...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8安装Docker,最新的服务器搭配容器使用
- Linux系统CentOS6、CentOS7手动修改IP地址
- 2048小游戏-低调大师作品
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS8编译安装MySQL8.0.19
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2整合Thymeleaf,官方推荐html解决方案