Ignite集成Spark之IgniteDataFrames
本系列共两篇文章,主要探讨如何将Ignite和Spark进行集成。
下面简要地回顾一下在第一篇文章中所谈到的内容。
Ignite是一个分布式的内存数据库、缓存和处理平台,为事务型、分析型和流式负载而设计,在保证扩展性的前提下提供了内存级的性能。
Spark是一个流式数据和计算引擎,通常从HDFS或者其他存储中获取数据,一直以来,他都倾向于OLAP型业务,并且聚焦于MapReduce类型负载。
因此,这两种技术是可以互补的。
将Ignite与Spark整合
整合这两种技术会为Spark用户带来若干明显的好处:
- 通过避免大量的数据移动,获得真正可扩展的内存级性能;
- 提高RDD、DataFrame和SQL的性能;
- 在Spark作业之间更方便地共享状态和数据。
下图中显示了如何整合这两种技术,并且标注了显著的优势:
在第一篇文章中,主要聚焦于IgniteRDD,而本文会聚焦于IgniteDataFrames。
IgniteDataframes
Spark的DataFrame API为描述数据引入了模式的概念,Spark通过表格的形式进行模式的管理和数据的组织。
DataFrame是一个组织为命名列形式的分布式数据集,从概念上讲,DataFrame等同于关系数据库中的表,并允许Spark使用Catalyst查询优化器来生成高效的查询执行计划。而RDD只是跨集群节点分区化的元素集合。
Ignite扩展了DataFrames,简化了开发,改进了将Ignite作为Spark的内存存储时的数据访问时间,好处包括:
- 通过Ignite读写DataFrames时,可以在Spark作业之间共享数据和状态;
- 通过优化Spark的查询执行计划加快SparkSQL查询,这些主要是通过IgniteSQL引擎的高级索引以及避免了Ignite和Spark之间的网络数据移动实现的。
IgniteDataframes示例
下面通过一些代码以及搭建几个小程序的方式,了解Ignite DataFrames如何使用,如果想实际运行这些代码,可以从GitHub上下载。
一共会写两个Java的小应用,然后在IDE中运行,还会在这些Java应用中执行一些SQL。
一个Java应用会从JSON文件中读取一些数据,然后创建一个存储于Ignite的DataFrame,这个JSON文件Ignite的发行版中已经提供,另一个Java应用会从Ignite的DataFrame中读取数据然后使用SQL进行查询。
下面是写应用的代码:
public class DFWriter {
private static final String CONFIG = "config/example-ignite.xml";
public static void main(String args[]) {
Ignite ignite = Ignition.start(CONFIG);
SparkSession spark = SparkSession
.builder()
.appName("DFWriter")
.master("local")
.config("spark.executor.instances", "2")
.getOrCreate();
Logger.getRootLogger().setLevel(Level.OFF);
Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
Dataset<Row> peopleDF = spark.read().json(
resolveIgnitePath("resources/people.json").getAbsolutePath());
System.out.println("JSON file contents:");
peopleDF.show();
System.out.println("Writing DataFrame to Ignite.");
peopleDF.write()
.format(IgniteDataFrameSettings.FORMAT_IGNITE())
.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG)
.option(IgniteDataFrameSettings.OPTION_TABLE(), "people")
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id")
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated")
.save();
System.out.println("Done!");
Ignition.stop(false);
}
}
在DFWriter
中,首先创建了SparkSession
,它包含了应用名,之后会使用spark.read().json()
读取JSON文件并且输出文件内容,下一步是将数据写入Ignite存储。下面是DFReader
的代码:
public class DFReader {
private static final String CONFIG = "config/example-ignite.xml";
public static void main(String args[]) {
Ignite ignite = Ignition.start(CONFIG);
SparkSession spark = SparkSession
.builder()
.appName("DFReader")
.master("local")
.config("spark.executor.instances", "2")
.getOrCreate();
Logger.getRootLogger().setLevel(Level.OFF);
Logger.getLogger("org.apache.ignite").setLevel(Level.OFF);
System.out.println("Reading data from Ignite table.");
Dataset<Row> peopleDF = spark.read()
.format(IgniteDataFrameSettings.FORMAT_IGNITE())
.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG)
.option(IgniteDataFrameSettings.OPTION_TABLE(), "people")
.load();
peopleDF.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people WHERE id > 0 AND id < 6");
sqlDF.show();
System.out.println("Done!");
Ignition.stop(false);
}
}
在DFReader
中,初始化和配置与DFWriter
相同,这个应用会执行一些过滤,需求是查找所有的id > 0 以及 < 6的人,然后输出结果。
在IDE中,通过下面的代码可以启动一个Ignite节点:
public class ExampleNodeStartup {
public static void main(String[] args) throws IgniteException {
Ignition.start("config/example-ignite.xml");
}
}
到此,就可以对代码进行测试了。
运行应用
首先在IDE中启动一个Ignite节点,然后运行DFWriter
应用,输出如下:
JSON file contents:
+-------------------+---+------------------+
| department| id| name|
+-------------------+---+------------------+
|Executive Committee| 1| Ivan Ivanov|
|Executive Committee| 2| Petr Petrov|
| Production| 3| John Doe|
| Production| 4| Ann Smith|
| Accounting| 5| Sergey Smirnov|
| Accounting| 6|Alexandra Sergeeva|
| IT| 7| Adam West|
| Head Office| 8| Beverley Chase|
| Head Office| 9| Igor Rozhkov|
| IT| 10|Anastasia Borisova|
+-------------------+---+------------------+
Writing DataFrame to Ignite.
Done!
如果将上面的结果与JSON文件的内容进行对比,会显示两者是一致的,这也是期望的结果。
下一步会运行DFReader
,输出如下:
Reading data from Ignite table.
+-------------------+--------------+---+
| DEPARTMENT| NAME| ID|
+-------------------+--------------+---+
|Executive Committee| Ivan Ivanov| 1|
|Executive Committee| Petr Petrov| 2|
| Production| John Doe| 3|
| Production| Ann Smith| 4|
| Accounting|Sergey Smirnov| 5|
+-------------------+--------------+---+
Done!
这也是期望的输出。
总结
通过本文,会发现使用Ignite DataFrames是如何简单,这样就可以通过Ignite DataFrame进行数据的读写了。
未来,这些代码示例也会作为Ignite发行版的一部分进行发布。
关于Ignite和Spark的集成,内容就是这些了。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
雷神 Thor —— TiDB 自动化运维平台
作者:瞿锴,同程艺龙资深 DBA 背景介绍 随着互联网的飞速发展,业务量可能在短短的时间内爆发式地增长,对应的数据量可能快速地从几百 GB 涨到几百个 TB,传统的单机数据库提供的服务,在系统的可扩展性、性价比方面已经不再适用。为了应对大数据量下业务服务访问的性能问题,MySQL 数据库常用的分库、分表方案会随着 MySQL Sharding(分片)的增多,业务访问数据库逻辑会越来越复杂。而且对于某些有多维度查询需求的表,需要引入额外的存储或牺牲性能来满足查询需求,这样会使业务逻辑越来越重,不利于产品的快速迭代。 TiDB 的架构 TiDB 作为 PingCAP 旗下开源的分布式数据库产品,具有多副本强一致性的同时能够根据业务需求非常方便的进行弹性伸缩,并且扩缩容期间对上层业务无感知。TiDB 包括三大核心组件:TiDB/TiKV/PD。 TiDB Server:主要负责 SQL 的解析器和优化器,它相当于计算执行层,同时也负责客户端接入和交互。 TiKV Server:是一套分布式的 Key-Value 存储引擎,它承担整个数据库的存储层,数据的水平扩展和多副本高可用特性都是在这一层...
-
下一篇
分布式锁与实现(一)基于Redis实现
目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。 在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。 选用Redis实现分布式锁原因 Redis有很高的性能 Redis命令对此支持较好,实现起来比较方便 在此就不介绍Redis的安装了,具体在Linux和Windows中的安装可以查看我前面的博客。 http://www.cnblogs.com/liuyang0/p/6504826.html 使用命令介绍 SETNX SETNX key val 当且仅当key不存在时,set一个key为val...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- Docker容器配置,解决镜像无法拉取问题
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- Dcoker安装(在线仓库),最新的服务器搭配容器使用