RocketMQ事务消息

RocketMQ当本地事务失败后返回RocketMQLocalTransactionState.ROLLBACK.
但是MQ还会再检查我的本地事务有没有成功
我在网上看的如果RollBack了消息就会删除.
所以很疑惑,请大佬指教

package com.lm.bank1.message;

import com.alibaba.fastjson.JSONObject;
import com.lm.bank1.entity.TxInfoLog;
import com.lm.bank1.model.AccountChangeEvent;
import com.lm.bank1.service.AccountInfoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author lm
 */
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_1")
public class ProducerListener implements RocketMQLocalTransactionListener {

    @Autowired
    private AccountInfoService accountInfoService;

    /**
     * 事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
     *
     * @param msg
     * @param arg
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String msgStr = new String((byte[]) msg.getPayload());
            JSONObject jsonObject = JSONObject.parseObject(msgStr);
            String str = jsonObject.getString("accountChangeEvent");
            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(str, AccountChangeEvent.class);
                 //1.假设业务代码执行失败,事务回滚,抛出异常--------------------------------
            accountInfoService.doUpdate(accountChangeEvent);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
              //2.业务代码执行失败后给mq返回rollback---------------------------
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * mq检查本地事务是否成功
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        System.out.println("检查本地事务");
        String msgStr = new String((byte[]) msg.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(msgStr);
        String str = jsonObject.getString("accountChangeEvent");
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(str, AccountChangeEvent.class);
        String txNo = accountChangeEvent.getTxNo();
        TxInfoLog txInfoLog = accountInfoService.checkTx(txNo);
        if (null != txInfoLog && txInfoLog.getIsSuccess()) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

这样也看不出来啥问题。你开DEBUG日志,打个断点啥的看看。确定你是给MQ返回了 RocketMQLocalTransactionState.COMMIT; 也要确定MQ收到了这条消息。

我是模拟的本地事务失败了,给MQ发送的rollback.
我打过断点,如果发送的是commit,mq是可以收到消息,消费端进行消费.mq也不会触发本地事务回查

听起来像是一个BUG,你可以把代码整理一下,去官方的Github提问看看。我对RocketMQ也只是一知半解。

好的,谢谢你