Alertmanager 源码阅读分析篇(2)Alerts接收

这里的Alertmanager 的告警接收,我们主要讲的是Prometheus的告警怎么发送到Alertmanager

的。但是我们不关心 Prometheus 的发送,我们关心的是Alertmanager 的接收。

我们先看到api/v1 里面的注册接口:

1
2
3
4
5
6
7
8
9
r.Options("/*path", wrap(func(w http.ResponseWriter, r *http.Request) {}))
r.Get("/status", wrap(api.status))
r.Get("/receivers", wrap(api.receivers))
r.Get("/alerts", wrap(api.listAlerts))
r.Post("/alerts", wrap(api.addAlerts))
r.Get("/silences", wrap(api.listSilences))
r.Post("/silences", wrap(api.setSilence))
r.Get("/silence/:sid", wrap(api.getSilence))
r.Del("/silence/:sid", wrap(api.delSilence))

可以看到一个 POST 请求的,/alerts 接口。这就是我们的Prometheus 产生告警后发送到Alertmanager 的入口。现在我们跟着这个接口往里面走。可以看到这个:

1
2
3
4
5
6
7
8
9
10
11
func (api *API) addAlerts(w http.ResponseWriter, r *http.Request) {
var alerts []*types.Alert
if err := api.receive(r, &alerts); err != nil {
api.respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
return
}
api.insertAlerts(w, r, alerts...)
}

这个很好理解,这就是把POST 中的 Body 提取出来,按照结构体Alert 的形式序列化。完成后就执行了insert 操作。看这个 insert 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*types.Alert) {
// 这里就不需要做太多的介绍了
now := time.Now()
api.mtx.RLock()
// 拿到 配置文件中的 ResolveTimeout , 也就是警报声明为已处理的时间
resolveTimeout := time.Duration(api.config.Global.ResolveTimeout)
api.mtx.RUnlock()
// 对 过来的 告警信息 进行轮训,这里的目的 其实就是为了更新当前告警的时间
for _, alert := range alerts {
alert.UpdatedAt = now
if alert.StartsAt.IsZero() {
if alert.EndsAt.IsZero() {
alert.StartsAt = now
} else {
alert.StartsAt = alert.EndsAt
}
}
if alert.EndsAt.IsZero() {
alert.Timeout = true
alert.EndsAt = now.Add(resolveTimeout)
}
if alert.EndsAt.After(time.Now()) {
api.m.Firing().Inc()
} else {
api.m.Resolved().Inc()
}
}
var (
validAlerts = make([]*types.Alert, 0, len(alerts))
validationErrs = &types.MultiError{}
)
for _, a := range alerts {
// 见名知意,清理掉单个 alert 空的 labels
removeEmptyLabels(a.Labels)
// 这里是在 判断 当前的这个 alert 是一个标准的不,这个方法在后面提供出来了
if err := a.Validate(); err != nil {
validationErrs.Add(err)
// 这个是 Alertmanager 自身的监控指标,对于不合理的 alert , alertmanager 自身做了指标监控
api.m.Invalid().Inc()
continue
}
// 合理的 告警 组合起来,成了一个数组
validAlerts = append(validAlerts, a)
}
// 这里是 重点了。这里的 Put 在下面讲,这里的注释篇幅不够。
if err := api.alerts.Put(validAlerts...); err != nil {
api.respondError(w, apiError{
typ: errorInternal,
err: err,
}, nil)
return
}
// 这不是重点,不多费口舌
if validationErrs.Len() > 0 {
api.respondError(w, apiError{
typ: errorBadData,
err: validationErrs,
}, nil)
return
}
api.respond(w, nil)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (a *Alert) Validate() error {
if a.StartsAt.IsZero() {
return fmt.Errorf("start time missing")
}
if !a.EndsAt.IsZero() && a.EndsAt.Before(a.StartsAt) {
return fmt.Errorf("start time must be before end time")
}
if err := a.Labels.Validate(); err != nil {
return fmt.Errorf("invalid label set: %s", err)
}
if len(a.Labels) == 0 {
return fmt.Errorf("at least one label pair required")
}
if err := a.Annotations.Validate(); err != nil {
return fmt.Errorf("invalid annotations: %s", err)
}
return nil
}

前面,我们在 insertAlert 里面有谈到Put 方法,现在我们来看一下这个 Put 方法:

1
2
3
4
5
type Alerts struct {
sync.Mutex
c map[model.Fingerprint]*types.Alert
cb func([]*types.Alert)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (a *Alerts) Put(alerts ...*types.Alert) error {
for _, alert := range alerts {
// 这里 会给 每一个 alert 生成 一个 unique hash
fp := alert.Fingerprint()
// 这里在根据生成的 hash 去取到一个 新的 alert , 返回的 alert 和 a.alert 是不一样的
// 同时,这里做了在 merage 前的一个 校验
if old, err := a.alerts.Get(fp); err == nil {
if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
(alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {
alert = old.Merge(alert)
}
}
// 这里讲 merge 后的 alert 添加到 map 里面去,key 为 hash 值
if err := a.alerts.Set(alert); err != nil {
level.Error(a.logger).Log("msg", "error on set alert", "err", err)
continue
}
a.mtx.Lock()
for _, l := range a.listeners {
select {
// 这是我们数据的去向
case l.alerts <- alert:
case <-l.done:
}
}
a.mtx.Unlock()
}
return nil
}

在阅读这个方法前,先知道这个方法是在那个文件目录下的,当前的这个方法是在provider/mem/mem.go ,在上一篇《Alertmanager源码阅读分析篇(1)告警流程剖析》我们有说到,API 过来的 数据 会先 缓存到 ALert Provider , 这里开始做这个缓存了。看看是怎么缓存的。我们直接看源码,说明会写在注释里面。

我们现在找到了 数据到哪里去了,现在看 l.alerts 是怎么来的:

1
2
3
4
type listeningAlerts struct {
alerts chan *types.Alert
done chan struct{}
}

这是他的结构体,那这个是在哪里来的?在找下去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (a *Alerts) Subscribe() provider.AlertIterator {
a.mtx.Lock()
defer a.mtx.Unlock()

var (
done = make(chan struct{})
alerts = a.alerts.List()
ch = make(chan *types.Alert, max(len(alerts), alertChannelLength))
)

for _, a := range alerts {
ch <- a
}

a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}
a.next++

return provider.NewAlertIterator(ch, done, nil)
}

我们找到了这个,在 mem.go 里面用到这个的就三个地方,那就只能是这个了。这里就比较清楚了。这里将 我们接收到的告警进行Subscribe。这里,我们根据前面说道的 架构图,可以猜测到这里的 Subscribe 应该是到 Dispatcher。所以我们看下这个调用关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
type Alerts interface {
// Subscribe returns an iterator over active alerts that have not been
// resolved and successfully notified about.
// They are not guaranteed to be in chronological order.
Subscribe() AlertIterator
// GetPending returns an iterator over all alerts that have
// pending notifications.
GetPending() AlertIterator
// Get returns the alert for a given fingerprint.
Get(model.Fingerprint) (*types.Alert, error)
// Put adds the given alert to the set.
Put(...*types.Alert) error
}

这是provider 中的 Alerts 接口,按照我们的猜测,Subscribe 应该是到 Dispatcher。然后我们发现这个 Subscribe 除了自身的调用外,有DispatcherInhibitor 两个调用。根据流程,我们继续看Dispatcher里面的。

1
2
3
4
5
6
7
8
9
10
func (d *Dispatcher) Run() {
d.done = make(chan struct{})
d.mtx.Lock()
d.aggrGroups = map[*Route]map[model.Fingerprint]*aggrGroup{}
d.metrics.aggrGroups.Set(0)
d.ctx, d.cancel = context.WithCancel(context.Background())
d.mtx.Unlock()
d.run(d.alerts.Subscribe())
close(d.done)
}

这里显然就是做批次处理了,我们的 Notify 也就是通知模块,就是在这个后面进行处理的。所以呢,要不要额外的分开处理呢?

同样在 Inhibitor 里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (ih *Inhibitor) Run() {
var (
g run.Group
ctx context.Context
)
ih.mtx.Lock()
ctx, ih.cancel = context.WithCancel(context.Background())
ih.mtx.Unlock()
runCtx, runCancel := context.WithCancel(ctx)

for _, rule := range ih.rules {
go rule.scache.Run(runCtx, 15*time.Minute)
}
g.Add(func() error {
ih.run(runCtx)
return nil
}, func(err error) {
runCancel()
})
if err := g.Run(); err != nil {
level.Warn(ih.logger).Log("msg", "error running inhibitor", "err", err)
}
}

这个是做 告警 抑制的 , 但是笔者不用这个。嘿嘿