Flink Source Code Walkthrough (Part 2): End-to-End Exactly-Once Semantics with Flink + Kafka
I. Introduction
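Through its checkpoint mechanism, Flink provides exactly-once semantics for the effect of messages on state: each message affects Flink's internal state once and only once. This does not, however, guarantee that the data written to the sink is free of duplicates. Take Figure 1 as an example: the Flink app reads message A from the source, transforms it into message B and writes B to the sink. The app takes a checkpoint after processing A1. If it then fails while processing A4 and restarts, it resumes consuming and processing from A2, so B2 and B3 are written to the sink twice.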
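In this article, end-to-end exactly-once means: every record from the source is processed once and only once, and the results written to the sink contain neither duplicates nor gaps.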
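Achieving end-to-end exactly-once requires the sink to support transactional messages. Fortunately, Kafka added transaction support in version 0.11, and Flink builds on this feature to provide end-to-end exactly-once processing. This article first gives a brief introduction to Kafka transactions and then walks through the source code to show how Flink makes sure its output has neither duplicates nor gaps.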
II. Kafka Transactions
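The motivation for Kafka transactions is simple: allow a producer to publish a group of messages atomically, so that the whole group is either visible to consumers or invisible to them, with no intermediate state.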
First, a few concepts used in Kafka transactions:
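- Transaction Coordinator: the coordinator of Kafka transactions, i.e. the coordinator of the two-phase commit. Among other things, it keeps track of in-flight transactions and writes the transaction log.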
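- producer id (PID): identifies the producer executing a transaction. It is assigned by the Transaction Coordinator and is transparent to users of the Kafka client. In Kafka, a transaction can only be operated on by a single producer, just as all SQL statements of a MySQL transaction must come from the same client connection. "The same producer" here does not mean the same process running a producer, but a producer holding the same PID. For example, process P1 runs a Kafka producer that is in the middle of a transaction and holds PID x. If P1 terminates unexpectedly and another process P2 is started as a Kafka producer, P2 can pick up where P1 left off as long as it obtains x as its own PID (by registering with the Transaction Coordinator using the same transactional id); on that registration the coordinator settles any transaction P1 left unfinished. Conversely, even P1 itself can no longer work on its earlier transaction if it changes its PID while running (by registering with a different transactional id). Each producer id also carries an epoch, which prevents two processes from operating on the same transaction at the same time.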
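- transactional id: identifies a transactional producer (and therefore the transactions it runs) across sessions, and is chosen by the user of the client. The client sends an InitPidRequest(TransactionalId, TransactionTimeoutMs) to the Transaction Coordinator to initialize its PID (in the Java client this happens inside initTransactions()). The same transactional id always maps to the same PID, and every such request increments the PID's epoch; Kafka only accepts messages from the producer with the highest epoch and rejects other producers holding the same PID (zombie instances). To start a transaction independent of the ones already running, a client only needs a transactional id that is not currently in use; there are no other special requirements. We will see later how the Flink Kafka sink generates transactional ids.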
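- Transaction Marker: a special control message written into the log that marks the end of a transaction, recording whether it was committed or aborted. (A transaction has no start marker; it begins implicitly with its first message, as described next.)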
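The PID/epoch rules above can be made concrete with a small model. This is a sketch under stated assumptions, not Kafka's implementation: `ToyTransactionCoordinator`, `initPid` and `acceptWrite` are hypothetical names, the starting PID 1000 is arbitrary, and the transaction timeout and transaction log are left out. It only shows that re-initializing with the same transactional id keeps the PID, bumps the epoch, and lets the older epoch (the zombie) be rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical class, not Kafka code): maps a transactional id to a PID and
// bumps the epoch on every initialization, so an older instance gets fenced off.
class ToyTransactionCoordinator {
    // PID and epoch handed back to a producer instance.
    record ProducerIdAndEpoch(long pid, short epoch) {}

    private final Map<String, ProducerIdAndEpoch> latestByTransactionalId = new HashMap<>();
    private long nextPid = 1000;  // arbitrary starting value for the toy

    // Rough analogue of handling InitPidRequest: same transactional id -> same PID, epoch + 1.
    ProducerIdAndEpoch initPid(String transactionalId) {
        ProducerIdAndEpoch current = latestByTransactionalId.get(transactionalId);
        ProducerIdAndEpoch next = (current == null)
                ? new ProducerIdAndEpoch(nextPid++, (short) 0)
                : new ProducerIdAndEpoch(current.pid(), (short) (current.epoch() + 1));
        latestByTransactionalId.put(transactionalId, next);
        return next;
    }

    // Only the instance holding the latest (PID, epoch) may write; older epochs are zombies.
    boolean acceptWrite(String transactionalId, ProducerIdAndEpoch writer) {
        return writer.equals(latestByTransactionalId.get(transactionalId));
    }

    public static void main(String[] args) {
        ToyTransactionCoordinator coordinator = new ToyTransactionCoordinator();
        ProducerIdAndEpoch p1 = coordinator.initPid("orders-sink-0");  // process P1
        ProducerIdAndEpoch p2 = coordinator.initPid("orders-sink-0");  // process P2 takes over
        System.out.println("P1 " + p1 + " accepted: " + coordinator.acceptWrite("orders-sink-0", p1));
        System.out.println("P2 " + p2 + " accepted: " + coordinator.acceptWrite("orders-sink-0", p2));
    }
}
```

Real brokers reject a stale epoch with a fencing error rather than a boolean, but the ordering rule is the same idea.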
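Figure 2 shows how Kafka stores transactional messages when two producers write them to the same partition of the same topic. After Producer 1 calls BeginTransaction, it starts producing transactional messages to the topic; Transaction 1 begins when its first message, m1, reaches the broker. m1 carries a PID field indicating that it belongs to Transaction 1, and the same holds for the messages that follow. Producer 1 and Producer 2 both write transactional messages to the topic over a period of time, so their messages are interleaved in the log in arrival order. When Producer 1 commits its transaction, the broker appends a control message Commit T1 (a transaction marker) to the log; likewise, when Producer 2 aborts, the broker appends an Abort T2 control message. Thanks to these control messages, a consumer reading the log sequentially knows whether each message should be visible.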
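Continuing with Figure 2, assume m1 is the first message in the partition and only Producer 1 and Producer 2 are writing. For a consumer that reads only committed data (isolation.level=read_committed), none of the messages written up to m11 are visible, because it is not yet known whether T1 and T2 will commit or abort. After Producer 1 commits, m1 becomes visible, because the status of every message up to and including m1 is now determined (m1 is the only one); since m2's status is still undetermined, m2 and every message after it remain invisible. After Producer 2 aborts, m1, m3, m4 and m11 become visible (the status of all messages up to m12 is now determined), while m2, m10 and m12 are filtered out during consumption because T2 was aborted. In this case the offsets of the messages a consumer receives are not contiguous.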
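The visibility rule can be checked with a toy model of one partition read by a `read_committed` consumer. Assumptions: the interleaving used here (m1, m2, m3, m4, m10, m11, Commit T1, m12, Abort T2) is a hypothetical layout consistent with the description of Figure 2, transaction names stand in for PIDs, and `ReadCommittedDemo`/`visible` are made-up names. The cut-off position plays the role of Kafka's last stable offset: nothing at or after the first record of a still-open transaction is returned, and records of aborted transactions are filtered out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, not Kafka code) of what a read_committed consumer sees in one partition.
class ReadCommittedDemo {
    // Either a data record of a transaction (payload != null) or a control marker (marker != null).
    record Entry(String txn, String payload, String marker) {
        static Entry data(String txn, String payload) { return new Entry(txn, payload, null); }
        static Entry control(String txn, String marker) { return new Entry(txn, null, marker); }
    }

    static List<String> visible(List<Entry> log) {
        Map<String, String> outcome = new HashMap<>();      // txn -> COMMIT / ABORT
        Map<String, Integer> firstOffset = new HashMap<>(); // txn -> offset of its first record
        for (int offset = 0; offset < log.size(); offset++) {
            Entry e = log.get(offset);
            if (e.marker() != null) outcome.put(e.txn(), e.marker());
            else firstOffset.putIfAbsent(e.txn(), offset);
        }
        // "Last stable offset": the first record of the earliest transaction without a marker.
        int lastStable = log.size();
        for (Map.Entry<String, Integer> f : firstOffset.entrySet()) {
            if (!outcome.containsKey(f.getKey())) lastStable = Math.min(lastStable, f.getValue());
        }
        List<String> result = new ArrayList<>();
        for (int offset = 0; offset < lastStable; offset++) {
            Entry e = log.get(offset);
            // Markers are never returned; records of aborted transactions are skipped.
            if (e.payload() != null && "COMMIT".equals(outcome.get(e.txn()))) result.add(e.payload());
        }
        return result;
    }

    // stage 0: written up to m11; stage 1: plus Commit T1; stage 2: plus m12 and Abort T2.
    static List<Entry> exampleLog(int stage) {
        List<Entry> log = new ArrayList<>(List.of(
                Entry.data("T1", "m1"), Entry.data("T2", "m2"), Entry.data("T1", "m3"),
                Entry.data("T1", "m4"), Entry.data("T2", "m10"), Entry.data("T1", "m11")));
        if (stage >= 1) log.add(Entry.control("T1", "COMMIT"));
        if (stage >= 2) {
            log.add(Entry.data("T2", "m12"));
            log.add(Entry.control("T2", "ABORT"));
        }
        return log;
    }

    public static void main(String[] args) {
        for (int stage = 0; stage <= 2; stage++) {
            System.out.println("stage " + stage + ": " + visible(exampleLog(stage)));
        }
    }
}
```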
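The same way of writing transactional messages extends to transactions spanning multiple topics and partitions: on commit (or abort), a control message is written to every partition involved. Writing several control messages atomically is, however, itself a distributed transaction problem, which is why Kafka implements transactions with two-phase commit.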
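A rough sketch of why two-phase commit makes the multi-partition write atomic. It follows the shape of Kafka's coordinator protocol as I understand it (the transaction moves to a PrepareCommit state in the transaction log, markers are written to every partition, then the transaction is recorded as CompleteCommit), but `ToyTwoPhaseCommit`, its methods and the `crashAfter` parameter are hypothetical, and the list of partitions a transaction touched is simply kept in a map. Once the decision is durable, a crash in the middle is harmless: a successor replays the log and finishes writing the markers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy two-phase commit across partitions (hypothetical class, not Kafka code).
class ToyTwoPhaseCommit {
    final List<String> transactionLog = new ArrayList<>();              // coordinator's durable log
    final Map<String, List<String>> partitions = new LinkedHashMap<>(); // partition -> its log
    private final Map<String, List<String>> involved = new HashMap<>(); // txn -> partitions written

    ToyTwoPhaseCommit(List<String> partitionNames) {
        for (String p : partitionNames) partitions.put(p, new ArrayList<>());
    }

    // Phase 1 persists the decision, phase 2 writes markers. crashAfter < 0 means no crash.
    void commit(String txn, List<String> txnPartitions, int crashAfter) {
        involved.put(txn, txnPartitions);
        transactionLog.add(txn + ":PREPARE_COMMIT");
        for (int i = 0; i < txnPartitions.size(); i++) {
            if (i == crashAfter) return;                  // simulated coordinator crash
            writeMarker(txnPartitions.get(i), txn);
        }
        transactionLog.add(txn + ":COMPLETE_COMMIT");
    }

    // A new coordinator replays the log and finishes every prepared-but-incomplete commit.
    void recover() {
        for (Map.Entry<String, List<String>> e : involved.entrySet()) {
            String txn = e.getKey();
            if (transactionLog.contains(txn + ":PREPARE_COMMIT")
                    && !transactionLog.contains(txn + ":COMPLETE_COMMIT")) {
                for (String p : e.getValue()) writeMarker(p, txn);
                transactionLog.add(txn + ":COMPLETE_COMMIT");
            }
        }
    }

    // Writing the same marker twice is harmless in this model.
    private void writeMarker(String partition, String txn) {
        List<String> log = partitions.get(partition);
        if (!log.contains("COMMIT " + txn)) log.add("COMMIT " + txn);
    }

    public static void main(String[] args) {
        ToyTwoPhaseCommit tc = new ToyTwoPhaseCommit(List.of("p0", "p1", "p2"));
        tc.commit("T1", List.of("p0", "p1", "p2"), 1);   // crash after the first marker
        System.out.println(tc.partitions);               // only p0 has the marker so far
        tc.recover();
        System.out.println(tc.partitions);               // every partition has it now
        System.out.println(tc.transactionLog);
    }
}
```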
III. End-to-End Exactly-Once in Flink with Kafka Transactions
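The basic idea behind Flink's exactly-once guarantee for internal state is to take a checkpoint periodically, persisting both how far the upstream source has been consumed (e.g. the Kafka offsets) and the values of local state at that moment. If the process crashes later, Flink reads the persisted checkpoint, restores that state and resumes processing from the recorded position, so each message affects the state exactly once. This alone cannot prevent duplicates in the output written to the downstream sink. For that, the sink must support transactions: all messages emitted between two checkpoints are written as one transaction, which is committed if the next checkpoint succeeds and aborted otherwise. This solves the problem of B2 and B3 being emitted twice in Figure 1: when the job fails at A4 and restarts, no new checkpoint has been completed yet, so the transaction containing B2 and B3 (shown in red) is never committed and stays invisible to downstream consumers.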
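A toy replay of the Figure 1 scenario shows the difference between a plain sink and one that writes each checkpoint interval as a transaction. It is not Flink code: `ReplayDemo.run` is a hypothetical helper, a checkpoint is assumed to complete right after A1 and again at the end of the input, and, for brevity, a transaction is committed as soon as its checkpoint succeeds rather than on a separate completion notification.

```java
import java.util.ArrayList;
import java.util.List;

// Toy replay simulation (not Flink code). A1..A4 are mapped to B1..B4; the first attempt
// at A4 fails and the job restarts from the last checkpointed source position.
class ReplayDemo {
    static List<String> run(boolean transactionalSink) {
        List<String> source = List.of("A1", "A2", "A3", "A4");
        List<String> visible = new ArrayList<>();   // what downstream consumers can read
        List<String> openTxn = new ArrayList<>();   // output of the current checkpoint interval
        int checkpointedPosition = 0;               // source position stored in the last checkpoint
        boolean failedOnce = false;
        int pos = 0;
        while (pos < source.size()) {
            String in = source.get(pos);
            if (in.equals("A4") && !failedOnce) {
                failedOnce = true;
                openTxn.clear();                    // the open transaction is never committed
                pos = checkpointedPosition;         // restart: rewind the source to the checkpoint
                continue;
            }
            String out = "B" + in.substring(1);
            if (transactionalSink) openTxn.add(out); else visible.add(out);
            pos++;
            if (in.equals("A1") || pos == source.size()) {
                checkpointedPosition = pos;         // the checkpoint succeeds ...
                visible.addAll(openTxn);            // ... so its transaction is committed
                openTxn.clear();
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        System.out.println("plain sink:         " + run(false));
        System.out.println("transactional sink: " + run(true));
    }
}
```

With the plain sink B2 and B3 appear twice; with the transactional sink the first copies are discarded together with their uncommitted transaction.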
1. TwoPhaseCommitSinkFunction
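First, a quick recap of how Flink takes a checkpoint. When a checkpoint starts, the JobManager has the sources inject a checkpoint barrier into the data stream. Each downstream operator checkpoints its own state once the barrier reaches it, which ensures that the states of all operators captured in a checkpoint are consistent with each other. Each operator reports to the JobManager when its checkpoint is done, and after all operators have finished, the JobManager sends a NotifyCheckpointComplete message to every operator.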
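The "notify only after everyone has acknowledged" rule can be sketched as follows. `ToyCheckpointCoordinator` is a hypothetical class tracking a single checkpoint, not Flink's CheckpointCoordinator; barrier alignment, timeouts and declined checkpoints are ignored.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model (hypothetical) of one checkpoint: tasks acknowledge after snapshotting, and only
// when every task has acknowledged is notifyCheckpointComplete sent to all of them.
class ToyCheckpointCoordinator {
    private final long checkpointId;
    private final Set<String> tasks;
    private final Set<String> acknowledged = new HashSet<>();
    final List<String> notifications = new ArrayList<>();

    ToyCheckpointCoordinator(long checkpointId, Set<String> tasks) {
        this.checkpointId = checkpointId;
        this.tasks = new TreeSet<>(tasks);  // sorted so the notification order is deterministic
    }

    // Called by a task once the barrier has reached it and its state snapshot is done.
    void acknowledge(String task) {
        if (!tasks.contains(task) || !acknowledged.add(task)) return;  // unknown or duplicate ack
        if (acknowledged.size() == tasks.size()) {
            for (String t : tasks) notifications.add(t + ":notifyCheckpointComplete(" + checkpointId + ")");
        }
    }

    boolean isComplete() { return acknowledged.size() == tasks.size(); }

    public static void main(String[] args) {
        ToyCheckpointCoordinator cp = new ToyCheckpointCoordinator(1, Set.of("source", "map", "sink"));
        cp.acknowledge("source");
        cp.acknowledge("map");
        System.out.println("after 2 acks: " + cp.notifications);
        cp.acknowledge("sink");
        System.out.println("after 3 acks: " + cp.notifications);
    }
}
```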
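Flink relies on the sink's support for transactions to achieve end-to-end exactly-once semantics, and two-phase commit is a fairly general solution to distributed transactions. Flink therefore provides the abstract class TwoPhaseCommitSinkFunction, which performs a two-phase commit against the downstream sink.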
Let's now look at how TwoPhaseCommitSinkFunction works in detail.
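When TwoPhaseCommitSinkFunction receives a checkpoint barrier, it pre-commits the current transaction to the sink as the first step of checkpointing its own state, so the downstream sink performs the pre-commit while the rest of the system is taking the checkpoint. At the same time it begins a new transaction on the sink: since Flink keeps processing subsequent messages while the checkpoint is in progress, this ensures those messages fall into the next transaction. After its own checkpoint is done, when the NotifyCheckpointComplete message from the JobManager arrives, it commits the transaction to the sink, completing the two-phase commit. Only at this point does the data written during that cycle become visible to downstream consumers.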
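The sequence just described can be traced with an in-memory imitation of the sink. This is a sketch, not Flink's class: `LifecycleTrace` and its `Txn` record are hypothetical, a transaction is just a list of records, and `visible` stands for what downstream consumers can read. The method names follow TwoPhaseCommitSinkFunction so the steps are easy to match against the real code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory imitation (hypothetical, no Flink dependency) of the two-phase-commit sink life cycle.
class LifecycleTrace {
    record Txn(String name, List<String> records) {}

    final List<String> events = new ArrayList<>();
    final List<String> visible = new ArrayList<>();           // what downstream consumers can read
    private final TreeMap<Long, Txn> pending = new TreeMap<>(); // checkpoint id -> pre-committed txn
    private int counter = 0;
    private Txn current = begin();                              // a transaction is always open

    private Txn begin() {
        Txn t = new Txn("txn" + (++counter), new ArrayList<>());
        events.add("begin " + t.name());
        return t;
    }

    void invoke(String value) { current.records().add(value); }

    // Barrier arrived: pre-commit the current transaction and immediately open the next one.
    void snapshotState(long checkpointId) {
        events.add("preCommit " + current.name());
        pending.put(checkpointId, current);
        current = begin();
    }

    // Checkpoint completed everywhere: commit every pending transaction up to this checkpoint.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, Txn> done = pending.headMap(checkpointId, true);
        for (Txn t : done.values()) {
            events.add("commit " + t.name());
            visible.addAll(t.records());
        }
        done.clear();  // headMap is a view, so this also removes them from 'pending'
    }

    public static void main(String[] args) {
        LifecycleTrace sink = new LifecycleTrace();
        sink.invoke("m1");
        sink.invoke("m2");
        sink.snapshotState(1);             // barrier for checkpoint 1
        sink.invoke("m3");                 // lands in the next transaction
        System.out.println("before notify: " + sink.visible);
        sink.notifyCheckpointComplete(1);
        System.out.println("after notify:  " + sink.visible);
        System.out.println(sink.events);
    }
}
```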
Let's go through the process again with the code:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // this is like the pre-commit of a 2-phase-commit transaction
    // we are ready to commit and remember the transaction

    checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

    long checkpointId = context.getCheckpointId();
    LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);

    preCommit(currentTransactionHolder.handle);
    pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
    LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

    currentTransactionHolder = beginTransactionInternal();
    LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

    state.clear();
    state.add(new State<>(
        this.currentTransactionHolder,
        new ArrayList<>(pendingCommitTransactions.values()),
        userContext));
}
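snapshotState comes from the CheckpointedFunction interface; it is the method that checkpoints the operator's own state after the checkpoint barrier arrives. As the code shows, it first pre-commits the current transaction, puts it into pendingCommitTransactions keyed by checkpoint id, then begins a new transaction as the current one, and finally stores both the current transaction and pendingCommitTransactions in the checkpoint as part of its own state (keeping the transaction information in state makes it possible to continue the earlier transactions when restoring from the checkpoint).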
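Keeping both the current transaction and the pending ones in state is what makes recovery possible. As far as I can tell from the Flink source, `TwoPhaseCommitSinkFunction#initializeState` commits every restored pending transaction via `recoverAndCommit` and aborts the restored current transaction via `recoverAndAbort`; the sketch below only models that decision, with hypothetical types (`RecoveryDemo`, `SnapshotState`) and strings as transaction handles.

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch (hypothetical types, no Flink dependency) of how restored sink state is used.
class RecoveryDemo {
    // The two things snapshotState stores: the current transaction and the pending ones.
    record SnapshotState(String currentTransaction, List<String> pendingCommitTransactions) {}

    // Returns the actions taken on restore, in order.
    static List<String> restore(SnapshotState state) {
        List<String> actions = new ArrayList<>();
        // The restored checkpoint did complete, so everything pre-committed up to it must be
        // committed, even if the completion notification never reached the sink before the crash.
        for (String txn : state.pendingCommitTransactions()) actions.add("recoverAndCommit " + txn);
        // The transaction opened after the snapshot belongs to an unfinished interval; abort it,
        // since replaying the source from the checkpoint will produce its records again.
        actions.add("recoverAndAbort " + state.currentTransaction());
        return actions;
    }

    public static void main(String[] args) {
        SnapshotState restored = new SnapshotState("txn-7", List.of("txn-5", "txn-6"));
        restore(restored).forEach(System.out::println);
    }
}
```

Because a pending transaction may already have been committed before the failure, this only works if commit is safe to repeat for the same transaction.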
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    // the following scenarios are possible here
    //
    //  (1) there is exactly one transaction from the latest checkpoint that
    //      was triggered and completed. That should be the common case.
    //      Simply commit that transaction in that case.
    //
    //  (2) there are multiple pending transactions because one previous
    //      checkpoint was skipped. That is a rare case, but can happen
    //      for example when:
    //
    //        - the master cannot persist the metadata of the last
    //          checkpoint (temporary outage in the storage system) but
    //          could persist a successive checkpoint (the one notified here)
    //
    //        - other tasks could not persist their status during
    //          the previous checkpoint, but did not trigger a failure because they
    //          could hold onto their state and could successfully persist it in
    //          a successive checkpoint (the one notified here)
    //
    //      In both cases, the prior checkpoint never reach a committed state, but
    //      this checkpoint is always expected to subsume the prior one and cover all
    //      changes since the last successful one. As a consequence, we need to commit
    //      all pending transactions.
    //
    //  (3) Multiple transactions are pending, but the checkpoint complete notification
    //      relates not to the latest. That is possible, because notification messages
    //      can be delayed (in an extreme case till arrive after a succeeding checkpoint
    //      was triggered) and because there can be concurrent overlapping checkpoints
    //      (a new one is started before the previous fully finished).
    //
    // ==> There should never be a case where we have no pending transaction here
    //

    Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
    checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
    Throwable firstError = null;

    while (pendingTransactionIterator.hasNext()) {
        Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
        Long pendingTransactionCheckpointId = entry.getKey();
        TransactionHolder<TXN> pendingTransaction = entry.getValue();
        if (pendingTransactionCheckpointId > checkpointId) {
            continue;
        }

        LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
            name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

        logWarningIfTimeoutAlmostReached(pendingTransaction);
        try {
            commit(pendingTransaction.handle);
        } catch (Throwable t) {
            if (firstError == null) {
                firstError = t;
            }
        }

        LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

        pendingTransactionIterator.remove();
    }

    if (firstError != null) {
        throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
            firstError);
    }
}
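notifyCheckpointComplete comes from the CheckpointListener interface; it is invoked when the NotifyCheckpointComplete message from the JobManager arrives. It commits every pending transaction whose checkpoint id is not greater than the id of the completed checkpoint, and leaves transactions belonging to later checkpoints pending. As the comments explain, there is normally exactly one pending transaction, the one pre-committed by the snapshotState call that was just triggered. There can be several, however: for example, when an earlier checkpoint never completed and so was never notified (the newer checkpoint subsumes it), or when a notification is delayed and arrives after a later checkpoint has already been triggered.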
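The skip condition in that loop (`pendingTransactionCheckpointId > checkpointId`) can be exercised on its own. The sketch is a reduced copy of the loop with strings as transactions; `NotifyScenarios.notifyComplete` is a hypothetical name, and a `LinkedHashMap` is assumed for the pending map so insertion order is kept. Scenario (2) from the comments: checkpoints 4 and 5 are pending and 5 completes, so both are committed. Scenario (3): 5 and 6 are pending and the late notification for 5 arrives, so only 5 is committed and 6 keeps waiting.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Reduced copy (hypothetical, no Flink dependency) of the commit loop in notifyCheckpointComplete.
class NotifyScenarios {
    // Commits, i.e. returns and removes, every pending transaction with checkpoint id <= checkpointId.
    static List<String> notifyComplete(Map<Long, String> pending, long checkpointId) {
        List<String> committed = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue;                      // belongs to a later checkpoint: keep it pending
            }
            committed.add(entry.getValue());   // stands in for commit(transaction)
            it.remove();
        }
        return committed;
    }

    public static void main(String[] args) {
        Map<Long, String> skipped = new LinkedHashMap<>();   // scenario (2)
        skipped.put(4L, "txn-4");
        skipped.put(5L, "txn-5");
        System.out.println(notifyComplete(skipped, 5) + " left: " + skipped);

        Map<Long, String> delayed = new LinkedHashMap<>();   // scenario (3)
        delayed.put(5L, "txn-5");
        delayed.put(6L, "txn-6");
        System.out.println(notifyComplete(delayed, 5) + " left: " + delayed);
    }
}
```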
// ------ methods that should be implemented in child class to support two phase commit algorithm ------

/**
 * Write value within a transaction.
 */
protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;

/**
 * Method that starts a new transaction.
 *
 * @return newly created transaction.
 */
protected abstract TXN beginTransaction() throws Exception;

/**
 * Pre commit previously created transaction. Pre commit must make all of the necessary steps to prepare the
 * transaction for a commit that might happen in the future. After this point the transaction might still be
 * aborted, but underlying implementation must ensure that commit calls on already pre committed transactions
 * will always succeed.
 *
 * <p>Usually implementation involves flushing the data.
 */
protected abstract void preCommit(TXN transaction) throws Exception;

/**
 * Commit a pre-committed transaction. If this method fail, Flink application will be
 * restarted and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
 * same transaction.
 */
protected abstract void commit(TXN transaction);

/**
 * Abort a transaction.
 */
protected abstract void abort(TXN transaction);
TwoPhaseCommitSinkFunction leaves five methods for subclasses to implement:
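- invoke: defines how the sink writes data to the external system. Every sink has to define invoke, which is called once for every record the sink operator receives; the only difference here is the extra transaction parameter.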
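- beginTransaction, preCommit, commit, abort: the steps of the two-phase commit protocol. If the external system supports two-phase commit natively (as Kafka does), these methods simply call the corresponding functions of that system's protocol.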
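To see what the five hooks must guarantee outside Kafka, here is a hypothetical sink for an in-memory "external store". Assumptions: the store keeps staged data per transaction plus a committed list, transaction handles are strings, and `InMemoryTwoPhaseSink` is not a Flink class (it does not extend TwoPhaseCommitSinkFunction). The part worth copying is the idempotent `commit`: the commit Javadoc says the same transaction may be committed again after a restart, so a second commit must not duplicate data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical two-phase-commit sink for an in-memory store (no Flink dependency).
class InMemoryTwoPhaseSink {
    final Map<String, List<String>> staged = new HashMap<>();  // txn -> records not yet visible
    final List<String> committed = new ArrayList<>();         // what readers of the store see
    private final Set<String> committedIds = new HashSet<>();
    private int nextId = 0;

    String beginTransaction() {
        String txn = "txn-" + (nextId++);
        staged.put(txn, new ArrayList<>());
        return txn;
    }

    void invoke(String txn, String value) { staged.get(txn).add(value); }

    // Make sure a later commit cannot fail; here that only means the staged data exists.
    void preCommit(String txn) {
        if (!staged.containsKey(txn)) throw new IllegalStateException("unknown transaction " + txn);
    }

    // Idempotent: committing the same transaction twice has no further effect.
    void commit(String txn) {
        if (!committedIds.add(txn)) return;
        committed.addAll(staged.remove(txn));
    }

    void abort(String txn) { staged.remove(txn); }

    public static void main(String[] args) {
        InMemoryTwoPhaseSink sink = new InMemoryTwoPhaseSink();
        String t1 = sink.beginTransaction();
        sink.invoke(t1, "a");
        sink.invoke(t1, "b");
        sink.preCommit(t1);
        sink.commit(t1);
        sink.commit(t1);  // e.g. repeated after a restart
        System.out.println(sink.committed);
    }
}
```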
2. FlinkKafkaProducer011
With TwoPhaseCommitSinkFunction understood, FlinkKafkaProducer011 is much easier to follow.
@Override
public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception {
    checkErroneous();

    byte[] serializedKey = schema.serializeKey(next);
    byte[] serializedValue = schema.serializeValue(next);
    String targetTopic = schema.getTargetTopic(next);
    if (targetTopic == null) {
        targetTopic = defaultTopicId;
    }

    Long timestamp = null;
    if (this.writeTimestampToKafka) {
        timestamp = context.timestamp();
    }

    ProducerRecord<byte[], byte[]> record;
    int[] partitions = topicPartitionsMap.get(targetTopic);
    if (null == partitions) {
        partitions = getPartitionsByTopic(targetTopic, transaction.producer);
        topicPartitionsMap.put(targetTopic, partitions);
    }
    if (flinkKafkaPartitioner != null) {
        record = new ProducerRecord<>(
            targetTopic,
            flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
            timestamp,
            serializedKey,
            serializedValue);
    } else {
        record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
    }
    pendingRecords.incrementAndGet();
    transaction.producer.send(record, callback);
}
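The invoke method of FlinkKafkaProducer011 turns the incoming message (next) into a Kafka record and sends it with the Kafka client's send method. send is asynchronous; pendingRecords counts the records that have been handed to the client but not yet acknowledged.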
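A toy model of that bookkeeping, and of the blocking flush that preCommit relies on, might look like this. `AsyncSendDemo` is hypothetical and is not the Kafka client: a single-threaded executor stands in for the producer's I/O thread, delivery plus the completion callback run as one background task, and a no-op task submitted last can only complete after every earlier send has.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Toy asynchronous sender (hypothetical class, not the Kafka client).
class AsyncSendDemo {
    private final ExecutorService io = Executors.newSingleThreadExecutor();  // stands in for the I/O thread
    private final AtomicLong pendingRecords = new AtomicLong();
    final List<String> delivered = new CopyOnWriteArrayList<>();

    // Returns immediately, like send(record, callback); delivery and callback happen later.
    void send(String record) {
        pendingRecords.incrementAndGet();
        io.submit(() -> {
            delivered.add(record);               // "acknowledged by the broker"
            pendingRecords.decrementAndGet();    // the completion callback
        });
    }

    // Blocks until every earlier send has completed, then checks nothing is still in flight.
    void flush() {
        try {
            io.submit(() -> { }).get();          // single thread: earlier tasks have finished by now
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while flushing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("flush failed", e.getCause());
        }
        long remaining = pendingRecords.get();
        if (remaining != 0) throw new IllegalStateException("pending records after flush: " + remaining);
    }

    long pending() { return pendingRecords.get(); }

    void close() { io.shutdown(); }

    public static void main(String[] args) {
        AsyncSendDemo sender = new AsyncSendDemo();
        for (int i = 0; i < 1000; i++) sender.send("record-" + i);
        System.out.println("pending right after send (may be > 0): " + sender.pending());
        sender.flush();
        System.out.println("pending after flush: " + sender.pending());
        System.out.println("delivered: " + sender.delivered.size());
        sender.close();
    }
}
```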
@Override
protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
    switch (semantic) {
        case EXACTLY_ONCE:
            FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
            producer.beginTransaction();
            return new KafkaTransactionState(producer.getTransactionalId(), producer);
        case AT_LEAST_ONCE:
        case NONE:
            // Do not create new producer on each beginTransaction() if it is not necessary
            final KafkaTransactionState currentTransaction = currentTransaction();
            if (currentTransaction != null && currentTransaction.producer != null) {
                return new KafkaTransactionState(currentTransaction.producer);
            }
            return new KafkaTransactionState(initNonTransactionalProducer(true));
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
}

@Override
protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
    switch (semantic) {
        case EXACTLY_ONCE:
        case AT_LEAST_ONCE:
            flush(transaction);
            break;
        case NONE:
            break;
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
    checkErroneous();
}

@Override
protected void commit(KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
        try {
            transaction.producer.commitTransaction();
        } finally {
            recycleTransactionalProducer(transaction.producer);
        }
    }
}

@Override
protected void abort(KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
        transaction.producer.abortTransaction();
        recycleTransactionalProducer(transaction.producer);
    }
}
The beginTransaction, preCommit, commit and abort methods of FlinkKafkaProducer011 mainly call the corresponding two-phase commit functions of the Kafka producer client. Two points deserve attention:
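- preCommit calls flush. As the analysis of TwoPhaseCommitSinkFunction showed, preCommit is called from snapshotState, which is triggered when the operator takes its checkpoint. This ensures that when the operator checkpoints, all data that arrived before that checkpoint has actually been sent downstream rather than sitting in a buffer. Take Figure 3: the sink operator starts its checkpoint when it receives the first checkpoint barrier, and before that checkpoint completes, messages m1 to m5 must all have been delivered downstream; otherwise, if the checkpoint completed while some of m1 to m5 had not been delivered, those messages would be lost. Making sure all buffered data has been sent inside snapshotState is a common practice that you should also follow when writing a custom SinkFunction. The flush here ultimately calls the Kafka producer client's flush method, which blocks until every buffered message has actually been sent to Kafka, so occasional spikes in checkpoint duration may well be caused by this flush.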
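- beginTransaction obtains a transactional id (through createTransactionalProducer and getTransactionalId), while commit and abort call recycleTransactionalProducer. Recall the question raised in Part II of how Kafka transactional ids are generated; here is how Flink produces them. As the code below shows, Flink uses a queue as a pool of transactional ids: when a new transaction begins, it takes an id from the head of the queue, and when the transaction ends, it puts the id back at the tail. Because every transaction gets its own newly created Kafka producer, and each producer needs its own transactional id, the pool also bounds how many transactions can be open at the same time; availableTransactionalIds is initially filled with as many ids as the configured Kafka producer pool size (5 by default).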
/**
 * Pool of available transactional ids.
 */
private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();

/**
 * For each checkpoint we create new {@link FlinkKafkaProducer} so that new transactions will not clash
 * with transactions created during previous checkpoints ({@code producer.initTransactions()} assures that we
 * obtain new producerId and epoch counters).
 */
private FlinkKafkaProducer<byte[], byte[]> createTransactionalProducer() throws FlinkKafka011Exception {
    String transactionalId = availableTransactionalIds.poll();
    if (transactionalId == null) {
        throw new FlinkKafka011Exception(
            FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
            "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
    }
    FlinkKafkaProducer<byte[], byte[]> producer = initTransactionalProducer(transactionalId, true);
    producer.initTransactions();
    return producer;
}

private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> producer) {
    availableTransactionalIds.add(producer.getTransactionalId());
    producer.close();
}
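The id pool can be exercised without Kafka. The sketch keeps the `BlockingDeque` and the poll-from-head / add-to-tail discipline of the code above, but the class name and the `prefix-N` id format are hypothetical (Flink generates its own ids), and no producer is created. With a pool of 5, five transactions can be open at once; a sixth attempt fails, much as `createTransactionalProducer` throws `PRODUCERS_POOL_EMPTY`, and a released id goes to the back of the queue.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Toy transactional-id pool (hypothetical class and id format; no Kafka producer is created).
class TransactionalIdPool {
    private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();

    TransactionalIdPool(String prefix, int poolSize) {
        for (int i = 0; i < poolSize; i++) availableTransactionalIds.add(prefix + "-" + i);
    }

    // beginTransaction: take an id from the head, or fail when every id is in use.
    String acquire() {
        String id = availableTransactionalIds.poll();
        if (id == null) throw new IllegalStateException("Too many ongoing snapshots: id pool is empty");
        return id;
    }

    // commit / abort: return the id to the tail so it can be reused by a later transaction.
    void release(String id) { availableTransactionalIds.add(id); }

    int available() { return availableTransactionalIds.size(); }

    public static void main(String[] args) {
        TransactionalIdPool pool = new TransactionalIdPool("sink", 5);
        for (int i = 0; i < 5; i++) System.out.println("open " + pool.acquire());
        try {
            pool.acquire();
        } catch (IllegalStateException e) {
            System.out.println("sixth transaction: " + e.getMessage());
        }
        pool.release("sink-2");
        System.out.println("reused " + pool.acquire());
    }
}
```

Reusing an id is safe in Flink's scheme because, per the comment on `createTransactionalProducer`, each new producer calls `initTransactions()` and thereby obtains a fresh epoch, which fences off any older producer that used the same id.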
IV. Summary
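The end-to-end exactly-once delivery that Flink builds on Kafka transactions is a fairly general approach. Once you understand the principle, you can quickly apply it to other external storage systems or message queues that support transactions.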
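The way Flink uses Kafka transactions is also a good example of using Kafka correctly in application development: when another project needs strongly consistent messaging on Kafka, Flink's code is a useful reference.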