xiaoming728

xiaoming728

Spring Boot整合RocketMQ之事务消息

2023-12-11
Spring Boot整合RocketMQ之事务消息

上次简单的了解了一下在Spring Boot下通过使用rocketmq-spring-boot-starter进行普通消息的发送、接收以及使用集群模式来模拟实现广播模式,文章链接。今天来学习一下RocketMQ事务消息的发送。

RocketMQ的事务消息分为3种状态,分别是提交状态、回滚状态、中间状态:

TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。

TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。

TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

当然因为在项目中我使用的是rocketmq-spring-boot-starter,所以表述上略有不同,但是本质是一样的。

事务消息在解决分布式事务的场景中感觉还是很有用的,虽然我们现在项目的分布式事务是通过Seata来实现的,但是通过事务消息或者消息的最终一次性也是可以的。

事务消息总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。这三个阶段是前后关联的,只有发送Prepared消息成功,才会执行本地事务,本地事务返回的状态是提交,那么就会发送最终的确认消息。如果在结束消息事务时,本地事务状态失败,那么Broker回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是Prepared状态则会向生产者发起一个检查本地事务的请求。

一、代码修改

首先我创建有一个Service来发送事务消息,代码没有什么特殊的含义,只是拿来当一个demo,代码如下:

public Boolean save(OrderEntity orderEntity) {
    Message<OrderEntity> message = MessageBuilder.withPayload(orderEntity).build();

    log.info(">>>> send tx message start,tx_group={},destination={},payload={} <<<<",TX_GROUP,ORDER_TOPIC + ORDER_TAG,orderEntity);
    TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction("tx_order","order_topic:" + "tx_tag",message,orderEntity.getUserName());
    String sendStatus = sendResult.getSendStatus().name();
    String localTXState = sendResult.getLocalTransactionState().name();
    log.info(">>>> send status={},localTransactionState={} <<<<",sendStatus,localTXState);
    return Boolean.TRUE;
}

使用RocketMQTemplate发送事务消息和普通消息略有不同的是,需要指一个事务生产者组,当然如果传入null,则会使用默认值rocketmq_transaction_default_global_name,发生消息的地址和普通消息一样都Topic:Tag,另外一点不同的是除了发生的Message之外,还可以发送其他的额外参数,不过这些参数只会在执行本地事务的时候会用到。

接下来我们创建一个消息的监听器(消费者),这个和普通消息的监听器一样,代码如下:

@Component
@RocketMQMessageListener(consumerGroup = "tx_consumer",topic = "order_topic")
public class OrderListener implements RocketMQListener<String>{

    @Override
    public void onMessage(String message) {
        log.info(">>>> message={} <<<<",message);
    }
}

除了消费者之外,我们还需要创建事务消息生产者端的消息监听器,注意是生产者,不是消费者,我们需要实现的是RocketMQLocalTransactionListener接口,代码如下:

@RocketMQTransactionListener(txProducerGroup = "tx_order")
public class OrderTXMsgListener implements RocketMQLocalTransactionListener {

    @Autowired
    private UserRepository userRepository;

    private static final Gson GSON = new Gson();

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);
        // 执行本地事务
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);
            String userName = (String) arg;
        } catch (Exception e) {
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.UNKNOWN;
        }
        return result;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info(">>>> TX message listener check local transaction, message={} <<<<",msg.getPayload());
        // 检查本地事务
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);
        } catch (Exception e) {
            // 异常就回滚
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.ROLLBACK;
        }
        return result;
    }
}

@RocketMQTransactionListener表明这个一个生产端的消息监听器,需要配置监听的事务消息生产者组。而实现RocketMQLocalTransactionListener接口,重写执行本地事务的方法和检查本地事务方法。下面,我们通过修改生产者端事务监听器的代码来观察代码的执行情况。

二、消息事务测试

首先还是正常的启动项目,在执行本地事务方法中正常情况下返回的值是COMMIT,即提交事务,这种情况下消费者会直接消费消息,而略过检查本地事务的方法。调用该接口,项目日志输出如下:

>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=lisi, price=8848.00, address=CN-SC-CD-05, createTime=null, updateTime=null, status=20) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[119], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800690C3418B4AAC2842438960000, rocketmq_TAGS=tx_tag, id=f32f4848-9acf-20bb-2501-0e6088765897, contentType=application/json, timestamp=1595749766307}],args=lisi <<<<
>>>> send status=SEND_OK,localTransactionState=COMMIT_MESSAGE <<<<
>>>> message={"id":null,"userName":"lisi","price":8848.00,"address":"CN-SC-CD-05","createTime":null,"updateTime":null,"status":"20"} <<<<

通过日志分析可以看出,在执行完本地事务方法之后,返回的本地事务状态是COMMIT_MESSAGE,接着消费者消费消息,和我们的预期是一样的。

接下来我们修改下执行本地事务的方法,让该方法返回状态为RocketMQLocalTransactionState.UNKNOWN,修改之后如下:

    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);
        // 执行本地事务
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);
            String userName = (String) arg;
            int r = 11 / 0;
        } catch (Exception e) {
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.UNKNOWN;
        }
        return result;
    }

这样因为发生异常,该方法返回的结果是UNKNOWN,根据上文的分析,执行本地事务方法之后应该会执行检查本地事务方法,重启项目之后,再次调用一下接口,查看日志输出如下:

>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=zhangsan, price=90001.00, address=CN-SC-CD-02, createTime=null, updateTime=null, status=10) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[124], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC2842BF39A0000, rocketmq_TAGS=tx_tag, id=dfd215f4-2aa6-f377-d1a7-ebbe3875769a, contentType=application/json, timestamp=1595750272928}],args=zhangsan <<<<
>>>> exception message=/ by zero <<<<
>>>> send status=SEND_OK,localTransactionState=UNKNOW <<<<
HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m22s578ms430µs170ns).
>>>> TX message listener check local transaction, message=GenericMessage [payload=byte[124], headers={rocketmq_QUEUE_ID=0, TRANSACTION_CHECK_TIMES=1, rocketmq_TAGS=tx_tag, rocketmq_BORN_TIMESTAMP=1595750272923, rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=C0A8006900002A9F00000000000156AB, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC2842BF39A0000, rocketmq_SYS_FLAG=0, id=ea3c3a7a-23c6-5acf-4c0f-0fa42f795b41, rocketmq_BORN_HOST=192.168.0.105, contentType=application/json, timestamp=1595750310890}] <<<<
>>>> TX message listener check local transaction, message=GenericMessage [payload=byte[124], headers={rocketmq_QUEUE_ID=0, TRANSACTION_CHECK_TIMES=2, rocketmq_TAGS=tx_tag, rocketmq_BORN_TIMESTAMP=1595750272923, rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=C0A8006900002A9F0000000000015892, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC2842BF39A0000, rocketmq_SYS_FLAG=0, id=cddfa35c-c8b2-cb1b-dce7-a26c6888b99a, rocketmq_BORN_HOST=192.168.0.105, contentType=application/json, timestamp=1595750374536}] <<<<
>>>> message={"id":null,"userName":"zhangsan","price":90001.00,"address":"CN-SC-CD-02","createTime":null,"updateTime":null,"status":"10"} <<<<
>>>> message={"id":null,"userName":"zhangsan","price":90001.00,"address":"CN-SC-CD-02","createTime":null,"updateTime":null,"status":"10"} <<<<

根据日志输出,在Service中返回的事务消息发送状态是SEND_OK,但是返回的本地事务状态是UNKNOW,所以需要执行检查本地事务方法,但是这里出现了一个问题就是检查本地事务方法执行了两次,而且事务消息也被消费了两次,感觉有点不正常了,但是检查发现两条信息日志中rocketmq_TRANSACTION_ID是一样的,这是什么情况??会不会和HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m22s578ms430µs170ns).有关呢,因为当时自己使用的DEBUG模式,看代码停留了一段时间,这样导致Broker发起的第一个回查线程挂起,而这时Broker又启动了一个线程,从而执行了两次检查事务的代码,而该方法返回的是COMMIT,所以。

不使用DEBUG模式重新测试一下,日志如下:

>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=wangwu, price=9876.00, address=CN-SC-CD-00, createTime=null, updateTime=null, status=10) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[121], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC28432E4130005, rocketmq_TAGS=tx_tag, id=464edcfe-09c1-cc4a-5ac3-f3df888b0102, contentType=application/json, timestamp=1595750727701}],args=wangwu <<<<
>>>> exception message=/ by zero <<<<
>>>> send status=SEND_OK,localTransactionState=UNKNOW <<<<
>>>> TX message listener check local transaction, message=GenericMessage [payload=byte[121], headers={rocketmq_QUEUE_ID=3, TRANSACTION_CHECK_TIMES=1, rocketmq_TAGS=tx_tag, rocketmq_BORN_TIMESTAMP=1595750727699, rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=C0A8006900002A9F0000000000016109, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC28432E4130005, rocketmq_SYS_FLAG=0, id=77765356-fc4d-6d05-3531-6a67fbbed7f7, rocketmq_BORN_HOST=192.168.0.105, contentType=application/json, timestamp=1595750790917}] <<<<
>>>> message={"id":null,"userName":"wangwu","price":9876.00,"address":"CN-SC-CD-00","createTime":null,"updateTime":null,"status":"10"} <<<<

这里输出的日志信息又没有问题了,我个人认为上面应该就是DEBUG导致的,这里就不再探讨了。

接下来测试一下,在执行本地事务方法中返回ROLLBACK的情况,这里代码就省略了,直接返回ROLLBACK。日志输出如下:

>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=zhaoliu, price=10000.00, address=CN-SC-CD-03, createTime=null, updateTime=null, status=10) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[123], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800691A3A18B4AAC284F72B910000, rocketmq_TAGS=tx_tag, id=d5b24a82-8d8b-90ad-7322-adfe2c4f3026, contentType=application/json, timestamp=1595763591062}],args=zhaoliu <<<<
>>>> exception message=/ by zero <<<<
>>>> send status=SEND_OK,localTransactionState=ROLLBACK_MESSAGE <<<<

没有执行检验本地事务的方法,和之前说的一样。到这里我觉得应该基本上可以明白生产者端消息监听器中两个方法的具体作用了,主要还是理解RocketMQ事务消息的基本原理。

校验本地事务方法的返回值和执行本地事务方法的返回值的作用是一样的,这里就不再测试了。

网上找了一个图,感觉非常的直观:

来源:简书 -非典型_程序员

链接:https://www.jianshu.com/p/4d6329281a1e