使用Blink CEP实现差值聚合计算
使用Blink SQL+UDAF实现差值聚合计算介绍了如何使用Blink SQL+UDAF实现实时流上的差值聚合计算,后来在与@付典就业务需求和具体实现方式进行探讨时,付典提出通过CEP实现的思路和方法。
本文介绍通过CEP实现实时流上的差值聚合计算。
感谢@付典在实现过程中的指导。笔者水平有限,若有纰漏,请批评指出。
一、客户需求
电网公司每天采集各个用户的电表数据(格式如下表),其中data_date为电表数据上报时间,cons_id为电表id,r1为电表度数,其他字段与计算逻辑无关,可忽略。为了后续演示方便,仅输入cons_id=100000002的数据。
no(string) | data_date(string) | cons_id(string) | org_no(string) | r1(double) |
---|---|---|---|---|
101 | 20190716 | 100000002 | 35401 | 13.76 |
101 | 20190717 | 100000002 | 35401 | 14.12 |
101 | 20190718 | 100000002 | 35401 | 16.59 |
101 | 20190719 | 100000002 | 35401 | 18.89 |
表1:输入数据
电网公司希望通过实时计算(Blink)对电表数据处理后,每天得到每个电表最近两天(当天和前一天)的差值数据,结果类似如下表:
cons_id(string) | data_date(string) | subDegreeR1(double) |
---|---|---|
100000002 | 20190717 | 0.36 |
100000002 | 20190718 | 2.47 |
100000002 | 20190719 | 2.3 |
表2:期望的输出数据
二、需求分析
根据业务需求以及CEP跨事件模式匹配的特性,定义两个CEP事件e1和e2,输出e2.r1-e1.r1即可得到差值。
三、CEP开发及测试结果
参考复杂事件处理(CEP)语句,CEP代码如下:
CREATE TABLE input_dh_e_mp_read_curve ( `no` VARCHAR, data_date VARCHAR, cons_id VARCHAR, org_no VARCHAR, r1 DOUBLE, ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss') ,WATERMARK wk FOR ts as withOffset(ts, 2000) ) WITH ( type = 'datahub', endPoint = 'http://dh-cn-shanghai.aliyun-inc.com', roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole', project = 'jszc_datahub', topic = 'input_dh_e_mp_read_curve' ); CREATE TABLE data_out( cons_id varchar ,data_date varchar ,subDegreeR1 DOUBLE )with( type = 'print' ); insert into data_out select cons_id, data_date, subDegreeR1 from input_dh_e_mp_read_curve MATCH_RECOGNIZE( PARTITION BY cons_id ORDER BY ts MEASURES e2.data_date as data_date, e2.r1 - e1.r1 as subDegreeR1 ONE ROW PER MATCH AFTER MATCH SKIP TO NEXT ROW PATTERN(e1 e2) DEFINE e1 as TRUE, e2 as TRUE );
由于使用了print connector,从对应的sink的taskmanager.out日志中可以查看到输出如下:
task-1> (+)100000002,20190717,0.35999999999999943 task-1> (+)100000002,20190718,2.4700000000000006
对比期望输出(表2),20190717和20190718两个窗口的数据均正确,表明业务逻辑正确,但此输出与期望输出有少许差异:
(1)20190719的数据没有输出,这是因为我们设置了watermark,测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。
四、其他说明
1、对比使用Blink SQL+UDAF实现差值聚合计算(1),我们可以看出使用CEP开发代码非常简洁,所以在跨事件处理的情况下CEP还是非常的合适。从另外一个方面讲,同样的需求有不同的实现方式,所以融会贯通Blink SQL中的各种语法,利用更合适的语法来实现业务需求,将可能大大提升工作效率和业务性能。
2、在实现本案例时,笔者发现使用CEP时有如下需要注意的地方:
(1)partiton by里的字段(如本案的cons_id),默认会带到输出里,若同时在MEASURES中定义,则可能会报类似如下错误:
(2)define及其内容必须定义,否则前端页面提示类似如下错误:
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Red Hat Enterprise Linux 8.2 发布
红帽推出了其最新的 RHEL 版本 Red Hat Enterprise Linux 8.2,专为混合云时代的互连性而构建,新功能旨在帮助组织从其他 Red Hat Enterprise Linux订阅中获得更多价值。Red Hat Enterprise Linux副总裁兼总经理 Stefanie Chiras 表示: “目前,IT 组织需要利用其已建立的软件堆栈中的现有技术做更多的事情;他们需要经常与远程或有限的 IT 团队合作,以提高运营稳定性和维护服务可用性。Red Hat Enterprise Linux 8.2 通过主动、智能的监视功能和可用于企业的容器工具提供了更多功能,使IT团队能够满足当今的关键需求,同时随时准备应对云原生的未来,支持它。 ” 同时,8.2 版提供了对主动操作和安全风险管理产品 Red Hat Insights 的更新,包括: 改进了对 IT 安全性、合规性状况和运营效率的可见性,有助于消除手动方法并提高管理大型复杂环境的效率,同时增强这些部署的安全性和合规性。 新的策略和补丁程序服务可帮助组织定义和监视重要的内部策略,并确定哪些 Red Hat 产品咨...
- 下一篇
阿里云视觉智能开放平台--人脸识别使用教程(使用本地图片)
概述 前面在博客:阿里云视觉智能开放平台--人脸识别使用教程 介绍了如何在智能视觉开放平台使用人脸识别的接口,示例主要演示了1:N人脸查找的使用流程,使用的是OSS的图片,发现很多同学对本地图片的使用疑问较多,这里以人脸属性识别API为例演示如何使用本地图片。 Step By Step 1、SDK获取,注意这里面需要使用新版SDK,实际也就是方法中带有:Advance的Request 您可以通过地址:https://mvnrepository.com/artifact/com.aliyun/SDK包名称 查看不同服务SDK的版本 如人脸识别的新版SDK:facebody20191230 https://mvnrepository.com/artifact/com.aliyun/facebody20191230 2、pom.xml <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.52&...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Docker安装Oracle12C,快速搭建Oracle学习环境
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS关闭SELinux安全模块
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装