Structed Streaming 小案例
1 首先是官网:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.ht
2.注意官方文档中的着重表示的地方例如(黑体加重,斜体等)
我们都知道spark streaming 是基于spark core API
那Structed Streaming基于的是什么?
没错就是Spark SQL。 所以DataFrame/DataSet API 包括hive的一些functions 不要太好用哦!!!
下面这句话:“you can express your streaming computaion the same way
you would express a batch computation on a static data”
就保证了我们在实际开发时的成本比较低,当我们在开发一个的应用中包含流计算和批计算。
3.特点:
可扩展性,容错性(这都是必备的好吗?) 精确的一次语义 低延迟
4.关键点:
Continuously processing
databricks的blog上这篇文章写的也很好 https://databricks.com/blog/2016/07/28/continuous-applications-evolving-streaming-in-apache-spark-2-0.html
5.做到了端到端
延时1ms时能保证至at least one 的语义 延时100ms左右时,能做到 exactly once。
6.然后就是流与其他的各种join, watermark的引入,总之和flink 相互发展促进。
Finally:最后贴一个小的 案例:
object testSSApp extends App { val spark: SparkSession = SparkSession.builder().appName("baidu").master("local[2]").getOrCreate() // 结构化流 private val read = spark.readStream.format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic1") .option("maxOffsetperTrigger", "1000000") .option("kafkaConsumer.pollTimeoutMs", "1000") .load() //读取的kafak 数据为json格式 val result = read.selectExpr("CAST(value AS STRING)") .select( get_json_object(col("value"), path = "$.uri").alias("uri"), get_json_object(col("value"), path = "$.market").alias("market") ).groupBy(window(col("timestamp"),"5min","1min"), col("shop")) .agg(count("market").alias("uv"), approx_count_distinct("uri").alias("pv")).select("*") val query = result.writeStream.trigger(Trigger.ProcessingTime(10000)).outputMode("Update") .format("console").start() //这里展示以console输出,实际中是回写到kafak或者外部存储。 query.awaitTermination() }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
asp.Net Core免费开源分布式异常日志收集框架Exceptionless安装配置以及简单使用图文教程
最近在学习张善友老师的NanoFabric框架的时了解到Exceptionless :https://exceptionless.com/!因此学习了一下这个开源框架!下面对Exceptionless的学习做下笔记! Exceptionless是什么?能做什么呢? “Exceptionless”这个词的定义是:没有异常。Exceptionless可以为您的ASP.NET、Web API、WebFrm、WPF、控制台和MVC应用程序提供实时错误、特性和日志报告。它将收集的信息组织成简单的可操作的数据,这些数据将帮助你很方便的查看异常信息。还有最重要的是,它是开源的! Exceptionless的使用方式有哪些? 1.官网创建帐号,并新建应用程序以及项目,然后生成apikey(数据存储在Exceptionless) 2.自己搭建Exceptionless的环境,部署在本地(数据存储在本地) Exceptionless的运行环境有哪些要求?需要安装哪些软件,进行什么配置呢? .NET 4.6.1(安装了.net core 或者vs2017的话环境应该都没问题,不需要额外安装) Java JD...
- 下一篇
全域赋能和智慧全球,阿里巴巴大数据技术前瞻与案例
摘要:2016年1月20日,阿里云大数据产品家族数加在上海正式发布,在两年之后的今天,阿里云的大数据产品又有了什么样的发展变化呢?本文中阿里云高级产品专家班公就将为大家揭秘阿里巴巴大数据技术前瞻与案例。 本文内容根据演讲视频以及PPT整理而成。 本次分享将主要围绕以阿里巴巴大数据产品的整体布局进行分享,这里就会提到两个比较核心的关键词: 全域赋能和智慧全球。 大数据行业发展到今天已经远远超越本身这个行业,也不仅仅是服务于某一类或者某几类行业了,现在大数据已经深入到各行各业了。大数据领域最核心的一点其实就是计算能力的升级,其实现在计算力就像水电煤一样成为了一种新的能源,而能源的特点是具备普惠的能力。而计算力如果能够做到普惠,那就需要大幅度地提升核心计算力的性能、稳定性以及性价比。那么就需要关注在计算力方面的基础引擎的建设。其次,近年来人
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- CentOS关闭SELinux安全模块
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2整合Redis,开启缓存,提高访问速度
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程