恰好一次发送和事务消息(译)
Kafka提供“至少一次”交付语义, 这意味着发送的消息可以传送一次或多次. 人们真正想要的是“一次”语义,因为重复的消息没有被传递。
普遍地发声重复消息的情况有两种:
- 如果客户端尝试向集群发送消息并获取网络错误, 则重试可能会导致重复. 如果在发送消息之前发生网络错误, 则不会发生重复. 但是, 如果在将消息附加到日志之后发生网络错误, 但在将响应发送给发件人之前, 发件人将不知道发生了什么. 唯一的选择是重试和冒险重复或放弃并声明消息丢失。
- 如果客户端尝试向集群发送消息并获取网络错误, 则重试可能会导致重复. 如果在发送消息之前发生网络错误, 则不会发生重复. 但是, 如果在将消息附加到日志之后发生网络错误, 但在将响应发送给发件人之前, 发件人将不知道发生了什么. 唯一的选择是重试和冒险重复或放弃并声明消息丢失。
第二种情况可以通过使用Kafka提供的偏移量由消费者处理. 他们可以将偏移量与其输出进行存储, 然后确保新消费者始终从最后存储的偏移量中提取. 或者, 他们可以使用偏移量作为一种关键字, 并使用它来对其输出的任何最终目标系统进行重复数据删除。
Producer API改动
KafkaProducer.java
public interface Producer<K,V> extends Closeable {
/**
* Needs to be called before any of the other transaction methods. Assumes that
* the transactional.id is specified in the producer configuration.
*
* This method does the following:
* 1. Ensures any transactions initiated by previous instances of the producer
* are completed. If the previous instance had failed with a transaction in
* progress, it will be aborted. If the last transaction had begun completion,
* but not yet finished, this method awaits its completion.
* 2. Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
* @throws IllegalStateException if the TransactionalId for the producer is not set
* in the configuration.
*/
void initTransactions() throws IllegalStateException;
/**
* Should be called before the start of each new transaction.
*
* @throws ProducerFencedException if another producer is with the same
* transactional.id is active.
*/
void beginTransaction() throws ProducerFencedException;
/**
* Sends a list of consumed offsets to the consumer group coordinator, and also marks
* those offsets as part of the current transaction. These offsets will be considered
* consumed only if the transaction is committed successfully.
*
* This method should be used when you need to batch consumed and produced messages
* together, typically in a consume-transform-produce pattern.
*
* @throws ProducerFencedException if another producer is with the same
* transactional.id is active.
*/
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
/**
* Commits the ongoing transaction.
*
* @throws ProducerFencedException if another producer is with the same
* transactional.id is active.
*/
void commitTransaction() throws ProducerFencedException;
/**
* Aborts the ongoing transaction.
*
* @throws ProducerFencedException if another producer is with the same
* transactional.id is active.
*/
void abortTransaction() throws ProducerFencedException;
/**
* Send the given record asynchronously and return a future which will eventually contain the response information.
*
* @param record The record to send
* @return A future which will eventually contain the response information
*
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
/**
* Send a record and invoke the given callback when the record has been acknowledged by the server
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}
OutOfSequenceException
如果broker检测到数据丢失,生产者将抛出OutOfOrderSequenceException。 换句话说,如果它接收到大于其预期的序列的序列号。 未来将返回此异常,并传递给回调(如果有)。 这是一个致命的异常,新的Producer方法如send,beginTransaction,commitTransaction等将会抛出IlegalStateException。
应用示例
public class KafkaTransactionsExample {
public static void main(String args[]) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
// Note that the ‘transactional.id’ configuration _must_ be specified in the
// producer config in order to use transactions.
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
// We need to initialize transactions once per producer instance. To use transactions,
// it is assumed that the application id is specified in the config with the key
// transactional.id.
//
// This method will recover or abort transactions initiated by previous instances of a
// producer with the same app id. Any other transactional messages will report an error
// if initialization was not performed.
//
// The response indicates success or failure. Some failures are irrecoverable and will
// require a new producer instance. See the documentation for TransactionMetadata for a
// list of error codes.
producer.initTransactions();
while(true) {
ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
if (!records.isEmpty()) {
// Start a new transaction. This will begin the process of batching the consumed
// records as well
// as an records produced as a result of processing the input records.
//
// We need to check the response to make sure that this producer is able to initiate
// a new transaction.
producer.beginTransaction();
// Process the input records and send them to the output topic(s).
List<ProducerRecord<String, String>> outputRecords = processRecords(records);
for (ProducerRecord<String, String> outputRecord : outputRecords) {
producer.send(outputRecord);
}
// To ensure that the consumed and produced messages are batched, we need to commit
// the offsets through
// the producer and not the consumer.
//
// If this returns an error, we should abort the transaction.
sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
// Now that we have consumed, processed, and produced a batch of messages, let's
// commit the results.
// If this does not report success, then the transaction will be rolled back.
producer.endTransaction();
}
}
}
}
新增配置
Broker配置
配置 | 描述 |
---|---|
transactional.id.timeout.ms | 事务协调器在主动过期生成器TransactionalId之前等待的最大时间(以ms为单位),而不从中接收任何事务状态更新。默认为604800000(7天)。 这允许定期的每周生产者工作来维护其ID。 |
max.transaction.timeout.ms | 允许的最大的事务超时时间. 如果一个客户端的事务请求超出这个设置, broker会在InitPidRequest的时候返回一个InvalidTransactionTimeout. 这样可以防止客户端太大的超时,这可能会延迟消费者从包含在事务中的主题中读取消息. 默认值为900000(15分钟)。 这是在消息的交易需要发送的时间段内的保守上限。 |
transaction.state.log.replication.factor | 事务状态主题(__transaction_state)的副本数, 默认为3 |
transaction.state.log.num.partitions | 事务状态主题(__transaction_state)的的分区数, 默认为50 |
transaction.state.log.min.isr | 事务状态主题的每个分区的最小数量的异步副本需要被视为联机的。 默认为2 |
transaction.state.log.segment.bytes | 事务状态主题的段大小。默认值:104857600字节。100m |
生产者配置
配置 | 描述 |
---|---|
enable.idempotence | 是否启用幂等(默认情况下为false)。 如果禁用,生产者将不会在生成请求中设置PID字段,并且当前的生产者传递语义将生效。 请注意,必须启用幂等才能使用事务。当启用幂等时,我们强制执行acks = all,retries> 1和max.inflight.requests.per.connection = 1。 没有这些配置的这些值,我们不能保证幂等。 如果这些设置未被应用程序显式覆盖,则在启用幂等时,生产者将设置acks = all,retries = Integer.MAX_VALUE和max.inflight.requests.per.connection = 1。 |
transaction.timeout.ms | 在主动中止正在进行的事务之前,事务协调器将等待生产者的事务状态更新的最长时间(以ms为单位)。 |
transactional.id | 用于事务传递的TransactionalId。 这使得可以跨越多个生产者会话的可靠性语义,因为它允许客户端保证在开始任何新事务之前使用相同的TransactionalId的事务已经完成。 如果没有提供TransactionalId,则生产者被限制为幂等传递。请注意,如果配置了TransactionalId,则必须启用enable.idempotence。默认值为空,这意味着无法使用事务。 |
消费者配置
配置 | 描述 |
---|---|
isolation.level | 以下是可能的值(默认为read_uncommitted):read_uncommitted:在偏移顺序中消费已提交和未提交的消息; read_committed:仅以偏移顺序消耗非事务性消息或已提交事务消息。 为了保持偏移顺序,该设置意味着我们必须缓冲消费者中的消息,直到我们看到给定事务中的所有消息。 |
2
Idempotent Producer
幂等生产者保障
为了实现幂等生产者语义, 引入了producer id
的概念, 下面称PID
. 每个producer在初始化的时候会被分配一个唯一PID. PID的分配对用户来说是完全透明的, 且没有被客户端暴露.
PID是从0开始单调递增的, 还有一个将要将要接受消息的主题分区的序号. 序号会随着producer向broker发送消息增长. broker在内存中维护着从每个PID中发过来的序号. 如果序号不是比上次提交PID/TopicParition组中的的序号大一, broker会拒绝producer的请求. 带有较小序号的消息会引发重复错误, producer可以忽略该错误. 带有较大的序号的消息会导致超出序号的错误, 意味着存在消息丢失, 这是致命的错误.
为了保证每条消息都被恰好一次地持久化在log中, producer需要在失败的时候重试请求. 每个生产者实例都会得到一个新的唯一的PID, 因此我们只能在单一的生产者会话中保证幂等.
这些幂等生成器语义对于无状态应用程序(如指标跟踪和审计)是非常有用的。
事务保障
在核心上, 事务保证使应用程序能够以原子方式生成多个主题分区, 对这些主题分区的所有写入将成功或失败作为一个单元。
此外, 由于消费者进度被记录为对偏移主题的写入, 所以利用上述能力来使得应用能够将消费和产生的消息批量化成单个原子单元. 只有整个“消费变换产品”全部执行, 才能将消息集合视为消费。
为了跨多个生产者会话实现幂等, 需要提供一个在应用层面可以稳定的跨多个会话的transactionalId. transactionalId由用户提供.
有transactionalId后, Kafka可以保证:
- 一个给定的transactionalId只有一个活跃的producer. 如果有新的使用同一个transactionalId的producer实例上线, 旧的实例会被隔离.
- 跨应用会话的事务恢复, 当一个应用实例死掉后, broker会结束(取消或者提交)未完成的事务以保护新上线的实例, 在恢复工作之前将新实例置于干净的状态.
注意这里提到的事务保障是从producer的角度. 在consumer端, 保障就会弱一些. 特别是, 我们不能保证承诺事务的所有消息都将一起被消费。原因如下:
- 对于压缩主题, 事务的一些消息可能被较新版本覆盖。
- 事务可能跨越日志段. 因此, 当旧段删除时, 我们可能会在事务的第一部分丢失一些消息。
- 消费者可能会在事务中寻求任意的offset, 因此缺少一些初始消息。
- 消费者可能不会从参与事务的所有分区中消费. 因此, 他们永远无法读取包含该事务的所有消息。
关键概念
实现事务, 即确保一组消息以原子方式产生和消费, 我们介绍几个新概念:
- 我们引进一个称为事务协调器(Transaction Coordinator)的新实体。与消费者组协调器类似, 每个生产者都被分配一个事务协调器, 所有分配PID和管理事务的逻辑都由事务协调器完成。
- 我们引入一个名为事务日志(Transaction Log)的新的内部kafka主题(__transaction_state)。与Consumer Offsets主题(__consumer_offsets)类似, 事务日志是每个事务的持久和复制记录。事务日志是事务协调器的状态存储, 最新版本的日志的快照封装了每个活动事务的当前状态。
- 我们引入控制消息(Control Messages)的概念。这些是写入用户主题的特殊消息, 由客户端处理, 但不会暴露给用户。例如, 它们被用于让broker向消费者表明先前提取的消息是否已经原子性地提交。以前在这里提出控制消息。
- 我们引入了TransactionalId的概念, 使用户能够以持续的方式唯一地识别生产者。具有相同TransactionalId的生产者的不同实例将能够恢复(或中止)由上一个实例实例化的任何事务。
- 我们引入生产者代(producer epoch)的概念, 这使我们能够确保只有一个具有给定的TransactionalId的生产者的合法活动实例, 从而使我们能够在发生故障的情况下维护事务保证。
除了上述新概念之外, 我们还引入了新的请求类型, 新版本的现有请求以及新版本的核心消息格式, 以支持事务。所有这些的细节将推迟到其他文档。
数据流
在上图中, 尖锐的边框表示不同的机器. 底部的圆形盒子表示Kafka TopicPartitions, 而对角圆形的框代表在broker内部运行的逻辑实体。
每个箭头表示RPC或写入Kafka主题. 这些操作按照每个箭头旁边的数字表示的顺序进行. 下面的部分编号为与上图中的操作相匹配, 并描述相关操作。
1. 查找一个事务协调器 — FindCoordinatorRequest
事务协调器是分配PIDs和管理事务的核心组件, producer的第一件事是发送一个FindCoordinatorRequest请求(之前被称为GroupCoordinatorRequest, 但是现在更名为更一般的用法)到broker去获取其coordinator的位置. 译者补充比如ip, port.
2. 获取一个Producer Id — InitPidRequest
获取到coordinator位置之后, 下一步是获取producer的PID. 这个通过发送InitPidRequest请求到事务协调器完成.
2.1当有指定TransactionlId时
如果有配置transactionl.id, TransactionalId会随着InitPidRequest请求发出, 同时在2a中将PID和TransactionalId的对应关系保存在事务日志中. 这使我们能够将TransactionalId返回相同的PID给生产者的未来实例, 因此可以恢复或中止以前不完整的事务。
除了返回PID之外, InitPidRequest还执行以下任务:
1. 提升PID的代, 使生产者的任何之前的僵尸实例被隔离起来, 不能处理事务.
2. 恢复(向前滚动或回滚)由生产者的上一个实例没有完成的任务事务.
InitPIDRequest的处理是同步完成的. 一旦返回, producer可以发送数据和开始新的事务.
2.2当没有指定TransactionalId
如果没有配置TransactionalId, 会分配一个新的PID. 这是producer只在单一的session中实现了幂等语义和事务语义.
3. 启动事务 — beginTransaction() API
新的KafkaProducer
有一个beginTransaction()方法用来发出开始事务的信号. 生产者记录指示交易已经开始的本地状态, 但是在发送第一条记录之前, 在协调器看来事务还没有开始.
4. 消费-转换-生产循环
在这个阶段, producer开始执行组成事务消费-转换-生产消息的流程. 这是一个很长的阶段, 可能包含多个请求
4.1 AddPartitionsToTxnRequest
作为事务的一部分,生产者首次将新的TopicPartition作为事务的一部分发送给事务协调器。 协调器在步骤4.1a中记录了将此TopicPartition添加到事务中。 我们需要这些信息,以便我们可以将提交或中止标记写入每个TopicPartition(有关详细信息,请参阅第5.2节)。 如果这是添加到事务的第一个分区,协调器也将启动事务计时器。
4.2 ProduceRequest
生产者通过一个或多个ProduceRequests(从生产者的发送方法触发)向用户的主题分区写入一堆消息。 这些请求包括如4.2a所示的PID,代和序号。
4.3 AddOffsetCommitsToTxnRequest
生产者有一个新的KafkaProducer.sendOffsetsToTransaction API方法,它可以批量消费和生成的消息。 此方法接受Map <TopicPartitions,OffsetAndMetadata>和groupId参数。
sendOffsetsToTransaction方法向事务协调器发送一个带有groupId的AddOffsetCommitsToTxnRequests,从而可以在内部__consumer-offsets主题中推导出该消费者组的TopicPartition。 事务协调器将在步骤4.3a中将该主题分区添加到事务日志中。
4.4 TxnOffsetCommitRequest
另外作为sendOffset的一部分,生产者将向消费者协调器发送一个TxnOffsetCommitRequest,以在__consumer-offsets主题中保留偏移量(步骤4.4a)。 消费者协调员通过使用作为该请求的一部分发送的PID和生产者代来验证生产者是否允许发出请求(而不是僵尸)。
消费的offsets在事务提交之前不可见,这是我们现在将讨论的过程。
5. 提交或者终结事务
一旦写入数据,用户必须调用KafkaProducer的新的commitTransaction或abortTransaction API方法。 这些方法将分别开始提交或中止事务。
5.1 EndTxnRquest
当生产者完成事务时,必须调用新引入的KafkaProducer.endTransaction或KafkaProducer.abortTransaction API方法。 前者使得步骤4
中生产的数据可用于下游消费者。 后者有效地从日志中擦除生成的数据: 用户永远不可访问。 下游消费者将读取并丢弃已中止的消息。
无论调用哪个生产者方法,生产者向事务协调器发出一个EndTxnRequest请求,附加数据指示事务是提交还是中止。 在收到此请求后,协调器:
- 将PREPARE_COMMIT或PREPARE_ABORT消息写入事务日志。 (步骤5.1a)
- 通过WriteTxnMarkerRequest开始向用户日志写入称为COMMIT(或ABORT)标记的命令消息的过程。 (见下文第5.2节)。
- 最后将COMMITTED(或ABORTED)消息写入事务日志。 (见下文5.3)。
5.2 WriteTxnMarkerRequest
该请求由事务协调器发送给作为事务一部分的每个主题分配的leader. 在收到此请求后, 每个代理将向日志写入COMMIT(PID)或ABORT(PID)控制消息。 (步骤5.2a)
该消息向消费者指示具有给定PID的消息是否必须传递给用户或丢弃。 因此,消费者将缓冲具有PID的消息,直到它读取相应的COMMIT或ABORT消息,此时它将分别递送或丢弃消息。
请注意,如果__consumer-offsets主题是事务中的TopicPartition之一,则提交(或中止)标记也将写入日志,并且通知消费者协调器,以便在以下情况下实现这些偏移量 在中止情况下提交或忽略它们(左侧的步骤5.2a)。
5.3 Writing the final Commit or Abort Message
在所有提交或中止标记写入数据日志之后,事务协调器将最后的COMMITTED或ABORTED消息写入事务日志,指示事务完成(图中的步骤5.3)。 此时,可以删除与事务日志中的事务有关的大多数消息。
我们只需要保留完成的事务的PID以及时间戳,所以我们最终可以删除生产者的TransactionalId-> PID映射。 请参阅下面的过期PID部分。