事务消息 Transactional-Message-Based Distributed Transactions
事务消息就是今天文章的主角了,它主要是适用于异步更新的场景,并且对数据实时性要求不高的地方。
它的目的是为了解决消息生产者与消息消费者的数据一致性问题。
交互流程
事务消息交互流程如下图所示。
- The producer sends a message to a MQ broker.
- The MQ broker saves the message and marks it as not ready for delivery. A message in this state is called a half message. After that, the broker sends an acknowledgment message (ACK) back to the producer.
- The producer executes the local transaction.
- The producer sends a second ACK to the broker to submit the execution result of the local transaction. The execution result may be
Commit
orRollback
.- If the status of the message received by the broker is
Commit
, the broker marks the half message as deliverable and delivers the message to the consumer. - If the status of the message received by the broker is
Rollback
, the broker rolls back the transaction and does not deliver the half message to the consumer.
- If the status of the message received by the broker is
- If the network is disconnected or the producer application is restarted and the broker does not receive a second ACK or the status of the half message is Unknown, the broker waits a period of time and sends a request to a producer in the producer cluster to query the status of the half message.
- After the producer receives the request, the producer checks the execution result of the local transaction that corresponds to the half message.
- The producer sends another ACK to the MQ broker based on the execution result of the local transaction. Then, the broker processes the half message by following Step 4.
事务消息回查步骤如下:
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
Scenarios
Scenario1
比如你点外卖,我们先选了炸鸡加入购物车,又选了瓶可乐,然后下单,付完款这个流程就结束了。
而购物车里面的数据就很适合用消息通知异步删除,因为一般而言我们下完单不会再去点开这个店家的菜单,而且就算点开了购物车里还有这些菜品也没有关系,影响不大。
我们希望的就是下单成功之后购物车的菜品最终会被删除,所以要点就是下单和发消息这两个步骤要么都成功要么都失败。
Scenario2
以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:
- 主分支订单系统状态更新:由未支付变更为支付成功。
- 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
- 积分系统状态变更:变更用户积分,更新用户积分表。
- 购物车系统状态变更:清空购物车,更新用户购物车记录。
RocketMQ 事务消息
我们先来看一下 RocketMQ 是如何实现事务消息的。
- The producer sends a message to a MQ broker.
- The MQ broker saves the message and marks it as not ready for delivery. A message in this state is called a half message. After that, the broker sends an acknowledgment message (ACK) back to the producer.
- The producer executes the local transaction.
- The producer sends a second ACK to the broker to submit the execution result of the local transaction. The execution result may be
Commit
orRollback
.- If the status of the message received by the broker is
Commit
, the broker marks the half message as deliverable and delivers the message to the consumer. - If the status of the message received by the broker is
Rollback
, the broker rolls back the transaction and does not deliver the half message to the consumer.
- If the status of the message received by the broker is
- If the network is disconnected or the producer application is restarted and the broker does not receive a second ACK or the status of the half message is Unknown, the broker waits a period of time and sends a request to a producer in the producer cluster to query the status of the half message.
- After the producer receives the request, the producer checks the execution result of the local transaction that corresponds to the half message.
- The producer sends another ACK to the MQ broker based on the execution result of the local transaction. Then, the broker processes the half message by following Step 4.
Note
- RocketMQ 的事务消息也可以被认为是一个两阶段提交,简单的说就是在事务开始的时候会先发送一个半消息给 Broker。
- 半消息的意思是,这个消息此时对 Consumer 是不可见的,而且也不是存在真正要发送的队列中,而是一个特殊队列。
- 发送完半消息之后再执行本地事务,再根据本地事务的执行结果来决定是向 Broker 发送提交消息,还是发送回滚消息。
此时有人说这一步发送提交或者回滚消息失败了怎么办?
- 影响不大,Broker 会定时的向 Producer 来反查这个事务是否成功,具体的就是 Producer 需要暴露一个接口,通过这个接口 Broker 可以得知事务到底有没有执行成功,没成功就返回未知,因为有可能事务还在执行,会进行多次查询。
- 如果成功那么就将半消息恢复到正常要发送的队列中,这样消费者就可以消费这条消息了。
事务消息生命周期
- 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
- 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
- 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
- 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
- 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。
Kafka 事务消息
Kafka 的事务消息和 RocketMQ 的事务消息又不一样了,RocketMQ 解决的是本地事务的执行和发消息这两个动作满足事务的约束。
而 Kafka 事务消息则是用在一次事务中需要发送多个消息的情况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都发送失败。
Kafka 的事务有事务协调者角色,事务协调者其实就是 Broker 的一部分。
在开始事务的时候,生产者会向事务协调者发起请求表示事务开启,事务协调者会将这个消息记录到特殊的日志-事务日志中,然后生产者再发送真正想要发送的消息,这里 Kafka 和 RocketMQ 处理不一样,Kafka 会像对待正常消息一样处理这些事务消息,由消费端来过滤这个消息。
然后发送完毕之后生产者会向事务协调者发送提交或者回滚请求,由事务协调者来进行两阶段提交,如果是提交那么会先执行预提交,即把事务的状态置为预提交然后写入事务日志,然后再向所有事务有关的分区写入一条类似事务结束的消息,这样消费端消费到这个消息的时候就知道事务好了,可以把消息放出来了。
最后协调者会向事务日志中再记一条事务结束信息,至此 Kafka 事务就完成了,我拿 confluent.io 上的图来总结一下这个流程。
Reference
- https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/
- https://juejin.cn/post/6844904106532962311
- https://help.aliyun.com/document_detail/43348.html
- https://xie.infoq.cn/article/53240651a2ef7c173f50a3194