使用Airflow调度MaxCompute
一、环境准备
• Python 2.7.5 PyODPS支持Python2.6以上版本
• Airflow apache-airflow-1.10.7
1.安装MaxCompute需要的包
pip install setuptools>=3.0 pip install requests>=2.4.0 pip install greenlet>=0.4.10 # 可选,安装后能加速Tunnel上传。 pip install cython>=0.19.0 # 可选,不建议Windows用户安装。 pip install pyodps
注意:如果requests包冲突,先卸载再安装对应的版本
2.执行如下命令检查安装是否成功
python -c "from odps import ODPS"
二、开发步骤
1.在Airflow家目录编写python调度脚本Airiflow_MC.py
#-*- coding: UTF-8 -*- import sys from odps import ODPS from odps import options from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime, timedelta import time #修改系统默认编码。 reload(sys) sys.setdefaultencoding('utf8') default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date':datetime(2020,1,15), 'retry_delay': timedelta(minutes=5), } dag = DAG( 'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30)) #打印时间 def get_time(): print '当前时间是{}'.format(time.time()) return time.time() #执行MaxCompute的查询任务 def mc_job (): #MaxCompute参数设置 options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True} odps = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',endpoint='**your-end-point**') # project = odps.get_project('my_project') # 取到某个项目。 project = odps.get_project() # 取到默认项目。 # 获取表。 # t = odps.get_table('tableName') # 接受传入的分区参数。 with odps.execute_sql('select * from tableName').open_reader() as reader: count = reader.count print("查询表数据条数:{}".format(count)) for record in reader: print record return count t1 = PythonOperator ( task_id = 'get_time' , provide_context = False , python_callable = get_Time , dag = dag ) t2 = PythonOperator ( task_id = 'mc_job' , provide_context = False , python_callable = mc_job , dag = dag ) t2.set_upstream(t1)
2.提交
python Airiflow_MC.py
3.进行测试
# print the list of active DAGs airflow list_dags # prints the list of tasks the "tutorial" dag_id airflow list_tasks Airiflow_MC # prints the hierarchy of tasks in the tutorial DAG airflow list_tasks Airiflow_MC --tree #测试task airflow test Airiflow_MC get_time 2010-01-16 airflow test Airiflow_MC mc_job 2010-01-16
4.运行调度任务
登录到web界面点击按钮运行
5.查看任务运行结果
1.点击view log
2.查看结果

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
程序设计的5个底层逻辑,决定你能走多快 | 11月22号云栖号夜读
点击订阅云栖夜读日刊,专业的技术干货,不容错过! 阿里专家原创好文 1.程序设计的5个底层逻辑,决定你能走多快 肉眼看计算机是由CPU、内存、显示器这些硬件设备组成,但大部分人从事的是软件开发工作。计算机底层原理就是连通硬件和软件的桥梁,理解计算机底层原理才能在程序设计这条路上越走越快,越走越轻松。从操作系统层面去理解高级编程语言的执行过程,会发现好多软件设计都是同一种套路,很多语言特性都依赖于底层机制,今天董鹏为你一一揭秘。阅读更多》》 2.TOP100直击|如何在一周内上线50个用户增长策略 一篇干货好文,值得一读!阅读更多》》 3.轻松构建基于 Serverless 架构的弹性高可用视频处理系统 随着计算机技术和 Internet 的日新月异,视频点播技术因其良好的人机交互性和流媒体传输技术倍受教育、娱乐等行业青睐,而在当前, 云计算平台厂商的产品线不断成熟完善, 如果想要搭建视频点播类应用,告别刀耕火种, 直接上云会扫清硬件采购、 技术等各种障碍。阅读更多》》 4.MySQL用户如何构建实时数仓 依托数据库生态,AnalyticDB for MySQL可以给用户提供分析场景下的...
- 下一篇
全方位事件监控管理,阿里云日志服务Kubernetes事件中心 正式上线
2020年2月21日,阿里云日志服务Kubernetes事件中心正式上线,为Kubernetes事件提供集中化采集、存储、分析、可视化、告警等能力,帮助Kubernetes使用者快速构建准实时、高可靠、全方位的事件监控管理。 据介绍,Kubernetes事件中心能够实时将系统中产生的事件采集到中心化的存储系统中,事件中心内置多种可视化报表,通过报表中的各类统计图表,可以迅速了解集群中发生的异常情况,并支持各种维度的筛选过滤。事件中心提供数十种告警项,当集群发生异常时,可通过邮件、短信、语音、钉钉机器人、WebHook等方式发出告警通知。同时事件中心的数据支持自定义的处理与分析,可以对接Flink、SparkStream、MaxCompute、Hadoop等各类流计算/离线计算引擎。 阿里云产品负责人周郎表示,Kubernetes事件中心由阿里在大规模使用Kubernetes中积累的经验提炼而成,是一款使用门槛低但功能非常强大的产品,只需控制台点击开通即可获得事件中心的各种能力。 第三方统计数据显示,当前Kubernetes在容器编排领域占据70%以上的份额,Kubernetes已经成为...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Hadoop3单机部署,实现最简伪集群
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Windows10,CentOS7,CentOS8安装Nodejs环境
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- CentOS7设置SWAP分区,小内存服务器的救世主
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Linux系统CentOS6、CentOS7手动修改IP地址
- Docker安装Oracle12C,快速搭建Oracle学习环境