技术是需要自己去沉淀的,沉淀的过程就是在与不断的学习和理解。加油,奥里给

Kafka 的相关总结

Kafka 架构

我们直接看个图:
Archive

Kafka 压测

Kafka 官方自带压力测试脚本(kafka-consumer-perf-test.shkafka-producer-perf-test.sh), Kafka 压测时,,可以查看到那些地方出现了瓶颈(CPU , Memory , 网络IO),其中,网络IO应该是最容易到达瓶颈的。

Kafka 的机器数量

Kafka 机器数 = 2 * (峰值生产速度 * 副本数 / 100) + 1

Kafka 分区数

  • 创建只有 1 个分区的 topic
  • 测试这个 topic 的 producer 的吞吐量和 consumer 的吞吐量
  • 假设值均为 Tp , Tc , 单位可以是 MB/s
  • 假设总的目标吞吐量是 Tt , 那么分区数 = Tt / Max(Tp , Tc)
  • 分区数一般设置 3-10 个

副本数

2 / 3 个

Topic的数量

  • 一般是 多少个 日志类型,就有多少个Topic
  • 如果是中台服务,应该根据部门,项目,以及日志类型三者来确定Topic

Kafka 丢不丢数据

  • ACK = 0 ,相当于异步发送,消息发送完毕即 offset 增加,继续生产;
  • ACK = 1 ,Leader 收到Leader Replica ,对一个消息的接收 ACK 才增加 Offset , 然后继续生产;
  • ACK = -1,Leader收到所有的 Replica ,对一个消息的接受ACK才增加 Offset,然后继续生产。

Kafka 的ISR副本同步队列

ISR ,副本同步队列。ISR 包括Leader , Follower 。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的 Leader , 有 replica.lag.max,messages(延迟条数) 和 replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入 ISR副本队列,在 0.10 版本中移除了 replica.lag.max.messages 参数,防止服务频繁的进去队列。

任意一个维度超过阈值都会把Follower剔除出ISR,存入 OSR 列表,新加入的 Follower 也会先寸放在 OSR。

Kafka 分区分配策略

Kafka内部存在两种默认的分区策略: Range / RoundRobin

Range

Range 是默认策略。Range 是对每个Topic 而言的。策略如下:

  • 对同一个 Topic 里面的分区按照序号进行排序,并对消费者按照字母来排序;
  • Partitions 分区的个数 除以 消费者线程的总数 来决定消费几个分区
  • 如果除不尽,那么前面几个消费者线程将会多消费一个分区

Kafka 挂了怎么办

  • Flume 有记录
  • 日志有记录
  • 短期内没事

Kafla 消息积压,Kafka 消费能力不足怎么处理

  • 消费能力不行时,则可以考虑增加Topic 的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。
  • 下游的数据处理不及时: 提高每批次拉取的数量。批次拉取数据过少(拉取数据 / 处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。

Kafka 幂等性

Producer 的幂等性指的是当发送同一条消息时,数据在 Server 端只会被持久化一次,数据不丢不重,但是这里的幂等性是有条件的。

  • 只能保证 Producer 在单个会话内不丢不重,如果 Producer 出现意外挂掉在重启是无法保证的。
  • 幂等性不能跨多个 Topic-Partition ,只能保证单个 Partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间状态并没有同步。

Kafka 事物

Kafka从0.11版本开始引入了事务支持。事务可以保证Kafka在Exactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

  1. Producer事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

  1. Consumer事务

上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

Kafka 数据重复

幂等性 + ACK-1 + 事务

Kafka 数据重复,可以在下一级,SparkStraming / Flink / Redis 里面进行去重,去重的手段就是: 分组 -> 开窗(id) -> 取第一个值

Kafka 参数优化

1)Broker参数配置(server.properties)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1、网络和io操作线程配置优化
# broker处理消息的最大线程数(默认为3)
num.network.threads=cpu核数+1
# broker处理磁盘IO的线程数
num.io.threads=cpu核数*2

2、log数据文件刷盘策略
# 每当producer写入10000条消息时,刷数据到磁盘
log.flush.interval.messages=10000
# 每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000

3、日志保留策略配置
# 保留三天,也可以更短(log.cleaner.delete.retention.ms)
log.retention.hours=72

4、Replica相关配置
offsets.topic.replication.factor:3
# 这个参数指新创建一个topic时,默认的Replica数量,Replica过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在2~3为宜。

2)Producer优化(producer.properties)

1
2
3
4
5
buffer.memory:33554432 (32m)
# 在Producer端用来存放尚未发送出去的Message的缓冲区大小。缓冲区满了之后可以选择阻塞发送或抛出异常,由block.on.buffer.full的配置来决定。

compression.type:none
# 默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。

3)Kafka内存调整(kafka-server-start.sh)

1
2
# 默认内存1个G,生产环境尽量不要超过6个G。
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"

Kafka 高效读写数据

1. Kafka 本身是分布式集群,同时采用分区技术,并发度高.

2. 顺序读写磁盘

Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

3. 零复制技术

zero-copy