SpringBoot构建大数据开发框架
为什么使用SpringBoot
1、web工程分层设计,表现层、业务逻辑层、持久层,按照技术职能分为这几个内聚的部分,从而促进技术人员的分工
2、需要各种XML配置,还需要搭建Tomcat或者jetty作为容器来运行,每次构建项目,都需要经历此流程
3、一个整合良好的项目框架不仅仅能实现技术、业务的分离,还应该关注并满足开发人员的“隔离”
springBoot是什么
Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架致力于实现免XML配置,提供便捷,独立的运行环境,实现“一键运行”满足快速应用开发的需求。从而使开发人员不再需要定义样板化的配置。
从根本上讲,Spring Boot就是一些库的集合,它能够被任意项目的构建系统所使用。
它的优点
使编码变得简单
spring boot采用java config的方式,对spring进行配置,并且提供了大量的注解,极大地提高了工作效率。
使配置变得简单
spring boot提供许多默认配置,当然也提供自定义配置。但是所有spring boot的项目都只有一个配置文件:application.properties/application.yml。用了spring boot,再也不用担心配置出错找不到问题所在了。
使部署变得简单
spring boot内置了三种servlet容器:tomcat,jetty,undertow。
所以,你只需要一个java的运行环境就可以跑spring boot的项目了。spring boot的项目可以打成一个jar包,然后通过java -jar xxx.jar来运行。(spring boot项目的入口是一个main方法,运行该方法即可。 )
使监控变得简单
spring boot提供了actuator包,可以使用它来对你的应用进行监控。它主要提供了以下功能:
构建
整合spark
SpringBoot构建大数据开发框架
分享之前我还是要推荐下我自己创建的大数据学习资料分享群 957205962,这是全国最大的大数据学习交流的地方,2000人聚集,不管你是小白还是大牛,小编我都挺欢迎,今天的源码已经上传到群文件,不定期分享干货,包括我自己整理的一份最新的适合2017年学习的前端资料和零基础入门教程,欢迎初学和进阶中的小伙伴。
Scala版本demo
object WordCount {
def main(args: Array[String]): Unit = {
/**
第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息
例如说通过setMaster来设置程序要连接的Spark集群的Master的URL
如果设置为local,则代表Spark程序在本地运行,特别适合于配置条件的较差的人
*/
val conf = new SparkConf()
//设置应用程序的名称,在程序运行的监控界面可以看到名称
conf.setAppName("MyFirstSparkApplication")
//此时程序在本地运行,无需安装Spark的任何集群
conf.setMaster("local")
/**
/创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息
*/
val sc = new SparkContext(conf)
/**
第三步:根据具体的数据来源(HDFS,HBase,Local FS(本地文件系统) ,DB,S3(云上)等)通过SparkContext来创建RDD
RDD的创建基本有三种方式,根据外部的数据来源(例如HDFS),根据Scala集合,由其他的RDD操作产生
数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴
*/
//文件的路径,最小并行度(根据机器数量来决定)
val lines= sc.textFile("D://hadoop//spark-2.2.0-bin-hadoop2.7//README.md", 1)
//读取本地文件,并设置Partition = 1 //类型推导得出lines为RDD
/**
第四步:对初始的RDD进行Transformation级别的处理,例如map,filter等高阶函数等的编程,来进行具体的数据计算
4.1:将每一行的字符串拆分成单个的单词
4.2:在单词拆分的基础上对每个单词的实例计数为1,也就是word =>(word,1)
4.3:在每个单词实例计数为1基础之上统计每个单词在文件出现的总次数
*/
//对每一行的字符串进行单词的拆分并把所有行的拆分结果通过flat合并成为一个大的单词集合
val words = lines.flatMap { line => line.split(" ") } //words同样是RDD类型
val pairs = words.map { word => (word,1) }
//对相同的key,进行value的累加(包括Local和Reducer级别同时Reduce)
val wordCounts = pairs.reduceByKey(+)
wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))
//注意一定要将SparkContext的对象停止,因为SparkContext运行时会创建很多的对象
sc.stop()
}
}
注意的地方
spark版本自带scala包,需要使用对应的scala sdk版本编译。
不要使用maven仓库作为下载pom文件依赖的java包,因为spark对应的scala包版本有误。
整合Elasticsearch
引入相关jar包
SpringBoot构建大数据开发框架
Scala版本demo
将数据写入es,自动创建es索引
object ESDemo {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("ESDemo").setMaster("local")
//在spark中自动创建es中的索引
conf.set("es.index.auto.creaete","true")
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
sc.makeRDD(Seq(numbers,airports)).saveToEs("spark/docs")
}
}
读取es中的数据,使用来org.elasticsearch.spark.sql读取索引中的数据
object ESDemo1 {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("ESDemo").setMaster("local")
val sc = new SparkContext(conf)
//创建sqlContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits.
val options = Map("pushdown" -> "true", "es.nodes" -> "localhost", "es.port" -> "9200")
val spark14DF = sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("spark/docs")
spark14DF.select("one","two","three").collect().foreach(println())
//将数据注册到临时表
spark14DF.registerTempTable("docs")
val results = sqlContext.sql("SELECT one FROM docs")
results.map(t => "one:"+t(0)).collect().foreach(println)
sc.stop()
}
}
注意的地方
项目中引入的版本需要与安装的easticsearch的版本一致,且需使用对应的elasticsearch-spark包。
整合Kafka
引入相关jar包
SpringBoot构建大数据开发框架
Kafka读取数据写入ES demo
def main(args: Array[String]): Unit = {
import org.apache.spark.SparkConf
val conf = new SparkConf().setMaster("local").setAppName("KafkaToESDemo")
//在spark中自动创建es中的索引
conf.set("es.index.auto.creaete","true")
val ssc = new StreamingContext(conf, Seconds(10))
var topics = Array("orders_four");//kafka topic名称
var group = "" //定义groupID
val kafkaParam = Map( //申明kafka相关配置参数
"bootstrap.servers" -> "XXX.XXX.XX.XX:XXXXX", //kafka 集群IP及端口
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> group, //定义groupID
"auto.offset.reset" -> "earliest",//设置丢数据模式 有 earliest,latest, none
"enable.auto.commit" -> (false: java.lang.Boolean)
);
val offsetRanges = Array()
var stream = KafkaUtils.createDirectStreamString, String)//从kafka读取数据 获取数据流
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //获取offset
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) //存储offset
val wordsDataFrame = rdd.map(record=>record.value().split(","))
.map(temp=>Record(temp(0).toLong,temp(1),temp(2).trim().toInt,temp(3).trim().toInt,temp(4).trim().toInt,temp(5).trim().toInt,temp(6),temp(7),temp(8),temp(9),temp(10),temp(11).toDouble,temp(12).toDouble,temp(13),temp(14).toDouble,temp(15).toDouble,temp(16),temp(17),temp(18),temp(19),temp(20),temp(21),temp(22),temp(23).toInt,temp(24),temp(25).toInt,temp(26).toDouble,temp(27).toDouble,temp(28).toDouble,temp(29).toDouble,temp(30).toDouble,temp(31),temp(32),temp(33).toDouble,temp(34).toInt,temp(35).toDouble,temp(36).toDouble,temp(37).toDouble,temp(38),temp(39),temp(40).toInt,temp(41).toDouble,temp(42).toDouble,temp(43).toDouble,temp(44).toDouble,temp(45).toDouble,temp(46).toDouble,temp(47).toDouble,temp(48)))
.saveToEs("spark/kafkademodocs",
Map("es.mapping.id" -> "id","es.writeoperation" -> "upsert","es.update.retry.on.conflict" -> "5"))
}
ssc.start();
ssc.awaitTermination();
}
注意的地方
spark-streaming-kafka包需要与spark版本一致,也需要与scala sdk版本一致;
写入Es的api方法说明
saveToEs对应DStream type needs to be a Map (either a Scala or a Java one), a JavaBean or a Scala case class.
saveJsonToEs 对应JSON字符串
saveToEsWithMeta 对应pair DStream
整合mybatis
引入相关jar包
使用mybatis-spring-boot包,使mybatis配置变的简单。使用druid数据连接池,性能强大,并提供了sql监控,便于运维管理。
SpringBoot构建大数据开发框架
application.yml 配置信息:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://tdsql-4b9a8yqj.gz.cdb.myqcloud.com:23/test?serverTimezone=Hongkong&characterEncoding=utf8&useUnicode=true&useSSL=false
username:
password:
type: com.alibaba.druid.pool.DruidDataSource
max-active: 20
initial-size: 1
min-idle: 3
max-wait: 60000
time-between-eviction-runs-millis: 60000
min-evictable-idle-time-millis: 300000
test-while-idle: true
test-on-borrow: false
test-on-return: false
poolPreparedStatements: true
maxOpenPreparedStatements: 20
validationQuery: select 'x'
注册datasource
@Configurationbr/>@EnableTransactionManagement
public class DruidDataSourceConfig implements EnvironmentAware {
private static Logger logger = LoggerFactory.getLogger(DruidDataSourceConfig.class);
private RelaxedPropertyResolver propertyResolver;
public void setEnvironment(Environment env) {
this.propertyResolver = new RelaxedPropertyResolver(env, "spring.datasource.");
}
@Bean
public DataSource dataSource() {
logger.info("开始注入druid");
DruidDataSource datasource = new DruidDataSource();
datasource.setUrl(propertyResolver.getProperty("url"));
datasource.setDriverClassName(propertyResolver.getProperty("driver-class-name"));
datasource.setUsername(propertyResolver.getProperty("username"));
datasource.setPassword(propertyResolver.getProperty("password"));
datasource.setInitialSize(Integer.valueOf(propertyResolver.getProperty("initial-size")));
datasource.setMinIdle(Integer.valueOf(propertyResolver.getProperty("min-idle")));
datasource.setMaxWait(Long.valueOf(propertyResolver.getProperty("max-wait")));
datasource.setMaxActive(Integer.valueOf(propertyResolver.getProperty("max-active")));
datasource.setMinEvictableIdleTimeMillis(Long.valueOf(propertyResolver.getProperty("min-evictable-idle-time-millis")));
datasource.setValidationQuery(propertyResolver.getProperty("validationQuery"));
try {
datasource.setFilters("stat,wall");
} catch (SQLException e) {
e.printStackTrace();
}
logger.info("成功注入druid");
return datasource;
}
@Bean
public SqlSessionFactory sqlSessionFactoryBean() throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource());
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
sqlSessionFactoryBean.setMapperLocations(resolver.getResources("classpath:/mybatis/*.xml"));
return sqlSessionFactoryBean.getObject();
}
}
简单的mapper
兼容两种模式,注解和xml方式写sql
@Mapper
public interface UserMapper {
public User findUserInfo();
public User findUserById(Integer id);
@Select("SELECT * FROM USER WHERE NAME = #{name}")
User findByName(@Param("name") String name);
@Insert("insert into user(name,age) value(#{name},#{age})")
int insert(@Param("name") String name,@Param("age") Integer age);
}
对应的xml
SpringBoot构建大数据开发框架
注意:mapper接口方法名称,为id,注解方式和xml中不能重名。
总结
独立运行的Spring项目
Spring Boot可以以jar包的形式进行独立的运行,使用:java -jar xx.jar 就可以成功的运行项目,或者在应用项目的主程序中运行main函数即可;
内嵌的Servlet容器
默认使用内置的tomcat服务器。
提供starter简化Manen配置
spring boot提供各种starter,其实就是一些spring bao的集合,只不过spring boot帮我们整合起来了而已。
上图只是其中的一部分,还有很多其他的。通过这些starter也可以看得出来,spring boot可以和其他主流的框架无缝集成,比如mybatis等。所以,不需要担心你想用的技术spring boot不支持。
自动配置Spring,无xml文件
Spring Boot会根据我们项目中类路径的jar包/类,为jar包的类进行自动配置Bean,这样一来就大大的简化了我们的配置。当然,这只是Spring考虑到的大多数的使用场景,在一些特殊情况,我们还需要自定义自动配置(就在那唯一的配置文件里,而且它不是xml文件!)。
应用监控
Spring Boot提供了基于http、ssh、telnet对运行时的项目进行监控。
后续扩展
Springboot开启应用监控
Spring Boot Admin 用于监控基于 Spring Boot 的应用。官方文档在这里(v1.3.4):《Spring Boot Admin Reference Guide》。
Springboot集成任务调度Quartz
Springboot集成Elasticsearch
Spring Boot为Elasticsearch提供基本的自动配置,Spring Data Elasticsearch提供在它之上的抽象,还有用于收集依赖的spring-boot-starter-data-elasticsearch'Starter'。
提供简单的restful操作elasticsearch.
Springboot集成Neo4J
Neo4j是一个开源的NoSQL图数据库,它使用图(graph)相关的概念来描述数据模型,把数据保存为图中的节点以及节点之间的关系。相比传统rdbms(关系管理系统)的方式,Neo4j更适合大数据关系分析。Spring Boot为使用Neo4j提供很多便利,包括spring-boot-starter-data-neo4j‘Starter’。
Springboot集成redis
Redis是一个缓存,消息中间件及具有丰富特性的键值存储系统。Spring Boot为Jedis客户端library提供基本的自动配置,Spring Data Redis提供了在它之上的抽象,spring-boot-starter-redis'Starter'收集了需要的依赖。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Java日志正确使用姿势
前言 关于日志,在大家的印象中都是比较简单的,只须引入了相关依赖包,剩下的事情就是在项目中“尽情”的打印我们需要的信息了。但是往往越简单的东西越容易让我们忽视,从而导致一些不该有的bug发生,作为一名严谨的程序员,怎么能让这种事情发生呢?所以下面我们就来了解一下关于日志的那些正确使用姿势。 正文 日志规范 命名 首先是日志文件的命名,尽量要做到见名知意,团队里面也必须使用统一的命名规范,不然“脏乱差”的日志文件会影响大家排查问题的效率。这里推荐以projectName_logName_logType.log来命名,这样通过名字就可以清晰的知道该日志文件是属于哪个项目,什么类型,有什么作用。例如在我们MessageServer项目中监控Rabbitmq 消费者相关的日志文件名可以定义成messageserver_rabbitmqconsumer_monitor.log。 保存时间 关于日志保存的时间,普通的日志文件建议保留15天,若比较重要的可根据实际情况延长,具体请参考各自服务器磁盘空间以及日志文件大小作出最优选择。 日志级别 常见的日志级别有以下: DEBUG级别:记录调试程序相关的...
- 下一篇
Apache Flink SQL概览
本篇核心目标是让大家概要了解一个完整的Apache Flink SQL Job的组成部分,以及Apache Flink SQL所提供的核心算子的语义,最后会应用Tumble Window编写一个End-to-End的页面访问的统计示例。 Apache Flink SQL Job的组成 我们做任何数据计算都离不开读取原始数据,计算逻辑和写入计算结果数据三部分,当然基于Apache Flink SQL编写的计算Job也离不开这个三部分,如下所所示: 如上所示,一个完整的Apache Flink SQL Job 由如下三部分: Source Operator - Soruce operator是对外部数据源的抽象, 目前Apache Flink内置了很多常用的数据源实现,比如上图提到的Kafka。 Query Operators - 查询算子主要
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Mario游戏-低调大师作品
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS6,CentOS7官方镜像安装Oracle11G
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2更换Tomcat为Jetty,小型站点的福音