关于 kafka 的一些经验

发布时间 2023-03-29 19:57:37作者: owenqing

1. kafka 高吞吐原因

  1. PageCache + 顺序写磁盘 (读与写)
    1. producer 请求:Server端的I/O线程统一将请求写到操作系统的 PageCache 后立即返回,当消息达到一定阈值后, Kafka 应用本身会操作系统内核会触发强制刷盘
    2. comsumer 请求: 主要利用了 zero copy 技术,当 broker 接收到读取数据的请求时,会向操作系统发送 sendfile 系统调用,操作系统接收后,首先试图从 PageCache 中获取数据。如果不存在,则发生缺页中断将数据读取到 kernel buffer, 随后通过 DMA 直接将数据拷贝到网卡。
  2. Zero Copy 零拷贝技术 (跳过用户态缓冲区)
  3. 分区技术
  4. 批量发送
  5. 数据压缩

【阅读】基于SSD的Kafka应用层缓存架构设计与实现

2 kafka 消息丢失问题

MQ 消息丢失一般可以从三个阶段讨论:producer, broker, comsumer

3.1 生产者阶段

为了提高效率,kafka 在生产者端会攒批发送(达到一定的量或超过时间阈值就会发送)如果程序崩溃,buffer 中的数据会丢失
解决方案:

  1. 异步发送改同步发送
  2. 消息先写入日志,在通过 filebeat 等工具发送(提供更大的缓冲池)

3.2 Broker 阶段

这是个同步刷盘和异步刷盘的问题,异步刷盘就有丢失数据的风险。Linux 刷盘触发条件:

  • 主动调用sync或fsync函数
  • 可用内存低于阀值
  • dirty data时间达到阀值。dirty是pagecache的一个标识位,当有数据写入到pageCache时,pagecache被标注为dirty,数据刷盘以后,dirty标志清除

Kafka 没有提供同步刷盘的机制,也就是说靠单个 broker 是无法保证消息的完整性的。 kafka 通过 producer 与 kbroer 协调来尽可能的保证消息不丢失。

kafka通过producer和broker协同处理单个broker丢失参数的情况。一旦producer发现broker消息丢失,即可自动进行retry。除非retry次数超过阀值(可配置),消息才会丢失。此时需要生产者客户端手动处理该情况。那么producer是如何检测到数据丢失的呢?是通过ack机制,类似于http的三次握手的方式

ack 参数:
0: producer不等待broker的响应,效率最高,但是消息很可能会丢
1: leader broker收到消息后,不等待其他follower的响应,即返回ack (数据写入 leader 的 PageCache 就会返回 ack。数据刷盘成功后才会开启复制。follower 没有拉取完整数据 leader 挂了,则消息丢失)
all: 写入 leader, 等待所有的 ISR( in sync replicas 可配置) follwer 都确认了,在返回 ack。(leader 挂掉,重新选举。如果这时 follwer 全部挂了(掉电),消息就丢失了,非常极端)
【阅读】kafka 消息问题

3. Kafka 数据清理

  • delete 模式
  • compact 模式
  • compact & delete 模式

设置模式 log.cleanup.policy = compact。可以设置到 topic 级别

3.1 删除策略

```shell

# delete 模式
cleanup.policy     => delete
segment.bytes      => 1.07G                             
segment.ms         => 7 days      (与上一参数共同控制 segment 滚动逻辑)
retention.ms       => 7 day				(消息保留时间)
retention.bytes    => Infinite    (保留大小)

# compact 模式
cleanup.policy            => compact
segment.bytes             => 1.07G                             
segment.ms                => 180 days  (与上一参数共同控制 segment 滚动逻辑)
min.cleanable.dirty.ratio => 0.5%      (默认 0.5 重复)   

min.cleanable.dirty.ratio脏数据比例:未清理的数据和已清理数据的比例 0.5% 没什么问题
compcat 模式起初导入数据时可以调整 segment.ms为 1-2 min,快速生成 segment 文件,进行 compat。观察到 topic 存储大小降下来稳定后,设置为 180days。
注意:segment.ms 时间多小会导致非常多的小文件。

delete.retention.ms墓碑消息保留时间。时间过短会有问题, 设置 1 day 就好
消息序列 => [+1, + 2, +3, -1, -2, -3] 。消费者读取到+3就挂了,压缩后墓碑消息被删除。就丢失了 -1, -2, -3 的数据信息。为了避免出错,消费者不能停止太久

4. kafka partiton 数量对集群性能的影响

partition 数量越多会提升吞吐,过多也会有问题
过多 partition 数量的危害:

  1. 文件句柄开销。每个 partiton 都有很多 segment 文件(xx.log xx.index)分区越多文件越多,可能突破 ulimit -n 的限制,从而抛出 open too many file 的异常
  2. 降低系统可用性。parttion 数量越多的 topic 选举耗时越久
  3. 客户端与服务端内存开销大。(socket 建立与客户端 batch 缓存)

注:经验策略,峰值 1000/s, 平均 300/s 可设置一个 partition

5. 如何解决消息消费不过来

  1. 消费者并发消费(顺序无法保障)
  2. 扩充 partiton (顺序无法保障)

max.poll.records 默认 500
max.poll.interval.ms 默认 300s。两次 poll 之间的间隔,超过这个时间会触发 rebalance
如果消费时间频繁超过 max.poll.interval.ms会表现为消费不动。

如果出现上述现象可以考虑:

  1. 降低 poll 的数量,每次少消费一点
  2. 提高消费能力

6. Kafka 文件存储机制

假设 foo topic 4 个分区


|--foo-0
|--foo-1
|--foo-2
|--foo-3

每个 partition 中有很多 segment 文件。
segment 由 xxx.index xxx.log 索引文件和数据文件组成。
image.png
image.png