目的

基于当前最流行的 kafka-docker 仓库 启动 kafka-connector;主要有以下变量:

  • KAFKA_BOOTSTRAP_SERVERS
    kafka 集群地址
  • KAFKA_GROUP_ID
    内置消费组 id, 默认为 mm2
  • KAFKA_SASL_ENABLE
    kafka 是否开启 sasl,此处特指 scram
  • KAFKA_ADMIN_PASSWORD
    kafka 中 admin 的密码

变动部分

改动处一

拷贝一份 Dockerfile 为 Dockerfile-KafkaConnector,并将 Dockerfile-KafkaConnector 文件中的所有 start-kafka.sh 替换为 start-kafka-connector.sh

改动处二

增加 start-kafka-connector.sh 脚本文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#!/bin/bash
# Entrypoint that rewrites connect-distributed.properties from KAFKA_* environment
# variables and then starts connect-distributed.sh.
#
# Required environment:
#   KAFKA_BOOTSTRAP_SERVERS  - written to the properties file as bootstrap.servers
# Optional environment:
#   KAFKA_GROUP_ID           - written as group.id (defaults to mm2)
#   KAFKA_SASL_ENABLE        - "true" enables the SASL/SCRAM settings
#   KAFKA_ADMIN_PASSWORD     - password used in the SASL JAAS configuration

# Set here rather than on the shebang line so it also applies when the script
# is started as `bash start-kafka-connector.sh`.
set -e


if [[ -z "$KAFKA_BOOTSTRAP_SERVERS" ]]; then
  echo "ERROR: missing mandatory config: KAFKA_BOOTSTRAP_SERVERS" >&2
  exit 1
fi


# Only variables prefixed with KAFKA_ are copied into the properties file, so the
# group id must be exported as KAFKA_GROUP_ID to end up as group.id.
# The misspelled legacy name KAFK_GROUP_ID is still honoured as a fallback.
export KAFKA_GROUP_ID="${KAFKA_GROUP_ID:-${KAFK_GROUP_ID:-mm2}}"


(
#######################################
# Set a key=value pair in a Java-properties style file.
# If a line starting with "key=" or "#key=" exists it is replaced in place,
# otherwise "key=value" is appended to the end of the file.
# Arguments:
#   $1 - property key (matched literally)
#   $2 - property value (may contain '@', '&' and '\')
#   $3 - path of the properties file to modify
# Outputs:
#   A "[Configuring]" progress line on stdout; the value is never printed.
#######################################
function updateConfig() {
  local key=$1
  local value=$2
  local file=$3
  local key_regex sed_key sed_replacement

  # Omit $value here, in case there is sensitive information
  echo "[Configuring] '$key' in '$file'"

  # Escape regex metacharacters so that e.g. the '.' in "group.id" only matches a dot.
  key_regex=$(printf '%s' "$key" | sed -e 's/[][\\.^$*+?(){}|]/\\&/g')

  # If config exists in file, replace it. Otherwise, append to file.
  if grep -E -q "^#?${key_regex}=" "$file"; then
    # '@' is the sed delimiter below, so it has to be escaped in the pattern too.
    sed_key=${key_regex//@/\\@}
    # Escape the characters that are special in a sed replacement ('\' and '&')
    # plus the '@' delimiter, so arbitrary values (passwords) are written verbatim.
    sed_replacement=$(printf '%s' "$key=$value" | sed -e 's/[\\&@]/\\&/g')
    sed -r -i "s@^#?${sed_key}=.*@${sed_replacement}@g" "$file"
  else
    printf '%s\n' "$key=$value" >> "$file"
  fi
}


# KAFKA_* variables that control this script or the JVM and must not be
# written into the properties file.
EXCLUSIONS="|KAFKA_VERSION|KAFKA_HOME|KAFKA_HEAP_OPTS|KAFKA_SASL_ENABLE|KAFKA_ADMIN_PASSWORD|"
CONFIG_FILE=$KAFKA_HOME/config/connect-distributed.properties


# Iterate over the *names* of all variables starting with KAFKA_. Looping over
# the output of `env` would word-split and glob-expand values that contain
# whitespace or wildcard characters and yield bogus tokens.
for env_var in "${!KAFKA_@}"; do
  if [[ "$EXCLUSIONS" = *"|$env_var|"* ]]; then
    echo "Excluding $env_var from broker config"
    continue
  fi

  # KAFKA_BOOTSTRAP_SERVERS -> bootstrap.servers: drop the prefix, lower-case,
  # and turn underscores into dots.
  kafka_name=$(echo "$env_var" | cut -d_ -f2- | tr '[:upper:]' '[:lower:]' | tr _ .)
  updateConfig "$kafka_name" "${!env_var}" "$CONFIG_FILE"
done


# When SASL is enabled, configure SCRAM-SHA-256 over SASL_PLAINTEXT for the
# worker itself and for its producer.* and admin.* client overrides.
if [[ "$KAFKA_SASL_ENABLE" = "true" ]]; then
  # Fall back to the built-in default password when none was supplied.
  # (The previous test `[ ! KAFKA_ADMIN_PASSWORD ]` checked a literal string and
  # was therefore never true.)
  # NOTE(review): a hard-coded default password is weak; consider making
  # KAFKA_ADMIN_PASSWORD mandatory when SASL is enabled.
  if [[ -z "$KAFKA_ADMIN_PASSWORD" ]]; then
    KAFKA_ADMIN_PASSWORD=123456
  fi

  jaas_config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"$KAFKA_ADMIN_PASSWORD\";"

  # The empty prefix configures the worker's own client settings.
  for prefix in "" "producer." "admin."; do
    updateConfig "${prefix}security.protocol" "SASL_PLAINTEXT" "$CONFIG_FILE"
    updateConfig "${prefix}sasl.mechanism" "SCRAM-SHA-256" "$CONFIG_FILE"
    updateConfig "${prefix}sasl.jaas.config" "$jaas_config" "$CONFIG_FILE"
  done
fi


)
# Replace this shell process with connect-distributed.sh (exec does not return
# on success), passing it the properties file under $KAFKA_HOME/config.
exec "$KAFKA_HOME/bin/connect-distributed.sh" "$KAFKA_HOME/config/connect-distributed.properties"

生成镜像

1
docker build -f Dockerfile-KafkaConnector -t harbor.oneitfarm.com/cidata/kafka-connector:2.5.0 .