【凯哥】第三届数据库大赛创新上云性能挑战赛-ADB赛道参赛总结
共 3227字,需浏览 7分钟
·
2021-10-12 16:16
前言:经过三个多月的角逐,成绩最终以 34.380 的成绩定格在第四名,大佬太多,膜拜。文章末尾会附上 git 仓库地址,如果有疑问的地方欢迎到 github 上与我交流。
赛题简介
硬件资源:8C8G
存储介质:英特尔® 傲腾™ PMem 非易失性持久化内存(APP Direct Mode)
数据总量:两张表,每张表两列,每列 10 亿条数据,总量为 32 亿个 Long
数据特征:均匀分布
因为数据均匀分布,考虑使用从高位截取若干位分桶的方式对数据做索引。
更多赛题信息这里不再赘述,请至 https://tianchi.aliyun.com/competition/entrance/531895/information 查看
整体方案
Load 阶段
:整体采用纯异步(四组线程)的方式,每组线程负责消费加工上游线程组的产品,并将产出物交付给下游线程组,同时根据每组线程特性及负载调整该组线程数量、独享或共享,线程组有共享或独享能力后可以精细化调整线程数量
(因为每组线程耗时不均匀,需要精细调整线程数量来弥补“木桶短板”),解析分桶分离后分桶线程 CPU Cache 更易处理,各组线程间通讯均使用 ConcurrentLinkedQueue + Thread.sleep,分桶数量为 2048,每桶一个文件。
查询阶段
:采用多线程+桶排序+快速选择方式。
读取
读取采用双文件并行处理,每个文件配置 3 个读取线程,首先将每个文件分成三份,第三份略小(因为结尾可能不足一个读取块),线程内 MMAP 读取
,每块读取大小为 8MB + 64 字节,多读取的 64 字节用于行尾溢出处理,读取完成后将数据块投递到解析 Long 线程组。
解析 Long
解析 Long 线程组为共享线程组,数量为 3,解析 Long 时先倒序找到逗号或换行符,然后我们可以得到一个 Long 数字的起始位置,有了起始位置那么我们可以对解析 Long 这个定长循环做循环展开
。
1private static long read_long_19(long array) {
2 long v0 = (unsafe.getByte(array + 0) - '0') * 1000000000000000000L;
3 long v1 = (unsafe.getByte(array + 1) - '0') * 100000000000000000L;
4 long v2 = (unsafe.getByte(array + 2) - '0') * 10000000000000000L;
5 long v3 = (unsafe.getByte(array + 3) - '0') * 1000000000000000L;
6 long v4 = (unsafe.getByte(array + 4) - '0') * 100000000000000L;
7 long v5 = (unsafe.getByte(array + 5) - '0') * 10000000000000L;
8 long v6 = (unsafe.getByte(array + 6) - '0') * 1000000000000L;
9 long v7 = (unsafe.getByte(array + 7) - '0') * 100000000000L;
10 long v8 = (unsafe.getByte(array + 8) - '0') * 10000000000L;
11 long v9 = (unsafe.getByte(array + 9) - '0') * 1000000000L;
12 long v10 = (unsafe.getByte(array + 10) - '0') * 100000000L;
13 long v11 = (unsafe.getByte(array + 11) - '0') * 10000000L;
14 long v12 = (unsafe.getByte(array + 12) - '0') * 1000000L;
15 long v13 = (unsafe.getByte(array + 13) - '0') * 100000L;
16 long v14 = (unsafe.getByte(array + 14) - '0') * 10000L;
17 long v15 = (unsafe.getByte(array + 15) - '0') * 1000L;
18 long v16 = (unsafe.getByte(array + 16) - '0') * 100L;
19 long v17 = (unsafe.getByte(array + 17) - '0') * 10L;
20 long v18 = (unsafe.getByte(array + 18) - '0') * 1L;
21 return v0 + v1 + v2 + v3 + v4 + v5 + v6 + v7 + v8 + v9 + v10 + v11 + v12 + v13 + v14 + v15 + v16 + v17 + v18;
22}
分桶
分桶线程为独享线程,每个文件配置一个分桶线程,线程内分两阶段对两列进行分桶,为加快分桶速度,我们从 CPU Cache 缓存命中和帮助 CPU 分支预测两个方面做优化,同时为了加快落盘速度,我们可以对数据做些简单压缩。
数据压缩
:因为我们分桶超过了 256,所以在存储数据时可以舍弃掉高位的一个字节(服务器字节序为小端模式,可以刚好通过覆盖的方式舍弃掉高位一个字节),仅存储剩余的 7 个字节。
CPU Cache
:首先我们申请一块与 CPU L2 缓存大小匹配的内存作为临时缓存,经查询 L2 的缓存大小为 1M, 我的分桶数量为 2048, 那么每桶的数量应为 1M / 8 /2048 = 64, 因为对数据做了压缩,所以实际申请的内存数量为 7 * 64 * 2048 = 917504 可以被 CPU Cache 覆盖,分桶时先将数据写到临时缓存,等临时缓存满时再一次性将该桶的数据复制到写缓存,写缓存满时再将写缓存投递到落盘线程。
CPU 分支预测
:因为数据大致是均匀的,因此当一个桶的数据满时,其他桶的数据也接近满了,这时我们可以直接将所有桶的数据都复制到写缓存,这样基本可以消除判断临时缓存是否已满带来的损耗
。
分桶代码:
1private void store_order_values(
2 ByteBuf long_buf, long lv_1_buf, int[] pos_array) throws InterruptedException {
3 long buf_offset = long_buf.offset;
4 long buf_write_index = long_buf.write_index;
5 int size = (int) (buf_write_index - buf_offset) >>> 3;
6 long value;
7 for (int i = 0; i < size; i++) {
8 value = unsafe.getLong(buf_offset + ((long) i << 3));
9 int index = index(value);
10 int put_pos = pos_array[index]++;
11 if (((put_pos) & CP_LV_1_COUNT_MASK) == CP_LV_1_COUNT_MASK) {
12 long put_address = lv_1_buf + put_pos * 7L;
13 for (int j = 0; j < 7; j++) {
14 unsafe.putByte(put_address++, (byte) value);
15 value >>= 8;
16 }
17 do_flush(lv_1_buf, pos_array, order_size_array, order_byte_buf_array, table.order_key_bucket_array);
18 } else {
19 unsafe.putLong(lv_1_buf + put_pos * 7L, value);
20 }
21 }
22}
分桶线程虽然只给了两个,但是分桶依旧快于其他线程(四组线程耗时分别是:23.5+,23+,22+,22.5+),于是在分桶线程等待获取 Long 数组时,让分桶线程分担一部分写操作。
1parse_task = work_parse_tasks.poll();
2if (parse_task == null) {
3 WriteTask write_task = WriteThread.work_write_tasks.poll();
4 if (write_task == null) {
5 Thread.sleep(1);
6 } else {
7 write_task.write();
8 WriteThread.free_write_tasks.offer(write_task);
9 }
10 continue;
11}
落盘
落盘线程为共享线程,数量为 5 个,落盘时拉取到写 Task 后使用 FileChannel 写入到对应的文件,落盘结束后将每个桶的大小记录到 Meta 文件。
查询
查询阶段 load 时将桶信息从 Meta 文件中恢复到内存。
查询采用 N*3
个线程并行查询,即每次查询请求附带两个辅助查询线程配合主查询线程并行查询,线程内先对读取的数据进行桶排序,桶排序完成后在主线程使用快速选择定位查询结果
,线程间通讯采用阻塞队列进行通信。经过多线程处理和桶排序后,快速选择起到的提升作用不明显,和直接 Arrays.sort 区别不大。
可探索的改进
一桶一文件的劣势非常明显,需要初始化大量文件(2048 * 4),需要写大量小文件,查询时相比于多桶一文件的 4096 分桶需要多读取一倍的数据,改成多桶一个文件可以充分利用傲腾强大的随机读取能力,将查询性能提升一倍。
总结
拆分解析 Long 与分桶线程,以减少分桶线程数提高缓存命中,同时留给其他线程组更多线程使线程资源分配更合理。
解析 Long 时循环展开以减少分支指令带来的开销
使用临时缓存以增加分桶速度提升缓存命中
缓存复制时根据数据均匀特性帮助分支预测
分桶线程分担写任务以更大发挥 PMem 多线程写能力
数据压缩提升写入速度
多线程查询提升 IO 利用率
源码地址:https://github.com/wangkaish/ali_race2021_r2_adb