关于kafka的Producer的粘性分区的坑
问题背景:
某集群集群做了kafka集群切换zk服务变更。在变更期间,停止多个kafka业务(主要是Flink流作业)30分钟左右。变更结束后,恢复kafka业务,其中一个Flink作业写Kafka不稳定,运行几分钟后其中一个partition写入超时,导致作业失败。
问题分析:
- 从报错堆栈上来看,数据向topic所在节点发送数据过程中出现了超时,也就是producer端与kafka端连接超时。触发报错的场景一般有两种原因:
(1)网络问题:发送数据的客户端到服务端之间存在一定网络延迟,导致发送失败。通过检测网络质量,这个场景能够排除。
(2)Kafka服务端异常。分区所在的kafka节点本身存在异常,导致数据发送超时。例如:磁盘、CPU等硬件资源使用过载会出现处理能力下降等。
在登录这个异常的broker节点后,通过磁盘io命令查看磁盘使用率,发现磁盘io长时间处于了90%以上。
2. 磁盘io长时间处于100%,与节点的数据流量异常有关。对比异常的broker节点和其它的broker节点,发现数据流量较之前增长明显。并且这个节点上的分区大小相比较于其它分区大了将近10倍。
3. 通常出现这种情况时,往往有如下几种场景:
(1)数据带了key而导致的数据倾斜。例如如下写法:
通过排查业务侧的代码。数据中并没有带有key值,因此该假设不成立。
(2)分区倾斜:业务的数据量大,但是分区数量少。Topic的分区数均衡200分区,每个分区数量不一致。如果分区倾斜每个分区中的数据量应该是一致的。不会出现这种现象。故不成立。
4. 通过观查,出现问题的节点只有一个。也就是说,只要topic的分区在异常的broker节点上,这些分区的数据量就会异常。如果停止了这个异常节点,流量会迁移到另外一个节点上。
原因分析
Producer的粘性分区特性
先来了解一下2.4版本以后引进的粘性特性:https://cwiki.apache.org/confluence/display/KAFKA/KIP-480:+Sticky+Partitioner
关于粘性的介绍可以看下这篇文章:https://developer.huawei.cn/consumer/cn/forum/topic/0203860215257330221?fid=0101592429757310384
划重点:
在6.5.1版本(1.1.0版本)之前,如果数据没有key默认的分区散列算法如下:
图一:原始的分区散列算法
- 每条数据会随机选择一个分区
- 数据进入分区所在的Deque队列,deque队列中以batch为单位进行数据缓存,每个batch大小默认为16384bytes(由生产者参数batch.size决定)
- 当满足batch大小满足条件或者超过ling.ms设定时间时,触发数据发送。
651版本(kafka为2.4版本)以后,数据不带key的默认发送场景为粘性发送。
图二:粘性分区散列算法
(1)随即挑选一个可用分区(如果leader不为-1或者none均为可用分区,被选择过的分区在下次选择时候不再作为候选分区)。见代码:
(2) 当至少将分区填满或者达到linger.ms上限后,发送整个分区的数据。
根据上述说明,当设置了linger.ms就意味着要等到到达linger.ms设置的限定时间或者batch.size后才能发送数据。
1. 使用原始的发送方法。数据均匀散列到各个分区,batch.size很难填满,此时就必须要等待到达linger.ms设定的时间限制。在到达时间后,topic的所有分区同时发送请求,例如图一中的topic有三个分区,等待时间超过linger.ms后才会发送请求。
2. 使用粘性分区发送。数据会集中发送到一个分区,这个分区会写满一个batch才会选择另外一个分区。如果在linger.ms设定的时间内写满,那么就会体现发送这个batch的数据,并且在同一时间只产生一个请求。
通过比对,粘性分区从吞吐率和资源使用上都有一定程度的优化。但是粘性设计仍然存在一定的缺陷。见粘性优化方案:KAFKA-10888
二,粘性分区的问题
回到问题中,为什么粘性会带来数据倾斜。上文提到如果数据的发送依赖于linger.ms和batch.size两个参数。在默认情况下linger.ms会配置为0,也就是立即发送。这样每个分区中的数据难达到batch.size的大小就会立即发送。
生产者中有一个参数能够限制生产者最大的请求数量:max.in.flight.requests.per.connection 该参数能够限制生产者与一个broker的链接上最大的请求数量,也就是说当生产者与broker建立一个常链接后,这个链接上能够持有的最多未通过acks确认的发送请求最大数。默认值为5。假设5个链接全部被占用,那么生产者中的数据将的缓存起来,当有可用的链接时。缓存中的数据将以batch的形式发出去。
在有可用发送线程的情况,如下图:
Producerbatch能够及时发送到kafka的broker节点,并且由于linger.ms设置为0,batch.Size不会写满就会发送。
如果kafka的broker节点出现性能问题,例如CPU、磁盘IO、网络等问题导致节点响应慢,就会出现大批量的batch挤压,多数batch都会被填满。如下图:
此时,这样就会产生这样的现象:
1.无异常的节点batch无法写满,发送的量少,分区中的数据量少。
2. 异常节点由于响应慢,请求池被占用完,大量的数据挤压,每个batch的数据全部写满。分区中的数据会越来越多。最后所达到的现象就是。每个分区的数据量差异变大。
更严重的是,如果这个现象一旦出现,性能差的节点会成为短板节点,很难自行恢复,并且性可能会越来越差。
解决方案:
通过修改分区散列算法能够规避这个问题。
(1)Kafka生产者原生API:将散列算法修改为RoundRobin 随机算法。如下配置
初始化properties时加入配置:"partitioner.class",并且修改value为"org.apache.kafka.clients.producer.RoundRobinPartitioner"
(2)如果使用的是Flink作为生产者。如果配置了下图中的内容将使用粘性分区。
可以将上图的红框内容替换为Optional.of(new FlinkFixedPartitioner<>())。
- 点赞
- 收藏
- 关注作者
评论(0)