hadoop集群同步实现
#!/usr/bin/env python
#coding=utf-8
#scribe日志接收存在小集群到大集群之间, distcp 同步失败的情况,需要手动进行补入。
#1、如果查询补入的日志量少,则可以之间用脚本处理。如果量大,则使用 hadoop 提交job。
# hadoop job 提交方式:
# hadoop jar /usr/local/hadoop-2.4.0/share/hadoop/tools/lib/hadoop-distcp-2.4.0.jar -m 100 hdfs://scribehadoop/scribelog/common_act/2016/08/02/13/ /file/realtime/distcpv2/scribelog/common_act/2016/08/02/13 --update
# --update 参数表示如果目标地址目录存在,则更新该目录中的内容。
#手动同步脚本使用方法: python manual_check_sync.py dst_path
#脚本完成大集群和小集群之间的目录大小比较,目录文件比较。 输出差异文件列表。最后完成同步入库。
import sys,os,commands,re
import logging,logging.handlers
Module=sys.argv[1].split("scribelog")[1]
logger1 = logging.getLogger('mylogger1')
logger1.setLevel(logging.DEBUG)
# 创建一个handler,用于写入日志文件
filehandle = logging.FileHandler('log/test.log',mode='a')
# 再创建一个handler,用于输出到控制台
consehandle = logging.StreamHandler()
# 定义handler的输出格式formatter
formatter = logging.Formatter('[%(asctime)s]:%(message)s',datefmt='%F %T')
#formatter = logging.Formatter('[%(asctime)s-line:%(lineno)d-%(levelname)s]:%(message)s',datefmt='%F %T')
filehandle.setFormatter(formatter)
consehandle.setFormatter(formatter)
logger1.addHandler(filehandle)
logger1.addHandler(consehandle)
little_cluster="hdfs://yz632.hadoop.com.cn/scribelog"
large_cluster="hdfs://ns1"
#logger1.info("源目录(小集群):%s%s" % ( little_cluster,Module))
logger1.info("源目录(小集群):%s%s" %(little_cluster,Module))
logger1.info("目标目录(大集群):%s%s" %(large_cluster,sys.argv[1]))
#统计目录大小等情况
du_little=commands.getoutput("hadoop fs -count " + little_cluster + Module)
du_large=commands.getoutput("hadoop fs -count " + large_cluster + sys.argv[1])
#获取的值是str类型,所以需要转为list来做比较。
logger1.info(" DIR_COUNT FILECOUNT CONTENTSIZE ")
logger1.info("小集群目录信息:%s" %(du_little))
logger1.info("大集群目录信息:%s" %(du_large))
#Python的str类有split方法,只能根据指定的某个字符分隔字符串,re模块中提供的split方法可以定义多个分隔符。这里可以用单个空格作为分隔即可。
du_little=re.split(' | | ' ,du_little)
du_large=re.split(' | | ' ,du_large)
#du_little=du_little.split(" ")
#du_large=du_large.split(" ")
#print du_large,du_little
#print du_large[3],du_little[3]
#print du_large[6],du_little[6]
#print du_large[7],du_little[7]
#
if du_little[3] == du_large[3] and du_little[6] == du_large[6] and du_little[7] == du_large[7]:
logger1.info("大小集群文件数量、大小一致,不需要同步")
exit()
#如果大小不一致,取出目录下所有文件
little_list=commands.getoutput("hadoop fs -lsr " + little_cluster + Module + "|grep -v \"^d\"" )
large_list =commands.getoutput("hadoop fs -lsr " + large_cluster + sys.argv[1]+"|grep -v \"^d\"")
#logger1.info( "小集群情况:%s " %(little_list))
#logger1.info( "大集群情况:%s ")%(large_list ))
list1=[]
list2=[]
lost_list=[]
for i in little_list.split("\n"):
list1.append(i.split("scribelog")[1])
for i in large_list.split("\n"):
list2.append(i.split("scribelog")[1])
#logger1.info("小集群目录文件:",list1
#logger1.info("大集群目录文件" ,list2
logger1.info("对比大小集群文件---》未同步文件列表:")
for i in list1:
if i not in list2:
logger1.info(i)
lost_list.append(i)
logger1.info(lost_list)
#拉取小集群文件到本地tmp目录中
for i in lost_list:
s=commands.getstatusoutput("hadoop fs -get " + little_cluster + i+" /tmp/")
logger1.info("拉取小集群文件到本地tmp目录中%s" %s )
#入库到大集群
for i in lost_list:
logger1.info(i)
j=commands.getstatusoutput("sudo su - datacopy -s /bin/bash -c '/usr/local/hadoop-2.4.0/bin/hadoop fs -put /tmp/" +i.split("/")[-1] + " /file/realtime/distcpv2/scribelog"+i+" '" )
logger1.info(j)
if j[0] == 0 :
logger1.info(" %s + 同步完成" %(i))
#clear tmp file
for i in lost_list:
commands.getstatusoutput("rm -f /tmp/"+i.split("/")[-1] )
#当同步目录时,可能会出现put的时候提示没有目录存在。参考赵兵的 put - src dest 方法,是否可以避免此类问题有待验证。
du_little=commands.getoutput("hadoop fs -count " + little_cluster + Module)
du_large=commands.getoutput("hadoop fs -count " + large_cluster + sys.argv[1])
logger1.info(" DIR_COUNT FILECOUNT CONTENTSIZE ")
logger1.info("小集群目录信息:%s" %(du_little))
logger1.info("大集群目录信息:%s" %(du_large))
#Python的str类有split方法,但是这个split方法只能根据指定的某个字符分隔字符串,re模块中提供的split方法可以用来做这件事情
du_little=re.split(' | | ' ,du_little)
du_large=re.split(' | | ' ,du_large)
if du_little[3] == du_large[3] and du_little[6] == du_large[6] and du_little[7] == du_large[7]:
logger1.info("小集群文件数量、大小一致,不需要同步")
exit()

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
【MaxCompute学习】隐式转化的问题
有一次计算一个数据的百分比,想把小数结果取2位,并拼接一个百分号展示在结果报表中。用到的sql如下 selectconcat(round(10230/1497409,4)*100,'%')fromdual; 很奇怪局部数据并没有保留2位小数,比如上面的数据返回的是67.99999999999999 我计算了下上面的结果大概得到的数据为0.0068 selectconcat(round(0.0066,4)*100,'%')fromdual;--0.66% selectconcat(round(0.0067,4)*100,'%')fromdual;--0.67% selectconcat(round(0.0068,4)*100,'%')fromdual;--0.6799999999999999% sele
- 下一篇
[R]高性能计算SparkR
Why SparkR Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载 。 而Spark力图整合机器学习(MLib)、图算法(GraphX)、流式计算(Spark Streaming)和数据仓库(Spark SQL)等领域,通过计算引擎Spark,弹性分布式数据集(RDD),架构出一个新的大数据应用平台。 SparkR 是一个提供轻量级前端的 R 包,在 R 的基础上加入了 Spark 的分布式计算和存储等特性。在 Spark 1.6.1 中,SparkR 提供了一个分布式数据框(DataFrame)的实现,它能够支持诸如选取、过滤和聚集等操作。这个特性与 R 语言自身提供的特性类似,但 SparkR 能够作用于更大规模的数据集。SparkR 是一个提供轻量级前端的 R 包,在 R 的基础上加入了 Spark 的分布式计算和存储等特性。汇集了spark和R本身的诸多优点,如下图。 S...
相关文章
文章评论
共有0条评论来说两句吧...