在用户的使用上,要启动幂等生产者只需要添加设置 enable.idempotence 为 true 就好,让我们继续关注下细节,看看启用幂等生产者后 kafka client 会做什么。首先 client会强制设置一些生产者的配置值。
-
acks 会被强制设置为all,如果客户本来使用的是0,1级别的 acks 那么用户需要考虑下被设置为 all 的时候对于自己业务性能的影响,如果用户本来就是设置为all的情况,那么使用幂等生产者是几乎不会有额外代价的。
-
retries 必须设置为大于1的数字,一般 librdkafka 和 java kafka client 会把 retries设置为一个非常大的数比如 Integer.MAX_VALUE,基本靠近于无限重试。确保消息一定会成功发送。
-
max.inflight.requests.per.connection 必须小于5,其中 java kafka client 如果版本小于1.0.0,会把 max.inflight.requests.per.connection 设置为 1,确保一条数据链路上一次只有一个请求,这会导致一定情况下 tps 有所下降。
-
发送的消息格式必须是 v2 格式。不支持低版本的消息格式。
完成生产者配置之后,client 开始执行生产消息的发送,这里我们省略在上文提到过的生产 api 的逻辑,只关注于启用幂等后多出来的逻辑和步骤
-
生产者 client 向 broker 发起 InitProducerId request 请求一个 PID,后续发送的消息,都会带上这一个 PID 用于标明生产者的身份。
-
每个消息会带上一个单调递增的 Sequence ID。kafka server会记录下同一个PID最后一次提交消息的 Sequence ID,如果当前发送的消息 Sequence ID 小于等于最后一次提交的 ID,那么 server 会认为当前消息已经过期了,并拒绝接受消息。client 收到这样的拒绝请求后就可以感知到之前的消息一定是投递成功了,并停止重试发送,丢弃掉消息。
通过以上的这些步骤,kafka 确保了每个消费者对于单 partition 操作的一个幂等性,这是一个非常实用的功能,特别是在使用消费者 api 的时候本来就已经设置了acks 为 all 的业务,启用生产者幂等几乎没有额外消耗,这也是一个 kafka 推出了比较久的功能了(从 kafka 0.11 开始支持),但是目前看起使用本功能的用户还是比较少。
事务消息(Transactional Messaging)简介
事务消息是目前 kafka 为了确保恰好一次语义所提供的最强约束,他确保了一个生产者如果生产多个相互关联的消息到不同的 partition 上时要么最后同时成功,要么同时失败。同时启用事物消息的前提必须启用幂等生产者,所以单 partition 上的恰好一次语义就由幂等的特性来保证。
不过一般很少有业务会直接使用 kafka 的事物消息,会涉及使用事物消息的业务其实基本上都是通过 kafka stream 进行流处理,而 kafka stream 依赖于事务消息并且对于业务隐藏掉了事务细节,所以这里我们来看看如何直接使用事务消息并继续尝试分析下client 在这期间做了什么,先让我们放出一份代码片段。
5. 在这一步中主要涉及到以下几个配置
-
acks 为 -1 或者 all,代表所有处于 isr(in-sync) 列表中的 replicas 都写入消息成功后才会返回成功给客户端,同时在 topic 级别也提供了一个min.insync.replicas 配置,如果 isr 中的 replicas 少于这个配置的值,那么写入同样会失败。这是 kafka 所能提供的最强约束了。
-
acks 为 0,代表消息只要投递到 client 的 tcp socket 缓冲区后就认为已经发送出去了,client 不再关注是否 kafka 集群是否收到或者写入成功。在这种模式下kafka 只提供 at-most-once 语义,在容忍数据丢失的情况下,是性能最好的模式。
-
acks 为 1 代表发送消息到 replicas leader 写入成功就返回成功,不关注其余follower 是否写入成功,如果投递消息后,leader 马上挂掉了,消息是会丢失的。
-
这个模式在大多数时候可以确保消息不丢失,是一个性能和安全性权衡的模式。
-
server 将根据 client 提供的 acks 配置值来确定服务端的写入会在什么情况下返回给客户端
-
client 如果接收到 produce 写入失败,那么将会重试 retries 配置的次数,每次重试之间间隔 retry.backoff.ms 所定义的时间。重试次数耗尽之后才会返回失败到业务逻辑。
以上就是整个 producer api 在使用过程中的一些细节了,明白了这些细节,在生产时遇到kafka的一些奇怪报错就会有一些思路去定位和处理。当然从代码上来看,代码里面还有一些配置在上面的文章中没有覆盖到,这里我在一起介绍一下
-
compression.type 用于配置压缩,kafka 提供不同的压缩模式,包括 none(不压缩),gzip,snappy,lz4,以及 zstd(需要2.1.0以上版本的kafka)。
-
一般来说我们比较推荐 lz4 格式的压缩,在比较轻的 cpu 负载下,可以提供不错的压缩比,和非常高的吞吐量,整体的性能和性价比会优于其他几个压缩方式,所以一般没有强烈的压缩比需求的话,使用 lz4 是比较好的选择。
-
key.serializer 和 value.serializer 是序列化器,这个只是在 java 的客户端中特有的,用于决定如何把 key 和 value 的值序列化,这里就不细说了。
-
request.timeout.ms 这个配置定义了网络请求超时时间,任何一个 kafka client对于 server 的请求,如果在本参数规定时间内没有收到答复,那么就都会取消请求并认为请求失败,并将逻辑转移到失败处理逻辑,这个约束是比较强的约束。
-
max.block.ms 这个配置项定义了 client 内部的一个阻塞时间,比如如果内部的异步队列满了,kafka client 调用 send 会等待这样一个时间,直到超时返回,这个参数需要注意的一点是用户自定义配置的序列化器和分区器中花费的时间不会计入这个参数超时中。
幂等生产者(Idempotent Producer)简介
幂等生产者提供了生产者在单一分区上的恰好一次语义,但是他不能覆盖到生产者对于复数 partition 操作的一致性,这种一致性需要通过后续的事务消息来解决。
现在让我们先看下幂等生产者如何使用,以及一些涉及到的细节。
为什么我们需要使用到幂等生产者,其主要的原因是生产者发送消息到服务端后,如果遇到了网络问题导致连接断开,生产者是无法感知到消息到底是写入成功还是失败,对于 kafka 一般的生产者 api 来说我们会设置 retries 参数,始终去进行重试,这也就是我们所谓至少一次语义,因为我们无法感知是否写入成功,如果写入成功,但是我们没有接收到成功的回复,我们进行重试动作,就会导致消息的重复写入,如果消息消费依赖于消息顺序,这种重试甚至会导致顺序的错乱。
现在通过幂等生产者,kafka 可以在我们进行这样的重试的时候丢弃掉这种重复写入的消息。
现在让我们看看如何使用幂等生产者。(这里让我们来看下代码,代码中让我们忽略掉一些不重要的配置)。

(编辑:南通站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|