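TopN is a business metric used in both big-data and non-big-data environments, and it is one of the indispensable ones. In Flink and Spark, TopN comes in two kinds: window TopN and global TopN. The two are usually combined, and we pick the suitable TopN algorithm according to what we actually need. This post focuses on global TopN.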
Preparation
Cluster setup (Hadoop, Flume, Kafka, Flink): prepare this on your own. Our data flows in this direction: product -> flume -> kafka -> flink
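Data preparation
Data format: {orderID},{goodsID},{areaID},{amount},{createTime}
Data format explanation: order ID, goods ID, area ID, order price, order creation time
Development tool: PyCharm
Source program that generates the data: flink-TopN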
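To make the record format above concrete, here is a minimal sketch of turning one comma-separated line into an order object. The field types (`Int` IDs and amount, `createTime` as a `Long` timestamp) and the names `OrderParsing` and `parseOrder` are assumptions for illustration; the source only names the five fields.

```scala
import scala.util.Try

object OrderParsing {
  // Field names follow the documented format; the field types are assumptions.
  final case class Order(orderID: String, goodsID: Int, areaID: Int, amount: Int, createTime: Long)

  // Parses "{orderID},{goodsID},{areaID},{amount},{createTime}".
  // Returns None when the field count is wrong or a numeric field does not parse.
  def parseOrder(line: String): Option[Order] =
    line.split(",", -1).map(_.trim) match {
      case Array(orderID, goodsID, areaID, amount, createTime) =>
        Try(Order(orderID, goodsID.toInt, areaID.toInt, amount.toInt, createTime.toLong)).toOption
      case _ => None
    }
}
```

Returning an `Option` lets the stream drop malformed records (for example with `flatMap`) instead of failing the job on a bad line.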
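Data analysis
Take the sales amounts computed per area (areaId) + goods (gdsId) dimension, group them by area (areaId), find the TopN goods by sales amount, and refresh the output on a schedule. Unlike window TopN, global TopN has no time window, and therefore no notion of event time, so ProcessingTime semantics are enough and the Window operator can no longer be used. The job still has to accumulate the data and emit output periodically, so we choose a ProcessFunction for it: State holds the intermediate results and guarantees data consistency, and a timer drives the periodic output. (In the code, areaId and gdsId are the fields areaID and goodsID.)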
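The periodic output relies on picking the timestamp at which the timer should fire. A small sketch of one way to do this, aligning the firing time to fixed boundaries of processing time, is below; it mirrors the arithmetic used in the TopN code later in this post. The names `TimerAlignment` and `nextFireTimestamp` are hypothetical.

```scala
object TimerAlignment {
  // Returns the first multiple of intervalMs that is strictly greater than nowMs.
  // Assumes a non-negative timestamp in milliseconds.
  def nextFireTimestamp(nowMs: Long, intervalMs: Long): Long = {
    require(nowMs >= 0, "nowMs must be non-negative")
    require(intervalMs > 0, "intervalMs must be positive")
    val start = nowMs - (nowMs % intervalMs) // start of the current interval
    start + intervalMs                       // start of the next interval
  }
}
```

With a 60000 ms interval, every key that sees data within the same minute gets a timer for the same minute boundary, so the outputs of all areas line up.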
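    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    // Key by "areaID_goodsID" and keep a running sales total per key.
    // The result is named salesStream because the next step consumes it under that name.
    val salesStream: DataStream[GoodsSales] = orderStream
      .keyBy(order => s"${order.areaID}_${order.goodsID}")
      .process(new KeyedProcessFunction[String, Order, GoodsSales]() {
        var orderState: ValueState[Int] = _
        var orderStateDesc: ValueStateDescriptor[Int] = _

        override def open(parameters: Configuration): Unit = {
          orderStateDesc = new ValueStateDescriptor[Int]("order-state", TypeInformation.of(classOf[Int]))
          orderState = getRuntimeContext.getState(orderStateDesc)
        }

        override def processElement(value: Order,
                                    ctx: KeyedProcessFunction[String, Order, GoodsSales]#Context,
                                    out: Collector[GoodsSales]): Unit = {
          // A Scala Int can never be null: unset state is unboxed to 0,
          // so the first order of a key simply starts the sum from 0.
          val total: Int = orderState.value() + value.amount
          orderState.update(total)
          // Emit the updated total for this area + goods pair.
          out.collect(GoodsSales(value.areaID, value.goodsID, total, value.createTime))
        }
      })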
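keyBy groups the stream by areaId + gdsId, and a KeyedProcessFunction then does the accumulation. Inside the KeyedProcessFunction, a ValueState holds the sales amount of each group; processElement adds each order to it, updating the ValueState and emitting the new total through collect every time. Note: doing the accumulation with ValueState is somewhat verbose, and ReducingState could replace it. ValueState is used here only to make the accumulation step explicit.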
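As a rough illustration of the ReducingState idea mentioned above, the sketch below models the accumulation in plain Scala: a single reduce function (here, addition) is applied to the stored value and each new element, per key. This is not Flink API code; the `Sale` record, the `RunningTotals` object, and the in-memory map standing in for keyed state are all simplifications assumed for the example.

```scala
import scala.collection.mutable

object RunningTotals {
  // Hypothetical simplified record: only the fields needed for the key and the sum.
  final case class Sale(areaID: Int, goodsID: Int, amount: Int)

  // The reduce function that would be handed to a reducing-style state.
  val sum: (Int, Int) => Int = _ + _

  // Returns the (key, running total) pair emitted after each input record,
  // where the key is "areaID_goodsID" as in the keyBy above.
  def runningTotals(sales: List[Sale]): List[(String, Int)] = {
    val state = mutable.Map.empty[String, Int] // stands in for per-key state
    sales.map { s =>
      val key = s"${s.areaID}_${s.goodsID}"
      // First element of a key is stored as-is; later ones are reduced into it.
      val total = state.get(key).map(sum(_, s.amount)).getOrElse(s.amount)
      state.update(key, total)
      key -> total
    }
  }
}
```

The point of the reducing style is that the "first value versus later values" branch lives in the state abstraction, so the processing function only has to add the element and read back the total.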
    import java.util
    import java.util.Comparator

    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    salesStream.keyBy(_.areaID)
      .process(new KeyedProcessFunction[Int, GoodsSales, Void] {
        // Current TopN of this area, ordered by ascending amount.
        var topState: ValueState[util.TreeSet[GoodsSales]] = _
        var topStateDesc: ValueStateDescriptor[util.TreeSet[GoodsSales]] = _
        // goodsID -> the entry currently held in the TreeSet.
        var mapState: MapState[Int, GoodsSales] = _
        var mapStateDesc: MapStateDescriptor[Int, GoodsSales] = _
        // Timestamp of the next registered timer.
        var fireState: ValueState[Long] = _
        var fireStateDesc: ValueStateDescriptor[Long] = _

        val interval: Long = 60000 // output once per minute
        val N: Int = 3

        override def open(parameters: Configuration): Unit = {
          topStateDesc = new ValueStateDescriptor[util.TreeSet[GoodsSales]]("top-state", TypeInformation.of(classOf[util.TreeSet[GoodsSales]]))
          topState = getRuntimeContext.getState(topStateDesc)
          mapStateDesc = new MapStateDescriptor[Int, GoodsSales]("mapping-state", TypeInformation.of(classOf[Int]), TypeInformation.of(classOf[GoodsSales]))
          mapState = getRuntimeContext.getMapState(mapStateDesc)
          fireStateDesc = new ValueStateDescriptor[Long]("time-state", TypeInformation.of(classOf[Long]))
          fireState = getRuntimeContext.getState(fireStateDesc)
        }

        override def processElement(value: GoodsSales,
                                    ctx: KeyedProcessFunction[Int, GoodsSales, Void]#Context,
                                    out: Collector[Void]): Unit = {
          val top: util.TreeSet[GoodsSales] = topState.value()
          val currTime = ctx.timerService().currentProcessingTime()
          if (top == null) {
            // First record of this area: create the ordered set.
            val goods = new util.TreeSet[GoodsSales](new Comparator[GoodsSales] {
              // Compare by amount, then by goodsID, so that goods with equal
              // amounts are not treated as duplicates by the TreeSet.
              override def compare(o1: GoodsSales, o2: GoodsSales): Int = {
                val byAmount = Integer.compare(o1.amount, o2.amount)
                if (byAmount != 0) byAmount else Integer.compare(o1.goodsID, o2.goodsID)
              }
            })
            goods.add(value)
            topState.update(goods)
            mapState.put(value.goodsID, value)
          } else if (mapState.contains(value.goodsID)) {
            // The goods is already in the TopN: replace its old entry.
            val oldValue: GoodsSales = mapState.get(value.goodsID)
            top.remove(oldValue)
            top.add(value)
            mapState.put(value.goodsID, value)
            topState.update(top)
          } else if (top.size() >= N) {
            // The TopN is full: evict the smallest entry only if the new amount is larger.
            val first: GoodsSales = top.first()
            if (value.amount > first.amount) {
              top.pollFirst()
              mapState.remove(first.goodsID) // keep mapState in sync with the TreeSet
              top.add(value)
              mapState.put(value.goodsID, value)
              topState.update(top)
            }
          } else {
            top.add(value)
            mapState.put(value.goodsID, value)
            topState.update(top)
          }

          // Register the first timer once per key; onTimer registers the following ones.
          // fireState holds a Scala Long, so unset state reads as 0L rather than null.
          if (fireState.value() == 0L) {
            val start: Long = currTime - (currTime % interval)
            val nextFireTimestamp: Long = start + interval
            ctx.timerService().registerProcessingTimeTimer(nextFireTimestamp)
            fireState.update(nextFireTimestamp)
          }
        }

        override def onTimer(timestamp: Long,
                             ctx: KeyedProcessFunction[Int, GoodsSales, Void]#OnTimerContext,
                             out: Collector[Void]): Unit = {
          println("timestamp: " + timestamp)
          val goodsSales: util.TreeSet[GoodsSales] = topState.value()
          val resultIter: util.Iterator[GoodsSales] = goodsSales.iterator()
          while (resultIter.hasNext) {
            println(resultIter.next())
          }
          // If this is the timer we registered, schedule the next output.
          if (fireState.value() == timestamp) {
            fireState.update(timestamp + interval)
            ctx.timerService().registerProcessingTimeTimer(timestamp + interval)
          }
        }
      })
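The TopN bookkeeping in the function above can be tried outside Flink. The following is a minimal sketch in plain Scala, assuming a simplified `GoodsSales` without `createTime` and an in-memory map in place of Flink state; `TopNSketch`, `TopN`, `offer`, and `result` are hypothetical names. It shows the two details that are easy to get wrong: breaking ties in the comparator, and removing an evicted entry from the index.

```scala
import java.util.{Comparator, TreeSet}
import scala.collection.mutable

object TopNSketch {
  // Hypothetical simplified record (the createTime field is omitted).
  final case class GoodsSales(areaID: Int, goodsID: Int, amount: Int)

  // Order by amount, then by goodsID, so equal amounts are not seen as duplicates.
  private val byAmountThenId: Comparator[GoodsSales] = new Comparator[GoodsSales] {
    override def compare(a: GoodsSales, b: GoodsSales): Int = {
      val c = Integer.compare(a.amount, b.amount)
      if (c != 0) c else Integer.compare(a.goodsID, b.goodsID)
    }
  }

  final class TopN(n: Int) {
    require(n > 0, "n must be positive")
    private val top = new TreeSet[GoodsSales](byAmountThenId)
    // goodsID -> entry currently held in `top`
    private val index = mutable.Map.empty[Int, GoodsSales]

    def offer(value: GoodsSales): Unit =
      index.get(value.goodsID) match {
        case Some(old) =>
          // Already ranked: swap the old entry for the new one.
          top.remove(old)
          top.add(value)
          index.update(value.goodsID, value)
        case None if top.size() < n =>
          top.add(value)
          index.update(value.goodsID, value)
        case None if value.amount > top.first().amount =>
          // Full: evict the smallest entry and forget it in the index too.
          val evicted = top.pollFirst()
          index.remove(evicted.goodsID)
          top.add(value)
          index.update(value.goodsID, value)
        case None => () // not large enough to enter the TopN
      }

    // Entries from largest to smallest amount.
    def result: List[GoodsSales] = {
      val it = top.descendingIterator()
      val buf = List.newBuilder[GoodsSales]
      while (it.hasNext) buf += it.next()
      buf.result()
    }
  }
}
```

Without the `index.remove` call, a goods item that was evicted and later comes back with a larger total would take the "already ranked" branch, find nothing to remove, and leave the set holding more than N entries.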
That is all. For the complete program, see GlobalTopN.