Alertmanager 源码分析(3)Dispatcher

在前一篇文章中,提到了provider 里面的 SubscribeDispatcher 里面的调用。也找到:

1
2
3
4
5
6
7
8
9
10
func (d *Dispatcher) Run() {
d.done = make(chan struct{})
d.mtx.Lock()
d.aggrGroups = map[*Route]map[model.Fingerprint]*aggrGroup{}
d.metrics.aggrGroups.Set(0)
d.ctx, d.cancel = context.WithCancel(context.Background())
d.mtx.Unlock()
d.run(d.alerts.Subscribe())
close(d.done)
}

首先,我们需要知道 谁调用的它,我们点进去,看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, logger, dispMetrics)
routes.Walk(func(r *dispatch.Route) {
if r.RouteOpts.RepeatInterval > *retention {
level.Warn(configLogger).Log(
"msg",
"repeat_interval is greater than the data retention period. It can lead to notifications being repeated more often than expected.",
"repeat_interval",
r.RouteOpts.RepeatInterval,
"retention",
*retention,
"route",
r.Key(),
)
}
})

go disp.Run()

我们在cmd/alertmanager/main.go 里面的 run 找到了 Dispatcher 的初始化的。DIspatcher 执行了 Run 操作后,在 d.run(d.alerts.Subscribe()) 里面,我们看run() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func (d *Dispatcher) run(it provider.AlertIterator) {
cleanup := time.NewTicker(30 * time.Second)
defer cleanup.Stop()

defer it.Close()

for {
select {
// 取到 我们的 alert
case alert, ok := <-it.Next():
if !ok {
// Iterator exhausted for some reason.
if err := it.Err(); err != nil {
level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
}
return
}
// Debug 模式下的 数据输出
level.Debug(d.logger).Log("msg", "Received alert", "alert", alert)

// Log errors but keep trying.
if err := it.Err(); err != nil {
level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
continue
}

now := time.Now()
// 这里做的是 路由的 匹配,根据不同的匹配发送到不同的人
for _, r := range d.route.Match(alert.Labels) {
// 这个看下面的
d.processAlert(alert, r)
}
d.metrics.processingDuration.Observe(time.Since(now).Seconds())

case <-cleanup.C:
d.mtx.Lock()
// aggrGroups map[*Route]map[model.Fingerprint]*aggrGroup
for _, groups := range d.aggrGroups {
// map[model.Fingerprint]*aggrGroup
// 判断我们的组内是否 空了
for _, ag := range groups {
if ag.empty() {
ag.stop()
delete(groups, ag.fingerprint())
d.metrics.aggrGroups.Dec()
}
}
}

d.mtx.Unlock()

case <-d.ctx.Done():
return
}
}
}

aggrGroup 将警报聚集到适用于一组通用路由选项的组中。 它以指定的时间间隔发出通知。

这里,我们先看一下这个 AlertIterator 的结构体:

1
2
3
4
5
6
7
8
9
type AlertIterator interface {
Iterator
// Next returns a channel that will be closed once the iterator is
// exhausted. It is not necessary to exhaust the iterator but Close must
// be called in any case to release resources used by the iterator (even
// if the iterator is exhausted).
Next() <-chan *types.Alert
}

现在再来看 run 方法。直接看注释吧。

run 中,有一个d.processAlert(alert, r),说明我们的告警信息在这里得到了处理,主要的作用就是:processAlert确定警报属于哪个聚合组并将其插入。

具体来看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
groupLabels := getGroupLabels(alert, route)
// 还是和之前的 alerts 一样,得到一个 hash
fp := groupLabels.Fingerprint()

d.mtx.Lock()
defer d.mtx.Unlock()
// 每个路由有不同的分组
group, ok := d.aggrGroups[route]
if !ok {
group = map[model.Fingerprint]*aggrGroup{}
d.aggrGroups[route] = group
}

// 如果不存在 需要的 组,那就创建一个 组
ag, ok := group[fp]
if !ok {
// 新建组,ag.next会用到配置里面的group_wait,
// 即首次启动后多少秒执行第一次 aggrGroup.run() 里面的逻辑
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
group[fp] = ag
d.metrics.aggrGroups.Inc()
// 当前的 aggroup 开始工作了,告警被这里处理了
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
lvl := level.Error(d.logger)
if ctx.Err() == context.Canceled {
lvl = level.Debug(d.logger)
}
lvl.Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
}
return err == nil
})
}
// 将 告警 插入到 不同的 组 里面去
ag.insert(alert)
}

我们可以看到,这里的告警处理在 ag.run() 里面被处理了,怎么处理的,我们再看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

func (ag *aggrGroup) run(nf notifyFunc) {
defer close(ag.done)
defer ag.next.Stop()

for {
select {
case now := <-ag.next.C:
ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))
ctx = notify.WithNow(ctx, now)
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)

ag.mtx.Lock()
ag.next.Reset(ag.opts.GroupInterval)
ag.hasFlushed = true
ag.mtx.Unlock()
// 前面看了那么多,我们可以看到,和配置文件中的 配置是很像的
// 但是前面都没有涉及到 告警 的处理
// 直到这里的 flush,我们看这个 flush 的方法
ag.flush(func(alerts ...*types.Alert) bool {
return nf(ctx, alerts...)
})
cancel()

case <-ag.ctx.Done():
return
}
}
}

看看这里的 notifyFunc :

1
type notifyFunc func(context.Context, ...*types.Alert) bool

我们看这里的flush 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
if ag.empty() {
return
}

var (
alerts = ag.alerts.List()
alertsSlice = make(types.AlertSlice, 0, len(alerts))
now = time.Now()
)
for _, alert := range alerts {
a := *alert
// 这里是在确保当前的告警没有消失
if !a.ResolvedAt(now) {
a.EndsAt = time.Time{}
}
alertsSlice = append(alertsSlice, &a)
}
// 排个序 ?
sort.Stable(alertsSlice)

level.Debug(ag.logger).Log("msg", "flushing", "alerts", fmt.Sprintf("%v", alertsSlice))

if notify(alertsSlice...) {
for _, a := range alertsSlice {
fp := a.Fingerprint()
got, err := ag.alerts.Get(fp)
if err != nil {
level.Error(ag.logger).Log("msg", "failed to get alert", "err", err, "alert", a.String())
continue
}
if a.Resolved() && got.UpdatedAt == a.UpdatedAt {
if err := ag.alerts.Delete(fp); err != nil {
level.Error(ag.logger).Log("msg", "error on delete alert", "err", err, "alert", a.String())
}
}
}
}
}

我们这里的 notify 里面的方法,就是在前面 run 里面给过去的:

1
2
3
4
5
6
7
8
9
10
11
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
lvl := level.Error(d.logger)
if ctx.Err() == context.Canceled {
lvl = level.Debug(d.logger)
}
lvl.Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
}
return err == nil
})

这里的 Exec 方法 在 notify.go 里面,说明现在已经从 dispatcher 到了 notify 了。现在看一下这个Exec 的接口实现:

1
2
3
type Stage interface {
Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
}

后面进行Notify 里面的源码的分析