Alertmanager 源码分析(4)Kafka支持

前面我们的源码一直走到了notify.go 这个文件里面,但是这里面具体怎么实现的?我并不是很关心。我是带着任务来的。增加 Kafka 支持。 改造 Alertmanager 的消息通知,我们需要知道这几个文件:

  • notify/notify.go 这里面是通知方法的接口定义
  • config/config.go 这里面是 alertmanager.yml 配置文件序列化
  • config/notifiers.go 这里面是 告警通知方式的配置的生成

我们在notify 目录下,找到了 email , webhook等通知方式的源码,我们简单点,根据webhook 来进行Kafka 的源码支持。

首先,我们先增加相关的配置类信息:在 config/notifiers.go 里面增加 Kafka 的基本配置,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Receiver configuration provides configuration on how to contact a receiver.
type Receiver struct {
	// A unique identifier for this receiver.
	Name string `yaml:"name" json:"name"`

	EmailConfigs     []*EmailConfig     `yaml:"email_configs,omitempty" json:"email_configs,omitempty"`
	PagerdutyConfigs []*PagerdutyConfig `yaml:"pagerduty_configs,omitempty" json:"pagerduty_configs,omitempty"`
	HipchatConfigs   []*HipchatConfig   `yaml:"hipchat_configs,omitempty" json:"hipchat_configs,omitempty"`
	SlackConfigs     []*SlackConfig     `yaml:"slack_configs,omitempty" json:"slack_configs,omitempty"`
	WebhookConfigs   []*WebhookConfig   `yaml:"webhook_configs,omitempty" json:"webhook_configs,omitempty"`
	OpsGenieConfigs  []*OpsGenieConfig  `yaml:"opsgenie_configs,omitempty" json:"opsgenie_configs,omitempty"`
	WechatConfigs    []*WechatConfig    `yaml:"wechat_configs,omitempty" json:"wechat_configs,omitempty"`
	PushoverConfigs  []*PushoverConfig  `yaml:"pushover_configs,omitempty" json:"pushover_configs,omitempty"`
	VictorOpsConfigs []*VictorOpsConfig `yaml:"victorops_configs,omitempty" json:"victorops_configs,omitempty"`
	// KafkaConfigs is the notifier added by this patch: one entry per
	// Kafka destination under a receiver's `kafka_configs:` key.
	KafkaConfigs []*KafkaConfig `yaml:"kafka_configs,omitempty" json:"kafka_configs,omitempty"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

// RequiredAckCode selects a sarama acknowledgement policy by a small
// integer code taken from the YAML config (expected values: 0, 1 or 2).
type RequiredAckCode int8

// GetAck maps the configured code to a sarama.RequiredAcks level:
//
//	0 -> sarama.NoResponse   (fire-and-forget)
//	1 -> sarama.WaitForAll   (all in-sync replicas must ack)
//	2 -> sarama.WaitForLocal (only the leader must ack)
//
// NOTE(review): this ordering differs from Kafka's own `acks` convention
// (where 1 = leader-only, -1/"all" = all replicas) — confirm it is
// intentional before users rely on it.
//
// Out-of-range codes fall back to WaitForAll; the original code indexed
// the slice directly and panicked (index out of range) on any value
// outside 0-2, crashing Alertmanager on a bad config.
func (ra RequiredAckCode) GetAck() sarama.RequiredAcks {
	acks := []sarama.RequiredAcks{
		sarama.NoResponse,
		sarama.WaitForAll,
		sarama.WaitForLocal,
	}
	if ra < 0 || int(ra) >= len(acks) {
		// Defensive default: an invalid config value must not panic.
		return sarama.WaitForAll
	}
	return acks[ra]
}

// Producer holds the sarama producer tuning knobs exposed through
// alertmanager.yml.
type Producer struct {
	// MaxMessageBytes caps the size of a single produced message;
	// 0 means "use the default" (filled in by newKafkaClient).
	MaxMessageBytes int `json:"max_message_bytes" yaml:"max_message_bytes"`
	// RequiredAcks is a small-int code translated to sarama.RequiredAcks
	// by RequiredAckCode.GetAck.
	RequiredAcks RequiredAckCode `json:"required_acks" yaml:"required_acks"`
	// RetryMax is the number of produce retries sarama performs.
	RetryMax int `json:"retry_max" yaml:"retry_max"`
}
// SASL carries the SASL/SCRAM authentication settings for the Kafka
// brokers.
type SASL struct {
	Enabled  bool   `json:"enabled" yaml:"enabled"`
	Username string `json:"username" yaml:"username"`
	Password string `json:"password" yaml:"password"`
	// SCRAMClientGeneratorFunc selects the SCRAM hash: "SHA256" (also
	// used when the field is empty) or "SHA512".
	SCRAMClientGeneratorFunc string `json:"scram_client_generator_func" yaml:"scram_client_generator_func"`
}
// KafkaNetConfig groups the network-level settings copied into sarama's
// Config.Net section.
//
// NOTE(review): raw time.Duration fields unmarshal from integer
// nanoseconds under yaml.v2, not from strings like "5s" — confirm the
// intended config format (Alertmanager elsewhere uses model.Duration).
type KafkaNetConfig struct {
	SASL *SASL `json:"sasl" yaml:"sasl"`
	DialTimeout  time.Duration `json:"dial_timeout,omitempty" yaml:"dial_timeout,omitempty"`
	ReadTimeout  time.Duration `json:"read_timeout,omitempty" yaml:"read_timeout,omitempty"`
	WriteTimeout time.Duration `json:"write_timeout,omitempty" yaml:"write_timeout,omitempty"`
}

// KafkaConfig is the per-receiver configuration for the Kafka notifier,
// mirroring the other *Config types in this file (webhook, email, ...).
type KafkaConfig struct {
	NotifierConfig `yaml:",inline" json:",inline"`
	// Brokers lists the bootstrap broker addresses ("host:port").
	Brokers []string `json:"brokers" yaml:"brokers"`
	// Version is the Kafka protocol version string (e.g. "V2_6_0_0"),
	// translated by KafkaVersion.GetVersion.
	Version KafkaVersion `json:"version" yaml:"version"`
	MetadataTimeout time.Duration `json:"metadata_timeout" yaml:"metadata_timeout"`
	KafkaNetConfig *KafkaNetConfig `json:"kafka_net_config" yaml:"kafka_net_config"`
	KafkaProducerConfig *Producer `json:"kafka_producer_config" yaml:"kafka_producer_config"`
	// MaxAlerts — NOTE(review): not referenced by the sending code shown
	// in this article; confirm it is wired up elsewhere.
	MaxAlerts uint64 `json:"max_alerts,omitempty" yaml:"max_alerts,omitempty"`
	// Topic is the Kafka topic alerts are produced to.
	Topic string `json:"topic" yaml:"topic"`
}

// UnmarshalYAML implements yaml.Unmarshaler. It applies
// DefaultKafkaConfig first, decodes the user's YAML over it, then
// validates the mandatory fields (version, brokers, topic).
func (c *KafkaConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
	*c = DefaultKafkaConfig
	// The `plain` alias drops this UnmarshalYAML method so the call
	// below does not recurse.
	type plain KafkaConfig
	if err := unmarshal((*plain)(c)); err != nil {
		return err
	}
	if c.Version == "" {
		return fmt.Errorf("missing version in kafka config")
	}
	// len() == 0 also rejects an explicitly empty `brokers: []` list,
	// which the original `c.Brokers == nil` check let through.
	if len(c.Brokers) == 0 {
		return fmt.Errorf("missing brokers in kafka config")
	}
	if c.Topic == "" {
		return fmt.Errorf("missing topic in kafka config")
	}
	return nil
}

// KafkaVersion is the protocol version string from the config
// (e.g. "V2_6_0_0").
type KafkaVersion string

// GetVersion translates the config string into the corresponding
// sarama.KafkaVersion. Unknown values fall back to sarama.V0_11_0_2.
func (kv KafkaVersion) GetVersion() sarama.KafkaVersion {
	switch kv {
	case "V2_6_0_0":
		return sarama.V2_6_0_0
	case "V2_5_0_0":
		return sarama.V2_5_0_0
	case "V2_4_0_0":
		return sarama.V2_4_0_0
	case "V2_3_0_0":
		return sarama.V2_3_0_0
	case "V2_2_0_0":
		return sarama.V2_2_0_0
	case "V2_1_0_0":
		return sarama.V2_1_0_0
	case "V2_0_1_0":
		return sarama.V2_0_1_0
	case "V2_0_0_0":
		return sarama.V2_0_0_0
	case "V1_1_1_0":
		// Fixed: the original returned sarama.V2_5_0_0 here — a
		// copy-paste bug that silently negotiated the wrong protocol.
		return sarama.V1_1_1_0
	case "V1_1_0_0":
		return sarama.V1_1_0_0
	case "V1_0_0_0":
		return sarama.V1_0_0_0
	default:
		return sarama.V0_11_0_2
	}
}

可根据自身进行调整。

接下来就是我们的 Kafka 的发送的客户端了。我们在 notify/kafka.go 中增加代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Notifier implements the Kafka notification channel: it serializes
// alert groups to JSON and hands them to a background sarama producer.
type Notifier struct {
	conf   *config.KafkaConfig
	tmpl   *template.Template
	logger log.Logger
	client *Client
	// counter counts Notify invocations; it is only logged (useful for
	// debugging the repeated-delivery issue described in this article).
	counter uint64
	// NOTE(review): no method shown here ever locks this embedded
	// RWMutex — confirm it is needed; embedding also exports
	// Lock/Unlock on Notifier itself.
	sync.RWMutex
}

// New builds a Kafka Notifier: it creates the underlying sarama
// producers, attaches the logger, and starts the background send loop.
//
// NOTE(review): the send loop is started with context.Background(), so
// it is never cancelled for the lifetime of the process — confirm this
// is intended.
func New(conf *config.KafkaConfig, t *template.Template, l log.Logger) (*Notifier, error) {
	// Build the Kafka client (async + sync producers).
	c, err := newKafkaClient(conf)
	if err != nil {
		return nil, err
	}
	c.logger = l
	level.Info(l).Log("kafka client", "kafka client created success")

	// Drain successes/errors and forward queued messages in the background.
	go c.AsyncSendMsg(context.Background())

	n := &Notifier{
		conf:   conf,
		tmpl:   t,
		logger: l,
		client: c,
	}
	return n, nil
}

// Message is the JSON payload produced to Kafka. It mirrors the webhook
// notifier's payload shape: rendered template data plus a schema
// version and the alert group key.
type Message struct {
	*template.Data
	// Version of the payload schema; set to "4" by Notify.
	Version string `json:"version"`
	// GroupKey identifies the alert group this notification belongs to.
	GroupKey string `json:"groupKey"`
}

// Notify renders the alerts into the webhook-compatible JSON payload
// and queues it for the background Kafka producer loop.
//
// It reports retry=false: delivery is asynchronous, so a broker-side
// failure cannot be signalled back to the caller from here.
func (n *Notifier) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
	atomic.AddUint64(&n.counter, 1)
	data := notify.GetTemplateData(ctx, n.tmpl, alerts, n.logger)
	level.Info(n.logger).Log("counter", atomic.LoadUint64(&n.counter))
	groupKey, err := notify.ExtractGroupKey(ctx)
	if err != nil {
		level.Debug(n.logger).Log("groupKey", groupKey)
		return false, errors.Wrap(err, "groupKey error")
	}
	msg := &Message{
		Version:  "4",
		Data:     data,
		GroupKey: groupKey.String(),
	}
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(msg); err != nil {
		level.Error(n.logger).Log("json", err)
		return false, errors.Wrap(err, "json encode error")
	}
	// Send with cancellation instead of the original `go input(...)`:
	// that spawned one goroutine per notification, and every such
	// goroutine blocked forever once the producer loop had exited
	// (messageChan is unbuffered with no other receiver).
	select {
	case messageChan <- buf.String():
	case <-ctx.Done():
		return false, ctx.Err()
	}
	return false, nil
}

// messageChan hands JSON-encoded payloads from Notify to the producer
// loop in AsyncSendMsg. It is unbuffered, so a send blocks until the
// loop is ready to receive.
var messageChan = make(chan string)

// input pushes one serialized message onto messageChan (blocking).
func input(msg string) {
	messageChan <- msg
}

// AsyncSendMsg is the long-running producer loop. It starts two drainer
// goroutines for the async producer's Errors() and Successes() channels
// (Successes must be drained because Producer.Return.Successes is
// enabled in newKafkaClient), then forwards messages from messageChan
// to Kafka until ctx is cancelled, at which point it closes the
// producer and returns.
func (c *Client) AsyncSendMsg(ctx context.Context) {
	go func() {
		// Recover so a panic while logging a late error after shutdown
		// does not take the process down.
		defer func() {
			if r := recover(); r != nil {
				level.Debug(c.logger).Log("status", "failed", "recovery result", r)
			}
		}()
		// Loop ends when AsyncClose() closes the Errors channel.
		for err := range c.clientAsync.Errors() {
			level.Error(c.logger).Log("status", "failed", "error", err)
		}
	}()
	go func() {
		defer func() {
			if r := recover(); r != nil {
				level.Error(c.logger).Log("recovery", r)

			}
		}()
		// Loop ends when AsyncClose() closes the Successes channel.
		for successes := range c.clientAsync.Successes() {
			level.Info(c.logger).Log("status", "success", "topic", successes.Topic, "offset", successes.Offset, "partition", successes.Partition)
		}
	}()
ProducerLoop:
	for {
		select {
		case msg := <-messageChan:
			// Forward one payload to the broker (fire-and-forget; the
			// outcome arrives on the drainer goroutines above).
			c.clientAsync.Input() <- &sarama.ProducerMessage{Topic: c.topic, Value: sarama.StringEncoder(msg)}
		case <-ctx.Done():
			// Shut down; messages still queued in messageChan are dropped.
			c.clientAsync.AsyncClose()
			break ProducerLoop
		}
	}
}

// Client wraps the sarama producers used by the Kafka notifier.
type Client struct {
	// clientAsync is the producer driven by AsyncSendMsg.
	clientAsync sarama.AsyncProducer
	// clientSync is created alongside it.
	// NOTE(review): no code shown here ever uses it — confirm it is needed.
	clientSync sarama.SyncProducer
	logger log.Logger
	// topic every message is produced to.
	topic string
}

// newKafkaClient builds both an async and a sync sarama producer from
// the Kafka receiver configuration.
//
// NOTE(review): KafkaProducerConfig, KafkaNetConfig and SASL are
// dereferenced unconditionally; confirm DefaultKafkaConfig guarantees
// they are non-nil, otherwise a missing section in alertmanager.yml
// panics here.
func newKafkaClient(kafkaConfig *config.KafkaConfig) (*Client, error) {
	// Named `cfg` so the imported `config` package is not shadowed, as
	// the original local variable did.
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = kafkaConfig.KafkaProducerConfig.RequiredAcks.GetAck()
	cfg.Producer.Retry.Max = kafkaConfig.KafkaProducerConfig.RetryMax

	// Fall back to ~1 MB when the user did not set a message size cap.
	if kafkaConfig.KafkaProducerConfig.MaxMessageBytes == 0 {
		kafkaConfig.KafkaProducerConfig.MaxMessageBytes = 1000000
	}
	cfg.Producer.MaxMessageBytes = kafkaConfig.KafkaProducerConfig.MaxMessageBytes
	// Successes must be returned so AsyncSendMsg can drain them.
	cfg.Producer.Return.Successes = true
	cfg.Net.DialTimeout = kafkaConfig.KafkaNetConfig.DialTimeout
	cfg.Metadata.Timeout = kafkaConfig.MetadataTimeout
	cfg.Version = kafkaConfig.Version.GetVersion()

	cfg.Net.SASL.Enable = kafkaConfig.KafkaNetConfig.SASL.Enabled
	cfg.Net.SASL.User = kafkaConfig.KafkaNetConfig.SASL.Username
	cfg.Net.SASL.Password = kafkaConfig.KafkaNetConfig.SASL.Password
	// SCRAM mechanism selection: SHA256 is the default for an empty value.
	switch kafkaConfig.KafkaNetConfig.SASL.SCRAMClientGeneratorFunc {
	case "SHA512":
		cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
			return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
		}
		cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
	case "SHA256", "":
		cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
			return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
		}
		cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
	}

	asyncProducer, err := sarama.NewAsyncProducer(kafkaConfig.Brokers, cfg)
	if err != nil {
		// errors.Wrap already appends err; the original Wrapf embedded
		// err.Error() a second time.
		return nil, errors.Wrap(err, "new kafka async producer error")
	}
	syncProducer, err := sarama.NewSyncProducer(kafkaConfig.Brokers, cfg)
	if err != nil {
		// Don't leak the async producer (and its broker connections)
		// when the sync one fails — the original returned without
		// closing it.
		asyncProducer.AsyncClose()
		return nil, errors.Wrap(err, "new kafka sync producer error")
	}
	return &Client{
		clientAsync: asyncProducer,
		clientSync:  syncProducer,
		topic:       kafkaConfig.Topic,
	}, nil
}

如此,我们的 Kafka 的发送就好了。接下来就是进行测试了。

在测试的环节中,我发现了一个问题:接收到告警后,这里的 Notify 方法会被循环执行多次,给我的感觉是发送失败了,在进行 retry。原因很可能与 Alertmanager 的重试机制有关:RetryStage 会在 Notify 返回 error(或 retry 标志为 true)时按指数退避反复重试;此外,分组的 repeat_interval 到期也会再次触发通知。可以结合 Notify 的返回值和日志进一步确认具体原因。