Tablestore结合Spark的流批一体SQL实战
背景介绍
电子商务模式是指在网络环境和大数据环境下基于一定技术基础的商务运作方式和盈利模式,对于数据的分析和可视化是电商运营中最重要的部分之一,而电商大屏提供了数据分析和可视化的完美结合。电商大屏包含有全量订单和实时订单的聚合,全量订单的聚合提供的是全景的综合数据视图,而实时订单的聚合展示的是实时的运营指标数据。本文将通过结合Tablestore和Spark的流批一体存储和计算,来自建电商大屏完成电商数据的分析和可视化,其效果图如下。
架构设计
在本次的电商大屏实战中,客户端会实时向Tablestore插入原始订单数据,实时流计算会通过Spark Structured Streaming实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回Tablestore,最终在DataV大屏上进行展示,而离线批计算通过Spark SQL进行原始订单数据的总金额和用户维度总金额的离线聚合,聚合结果也会写回Tablestore, 并最终在DataV大屏上进行展示,整个场景的架构图如下图所示。
准备工作
- 创建阿里云E-MapReduce的Hadoop集群,文档参见创建集群。
- 下载E-MapReduce的最新SDK包,包名的格式为
emr-datasources_shaded_*.jar
,里面会包含有Tablestore相关的Spark批流Source和Sink。
数据源说明
数据源是一张简单的原始订单表OrderSource,表有两个主键UserId(用户ID)和OrderId(订单ID)和两个属性列price(价格)和timestamp(订单时间),数据示例如下图所示。
批流SQL流程详解
创建数据源表
1.登陆EMR Header机器,执行以下命令,启动sql客户端,该客户端用于批流SQL计算,其中emr-datasources_shaded_*.jar为准备工作中下载的EMR最新版的SDK包。
streaming-sql --driver-class-path emr-datasources_shaded_*.jar --jars emr-datasources_shaded_*.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2
2.创建原始订单数据表(Source表)的外表order_source,该外表将用于后续的流批SQL执行。
DROP TABLE IF EXISTS order_source; CREATE TABLE order_source USING tablestore OPTIONS( endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com", access.key.id="", access.key.secret="", instance.name="vehicle-test", table.name="OrderSource", tunnel.id="2b7bbf3d-d6c4-4cea-89fe-71998bccaf19", catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"},"price": {"cols": "price", "type": "double"}, "timestamp": {"cols": "timestamp", "type": "long"}}}' );
参数说明:
参数名 | 解释 |
---|---|
endpoint | 表格存储实例的访问地址 |
access.key.id | 阿里云账号AK ID |
access.key.secret | 阿里云账号AK Secret |
instance.name | 表格存储实例名 |
table.name | 表格存储表名 |
tunnel.id | 表格存储的增量通道ID, 该参数用于实时的增量SQL, 批量SQL时非必须。 |
catalog | 表的字段Schema定义,上述示例中对应的四个列为UserId(主键), OrderId(主键), price, timestamp,数据类型分别为string, string, double, long。 |
实时流计算
实时流计算将实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回Tablestore。首先创建流计算的Sink外表order_stream_sink(对应Tablestore表OrderStreamSink),然后运行流计算SQL进行实时聚合,最后将聚合结果实时写回Tablestore目的表中。
Sink表的各参数含义和Source表一致,其中catalog字段的内容有所不同,对应的Sink表中有四个字段,begin(开始时间,主键列,格式为2019-11-27 14:54:00),end(结束时间,主键列),count(订单数),totalPrice(订单总金额)。
// 创建Sink表order_stream_sink对应Tablestore的表OrderStreamSink(主键为begin和end两列) DROP TABLE IF EXISTS order_stream_sink; CREATE TABLE order_stream_sink USING tablestore OPTIONS( endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com", access.key.id="", access.key.secret="", instance.name="vehicle-test", table.name="OrderStreamSink", catalog='{"columns": {"begin": {"col": "begin", "type": "string"},"end": {"col": "end", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}' ); // 在order_source表上创建视图order_source_stream_view CREATE SCAN order_source_stream_view ON order_source USING STREAM OPTIONS ("maxoffsetsperchannel"="10000"); // 在视图order_source_stream_view上运行STREAM SQL作业,以下样例会按30s粒度进行订单数和订单金额的聚合, // 聚合结果将写回Tablestore表OrderStreamSink。 CREATE STREAM job1 options( checkpointLocation='/tmp/spark/cp/job1', outputMode='update' ) INSERT INTO order_stream_sink SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, CAST(sum(price) AS Double) AS totalPrice FROM order_source_stream_view GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");
在运行Stream SQL后,可以实时得到聚合结果,聚合结果样例如下图所示,聚合结果存放在OrderStreamSink表中,通过Tablestore和DataV的直连功能,可以很容易的将结果绘制在DataV的大屏上。
离线批计算
离线批计算将进行原始订单数据的总金额和用户维度总金额的离线聚合,首先会创建两张Sink表分别存放历史总金额和用户维度总金额的聚合数据,然后直接在源表order_source上运行批计算SQL,最后得到聚合结果。
// 批计算任务 // 用户维度结果表:OrderBatchSink(主键UserId, 属性列count,totalPrice) // 总数据维度结果表:OrderTotalSink(主键Count, 属性列totalPrice) DROP TABLE IF EXISTS order_batch_sink; CREATE TABLE order_batch_sink USING tablestore OPTIONS( endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com", access.key.id="", access.key.secret="", instance.name="vehicle-test", table.name="OrderBatchSink", tunnel.id="", catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}' ); DROP TABLE IF EXISTS order_totol_sink; CREATE TABLE order_total_sink USING tablestore OPTIONS( endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com", access.key.id="", access.key.secret="", instance.name="vehicle-test", table.name="OrderTotalSink", tunnel.id="", catalog='{"columns": {"count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}' );
运行以下批计算SQL进行用户维度聚合结果的更新。
// SQL命令 INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId; // 实际运行 spark-sql> INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId; Time taken: 5.107 seconds
运行以下批计算SQL进行总数据维度结果的更新。
// SQL命令 INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source; // 实际运行 spark-sql> INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source; Time taken: 4.272 seconds
写在最后
本文通过使用一套存储(Tablestore)和一套计算(Spark)完成了批流计算的有效结合,更多有关批流一体的细节和干货可以参见Tablestore结合Spark的云上流批一体大数据架构。
对Tablestore有任何问题,随时欢迎同我们进行交流,钉钉群号:11789671(1群)、23307953(2群)。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
万字干货 | 每秒7亿次请求,阿里新一代数据库如何支撑?
作者:正研,阿里云数据库技术专家 导读 Lindorm,就是云操作系统飞天中面向大数据存储处理的重要组成部分。Lindorm是基于HBase研发的、面向大数据领域的分布式NoSQL数据库,集大规模、高吞吐、快速灵活、实时混合能力于一身,面向海量数据场景提供世界领先的高性能、可跨域、多一致、多模型的混合存储处理能力。 目前,Lindorm已经全面服务于阿里经济体中的大数据结构化、半结构化存储场景。 注:Lindorm是阿里内部HBase分支的别称,在阿里云上对外售卖的版本叫做HBase增强版,之后文中出现的HBase增强版和Lindorm都指同一个产品。 2019年以来,Lindorm已经服务了包括淘宝、天猫、蚂蚁、菜鸟、妈妈、优酷、高德、大文娱等数十个BU,在今年的双十一中,Lindorm峰值请求达到了7.5亿次每秒,天吞吐22.9万亿次,
- 下一篇
每秒7亿次请求,阿里新一代数据库如何支撑?
作者 | 正研 2019年以来,Lindorm已经服务了包括淘宝、天猫、蚂蚁、菜鸟、妈妈、优酷、高德、大文娱等数十个BU,在今年的双十一中,Lindorm峰值请求达到了7.5亿次每秒,天吞吐22.9万亿次,平均响应时间低于3ms,整体存储的数据量达到了数百PB。 这些数字的背后,凝聚了HBase&Lindorm团队多年以来的汗水和心血。Lindorm脱胎于HBase,是团队多年以来承载数百PB数据,亿级请求量,上千个业务后,在面对规模成本压力,以及HBase自身缺陷下,全面重构和引擎升级的全新产品。相比HBase,Lindorm无论是性能,功能还是可用性上,都有了巨大飞跃。本文将从功能、可用性、性能成本、服务生态等维度介绍Lindorm的核心能力与业务表现,最后分享部分我们正在进行中的一些项目。 极致优化,超强性能 Lindorm
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Mario游戏-低调大师作品
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS8编译安装MySQL8.0.19
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8安装Docker,最新的服务器搭配容器使用