0%

大数据计算引擎之Flink 窗口TopN

关于TopN这是在大数据领域或者非大数据环境下,都会用到的一个运营指标,也是不可或缺的指标之一。TopN在Flink和Spark里面,其实是将分成两种,一种是窗口TopN,另一种是全局TopN,两者一般会结合使用,根据我们的实际的需要去选择合适的TopN算法。这里主要讲窗口TopN.

准备工作

集群环境搭建(Hadoop,Flume,Kafka,Flink)

此处自行准备即可,我们的数据流方向为:product -> flume -> kafka -> flink
flink-topn-data.png

数据准备

数据格式: {orderID},{goodsID},{areaID},{amount},{status},{createTime}

数据格式说明:订单ID,商品ID,地域ID,订单价格,订单状态,订单创建时间

开发工具: PyCharm

数据产生的源程序:
flink-TopN

数据分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
orderStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(30)) {
override def extractTimestamp(element: Order): Long = element.createTime
}).keyBy(order => {
order.areaID + "_" + order.goodsID
}).timeWindow(Time.minutes(1))
.reduce(new ReduceFunction[Order] {
override def reduce(value1: Order, value2: Order): Order = {
Order(value1.orderID, value1.goodsID, value1.areaID, value1.amount + value2.amount, value1.createTime)
}
}).keyBy(_.areaID)
.timeWindow(Time.minutes(1))
.apply(new WindowFunction[Order, Order, Int, TimeWindow] {
override def apply(key: Int, window: TimeWindow, input: Iterable[Order], out: Collector[Order]): Unit = {
val orders = new TreeSet[Order](new Comparator[Order] {
override def compare(o1: Order, o2: Order): Int = (o1.amount - o2.amount)
})

input.foreach(
result => {
if (orders.size() >= 3) {
val firstOrder: Order = orders.first()
if (result.amount > firstOrder.amount) {
// 不要这个
orders.pollFirst()
orders.add(result)
} else {
orders.add(result)
}
}
}
)
input.foreach(result => println("areaID: " + key + "order: " + result ))
}
})

上述程序只贴出部分,详细程序可参考 WindowsTopN
查看结果:
windows-result

这是打赏的地方...

本文标题:大数据计算引擎之Flink 窗口TopN

文章作者:Mr.Sun

发布时间:2019年12月18日 - 11:23:56

最后更新:2020年06月15日 - 10:05:35

原始链接:http://www.blog.sun-iot.xyz/posts/9eb8d396

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

---------Thanks for your attention---------