您现在的位置是:首页 > 文章详情

Spark Streaming 的saveAsTextFiles遇到的坑

日期:2019-01-09点击:598

使用sparkStreaming消费数据,并使用Dstream的 saveAsTextFile保存数据到hdfs中,通过使用这个方法,生成的文件夹存在问题,

代码例子如下:

    resultRdd.map(x=>x).saveAsTextFiles("hdfs:ip//data/storage/20181010/"+(new Date()))  //new Date()自行转化

    ssc.start()

    ssc.awaitermination()


而hsfs中目录显示为

   /data/storage/20181010/201810100708223-1547016648000

   /data/storage/20181010/201810100708223-1547016652000

   /data/storage/20181010/201810100708223-1547016658000

   .........................................


从中发现最后面多了一条横杠 -和时间戳1547016648000,是根据间隔时间自动生成的,但是我不想要他后面的-1547016648000,

并且201810100708223日期固定住了

查看saveAsTextFiles源码


def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {

  val saveFunc = (rdd: RDD[T], time: Time) => {

    val file = rddToFileName(prefix, suffix,time)

    rdd.saveAsTextFile(file)

  }

  this.foreachRDD(saveFunc)

}

saveAsTextFiles方法中也是调用了saveAsTextFile方法,其中有个添加时间戳的方法。


于是我根据源码自己使用foreachRDD,生成文件使用saveAsTextFile


resultRdd.foreachRDD{

rdd=>{}

rdd.map(x=>x).saveAsTextFile("hdfs:ip//data/storage/20181010/"+(new Date()))  //new Date()自行转化

}



ssc.start()

ssc.awaitermination()


现在hsfs中目录显示为

   /data/storage/20181010/201810100708223

   /data/storage/20181010/201810100708460


达到自己想要的结果,根据streaming 间隔时间生成文件夹,并其中包含文件。

原文链接:https://yq.aliyun.com/articles/685968
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章