[雪峰磁针石博客]大数据Hadoop工具python教程9-Luigi工作流
管理Hadoop作业的官方工作流程调度程序是Apache Oozie。与许多其他Hadoop产品一样,Oozie是用Java编写的,是基于服务器的Web应用程序,它运行执行Hadoop MapReduce和Pig的工作流作业。 Oozie工作流是在XML文档中指定的控制依赖性指导非循环图(DAG)中排列的动作集合。虽然Oozie在Hadoop社区中有很多支持,但通过XML属性配置工作流和作业的学习曲线非常陡峭。
Luigi是Spotify创建的Python替代方案,可以构建和配置复杂的批处理作业管道。它处理依赖项解析,工作流管理,可视化等等。它还拥有庞大的社区,并支持许多Hadoop技术。在github上超过1万星。
本章介绍Luigi的安装和工作流程的详细说明。
安装
pip install luigi
工作流
在Luigi中,工作流由一系列操作组成,称为任务。 Luigi任务是非特定的,也就是说,它们可以是任何可以用Python编写的东西。任务的输入和输出数据的位置称为目标(target)。目标通常对应于磁盘上,HDFS上或数据库中的文件位置。除了任务和目标之外,Luigi还利用参数来自定义任务的执行方式。
- 任务
任务是构成Luigi工作流的操作序列。每个任务都声明其依赖于其他任务创建的目标。这样Luigi能够创建依赖链。
- 目标
目标是任务的输入和输出。最常见的目标是磁盘上的文件,HDFS中的文件或数据库中的记录。 Luigi包装了底层文件系统操作,以确保与目标的交互是原子的。这允许从故障点重放工作流,而不必重放任何已经成功完成的任务。
- 参数
参数允许通过允许值从命令行,以编程方式或从其他任务传递任务来自定义任务。例如,任务输出的名称可以通过参数传递给任务的日期来确定。
参考资料
- python测试开发项目实战-目录
- python工具书籍下载-持续更新
- python 3.7极速入门教程 - 目录
- 原文地址
- 本文涉及的python测试开发库 谢谢点赞!
- [本文相关海量书籍下载](https://github.com/china-testing/python-api-tesing/blob/master/books.md
工作流本示例
#!/usr/bin/env python
# 项目实战讨论QQ群630011153 144081101
# https://github.com/china-testing/python-api-tesing
import luigi
class InputFile(luigi.Task):
"""
A task wrapping a Target
"""
input_file = luigi.Parameter()
def output(self):
"""
Return the target for this task
"""
return luigi.LocalTarget(self.input_file)
class WordCount(luigi.Task):
"""
A task that counts the number of words in a file
"""
input_file = luigi.Parameter()
output_file = luigi.Parameter(default='/tmp/wordcount')
def requires(self):
"""
The task's dependencies:
"""
return InputFile(self.input_file)
def output(self):
"""
The task's output
"""
return luigi.LocalTarget(self.output_file)
def run(self):
"""
The task's logic
"""
count = {}
ifp = self.input().open('r')
for line in ifp:
for word in line.strip().split():
count[word] = count.get(word, 0) + 1
ofp = self.output().open('w')
for k, v in count.items():
ofp.write('{}\t{}\n'.format(k, v))
ofp.close()
if __name__ == '__main__':
luigi.run()
执行
$ python wordcount.py WordCount --local-scheduler --input-file /home/hduser_/input2.txt --output-file /tmp/wordcount2.txt
DEBUG: Checking if WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt) is complete
DEBUG: Checking if InputFile(input_file=/home/hduser_/input2.txt) is complete
INFO: Informed scheduler that task WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2 has status PENDING
INFO: Informed scheduler that task InputFile__home_hduser__in_0eced493f7 has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) running WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) done WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 1 complete ones were encountered:
- 1 InputFile(input_file=/home/hduser_/input2.txt)
* 1 ran successfully:
- 1 WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
hduser_@andrew-PC:/home/andrew/code/HadoopWithPython/python/Luigi$ cat /tmp/wordcount2.txt
jack 2
be 2
nimble 1
quick 1

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
[雪峰磁针石博客]pyspark工具机器学习(自然语言处理和推荐系统)1数据演进
在早期员工将数据输入系统,数据点非常有限,只占用少数几个字段。然后是互联网,每个人都可以轻松获取信息。现在,用户可输入并生成自己的数据。随着互联网用户数量呈指数级增长,用户创造的高数据增长率。例如:登录/注册表单允许用户填写自己的详细信息,在各种社交平台上上传照片和视频。这导致了巨大的数据生成以及快速处理数据量的且可扩展的框架的需求。 数据生成 设备都捕获数据,如汽车,建筑物,手机,手表,飞行引擎。 数据处理也从串行转向并行处理。 Spark Spark是处理海量数据集的框架,具有高速并行处理功能。它最初是加州大学伯克利分校AMPLabin 2009的研究项目,于2010年初开源。 2016年,Spark发布了针对深度学习的TensorFrames。 Spark底层使用RDD(弹性分布式数据集Resilient Distributed Dataset)的数据结构。它能够在执行过程中重新创建任何时间点。 RDD使用最后一个创建的RDD,并且总是能够在出现任何错误时进行重构。它们是不可变的,因为原始RDD还在。由于Spark基于分布式框架,因此它适用于master和worker节点设置。执...
-
下一篇
[雪峰磁针石博客]大数据Hadoop工具python教程4-mrjob
mrjob是由Yelp创建的Python MapReduce库,它封装了Hadoop流,允许MapReduce应用程序以更加Pythonic的方式编写。 mrjob用纯Python编写多步MapReduce作业。使用mrjob编写的MapReduce作业可以在本地测试,在Hadoop集群上运行,或使用Amazon Elastic MapReduce(EMR)在云中运行。 使用mrjob编写MapReduce应用程序有许多好处: mrjob目前是非常活跃的框架,每周都有多次提交。 mrjob拥有丰富的文档。 可以在不安装Hadoop的情况下执行和测试mrjob应用程序,在部署到Hadoop集群之前就可开发和测试。 mrjob允许MapReduce应用程序在单个类中编写,而不是为mapper和reducer编写单独的程序。 虽然mrjob是很好的解决方案,但它确实有它的缺点。 mrjob是简化的,因此它不会提供与其他API提供的Hadoop相同级别的访问权限。 mrjob不使用typedbytes,因此其他库可能更快。 安装 $ pip install mrjob 参考资料 python测...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- 面试大杂烩
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7设置SWAP分区,小内存服务器的救世主
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长