分布式事务(5)---最终一致性方案之可靠消息

2021-12-14

 

分布式事务(1)-理论基础

分布式事务(2)---强一致性分布式事务解决方案

分布式事务(3)---强一致性分布式事务Atomikos实战

分布式事务(4)---最终一致性方案之TCC

 

可靠消息最终一致性是解决分布式事务中一种典型的柔性方案。通常有两种实现方式,一种是基于本地消息表,一种是基于RocketMQ的事务消息。需要注意发送消息的一致性和消息的可靠性。

基本原理:

事务发起方执行本地事务成功后发出一条消息,事务参与方也就是消息的消费者,接收到消息并执行成功本地事务。这样来达到数据的最终一致性。

需要注意发起方一定能够将消息发送出去,参与方一定能成功接收到消息。这样来确保消息的可靠性。否则同样会出现分布式事务问题。

本地消息表

为了防止在使用消息一致性方案时,出现消息丢失,可以使用本地消息表来保证消息的发送。通过本地事务将业务数据和消息写入本地数据库,这一步操作是本地事务可以保证消息表必然会写入数据。然后通过定时任务读物本地消息表中的数据,将消息发送给消息中间件。如果发送失败,进行重试,因此还涉及到幂等操作。消费方接收到消息之后,执行业务(本地事务)成功,则完成分布式事务,若失败则进行重试。如果多次任然失败,则通知事务发起方进行事务回滚。

 

方案存在如下缺点:

1.消息表耦合在业务库中,需要额外的处理发送消息的操作,不利于消息的扩展,同事如果消息表堆积了大量消息数据,会对业务操作产生一定的性能影响。

2.消息发送失败需要重试,需要保证操作的相关操作的幂等

3.多次重试依然失败需要人工干预

4.消息服务与业务耦合,不利于消息服务的扩展。

 

RocketMQ事务消息

RocketMQ在4.3版本后引入了完整的事务消息机制,其内部实现了本地消息表的逻辑,使用其事务消息极大的减轻了开发的工作量。

在RocketMQ中,producer和broker具有双向通信能力,使得broker自然的具备了事务协调者的能力。

RocketMQ事务消息分布式事务解决方案原理图:

 

 

 

 

 

 

 roketMQ事务消息案例,官方复制粘贴:

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

1、创建事务性生产者 使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } } 2、实现事务的监听接口 当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。 public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } } 3. 事务消息使用上的限制 1.事务消息不支持延时消息和批量消息。 2.为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。 3.事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。 4.事务性消息可能不止一次被检查或消费。 5.提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。 6.事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

  

消息发送的一致性

消息发送的一致性指的事务发起方执行本地事务成功则一定能把其产生的消息发送出去。这里涉及到消息发送与确认机制,消息发送的不可靠性,如何保证消息发送的一致性。

消息发送与确认机制:

常规中间的消息发送与确认机制如下:

1.生产者执行本地事务,然后将消息发送到MQ,这里可以是同步或者异步

2.MQ接收到消息后,将消息数据持久化到磁盘。这个MQ都会提供相应的配置

3.MQ向生产者返回发送结果(消息状态或者异常)

4.消费者监听消费消息

5.消费者执行本地事务

6.消费者向消息MQ确认消费消息

这种流程一般来说无法保证消息发送的一致性。

消息发送如何不一致:

1.先操作数据库,再发送消息。数据库写入了,但消息可能没有发送出去,事务参与方就没有消息可消费

    public void tx() {
        //1.执行业务
        //2.发送消息
    }

2.先发消息,在操作库。消息发出去,但是本地事务执行失败,参与方可以执行业务,但是发起方没有执行业务

    public void tx() {
        //1.发送消息
        //2.执行业务
    }

3.同一事务中,先发消息,再操作库。和第二点一样,事务回滚无法控制消息的回滚

    @Transactional
    public void tx() {
        //1.发送消息
        //2.执行业务
    }

4.同一事务中,先操作库,再发送消息。这种看似正常,数据保存成功,消息发送失败,事务会回滚。但是如果事务执行成功,消息发送成功,由于网络原因,导致发送消息相应超时,抛出异常回滚了事务,这个时候消息可能已经被事务参与方消费了,并执行了业务。所以还是需要发送确认机制。流程参考上面RocketMQ事务消息流程图

    @Transactional
    public void tx() {
        //1.执行业务
        //2.发送消息
    }

 

消息接收的一致性:

消息接收与确认

 消息接收的一致性在一定程度上需要满足消息的接收与确认机制:

1.MQ向消费方投递消息

2.消费方收到消息,执行本地事务,执行成功/失败,将结果发送给MQ

3.中间件处理消费者发来的结果,成功则清除消息记录,失败则根据不同的情况处理,比如rabbitMQ,可以设置重回队列

4.MQ投递消息失败会进行重试,多次投递失败,将消息转入死信队列,以便后面人工处理

5.消费方执行完业务,如果如法将结果发送给MQ,同样应该引入重试机制,比如另起线程,扫表数据状态,将结果发送给MQ

需要注意:1.消息接收接口需要保证幂等;2.涉及到重试,最好设置重试次数,以免进入死循环。

 

消息接收不一致:

1.接收消息的接口没有幂等,如果消息重复投递则会导致数据不一致。

2.消费者可能无法接收消息,此时MQ并没有重试投递,导致事务参与方业务没有执行,引起数据不一致

3.消费者执行完本地业务后,无法将结果反馈给MQ,MQ无法正确的处理消息,进行了重试,消费接口又没有幂等导致数据一不一致

如何保证消息接收的一致性:

1.限制MQ消息投递重试的最大次数

2.消息接收接口保证幂等

3.事务参与方与MQ之间需要确认机制

4.失败的消息转入私信队列,后续人工干预处理