实现一个小轮子:用AOP实现异步上传
共 13381字,需浏览 27分钟
·
2022-07-26 07:05
点击关注公众号,Java干货及时送达👇
文章来源:https://c1n.cn/2jnRk
背景
代码与实现
结语
背景
相信很多系统里都有这一种场景:用户上传 Excel,后端解析 Excel 生成相应的数据,校验数据并落库。
这就引发了一个问题:如果 Excel 的行非常多,或者解析非常复杂,那么解析+校验的过程就非常耗时。
如果接口是一个同步的接口,则非常容易出现接口超时,进而返回的校验错误信息也无法展示给前端,这就需要从功能上解决这个问题。
进一步的,如果我们每一个上传的任务都写一次线程池异步+日志记录的代码就显得非常冗余。同时,非业务代码也侵入了业务代码导致代码可读性下降。
从通用性的角度上讲,这种业务场景非常适合模板方法的设计模式。即设计一个抽象类,定义上传的抽象方法,同时实现记录日志的方法。
//伪代码,省略了一些步骤
@Slf4j
public abstract class AbstractUploadService<T> {
public static ThreadFactory commonThreadFactory = new ThreadFactoryBuilder().setNameFormat("-upload-pool-%d")
.setPriority(Thread.NORM_PRIORITY).build();
public static ExecutorService uploadExecuteService = new ThreadPoolExecutor(10, 20, 300L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), commonThreadFactory, new ThreadPoolExecutor.AbortPolicy());
protected abstract String upload(List<T> data);
protected void execute(String userName, List<T> data) {
// 生成一个唯一编号
String uuid = UUID.randomUUID().toString().replace("-", "");
uploadExecuteService.submit(() -> {
// 记录日志
writeLogToDb(uuid, userName, updateTime, "导入中");
// 一个字符串,用于记录upload的校验信息
String errorLog = "";
//执行上传
try {
errorLog = upload(data);
writeSuccess(uuid, "导入中", updateTime);
} catch (Exception e) {
LOGGER.error("导入错误", e);
//计入导入错误日志
writeFailToDb(uuid, "导入失败", e.getMessage(), updateTime);
}
/**
* 检查一下upload是不是返回了错误日志,如果有,需要注意记录
*
* 因为错误日志可能比较长,
* 可以写入一个文件然后上传到公司的文件服务器,
* 然后在查看结果的时候允许用户下载该文件,
* 这里不展开只做示意
*/
if (StringUtils.isNotEmpty(errorLog)) {
writeFailToDb(uuid, "导入失败", errorLog, updateTime);
}
});
}
}
如上文所示,模板方法的方式虽然能够极大地减少重复代码,但是仍有下面两个问题:
upload 方法得限定死参数结构,一旦有变化,不是很容易更改参数类型 or 数量
每个上传的 service 还是要继承一下这个抽象类,还是不够简便和优雅
为解决上面两个问题,我也经常进行思考,结果在某次自定义事务提交 or 回滚的方法的时候得到了启发。
这个上传的逻辑过程和事务提交的逻辑过程非常像,都是在实际操作前需要做初始化操作,然后在异常或者成功的时候做进一步操作。
这种完全可以通过环装切面的方式实现,由此,我写了一个小轮子给团队使用。(当然了,这个小轮子在本人所在的大团队内部使用的很好,但是不一定适合其他人,但是思路一样,大家可以扩展自己的功能)
多说无益,上代码!
代码与实现
public class FileUploadLog {
private Integer id;
// 唯一编码
private String batchNo;
// 上传到文件服务器的文件key
private String key;
// 错误日志文件名
private String fileName;
//上传状态
private Integer status;
//上传人
private String createName;
//上传类型
private String uploadType;
//结束时间
private Date endTime;
// 开始时间
private Date startTime;
}
public enum UploadType {
未知(1,"未知"),
类型2(2,"类型2"),
类型1(3,"类型1");
private int code;
private String desc;
private static Map<Integer, UploadType> map = new HashMap<>();
static {
for (UploadType value : UploadType.values()) {
map.put(value.code, value);
}
}
UploadType(int code, String desc) {
this.code = code;
this.desc = desc;
}
public int getCode() {
return code;
}
public String getDesc() {
return desc;
}
public static UploadType getByCode(Integer code) {
return map.get(code);
}
}
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Upload {
// 记录上传类型
UploadType type() default UploadType.未知;
}
@Component
@Aspect
@Slf4j
public class UploadAspect {
public static ThreadFactory commonThreadFactory = new ThreadFactoryBuilder().setNameFormat("upload-pool-%d")
.setPriority(Thread.NORM_PRIORITY).build();
public static ExecutorService uploadExecuteService = new ThreadPoolExecutor(10, 20, 300L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), commonThreadFactory, new ThreadPoolExecutor.AbortPolicy());
@Pointcut("@annotation(com.aaa.bbb.Upload)")
public void uploadPoint() {}
@Around(value = "uploadPoint()")
public Object uploadControl(ProceedingJoinPoint pjp) {
// 获取方法上的注解,进而获取uploadType
MethodSignature signature = (MethodSignature)pjp.getSignature();
Upload annotation = signature.getMethod().getAnnotation(Upload.class);
UploadType type = annotation == null ? UploadType.未知 : annotation.type();
// 获取batchNo
String batchNo = UUID.randomUUID().toString().replace("-", "");
// 初始化一条上传的日志,记录开始时间
writeLogToDB(batchNo, type, new Date)
// 线程池启动异步线程,开始执行上传的逻辑,pjp.proceed()就是你实现的上传功能
uploadExecuteService.submit(() -> {
try {
String errorMessage = pjp.proceed();
// 没有异常直接成功
if (StringUtils.isEmpty(errorMessage)) {
// 成功,写入数据库,具体不展开了
writeSuccessToDB(batchNo);
} else {
// 失败,因为返回了校验信息
fail(errorMessage, batchNo);
}
} catch (Throwable e) {
LOGGER.error("导入失败:", e);
// 失败,抛了异常,需要记录
fail(e.toString(), batchNo);
}
});
return new Object();
}
private void fail(String message, String batchNo) {
// 生成上传错误日志文件的文件key
String s3Key = UUID.randomUUID().toString().replace("-", "");
// 生成文件名称
String fileName = "错误日志_" +
DateUtil.dateToString(new Date(), "yyyy年MM月dd日HH时mm分ss秒") + ExportConstant.txtSuffix;
String filePath = "/home/xxx/xxx/" + fileName;
// 生成一个文件,写入错误数据
File file = new File(filePath);
OutputStream outputStream = null;
try {
outputStream = new FileOutputStream(file);
outputStream.write(message.getBytes());
} catch (Exception e) {
LOGGER.error("写入文件错误", e);
} finally {
try {
if (outputStream != null)
outputStream.close();
} catch (Exception e) {
LOGGER.error("关闭错误", e);
}
}
// 上传错误日志文件到文件服务器,我们用的是s3
upFileToS3(file, s3Key);
// 记录上传失败,同时记录错误日志文件地址到数据库,方便用户查看错误信息
writeFailToDB(batchNo, s3Key, fileName);
// 删除文件,防止硬盘爆炸
deleteFile(file)
}
}
至此整个异步上传功能就完成了,是不是很简单?(笑)
@Upload(type = UploadType.类型1)
public String upload(List<ClassOne> items) {
if (items == null || items.size() == 0) {
return;
}
//校验
String error = uploadCheck(items);
if (StringUtils.isNotEmpty) {
return error;
}
//删除旧的
deleteAll();
//插入新的
batchInsert(items);
}
结语
1. 面试难题:分布式 Session 实现难点,这篇就够!
最近面试BAT,整理一份面试资料《Java面试BATJ通关手册》,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。
获取方式:点“在看”,关注公众号并回复 Java 领取,更多内容陆续奉上。
PS:因公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。
点“在看”支持小哈呀,谢谢啦😀