Kafka Message Consumption Consistency

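On the consumer side, offsets are controlled mainly by the consumer itself. Kafka stores the offset for each partition of the topics that each consumer group subscribes to in the internal topic __consumer_offsets. The consumer has to commit the offsets of the messages it has finished processing to the broker, and this is handled mainly by ConsumerCoordinator.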

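Before each fetch from Kafka, if offsets are committed asynchronously, the consumer first invokes the callbacks of offset commits that have already completed, and then checks its connection to the coordinator (ConsumerCoordinator). If auto-commit is enabled, it then asynchronously commits to the broker the offsets of the data returned by the previous fetch, once auto.commit.interval.ms has elapsed since the last commit. The following situations need attention: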

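  • Message processing is slow. Suppose processing one message takes time t and n messages are fetched at once. If t * n > auto.commit.interval.ms, the offsets of messages that have not yet been processed are committed to the broker. If the consumer crashes at that point, the unprocessed messages are lost.
  • If messages have been processed but the consumer crashes before their offsets are committed to the broker, those already-processed messages will be consumed again.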
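Both situations follow from one rule: after a restart, the consumer resumes each partition from its last committed offset. The sketch below is a hypothetical pure-JDK model of that rule, not Kafka client code, and the class and method names are illustrative. It assumes records in a partition are processed in offset order, and compares the last fully processed offset with the committed offset to list the records that would be skipped (first situation) or read again (second situation).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Kafka client code): after a crash, consumption of a
 * partition resumes at the committed offset, i.e. the next offset to read.
 */
public class CommitGapModel {

    /** Offsets never processed that will be skipped on restart (committed ahead of processing). */
    static List<Long> lostOffsets(long lastProcessed, long committed) {
        List<Long> lost = new ArrayList<>();
        for (long o = lastProcessed + 1; o < committed; o++) {
            lost.add(o);
        }
        return lost;
    }

    /** Offsets already processed that will be delivered again (committed behind processing). */
    static List<Long> duplicatedOffsets(long lastProcessed, long committed) {
        List<Long> dup = new ArrayList<>();
        for (long o = committed; o <= lastProcessed; o++) {
            dup.add(o);
        }
        return dup;
    }

    public static void main(String[] args) {
        // Situation 1: offsets up to 499 committed (committed = 500), only 0..199 processed
        System.out.println("lost: " + lostOffsets(199, 500).size());             // prints "lost: 300"
        // Situation 2: offsets 0..199 processed, but committed only up to 150
        System.out.println("duplicated: " + duplicatedOffsets(199, 150).size()); // prints "duplicated: 50"
    }
}
```

When the committed offset equals the last processed offset plus 1, both lists are empty, which is the state the manual-commit approaches below aim for.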

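The following settings affect both data consistency and performance, so configure them according to your business scenario and make the appropriate trade-offs: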

  • enable.auto.commit: default true

  • auto.commit.interval.ms: default 5000 ms (5 s)

  • max.poll.records: default 500

  • fetch.max.bytes: default 52428800 bytes (50 MiB)
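As a rough illustration, the sketch below writes out the four settings with the default values listed above and checks the t * n > auto.commit.interval.ms condition from the first situation, taking n as max.poll.records (the most records a single poll returns). The per-record times are hypothetical. Plain string keys are used so the example needs only the JDK; with kafka-clients on the classpath, the ConsumerConfig constants can be used instead.

```java
import java.util.Properties;

/** Sketch: the settings above with their defaults, plus a back-of-envelope timing check. */
public class ConsumerSettingsSketch {

    static Properties defaults() {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        props.setProperty("max.poll.records", "500");
        props.setProperty("fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); // 52428800
        return props;
    }

    /** True if processing one full batch (t * n) takes longer than the auto-commit interval. */
    static boolean batchOutlastsCommitInterval(long perRecordMs, Properties props) {
        long n = Long.parseLong(props.getProperty("max.poll.records"));
        long interval = Long.parseLong(props.getProperty("auto.commit.interval.ms"));
        return perRecordMs * n > interval;
    }

    public static void main(String[] args) {
        Properties props = defaults();
        // Hypothetical 20 ms per record: 20 * 500 = 10000 ms > 5000 ms
        System.out.println(batchOutlastsCommitInterval(20, props)); // prints "true"
        // Hypothetical 5 ms per record: 5 * 500 = 2500 ms <= 5000 ms
        System.out.println(batchOutlastsCommitInterval(5, props));  // prints "false"
    }
}
```

Lowering max.poll.records (a smaller n) brings the product back under the interval at the cost of more polls. Raising auto.commit.interval.ms does the same, but widens the window of processed-but-uncommitted messages in the second situation.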

Consistency

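Below are solutions to the two problems described above. Committing only after a message has been processed prevents the first problem (data loss). The second cannot be eliminated entirely, but committing after every message shrinks it to at most the one message being processed when the consumer crashes.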

Kafka Java Client

Set enable.auto.commit to false and commit the offset manually after each record has been processed.

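Note that the committed offset must be the current record's offset plus 1, because the committed offset is the position of the next record to be consumed.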

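import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.186:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // "none": throw an exception if the group has no committed offset for a partition
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        // Disable auto-commit; offsets are committed manually below
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic"));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    // Manually commit each record after processing: commit offset + 1 (the next offset to read)
                    consumer.commitSync(Collections.singletonMap(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }
}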
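Committing after every record costs one synchronous round trip per message. A common variant commits once per poll() batch instead, which requires tracking, for each partition, the last processed offset plus 1. The hypothetical helper below (plain JDK, not part of the Kafka API; the "topic-partition" string key is an illustrative stand-in for TopicPartition) does only that bookkeeping. In a real consumer, the resulting map would be converted to a Map<TopicPartition, OffsetAndMetadata> and passed to consumer.commitSync(...).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: per-partition "next offset to commit" = last processed offset + 1. */
public class NextOffsetTracker {
    // key: "topic-partition", value: next offset to read
    private final Map<String, Long> toCommit = new HashMap<>();

    void markProcessed(String topic, int partition, long offset) {
        // Long::max keeps the position from moving backwards if calls arrive out of order
        toCommit.merge(topic + "-" + partition, offset + 1, Long::max);
    }

    /** Snapshot of the offsets to commit for the current batch. */
    Map<String, Long> offsetsToCommit() {
        return new HashMap<>(toCommit);
    }

    public static void main(String[] args) {
        NextOffsetTracker tracker = new NextOffsetTracker();
        tracker.markProcessed("my-topic", 0, 41);
        tracker.markProcessed("my-topic", 0, 42);
        tracker.markProcessed("my-topic", 1, 7);
        System.out.println(tracker.offsetsToCommit().get("my-topic-0")); // prints "43"
        System.out.println(tracker.offsetsToCommit().get("my-topic-1")); // prints "8"
    }
}
```

The trade-off is the second situation again: a crash before the batch commit re-delivers every record processed since the previous commit, rather than at most one.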

Spring Kafka

  • Set enable.auto.commit to false.

  • Set the ackMode of ContainerProperties to MANUAL_IMMEDIATE.

  • Use an AcknowledgingMessageListener as the listener, and call acknowledgment.acknowledge() once the message has been processed.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;

public class SpringConsumerTest {
    public static void main(String[] args) {
        String bootstrapServer = "192.168.31.186:9092";
        String groupId = "spring-consumer-group";
        ContainerProperties containerProperties = new ContainerProperties("my-topic");
        // MANUAL_IMMEDIATE: commit the offset as soon as acknowledge() is called.
        // Newer Spring Kafka versions define AckMode in ContainerProperties;
        // older ones used AbstractMessageListenerContainer.AckMode.
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        containerProperties.setMessageListener(
                (AcknowledgingMessageListener<String, String>) (consumerRecord, acknowledgment) -> {
                    System.out.println(consumerRecord);
                    // Acknowledge only after the record has been processed
                    acknowledgment.acknowledge();
                });
        Map<String, Object> consumerConfigs = new HashMap<>();
        consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerConfigs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfigs.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 10 * 1000);
        consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<String, String>(
                new DefaultKafkaConsumerFactory<String, String>(consumerConfigs),
                containerProperties);
        listenerContainer.start();
    }
}
