Apache Spark™ 3.0中全新的Structured Streaming UI
作者:Genmao Yu
原文链接:https://databricks.com/blog/2020/07/29/a-look-at-the-new-structured-streaming-ui-in-apache-spark-3-0.html
编译:邵嘉阳,计算机科学与技术大三在读,Apache Spark 中文社区志愿者
在Apache Spark 2.0中,我们迎来了Structured Streaming——构建分布式流处理应用的最佳平台。统一的API(SQL,Dataset和DataFrame)以及Spark内置的大量函数为开发者实现复杂的需求提供了便利,比如流的聚合,流-流连接和窗口支持。开发者们普遍喜欢通过Spark Streaming中的DStream的方式来管理他们的流,那么类似的功能什么时候能在Structured Streaming中得到实现呢?这不,在Apache Spark 3.0中,全新的Structured Streaming可视化UI和开发者们见面了。
新的Structured Streaming UI会提供一些有用的信息和统计数据,以此来监视所有流作业,便于在开发调试过程中排除故障。同时,开发者还能够获得实时的监测数据,这能使生产流程更直观。在这个新的UI中,我们会看到两组统计数据:1)流查询作业的聚合信息;2)流查询的具体统计信息,包括输入速率(Input Rate)、处理速率(Process Rate)、输入行数(Input Rows)、批处理持续时间(Batch Duration)和操作持续时间(Operation Duration)等。
流查询作业的聚合信息
开发者提交的流SQL查询会被列在Structured Streaming一栏中,包括正在运行的流查询(active)和已完成的流查询(completed)。结果表则会显示流查询的一些基本信息,包括查询名称、状态、ID、运行ID、提交时间、查询持续时间、最后一批的ID以及一些聚合信息,如平均输入速率和平均处理速率。流查询有三种状态:运行(RUNNING)、结束(FINISHED)、失败(FAILED)。所有结束(FINISHED)和失败(FAILED)的查询都在已完成的流式查询表中列出。Error列显示有关失败查询的详细信息。
我们可以通过单击Run ID链接查看流查询的详细信息。
详细的统计信息
Statistics页面显示了包括输入速率、处理速率、延迟和详细的操作持续时间在内的一系列指标。通过图表,开发者能全面了解已提交的流查询的状态,并且轻松地调试查询处理中的异常情况。
它包含以下指标:
- Input Rate:数据到达的聚合速率(跨所有源)。
- Process Rate: Spark处理数据的聚合速率(跨所有源)。
- Batch Duration: 每一批的处理时间。
- Operation Duration: 执行各种操作所花费的时间(以毫秒为单位)。
被追踪的操作罗列如下: - addBatch:从源读取微批的输入数据、对其进行处理并将批的输出写入接收器所花费的时间。这应该会占用微批处理的大部分时间。
- getBatch:准备逻辑查询以从源读取当前微批的输入所花费的时间。
- getOffset:查询源是否有新的输入数据所花费的时间。
- walCommit:将偏移量写入元数据日志。
- queryPlanning:生成执行计划。
需要注意的是,由于数据源的类型不同,一个查询可能不会包含以上列出的所有操作。
使用UI解决流的性能故障
在这一部分中,我们会看到新的UI是怎样实时、直观地显示查询执行过程中的异常情况的。我们会在每个例子中预先假设一些条件,样例查询看起来是这样的:
import java.util.UUID val bootstrapServers = ... val topics = ... val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option("subscribe", topics) .load() .selectExpr("CAST(value AS STRING)") .as[String] val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count() val query = wordCounts.writeStream .outputMode("complete") .format("console") .option("checkpointLocation", checkpointLocation) .start()
由于处理能力不足而增加延迟
在第一种情况下,我们希望尽快处理Apache Kafka数据。在每一批中,流作业将处理Kafka中所有可用的数据。如果处理能力不足以处理批数据,那么延迟将迅速增加。最直观的现象是Input Rows和Batch Duration会呈线性上升。Process Rate提示流作业每秒最多只能处理大约8000条记录,但是当前的输入速率是每秒大约20000条记录。产生问题的原因一目了然,那么我们可以为流作业提供更多的执行资源,或者添加足够的分区来处理与生产者匹配所需的所有消费者。
稳定但高延迟
第二种情况下,延迟并没有持续增加,而是保持稳定,如下截图所示:
我们发现在相同的Input Rate下,Process Rate可以保持稳定。这意味着作业的处理能力足以处理输入数据。然而,每批的延迟仍然高达20秒。这里,高延迟的主要原因是每个批中有太多数据,那么我们可以通过增加这个作业的并行度来减少延迟。在为Spark任务添加了10个Kafka分区和10个内核之后,我们发现延迟大约为5秒——比20秒要好得多。
使用操作持续时间图进行故障排除
操作持续时间图(Operation Duration Chart)显示了执行各种操作所花费的时间(以毫秒为单位)。这对于了解每个批处理的时间分布和故障排除非常有用。让我们以Apache Spark社区中的性能改进“Spark-30915:在查找最新批处理ID时避免读取元数据日志文件“为例。
在某次查询中我们发现,当压缩后的元数据日志很大时,下一批要花费比其他批更多的时间来处理。
在进行代码审查之后,我们发现这是由对压缩日志文件的不必要读取造成的并进行了修复。新的操作持续时间图确认了我们想法:
未来的开发方向
如上所示,新的Structured Streaming UI将通过提供更有用的流查询信息帮助开发者更好地监视他们的流作业。作为早期发布版本,新的UI仍在开发中,并将在未来的发布中得到改进。有几个未来可以实现的功能,包括但不限于:
- 更多的流查询执行细节:延迟数据,水印,状态数据指标等等。
- 在Spark历史服务器中支持Structured Streaming UI。
- 对于不寻常的情况有更明显的提示:发生延迟等。
近期活动:
8月24日开始 Spark 实战训练营正式开课
免费报名链接:https://developer.aliyun.com/learning/trainingcamp/spark/2

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Elasticsearch 场景化检索及全观测运维解决方案
本课程讲师 : 沐泽 — 阿里巴巴 Elasticsearch 产品经理如果你想收看完整视频,请点击《飞天大数据产品价值解读 — 全文检索引擎 Elasticsearch》 一、关于 Elasticsearch Elasticsearch是一个开源的信息检索、分析引擎,它能够支持全文检索,结构化搜索和数据分析。Elasticsearch在业内是非常主流和热门的一个搜索引擎,在整个DB-Engine热门指数排行上面是全球热度第七的数据库,在检索方面更是全球热度第一的检索引擎。它应用的场景,如下图所示,包括各类信息查询,比如订单查询,以及地理位置查询,以及日志数据的检索和分析,数据分析和可视化。 整个Elastic Stack开源产品生态矩阵包括Beats、Logstash、Elasticsearch和Kibana这几部分,也是大家通常所知道的ELK,各个部分的能力概括如下图所示。整个这样的一套开源技术产品在开源领域,已经有很多开发者都会在自己的业务中使用。而这些都在阿里云上提供开箱即用的全托管的云服务。 阿里云Elasticsearch提供全托管Elastic Stack服务,100%兼...
- 下一篇
Elastic Stack 实现地理空间数据采集与可视化分析
随着人类在不断地探索空间,地理空间数据越来越多。 收集信息的速度以及提供位置信息的来源正在迅速增长。政府和商业卫星继续扩张。与GPS一起,它们提供了一系列不同的空间丰富的数据源,包括天气和温度模式,土地使用,土壤化学,减灾和响应,电信等。 移动设备和底层网络将人员,汽车,卡车和大量踏板车变成了位置信息的来源。计算机网络将位置信息嵌入IP地址元数据中,这可以帮助IT管理员在分布式基础架构中为用户提供支持,或者帮助执法部门以及我们的网络运营商找到并阻止坏人。所有这些数据都是令人兴奋的,它激发了每个人内部的创造力来利用它。提出新的问题,构思新的想法,并建立新的期望。这些新事物不容易解决。他们需要以不同格式存储的数据或跨非空间维度(如主题标签或网络域)的相关性存储的数据。尽管专家长期以来拥有执行复杂的地理空间分析的工具,但这些工具并不总是能够完成非传统来源的混合或处理当今数据集规模的任务。现在事情变得更加复杂。Elastic Stack 是一个高效的存储,分析及搜索软件栈。Elastic Stack 正在积极地推动这一进程。为我们提供更多的数据,更多的用途和更多的利益。 准备工作: 我将使用 ...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Docker安装Oracle12C,快速搭建Oracle学习环境
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS关闭SELinux安全模块
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Linux系统CentOS6、CentOS7手动修改IP地址