A Reference Implementation of File-Based Multi-Table Join


2021-10-10 12:00



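Use case: there are N files. Each file contains a single column of primary keys, and each file stands for one attribute. If primary key PRI1 appears in file A, then PRI1 has attribute A. This setup is typically used for data filtering, for example: which primary keys have both attribute A and attribute B?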

How should this be handled?

1. Approach

First, set aside the file-only constraint in the title. How would we solve the problem then?

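For example, we could load every file into Redis using hashes. The Redis key is the primary key, the hash field is attribute X, and the hash value is 1 (or the field is absent). To check a key, we look it up and test whether it has the corresponding attribute.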

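This looks reasonable, but it has two drawbacks:

1. Redis is an in-memory database with limited capacity, so it may not cope with large data volumes.
2. It cannot serve reverse queries. Finding the list of primary keys that have both attribute A and attribute B is hard.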
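The two drawbacks are easier to see in a minimal in-memory stand-in for the Redis layout. This is a sketch that assumes a HashMap<String, Set<String>> replaces Redis. In Redis, each primary key would be one hash, written with HSET and checked with HEXISTS. The class and method names are hypothetical. Checking one key is a single lookup, but the reverse query has to scan every key, because nothing is indexed by attribute.

```java
import java.util.*;

// Hypothetical in-memory stand-in for the Redis layout:
// key = primary key, members = attributes the key has (the "hash fields" whose value is 1).
class AttributeIndex {
    private final Map<String, Set<String>> attrsByKey = new HashMap<>();

    // Load one attribute file: every primary key in it gains that attribute.
    void load(String attribute, List<String> primaryKeys) {
        for (String pk : primaryKeys) {
            attrsByKey.computeIfAbsent(pk, k -> new HashSet<>()).add(attribute);
        }
    }

    // Forward query: a single lookup per key.
    boolean has(String pk, String attribute) {
        return attrsByKey.getOrDefault(pk, Collections.emptySet()).contains(attribute);
    }

    // Reverse query: there is no index by attribute, so every key must be scanned.
    List<String> keysWithAll(String... attributes) {
        List<String> wanted = Arrays.asList(attributes);
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : attrsByKey.entrySet()) {
            if (e.getValue().containsAll(wanted)) {
                result.add(e.getKey());
            }
        }
        Collections.sort(result);
        return result;
    }
}
```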

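Alternatively, we could use a relational database such as MySQL. First import each file into its own table, named after the attribute, then compute the result on the fly with SQL. Note that MySQL itself does not support FULL JOIN. The query below needs a database that does, such as PostgreSQL, or on MySQL an emulation via LEFT JOIN ... UNION ... RIGHT JOIN. A reference query: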

```sql
select COALESCE(ta.id, tb.id) as id,
       case when ta.id is not null then 1 else 0 end as ta_flag,
       case when tb.id is not null then 1 else 0 end as tb_flag
  from table_a as ta
  full join table_b as tb on ta.id = tb.id;
```

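This is a fairly good solution. When the data is not large, this level of complexity is trivial for a database. To restate how it works: the database gets one small table per attribute, each holding a single column of primary keys, and the computation happens at query time. The problem is that as the number of attributes grows, so does the number of small tables, possibly beyond what the database allows. What started as an ordinary requirement ends up demanding a lot from the database, which isn't great.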

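That problem can be solved too. For example, we can pivot the small tables (rows to columns) into one wide table and then drop the small tables, which brings the database back to ordinary requirements. The merge statement is simple: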

```sql
create table w_xx as
select COALESCE(ta.id, tb.id) as id,
       case when ta.id is not null then 1 else 0 end as ta_flag,
       case when tb.id is not null then 1 else 0 end as tb_flag
  from table_a as ta
  full join table_b as tb on ta.id = tb.id;
```

With that, the approach is close to perfect.

2. File-based join with row-to-column pivoting

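What if there is no external storage at all? As the title says, we work directly on the files and merge several of them into one. That doesn't look hard.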

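If memory is not a concern, we can read each file into a list and convert it into a map, much like the Redis approach above. That is probably unrealistic for large data, and it is simple anyway, so I won't dwell on it.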
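The in-memory variant is short enough to sketch, assuming every file fits in memory as a list of ids. A TreeMap keyed by primary key keeps one presence flag per file. Its output is therefore already sorted, and it uses the same "id,1," row layout as the file-based merge later in this article. The class name InMemoryPivot is hypothetical.

```java
import java.util.*;

// Hypothetical in-memory pivot: each inner list is the content of one attribute file.
class InMemoryPivot {
    static List<String> pivot(List<List<String>> files) {
        int n = files.size();
        // TreeMap keeps primary keys sorted (String order); one flag per input file.
        Map<String, boolean[]> flags = new TreeMap<>();
        for (int f = 0; f < n; f++) {
            for (String id : files.get(f)) {
                flags.computeIfAbsent(id, k -> new boolean[n])[f] = true;
            }
        }
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, boolean[]> e : flags.entrySet()) {
            StringBuilder sb = new StringBuilder(e.getKey());
            for (boolean present : e.getValue()) {
                // "1" when the key appears in that file, empty otherwise
                sb.append(',').append(present ? "1" : "");
            }
            rows.add(sb.toString());
        }
        return rows;
    }
}
```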

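To simplify further: if the primary keys in every file are sorted, merging becomes even easier.
The basic idea is to merge the files pairwise. Read both files line by line, compare the current ids, and write the combined rows to a new file.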
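The pairwise merge can be illustrated on two sorted in-memory lists with a two-pointer scan. This is the merge step of merge sort turned into a full outer join. It is a simplified sketch, not the FileJoiner code below. It assumes both inputs are sorted in ascending String order (the order compareTo uses). It handles only single-column inputs and drops duplicate ids. SortedPairMerge is a hypothetical name.

```java
import java.util.*;

// Hypothetical two-pointer full outer join of two sorted id lists.
// Output rows look like "id,flagA,flagB" where a flag is "1" or empty.
class SortedPairMerge {
    static List<String> merge(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        String last = null; // last id written, used to drop duplicates
        while (i < a.size() || j < b.size()) {
            String ida = i < a.size() ? a.get(i) : null;
            String idb = j < b.size() ? b.get(j) : null;
            // An exhausted side compares as "greater", so the other side is consumed
            int cmp = ida == null ? 1 : idb == null ? -1 : ida.compareTo(idb);
            String id = cmp <= 0 ? ida : idb;
            boolean inA = cmp <= 0, inB = cmp >= 0;
            if (inA) i++;
            if (inB) j++;
            if (id.equals(last)) continue; // duplicate id already written
            out.add(id + "," + (inA ? "1" : "") + "," + (inB ? "1" : ""));
            last = id;
        }
        return out;
    }
}
```

The sample files crowd_a and crowd_b are listed at the end of the article. With them, this yields six rows, starting with 6001,1,1 and ending with 6011,,1. That is what the unit test asserts for joinById. Ids are compared as strings, so files whose ids differ in length (e.g. 999 and 1000) must be sorted lexicographically, not numerically.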

For parallel computation, consider the fork/join framework covered in my previous article. It fits this scenario nicely.
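Here is a minimal fork/join shape for the multi-file case, assuming the pairwise merge serves as the combine step. A RecursiveTask splits the file list in half, forks one half, computes the other half in the current thread, and combines the two results. To stay self-contained, each "file" is an in-memory sorted set and the combine step is a plain set union. That keeps the ids but not the per-file flags. ParallelUnion is a hypothetical name.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical divide-and-conquer skeleton: the set union stands in for the pairwise file merge.
class ParallelUnion extends RecursiveTask<SortedSet<String>> {
    private final List<SortedSet<String>> files;

    ParallelUnion(List<SortedSet<String>> files) {
        this.files = files;
    }

    @Override
    protected SortedSet<String> compute() {
        if (files.isEmpty()) return new TreeSet<>();
        if (files.size() == 1) return files.get(0);
        int mid = files.size() / 2;
        ParallelUnion left = new ParallelUnion(files.subList(0, mid));
        left.fork(); // run the left half asynchronously
        SortedSet<String> right = new ParallelUnion(files.subList(mid, files.size())).compute();
        // Combine into a new set so the inputs are never modified
        SortedSet<String> merged = new TreeSet<>(left.join());
        merged.addAll(right);
        return merged;
    }
}
```

Usage: ForkJoinPool.commonPool().invoke(new ParallelUnion(files)).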


2.1. Main framework for pivot-style file merging

The core algorithm walks through the files in turn, decides the flags for each row, and writes the target file. The implementation:

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * File join utility.
 */
@Slf4j
public class FileJoiner {

    /** Separator used in the result CSV files */
    private static final String CSV_RESULT_FILE_SEPARATOR = ",";

    /**
     * Merges two files sorted by id (ascending String order). Equivalent SQL:
     * select coalesce(a.id, b.id, c.id...) id,
     *        case when a.id is not null then '1' else '' end f_a,
     *        case when b.id is not null then '1' else '' end f_b,
     *        ...
     *   from a
     *   full join b on a.id = b.id
     *   full join c on coalesce(a.id, b.id) = c.id
     *   ...
     * ;
     */
    public JoinFileDescriptor joinById(JoinFileDescriptor a, JoinFileDescriptor b) throws IOException {
        JoinFileDescriptor mergedDesc = new JoinFileDescriptor();
        // Both sides empty: only merge the field metadata (a first, then b)
        if (a.getLineCnt() <= 0 && b.getLineCnt() <= 0) {
            List<FileFieldDesc> fieldDesc = new ArrayList<>(a.getFieldInfo());
            fieldDesc.addAll(b.getFieldInfo());
            mergedDesc.setFieldInfo(fieldDesc);
            return mergedDesc;
        }
        // If only one side is empty, still run the merge: the other side's rows are copied
        // and the empty side's flag columns are left blank
        String basePath = a.getPath() != null ? a.getPath() : b.getPath();
        String mergedPath = basePath + ".m" + a.getDeep();
        long cnt;
        try (BufferedReader aReader = openReader(a);
             BufferedReader bReader = openReader(b);
             OutputStream outputStream = new BufferedOutputStream(
                     FileUtils.openOutputStream(new File(mergedPath)))) {
            a.setReader(aReader);
            b.setReader(bReader);
            cnt = unionTwoBufferStream(a, b, outputStream);
        }
        mergedDesc.setPath(mergedPath);
        mergedDesc.setLineCnt(cnt);
        mergedDesc.incrDeep();
        // Fields: a first, then b; all of them are now physically written
        a.getFieldInfo().forEach(FileFieldDesc::writeOk);
        b.getFieldInfo().forEach(FileFieldDesc::writeOk);
        List<FileFieldDesc> fieldDesc = new ArrayList<>(a.getFieldInfo());
        fieldDesc.addAll(b.getFieldInfo());
        mergedDesc.setFieldInfo(fieldDesc);
        return mergedDesc;
    }

    /** Opens the descriptor's file; an empty descriptor (no path or no lines) reads as empty input. */
    private static BufferedReader openReader(JoinFileDescriptor d) throws IOException {
        if (d.getPath() == null || d.getLineCnt() <= 0) {
            return new BufferedReader(new StringReader(""));
        }
        return Files.newBufferedReader(Paths.get(d.getPath()), StandardCharsets.UTF_8);
    }

    /**
     * Merges several files in parallel. Input order does not matter; field positions stay traceable.
     *
     * @param fileList         files to merge
     * @param orderedFieldList desired output column order
     * @return descriptor of the merged file and its field list
     * @throws Exception if merging fails
     */
    public JoinFileDescriptor joinMultiFile(List<JoinFileDescriptor> fileList,
                                            List<String> orderedFieldList) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            ForkJoinTask<JoinFileDescriptor> future = forkJoinPool.submit(new FileJoinFJTask(fileList));
            JoinFileDescriptor mergedFile = future.get();
            return rewriteFileBySelectField(mergedFile, orderedFieldList);
        } finally {
            forkJoinPool.shutdown();
        }
    }

    /**
     * Rewrites the file so that its columns follow the requested field order.
     *
     * @param originFile    descriptor of the current file
     * @param orderedFields target field order
     * @return descriptor of the rewritten file (or the original one if already in order)
     * @throws IOException if writing fails
     */
    public JoinFileDescriptor rewriteFileBySelectField(JoinFileDescriptor originFile,
                                                       List<String> orderedFields) throws IOException {
        List<FileFieldDesc> fieldDescList = originFile.getFieldInfo();
        if (checkIfCurrentFileInOrder(fieldDescList, orderedFields)) {
            log.info("File is already in the requested order, no rewrite needed: {}", orderedFields);
            return originFile;
        }
        Map<String, FieldOrderIndicator> indicatorMap = composeFieldOrderIndicator(fieldDescList, orderedFields);
        AtomicLong lineCounter = new AtomicLong(0);
        String targetFilePath = originFile.getPath() + ".of";
        try (BufferedReader aReader = Files.newBufferedReader(Paths.get(originFile.getPath()), StandardCharsets.UTF_8);
             OutputStream outputStream = new BufferedOutputStream(
                     FileUtils.openOutputStream(new File(targetFilePath)))) {
            String lineData;
            while ((lineData = aReader.readLine()) != null) {
                String[] cols = StringUtils.splitPreserveAllTokens(lineData, CSV_RESULT_FILE_SEPARATOR);
                // Skip empty lines
                if (cols.length == 0) {
                    continue;
                }
                // Output layout: id,1,...
                StringBuilder sb = new StringBuilder(cols[0]);
                for (String f1 : orderedFields) {
                    sb.append(CSV_RESULT_FILE_SEPARATOR);
                    FieldOrderIndicator indicator = indicatorMap.get(f1);
                    // Unknown field, or a written field beyond the end of this row: leave the column empty
                    if (indicator == null
                            || (indicator.fieldIndex >= cols.length && indicator.fieldDesc.getWriteFlag() == 1)) {
                        continue;
                    }
                    sb.append(cols[indicator.fieldIndex]);
                }
                writeLine(outputStream, sb.toString(), lineCounter);
            }
        }
        JoinFileDescriptor mergedDesc = new JoinFileDescriptor();
        mergedDesc.setPath(targetFilePath);
        mergedDesc.setLineCnt(lineCounter.get());
        mergedDesc.setFieldInfo(orderedFields.stream()
                .map(r -> FileFieldDesc.newField(r, 1))
                .collect(Collectors.toList()));
        return mergedDesc;
    }

    /**
     * Builds the column lookup for the target fields.
     *
     * @param currentFieldDescList current field layout
     * @param orderedFields        target field order
     * @return e.g. {"a": {"fieldIndex": 1, "fieldDesc": {"name": "aaa", "writeFlag": 1}}}
     */
    private Map<String, FieldOrderIndicator> composeFieldOrderIndicator(List<FileFieldDesc> currentFieldDescList,
                                                                        List<String> orderedFields) {
        Map<String, FieldOrderIndicator> indicatorMap = new HashMap<>(orderedFields.size());
        outer:
        for (String f1 : orderedFields) {
            for (int i = 0; i < currentFieldDescList.size(); i++) {
                FileFieldDesc originField1 = currentFieldDescList.get(i);
                if (f1.equals(originField1.getFieldName())) {
                    // Column 0 holds the id, so field i lives in column i + 1
                    indicatorMap.put(f1, new FieldOrderIndicator(i + 1, originField1));
                    continue outer;
                }
            }
            indicatorMap.put(f1, null);
        }
        return indicatorMap;
    }

    /**
     * Checks whether the file's columns are already in the requested order.
     *
     * @param currentFieldDescList current field layout of the file
     * @param orderedFields        expected field order
     * @return true: already in order, no rewrite needed; false: needs rewriting
     */
    private boolean checkIfCurrentFileInOrder(List<FileFieldDesc> currentFieldDescList, List<String> orderedFields) {
        // A different number of columns always requires a rewrite
        if (orderedFields.size() != currentFieldDescList.size()) {
            return false;
        }
        for (int j = 0; j < orderedFields.size(); j++) {
            FileFieldDesc possibleFieldDesc = currentFieldDescList.get(j);
            if (possibleFieldDesc == null
                    || !orderedFields.get(j).equals(possibleFieldDesc.getFieldName())
                    || possibleFieldDesc.getWriteFlag() != 1) {
                return false;
            }
        }
        return true;
    }

    /**
     * Computes the union of two sorted streams (A ∪ B) and appends the A/B flag columns:
     * 1 means present, empty means absent.
     *   present in A and B: A,1,1
     *   present in A only:  A,1,
     *   present in B only:  B,,1
     * When A or B already has several columns, the first column is the join key, e.g.
     *   A: 111
     *   B: 111,,1,1
     *   merged: 111,1,,1,1
     *
     * @return number of lines written
     */
    private long unionTwoBufferStream(JoinFileDescriptor a, JoinFileDescriptor b,
                                      OutputStream targetOutputStream) throws IOException {
        String lineDataLeft;
        String lineDataRight;
        AtomicLong lineNumCounter = new AtomicLong(0);
        BufferedReader leftBuffer = a.getReader();
        BufferedReader rightBuffer = b.getReader();
        lineDataRight = rightBuffer.readLine();
        // The id is always the first column
        int idIndex = 1;
        String leftId;
        String rightId = getIdColumnValueFromLineData(lineDataRight, idIndex);
        String lastId = null;
        int cmpV;
        while ((lineDataLeft = leftBuffer.readLine()) != null) {
            // Iterate over the left file; first flush every right row whose id <= leftId
            leftId = getIdColumnValueFromLineData(lineDataLeft, idIndex);
            if (lineDataRight != null && (cmpV = leftId.compareTo(rightId)) >= 0) {
                do {
                    if (rightId.equals(lastId)) {
                        // Duplicate id already written: skip this right row
                        lineDataRight = rightBuffer.readLine();
                        rightId = getIdColumnValueFromLineData(lineDataRight, idIndex);
                        continue;
                    }
                    // Equal ids: merge left and right; otherwise write the right row alone
                    writeLine(targetOutputStream,
                            joinLineData(cmpV == 0 ? lineDataLeft : null, lineDataRight,
                                    a.getFieldInfo(), b.getFieldInfo()),
                            lineNumCounter);
                    lastId = rightId;
                    lineDataRight = rightBuffer.readLine();
                    rightId = getIdColumnValueFromLineData(lineDataRight, idIndex);
                } while (lineDataRight != null && (cmpV = leftId.compareTo(rightId)) >= 0);
            }
            // Equal ids: the merged row has already been written
            if (leftId.equals(lastId)) {
                continue;
            }
            writeLine(targetOutputStream,
                    joinLineData(lineDataLeft, null, a.getFieldInfo(), b.getFieldInfo()),
                    lineNumCounter);
            lastId = leftId;
        }
        // Flush any remaining right rows
        while (lineDataRight != null) {
            rightId = getIdColumnValueFromLineData(lineDataRight, idIndex);
            if (rightId.equals(lastId)) {
                lineDataRight = rightBuffer.readLine();
                continue;
            }
            writeLine(targetOutputStream,
                    joinLineData(null, lineDataRight, a.getFieldInfo(), b.getFieldInfo()),
                    lineNumCounter);
            lastId = rightId;
            lineDataRight = rightBuffer.readLine();
        }
        return lineNumCounter.get();
    }

    /**
     * Joins two rows by field order (left row first).
     * The last unwritten field is the field being appended in this merge.
     *
     * @param leftLineData  left row
     * @param rightLineData right row
     * @param leftFields    left fields (some may not be written into the left row yet)
     * @param rightFields   right fields (some may not be written into the right row yet)
     * @return merged row
     */
    private String joinLineData(String leftLineData, String rightLineData,
                                List<FileFieldDesc> leftFields, List<FileFieldDesc> rightFields) {
        if (StringUtils.isBlank(leftLineData) && StringUtils.isBlank(rightLineData)) {
            return "";
        }
        int leftEmptyFieldIndex = getFieldEmptyPlaceholderIndex(leftFields);
        int rightEmptyFieldIndex = getFieldEmptyPlaceholderIndex(rightFields);
        // 1. Right row only: its id becomes the first column, its flags go to the right-hand end
        if (StringUtils.isBlank(leftLineData)) {
            return joinFieldByRight(rightLineData, leftFields, rightFields, rightEmptyFieldIndex);
        }
        // 2. Left row only
        if (StringUtils.isBlank(rightLineData)) {
            return joinFieldByLeft(leftLineData, leftFields, rightFields, leftEmptyFieldIndex);
        }
        // 3. Both rows present
        return joinFieldByLeftRight(leftLineData, rightLineData, leftFields, rightFields,
                leftEmptyFieldIndex, rightEmptyFieldIndex);
    }

    /** Joins a row that exists only on the right side; every field of the result is filled. */
    private String joinFieldByRight(String rightLineData, List<FileFieldDesc> leftFields,
                                    List<FileFieldDesc> rightFields, int emptyFieldIndex) {
        String[] rightCols = StringUtils.splitPreserveAllTokens(rightLineData, CSV_RESULT_FILE_SEPARATOR);
        checkWrittenColumns(rightCols, emptyFieldIndex);
        // s1. id column
        StringBuilder sb = new StringBuilder(rightCols[0]);
        // s2. empty left columns
        for (int i = 0; i < leftFields.size(); i++) {
            sb.append(CSV_RESULT_FILE_SEPARATOR);
        }
        // s3. right columns that are already written
        for (int i = 1; i < rightCols.length; i++) {
            sb.append(CSV_RESULT_FILE_SEPARATOR).append(rightCols[i]);
        }
        // s4. unwritten right columns
        appendUnwrittenFields(sb, rightCols.length, rightFields.size(), emptyFieldIndex);
        return sb.toString();
    }

    /** Joins a row that exists only on the left side; every field of the result is filled. */
    private String joinFieldByLeft(String leftLineData, List<FileFieldDesc> leftFields,
                                   List<FileFieldDesc> rightFields, int emptyFieldIndex) {
        String[] cols = StringUtils.splitPreserveAllTokens(leftLineData, CSV_RESULT_FILE_SEPARATOR);
        checkWrittenColumns(cols, emptyFieldIndex);
        // s1. keep the written left columns as they are
        StringBuilder sb = new StringBuilder(leftLineData);
        // s2. unwritten left columns (sized by the LEFT field list)
        appendUnwrittenFields(sb, cols.length, leftFields.size(), emptyFieldIndex);
        // s3. empty right columns
        for (int i = 0; i < rightFields.size(); i++) {
            sb.append(CSV_RESULT_FILE_SEPARATOR);
        }
        return sb.toString();
    }

    /** Joins a row present on both sides; every field of the result is filled. */
    private String joinFieldByLeftRight(String leftLineData, String rightLineData,
                                        List<FileFieldDesc> leftFields, List<FileFieldDesc> rightFields,
                                        int leftEmptyFieldIndex, int rightEmptyFieldIndex) {
        String[] leftCols = StringUtils.splitPreserveAllTokens(leftLineData, CSV_RESULT_FILE_SEPARATOR);
        checkWrittenColumns(leftCols, leftEmptyFieldIndex);
        String[] rightCols = StringUtils.splitPreserveAllTokens(rightLineData, CSV_RESULT_FILE_SEPARATOR);
        checkWrittenColumns(rightCols, rightEmptyFieldIndex);
        // s1. keep the written left columns as they are
        StringBuilder sb = new StringBuilder(leftLineData);
        // s2. unwritten left columns
        appendUnwrittenFields(sb, leftCols.length, leftFields.size(), leftEmptyFieldIndex);
        // s3. written right columns, skipping the right id
        for (int i = 1; i < rightCols.length; i++) {
            sb.append(CSV_RESULT_FILE_SEPARATOR).append(rightCols[i]);
        }
        // s4. unwritten right columns
        appendUnwrittenFields(sb, rightCols.length, rightFields.size(), rightEmptyFieldIndex);
        return sb.toString();
    }

    /** A row must have exactly one column per written field, plus the id. */
    private static void checkWrittenColumns(String[] cols, int emptyFieldIndex) {
        if (emptyFieldIndex != -1 && cols.length != emptyFieldIndex + 1) {
            throw new IllegalStateException("Field position mismatch: row has " + cols.length
                    + " columns, expected " + (emptyFieldIndex + 1));
        }
    }

    /**
     * Appends the unwritten fields of a side whose row exists: blanks for all but the last
     * unwritten field, then "1" for the last one (the field being appended in this merge).
     */
    private static void appendUnwrittenFields(StringBuilder sb, int colCount, int fieldCount, int emptyFieldIndex) {
        if (colCount < fieldCount + 1) {
            if (emptyFieldIndex != -1) {
                for (int i = emptyFieldIndex; i < fieldCount - 1; i++) {
                    sb.append(CSV_RESULT_FILE_SEPARATOR);
                }
            }
            sb.append(CSV_RESULT_FILE_SEPARATOR).append("1");
        }
    }

    /** Returns the index of the first field that is not written yet, or -1. */
    private int getFieldEmptyPlaceholderIndex(List<FileFieldDesc> fieldList) {
        for (int i = 0; i < fieldList.size(); i++) {
            if (fieldList.get(i).getWriteFlag() == 0) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Reads the id column from a line.
     *
     * @param lineData line content
     * @param idIndex  1-based index of the id column
     * @return the id value
     */
    private String getIdColumnValueFromLineData(String lineData, int idIndex) {
        if (lineData == null) {
            return null;
        }
        if (idIndex <= 0) {
            log.warn("Invalid id column index: {}, returning the whole line; please investigate", idIndex);
            return lineData;
        }
        // Columns are always separated by ','
        String[] cols = StringUtils.splitPreserveAllTokens(lineData, CSV_RESULT_FILE_SEPARATOR);
        // Index out of range: return an empty string
        if (idIndex > cols.length) {
            log.warn("id column index out of range, please investigate: {} -> {}", lineData, idIndex);
            return "";
        }
        return cols[idIndex - 1];
    }

    /** Writes one line to the stream (lines separated by '\n') and increments the counter. */
    private void writeLine(OutputStream outputStream, String lineData, AtomicLong counter) throws IOException {
        if (counter.get() > 0) {
            outputStream.write('\n');
        }
        outputStream.write(lineData.getBytes(StandardCharsets.UTF_8));
        counter.incrementAndGet();
    }

    /** Column position of a target field in the current file. */
    private static class FieldOrderIndicator {
        final int fieldIndex;
        final FileFieldDesc fieldDesc;

        FieldOrderIndicator(int fieldIndex, FileFieldDesc fieldDesc) {
            this.fieldIndex = fieldIndex;
            this.fieldDesc = fieldDesc;
        }
    }

    /** Fork/join task that splits the file list and merges the halves. */
    private static class FileJoinFJTask extends RecursiveTask<JoinFileDescriptor> {

        private static final FileJoiner joiner = new FileJoiner();

        private final List<JoinFileDescriptor> fileList;

        FileJoinFJTask(List<JoinFileDescriptor> fileList) {
            this.fileList = fileList;
        }

        @Override
        protected JoinFileDescriptor compute() {
            int len = fileList.size();
            if (len > 2) {
                int mid = len / 2;
                FileJoinFJTask subTask1 = new FileJoinFJTask(fileList.subList(0, mid));
                subTask1.fork();
                // Compute the second half in the current thread, then wait for the first
                JoinFileDescriptor m2 = new FileJoinFJTask(fileList.subList(mid, len)).compute();
                JoinFileDescriptor m1 = subTask1.join();
                return joinTwoFile(m1, m2);
            }
            if (len == 2) {
                return joinTwoFile(fileList.get(0), fileList.get(1));
            }
            if (len == 1) {
                return fileList.get(0);
            }
            throw new IllegalArgumentException("No files to merge: " + len);
        }

        /** Merges two sorted files. */
        private JoinFileDescriptor joinTwoFile(JoinFileDescriptor m1, JoinFileDescriptor m2) {
            try {
                return joiner.joinById(m1, m2);
            } catch (IOException e) {
                log.error("Failed to merge files: {}, {}", m1, m2, e);
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

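That's the overall framework. A caller can invoke joinById serially and chain the merges itself, or call joinMultiFile, which merges in parallel internally. Either way, the column order can be fixed as required at the end with rewriteFileBySelectField. The parallel part is exactly the fork/join approach covered in the previous article.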


2.2. Helper classes

The computation above relies on a few auxiliary data structures to express the process clearly. Here they are:

```java
// 1. JoinFileDescriptor.java
import java.io.BufferedReader;
import java.util.List;

/**
 * Describes a file that takes part in the join.
 */
public class JoinFileDescriptor {

    /** File path */
    private String path;

    /** Number of lines in the file */
    private long lineCnt;

    /** Field list, in the order the fields are written to the file */
    private List<FileFieldDesc> fieldInfo;

    /** Merge depth; 0 when not merged yet */
    private int deep;

    /** Reader used while this file is being merged */
    private transient BufferedReader reader;

    public JoinFileDescriptor() { }

    public JoinFileDescriptor(String path, long lineCnt, List<FileFieldDesc> fieldInfo) {
        this.path = path;
        this.lineCnt = lineCnt;
        this.fieldInfo = fieldInfo;
    }

    public BufferedReader getReader() { return reader; }
    public void setReader(BufferedReader reader) { this.reader = reader; }
    public String getPath() { return path; }
    public void setPath(String path) { this.path = path; }
    public long getLineCnt() { return lineCnt; }
    public void setLineCnt(long lineCnt) { this.lineCnt = lineCnt; }
    public List<FileFieldDesc> getFieldInfo() { return fieldInfo; }
    public void setFieldInfo(List<FileFieldDesc> fieldInfo) { this.fieldInfo = fieldInfo; }
    public int getDeep() { return deep; }
    public void incrDeep() { this.deep++; }

    @Override
    public String toString() {
        return "JoinFileDescriptor{" + "path='" + path + '\'' + ", lineCnt=" + lineCnt
                + ", fieldInfo=" + fieldInfo + ", deep=" + deep + '}';
    }
}

// 2. FileFieldDesc.java
/**
 * Describes one field (column) of a file.
 */
public class FileFieldDesc {

    /** Field name */
    private String fieldName;

    /**
     * Whether the field is physically written to the file.
     * 1: written; 0: not written yet (an earlier field whose column gets written
     * together with later fields when they are merged)
     */
    private int writeFlag;

    private FileFieldDesc(String fieldName) { this.fieldName = fieldName; }

    public static FileFieldDesc newField(String fieldName) { return new FileFieldDesc(fieldName); }

    public static FileFieldDesc newField(String fieldName, int writeFlag) {
        FileFieldDesc f = new FileFieldDesc(fieldName);
        f.setWriteFlag(writeFlag);
        return f;
    }

    public String getFieldName() { return fieldName; }
    public void setFieldName(String fieldName) { this.fieldName = fieldName; }
    public int getWriteFlag() { return writeFlag; }
    public void setWriteFlag(int writeFlag) { this.writeFlag = writeFlag; }
    public void writeOk() { writeFlag = 1; }

    @Override
    public String toString() {
        return "FileFieldDesc{" + "fieldName='" + fieldName + '\'' + ", writeFlag=" + writeFlag + '}';
    }
}
```

Pretty simple, right?


2.3. Unit tests

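Nothing is done until it is tested. A good test should cover every possible case and its result: how many files are merged, how many lines the output has, what each line should contain, and so on. Well, I'll leave that for readers to complete. A simple test follows.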

```java
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.util.*;

/**
 * Tests for the file join utility.
 */
public class FileJoinerTest {

    @Before
    public void setup() {
        // Avoid log4j configuration errors
        System.setProperty("catalina.home", "/tmp");
    }

    @Test
    public void testJoinById() throws Exception {
        long startTime = System.currentTimeMillis();
        String classpath = this.getClass().getResource("/").getPath();
        JoinFileDescriptor file1 = new JoinFileDescriptor(classpath + "file/t0/crowd_a.csv", 4,
                Collections.singletonList(FileFieldDesc.newField("crowd_a")));
        JoinFileDescriptor file2 = new JoinFileDescriptor(classpath + "file/t0/crowd_b.csv", 5,
                Collections.singletonList(FileFieldDesc.newField("crowd_b")));
        FileJoiner joiner = new FileJoiner();
        JoinFileDescriptor fileMerged = joiner.joinById(file1, file2);
        List<String> resultLines = FileUtils.readLines(new File(fileMerged.getPath()), "utf-8");
        System.out.println("result:" + fileMerged);
        Assert.assertEquals("Wrong number of merged lines", 6L, fileMerged.getLineCnt());
        Assert.assertEquals("Wrong merged line", "6001,1,1", resultLines.get(0));
        Assert.assertEquals("Wrong merged line", "6011,,1", resultLines.get(5));

        JoinFileDescriptor file3 = new JoinFileDescriptor(classpath + "file/t0/crowd_c.csv", 5,
                Collections.singletonList(FileFieldDesc.newField("crowd_c")));
        fileMerged = joiner.joinById(fileMerged, file3);
        System.out.println("result3:" + fileMerged);

        JoinFileDescriptor file4 = new JoinFileDescriptor(classpath + "file/t0/crowd_d.csv", 4,
                Collections.singletonList(FileFieldDesc.newField("crowd_d")));
        fileMerged = joiner.joinById(fileMerged, file4);
        System.out.println("result4:" + fileMerged);

        JoinFileDescriptor file6 = new JoinFileDescriptor(classpath + "file/t0/crowd_f.csv", 4,
                Collections.singletonList(FileFieldDesc.newField("crowd_f")));
        fileMerged = joiner.joinById(fileMerged, file6);
        System.out.println("result5:" + fileMerged);

        JoinFileDescriptor file5 = new JoinFileDescriptor(classpath + "file/t0/crowd_e.csv", 4,
                Collections.singletonList(FileFieldDesc.newField("crowd_e")));
        fileMerged = joiner.joinById(fileMerged, file5);
        System.out.println("result6:" + fileMerged);

        fileMerged = joiner.rewriteFileBySelectField(fileMerged,
                Arrays.asList("crowd_a", "crowd_b", "crowd_c", "crowd_d", "crowd_e", "crowd_f"));
        System.out.println("reordered:" + fileMerged);
        System.out.println("costTime:" + (System.currentTimeMillis() - startTime) + "ms");
    }

    @Test
    public void testJoinByIdUseForkJoin() throws Exception {
        long startTime = System.currentTimeMillis();
        List<JoinFileDescriptor> sortedFileList = new ArrayList<>();
        String classpath = this.getClass().getResource("/").getPath();
        sortedFileList.add(new JoinFileDescriptor(classpath + "file/t0/crowd_a.csv", 4,
                Collections.singletonList(FileFieldDesc.newField("crowd_a"))));
        sortedFileList.add(new JoinFileDescriptor(classpath + "file/t0/crowd_b.csv", 5,
                Collections.singletonList(FileFieldDesc.newField("crowd_b"))));
        sortedFileList.add(new JoinFileDescriptor(classpath + "file/t0/crowd_c.csv", 5,
                Collections.singletonList(FileFieldDesc.newField("crowd_c"))));
        sortedFileList.add(new JoinFileDescriptor(classpath + "file/t0/crowd_d.csv", 4,
                Collections.singletonList(FileFieldDesc.newField("crowd_d"))));
        sortedFileList.add(new JoinFileDescriptor(classpath + "file/t0/crowd_e.csv", 10,
                Collections.singletonList(FileFieldDesc.newField("crowd_e"))));
        sortedFileList.add(new JoinFileDescriptor(classpath + "file/t0/crowd_f.csv", 10,
                Collections.singletonList(FileFieldDesc.newField("crowd_f"))));
        // Shuffle so that the merge order differs between runs
        Collections.shuffle(sortedFileList);

        FileJoiner joiner = new FileJoiner();
        JoinFileDescriptor fileMerged = joiner.joinMultiFile(sortedFileList,
                Arrays.asList("crowd_a", "crowd_b", "crowd_c", "crowd_d", "crowd_e", "crowd_f"));
        System.out.println("fileMerged:" + fileMerged);
        System.out.println("costTime:" + (System.currentTimeMillis() - startTime) + "ms");
    }
}
```

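The parallel test above has no assertions. Partly I was too lazy to add them, and partly checking it really is harder, much like tracking down problems in a distributed system. Also worth noting: to check the code's robustness, the test shuffles the file list, so each run exercises a different merge order and the final result should not depend on it. In real use, you can sort the files by line count and merge small files with small ones and large files with large ones. This avoids many wasted reads of rows that carry only empty flags. That is the benefit of a home-grown implementation: whenever you think of an adjustment, you can make it on the spot.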
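The ordering advice can be turned into a merge plan before any file is read. The sketch below swaps in a Huffman-style greedy rule: always merge the two smallest inputs, tracked with a PriorityQueue. It assumes line count is the only cost. It estimates a merged file's size as the sum of its inputs, which is an upper bound for a full join. MergePlanner and the "(x+y)" step notation are hypothetical.

```java
import java.util.*;

// Hypothetical merge planner: repeatedly merge the two smallest inputs (Huffman-style).
class MergePlanner {
    static final class Node {
        final String name;
        final long lines; // line count, or an upper-bound estimate for merged results
        Node(String name, long lines) { this.name = name; this.lines = lines; }
    }

    // Returns the merge steps in execution order, e.g. "(a+b)".
    static List<String> plan(Map<String, Long> lineCounts) {
        PriorityQueue<Node> pq = new PriorityQueue<>(Comparator.comparingLong((Node n) -> n.lines));
        lineCounts.forEach((name, cnt) -> pq.add(new Node(name, cnt)));
        List<String> steps = new ArrayList<>();
        while (pq.size() > 1) {
            Node x = pq.poll();
            Node y = pq.poll();
            String merged = "(" + x.name + "+" + y.name + ")";
            steps.add(merged);
            // A full join of x and y has at most x.lines + y.lines rows
            pq.add(new Node(merged, x.lines + y.lines));
        }
        return steps;
    }
}
```

Suppose the line counts are a=4, b=5, c=100, d=200. The plan is (a+b), then ((a+b)+c), then (((a+b)+c)+d), so the largest file takes part in the fewest merges. With equal sizes, the same rule produces a balanced tree, which suits the fork/join version.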

Here are a few sample files:

```
// crowd_a.csv
6001
6002
6003
6009
// crowd_b.csv
6001
6002
6003
6006
6011
// crowd_c.csv
6001
6003
6006
6009
...e,f,g...
```

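The utility above can be seen as an equivalent implementation of the SQL semantics shown earlier. It is no match for a real database engine, but it has its use cases, and I'll leave those for you to discover. Offered for your reference. (Then again, who knows, maybe MapReduce would be simpler and more efficient for you.)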




Original author: 【等你归去来】

Source: https://www.cnblogs.com/yougewe/p/14950347.html
