Flink单机版安装与wordCount

Flink为大数据处理工具,类似hadoop,spark.但它能够在大规模分布式系统中快速处理,与spark相似也是基于内存运算,并以低延迟性和高容错性主城,其核心特性是实时的处理流数据。从此大数据生态圈又再填一员。。。具体详解,还要等之后再分享,这里就先简要带过~

 

 

Flink的机制:

当Flink启动时,会拉起一个jobmanager和一个或多个taskManager,jobmanager作用就好比spark中的driver,taskManager的作用就好比spark中的worker.

 

flink源码:http://www.apache.org/dyn/closer.lua/flink/flink-0.10.1/flink-0.10.1-src.tgz

下载与hadoop2.6兼容版本:http://apache.dataguru.cn/flink/flink-0.10.1/flink-0.10.1-bin-hadoop26-scala_2.10.tgz

下载完毕后确定确定配置了jdk

java -version

执行 bin/start-local.sh 启动local模式 (conf下默认配置的是localhost 其他参数暂且不必配置)

 bin/start-local.sh
tail log/flink-*-jobmanager-*.log

 

 

随后可以导入idea 进行wordcount测试 ,这里用官网的example包,记得导入

package test

import org.apache.flink.api.scala._
import org.apache.flink.examples.java.wordcount.util.WordCountData

/**
 * Created by root on 12/15/15.
 */
object WordCount {
  def main(args: Array[String]) {
    if (!parseParameters(args)) {
      return
    }

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = getTextDataSet(env)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    if (fileOutput) {
      counts.writeAsCsv(outputPath, "\n", " ")
      env.execute("Scala WordCount Example")
    } else {
      counts.print()
    }

  }

  private def parseParameters(args: Array[String]): Boolean = {
    if (args.length > 0) {
      fileOutput = true
      if (args.length == 2) {
        textPath = args(0)
        outputPath = args(1)
        true
      } else {
        System.err.println("Usage: WordCount <text path> <result path>")
        false
      }
    } else {
      System.out.println("Executing WordCount example with built-in default data.")
      System.out.println("  Provide parameters to read input data from a file.")
      System.out.println("  Usage: WordCount <text path> <result path>")
      true
    }
  }

  private def getTextDataSet(env: ExecutionEnvironment): DataSet[String] = {
    if (fileOutput) {
      env.readTextFile(textPath)
    }
    else {
      env.fromCollection(WordCountData.WORDS)
    }

运行一下子:

优秀的个人博客,低调大师

微信关注我们

原文链接:https://yq.aliyun.com/articles/609094

转载内容版权归作者及来源网站所有!

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

相关文章

发表评论

资源下载

更多资源
Mario,低调大师唯一一个Java游戏作品

Mario,低调大师唯一一个Java游戏作品

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS

Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上流行的关系数据库管理系统,系统可移植性好、使用方便、功能强,适用于各类大、中、小、微机环境。它是一种高效率、可靠性好的、适应高吞吐量的数据库方案。

Eclipse(集成开发环境)

Eclipse(集成开发环境)

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。

Java Development Kit(Java开发工具)

Java Development Kit(Java开发工具)

JDK是 Java 语言的软件开发工具包,主要用于移动设备、嵌入式设备上的java应用程序。JDK是整个java开发的核心,它包含了JAVA的运行环境(JVM+Java系统类库)和JAVA工具。