Flink Source Code Walkthrough (2): End-to-End Exactly-Once Semantics with Flink and Kafka

I. Introduction

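Flink's checkpoint mechanism gives exactly-once semantics for the effect of messages on state: every message affects Flink's internal state exactly once. It does not, however, guarantee that the data written to the sink is free of duplicates. Take Figure 1 as an example. A Flink application reads message A from the source, transforms it into message B, and writes B to the sink. The application takes a checkpoint after processing A1. Suppose it then fails and restarts while processing A4: it resumes consuming from A2, so B2 and B3 are written to the sink twice.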

Figure 1: Duplicate output messages in Flink
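
The replay in Figure 1 can be reproduced with a few lines of plain Java. This is only a toy model under stated assumptions: the class `ReplayWithoutTransactions` and its parameters are made up for illustration, the "sink" is an in-memory list whose writes are visible immediately, and no Flink or Kafka API is involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of Figure 1: a non-transactional sink sees duplicates after a restart. */
class ReplayWithoutTransactions {

    /**
     * Processes inputs A1..A(total). A checkpoint is taken after A(checkpointAfter).
     * The job crashes once when it reaches A(failAt), before emitting anything for it,
     * then restarts from the record that follows the checkpoint.
     */
    static List<String> run(int total, int checkpointAfter, int failAt) {
        List<String> sink = new ArrayList<>();   // external sink: every write is visible at once
        boolean crashed = false;
        int next = 1;                            // next input to read
        while (next <= total) {
            if (!crashed && next == failAt) {
                crashed = true;
                next = checkpointAfter + 1;      // restore the source position from the checkpoint
                continue;
            }
            sink.add("B" + next);                // transform A<n> into B<n> and emit it
            next++;
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 1, 4));
    }
}
```

With a checkpoint after A1 and a crash at A4, the program prints `[B1, B2, B3, B2, B3, B4, B5]`: B2 and B3 reach the sink twice, exactly the duplication described above.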

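In this article, end-to-end exactly-once means that every record from the source is processed exactly once, and that the results written to the sink contain neither duplicates nor gaps.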

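End-to-end exactly-once semantics requires a sink that supports transactions. Fortunately, Kafka added transaction support in version 0.11, and Flink builds on that feature to provide end-to-end exactly-once processing. This article first gives a brief introduction to Kafka transactions, then walks through the source code to see how Flink ensures that output messages are neither duplicated nor lost.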

II. Kafka Transactions

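The motivation behind Kafka transactions is simple: let a producer publish a group of messages atomically, so that the group is either entirely visible to consumers or entirely invisible, with no intermediate state.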

First, a few concepts used in Kafka transactions:

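  • Transaction Coordinator: the coordinator of Kafka transactions and of the two-phase commit. It tracks the transactions currently in progress, writes the transaction log, and so on.
  • producer id (PID): identifies the producer that executes a transaction. It is assigned by the Transaction Coordinator and is transparent to users of the Kafka client. A Kafka transaction can only be operated by one producer, just as all SQL statements in a MySQL transaction must come from the same client connection. "The same producer" here does not mean the same process running a producer, but a producer holding the same PID. For example, process P1 runs a Kafka producer that is executing a transaction with PID x. If P1 dies unexpectedly and another process P2 is started as the Kafka producer, P2 can continue the earlier transaction as long as it obtains x as its own PID (by sending the same transactional id to the Transaction Coordinator). Conversely, if the same process P1 changes its PID at runtime (by sending a different transactional id to the Transaction Coordinator), it can no longer operate on the earlier transaction. Each producer id also has an epoch, which prevents two processes from operating on the same transaction at the same time.
  • transactional id: identifies a transactional producer across restarts, and through it the transaction that producer is running. It is specified by the user of the client. The client sends InitPidRequest(TransactionalId, TransactionTimeoutMs) to the Transaction Coordinator to initialize its PID. The same transactional id yields the same PID, and the epoch of that PID is incremented by one. Kafka only accepts messages from the producer with the highest epoch and rejects other producers with the same PID (zombie instances). Starting a new transaction only requires a transactional id that is not currently in use; there are no other special requirements. We will see later how the Flink Kafka sink generates its transactional ids.
  • Transaction Marker: a special control message in the log that records how a transaction ended (commit or abort).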
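
The relationship between transactional id, PID, and epoch described in the list above can be sketched as a small in-memory model. It is a simplified illustration, not Kafka's implementation: `EpochFencingDemo`, `Coordinator`, `Session`, and the id `"sink-0"` are hypothetical names, and the real coordinator persists this mapping in its transaction log.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of PID/epoch assignment and zombie fencing; not Kafka's actual implementation. */
class EpochFencingDemo {

    /** What a producer holds after it has been initialized. */
    static final class Session {
        final long pid;
        final int epoch;
        Session(long pid, int epoch) { this.pid = pid; this.epoch = epoch; }
    }

    /** Hypothetical stand-in for the Transaction Coordinator. */
    static final class Coordinator {
        private final Map<String, Session> byTransactionalId = new HashMap<>();
        private long nextPid = 0;

        /** Same transactional id: same PID, epoch incremented by one. New id: new PID. */
        Session init(String transactionalId) {
            Session old = byTransactionalId.get(transactionalId);
            Session fresh = (old == null)
                ? new Session(nextPid++, 0)
                : new Session(old.pid, old.epoch + 1);
            byTransactionalId.put(transactionalId, fresh);
            return fresh;
        }

        /** Accepts a write only from the session that holds the latest epoch. */
        boolean accepts(String transactionalId, Session s) {
            Session current = byTransactionalId.get(transactionalId);
            return current != null && current.pid == s.pid && current.epoch == s.epoch;
        }
    }

    public static void main(String[] args) {
        Coordinator c = new Coordinator();
        Session p1 = c.init("sink-0");                 // process P1
        Session p2 = c.init("sink-0");                 // process P2 takes over with the same id
        System.out.println(p1.pid == p2.pid);          // true
        System.out.println(c.accepts("sink-0", p1));   // false: P1 is now a zombie
        System.out.println(c.accepts("sink-0", p2));   // true
    }
}
```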

Figure 2: Message storage in Kafka

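Figure 2 shows how Kafka stores transactional messages when two producers write to the same partition of the same topic. After Producer 1 calls BeginTransaction, it starts producing transactional messages to the topic. Transaction 1 begins when the first message, m1, reaches the broker; m1 carries a PID field that marks it as belonging to Transaction 1, and the same holds for the messages that follow. Producer 1 and Producer 2 both write transactional messages to the topic over a period of time, and the messages are laid out in the log in arrival order. When Producer 1 commits its transaction, the broker appends the control message Commit T1 (a Transaction Marker) to the log; likewise, when Producer 2 aborts its transaction, the broker appends the control message Abort T2. From these control messages, a consumer reading the log in order can tell whether each message should be visible.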

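Using Figure 2 as an example, assume that m1 is the first message in the partition and that only Producer 1 and Producer 2 are writing. By the time m11 has been written, no message is visible to consumers that read only committed data (isolation.level=read_committed), because it is not yet known whether T1 and T2 will commit or abort. After Producer 1 commits, m1 becomes visible, because the status of every message up to and including m1 is settled (that is just m1 itself). Since the status of m2 is still undetermined, everything from m2 onward remains invisible. After Producer 2 aborts, m1, m3, m4, and m11 become visible (the status of every message before m12 is now settled), while m2, m10, and m12 are filtered out during consumption because T2 was aborted. In this situation the offsets of the messages the consumer receives are not contiguous.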
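
The visibility rule in this walkthrough can be written down as a short routine. The sketch below uses a smaller, made-up log (messages a1, a2 in T1 and b1, b2 in T2) rather than the exact contents of Figure 2. `VisibilityDemo` and `Entry` are hypothetical names, and the model assumes each transaction name is used only once in the log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of how a consumer derives visibility from transaction markers. */
class VisibilityDemo {

    /** A log entry is either a data message or a COMMIT/ABORT marker for a transaction. */
    static final class Entry {
        final String txn;      // transaction the entry belongs to
        final String payload;  // message name, or "COMMIT"/"ABORT" for a marker
        Entry(String txn, String payload) { this.txn = txn; this.payload = payload; }
        boolean isMarker() { return payload.equals("COMMIT") || payload.equals("ABORT"); }
    }

    /** Returns the messages visible to a consumer that reads only committed data. */
    static List<String> visible(List<Entry> log) {
        // First pass: record the outcome of every transaction that already has a marker.
        Map<String, String> outcome = new HashMap<>();
        for (Entry e : log) {
            if (e.isMarker()) outcome.put(e.txn, e.payload);
        }
        // Second pass: read in order, stop at the first message of an undecided transaction.
        List<String> result = new ArrayList<>();
        for (Entry e : log) {
            if (e.isMarker()) continue;
            String o = outcome.get(e.txn);
            if (o == null) break;                            // undecided: nothing past here is visible
            if (o.equals("COMMIT")) result.add(e.payload);   // aborted messages are skipped
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry("T1", "a1"));
        log.add(new Entry("T2", "b1"));
        log.add(new Entry("T1", "a2"));
        log.add(new Entry("T2", "b2"));
        System.out.println(visible(log));   // []        both transactions are still open
        log.add(new Entry("T1", "COMMIT"));
        System.out.println(visible(log));   // [a1]      b1 is undecided and blocks what follows
        log.add(new Entry("T2", "ABORT"));
        System.out.println(visible(log));   // [a1, a2]  b1 and b2 are filtered out
    }
}
```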

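This way of writing transactional messages extends to multiple topics and partitions: on commit (or abort), control messages are written to every partition involved. Writing several control messages atomically is itself a distributed transaction problem, which is why Kafka implements transactions with two-phase commit.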

III. End-to-End Exactly-Once Semantics in Flink with Kafka Transactions

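The basic principle behind Flink's exactly-once semantics for internal state is as follows. At intervals Flink takes a checkpoint that durably records how far the upstream source has been consumed (for example, the Kafka offset) together with the value of local state at that moment. If the process dies later, Flink reads the persisted checkpoint, restores the state, and resumes processing from the recorded position, so each message affects state exactly once. This alone cannot keep the data written to the downstream sink free of duplicates. To avoid duplicate output, the sink must support transactions: the messages emitted between two checkpoints are treated as one transaction, which is committed if the new checkpoint succeeds and aborted otherwise. This solves the problem of the duplicated B2 and B3 in Figure 1. When the job fails at A4 and restarts, no new checkpoint has been completed, so the transaction containing the red B2 and B3 is never committed and its messages are never visible to downstream consumers.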

1. TwoPhaseCommitSinkFunction

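First, a quick review of how Flink takes a checkpoint. When a checkpoint starts, the JobManager has checkpoint barriers injected into the data stream at the sources. An operator that receives the barrier snapshots its own state, which keeps the state of all operators in the checkpoint consistent. Each operator reports to the JobManager when its snapshot is done, and once all operators have reported, the JobManager pushes a NotifyCheckpointComplete message to all of them.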

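Flink relies on the sink's transaction support to achieve end-to-end exactly-once semantics, and two-phase commit is a fairly general solution to distributed transaction problems. Flink therefore provides the abstract class TwoPhaseCommitSinkFunction to carry out the two-phase commit against the downstream sink.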

Let us look at how TwoPhaseCommitSinkFunction works in detail.

Figure 3: How TwoPhaseCommitSinkFunction works

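When TwoPhaseCommitSinkFunction receives a checkpoint barrier, and before it snapshots its own state, it pre-commits the current transaction on the sink, so the sink performs its pre-commit while the rest of the system is taking the checkpoint. At the same time it begins a new transaction on the sink. Flink can keep processing subsequent messages while the checkpoint is in progress, and opening the next transaction at this point ensures that those messages fall into the next transaction cycle. After its own snapshot is done and the NotifyCheckpointComplete message arrives from the JobManager, it commits on the sink, which completes the two-phase commit. Only then does the data sent during this cycle become visible to downstream consumers.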

Let us go through the process again with the code:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	// this is like the pre-commit of a 2-phase-commit transaction
	// we are ready to commit and remember the transaction

	checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

	long checkpointId = context.getCheckpointId();
	LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);

	preCommit(currentTransactionHolder.handle);
	pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
	LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

	currentTransactionHolder = beginTransactionInternal();
	LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

	state.clear();
	state.add(new State<>(
		this.currentTransactionHolder,
		new ArrayList<>(pendingCommitTransactions.values()),
		userContext));
}

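snapshotState is declared by the CheckpointedFunction interface; it is the function that snapshots the operator's own state after a checkpoint barrier arrives. It first pre-commits the current transaction and puts it into pendingCommitTransactions, then begins a new transaction as the current one, and finally stores both the current transaction and pendingCommitTransactions in the checkpoint as its own state. Keeping the transaction information in state makes it possible to continue the earlier transactions after restoring from a checkpoint.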

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
	// the following scenarios are possible here
	//
	//  (1) there is exactly one transaction from the latest checkpoint that
	//      was triggered and completed. That should be the common case.
	//      Simply commit that transaction in that case.
	//
	//  (2) there are multiple pending transactions because one previous
	//      checkpoint was skipped. That is a rare case, but can happen
	//      for example when:
	//
	//        - the master cannot persist the metadata of the last
	//          checkpoint (temporary outage in the storage system) but
	//          could persist a successive checkpoint (the one notified here)
	//
	//        - other tasks could not persist their status during
	//          the previous checkpoint, but did not trigger a failure because they
	//          could hold onto their state and could successfully persist it in
	//          a successive checkpoint (the one notified here)
	//
	//      In both cases, the prior checkpoint never reach a committed state, but
	//      this checkpoint is always expected to subsume the prior one and cover all
	//      changes since the last successful one. As a consequence, we need to commit
	//      all pending transactions.
	//
	//  (3) Multiple transactions are pending, but the checkpoint complete notification
	//      relates not to the latest. That is possible, because notification messages
	//      can be delayed (in an extreme case till arrive after a succeeding checkpoint
	//      was triggered) and because there can be concurrent overlapping checkpoints
	//      (a new one is started before the previous fully finished).
	//
	// ==> There should never be a case where we have no pending transaction here
	//
	Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
	checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
	Throwable firstError = null;

	while (pendingTransactionIterator.hasNext()) {
		Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
		Long pendingTransactionCheckpointId = entry.getKey();
		TransactionHolder<TXN> pendingTransaction = entry.getValue();
		if (pendingTransactionCheckpointId > checkpointId) {
			continue;
		}

		LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
			name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

		logWarningIfTimeoutAlmostReached(pendingTransaction);
		try {
			commit(pendingTransaction.handle);
		} catch (Throwable t) {
			if (firstError == null) {
				firstError = t;
			}
		}

		LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

		pendingTransactionIterator.remove();
	}

	if (firstError != null) {
		throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
			firstError);
	}
}

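notifyCheckpointComplete is declared by CheckpointListener and runs when the NotifyCheckpointComplete message from the JobManager arrives. It commits every pending transaction whose checkpoint id is not greater than the notified checkpoint id. As the comment explains, there is normally only one pending transaction, the one pre-committed by the snapshotState call that was just triggered. There can be more than one, however, for example when the NotifyCheckpointComplete message for the previous checkpoint is delayed.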
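
The commit rule in notifyCheckpointComplete can be isolated in a few lines. The sketch below is a stdlib-only model, not Flink's class: `PendingCommitDemo` and the transaction names are hypothetical, and "committing" just means moving a name into a list. It shows that a notification for checkpoint 2 also commits the transaction of checkpoint 1 when that earlier notification never arrived, while the transaction of checkpoint 3 stays pending.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the pending-transaction bookkeeping; not Flink's implementation. */
class PendingCommitDemo {
    // checkpoint id -> name of the transaction pre-committed for that checkpoint
    private final Map<Long, String> pending = new LinkedHashMap<>();
    private final List<String> committed = new ArrayList<>();

    /** Called where snapshotState would pre-commit the current transaction. */
    void preCommitted(long checkpointId, String txn) {
        pending.put(checkpointId, txn);
    }

    /** Commits every pending transaction whose checkpoint id is <= the notified id. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> e = it.next();
            if (e.getKey() > checkpointId) {
                continue;                 // belongs to a later checkpoint: keep it pending
            }
            committed.add(e.getValue());
            it.remove();
        }
    }

    List<String> committed() { return committed; }
    int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        PendingCommitDemo d = new PendingCommitDemo();
        d.preCommitted(1, "txn-1");
        d.preCommitted(2, "txn-2");
        d.preCommitted(3, "txn-3");
        d.notifyCheckpointComplete(2);            // the notification for checkpoint 1 was never seen
        System.out.println(d.committed());        // [txn-1, txn-2]
        System.out.println(d.pendingCount());     // 1
    }
}
```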

// ------ methods that should be implemented in child class to support two phase commit algorithm ------

/**
 * Write value within a transaction.
 */
protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;

/**
 * Method that starts a new transaction.
 *
 * @return newly created transaction.
 */
protected abstract TXN beginTransaction() throws Exception;

/**
 * Pre commit previously created transaction. Pre commit must make all of the necessary steps to prepare the
 * transaction for a commit that might happen in the future. After this point the transaction might still be
 * aborted, but underlying implementation must ensure that commit calls on already pre committed transactions
 * will always succeed.
 *
 * <p>Usually implementation involves flushing the data.
 */
protected abstract void preCommit(TXN transaction) throws Exception;

/**
 * Commit a pre-committed transaction. If this method fail, Flink application will be
 * restarted and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
 * same transaction.
 */
protected abstract void commit(TXN transaction);

/**
 * Abort a transaction.
 */
protected abstract void abort(TXN transaction);

TwoPhaseCommitSinkFunction leaves five functions for subclasses to implement:

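  • invoke: defines how the sink writes data to the external system. Every sink defines an invoke function, which is called once for each record the sink operator receives; the version here simply takes an extra transaction parameter.
  • beginTransaction, preCommit, commit, abort: the steps of the two-phase commit protocol. If the external system itself supports two-phase commit (as Kafka does), these functions simply call the corresponding operations of the external system.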
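
To see how the five hooks fit together, here is a deliberately simplified stand-in written against the Java standard library only. It is not Flink's TwoPhaseCommitSinkFunction: `MiniSink`, `BufferSink`, `write`, and `failAndRestart` are hypothetical names, a transaction is just an in-memory buffer, and recovery of transactions that were pre-committed but not yet confirmed is left out. The driver replays the Figure 1 scenario (checkpoint after A1, failure at A4).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Simplified stand-in for the two-phase-commit sink pattern; not Flink's class. */
class MiniTwoPhaseCommitDemo {

    /** Base class that drives the five hooks a concrete sink must provide. */
    abstract static class MiniSink<TXN, IN> {
        private final TreeMap<Long, TXN> pending = new TreeMap<>();
        private TXN current;

        abstract void invoke(TXN txn, IN value);
        abstract TXN beginTransaction();
        abstract void preCommit(TXN txn);
        abstract void commit(TXN txn);
        abstract void abort(TXN txn);

        void open() { current = beginTransaction(); }
        void write(IN value) { invoke(current, value); }

        /** Barrier arrived: pre-commit the current transaction and start the next one. */
        void snapshotState(long checkpointId) {
            preCommit(current);
            pending.put(checkpointId, current);
            current = beginTransaction();
        }

        /** Checkpoint confirmed: commit everything pre-committed up to that checkpoint. */
        void notifyCheckpointComplete(long checkpointId) {
            for (TXN txn : pending.headMap(checkpointId, true).values()) {
                commit(txn);
            }
            pending.headMap(checkpointId, true).clear();
        }

        /** Crash: the open transaction never reached a completed checkpoint, so abort it. */
        void failAndRestart() {
            abort(current);
            current = beginTransaction();
        }
    }

    /** In-memory sink: a transaction is a private buffer, and commit publishes it. */
    static final class BufferSink extends MiniSink<List<String>, String> {
        final List<String> visible = new ArrayList<>();

        @Override void invoke(List<String> txn, String value) { txn.add(value); }
        @Override List<String> beginTransaction() { return new ArrayList<>(); }
        @Override void preCommit(List<String> txn) { /* nothing buffered elsewhere to flush */ }
        @Override void commit(List<String> txn) { visible.addAll(txn); }
        @Override void abort(List<String> txn) { txn.clear(); }
    }

    /** Replays the Figure 1 scenario: checkpoint after A1, crash at A4, resume from A2. */
    static List<String> run() {
        BufferSink sink = new BufferSink();
        sink.open();
        sink.write("B1");
        sink.snapshotState(1);
        sink.notifyCheckpointComplete(1);   // B1 becomes visible
        sink.write("B2");
        sink.write("B3");
        sink.failAndRestart();              // B2 and B3 are discarded with their transaction
        for (int i = 2; i <= 5; i++) {
            sink.write("B" + i);            // reprocessing after the restart
        }
        sink.snapshotState(2);
        sink.notifyCheckpointComplete(2);
        return sink.visible;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The program prints `[B1, B2, B3, B4, B5]`: the first B2 and B3 were written inside a transaction that was aborted, so consumers of the visible list never see them twice.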

2. FlinkKafkaProducer011

With TwoPhaseCommitSinkFunction understood, FlinkKafkaProducer011 is much easier to follow.

@Override
public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception {
	checkErroneous();

	byte[] serializedKey = schema.serializeKey(next);
	byte[] serializedValue = schema.serializeValue(next);
	String targetTopic = schema.getTargetTopic(next);
	if (targetTopic == null) {
		targetTopic = defaultTopicId;
	}

	Long timestamp = null;
	if (this.writeTimestampToKafka) {
		timestamp = context.timestamp();
	}

	ProducerRecord<byte[], byte[]> record;
	int[] partitions = topicPartitionsMap.get(targetTopic);
	if (null == partitions) {
		partitions = getPartitionsByTopic(targetTopic, transaction.producer);
		topicPartitionsMap.put(targetTopic, partitions);
	}
	if (flinkKafkaPartitioner != null) {
		record = new ProducerRecord<>(
			targetTopic,
			flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
			timestamp,
			serializedKey,
			serializedValue);
	} else {
		record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
	}
	pendingRecords.incrementAndGet();
	transaction.producer.send(record, callback);
}

The invoke function of FlinkKafkaProducer011 builds a Kafka record from the incoming message (next) and sends it by calling the Kafka client's send method.

@Override
protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
	switch (semantic) {
		case EXACTLY_ONCE:
			FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
			producer.beginTransaction();
			return new KafkaTransactionState(producer.getTransactionalId(), producer);
		case AT_LEAST_ONCE:
		case NONE:
			// Do not create new producer on each beginTransaction() if it is not necessary
			final KafkaTransactionState currentTransaction = currentTransaction();
			if (currentTransaction != null && currentTransaction.producer != null) {
				return new KafkaTransactionState(currentTransaction.producer);
			}
			return new KafkaTransactionState(initNonTransactionalProducer(true));
		default:
			throw new UnsupportedOperationException("Not implemented semantic");
	}
}

@Override
protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
	switch (semantic) {
		case EXACTLY_ONCE:
		case AT_LEAST_ONCE:
			flush(transaction);
			break;
		case NONE:
			break;
		default:
			throw new UnsupportedOperationException("Not implemented semantic");
	}
	checkErroneous();
}

@Override
protected void commit(KafkaTransactionState transaction) {
	if (transaction.isTransactional()) {
		try {
			transaction.producer.commitTransaction();
		} finally {
			recycleTransactionalProducer(transaction.producer);
		}
	}
}

@Override
protected void abort(KafkaTransactionState transaction) {
	if (transaction.isTransactional()) {
		transaction.producer.abortTransaction();
		recycleTransactionalProducer(transaction.producer);
	}
}

The beginTransaction, preCommit, commit, and abort implementations in FlinkKafkaProducer011 mainly call the corresponding two-phase commit functions of the Kafka producer client. Two points are worth noting:

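  • preCommit calls flush. As the analysis of TwoPhaseCommitSinkFunction showed, preCommit is called from snapshotState, and snapshotState is triggered when the operator takes its checkpoint. This guarantees that by the time the operator's checkpoint is taken, all data preceding that checkpoint has been safely sent downstream rather than sitting in a buffer. In Figure 3, for example, the sink operator starts its checkpoint when it receives the first checkpoint barrier, and before that checkpoint completes, the five messages m1 to m5 must all have been sent downstream. Otherwise, if the checkpoint completed while some of m1 to m5 had not been delivered, a later failure would lose them, because the restored job would not replay them. Making sure buffered data has been sent out in snapshotState is a very general practice and deserves attention when you implement a custom SinkFunction. The flush here eventually calls the flush method of the Kafka producer client, which blocks until all buffered messages have actually been sent to Kafka. Occasional spikes in checkpoint duration may therefore be caused by this flush.
  • beginTransaction calls getTransactionalId, and commit and abort call recycleTransactionalProducer. Recall the question from Part II of how Kafka transactional ids are generated, and look at how Flink produces them. The code below shows that Flink uses a queue as a pool of transactional ids: a new transaction takes a transactional id from the head of the queue, and a finished transaction puts its transactional id back at the tail. Because a new Kafka producer is constructed for every transaction, the initial size of availableTransactionalIds equals the configured Kafka producer pool size (5 by default).
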
/**
 * Pool of available transactional ids.
 */
private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
	
/**
 * For each checkpoint we create new {@link FlinkKafkaProducer} so that new transactions will not clash
 * with transactions created during previous checkpoints ({@code producer.initTransactions()} assures that we
 * obtain new producerId and epoch counters).
 */
private FlinkKafkaProducer<byte[], byte[]> createTransactionalProducer() throws FlinkKafka011Exception {
	String transactionalId = availableTransactionalIds.poll();
	if (transactionalId == null) {
		throw new FlinkKafka011Exception(
			FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
			"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
	}
	FlinkKafkaProducer<byte[], byte[]> producer = initTransactionalProducer(transactionalId, true);
	producer.initTransactions();
	return producer;
}

private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> producer) {
	availableTransactionalIds.add(producer.getTransactionalId());
	producer.close();
}
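
The pooling behavior can be tried out in isolation with a small model. It is only a sketch: `TransactionalIdPoolDemo`, `acquire`, `release`, and the `prefix-index` id format are made up for illustration and are not how Flink names its transactional ids. The point is the queue discipline (take from the head, return to the tail) and the failure when more transactions are in flight than the pool holds.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a fixed-size pool of transactional ids; the id format is made up. */
class TransactionalIdPoolDemo {
    private final Deque<String> available = new ArrayDeque<>();

    TransactionalIdPoolDemo(String prefix, int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            available.add(prefix + "-" + i);
        }
    }

    /** Takes an id from the head; fails when every id is held by an unfinished transaction. */
    String acquire() {
        String id = available.poll();
        if (id == null) {
            throw new IllegalStateException("pool empty: too many transactions in flight");
        }
        return id;
    }

    /** Returns an id to the tail once its transaction has been committed or aborted. */
    void release(String id) {
        available.add(id);
    }

    public static void main(String[] args) {
        TransactionalIdPoolDemo pool = new TransactionalIdPoolDemo("sink", 2);
        String first = pool.acquire();    // transaction for checkpoint n
        String second = pool.acquire();   // transaction for checkpoint n + 1, n not yet committed
        System.out.println(first + " " + second);
        pool.release(first);              // checkpoint n confirmed and committed
        System.out.println(pool.acquire());
    }
}
```

In this model, a pool of size 2 tolerates one transaction awaiting its commit while the next one is open; a third `acquire()` before any `release()` throws, which corresponds to the PRODUCERS_POOL_EMPTY error in the code above.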

IV. Summary

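Flink's use of Kafka transactions to achieve end-to-end exactly-once delivery is a fairly general solution. Once you understand the principle, you can readily apply the same approach to other external stores or message queues that support transactions.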

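The way Flink uses Kafka transactions is also a good demonstration of how to use Kafka correctly in application development. Other projects that need strong consistency for messages on Kafka can borrow from Flink's code.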
