0%

大数据计算引擎之Flink 全局TopN

关于TopN这是在大数据领域或者非大数据环境下,都会用到的一个运营指标,也是不可或缺的指标之一。TopN在Flink和Spark里面,其实是将分成两种,一种是窗口TopN,另一种是全局TopN,两者一般会结合使用,根据我们的实际的需要去选择合适的TopN算法。这里主要讲全局TopN.

准备工作

集群环境搭建(Hadoop,Flume,Kafka,Flink)

此处自行准备即可,我们的数据流方向为:product -> flume -> kafka -> flink
flink-topn-data.png

数据准备

数据格式: {orderID},{goodsID},{areaID},{amount},{createTime}

数据格式说明:订单ID,商品ID,地域ID,订单价格,订单创建时间

开发工具: PyCharm

数据产生的源程序:
flink-TopN

数据分析

将得到的区域areaId+商品gdsId维度的销售额按照区域areaId分组,然后求得TopN的销售额商品,并且定时更新输出与窗口TopN不同,全局TopN没有时间窗口的概念,也就没有时间的概念,因此使用ProcessingTime语义即可,并且也不能再使用Window算子来操作,但是在这个过程中需要完成数据累加操作与定时输出功能,选择ProcessFunction函数来完成,使用State保存中间结果数据,保证数据一致性语义,使用定时器来完成定时输出功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
orderStream.keyBy(orders => {
orders.areaID + "_" + orders.goodsID
}).process(new KeyedProcessFunction[String, Order, GoodsSales]() {
var orderState: ValueState[Int] = _
var orderStateDesc: ValueStateDescriptor[Int] = _

override def open(parameters: Configuration): Unit = {
orderStateDesc = new ValueStateDescriptor[Int]("order-state", TypeInformation.of(classOf[Int]))
orderState = getRuntimeContext.getState(orderStateDesc)
}

override def processElement(value: Order, ctx: KeyedProcessFunction[String, Order, GoodsSales]#Context, out: Collector[GoodsSales]): Unit = {
val currentState: Int = orderState.value()
if (currentState == null) {
orderState.update(value.amount)
} else {
orderState.update(currentState + value.amount)
}

out.collect(GoodsSales(value.areaID, value.goodsID, orderState.value(), value.createTime))
}
})

使用keyBy按照areaId+gdsId来分组,然后使用KeyedProcessFunction来完成累加操作。在KeyedProcessFunction里面定义了一个ValueState来保存每个分组的销售额,processElement完成销售额累加操作,并且不断更新ValueState与collect输出。
说明: 这里使用ValueState来完成累加过程显得比较繁琐,可以使用ReducingState来替代,这里只是为了表现出累加这个过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
salesStream.keyBy(_.areaID)
.process(new KeyedProcessFunction[Int, GoodsSales, Void] {
var topState: ValueState[util.TreeSet[GoodsSales]] = _
var topStateDesc: ValueStateDescriptor[util.TreeSet[GoodsSales]] = _

var mapState: MapState[Int, GoodsSales] = _
var mapStateDesc: MapStateDescriptor[Int, GoodsSales] = _

// 定时器的触发
var fireState: ValueState[Long] = _
var fireStateDesc: ValueStateDescriptor[Long] = _

val internal: Long = 60000
val N: Int = 3

override def open(parameters: Configuration): Unit = {
topStateDesc = new ValueStateDescriptor[util.TreeSet[GoodsSales]]("top-state", TypeInformation.of(classOf[util.TreeSet[GoodsSales]]))
topState = getRuntimeContext.getState(topStateDesc)

mapStateDesc = new MapStateDescriptor[Int, GoodsSales]("mapping-state", TypeInformation.of(classOf[Int]), TypeInformation.of(classOf[GoodsSales]))
mapState = getRuntimeContext.getMapState(mapStateDesc)

fireStateDesc = new ValueStateDescriptor[Long]("time-state", TypeInformation.of(classOf[Long]))
fireState = getRuntimeContext.getState(fireStateDesc)

}

override def processElement(value: GoodsSales, ctx: KeyedProcessFunction[Int, GoodsSales, Void]#Context, out: Collector[Void]): Unit = {
val top: util.TreeSet[GoodsSales] = topState.value()
val currTime = ctx.timerService().currentProcessingTime()

if (top == null) {
val goods = new util.TreeSet[GoodsSales](new Comparator[GoodsSales] {
override def compare(o1: GoodsSales, o2: GoodsSales): Int = (o1.amount - o2.amount)
})
goods.add(value)
topState.update(goods)

mapState.put(value.goodsID, value)
} else {
mapState.contains(value.goodsID) match {
// 模式匹配,已经存在商品的销售数量
case true => {
val oldValue: GoodsSales = mapState.get(value.goodsID)
mapState.put(value.goodsID, value)

val values: util.TreeSet[GoodsSales] = topState.value()
values.remove(oldValue) // 在当前状态里面移除掉
// 更新旧的商品销售数量
values.add(value)
topState.update(values)
}
case false => {
// 找 TopN
if (top.size() >= N) {
val first: GoodsSales = top.first()
if (value.amount > first.amount) {
top.pollFirst()
top.add(value)

mapState.put(value.goodsID, value)
topState.update(top)
}
} else {
top.add(value)
mapState.put(value.goodsID, value)
topState.update(top)
}
}
}
}


if (currTime == null) {
val start: Long = currTime - (currTime % internal)
val nextFireTimestamp: Long = start + internal
ctx.timerService().registerProcessingTimeTimer(nextFireTimestamp)
fireState.update(nextFireTimestamp)
}
}

override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Int, GoodsSales, Void]#OnTimerContext, out: Collector[Void]): Unit = {
println("timestamp: " + timestamp)

val goodsSales: util.TreeSet[GoodsSales] = topState.value()

val resultIter: util.Iterator[GoodsSales] = goodsSales.iterator()
while (resultIter.hasNext) {
println(resultIter.next())
}

val firstTimestamp: Long = fireState.value()

if (firstTimestamp != null && (firstTimestamp == timestamp)) {
fireState.clear()
fireState.update(timestamp + internal)
ctx.timerService().registerProcessingTimeTimer(timestamp + internal)
}

}
})

至此,结束
具体程序,可参考 GlobalTopN
global-result

这是打赏的地方...

本文标题:大数据计算引擎之Flink 全局TopN

文章作者:Mr.Sun

发布时间:2019年12月18日 - 11:24:16

最后更新:2020年06月15日 - 10:05:35

原始链接:http://www.blog.sun-iot.xyz/posts/23f93c01

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

---------Thanks for your attention---------