disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, logger, dispMetrics) routes.Walk(func(r *dispatch.Route) { if r.RouteOpts.RepeatInterval > *retention { level.Warn(configLogger).Log( "msg", "repeat_interval is greater than the data retention period. It can lead to notifications being repeated more often than expected.", "repeat_interval", r.RouteOpts.RepeatInterval, "retention", *retention, "route", r.Key(), ) } })
go disp.Run()
我们在cmd/alertmanager/main.go 里面的 run 找到了 Dispatcher 的初始化的。DIspatcher 执行了 Run 操作后,在 d.run(d.alerts.Subscribe()) 里面,我们看run() 方法:
for { select { // 取到 我们的 alert case alert, ok := <-it.Next(): if !ok { // Iterator exhausted for some reason. if err := it.Err(); err != nil { level.Error(d.logger).Log("msg", "Error on alert update", "err", err) } return } // Debug 模式下的 数据输出 level.Debug(d.logger).Log("msg", "Received alert", "alert", alert)
// Log errors but keep trying. if err := it.Err(); err != nil { level.Error(d.logger).Log("msg", "Error on alert update", "err", err) continue }
now := time.Now() // 这里做的是 路由的 匹配,根据不同的匹配发送到不同的人 for _, r := range d.route.Match(alert.Labels) { // 这个看下面的 d.processAlert(alert, r) } d.metrics.processingDuration.Observe(time.Since(now).Seconds())
case <-cleanup.C: d.mtx.Lock() // aggrGroups map[*Route]map[model.Fingerprint]*aggrGroup for _, groups := range d.aggrGroups { // map[model.Fingerprint]*aggrGroup // 判断我们的组内是否 空了 for _, ag := range groups { if ag.empty() { ag.stop() delete(groups, ag.fingerprint()) d.metrics.aggrGroups.Dec() } } }
d.mtx.Unlock()
case <-d.ctx.Done(): return } } }
aggrGroup 将警报聚集到适用于一组通用路由选项的组中。 它以指定的时间间隔发出通知。
这里,我们先看一下这个 AlertIterator 的结构体:
1 2 3 4 5 6 7 8 9
type AlertIterator interface { Iterator // Next returns a channel that will be closed once the iterator is // exhausted. It is not necessary to exhaust the iterator but Close must // be called in any case to release resources used by the iterator (even // if the iterator is exhausted). Next() <-chan *types.Alert }
现在再来看 run 方法。直接看注释吧。
在 run 中,有一个d.processAlert(alert, r),说明我们的告警信息在这里得到了处理,主要的作用就是:processAlert确定警报属于哪个聚合组并将其插入。
d.mtx.Lock() defer d.mtx.Unlock() // 每个路由有不同的分组 group, ok := d.aggrGroups[route] if !ok { group = map[model.Fingerprint]*aggrGroup{} d.aggrGroups[route] = group }