关于TopN这是在大数据领域或者非大数据环境下,都会用到的一个运营指标,也是不可或缺的指标之一。TopN在Flink和Spark里面,其实是将分成两种,一种是窗口TopN,另一种是全局TopN,两者一般会结合使用,根据我们的实际的需要去选择合适的TopN算法。这里主要讲窗口TopN.
准备工作
集群环境搭建(Hadoop,Flume,Kafka,Flink)
此处自行准备即可,我们的数据流方向为:product -> flume -> kafka -> flink

数据准备
数据格式: {orderID},{goodsID},{areaID},{amount},{status},{createTime}
数据格式说明:订单ID,商品ID,地域ID,订单价格,订单状态,订单创建时间
开发工具: PyCharm
数据产生的源程序:
flink-TopN
数据分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| orderStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(30)) { override def extractTimestamp(element: Order): Long = element.createTime }).keyBy(order => { order.areaID + "_" + order.goodsID }).timeWindow(Time.minutes(1)) .reduce(new ReduceFunction[Order] { override def reduce(value1: Order, value2: Order): Order = { Order(value1.orderID, value1.goodsID, value1.areaID, value1.amount + value2.amount, value1.createTime) } }).keyBy(_.areaID) .timeWindow(Time.minutes(1)) .apply(new WindowFunction[Order, Order, Int, TimeWindow] { override def apply(key: Int, window: TimeWindow, input: Iterable[Order], out: Collector[Order]): Unit = { val orders = new TreeSet[Order](new Comparator[Order] { override def compare(o1: Order, o2: Order): Int = (o1.amount - o2.amount) })
input.foreach( result => { if (orders.size() >= 3) { val firstOrder: Order = orders.first() if (result.amount > firstOrder.amount) { orders.pollFirst() orders.add(result) } else { orders.add(result) } } } ) input.foreach(result => println("areaID: " + key + "order: " + result )) } })
|
上述程序只贴出部分,详细程序可参考 WindowsTopN
查看结果:
