您现在的位置是:首页 > 文章详情

Structed Streaming 小案例

日期:2018-06-17点击:531

1 首先是官网:

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.ht

2.注意官方文档中的着重表示的地方例如(黑体加重,斜体等)
我们都知道spark streaming 是基于spark core API
那Structed Streaming基于的是什么?

 没错就是Spark SQL。 所以DataFrame/DataSet API 包括hive的一些functions 不要太好用哦!!! 

下面这句话:“you can express your streaming computaion the same way
you would express a batch computation on a static data”
就保证了我们在实际开发时的成本比较低,当我们在开发一个的应用中包含流计算和批计算。

3.特点:

可扩展性,容错性(这都是必备的好吗?) 精确的一次语义 低延迟 

4.关键点:

Continuously processing 

databricks的blog上这篇文章写的也很好 https://databricks.com/blog/2016/07/28/continuous-applications-evolving-streaming-in-apache-spark-2-0.html

5.做到了端到端

延时1ms时能保证至at least one 的语义 延时100ms左右时,能做到 exactly once。 

6.然后就是流与其他的各种join, watermark的引入,总之和flink 相互发展促进。

Finally:最后贴一个小的 案例:

 object testSSApp extends App { val spark: SparkSession = SparkSession.builder().appName("baidu").master("local[2]").getOrCreate() // 结构化流 private val read = spark.readStream.format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic1") .option("maxOffsetperTrigger", "1000000") .option("kafkaConsumer.pollTimeoutMs", "1000") .load() //读取的kafak 数据为json格式 val result = read.selectExpr("CAST(value AS STRING)") .select( get_json_object(col("value"), path = "$.uri").alias("uri"), get_json_object(col("value"), path = "$.market").alias("market") ).groupBy(window(col("timestamp"),"5min","1min"), col("shop")) .agg(count("market").alias("uv"), approx_count_distinct("uri").alias("pv")).select("*") val query = result.writeStream.trigger(Trigger.ProcessingTime(10000)).outputMode("Update") .format("console").start() //这里展示以console输出,实际中是回写到kafak或者外部存储。 query.awaitTermination() } 
原文链接:https://yq.aliyun.com/articles/624479
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章