TDSQL for PG 并行框架原理解析
共 17954字,需浏览 36分钟
·
2024-07-17 21:36
并行方式简介
查询并行
查询并行是指将一个查询分解为多个子查询,在多个处理器上同时执行这些子查询。查询并行通常用于处理计算密集型和IO密集型的查询,例如,涉及多个表连接、聚合、表扫描等操作的查询。查询并行可以有效地提高查询性能,因为每个处理器只需要处理查询的一部分。
这种并行方式在传统数据库中使用比较多,比如Oracle、PostgreSQL,TDSQL for PG 也采用的是这种并行方式。这种方式的好处是能将查询任务分解为多个任务,分布在多个处理器(甚至跨服务器的处理器)上并行执行,最终通过 Gather 节点将结果汇总。
相比其他的并行方式,查询并行的调度更简单,正因为如此,资源的使用效率不是最高的。另外,这种并行方式需要在处理器之间传输和同步数据,系统开销较大。
pipeline并行
管道 (pipeline) 并行是指将一个操作的输出作为另一个操作的输入,这样多个操作可以同时进行。这种并行方式通常用于数据库查询处理中的多个阶段,例如,从磁盘读取数据、过滤数据、排序数据等。
pipeline并行可以提高资源利用率,因为 pipeline 中的各个阶段、pipeline 之间可以并行、异步执行,而不是等待前一个阶段完成。
ClickHouse、Doris 等使用的就是这种并行方式。pipeline 并行的好处是能充分的利用资源,结合线程池技术,可以非常精细的调度任务,目的是提升数据处理的吞吐量。
但是这种并行方式不够灵活,因为每个处理阶段的输入输出是固定的,限制了处理阶段之间的交互和协作,同时还需要管理和协调好各个处理阶段,提升了调度的复杂度。与之对应的是 DAG(Directed Acyclic Graph) 方式,典型的产品就是 Spark。
任务并行
任务并行是指在多个处理器上同时执行不同的任务。这种并行方式通常用于处理多个独立的查询或事务。任务并行可以提高系统的吞吐量,因为多个查询或事务可以同时进行。
TDSQL for PG 的后台任务,比如 autovacuum、checkpointer 等就是这种并行方式,任务之间独立执行,互不干扰。
数据并行
数据并行是指在多个处理器上同时对数据集的不同部分执行相同的操作。这通常是通过将数据划分为多个分区来实现的,每个处理器负责处理一个分区。
数据并行可以有效地提高查询性能,因为每个处理器只需要处理数据的一部分。通常来说,上面的并行方式都会结合数据并行来执行。
指令并行
本文指的指令并行是利用SIMD指令的并行,SIMD指令可以减少分支预测的开销,提高内存访问的局部性、cache的命中率。数据库中的排序算法可以利用 SIMD 指令进行并行比较和交换,join 也可以使用 SIMD 进行并行的匹配,最常用的是压缩和编码用 SIMD 提升性能。
TDSQL for PG 主要使用了查询并行、数据并行、任务并行这几种方式,本文重点要分析的是查询并行的框架和原理。
并行框架概述
TDSQL for PG 并行框架总体流程
-
server 进程是资源调度进程,负责进程的分配 -
backend 是并行任务的发起进程,负责并行执行环境的初始化,也负责通过 Gather 和 GatherMerge 节点汇总结果 -
Background Worker 进程是任务的具体执行者,并返回结果给backend 进程。
执行的流程跟单进程时一样,都会依次调用 CreateQueryDesc(), ExecutorStart() , ExecutorRun(), ExecutorFinish(), ExecutorEnd() 函数。
区别在于 Background Worker 需要先从动态共享内存中恢复执行需要的环境,以及执行结束后清理动态内存。
TDSQL for PG 的并行框架主要流程如下图所示:
1. Client 连接到 server 以后 server 进程为其创建一个 backend 进程,banckend 进程在生成执行计划的过程中识别出是否需要并行执行,如果能并行执行就会创建 Background Worker 进程。
/* 进入并行模式,阻止不安全的状态修改 */
EnterParallelMode();
/* 创建并行执行的上下文,并插入到全局链表 pcxt_list 中 */
pcxt = CreateParallelContext();
/* 估算变量占用的 DSM 的大小,包括变量本身的大小和 key 的大学. */
shm_toc_estimate_chunk(&pcxt->estimator, size);
shm_toc_estimate_keys(&pcxt->estimator, keys);
/* 创建 DSM 并拷贝数据 */
InitializeParallelDSM(pcxt);
/* 在 DSM 中申请空间. */
space = shm_toc_allocate(pcxt->toc, size);
shm_toc_insert(pcxt->toc, key, space);
/* 注册 background worker */
LaunchParallelWorkers(pcxt);
/* 执行并行任务(计划) */
/* 等待并行 worker 执行结束 */
/* 读取共享内存中的结果 */
DestroyParallelContext(pcxt);
/* 退出并行执行模式 */
ExitParallelMode();
通信机制
并行执行避免不了进程或线程之间的通信,TDSQL for PG 的并行框架采用的是进程模型,主要用到了两种通信机制,一个是信号,一个是共享内存。
1. 信号
信号主要是控制流,在并行框架中,后端进程注册 background worker 时向 server 进程发送信号(SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE)),server 进程调用 sigusr1_handler()处理信号,并创建 background worker 进程。
当 background worker 执行结束,会通过信号通知 backend 进程。
2. 动态共享内存
关键数据结构分析
ParallelContext
typedef struct ParallelContext
{
dlist_node node; /* 双向链表的挂载点 */
SubTransactionId subid; /* 调用GetCurrentSubTransactionId获取子事务ID */
int nworkers; /* 计划的Worker数量 */
int nworkers_launched; /* 实际发起的Worker数量 */
bool leader_participate; /* 主进程是否参与执行 */
char *library_name; /* 库的名称,一般是postgres */
char *function_name; /* background Worker的执行函数,用户自定义,select对应的是ParallelQueryMain */
ErrorContextCallback *error_context_stack; /* 错误上下文栈 */
shm_toc_estimator estimator; /* 共享内存大小估算 */
dsm_segment *seg; /* 动态共享内存的状态信息 */
void *private_memory; /* 动态共享内存申请失败后回退到非并行执行是使用的内存。*/
shm_toc *toc; /* Shared Memory Table of Contents */
ParallelWorkerInfo *worker; /* 是一个数组,每个Worker一个,记录Worker的信息 */
int nknown_attached_workers; /* attach到error queue的Worker数量 */
bool *known_attached_workers; /* 数组,标记每个Worker attach的状态 */
} ParallelContext;
每次创建ParallelContext后都会插入到双向链表pcxt_list中,这个双向链表用于记录活跃的并行上下文。
ParallelExecutorInfo
typedef struct ParallelExecutorInfo
{
PlanState *planstate; /* plan subtree we're running in parallel */
ParallelContext *pcxt; /* parallel context we're using */
BufferUsage *buffer_usage; /* points to bufusage area in DSM */
uint64 *processed_count; /* processed tuple count area in DSM */
SharedExecutorInstrumentation *instrumentation; /* optional */
struct SharedJitInstrumentation *jit_instrumentation; /* optional */
dsa_area *area; /* points to DSA area in DSM */
dsa_pointer param_exec; /* serialized PARAM_EXEC parameters */
bool finished; /* set true by ExecParallelFinish */
/* These two arrays have pcxt->nworkers_launched entries: */
shm_mq_handle **tqueue; /* tuple queues for worker output */
struct TupleQueueReader **reader; /* tuple reader/writer support */
} ParallelExecutorInfo;
这个数据结构记录了并行执行时的各种信息,由函数mq_bytes_read和mq_bytes_written按 8 bytes 读写,必须用 memory barrier 同步。
shm_mq
struct shm_mq
{
slock_t mq_mutex;
PGPROC *mq_receiver;
PGPROC *mq_sender;
pg_atomic_uint64 mq_bytes_read;
pg_atomic_uint64 mq_bytes_written;
Size mq_ring_size;
bool mq_detached;
uint8 mq_ring_offset;
char mq_ring[FLEXIBLE_ARRAY_MEMBER];
};
共享内存中的队列。
● mq_receiver和mq_bytes_read只能被 receiver 改变。同理,mq_sender和mq_bytes_writte 只能被 sender 改变。
● mq_receiver和mq_sender受mq_mutex保护,一旦设置就不能改变,所以设置以后可以无锁的读。
● mq_bytes_read 和 mq_bytes_written按 8 bytes 读写,必须用 memory barrier 同步。
shm_mq_handle
struct shm_mq_handle
{
shm_mq *mqh_queue;
dsm_segment *mqh_segment;
BackgroundWorkerHandle *mqh_handle;
char *mqh_buffer;
Size mqh_buflen;
Size mqh_consume_pending;
Size mqh_send_pending;
Size mqh_partial_bytes;
Size mqh_expected_bytes;
bool mqh_length_word_complete;
bool mqh_counterparty_attached;
MemoryContext mqh_context;
};
● mqh_queue
指向关联的消息队列
● mqh_segment
指向包含该消息队列的动态共享内存
● mqh_handle
与该消息队列绑定的后台工作进程,由shm_mq_attach()绑定。
对于超过ring buffer大小的数据,或者出现了回卷的数据,就把队列中的chunk拷贝到mqh_buffer。
● mqh_buflen
mqh_buflen 是mqh_buffer的长度。
● mqh_consume_pending
已经写到queue中,但是还没有更新到共享内存的数据大小。只有当数据大小超过 ring buffer 的 1/4,或者tuple queue慢了的时候才更新共享内存。
●mqh_partial_bytes、mqh_expected_bytes、and mqh_length_word_complete
● mqh_counterparty_attached
用于记录对手(sender 或者 receiver)是否已经挂载到queue。从而不必要的 mutex 申请。
● mqh_context
关键函数分析
ExecInitParallelPlan()
该函数主要初始化并行执行需要的一些基础信息,在并行的发起节点调用,比如Gather、GatherMerge、RemoteFragment(分布式场景下也支持节点内并行)等。
这个函数的核心流程如下:
ParallelExecutorInfo *
ExecInitParallelPlan(PlanState *planstate, EState *estate,
Bitmapset *sendParams, int nworkers,
int64 tuples_needed)
{
...
/* 序列化执行计划 */
pstmt_data = ExecSerializePlan(planstate->plan, estate);
/* 为返回值申请空间 */
pei = palloc0(sizeof(ParallelExecutorInfo));
/* 创建并行框架上下文 */
pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
pei->pcxt = pcxt;
/* 动态共享内存大小估算,为每个需要传递给background worker的变量估算内存大小,包括执行计划、BufferUsage、tuple queues等。*/
shm_toc_estimate_chunk(&pcxt->estimator, ...);
shm_toc_estimate_keys(&pcxt->estimator, 1);
...
/* 为每个可并行的node估算其需要的共享内存大小 */
ExecParallelEstimate(planstate, &e);
...
/* 为DSA估算空间。DSA的大小可以在执行过程中改变,所以可能会更新的状态放到这个区域。DSA会绑定多个DSM,当DSA大小不够时,可以创建新的DSM。*/
shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* 为并行框架建立动态共享内存段,并将Worker需要的状态 copy 到共享内存。*/
InitializeParallelDSM(pcxt);
/* 在DSM中为并行执行需要的状态信息申请共享内存并插入到toc中。*/
shm_toc_allocate(pcxt->toc, ...);
shm_toc_insert(pcxt->toc, ..., ...);
...
/* 为每个Worker创建一个tuple queue,用于leader和Worker之间传递执行结果。*/
pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
...
/* 遍历planstate中所有的node,为其初始化共享内存,把状态信息拷贝的共享内存。*/
ExecParallelInitializeDSM(planstate, &d);
...
return pei;
}
ExecSerializePlan
static char *
ExecSerializePlan(Plan *plan, EState *estate)
{
/* 实际调用copyObjectImpl()对执行计划中的算子、表达式进行深度拷贝,会递归调用一些列以“_copy”开头的函数。*/
plan = copyObject(plan);
/* 复制PlannedStmt,复制background worker必要的信息,最后序列化返回*/
pstmt = makeNode(PlannedStmt);
...
return nodeToString(pstmt);
}
CreateParallelContext
ParallelContext *
CreateParallelContext(const char *library_name, const char *function_name, int nworkers);
创建并行框架的上下文,library_name 是要加载的库的名称,通常为“postgres”, function_name 是并行执行函数的名称。
在background worker进程的入口函数ParallelWorkerMain()中会通过这个函数名从library中加载函数并执行,nworkers 是并行执行的进程数。
ExecParallelSetupTupleQueues()
Worker 进程执行完 plan segment 后,结果通过共享内存消息队列传递给 leader 进程,这个函数就是为每个 worker 创建一个共享队列shm_mq_handle。
static shm_mq_handle **
ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
/* 为每个worker的shm_mq_handle申请内存 */
responseq = (shm_mq_handle **)
palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
/* 如果不需要重新初始化,那么就在DSM中为每一个worker的tuple queue申请空间;否则就直接查找。 */
if (!reinitialize)
tqueuespace =
shm_toc_allocate(pcxt->toc, mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
else
tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
/* 为每个 worker 创建消息队列,并将 leader 进程设置为接收者,然后将 mq、dsm_segment 关联起来。*/
for (i = 0; i < pcxt->nworkers; ++i)
{
shm_mq *mq;
mq = shm_mq_create(tqueuespace +
((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
(Size) PARALLEL_TUPLE_QUEUE_SIZE);
shm_mq_set_receiver(mq, MyProc);
responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
}
/* 插入到toc中, 方便在 background worker 中查表恢复 */
if (!reinitialize)
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
return responseq;
}
LaunchParallelWorkers()
发起background worker的函数,主要是调用以下代码来完成:RegisterDynamicBackgroundWorker()
void
LaunchParallelWorkers(ParallelContext *pcxt)
{
BackgroundWorker worker;
...
/* 谁发起worker谁就是leader */
BecomeLockGroupLeader();
...
// 注册worker
for (i = 0; i < pcxt->nworkers; ++i)
{
memcpy(worker.bgw_extra, &i, sizeof(int));
if (!any_registrations_failed &&
RegisterDynamicBackgroundWorker(&worker,
&pcxt->worker[i].bgwhandle))
{
/* 注册成功 */
}
else
{
/* 当超过了max_worker_processes的限制,则注册失败。设置any_registrations_failed = true,防止继续注册。
any_registrations_failed = true;
...
}
...
}
RegisterDynamicBackgroundWorker()
这个函数主要是从BackgroundWorkerData中获取一个可用的BackgroundWorkerSlot,将其设置为已经占用。
然后给 server 发送一个PMSIGNAL_BACKGROUND_WORKER_CHANGE信号,通知server ,background worker 的状态有变化。
此时 server 遍历BackgroundWorkerSlot,找到刚注册的 background worker,为其创建进程。
bool
RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
BackgroundWorkerHandle **handle)
{
...
for (slotno = 0; slotno < BackgroundWorkerData->total_slots; ++slotno)
{
if (!slot->in_use)
{
...
}
}
/* If we found a slot, tell the postmaster to notice the change. */
if (success)
SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE);
if (success && handle)
{
*handle = palloc(sizeof(BackgroundWorkerHandle));
(*handle)->slot = slotno;
(*handle)->generation = generation;
}
}
ParallelWorkerMain
void
ParallelWorkerMain(Datum main_arg)
{
...
/* attach 共享内存,读取 toc中的内容 */
seg = dsm_attach(DatumGetUInt32(main_arg));
toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
/* 注册退出时的回调函数 */
on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);
/* 设置错误消息队列,将当前worker设为发送者 */
error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
mq = (shm_mq *) (error_queue_space +
ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
shm_mq_set_sender(mq, MyProc);
mqh = shm_mq_attach(mq, seg, NULL);
/* 从库中查找background worker要执行的函数,例如ParallelQueryMain。*/
entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
library_name = entrypointstate;
function_name = entrypointstate + strlen(library_name) + 1;
entrypt = LookupParallelWorkerFunction(library_name, function_name);
...
/* 多次调用 shm_toc_lookup(shm_toc *toc, uint64 key, bool noError), 从 toc 中读取状态、参数等 */
...
/* 执行 ParallelQueryMain、_bt_parallel_build_main 等。*/
entrypt(seg, toc);
/* 退出并行模式 */
ExitParallelMode();
PopActiveSnapshot();
EndParallelWorkerTransaction();
DetachSession();
...
}
ParallelQueryMain
并行 query 的入口函数,在ParallelWorkerMain()中被调用。
void
ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
...
/* 设置tuple的接收者 */
receiver = ExecParallelGetReceiver(seg, toc);
/* 反序列化 PlannedStmt,ParamListInfo,SQL,并创建 QueryDesc */
queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
...
ExecutorStart(queryDesc, fpes->eflags);
...
/* 初始化 PlanState,根据node类型调用不同的ExecXXXInitializeWorker();*/
ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
...
ExecutorRun(queryDesc,
ForwardScanDirection,
fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
true);
....
ExecutorEnd(queryDesc);
...
}
性能分析与优化
使用并行框架能提升执行的效率,但是也带来了额外的开销,经过实际的测试,这个开销大概在5 ~ 10毫秒,也就是说启动平行框架需要5毫秒以上。其中这个开销的主要有以下几部分组成:
1. 序列化
﹀
﹀
﹀
技术干货丨TDSQL 列存引擎 LibraDB 计算模型的设计与思考
技术干货丨 TDSQL for MySQL DDL执行框架
分布式数据库时代,需要什么样的产品?