使用 Jupyter Notebook 运行 Delta Lake 入门教程
本文的例子来自 Delta Lake 官方教程。因为官方教程是基于商业软件 Databricks Community Edition 构建,虽然教程中使用的软件特性都是开源 Delta Lake 版本所具备的,但是考虑到国内的网络环境,注册和使用 Databricks Community Edition 门槛较高。所以本文尝试基于开源的 Jupiter Notebook 重新构建这个教程。
准备一个环境安装 Spark 和 jupyter
本文基于 Linux 构建开发环境,同时使用的软件比如 conda、jupyter以及 pyspark 等都可以在 Windows 和 MacOS 上找到,理论上来说也完全可以在这两个系统上完成此教程。
假设系统已经安装 anaconda 或 miniconda,我们使用 conda 来构建开发环境,可以非常方便的安装 pyspark 和 jupyter notebook
conda create --name spark conda activate spark conda install pyspark conda install -c conda-forge jupyterlab
环境变量设置
我们在设置一些环境变量之后,就可以使用 pyspark 命令来创建 jupyter notebook 服务
export SPARK_HOME=$HOME/miniconda3/envs/spark/lib/python3.7/site-packages/pyspark export PYSPARK_DRIVER_PYTHON=jupyter export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
启动服务(注意这里的参数里指定了 Delta Lake 的 package,Spark 会帮忙自动下载依赖):
pyspark --packages io.delta:delta-core_2.11:0.5.0
接下去所有代码在 notebook 里运行
下载需要 parquet 文件
%%bash rm -fr /tmp/delta_demo mkdir -p /tmp/delta_demo/loans/ wget -O /tmp/delta_demo/loans/SAISEU19-loan-risks.snappy.parquet https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet ls -al /tmp/delta_demo/loans/
Delta Lake的批流处理
在这里我们进入正题,开始介绍 Delta Lake 的批流处理能力。
首先,我们通过批处理的形式创建一张 Delta Lake 表,数据来自前面我们下载的 parquet 文件,可以和方便的把一张 parquet 表转换为 Delta Lake 表:
import os import shutil from pyspark.sql.functions import * delta_path = "/tmp/delta_demo/loans_delta" # Delete a new delta table with the parquet file if os.path.exists(delta_path): print("Deleting path " + delta_path) shutil.rmtree(delta_path) # Create a new delta table with the parquet file spark.read.format("parquet").load("/tmp/delta_demo/loans") \ .write.format("delta").save(delta_path) print("Created a Delta table at " + delta_path)
我来查一下这张表,数据量是否正确:
# Create a view on the table called loans_delta spark.read.format("delta").load(delta_path).createOrReplaceTempView("loans_delta") print("Defined view 'loans_delta'") spark.sql("select count(*) from loans_delta").show() Defined view 'loans_delta' +--------+ |count(1)| +--------+ | 14705| +--------+
接下去我们会使用Spark Streaming流式写入这张 Delta Lake 表,同时展示 Delta Lake 的 Schema enforcement 能力(本文省略了流式写 Parquet 表的演示部分,那部分指出了 parquet 文件的不足,比如无法强制指定 Schema )
import random from pyspark.sql.functions import * from pyspark.sql.types import * def random_checkpoint_dir(): return "/tmp/delta_demo/chkpt/%s" % str(random.randint(0, 10000)) # User-defined function to generate random state states = ["CA", "TX", "NY", "IA"] @udf(returnType=StringType()) def random_state(): return str(random.choice(states)) # Generate a stream of randomly generated load data and append to the delta table def generate_and_append_data_stream_fixed(table_format, table_path): stream_data = spark.readStream.format("rate").option("rowsPerSecond", 50).load() \ .withColumn("loan_id", 10000 + col("value")) \ .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) \ .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000)) \ .withColumn("addr_state", random_state()) \ .select("loan_id", "funded_amnt", "paid_amnt", "addr_state") # *********** FIXED THE SCHEMA OF THE GENERATED DATA ************* query = stream_data.writeStream \ .format(table_format) \ .option("checkpointLocation", random_checkpoint_dir()) \ .trigger(processingTime="10 seconds") \ .start(table_path) return query
启动两个流式作业:
stream_query_1 = generate_and_append_data_stream_fixed(table_format = "delta", table_path = delta_path) stream_query_2 = generate_and_append_data_stream_fixed(table_format = "delta", table_path = delta_path)
因为 Delta Lake 的乐观锁机制,多个流可以同时写入一张表,并保证数据的完整性。
通过批处理的方式来查询一下当前表中的数据量,我们发现有数据被插入了:
spark.sql("select count(*) from loans_delta").show() +--------+ |count(1)| +--------+ | 17605| +--------+
接下去我们停止所有流的写入,接下去会展示 Delta Lake 的其他特性
# Function to stop all streaming queries def stop_all_streams(): # Stop all the streams print("Stopping all streams") for s in spark.streams.active: s.stop() print("Stopped all streams") print("Deleting checkpoints") shutil.rmtree("/tmp/delta_demo/chkpt/", True) print("Deleted checkpoints") stop_all_streams()
Schema evolution(Schema演化)
Delta Lake 支持Schema演化,也就是说我们可以增加或改变表字段。接下去的批处理 SQL 会新增加一些数据,同时这些数据比之前的多了一个“closed”字段。我们将新的 DF 配置参数 mergeSchema
为 true
来显示指明 Delta Lake 表 Schema 的演化:
cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed'] items = [ (1111111, 1000, 1000.0, 'TX', True), (2222222, 2000, 0.0, 'CA', False) ] loan_updates = spark.createDataFrame(items, cols) \ .withColumn("funded_amnt", col("funded_amnt").cast("int")) loan_updates.write.format("delta") \ .mode("append") \ .option("mergeSchema", "true") \ .save(delta_path)
来看一下插入新数据之后的表内容,新增加了 closed 字段,之前的老数据行这个字段默认为 null。
spark.read.format("delta").load(delta_path).show() +-------+-----------+---------+----------+------+ |loan_id|funded_amnt|paid_amnt|addr_state|closed| +-------+-----------+---------+----------+------+ | 0| 1000| 182.22| CA| null| | 1| 1000| 361.19| WA| null| | 2| 1000| 176.26| TX| null| | 3| 1000| 1000.0| OK| null| | 4| 1000| 249.98| PA| null| | 5| 1000| 408.6| CA| null| | 6| 1000| 1000.0| MD| null| | 7| 1000| 168.81| OH| null| | 8| 1000| 193.64| TX| null| | 9| 1000| 218.83| CT| null| | 10| 1000| 322.37| NJ| null| | 11| 1000| 400.61| NY| null| | 12| 1000| 1000.0| FL| null| | 13| 1000| 165.88| NJ| null| | 14| 1000| 190.6| TX| null| | 15| 1000| 1000.0| OH| null| | 16| 1000| 213.72| MI| null| | 17| 1000| 188.89| MI| null| | 18| 1000| 237.41| CA| null| | 19| 1000| 203.85| CA| null| +-------+-----------+---------+----------+------+ only showing top 20 rows
新的数据行具有 closed 字段:
spark.read.format("delta").load(delta_path).filter(col("closed") == True).show() +-------+-----------+---------+----------+------+ |loan_id|funded_amnt|paid_amnt|addr_state|closed| +-------+-----------+---------+----------+------+ |1111111| 1000| 1000.0| TX| true| +-------+-----------+---------+----------+------+
Delta Lake 表的删除操作
除了常规的插入操作,Delta Lake 还支持 update 和 delete 等功能,可以更新表格内容。下面展示删除操作,我们希望删除表格中贷款已经被完全还清的记录。下面几条命令可以简单和清晰的展示删除过程。
首先,我们看看符合条件的记录有多少条:
spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amnt").show() +--------+ |count(1)| +--------+ | 5134| +--------+
然后,我们执行一个 delete 命令:
from delta.tables import * deltaTable = DeltaTable.forPath(spark, delta_path) deltaTable.delete("funded_amnt = paid_amnt")
最后,我们看一下删除后的结果,发现符合条件的记录都已被删除:
spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amnt").show() +--------+ |count(1)| +--------+ | 0| +--------+
版本历史和回溯
Delta Lake 还具有很强大历史版本记录和回溯功能。history()
方法清晰的展示了刚才那张表的修改记录,包括最后一次 Delete 操作。
deltaTable.history().show() +-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+ |version| timestamp|userId|userName| operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| +-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+ | 10|2020-02-22 22:14:06| null| null| DELETE|[predicate -> ["(...|null| null| null| 9| null| false| | 9|2020-02-22 22:13:57| null| null| WRITE|[mode -> Append, ...|null| null| null| 8| null| true| | 8|2020-02-22 22:13:52| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 6| null| true| | 7|2020-02-22 22:13:50| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 6| null| true| | 6|2020-02-22 22:13:42| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 4| null| true| | 5|2020-02-22 22:13:40| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 4| null| true| | 4|2020-02-22 22:13:32| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 2| null| true| | 3|2020-02-22 22:13:30| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 2| null| true| | 2|2020-02-22 22:13:22| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 1| null| true| | 1|2020-02-22 22:13:20| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 0| null| true| | 0|2020-02-22 22:13:18| null| null| WRITE|[mode -> ErrorIfE...|null| null| null| null| null| true| +-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+
如果我们希望看一下刚才删除操作前的数据表状态,可以很方便的回溯到前一个快照点,并进行再次查询(我们可以看到被删除的记录又出现了)。
previousVersion = deltaTable.history(1).select("version").collect()[0][0] - 1 spark.read.format("delta") \ .option("versionAsOf", previousVersion) \ .load(delta_path) \ .createOrReplaceTempView("loans_delta_pre_delete") \ spark.sql("SELECT COUNT(*) FROM loans_delta_pre_delete WHERE funded_amnt = paid_amnt").show() +--------+ |count(1)| +--------+ | 5134| +--------+
结论
本文通过 jupyter notebook 工具演示了 Delta Lake 的官方教程,你可以在本文末尾下载到完整的 notebook 文件。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
标签遇上监控 - 如何一键配置ECS实例的分组监控
标签遇上监控 - 如何一键配置ECS实例的分组监控 随着云账号下的资源增多,从业务管理的角度非常需要对资源进行分组划分。ECS推荐灵活使用 标签 (TAG) 对资源进行分组,可以帮助您简化对资源的分组运维。 本文将为您介绍如何对一键配置标签监控能力,将自动为每个标签创建云监控应用分组,从而简便查看和管理不同标签分组的监控数据。 背景 标签 标签可以标记资源,便于搜索和资源聚合 云监控 阿里云的一站式监控服务 云监控应用分组 应用分组为云监控的业务角度资源分组方式,提供分组级别的进行报警、监控能力 场景 1. 使用标签对ECS实例进行标记和分组 配置标签的云监控能力前,您需要根据您的业务对ECS实例进行标签分组。 如已配置过ECS实例的标签分组,可以跳过此步。 例如一个云账号下,有2台实例,我们已经给两台实例打上键为 应用,值为 Example-A 的标签。 如何编辑ECS实例的标签? 选择实例列表下部【更多】- 【实例设置】- 【编辑标签】批量操作 使用标签控制台 完成操作。 对于数量较多且跨地域、产品类型的资源,还可以使用标签编辑器快速完成。 您可以参阅 标签设计最佳实践 了解根据业...
- 下一篇
MaxCompute SQL与Hive对比分析及使用注意事项
摘要:一个使用过Hadoop的Hive框架的大数据开发工程师,往往基本掌握了阿里云的大数据计算服务MaxCompute的90%。本次分享主要通过详细对比MaxCompute和Hive各个方面的异同及开发使用的注意事项,方便用户来开发使用MaxCompute,实现从Hive秒速迁移到MaxCompute。 演讲嘉宾简介:刘建伟 以下内容根据演讲视频以及PPT整理而成。本次分享主要围绕以下四个方面:一、MaxCompute和Hive对比内容介绍二、MaxCompute介绍三、MaxCompute和Hive对比四、MaxCompute注意事项及对比总结 一、MaxCompute和Hive对比内容介绍本文主要从文件系统、调度系统、客户端、SQL、Web UI、界面化操作、权限等方面对MaxCompute和Hive进行对比。其中SQL对比中将分别
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- CentOS6,CentOS7官方镜像安装Oracle11G
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- 设置Eclipse缩进为4个空格,增强代码规范
- Mario游戏-低调大师作品
- MySQL8.0.19开启GTID主从同步CentOS8
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS8编译安装MySQL8.0.19
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池