Kafka的消息可靠传递

Kafka提供的基础保障可以用来构建可靠的系统, 却无法保证完全可靠. 需要在可靠性和吞吐之间做取舍.

  • Kafka在分区上提供了消息的顺序保证.
  • 生产的消息在写入到所有的同步分区上后被认为是已提交 (不需要刷到硬盘). 生产者可以选择在消息提交完成后接收broker的确认, 是写入leader之后, 或者所有的副本
  • 只要有一个副本存在, 提交的消息就不会丢失
  • 消费者只能读取到已提交的消息

复制

Kafka的复制机制保证每个分区有多个副本, 每个副本可以作为leader或者follower的角色存在. 为了保证副本的同步, 需要做到:

  • 保持到zk的连接会话: 每隔6s向zk发送心跳, 时间可配置
  • 每隔10s向leader拉取消息, 时间可配置
  • 从leader拉取最近10s的写入的消息. 保持不间断的从leader获取消息是不够的, 必须保证几乎没有延迟

Broker配置

复制因子

default.replication.factor broker级别的副本数设置, 通过这个配置来控制自动创建的topic的副本数. 为N的时候, 可以容忍失去N-1个副本, 保证topic的可读写.

脏副本的leader选举

unclean.leader.election.enable 0.11.0.0之前的版本, 默认为true; 之后的版本默认为false. 这个设置控制不同步的副本能否参与leader的选举. 如果设置为true, 当没有同步副本可用的时候, 不同步的副本会成为leader, 意味着有数据丢失. 如果设置为false, 则意味着系统会处于不可用的状态, 该部分没有leader提供服务. 需要在可用性一致性之间做取舍.

最小同步副本数

min.insync.replicas 这个设置可以作用于broker和topic级别. 假如broker数为3, 最小同步副本数为2. 当2个同步副本中的一个出现问题, 集群便不会再接受生产者的发送消息请求. 同事客户端会收到NotEnoughReplicasException. 此时, 消费者还可以继续读取存在的数据. 唯一的同步副本变成只读.

可靠系统中使用生产者

发送确认

acks 可选0, 1或者all. 设置影响吞吐和一致性.

  • acks=0 意味着消息发送出去后就认为是成功写入topic.
  • acks=1 发送后等待leader写入后确认
  • acks=all 发送后等待所有副本写入后确认

重试

retries 消息发送后会收到成功或者错误码. 错误有两种, 可重试的和不可重试的. 对于可重试的错误, 生产者会重复发送, 而reties控制重试的次数. 比如borker返回LEADER_NOT_AVAILABLE错误, 生产者会自动进行重试(retries不等于0), 因为broker之后会选择新的leader. 如果返回INVALID_CONFIG, 重试也不会解决问题. 同时retries有可能导致消息重复, 这就是Kafka消息的at least once保证. 在0.11.0.0之后, 提供了幂等的特性, 保证消息的exactly one. 对于跨数据中心的复制(比如MirrorMaker), 默认设置为Integer.MAX_VALUE

额外的错误处理

使用生产者内置的重试是一个正确处理多种错误而不丢失消息的简单途径. 但是开发者还需要处理其他的错误, 比如:

  • 不可重试错误
  • 发送之前的错误
  • 场试完所有的重试次数后还是未成功发送.

可靠系统中使用消费者

已提交消息已提交偏移量 完全不同的两个概念, 前者是对生产者有效, 后者是对消费者有效.

重要设置

  • group.id 两个有相同group.id并且订阅同一个topic的消费者, 会分配到topic下分区的一个子集, 并且是独立的子集.
  • auto.offset.reset 这个参数控制当broker端没有发现任何提交的偏移量的时候, 消费者应该从什么位置开始读取消息. 接受earliestlatest两种设置. earliest意思是会从0开始读取, 而latest意思是从最末尾开始.
  • enable.auto.commit 按照时间计划提交偏移量或者代码中手动提交. 对consumer来说这是一个重大的决定. 自动提交会保证只提交循环中已经处理的数据, 但是有可能会在下次提交始前系统崩溃. 这就导致已经被处理的消息的偏移量没有提交到broker. 下次拉取的时候(consumer重新上线或者rebalance时候由其他消费者处理该分区)会重新拉取已经处理过的消息, 重复消费. 假如你是将拉取的消息交由其他的线程处理, 那自动提交可能会到时消息被拉取, 却没有被处理. 自动提交的好处是吞吐量大.
  • auto.commit.interval.msenable.auto.commit设置为true的时候, 通过这个配置控制自动提交的时间间隔. 越大吞吐就越大, 一致性就越低. 越小, 则会增加提交的次数, 影响吞吐, 但是会提高一致性.

准确提交偏移量

总是提交已经处理过得消息

假如你是在循环中处理所有的消息, 并且不需要维护跨多次轮询的状态, 会比较容易实现. 可以使用自动提交, 或者在轮询循环的末尾进行偏移量提交.

提交频率是性能和系统崩溃时重复的消息数量间的取舍

一次轮询循环中可以进行多次偏移量提交, 甚至每处理一条提交一次. 或者几个轮询提交一次. 提交会有性能上的开销, 类似生产者的acks=all

保证你清楚的了解将要提交什么偏移量

常见的一个陷阱就是一次轮询循环中的偏移量提交了读到的最大偏移量, 而不是已经处理过得最大偏移量. 会导致消息丢失.

再平衡

准确处理consumer的再平衡(consumer上线或者下线). 再平衡会引起先从消费者上摘取某些分区, 然后在分配某些分区. 通过实现RebalanceListener接口来实现控制.

消费者可能需要重试

某些场景下, 暂时不提交偏移量, 下次轮询的时候会重复拉取消息. 比如数据库连接暂时不可用的情况下.

消费者可能需要维护状态

某些场景下, 需要在多个轮询间存在聚合运算.

处理长时间的处理

有些时候, 消息的处理耗时较长, 比如与其他系统交互或者进行比较复杂的运算. 某些Kafka版本的消费者, 两次轮询的间隔不能太长 (0.10.0.0之前版本的消费者没有单独的心跳进程, 是通过轮询同时达到心跳目的). 太长, 消费者则会被认为是下线, 会发生再平衡.

有且只有一次的消息投递

有些场景需要至少一次的语义(没有消息丢失); 而某些场景则需要有些只有一次的语义. 但是当前Kafka没有提供完美的有且只有一次的支持. 需要与其他系统结合一起实现, 比如使用唯一的key写入数据库或者redis等存储中.

comments powered by Disqus