您现在的位置是:首页 > 文章详情

Flink实战:全局TopN分析与实现

日期:2019-11-27点击:546

在上一篇Flink实战: 窗口TopN分析与实现中实现了在一个窗口内的分组topN,但是在实际中也会遇到没有窗口期的topN,例如在一些实时大屏监控展示中,展示历史到现在所有的TopN数据,将这个称之为全局topN,仍然以计算区域维度销售额topN的商品为例,看一下全局TopN的实现方法。
先将需求分解为以下几步:

  • 按照区域areaId+商品gdsId分组,计算每个分组的累计销售额
  • 将得到的区域areaId+商品gdsId维度的销售额按照区域areaId分组,然后求得TopN的销售额商品,并且定时更新输出

与窗口TopN不同,全局TopN没有时间窗口的概念,也就没有时间的概念,因此使用ProcessingTime语义即可,并且也不能再使用Window算子来操作,但是在这个过程中需要完成数据累加操作与定时输出功能,选择ProcessFunction函数来完成,使用State保存中间结果数据,保证数据一致性语义,使用定时器来完成定时输出功能。

销售额统计

对数据流按照区域areaId+商品gdsId分组,不断累加销售额保存起来,然后输出到下游。

1. `val env =StreamExecutionEnvironment.getExecutionEnvironment` 2. `env.setParallelism(1)` 3. `val kafkaConfig =newProperties();` 4. `kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");` 5. `kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");` 7. `val consumer =newFlinkKafkaConsumer011[String]("topic1",newSimpleStringSchema(), kafkaConfig)` 8. `val orderStream = env.addSource(consumer)` 9. `.map(x =>{` 10. `val a = x.split(",")` 11. `Order(a(0), a(1).toLong, a(2), a(3).toDouble, a(4))` 12. `})` 14. `val salesStream=orderStream.keyBy(x =>{` 15. `x.areaId +"_"+ x.gdsId` 16. `}).process(newKeyedProcessFunction[String,Order,GdsSales](){` 18. `var orderState:ValueState[Double]= _` 19. `var orderStateDesc:ValueStateDescriptor[Double]= _` 21. `override def open(parameters:Configuration):Unit={` 22. `orderStateDesc =newValueStateDescriptor[Double]("order-state",TypeInformation.of(classOf[Double]))` 23. `orderState = getRuntimeContext.getState(orderStateDesc)` 24. `}` 26. `override def processElement(value:Order, ctx:KeyedProcessFunction[String,Order,GdsSales]#Context,out:Collector[GdsSales]):Unit={` 28. `val currV = orderState.value()` 29. `if(currV ==null){` 30. `orderState.update(value.amount)` 31. `}else{` 32. `val newV = currV + value.amount` 33. `orderState.update(newV)` 34. `}` 35. `out.collect(GdsSales.of(value.areaId, value.gdsId, orderState.value(), value.orderTime))` 36. `}` 37. `})` 

使用keyBy按照areaId+gdsId来分组,然后使用KeyedProcessFunction来完成累加操作。在KeyedProcessFunction里面定义了一个ValueState来保存每个分组的销售额,processElement完成销售额累加操作,并且不断更新ValueState与collect输出。
说明:这里使用ValueState来完成累加过程显得比较繁琐,可以使用ReducingState来替代,这里只是为了表现出累加这个过程。

区域TopN计算

上一步得到的salesStream是一个按照区域areaId+商品gdsId维度的销售额,并且是不断更新输出到下游的,接下来就需要完成TopN的计算,在Flink实战: 窗口TopN分析与实现中分析到TopN的计算不需要保存所有的结果数据,使用红黑树来模拟类似优先级队列功能即可,但是与其不同在于:窗口TopN每次计算TopN是一个全量的窗口结果,而全局TopN其销售额是会不断变动的,因此需要做以下逻辑判断:

  1. 如果TreeSet[GdsSales]包含该商品的销售额数据,则需要更新该商品销售额,这个过程包含判断商品gdsId是否存在与移除该GdsSales对象功能,但是TreeSet不具备直接判断gdsId是否存在功能,那么可以使用一种额外的数据结构Map, key为商品gdsId, value为商品销售额数据GdsSales,该value对应TreeSet[GdsSales]中数据
  2. 如果TreeSet[GdsSales]包含该商品的销售额数据,当TreeSet里面的数据到达N, 就获取第一个节点数据(最小值)与当前需要插入的数据进行比较,如果比其大,则直接舍弃,如果比其小,那么就将TreeSet中第一个节点数据删除,插入新的数据
    实现代码如下:
1. `salesStream.keyBy(_.getAreaId)` 2. `.process(newKeyedProcessFunction[String,GdsSales,Void]{` 3. `var topState:ValueState[java.util.TreeSet[GdsSales]]= _` 4. `var topStateDesc:ValueStateDescriptor[java.util.TreeSet[GdsSales]]= _` 6. `var mappingState:MapState[String,GdsSales]= _` 7. `var mappingStateDesc:MapStateDescriptor[String,GdsSales]= _` 8. `val interval:Long=60000` 9. `val N:Int=3` 10. `override def open(parameters:Configuration):Unit={` 11. `topStateDesc =newValueStateDescriptor[java.util.TreeSet[GdsSales]]("top-state",TypeInformation.of(classOf[java.util.TreeSet[GdsSales]]))` 12. `topState = getRuntimeContext.getState(topStateDesc)` 14. `mappingStateDesc =newMapStateDescriptor[String,GdsSales]("mapping-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[GdsSales]))` 15. `mappingState = getRuntimeContext.getMapState(mappingStateDesc)` 16. `}` 17. `override def processElement(value:GdsSales, ctx:KeyedProcessFunction[String,GdsSales,Void]#Context,out:Collector[Void]):Unit={` 19. `val top = topState.value()` 20. `if(top ==null){` 21. `val topMap: util.TreeSet[GdsSales]=new util.TreeSet[GdsSales](newComparator[GdsSales]{` 22. `override def compare(o1:GdsSales, o2:GdsSales):Int=(o1.getAmount - o2.getAmount).toInt` 23. `})` 24. `topMap.add(value)` 25. `topState.update(topMap)` 26. `mappingState.put(value.getGdsId, value)` 27. `}else{` 28. `mappingState.contains(value.getGdsId) match {` 29. `case true=>{//已经存在该商品的销售数据` 30. `val oldV = mappingState.get(value.getGdsId)` 31. `mappingState.put(value.getGdsId, value)` 32. `val values = topState.value()` 33. `values.remove(oldV)` 34. `values.add(value)//更新旧的商品销售数据` 35. `topState.update(values)` 36. `}` 37. `case false=>{//不存在该商品销售数据` 38. `if(top.size()>= N){//已经达到N 则判断更新` 39. `val min = top.first()` 40. `if(value.getAmount > min.getAmount){` 41. `top.pollFirst()` 42. `top.add(value)` 43. `mappingState.put(value.getGdsId, value)` 44. `topState.update(top)` 45. `}` 46. `}else{//还未到达N则直接插入` 47. `top.add(value)` 48. `mappingState.put(value.getGdsId, value)` 49. `topState.update(top)` 50. `}` 51. `}}}}` 52. `})` 

在open中定义个两个state:ValueState与MapState, ValueState保存该区域下的TopN商品销售数据GdsSales,MapState保存了商品gdsId与商品销售数据GdsSale的对应关系。
在processElement中,首先会判断ValueState是否为空,如果为空则定义按照销售额比较升序排序的Comparator 的TreeSet,则走更新逻辑判断。

定时输出

到这里我们已经计算出了每个时刻的TopN数据,存储在ValueState[java.util.TreeSet[GdsSales]] 中,现在希望每隔1min将TopN的数据输出,可以使用在时间系统系列里面提供较为底层的直接获取到InternalTimeService来完成,由于ProcessFunction本身提供了定时调用功能,我们就按照在窗口实用触发器:ContinuousEventTimeTrigger中讲到的持续触发器的原理来实现,

1. `var fireState:ValueState[Long]= _` 2. `var fireStateDesc:ValueStateDescriptor[Long]= _` 3. `//放在open方法中` 4. `fireStateDesc =newValueStateDescriptor[Long]("fire-time",TypeInformation.of(classOf[Long]))` 5. `fireState = getRuntimeContext.getState(fireStateDesc)` 

定义了一个ValueState,保存每一次触发的时间,不使用ReducingState是因为没有Window里面在使用SessionWindow的合并机制。

1. `//放在processElement里面` 2. `val currTime = ctx.timerService().currentProcessingTime()` 3. `//1min输出一次` 4. `if(fireState.value()==null){` 5. `val start = currTime -(currTime % interval)` 6. `val nextFireTimestamp = start + interval` 7. `ctx.timerService().registerProcessingTimeTimer(nextFireTimestamp)` 8. `fireState.update(nextFireTimestamp)` 9. `}` 

对于每一个区域areaId(key)在processElement只需要注册一次即可。

1. `override def onTimer(timestamp:Long, ctx:KeyedProcessFunction[String,GdsSales,Void]#OnTimerContext,out:Collector[Void]):Unit={` 2. `println(timestamp +"===")` 3. `topState.value().foreach(x =>{` 4. `println(x)` 5. `})` 6. `val fireTimestamp = fireState.value()` 7. `if(fireTimestamp !=null&&(fireTimestamp == timestamp)){` 8. `fireState.clear()` 9. `fireState.update(timestamp + interval)` 10. `ctx.timerService().registerProcessingTimeTimer(timestamp + interval)` 11. `}` 12. `}` 

onTimer定时输出,并且注册下一个触发的时间点。

测试

准备数据

1. `//2019-11-16 21:25:10` 2. `orderId01,1573874530000,gdsId03,300,beijing` 3. `orderId02,1573874540000,gdsId01,100,beijing` 4. `orderId02,1573874540000,gdsId04,200,beijing` 5. `orderId02,1573874540000,gdsId02,500,beijing` 6. `orderId01,1573874530000,gdsId01,300,beijing` 

等到2019-11-16 21:26:00得到结果

1. `1573910760000===` 2. `GdsSales{areaId='beijing', gdsId='gdsId03', amount=300.0}` 3. `GdsSales{areaId='beijing', gdsId='gdsId01', amount=400.0}` 4. `GdsSales{areaId='beijing', gdsId='gdsId02', amount=500.0}` 

接着在生产一条数据

1. `orderId02,1573874540000,gdsId04,500,beijing` 

等到2019-11-16 21:27:00得到结果

1. `1573910820000===` 2. `GdsSales{areaId='beijing', gdsId='gdsId01', amount=400.0}` 3. `GdsSales{areaId='beijing', gdsId='gdsId02', amount=500.0}` 4. `GdsSales{areaId='beijing', gdsId='gdsId04', amount=700.0}` 

至此完成全局topN的全部实现。

总结

全局TopN要求状态保存所有的聚合数据,对于key比较多的情况,不管是销售额数据还是定时器数据都会占用比较多的内存,可以选择RocksDb作为StateBackend。

                                         ----------------------------------- 

精彩推文:

1. Flink中延时调用设计与实现[](http://mp.weixin.qq.com/s?__biz=MzU5MTc1NDUyOA==&mid=2247483943&idx=1&sn=d3e1002255cbd6cfe16855639f82b80f&chksm=fe2b6668c95cef7e3639c575ac8cb899e7317f82c2b446a23335c8f33dc97a968ea7f345b914&scene=21#wechat_redirect)

2. Flink维表关联系列之Hbase维表关联:LRU策略

3. 你应该了解的Watermark

4. Flink exactly-once系列之事务性输出实现

5. Flink时间系统系列之实例讲解:如何做定时输出

关注回复Flink获取系列文章~

image

原文链接:https://yq.aliyun.com/articles/728344
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章