【凯哥】第三届数据库大赛创新上云性能挑战赛-ADB赛道参赛总结

共 3227字,需浏览 7分钟

 ·

2021-10-12 16:16

前言:经过三个多月的角逐,成绩最终以 34.380 的成绩定格在第四名,大佬太多,膜拜。文章末尾会附上 git 仓库地址,如果有疑问的地方欢迎到 github 上与我交流。

队伍排名

赛题简介

硬件资源:8C8G
存储介质:英特尔® 傲腾™ PMem 非易失性持久化内存(APP Direct Mode)
数据总量:两张表,每张表两列,每列 10 亿条数据,总量为 32 亿个 Long
数据特征:均匀分布

因为数据均匀分布,考虑使用从高位截取若干位分桶的方式对数据做索引。
更多赛题信息这里不再赘述,请至 https://tianchi.aliyun.com/competition/entrance/531895/information 查看

整体方案

Load 阶段:整体采用纯异步(四组线程)的方式,每组线程负责消费加工上游线程组的产品,并将产出物交付给下游线程组,同时根据每组线程特性及负载调整该组线程数量、独享或共享,线程组有共享或独享能力后可以精细化调整线程数量(因为每组线程耗时不均匀,需要精细调整线程数量来弥补“木桶短板”),解析分桶分离后分桶线程 CPU Cache 更易处理,各组线程间通讯均使用 ConcurrentLinkedQueue + Thread.sleep,分桶数量为 2048,每桶一个文件。

查询阶段:采用多线程+桶排序+快速选择方式。

整体方案

读取

读取采用双文件并行处理,每个文件配置 3 个读取线程,首先将每个文件分成三份,第三份略小(因为结尾可能不足一个读取块),线程内 MMAP 读取,每块读取大小为 8MB + 64 字节,多读取的 64 字节用于行尾溢出处理,读取完成后将数据块投递到解析 Long 线程组。

读取

解析 Long

解析 Long 线程组为共享线程组,数量为 3,解析 Long 时先倒序找到逗号或换行符,然后我们可以得到一个 Long 数字的起始位置,有了起始位置那么我们可以对解析 Long 这个定长循环做循环展开

 1private static long read_long_19(long array) {  
2    long v0 = (unsafe.getByte(array + 0) - '0') * 1000000000000000000L;  
3    long v1 = (unsafe.getByte(array + 1) - '0') * 100000000000000000L;  
4    long v2 = (unsafe.getByte(array + 2) - '0') * 10000000000000000L;  
5    long v3 = (unsafe.getByte(array + 3) - '0') * 1000000000000000L;  
6    long v4 = (unsafe.getByte(array + 4) - '0') * 100000000000000L;  
7    long v5 = (unsafe.getByte(array + 5) - '0') * 10000000000000L;  
8    long v6 = (unsafe.getByte(array + 6) - '0') * 1000000000000L;  
9    long v7 = (unsafe.getByte(array + 7) - '0') * 100000000000L;  
10    long v8 = (unsafe.getByte(array + 8) - '0') * 10000000000L;  
11    long v9 = (unsafe.getByte(array + 9) - '0') * 1000000000L;  
12    long v10 = (unsafe.getByte(array + 10) - '0') * 100000000L;  
13    long v11 = (unsafe.getByte(array + 11) - '0') * 10000000L;  
14    long v12 = (unsafe.getByte(array + 12) - '0') * 1000000L;  
15    long v13 = (unsafe.getByte(array + 13) - '0') * 100000L;  
16    long v14 = (unsafe.getByte(array + 14) - '0') * 10000L;  
17    long v15 = (unsafe.getByte(array + 15) - '0') * 1000L;  
18    long v16 = (unsafe.getByte(array + 16) - '0') * 100L;  
19    long v17 = (unsafe.getByte(array + 17) - '0') * 10L;  
20    long v18 = (unsafe.getByte(array + 18) - '0') * 1L;  
21    return v0 + v1 + v2 + v3 + v4 + v5 + v6 + v7 + v8 + v9 + v10 + v11 + v12 + v13 + v14 + v15 + v16 + v17 + v18;  
22}

分桶

分桶线程为独享线程,每个文件配置一个分桶线程,线程内分两阶段对两列进行分桶,为加快分桶速度,我们从 CPU Cache 缓存命中和帮助 CPU 分支预测两个方面做优化,同时为了加快落盘速度,我们可以对数据做些简单压缩。

数据压缩:因为我们分桶超过了 256,所以在存储数据时可以舍弃掉高位的一个字节(服务器字节序为小端模式,可以刚好通过覆盖的方式舍弃掉高位一个字节),仅存储剩余的 7 个字节。

CPU Cache:首先我们申请一块与 CPU L2 缓存大小匹配的内存作为临时缓存,经查询 L2 的缓存大小为 1M, 我的分桶数量为 2048, 那么每桶的数量应为 1M / 8 /2048 = 64, 因为对数据做了压缩,所以实际申请的内存数量为 7 * 64 * 2048 = 917504 可以被 CPU Cache 覆盖,分桶时先将数据写到临时缓存,等临时缓存满时再一次性将该桶的数据复制到写缓存,写缓存满时再将写缓存投递到落盘线程。

CPU 分支预测:因为数据大致是均匀的,因此当一个桶的数据满时,其他桶的数据也接近满了,这时我们可以直接将所有桶的数据都复制到写缓存,这样基本可以消除判断临时缓存是否已满带来的损耗

分桶

分桶代码:

 1private void store_order_values(  
2        ByteBuf long_buf, long lv_1_buf, int[] pos_array)
 throws InterruptedException 
{  
3    long buf_offset = long_buf.offset;  
4    long buf_write_index = long_buf.write_index;  
5    int size = (int) (buf_write_index - buf_offset) >>> 3;  
6    long value;  
7    for (int i = 0; i < size; i++) {  
8        value = unsafe.getLong(buf_offset + ((long) i << 3));  
9        int index = index(value);  
10        int put_pos = pos_array[index]++;  
11        if (((put_pos) & CP_LV_1_COUNT_MASK) == CP_LV_1_COUNT_MASK) {  
12            long put_address = lv_1_buf + put_pos * 7L;  
13            for (int j = 0; j < 7; j++) {  
14                unsafe.putByte(put_address++, (byte) value);  
15                value >>= 8;  
16            }  
17            do_flush(lv_1_buf, pos_array, order_size_array, order_byte_buf_array, table.order_key_bucket_array);  
18        } else {  
19            unsafe.putLong(lv_1_buf + put_pos * 7L, value);  
20        }  
21    }  
22}

分桶线程虽然只给了两个,但是分桶依旧快于其他线程(四组线程耗时分别是:23.5+,23+,22+,22.5+),于是在分桶线程等待获取 Long 数组时,让分桶线程分担一部分写操作。

 1parse_task = work_parse_tasks.poll();  
2if (parse_task == null) {  
3    WriteTask write_task = WriteThread.work_write_tasks.poll();  
4    if (write_task == null) {  
5        Thread.sleep(1);  
6    } else {  
7        write_task.write();  
8        WriteThread.free_write_tasks.offer(write_task);  
9    }  
10    continue;  
11}

落盘

落盘线程为共享线程,数量为 5 个,落盘时拉取到写 Task 后使用 FileChannel 写入到对应的文件,落盘结束后将每个桶的大小记录到 Meta 文件。

查询

查询阶段 load 时将桶信息从 Meta 文件中恢复到内存。

查询采用 N*3 个线程并行查询,即每次查询请求附带两个辅助查询线程配合主查询线程并行查询,线程内先对读取的数据进行桶排序,桶排序完成后在主线程使用快速选择定位查询结果,线程间通讯采用阻塞队列进行通信。经过多线程处理和桶排序后,快速选择起到的提升作用不明显,和直接 Arrays.sort 区别不大。

可探索的改进

一桶一文件的劣势非常明显,需要初始化大量文件(2048 * 4),需要写大量小文件,查询时相比于多桶一文件的 4096 分桶需要多读取一倍的数据,改成多桶一个文件可以充分利用傲腾强大的随机读取能力,将查询性能提升一倍。

总结

  • 拆分解析 Long 与分桶线程,以减少分桶线程数提高缓存命中,同时留给其他线程组更多线程使线程资源分配更合理。

  • 解析 Long 时循环展开以减少分支指令带来的开销

  • 使用临时缓存以增加分桶速度提升缓存命中

  • 缓存复制时根据数据均匀特性帮助分支预测

  • 分桶线程分担写任务以更大发挥 PMem 多线程写能力

  • 数据压缩提升写入速度

  • 多线程查询提升 IO 利用率

源码地址:https://github.com/wangkaish/ali_race2021_r2_adb

END -

「技术分享」某种程度上,是让作者和读者,不那么孤独的东西。欢迎关注我的微信公众号:「Kirito的技术分享」


浏览 35
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报