如何利用补偿机制减少事务使用频次
来源:juejin.cn/post/7304973928039710761
推荐:https ://t.zsxq.com/17W63bYix
前言
最近接手了公司的ERP系统,大体框架用的是目前比较流行的spring cloud alibaba那一套,设计上甚至还用上了DDD的思想。初看项目可能不错,对于两三年前的设计思路而言确实算得上跟上了【潮流】。但【外观】再美,实际接手后却有点无法直视,结合日常系统表现(动不动的报空指针错误、几乎一个月一次的OOM以及隔三岔五的修补数据),有这样糟糕的业务实现不出问题反而是不可思议的。说实话,当有幸见识到这堆代码的【风采】后,已经萌生了提桶跑路的想法,奈何如今已是39岁高龄,又早已不是孑然一身,迫于生活不得不平复心情继续维持着这只有两年看起来却十分有年代感的代码。
主题
第一个显而易见的问题就是对事务的“滥用”,在我看来确实已经算是滥用的程度了。“前人们”可能对事务的看法过于简单,几乎所有能修改的地方都会加上事务。即使里面存在大批量的查询语句,即使使用的注解式事务@Transactional(rollbackFor = Exception.class)并不能生效,但可能怀着【加上总归是好的】的想法,导致现在通篇的代码都能看到这个注解。于是一来,就和现在其他几个小伙伴分享过,事务真的不能用得如此随心所欲。
开始正文
事务是必须的吗
我觉得首先要思考的是,你的业务真的需要事务吗?
其实,大部分的业务对事务的要求并没有你想象中的那么高,更不用说更复杂的分布式事务。就我现在的系统而言,很多业务都有主单和细单的概念,而状态值往往跟随在主单上。我的意思是,编写代码的时候先保存细单再保存主单,主单保存失败了又能怎么样呢,只是多了一份脏数据而已(先不讨论这种数据怎么处理),对于业务的体验上好像并没有什么影响。又比如审批,我们的审批需要调钉钉的审批功能,然而调用时为了保持事务一致,将调钉钉审批功能写在了事务里,成功后再更新审批状态为审批中,但其实完全没必要等钉钉的审批接口返回,毕竟第三方的都是【不可控的】
如何填坑
在看了无数这种操作后,我决定在我们后端开发小组里引入补偿的设计(其实大部分成熟的系统都有很好的补偿机制)。
解决思路
-
很多业务分析下来其实并不需要事务,也不一定需要严格的实时性,保证【准实时性】即可
-
需要共有的日志去记录甚至能重试失败操作即可
上机操作
第一步,利用 MethodInterceptor 抽离需要处理的方法。
public <T> T proxy(T t) {
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(t.getClass());
enhancer.setCallback(this);
return (T) enhancer.create();
}
@Override
public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
if (BooleanUtils.isFalse(isAsync)) {
return methodProxy.invokeSuper(o, objects);
}
Map<String, AsyncDTO> proxyMethodList = AsyncProxyUtil.asyncMethod.get(method.getDeclaringClass().getName());
if (Objects.isNull(proxyMethodList) || CollectionUtils.isEmpty(proxyMethodList)) {
return methodProxy.invokeSuper(o, objects);
}
if (Objects.isNull(proxyMethodList.get(method.getName()))) {
return methodProxy.invokeSuper(o, objects);
}
Parameter[] parameters = method.getParameters();
if (Objects.isNull(parameters) || parameters.length == 0) {
throw new BusinessException(ErrorCode.FAIL, "["+ method.getDeclaringClass().getName() +"]方法["+ method.getName() +"] 缺少bizCode参数");
}
String bizCode = null;
List<AsyncParamDTO> paramDTOList = new ArrayList<>();
for (int i = 0; i < parameters.length; i++) {
AsyncParamDTO asyncParamDTO = new AsyncParamDTO();
if (Objects.isNull(objects[i])) {
asyncParamDTO.setValue(null);
} else {
asyncParamDTO.setValue(objects[i]);
}
asyncParamDTO.setType(parameters[i].getType().getName());
if (parameters[i].isAnnotationPresent(AsyncBizCode.class)) {
bizCode = String.valueOf(objects[i]);
}
paramDTOList.add(asyncParamDTO);
}
if (StringUtils.isEmpty(bizCode)) {
throw new BusinessException(ErrorCode.FAIL, "["+ method.getDeclaringClass().getName() +"]方法["+ method.getName() +"] 缺少bizCode参数");
}
AsyncDTO asyncDTO = proxyMethodList.get(method.getName());
AsyncMsgDTO asyncMsgDTO = new AsyncMsgDTO();
asyncMsgDTO.setId(IdWorker.getId());
asyncMsgDTO.setValue(asyncDTO.getValue());
asyncMsgDTO.setBeanName(asyncDTO.getBeanName());
asyncMsgDTO.setBizCode(bizCode);
asyncMsgDTO.setParamList(paramDTOList);
asyncMsgDTO.setMethodName(method.getName());
Boolean send = aliRocketMqProducer.send(JSON.toJSONString(asyncMsgDTO), this.getTagName(), UUID.randomUUID().toString());
if (BooleanUtils.isFalse(send)) {
throw new BusinessException(ErrorCode.FAIL, "异步消息发送失败");
}
return null;
}
利用aop的特性,获取方法名称、参数、类和业务信息等,并保存到库里,同时发送消息实现异步处理的效果。这里的mq用的是阿里云的rocket mq。库用的是mysql库,当然也可用其他的库去存储,如mongodb等。
第二步,解析消息并执行方法。
public void doAsyncMsg(AsyncMsgDTO dto, boolean isDebug) {
AsyncLog asyncLog = asyncLogService.lambdaQuery().eq(AsyncLog::getId, dto.getId()).one();
if (Objects.isNull(asyncLog)) {
asyncLog = new AsyncLog();
asyncLog.setId(dto.getId());
asyncLog.setNum(1);
asyncLog.setGmtCreate(new Date());
} else {
if (asyncLog.getNum() > 6 && !isDebug) {
return;
}
asyncLog.setNum(asyncLog.getNum() + 1);
}
asyncLog.setParams(JSONObject.toJSONString(dto.getParamList()));
asyncLog.setBizCode(dto.getBizCode());
asyncLog.setValue(dto.getValue());
asyncLog.setBeanName(dto.getBeanName());
asyncLog.setMethodName(dto.getMethodName());
asyncLog.setRequestDate(new Date());
asyncLogService.saveOrUpdate(asyncLog);
try {
if (StringUtils.isEmpty(dto.getBeanName())) {
throw new BusinessException(ErrorCode.FAIL, "未设置bean名称");
}
if (StringUtils.isEmpty(dto.getMethodName())) {
throw new BusinessException(ErrorCode.FAIL, "未设置需要执行的方法");
}
Object object = applicationContext.getBean(dto.getBeanName());
if (Objects.isNull(object)) {
throw new BusinessException(ErrorCode.FAIL, "未获取到对象");
}
Class[] classArr = new Class[dto.getParamList().size()];
if (!CollectionUtils.isEmpty(dto.getParamList())) {
for (int i = 0; i < dto.getParamList().size(); i++) {
Class paramClass = Class.forName(dto.getParamList().get(i).getType());
classArr[i] = paramClass;
}
}
Object[] param = new Object[dto.getParamList().size()];
Method method = ReflectionUtils.findMethod(object.getClass(), dto.getMethodName(), classArr);
if (Objects.isNull(method)) {
throw new BusinessException(ErrorCode.FAIL, "未获取到需要执行的方法");
}
if (!CollectionUtils.isEmpty(dto.getParamList())) {
Parameter[] parameters = method.getParameters();
for (int i = 0; i < parameters.length; i++) {
Object value = dto.getParamList().get(i).getValue();
Class<?> paramClass = parameters[i].getType();
if (Objects.isNull(value)) {
param[i] = null;
} else if (paramClass == Long.class) {
param[i] = Long.parseLong(String.valueOf(value));
} else if (paramClass == BigDecimal.class) {
param[i] = new BigDecimal(String.valueOf(value));
} else if (paramClass == Integer.class) {
param[i] = value;
} else if (paramClass.isEnum()) {
Object[] enumConstants = parameters[i].getType().getEnumConstants();
Optional<Object> select = Arrays.stream(enumConstants)
.filter(item -> item.toString().equalsIgnoreCase(String.valueOf(value)))
.findFirst();
if (select.isPresent()) {
param[i] = select.get();
}
} else if (paramClass == String.class) {
param[i] = value;
} else if (paramClass == List.class) {
Type genericType = parameters[i].getParameterizedType();
ParameterizedType pt = (ParameterizedType) genericType;
Class<?> genericClazz = (Class<?>)pt.getActualTypeArguments()[0];
param[i] = JSON.parseArray(((JSONArray)value).toJSONString(), genericClazz);
} else if (paramClass == Set.class) {
Type genericType = parameters[i].getParameterizedType();
param[i] = JSON.parseObject(((JSONArray)value).toJSONString(), genericType);
} else if (paramClass == Map.class) {
Type genericType = parameters[i].getParameterizedType();
param[i] = JSON.parseObject(((JSONObject)value).toJSONString(), genericType);
} else if (paramClass == Date.class) {
param[i] = new Date( Long.parseLong(String.valueOf(value)));
} else {
param[i] = JSONObject.toJavaObject((JSONObject) value, paramClass);
}
}
}
method.invoke(object, param);
} catch (InvocationTargetException ex) {
log.error("执行异步消息方法失败===={}===={}", dto.getBizCode(), ex.getTargetException());
asyncLog.setResult(ex.getTargetException().getMessage());
asyncLogService.saveOrUpdate(asyncLog);
throw new BusinessException(ErrorCode.FAIL, ex.getTargetException().getMessage());
} catch (Exception e) {
log.error("执行异步消息方法失败===={}===={}", dto.getBizCode(), e);
asyncLog.setResult(e.getMessage());
asyncLogService.saveOrUpdate(asyncLog);
throw new BusinessException(ErrorCode.FAIL, "执行异步消息方法失败");
}
asyncLog.setResult("执行成功");
asyncLog.setReturnDate(new Date());
asyncLogService.saveOrUpdate(asyncLog);
}
mq的重试,6次过后不再重试,需要手动处理
第三步,新增自定义注解和配置。
业务类型code,用于跟踪处理日志
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
public @interface AsyncBizCode {
}
标记为需要处理的方法
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface AsyncEnable {
String value() default "";
}
解析方法参数的实体
@Data
public class AsyncMsgDTO {
private Long id;
private String beanName;
private String value;
private String bizCode;
private List<AsyncParamDTO> paramList;
private String methodName;
}
配置信息,用于启动时保存处理方法的信息集合
@Configuration
@Slf4j
public class AsyncConfig implements CommandLineRunner {
@Resource
private ApplicationContext applicationContext;
@Override
public void run(String... args) throws Exception {
Map<String, Object> asyncClass = this.applicationContext.getBeansWithAnnotation(AsyncEnable.class);
for (Map.Entry<String, Object> entry : asyncClass.entrySet()) {
Class clazz = entry.getValue().getClass();
Method[] methods = entry.getValue().getClass().getDeclaredMethods();
List<String> methodList = new ArrayList<>();
Map<String, AsyncDTO> methodDtoMap = new HashMap<>();
for (Method method: methods) {
if (!method.isAnnotationPresent(AsyncEnable.class)) {
continue;
}
AsyncEnable asyncEnable = method.getAnnotation(AsyncEnable.class);
methodList.add(method.getName());
AsyncDTO asyncDTO = new AsyncDTO();
asyncDTO.setBeanName(entry.getKey());
asyncDTO.setValue(asyncEnable.value());
methodDtoMap.put(method.getName(), asyncDTO);
}
AsyncProxyUtil.asyncMethod.put(clazz.getName(), methodDtoMap);
}
}
}
日志表结构
CREATE TABLE `async_log` (
`id` bigint(20) NOT NULL COMMENT 'id',
`bean_name` varchar(255) NOT NULL COMMENT 'bean名称',
`value` varchar(255) DEFAULT NULL COMMENT '异步说明',
`biz_code` varchar(30) NOT NULL COMMENT '业务编号',
`params` text COMMENT '参数',
`method_name` varchar(255) NOT NULL COMMENT '执行方法名',
`num` int(3) NOT NULL DEFAULT '1' COMMENT '执行次数',
`result` text COMMENT '执行结果',
`gmt_create` datetime NOT NULL COMMENT '创建时间',
`request_date` datetime(3) NOT NULL COMMENT '请求时间',
`return_date` datetime(3) DEFAULT NULL COMMENT '返回时间',
`is_handle` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否处理',
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_biz_code` (`biz_code`) USING BTREE,
KEY `idx_value` (`value`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='异步执行日志表';
使用
调用方
asyncProxyUtil.proxy(this).save(bizCode, currentDate, userId, userName);
方法
@AsyncEnable(value = "保存")
public void save(@AsyncBizCode Long tempId, Date currentDate, Long userId, String userName) {
//业务处理
}
总结
-
这里也可以选其他的实现方式,也有更简单的写法,只是提供的一种业务处理上的思路,我这里用MethodInterceptor去实现主要想兼容两种带调用方式,即能正常的走原有的调方法的逻辑,也能走异步处理,同时两种业务处理逻辑保持一致
-
日志的存储方式应系统而定,这里只是因为我们这个项目用mysql比较合适,但其实我更推荐用mongodb
-
事务不是那么随便加的,业务执行失败没有保持一致性并不可怕,只要前期的业务校验阶段保证数据的正确性,后期执行失败的完善补偿逻辑,这些都不成问题。可怕的是不去思考,因为思维的懒惰而慢慢走向平庸,一步步变成被指导的对象。
-
最后说一句,虽然目前我们面临的环境不是很乐观,我也在失业的边缘徘徊,但平时有时间还是应该多思考,努力做好一名工程师而不是【码农】