首页 文章 精选 留言 我的

精选列表

搜索[整合],共10003篇文章
优秀的个人博客,低调大师

关于网络爬虫的资料整合

关于通用爬虫的介绍 前言:我们生活在一个充满数据的时代。每天,来自商业、社会以及我们的日常生活所产生「图像、音频、视频、文本、定位信息」等各种各样的海量数据,注入到我们的万维网(WWW)、计算机和各种数据存储设备,其中万维网则是最大的信息载体。数据的爆炸式增长、规模庞大和广泛可用的数据,使得我们真正进入到了“大数据(Big Data)时代”。我们急需功能强大的数据处理技术(Data Technology),从这些海量数据中发现有价值的信息。网络爬虫(Web Crawler)技术,则成为了当下万维网数据(Web Data)收集中,最为高效灵活的解决方案。什么是网络爬虫?网络爬虫 获取数据方式主要有哪些? 1:企业产生的数据:百度搜索指数、腾讯公司业绩数据、阿里巴巴集团财务及运营数据、新浪微博微指数等.... 大型互联网公司拥有海量用户,有天然的数据积累优势,还有一些有数据意识的中小型企业,也开始积累自己的数据。2:数据平台购买数据:数据堂、国云数据市场、贵阳大数据交易所 等... 在各个数据交易平台上购买各行各业各种类型的数据,根据数据信息、获取难易程度的不同,价格也会有所不同。3:政府/机构公开的数据:中华人民共和国国家统计局数据、中国人民银行调查统计、世界银行公开数据、联合国数据、纳斯达克、新浪财经美股实时行情 等... 通常都是各地征服统计上报,或者是行业内专业的网站、机构等提供。4:数据管理咨询公司:麦肯锡、埃森哲、尼尔森、中国互联网络信息中心、艾瑞咨询 等... 通常这样的公司有很庞大的数据团队,一般通过市场调研、问卷调查、固定的样本检测、与各行各业的其他公司合作、专家对话来获取数据,并根据客户需求制定商业解决方案。5:爬虫爬取网络数据: 如果数据市场上没有需要的数据,或者价格太高不愿意购买,那么可以利用爬虫技术,抓取网站上的数据。 通用爬虫:通用网络爬虫 就是 捜索引擎抓取系统,目的是将互联网上的所有的网页下载到本地,形成一个互联网内容的镜像备份。 它决定着整个搜索引擎内容的丰富性和时效性,因此它的性能优劣直接影响着搜索引擎的效果。通用搜索引擎(Search Engine)工作原理第一步:抓取网页搜索引擎网络爬虫的基本工作流程如下: 首先选取一部分的初始URL,将这些URL放入 待抓取URL队列 ; 取出待抓取URL,解析DNS得到主机的IP,并将URL对应的网页下载下来,存储进已下载网页库中,并且将这些URL放进 已抓取URL队列 。 分析网页中包含的其他URL,并且将URL放入 待抓取URL队列,从而进入下一个循环.... PS : 搜索引擎如何获取一个新网站的URL: 新网站向搜索引擎主动提交网址:(如百度http://zhanzhang.baidu.com/linksubmit/url) 在其他网站上设置新网站外链(尽可能处于搜索引擎爬虫爬取范围) 搜索引擎和DNS解析服务商(如DNSPod等)合作,新网站域名将被迅速抓取。 网站的Robots协议 搜索引擎蜘蛛跟进URL抓取时,它需要遵从一些命令或文件的内容,如标注为nofollow的链接,或者是网站的Robots协议。 Robots协议(也叫爬虫协议、机器人协议等),全称是“网络爬虫排除标准”(Robots Exclusion Protocol),网站通过Robots协议告诉搜索引擎哪些页面可以抓取,哪些页面不能抓取,例如: 淘宝网:https://www.taobao.com/robots.txt 腾讯网: http://www.qq.com/robots.txt 第二步:数据存储 搜索引擎爬虫爬取到的网页,将存入原始页面数据库,页面和用户浏览器得到的HTML是完全一样的。 搜索引擎爬虫在抓取页面时,也做一定的重复内容检测,一旦遇到访问权重很低的网站上有大量抄袭、采集或者复制的内容,很可能就不再爬行。第三步:预处理 搜索引擎将爬虫抓取回来的页面,进行各种步骤的预处理。 提取文字 中文分词 消除噪音(比如版权声明文字、导航条、广告等……) 索引处理 特殊文件处理除了HTML文件外,搜索引擎通常还能抓取和索引以文字为基础的多种文件类型,如 PDF、Word、WPS、XLS、PPT、TXT 文件等。我们在搜索结果中也经常会看到这些文件类型。 但搜索引擎还不能处理图片、视频、Flash 这类非文字内容,也不能执行脚本和程序。 第四步:提供检索服务,网站排名 搜索引擎在对信息进行组织和处理后,为用户提供关键字检索服务,将用户检索相关的信息展示给用户。 同时会根据页面的PageRank值(链接的访问量排名)来进行网站排名,这样Rank值高的网站在搜索结果中会排名较前,当然也可以直接使用 Money 购买搜索引擎网站排名,简单粗暴。 完整流程图:但是,这些通用性搜索引擎也存在着一定的局限性: 通用搜索引擎所返回的结果都是网页,而大多情况下,网页里90%的内容对用户来说都是无用的。 不同领域、不同背景的用户往往具有不同的检索目的和需求,搜索引擎无法提供针对具体某个用户的搜索结果。 万维网数据形式的丰富和网络技术的不断发展,图片、数据库、音频、视频多媒体等不同数据大量出现,通用搜索引擎对这些文件无能为力,不能很好地发现和获取。 通用搜索引擎大多提供基于关键字的检索,难以支持根据语义信息提出的查询,无法准确理解用户的具体需求。 本文最终解释权归本文作者所有,未经允许不得私自转载

优秀的个人博客,低调大师

springboot整合druid连接池

依赖 //mysql 驱动 compile group: 'mysql', name: 'mysql-connector-java', version: '5.1.21' //druid compile group: 'com.alibaba', name: 'druid', version: '1.1.10' 1. 新建druid配置信息类DruidConfiguration.java package com.futao.springmvcdemo.foundation.configuration; import com.alibaba.druid.pool.DruidDataSource; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import javax.sql.DataSource; import java.sql.SQLException; /** * @author futao * Created on 2018/10/11. * ConfigurationProperties(prefix = "spring.datasource")使用规则,注入的字段如果为private,则必须具有setter方法 */ @Component @ConfigurationProperties(prefix = "spring.datasource") public class DruidConfiguration { /** * 数据库地址 */ private String url; /** * 用户名 */ private String userName; /** * 密码 */ private String password; /** * 初始化连接数量 */ private int initialSize; /** * 最小闲置连接 */ private int minIdle; /** * 最大存活连接 */ private int maxActive; /** * 配置获取连接等待超时的时间 */ private long maxWait; /** * 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 */ private long timeBetweenEvictionRunsMillis; /** * 配置一个连接在池中最小生存的时间,单位是毫秒 */ private long minEvictableIdleTimeMillis; /** * 配置一个连接在池中最大生存的时间,单位是毫秒 */ private long maxEvictableIdleTimeMillis; /** * */ private boolean testWhileIdle; /** * */ private boolean testOnBorrow; /** * */ private boolean testOnReturn; /** * */ private boolean poolPreparedStatements; /** * */ private int maxOpenPreparedStatements; /** * */ private boolean asyncInit; @Bean public DataSource druidDataSource() throws SQLException { DruidDataSource druidDataSource = new DruidDataSource(); druidDataSource.setUrl(url); druidDataSource.setUsername(userName); druidDataSource.setPassword(password); druidDataSource.setMaxActive(maxActive); druidDataSource.setInitialSize(initialSize); druidDataSource.setMaxWait(maxWait); druidDataSource.setMinIdle(minIdle); druidDataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); druidDataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); druidDataSource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis); druidDataSource.setTestWhileIdle(testWhileIdle); druidDataSource.setTestOnBorrow(testOnBorrow); druidDataSource.setTestOnReturn(testOnReturn); druidDataSource.setPoolPreparedStatements(poolPreparedStatements); druidDataSource.setMaxOpenPreparedStatements(maxOpenPreparedStatements); druidDataSource.setAsyncInit(asyncInit); return druidDataSource; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public int getInitialSize() { return initialSize; } public void setInitialSize(int initialSize) { this.initialSize = initialSize; } public int getMinIdle() { return minIdle; } public void setMinIdle(int minIdle) { this.minIdle = minIdle; } public int getMaxActive() { return maxActive; } public void setMaxActive(int maxActive) { this.maxActive = maxActive; } public long getMaxWait() { return maxWait; } public void setMaxWait(long maxWait) { this.maxWait = maxWait; } public long getTimeBetweenEvictionRunsMillis() { return timeBetweenEvictionRunsMillis; } public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) { this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis; } public long getMinEvictableIdleTimeMillis() { return minEvictableIdleTimeMillis; } public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) { this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis; } public long getMaxEvictableIdleTimeMillis() { return maxEvictableIdleTimeMillis; } public void setMaxEvictableIdleTimeMillis(long maxEvictableIdleTimeMillis) { this.maxEvictableIdleTimeMillis = maxEvictableIdleTimeMillis; } public boolean isTestWhileIdle() { return testWhileIdle; } public void setTestWhileIdle(boolean testWhileIdle) { this.testWhileIdle = testWhileIdle; } public boolean isTestOnBorrow() { return testOnBorrow; } public void setTestOnBorrow(boolean testOnBorrow) { this.testOnBorrow = testOnBorrow; } public boolean isTestOnReturn() { return testOnReturn; } public void setTestOnReturn(boolean testOnReturn) { this.testOnReturn = testOnReturn; } public boolean isPoolPreparedStatements() { return poolPreparedStatements; } public void setPoolPreparedStatements(boolean poolPreparedStatements) { this.poolPreparedStatements = poolPreparedStatements; } public int getMaxOpenPreparedStatements() { return maxOpenPreparedStatements; } public void setMaxOpenPreparedStatements(int maxOpenPreparedStatements) { this.maxOpenPreparedStatements = maxOpenPreparedStatements; } public boolean isAsyncInit() { return asyncInit; } public void setAsyncInit(boolean asyncInit) { this.asyncInit = asyncInit; } } 2. 在application.yml中配置配置信息 spring: # 数据源 datasource: url: jdbc:mysql://localhost:3306/springboot?useUnicode=true&characterEncoding=utf8 username: root password: 123456 driver-class-name: com.mysql.jdbc.Driver type: com.alibaba.druid.pool.DruidDataSource platform: mysql initialSize: 5 minIdle: 10 maxActive: 20 maxWait: 60000 timeBetweenEvictionRunsMillis: 2000 minEvictableIdleTimeMillis: 600000 maxEvictableIdleTimeMillis: 900000 testWhileIdle: true testOnBorrow: false testOnReturn: false poolPreparedStatements: true maxOpenPreparedStatements: 20 asyncInit: true filters: stat,wall,log4j logSlowSql: true 3. 添加过滤器DruidFilter.java druid除了数据连接池,还集成了对站点的URL进行统计的功能,利用filter忽略druid对某些资源的统计 package com.futao.springmvcdemo.controller; import com.alibaba.druid.support.http.WebStatFilter; import javax.servlet.annotation.WebFilter; import javax.servlet.annotation.WebInitParam; /** * @author futao * Created on 2018/10/11. */ @WebFilter(filterName = "DruidFilter", urlPatterns = "/*", initParams = { @WebInitParam(name = "exclusions", value = "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*")// 忽略资源 }) public class DruidFilter extends WebStatFilter { } 4. 定义DruidServlet.java显示druid管理页面 package com.futao.springmvcdemo.controller; import com.alibaba.druid.support.http.StatViewServlet; import javax.servlet.annotation.WebInitParam; import javax.servlet.annotation.WebServlet; /** * @author futao * Created on 2018/10/11. */ @WebServlet(urlPatterns = "/druid/*", initParams = { @WebInitParam(name = "allow", value = "127.0.0.1"),// IP白名单 (没有配置或者为空,则允许所有访问) @WebInitParam(name = "deny", value = "192.168.16.111"),// IP黑名单 (存在共同时,deny优先于allow) @WebInitParam(name = "loginUsername", value = "admin"),// 用户名 @WebInitParam(name = "loginPassword", value = "admin"),// 密码 @WebInitParam(name = "resetEnable", value = "false")// 禁用HTML页面上的“Reset All”功能 }) public class DruidServlet extends StatViewServlet { private static final long serialVersionUID = -6085007333934055609L; } 5. 对于基于注解的Filter和Servlet需要在SpringBoot的启动类上打上自动扫描注解@@ServletComponentScan package com.futao.springmvcdemo; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletComponentScan; /** * @author futao * ServletComponentScan 开启servlet和filter */ @SpringBootApplication @ServletComponentScan @MapperScan("com.futao.springmvcdemo.dao") //@EnableAspectJAutoProxy public class SpringmvcdemoApplication { public static void main(String[] args) { SpringApplication.run(SpringmvcdemoApplication.class, args); } } 截一波运行图

优秀的个人博客,低调大师

关于SpringMVC整合freemarker报错问题

错误信息: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'FreeMarkerConfigurer' defined in class path resource [spring-mvc.xml]: Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: freemarker.template.Configuration.<init>(Lfreemarker/template/Version;)V 报错原因:freemarker版本问题 通常情况下将其换为2.3.23 如下所示: <dependency> <groupId>org.freemarker</groupId> <artifactId>freemarker</artifactId> <version>2.3.23</version> </dependency> 即可解决该问题

优秀的个人博客,低调大师

idea整合 spring boot jsp mybatis

spring boot 开发起来确实要简单许多 ,spring boot 包含了 spring mvc ;内置tomcat ;启动只需要主方法即可 1.使用idea新建一个spring boot项目 file----new 一个project 选择 Spring Initializr 然后next 最下面是pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>demo</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- tomcat 的支持.--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> <!-- MySql驱动 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.21</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.9</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>

优秀的个人博客,低调大师

SpringBoot 整合(五)Swagger2

日常我们开发完后端接口,如果是返回restful,写API文档是免不了的,Swagger可以帮我们解决大多数问题(自动生成API文档)。 他会帮我们生成一个html页面,大概就是这个样子。 好了,开始正文,如果你觉得有需要的话,往下看。 1. 添加依赖 <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.6.1</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.6.1</version> </dependency> 2. 修改启动项 添加注解 @EnableSwagger2 //开启swagger文档生成 3. 给Controller或者字段添加注释 3.1 给Controller方法添加注释。 @ApiOperation(value = "条件查询用户") @GetMapping("/user") @JsonView(User.UserSimpleView.class) public List query(UserQueryCondition condition, @PageableDefault(page = 2,size = 7,sort = "username,asc")Pageable pageable){ System.out.println(ReflectionToStringBuilder.toString(condition, ToStringStyle.DEFAULT_STYLE)); List<User> users = new ArrayList<>(); users.add(new User()); users.add(new User()); users.add(new User()); return users; } 然后访问http://127.0.0.1:8080/swagger-ui.html 3.2 给方法中的字段添加注释 方法一: @RequestMapping("/user/{id:\\d+}") @ApiImplicitParam(name = "id",value = "用户id") public User getInfo( @PathVariable String id){ User user = new User(); user.setUsername("FantJ"); return user; } 方法二: @RequestMapping("/user/{id:\\d+}") public User getInfo(@ApiParam("用户id") @PathVariable String id){ User user = new User(); user.setUsername("FantJ"); return user; } 方法一是再方法上面加注解,方法二是再参数位加注解。 3.3 给实体类的属性添加注释 @ApiModelProperty("用户名") private String username; 最后所有注解的总结 @Api:修饰整个类,描述Controller的作用 @ApiOperation:描述一个类的一个方法,或者说一个接口 @ApiParam:单个参数描述 @ApiModel:用对象来接收参数 @ApiProperty:用对象接收参数时,描述对象的一个字段 @ApiResponse:HTTP响应其中1个描述 @ApiResponses:HTTP响应整体描述 @ApiIgnore:使用该注解忽略这个API @ApiError :发生错误返回的信息 @ApiImplicitParam:一个请求参数 @ApiImplicitParams:多个请求参数 生产中遇到的问题集锦 1. url是127.0.0.1,但是服务在云主机上。 那如何来配置这个url呢?我们添加一个配置类 package com.tyut.web.config; import io.swagger.annotations.Contact; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.builders.RequestHandlerSelectors; import springfox.documentation.service.ApiInfo; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; /** * Created by Fant.J. * 2018/4/30 17:20 */ @Configuration @EnableSwagger2 public class SwaggerConfig { public static final String SWAGGER_SCAN_BASE_PACKAGE = "com.tyut.web.controller"; public static final String VERSION = "1.0.0"; ApiInfo apiInfo() { return new ApiInfoBuilder() .title("Swagger API") .description("This is to show api description") .license("Apache 2.0") .licenseUrl("http://www.apache.org/licenses/LICENSE-2.0.html") .termsOfServiceUrl("") .version(VERSION) // .contact(new Contact("","", "844072586@qq.com")) 联系方式 .build(); } @Bean public Docket customImplementation(){ return new Docket(DocumentationType.SWAGGER_2) .select() .apis(RequestHandlerSelectors.basePackage(SWAGGER_SCAN_BASE_PACKAGE)) .build() .host("47.xxx.xxx.96") .directModelSubstitute(org.joda.time.LocalDate.class, java.sql.Date.class) .directModelSubstitute(org.joda.time.DateTime.class, java.util.Date.class) .apiInfo(apiInfo()); } } 2. 修改controller描述 在controller上加注解@Api(description = "公告API") 介绍下我的所有文集: 流行框架 SpringCloudspringbootnginxredis 底层实现原理: Java NIO教程Java reflection 反射详解Java并发学习笔录Java Servlet教程jdbc组件详解Java NIO教程Java语言/版本 研究

优秀的个人博客,低调大师

Spark 实时计算整合案例

1.概述 最近有同学问道,除了使用 Storm 充当实时计算的模型外,还有木有其他的方式来实现实时计算的业务。了解到,在使用 Storm 时,需要编写基于编程语言的代码。比如,要实现一个流水指标的统计,需要去编写相应的业务代码,能不能有一种简便的方式来实现这一需求。在解答了该同学的疑惑后,整理了该实现方案的一个案例,供后面的同学学习参考。 2.内容 实现该方案,整体的流程是不变的,我这里只是替换了其计算模型,将 Storm 替换为 Spark,原先的数据收集,存储依然可以保留。 2.1 Spark Overview Spark 出来也是很久了,说起它,应该并不会陌生。它是一个开源的类似于 Hadoop MapReduce 的通用并行计算模型,它拥有 Hadoop MapReduce 所具有的有点,但与其不同的是,MapReduce 的 JOB 中间输出结果可以保存在内存中,不再需要回写磁盘,因而,Spark 能更好的适用于需要迭代的业务场景。 2.2 Flow 上面只是对 Spark 进行了一个简要的概述,让大家知道其作用,由于本篇博客的主要内容并不是讲述 Spark 的工作原理和计算方法,多的内容,这里笔者就不再赘述,若是大家想详细了解 Spark 的相关内容,可参考官方文档。[参考地址] 接下来,笔者为大家呈现本案例的一个实现流程图,如下图所示: 通过上图,我们可以看出,首先是采集上报的日志数据,将其存放于消息中间件,这里消息中间件采用的是 Kafka,然后在使用计算模型按照业务指标实现相应的计算内容,最后是将计算后的结果进行持久化,DB 的选择可以多样化,这里笔者就直接使用了 Redis 来作为演示的存储介质,大家所示在使用中,可以替换该存储介质,比如将结果存放到 HDFS,HBase Cluster,或是 MySQL 等都行。这里,我们使用 Spark SQL 来替换掉 Storm 的业务实现编写。 3.实现 在介绍完上面的内容后,我们接下来就去实现该内容,首先我们要生产数据源,实际的场景下,会有上报好的日志数据,这里,我们就直接写一个模拟数据类,实现代码如下所示: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 object KafkaIPLoginProducer { private val uid = Array( "123dfe" , "234weq" , "213ssf" ) private val random = new Random() private var pointer = - 1 def getUserID() : String = { pointer = pointer + 1 if (pointer > = users.length) { pointer = 0 uid(pointer) } else { uid(pointer) } } def plat() : String = { random.nextInt( 10 ) + "10" } def ip() : String = { random.nextInt( 10 ) + ".12.1.211" } def country() : String = { "中国" + random.nextInt( 10 ) } def city() : String = { "深圳" + random.nextInt( 10 ) } def location() : JSONArray = { JSON.parseArray( "[" + random.nextInt( 10 ) + "," + random.nextInt( 10 ) + "]" ) } def main(args : Array[String]) : Unit = { val topic = "test_data3" val brokers = "dn1:9092,dn2:9092,dn3:9092" val props = new Properties() props.put( "metadata.broker.list" , brokers) props.put( "serializer.class" , "kafka.serializer.StringEncoder" ) val kafkaConfig = new ProducerConfig(props) val producer = new Producer[String, String](kafkaConfig) while ( true ) { val event = new JSONObject() event .put( "_plat" , "1001" ) .put( "_uid" , "10001" ) .put( "_tm" , (System.currentTimeMillis / 1000 ).toString()) .put( "ip" , ip) .put( "country" , country) .put( "city" , city) .put( "location" , JSON.parseArray( "[0,1]" )) println( "Message sent: " + event) producer.send( new KeyedMessage[String, String](topic, event.toString)) event .put( "_plat" , "1001" ) .put( "_uid" , "10001" ) .put( "_tm" , (System.currentTimeMillis / 1000 ).toString()) .put( "ip" , ip) .put( "country" , country) .put( "city" , city) .put( "location" , JSON.parseArray( "[0,1]" )) println( "Message sent: " + event) producer.send( new KeyedMessage[String, String](topic, event.toString)) event .put( "_plat" , "1001" ) .put( "_uid" , "10002" ) .put( "_tm" , (System.currentTimeMillis / 1000 ).toString()) .put( "ip" , ip) .put( "country" , country) .put( "city" , city) .put( "location" , JSON.parseArray( "[0,1]" )) println( "Message sent: " + event) producer.send( new KeyedMessage[String, String](topic, event.toString)) event .put( "_plat" , "1002" ) .put( "_uid" , "10001" ) .put( "_tm" , (System.currentTimeMillis / 1000 ).toString()) .put( "ip" , ip) .put( "country" , country) .put( "city" , city) .put( "location" , JSON.parseArray( "[0,1]" )) println( "Message sent: " + event) producer.send( new KeyedMessage[String, String](topic, event.toString)) Thread.sleep( 30000 ) } } } 上面代码,通过 Thread.sleep() 来控制数据生产的速度。接下来,我们来看看如何实现每个用户在各个区域所分布的情况,它是按照坐标分组,平台和用户ID过滤进行累加次数,逻辑用 SQL 实现较为简单,关键是在实现过程中需要注意的一些问题,比如对象的序列化问题。这里,细节的问题,我们先不讨论,先看下实现的代码,如下所示: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 object IPLoginAnalytics { def main(args : Array[String]) : Unit = { val sdf = new SimpleDateFormat( "yyyyMMdd" ) var masterUrl = "local[2]" if (args.length > 0 ) { masterUrl = args( 0 ) } // Create a StreamingContext with the given master URL val conf = new SparkConf().setMaster(masterUrl).setAppName( "IPLoginCountStat" ) val ssc = new StreamingContext(conf, Seconds( 5 )) // Kafka configurations val topics = Set( "test_data3" ) val brokers = "dn1:9092,dn2:9092,dn3:9092" val kafkaParams = Map[String, String]( "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder" ) val ipLoginHashKey = "mf::ip::login::" + sdf.format( new Date()) // Create a direct stream val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val events = kafkaStream.flatMap(line = > { val data = JSONObject.fromObject(line. _ 2 ) Some(data) }) def func(iter : Iterator[(String, String)]) : Unit = { while (iter.hasNext) { val item = iter.next() println(item. _ 1 + "," + item. _ 2 ) } } events.foreachRDD { rdd = > // Get the singleton instance of SQLContext val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) import sqlContext.implicits. _ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.map(f = > Record(f.getString( "_plat" ), f.getString( "_uid" ), f.getString( "_tm" ), f.getString( "country" ), f.getString( "location" ))).toDF() // Register as table wordsDataFrame.registerTempTable( "events" ) // Do word count on table using SQL and print it val wordCountsDataFrame = sqlContext.sql( "select location,count(distinct plat,uid) as value from events where from_unixtime(tm,'yyyyMMdd') = '" + sdf.format( new Date()) + "' group by location" ) var results = wordCountsDataFrame.collect().iterator /** * Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool} */ object InternalRedisClient extends Serializable { @ transient private var pool : JedisPool = null def makePool(redisHost : String, redisPort : Int, redisTimeout : Int, maxTotal : Int, maxIdle : Int, minIdle : Int) : Unit = { makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true , false , 10000 ) } def makePool(redisHost : String, redisPort : Int, redisTimeout : Int, maxTotal : Int, maxIdle : Int, minIdle : Int, testOnBorrow : Boolean, testOnReturn : Boolean, maxWaitMillis : Long) : Unit = { if (pool == null ) { val poolConfig = new GenericObjectPoolConfig() poolConfig.setMaxTotal(maxTotal) poolConfig.setMaxIdle(maxIdle) poolConfig.setMinIdle(minIdle) poolConfig.setTestOnBorrow(testOnBorrow) poolConfig.setTestOnReturn(testOnReturn) poolConfig.setMaxWaitMillis(maxWaitMillis) pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout) val hook = new Thread { override def run = pool.destroy() } sys.addShutdownHook(hook.run) } } def getPool : JedisPool = { assert(pool ! = null ) pool } } // Redis configurations val maxTotal = 10 val maxIdle = 10 val minIdle = 1 val redisHost = "dn1" val redisPort = 6379 val redisTimeout = 30000 InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle) val jedis = InternalRedisClient.getPool.getResource while (results.hasNext) { var item = results.next() var key = item.getString( 0 ) var value = item.getLong( 1 ) jedis.hincrBy(ipLoginHashKey, key, value) } } ssc.start() ssc.awaitTermination() } } /** Case class for converting RDD to DataFrame */ case class Record(plat : String, uid : String, tm : String, country : String, location : String) /** Lazily instantiated singleton instance of SQLContext */ object SQLContextSingleton { @ transient private var instance : SQLContext = _ def getInstance(sparkContext : SparkContext) : SQLContext = { if (instance == null ) { instance = new SQLContext(sparkContext) } instance } } 我们在开发环境进行测试的时候,使用 local[k] 部署模式,在本地启动 K 个 Worker 线程来进行计算,而这 K 个 Worker 在同一个 JVM 中,上面的示例,默认使用 local[k] 模式。这里我们需要普及一下 Spark 的架构,架构图来自 Spark 的官网,[链接地址] 这里,不管是在 local[k] 模式,Standalone 模式,还是 Mesos 或是 YARN 模式,整个 Spark Cluster 的结构都可以用改图来阐述,只是各个组件的运行环境略有不同,从而导致他们可能运行在分布式环境,本地环境,亦或是一个 JVM 实利当中。例如,在 local[k] 模式,上图表示在同一节点上的单个进程上的多个组件,而对于 YARN 模式,驱动程序是在 YARN Cluster 之外的节点上提交 Spark 应用,其他组件都是运行在 YARN Cluster 管理的节点上的。 而对于 Spark Cluster 部署应用后,在进行相关计算的时候会将 RDD 数据集上的函数发送到集群中的 Worker 上的 Executor,然而,这些函数做操作的对象必须是可序列化的。上述代码利用 Scala 的语言特性,解决了这一问题。 4.结果预览 在完成上述代码后,我们执行代码,看看预览结果如下,执行结果,如下所示: 4.1 启动生产线程 4.2 Redis 结果预览 5.总结 整体的实现内容不算太复杂,统计的业务指标,这里我们使用 SQL 来完成这部分工作,对比 Storm 来说,我们专注 SQL 的编写就好,难度不算太大。可操作性较为友好。 6.结束语 这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

优秀的个人博客,低调大师

RMDB与hadoop的实时整合

一、MySQL的Hadoop Applier 实现原理是:把hadoop作为MYSQL 的slave,实时把数据同步到hadoop,支持apache hadoop 通过分析MYSQL的binlog日志,在hdfs产生一个目录(同表名),所有的表记录都存储在一个文件中,用户的操作如插入,更新,删除都会产生一笔记录追加到文件末尾. 但如何利用hdfs上的这个数据,需要用户自己定义逻辑,把表中的数据插入到hbase表 详见:http://dev.mysql.com/tech-resources/articles/mysql-hadoop-applier.html 二、GoldenGate的HDFS Adapter Oracle GoldGate's 也有类似的工具,通过分析Trails File把数据实时同步到hadoop ORACLE官方网站提供了Hdfs Adapter,但不提供服务支持 详见:https://blogs.oracle.com/dataintegration/entry/streaming_relational_transactions_to_hadoop

优秀的个人博客,低调大师

SpringBoot整合MongoDB多数据源

依赖 // build.gradle implementation 'org.springframework.boot:spring-boot-starter-data-mongodb' 配置文件 # application.yml spring: data: mongodb: primary: uri: mongodb://localhost:27017/db1 secondary: uri: mongodb://localhost:27017/db2 主数据库配置 // PrimaryMongoConfig.java package com.fengwenyi.springboot_mongo_multi_source.config; import com.mongodb.MongoClientURI; import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; import org.springframework.data.mongodb.repository.config.EnableMongoRepositories; /** * MongoDB Primary Config * @author Erwin Feng * @since 2019-07-01 17:12 */ @Configuration @EnableMongoRepositories(basePackages = "com.fengwenyi.springboot_mongo_multi_source.primary", mongoTemplateRef = "primaryMongoTemplate") public class PrimaryMongoConfig { @Bean @Primary @ConfigurationProperties(prefix="spring.data.mongodb.primary") public MongoProperties primaryMongoProperties() { return new MongoProperties(); } @Primary @Bean(name = "primaryMongoTemplate") public MongoTemplate primaryMongoTemplate() throws Exception { return new MongoTemplate(primaryFactory(primaryMongoProperties())); } @Bean @Primary public MongoDbFactory primaryFactory(MongoProperties mongoProperties) throws Exception { return new SimpleMongoDbFactory(new MongoClientURI(primaryMongoProperties().getUri())); } } 副数据库配置 // SecondaryMongoConfig.java package com.fengwenyi.springboot_mongo_multi_source.config; import com.mongodb.MongoClientURI; import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; import org.springframework.data.mongodb.repository.config.EnableMongoRepositories; /** * MongoDB Secondary Config * @author Erwin Feng * @since 2019-07-01 17:12 */ @Configuration @EnableMongoRepositories(basePackages = "com.fengwenyi.springboot_mongo_multi_source.secondary", mongoTemplateRef = "secondaryMongoTemplate") public class SecondaryMongoConfig { @Bean @ConfigurationProperties(prefix="spring.data.mongodb.secondary") public MongoProperties secondaryMongoProperties() { return new MongoProperties(); } @Bean(name = "secondaryMongoTemplate") public MongoTemplate secondaryMongoTemplate() throws Exception { return new MongoTemplate(secondaryFactory(secondaryMongoProperties())); } @Bean public MongoDbFactory secondaryFactory(MongoProperties mongoProperties) throws Exception { return new SimpleMongoDbFactory(new MongoClientURI(secondaryMongoProperties().getUri())); } } 用户实体,副 // User.java package com.fengwenyi.springboot_mongo_multi_source.secondary.entity; import lombok.Data; import lombok.experimental.Accessors; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; import java.io.Serializable; import java.time.Instant; /** * 用户 * @author Erwin Feng * @since 2019-07-01 17:15 */ @Data @Accessors(chain = true) @Document(collection = "t_user") public class User implements Serializable { private static final long serialVersionUID = -7229906944062898852L; /** ID */ @Id private String id; /** 用户名 */ private String username; /** 年龄 */ private Integer age; /** 注册时间 */ private Instant registerTime; } 用户查询仓库,副 // UserRepository.java package com.fengwenyi.springboot_mongo_multi_source.secondary.repository; import com.fengwenyi.springboot_mongo_multi_source.secondary.entity.User; import org.springframework.data.mongodb.repository.MongoRepository; import java.util.List; /** * 用户 * @author Erwin Feng * @since 2019-07-01 17:18 */ public interface UserRepository extends MongoRepository<User, String> { /** * 通过用户名查询 * @param username 用户名 * @return */ List<User> findAllByUsername(String username); } 登录日志实体,主 // LoginLog.java package com.fengwenyi.springboot_mongo_multi_source.primary.entity; import lombok.Data; import lombok.experimental.Accessors; import org.springframework.data.annotation.Id; import java.io.Serializable; import java.time.Instant; /** * 登录日志 * @author Erwin Feng * @since 2019-07-01 17:18 */ @Data @Accessors(chain = true) public class LoginLog implements Serializable { private static final long serialVersionUID = -6694661682102504919L; /** ID */ @Id private String id; /** 用户ID */ private String uid; /** 用户名 */ private String username; /** 登录时间 */ private Instant loginTime; } 登录日志查询仓库,主 // LoginLogRepository.java package com.fengwenyi.springboot_mongo_multi_source.primary.repository; import com.fengwenyi.springboot_mongo_multi_source.primary.entity.LoginLog; import org.springframework.data.mongodb.repository.MongoRepository; /** * 登录日志 * @author Erwin Feng * @since 2019-07-01 17:21 */ public interface LoginLogRepository extends MongoRepository<LoginLog, String> { } 初始化 // InitController.java package com.fengwenyi.springboot_mongo_multi_source.controller; import com.fengwenyi.springboot_mongo_multi_source.secondary.entity.User; import com.fengwenyi.springboot_mongo_multi_source.secondary.repository.UserRepository; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; import java.time.Instant; import java.util.ArrayList; import java.util.List; /** * 初始化工具类 * @author Erwin Feng * @since 2019-07-01 17:24 */ @RestController public class InitController { /** [mongo] 用户 */ @Autowired private UserRepository userRepository; @PostConstruct public void init() { List<User> all = userRepository.findAll(); if (all.size() > 0) return; userRepository.save(new User().setUsername("Zhangsan").setAge(20).setRegisterTime(Instant.now())); List<User> users = new ArrayList<>(); User u1 = new User().setUsername("u1").setAge(19).setRegisterTime(Instant.now()); User u2 = new User().setUsername("u2").setAge(20).setRegisterTime(Instant.now()); User u3 = new User().setUsername("u3").setAge(10).setRegisterTime(Instant.now()); users.add(u1); users.add(u2); users.add(u3); userRepository.saveAll(users); } } 测试代码 // TestController.java package com.fengwenyi.springboot_mongo_multi_source.controller; import com.fengwenyi.springboot_mongo_multi_source.primary.entity.LoginLog; import com.fengwenyi.springboot_mongo_multi_source.primary.repository.LoginLogRepository; import com.fengwenyi.springboot_mongo_multi_source.secondary.entity.User; import com.fengwenyi.springboot_mongo_multi_source.secondary.repository.UserRepository; import net.iutil.ApiResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Query; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.Instant; import java.util.List; /** * 测试 * @author Erwin Feng * @since 2019-07-01 17:28 */ @RestController @RequestMapping("/test") public class TestController { /** [mongo] 用户 */ @Autowired private UserRepository userRepository; /** [mongo] 登录日志 */ @Autowired private LoginLogRepository loginLogRepository; /** [mongo] */ @Autowired private MongoTemplate mongoTemplate; /** * 登录 * @param username * @return */ @GetMapping("/login") public ApiResult login(String username) { if (StringUtils.isEmpty(username)) return ApiResult.error().setMsg("用户名不能为空"); List<User> users = userRepository.findAllByUsername(username); if (users.size() == 1) { // 记录日志 loginLogRepository.save(new LoginLog().setUid(users.get(0).getId()).setUsername(username).setLoginTime(Instant.now())); return ApiResult.success(); } if (users.size() == 0) return ApiResult.error().setMsg("用户名查询失败"); return ApiResult.error().setMsg("用户异常"); } /** * 登录日志 * @return */ @GetMapping("/login-log") public ApiResult loginLog() { Query query = new Query(); List<LoginLog> loginLogs = mongoTemplate.find(query, LoginLog.class); return ApiResult.success(loginLogs); } } 测试用户登录 GET http://localhost:8080/test/login?username=Zhangsan 响应: { "code": 0, "msg": "Success" } 测试登录日志 GET http://localhost:8080/test/login-log 响应: { "code": 0, "msg": "Success", "data": [ { "id": "5d19d7f5cede54c46b6b20c5", "uid": "5d19d560cede54c45701e12a", "username": "Zhangsan", "loginTime": "2019-07-01T09:52:53.447Z" }, { "id": "5d19da82cede54c46f77579a", "uid": "5d19d560cede54c45701e12a", "username": "Zhangsan", "loginTime": "2019-07-01T10:03:46.496Z" }, { "id": "5d19df5fcede54c46f77579b", "uid": "5d19d560cede54c45701e12a", "username": "Zhangsan", "loginTime": "2019-07-01T10:24:31.272Z" }, { "id": "5d19df6acede54c46f77579c", "uid": "5d19d560cede54c45701e12b", "username": "u1", "loginTime": "2019-07-01T10:24:42.199Z" }, { "id": "5d19df6dcede54c46f77579d", "uid": "5d19d560cede54c45701e12d", "username": "u3", "loginTime": "2019-07-01T10:24:45.421Z" } ] } 代码 springboot-mongo-multi-source

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Oracle

Oracle

Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上流行的关系数据库管理系统,系统可移植性好、使用方便、功能强,适用于各类大、中、小、微机环境。它是一种高效率、可靠性好的、适应高吞吐量的数据库方案。

Eclipse

Eclipse

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。