Alertmanager 源码阅读分析篇(2)Alerts接收 这里的Alertmanager
的告警接收,我们主要讲的是Prometheus
的告警怎么发送到Alertmanager
的。但是我们不关心 Prometheus
的发送,我们关心的是Alertmanager
的接收。
我们先看到api/v1
里面的注册接口:
1 2 3 4 5 6 7 8 9 r.Options("/*path" , wrap(func(w http.ResponseWriter, r *http.Request) {})) r.Get("/status" , wrap(api.status)) r.Get("/receivers" , wrap(api.receivers)) r.Get("/alerts" , wrap(api.listAlerts)) r.Post("/alerts" , wrap(api.addAlerts)) r.Get("/silences" , wrap(api.listSilences)) r.Post("/silences" , wrap(api.setSilence)) r.Get("/silence/:sid" , wrap(api.getSilence)) r.Del("/silence/:sid" , wrap(api.delSilence))
可以看到一个 POST
请求的,/alerts
接口。这就是我们的Prometheus
产生告警后发送到Alertmanager
的入口。现在我们跟着这个接口往里面走。可以看到这个:
1 2 3 4 5 6 7 8 9 10 11 func (api *API) addAlerts (w http.ResponseWriter, r *http.Request) { var alerts []*types.Alert if err := api.receive(r, &alerts); err != nil { api.respondError(w, apiError{ typ: errorBadData, err: err, }, nil ) return } api.insertAlerts(w, r, alerts...) }
这个很好理解,这就是把POST
中的 Body
提取出来,按照结构体Alert
的形式序列化。完成后就执行了insert
操作。看这个 insert
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 func (api *API) insertAlerts (w http.ResponseWriter, r *http.Request, alerts ...*types.Alert) { now := time.Now() api.mtx.RLock() resolveTimeout := time.Duration(api.config.Global.ResolveTimeout) api.mtx.RUnlock() for _, alert := range alerts { alert.UpdatedAt = now if alert.StartsAt.IsZero() { if alert.EndsAt.IsZero() { alert.StartsAt = now } else { alert.StartsAt = alert.EndsAt } } if alert.EndsAt.IsZero() { alert.Timeout = true alert.EndsAt = now.Add(resolveTimeout) } if alert.EndsAt.After(time.Now()) { api.m.Firing().Inc() } else { api.m.Resolved().Inc() } } var ( validAlerts = make ([]*types.Alert, 0 , len (alerts)) validationErrs = &types.MultiError{} ) for _, a := range alerts { removeEmptyLabels(a.Labels) if err := a.Validate(); err != nil { validationErrs.Add(err) api.m.Invalid().Inc() continue } validAlerts = append (validAlerts, a) } if err := api.alerts.Put(validAlerts...); err != nil { api.respondError(w, apiError{ typ: errorInternal, err: err, }, nil ) return } if validationErrs.Len() > 0 { api.respondError(w, apiError{ typ: errorBadData, err: validationErrs, }, nil ) return } api.respond(w, nil ) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (a *Alert) Validate () error { if a.StartsAt.IsZero() { return fmt.Errorf("start time missing" ) } if !a.EndsAt.IsZero() && a.EndsAt.Before(a.StartsAt) { return fmt.Errorf("start time must be before end time" ) } if err := a.Labels.Validate(); err != nil { return fmt.Errorf("invalid label set: %s" , err) } if len (a.Labels) == 0 { return fmt.Errorf("at least one label pair required" ) } if err := a.Annotations.Validate(); err != nil { return fmt.Errorf("invalid annotations: %s" , err) } return nil }
前面,我们在 insertAlert
里面有谈到Put
方法,现在我们来看一下这个 Put
方法:
1 2 3 4 5 type Alerts struct { sync.Mutex c map [model.Fingerprint]*types.Alert cb func ([]*types.Alert) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func (a *Alerts) Put (alerts ...*types.Alert) error { for _, alert := range alerts { fp := alert.Fingerprint() if old, err := a.alerts.Get(fp); err == nil { if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) || (alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) { alert = old.Merge(alert) } } if err := a.alerts.Set(alert); err != nil { level.Error(a.logger).Log("msg" , "error on set alert" , "err" , err) continue } a.mtx.Lock() for _, l := range a.listeners { select { case l.alerts <- alert: case <-l.done: } } a.mtx.Unlock() } return nil }
在阅读这个方法前,先知道这个方法是在那个文件目录下的,当前的这个方法是在provider/mem/mem.go
,在上一篇《Alertmanager
源码阅读分析篇(1)告警流程剖析》我们有说到,API 过来的 数据 会先 缓存到 ALert Provider
, 这里开始做这个缓存了。看看是怎么缓存的。我们直接看源码,说明会写在注释里面。
我们现在找到了 数据到哪里去了,现在看 l.alerts
是怎么来的:
1 2 3 4 type listeningAlerts struct { alerts chan *types.Alert done chan struct {} }
这是他的结构体,那这个是在哪里来的?在找下去:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (a *Alerts) Subscribe () provider .AlertIterator { a.mtx.Lock() defer a.mtx.Unlock() var ( done = make (chan struct {}) alerts = a.alerts.List() ch = make (chan *types.Alert, max(len (alerts), alertChannelLength)) ) for _, a := range alerts { ch <- a } a.listeners[a.next] = listeningAlerts{alerts: ch, done: done} a.next++ return provider.NewAlertIterator(ch, done, nil ) }
我们找到了这个,在 mem.go
里面用到这个的就三个地方,那就只能是这个了。这里就比较清楚了。这里将 我们接收到的告警进行Subscribe
。这里,我们根据前面说道的 架构图,可以猜测到这里的 Subscribe
应该是到 Dispatcher
。所以我们看下这个调用关系:
1 2 3 4 5 6 7 8 9 10 11 12 13 type Alerts interface { Subscribe() AlertIterator GetPending() AlertIterator Get(model.Fingerprint) (*types.Alert, error) Put(...*types.Alert) error }
这是provider
中的 Alerts
接口,按照我们的猜测,Subscribe
应该是到 Dispatcher
。然后我们发现这个 Subscribe
除了自身的调用外,有Dispatcher
和 Inhibitor
两个调用。根据流程,我们继续看Dispatcher
里面的。
1 2 3 4 5 6 7 8 9 10 func (d *Dispatcher) Run () { d.done = make (chan struct {}) d.mtx.Lock() d.aggrGroups = map [*Route]map [model.Fingerprint]*aggrGroup{} d.metrics.aggrGroups.Set(0 ) d.ctx, d.cancel = context.WithCancel(context.Background()) d.mtx.Unlock() d.run(d.alerts.Subscribe()) close (d.done) }
这里显然就是做批次处理了,我们的 Notify
也就是通知模块,就是在这个后面进行处理的。所以呢,要不要额外的分开处理呢?
同样在 Inhibitor
里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (ih *Inhibitor) Run () { var ( g run.Group ctx context.Context ) ih.mtx.Lock() ctx, ih.cancel = context.WithCancel(context.Background()) ih.mtx.Unlock() runCtx, runCancel := context.WithCancel(ctx) for _, rule := range ih.rules { go rule.scache.Run(runCtx, 15 *time.Minute) } g.Add(func () error { ih.run(runCtx) return nil }, func (err error) { runCancel() }) if err := g.Run(); err != nil { level.Warn(ih.logger).Log("msg" , "error running inhibitor" , "err" , err) } }
这个是做 告警 抑制的 , 但是笔者不用这个。嘿嘿