RocketMQ当本地事务失败后返回RocketMQLocalTransactionState.ROLLBACK.
但是MQ还会再检查我的本地事务有没有成功
我在网上看的如果RollBack了消息就会删除.
所以很疑惑,请大佬指教
package com.lm.bank1.message;
import com.alibaba.fastjson.JSONObject;
import com.lm.bank1.entity.TxInfoLog;
import com.lm.bank1.model.AccountChangeEvent;
import com.lm.bank1.service.AccountInfoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
* @author lm
*/
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_1")
public class ProducerListener implements RocketMQLocalTransactionListener {
@Autowired
private AccountInfoService accountInfoService;
/**
* 事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
*
* @param msg
* @param arg
* @return
*/
@Transactional(rollbackFor = Exception.class)
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String msgStr = new String((byte[]) msg.getPayload());
JSONObject jsonObject = JSONObject.parseObject(msgStr);
String str = jsonObject.getString("accountChangeEvent");
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(str, AccountChangeEvent.class);
//1.假设业务代码执行失败,事务回滚,抛出异常--------------------------------
accountInfoService.doUpdate(accountChangeEvent);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
//2.业务代码执行失败后给mq返回rollback---------------------------
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* mq检查本地事务是否成功
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("检查本地事务");
String msgStr = new String((byte[]) msg.getPayload());
JSONObject jsonObject = JSONObject.parseObject(msgStr);
String str = jsonObject.getString("accountChangeEvent");
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(str, AccountChangeEvent.class);
String txNo = accountChangeEvent.getTxNo();
TxInfoLog txInfoLog = accountInfoService.checkTx(txNo);
if (null != txInfoLog && txInfoLog.getIsSuccess()) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}