Kafka Message Consumption Consistency
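On the consuming side, offsets in Kafka are controlled mainly by the consumer. Kafka stores the offset of every partition of each topic a consumer subscribes to in the __consumer_offsets topic. The consumer must commit the offsets of the messages it has finished processing to the broker; this is handled mainly by ConsumerCoordinator.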
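Before each fetch from Kafka, if offsets are committed asynchronously, the client first invokes the callbacks of the offset commits that have already completed, then checks the connection state of ConsumerCoordinator. If auto-commit is enabled and auto.commit.interval.ms has elapsed, it then asynchronously commits to the broker the offsets of the data returned by the previous fetch. Several situations are worth noting here: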
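- Processing is slow. Let t be the time needed to process one message and n the number of messages fetched. When t * n > auto.commit.interval.ms, an auto-commit can fire before the whole batch has been processed (for example, when records are handed to worker threads while the polling thread keeps calling poll()), so the offsets of unprocessed messages are committed to the broker. If the consumer crashes at that point, the unprocessed messages are lost.
- If processing has finished but the consumer crashes before the offset is committed to the broker, the messages that were already processed are consumed again.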
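The first case comes down to simple arithmetic. The sketch below is only an illustration: `AutoCommitRisk` and `batchOutlastsInterval` are hypothetical names, not part of the Kafka client API. It merely checks whether processing n records at t ms each takes longer than the auto-commit interval.

```java
/** Hypothetical helper for illustration; not part of the Kafka client API. */
class AutoCommitRisk {

    /**
     * Returns true when processing a whole batch (t * n) takes longer than
     * auto.commit.interval.ms, i.e. an auto-commit may become due mid-batch.
     */
    static boolean batchOutlastsInterval(long perRecordMs, int batchSize, long autoCommitIntervalMs) {
        return perRecordMs * batchSize > autoCommitIntervalMs;
    }

    public static void main(String[] args) {
        // Defaults: max.poll.records = 500, auto.commit.interval.ms = 5000.
        // 20 ms * 500 = 10000 ms, which exceeds the 5000 ms interval.
        System.out.println(batchOutlastsInterval(20, 500, 5000));
        // 5 ms * 500 = 2500 ms, which stays within the interval.
        System.out.println(batchOutlastsInterval(5, 500, 5000));
    }
}
```

When the check returns true, lowering max.poll.records or raising auto.commit.interval.ms narrows the window, but only disabling auto-commit removes it.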
The following settings affect both data consistency and performance, so tune them for your business scenario and weigh the trade-offs.
- enable.auto.commit: defaults to true
- auto.commit.interval.ms: defaults to 5000 ms (5 s)
- max.poll.records: defaults to 500
- fetch.max.bytes: defaults to 52428800 bytes (50 MiB)
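As a rough sketch, the four settings can be collected in a plain `java.util.Properties` object before it is handed to a consumer. The class and method names here (`ConsumerTuning`, `defaults`, `manualCommit`) are hypothetical; only the property keys and default values come from the list above.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; only the property keys are Kafka's. */
class ConsumerTuning {

    /** The four settings discussed above, set to their default values. */
    static Properties defaults() {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        props.setProperty("max.poll.records", "500");
        props.setProperty("fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); // 52428800
        return props;
    }

    /** Variant that favors consistency: no auto-commit and a caller-chosen batch size. */
    static Properties manualCommit(int maxPollRecords) {
        Properties props = defaults();
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(manualCommit(100));
    }
}
```

A real consumer additionally needs bootstrap.servers, group.id, and the deserializer settings.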
Consistency
Here are solutions to the two problems described above.
Kafka Java Client
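Set enable.auto.commit to false and commit the offset manually after each record has been processed.
Note that the offset to commit is the current record's offset plus 1, because the committed offset marks the next record the group will read.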
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.186:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // "none": throw an exception if the group has no committed offset yet
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        // Disable auto-commit; offsets are committed explicitly below
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                // poll(long) is deprecated; use the Duration overload
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    // Manually commit each record: the committed offset is record.offset() + 1
                    consumer.commitSync(Collections.singletonMap(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }
}
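Committing after every record costs one round trip to the broker per record. If some reprocessing after a crash is acceptable, one possible compromise is to commit once per batch: for each partition, commit the highest processed offset plus 1. The sketch below shows only that offset arithmetic with plain Java types. `BatchOffsets` and `Rec` are hypothetical stand-ins; in a real consumer the map key would be a `TopicPartition` and the value an `OffsetAndMetadata`, passed to `commitSync`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the Kafka client API. */
class BatchOffsets {

    /** Stand-in for the ConsumerRecord fields used here. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Maps "topic-partition" to the offset to commit: highest processed offset + 1. */
    static Map<String, Long> offsetsToCommit(List<Rec> processed) {
        Map<String, Long> next = new HashMap<>();
        for (Rec r : processed) {
            String key = r.topic + "-" + r.partition;
            // Keep the largest "next offset" seen for this partition
            next.merge(key, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(
                new Rec("my-topic", 0, 41),
                new Rec("my-topic", 0, 42),
                new Rec("my-topic", 1, 7));
        System.out.println(offsetsToCommit(batch));
    }
}
```

The trade-off is the one described earlier: fewer commits mean higher throughput, but a crash mid-batch causes the whole uncommitted batch to be consumed again.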
Spring Kafka
- Set enable.auto.commit to false.
- Set the ackMode of ContainerProperties to MANUAL_IMMEDIATE.
- Use AcknowledgingMessageListener as the listener and call acknowledgment.acknowledge() once the message has been processed.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;

public class SpringConsumerTest {
    public static void main(String[] args) {
        String bootstrapServer = "192.168.31.186:9092";
        String groupId = "spring-consumer-group";

        ContainerProperties containerProperties = new ContainerProperties("my-topic");
        // Spring Kafka 2.3+; earlier versions use AbstractMessageListenerContainer.AckMode
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        containerProperties.setMessageListener(
                (AcknowledgingMessageListener<String, String>) (consumerRecord, acknowledgment) -> {
                    System.out.println(consumerRecord);
                    // Acknowledge only after the record has been processed
                    acknowledgment.acknowledge();
                });

        Map<String, Object> consumerConfigs = new HashMap<>();
        consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerConfigs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfigs.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 10 * 1000);
        consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Typed factory instead of a raw DefaultKafkaConsumerFactory
        DefaultKafkaConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerConfigs);
        KafkaMessageListenerContainer<String, String> listenerContainer =
                new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        listenerContainer.start();
    }
}