Kafka消息消费一致性

Kafka消费端的offset主要由consumer来控制, Kafka降每个consumer所监听的tocpic的partition的offset保存在__consumer_offsets主题中. consumer需要将处理完成的消息的offset提交到服务端, 主要有ConsumerCoordinator完成的.

每次从kafka拉取数据之前, 假如是异步提交offset, 会先调用已经完成的offset commit的callBack, 然后检查ConsumerCoordinator的连接状态. 如果设置了自动提交offset, 会继续上次从服务端获取的数据的offset异步提交到服务端. 这里需要注意的是会有几种情况出现:

下面配置影响着数据一致性和性能, 因此需要结合业务场景合理配置一下参数, 进行取舍.

一致性

这里我们针对前面出现的两个问题给出解决方案.

Kafka Java Client

enable.auto.commit设置为false, 并在每处理完一条数据后手动提交offset.

这里需要主意的时, 提交的offset是对当前消息的offset基础上进行加1.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ConsumerTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.186:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.NONE.toString().toLowerCase(Locale.ROOT));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            if (!records.isEmpty()) {
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    //Manually commit each record
                    consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }
}

Spring Kafka

Comments

comments powered by Disqus