TDSQL for PG 并行框架原理解析

腾讯云数据库

共 17954字,需浏览 36分钟

 ·

2024-07-17 21:36

并行方式简介

查询并行

查询并行是指将一个查询分解为多个子查询,在多个处理器上同时执行这些子查询。查询并行通常用于处理计算密集型和IO密集型的查询,例如,涉及多个表连接、聚合、表扫描等操作的查询。查询并行可以有效地提高查询性能,因为每个处理器只需要处理查询的一部分。

这种并行方式在传统数据库中使用比较多,比如Oracle、PostgreSQL,TDSQL for PG 也采用的是这种并行方式。这种方式的好处是能将查询任务分解为多个任务,分布在多个处理器(甚至跨服务器的处理器)上并行执行,最终通过 Gather 节点将结果汇总。

相比其他的并行方式,查询并行的调度更简单,正因为如此,资源的使用效率不是最高的。另外,这种并行方式需要在处理器之间传输和同步数据,系统开销较大。

pipeline并行

管道 (pipeline) 并行是指将一个操作的输出作为另一个操作的输入,这样多个操作可以同时进行。这种并行方式通常用于数据库查询处理中的多个阶段,例如,从磁盘读取数据、过滤数据、排序数据等。

pipeline并行可以提高资源利用率,因为 pipeline 中的各个阶段、pipeline 之间可以并行、异步执行,而不是等待前一个阶段完成。

ClickHouse、Doris 等使用的就是这种并行方式。pipeline 并行的好处是能充分的利用资源,结合线程池技术,可以非常精细的调度任务,目的是提升数据处理的吞吐量。

但是这种并行方式不够灵活,因为每个处理阶段的输入输出是固定的,限制了处理阶段之间的交互和协作,同时还需要管理和协调好各个处理阶段,提升了调度的复杂度。与之对应的是 DAG(Directed Acyclic Graph) 方式,典型的产品就是 Spark。

任务并行

任务并行是指在多个处理器上同时执行不同的任务。这种并行方式通常用于处理多个独立的查询或事务。任务并行可以提高系统的吞吐量,因为多个查询或事务可以同时进行。

TDSQL for PG 的后台任务,比如 autovacuum、checkpointer 等就是这种并行方式,任务之间独立执行,互不干扰。

数据并行

数据并行是指在多个处理器上同时对数据集的不同部分执行相同的操作。这通常是通过将数据划分为多个分区来实现的,每个处理器负责处理一个分区。

数据并行可以有效地提高查询性能,因为每个处理器只需要处理数据的一部分。通常来说,上面的并行方式都会结合数据并行来执行。

指令并行

本文指的指令并行是利用SIMD指令的并行,SIMD指令可以减少分支预测的开销,提高内存访问的局部性、cache的命中率。数据库中的排序算法可以利用 SIMD 指令进行并行比较和交换,join 也可以使用 SIMD 进行并行的匹配,最常用的是压缩和编码用 SIMD 提升性能。

TDSQL for PG 主要使用了查询并行、数据并行、任务并行这几种方式,本文重点要分析的是查询并行的框架和原理。

并行框架概述

TDSQL for PG 并行框架总体流程

在并行框架中有三种进程角色,分部是 server 进程,backend 进程(也称作 leader 进程)和 Background Worker 进程。
  • server 进程是资源调度进程,负责进程的分配
  • backend 是并行任务的发起进程,负责并行执行环境的初始化,也负责通过 Gather 和 GatherMerge 节点汇总结果
  • Background Worker 进程是任务的具体执行者,并返回结果给backend 进程。

执行的流程跟单进程时一样,都会依次调用 CreateQueryDesc(), ExecutorStart() , ExecutorRun(), ExecutorFinish(), ExecutorEnd() 函数。

区别在于 Background Worker 需要先从动态共享内存中恢复执行需要的环境,以及执行结束后清理动态内存。

TDSQL for PG 的并行框架主要流程如下图所示:

1.  Client 连接到 server 以后 server 进程为其创建一个 backend 进程,banckend 进程在生成执行计划的过程中识别出是否需要并行执行,如果能并行执行就会创建 Background Worker 进程。

2.  如果并行执行,backend 进程先调用ExecInitParallelPlan()函数初始化并行执行需要的环境。
包括执行计划的序列化(ExecSerializePlan()),动态共享内存初始化InitializeParallelDSM(), 动态共享内存初始化又包含动态共享内存段的创建,library、GUC、snapshot 等的序列化和拷贝。
3.  接着后端进程调用LaunchParallelWorkers()注册 Background Worker。
注册的方式是调用RegisterDynamicBackgroundWorker()查找可用的 Background Worker 槽位,如果找到就向 server 进程发送PMSIGNAL_BACKGROUND_WORKER_CHANGE信号。
4.  server 进程处理信号(sigusr1_handler())
调用BackgroundWorkerStateChange() 遍历所有的 Background Worker 槽位,找到刚注册的槽位,实例化一个RegisteredBgWorker并 push 到全局变量中。
5.  接下来 server 进程调用maybe_start_bgworkers()遍历BackgroundWorkerList
为里面的每个RegisteredBgWorker fork进程。fork 出来的进程执行ParallelWorkerMain(),ParallelWorkerMain()就是 background worker 的入口函数。
并行框架的使用的大致流程如下:
/* 进入并行模式,阻止不安全的状态修改 */
    
EnterParallelMode();

/* 创建并行执行的上下文,并插入到全局链表 pcxt_list 中 */
pcxt = CreateParallelContext();

/* 估算变量占用的 DSM 的大小,包括变量本身的大小和 key 的大学. */
shm_toc_estimate_chunk(&pcxt->estimator, size);
shm_toc_estimate_keys(&pcxt->estimator, keys);

/* 创建 DSM 并拷贝数据 */
InitializeParallelDSM(pcxt);

/* 在 DSM 中申请空间. */
space = shm_toc_allocate(pcxt->toc, size);
shm_toc_insert(pcxt->toc, key, space);

/* 注册 background worker */
LaunchParallelWorkers(pcxt);

/* 执行并行任务(计划) */

/* 等待并行 worker 执行结束 */

/* 读取共享内存中的结果 */

DestroyParallelContext(pcxt);

/* 退出并行执行模式 */
ExitParallelMode();

通信机制

并行执行避免不了进程或线程之间的通信,TDSQL for PG 的并行框架采用的是进程模型,主要用到了两种通信机制,一个是信号,一个是共享内存。

1.  信号

信号主要是控制流,在并行框架中,后端进程注册 background worker 时向 server 进程发送信号SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE)),server 进程调用 sigusr1_handler()处理信号,并创建 background worker 进程。

当 background worker 执行结束,会通过信号通知 backend 进程。

2.  动态共享内存

在 TDSQL for PG 的并行框架中,动态共享内存主要用来传递状态和数据。
从 backend 进程传递给 background worker 的状态主要有执行计划、GUC、事务信息、snapshot 信息等,这部分使用的动态共享内存在启动并行执行的初始化阶段调用InitializeParallelDSM()来完成。
数据主要是从 background Worker 返回给 backend 的执行结果和错误信息,这些结果通过基于共享内存的消息队列shm_mq来传递,也就是 tuple queue, 每个 background Worker 和 backend 之间都有一个消息队列,是多对一的关系。
ExecParallelCreateReaders()函数负责为每个background Worker 创建 tuple queue 。同样的也会创建多对一的错误消息队列,用于 background Worker 传递具体的错误信息给 backend。
对于普通的 SELECT 语句,background Worker 写数据到 tuple queue,backend 进程从 tuple queue 中读取结果。
TDSQL for PG 还实现了 INSERT 和 UPDATE 的并行执行,此时 background Worker 通过共享内存中的变量把结果传给 backend 进程,而不需要通过 tuple queue。

关键数据结构分析

ParallelContext

typedef struct ParallelContext
    
{
dlist_node node; /* 双向链表的挂载点 */
SubTransactionId subid; /* 调用GetCurrentSubTransactionId获取子事务ID */
int nworkers; /* 计划的Worker数量 */
int nworkers_launched; /* 实际发起的Worker数量 */
bool leader_participate; /* 主进程是否参与执行 */
char *library_name; /* 库的名称,一般是postgres */
char *function_name; /* background Worker的执行函数,用户自定义,select对应的是ParallelQueryMain */
ErrorContextCallback *error_context_stack; /* 错误上下文栈 */
shm_toc_estimator estimator; /* 共享内存大小估算 */
dsm_segment *seg; /* 动态共享内存的状态信息 */
void *private_memory; /* 动态共享内存申请失败后回退到非并行执行是使用的内存。*/
shm_toc *toc; /* Shared Memory Table of Contents */
ParallelWorkerInfo *worker; /* 是一个数组,每个Worker一个,记录Worker的信息 */
int nknown_attached_workers; /* attach到error queue的Worker数量 */
bool *known_attached_workers; /* 数组,标记每个Worker attach的状态 */
} ParallelContext;

次创建ParallelContext后都会插入到双向链表pcxt_list中,这个双向链表用于记录活跃的并行上下文。

ParallelExecutorInfo

typedef struct ParallelExecutorInfo
    
{
PlanState *planstate; /* plan subtree we're running in parallel */
ParallelContext *pcxt; /* parallel context we're using */
BufferUsage *buffer_usage; /* points to bufusage area in DSM */
uint64 *processed_count; /* processed tuple count area in DSM */
SharedExecutorInstrumentation *instrumentation; /* optional */
struct SharedJitInstrumentation *jit_instrumentation; /* optional */
dsa_area *area; /* points to DSA area in DSM */
dsa_pointer param_exec; /* serialized PARAM_EXEC parameters */
bool finished; /* set true by ExecParallelFinish */
/* These two arrays have pcxt->nworkers_launched entries: */
shm_mq_handle **tqueue; /* tuple queues for worker output */
struct TupleQueueReader **reader; /* tuple reader/writer support */
} ParallelExecutorInfo;

这个数据结构记录了并行执行时的各种信息,由函数mq_bytes_readmq_bytes_written按 8 bytes 读写,必须用 memory barrier 同步。

shm_mq

struct shm_mq
{
        slock_t mq_mutex;
PGPROC *mq_receiver;
PGPROC *mq_sender;
pg_atomic_uint64 mq_bytes_read;
pg_atomic_uint64 mq_bytes_written;
Size mq_ring_size;
bool mq_detached;
uint8 mq_ring_offset;
        char mq_ring[FLEXIBLE_ARRAY_MEMBER];
};

共享内存中的队列。

 mq_receivermq_bytes_read只能被 receiver 改变。同理mq_sendermq_bytes_writte 只能被 sender 改变。

 mq_receivermq_sendermq_mutex保护,一旦设置就不能改变,所以设置以后可以无锁的读。

 mq_bytes_readmq_bytes_written按 8 bytes 读写,必须用 memory barrier 同步。

shm_mq_handle

struct shm_mq_handle
    
{
shm_mq *mqh_queue;
dsm_segment *mqh_segment;
BackgroundWorkerHandle *mqh_handle;
char *mqh_buffer;
Size mqh_buflen;
Size mqh_consume_pending;
Size mqh_send_pending;
Size mqh_partial_bytes;
Size mqh_expected_bytes;
bool mqh_length_word_complete;
bool mqh_counterparty_attached;
MemoryContext mqh_context;
};
用于管理共享队列的数据结构。

● mqh_queue

指向关联的消息队列

● mqh_segment

指向包含该消息队列的动态共享内存

● mqh_handle

与该消息队列绑定的后台工作进程,由shm_mq_attach()绑定。

● mqh_buffer

对于超过ring buffer大小的数据,或者出现了回卷的数据,就把队列中的chunk拷贝到mqh_buffer。

● mqh_buflen

mqh_buflen 是mqh_buffer的长度。

● mqh_consume_pending

mqh_consume_pending超过环形缓冲区1/4大小时,说明数据已经消费掉了,需要更新共享内存中的数据。
● mqh_send_pending

已经写到queue中,但是还没有更新到共享内存的数据大小。只有当数据大小超过 ring buffer 的 1/4,或者tuple queue慢了的时候才更新共享内存。

●mqh_partial_bytes、mqh_expected_bytes、and mqh_length_word_complete

这三个变量用来跟踪非阻塞操作的状态,记录的是length word的发送情况。
当调用者尝试非阻塞操作时,但是返回了SHM_MQ_WOULD_BLOCK那么需要稍后用相同的参数重新调用这个参数,所以需要记录状态还有多少数据没有被发送。
发送数据时shm_mq_sendv()),先发送要发送的字节数nbytes(类型是Size),mqh_length_word_complete 就是记录nbytes的几个字节是否都发送完了。
此时 mqh_partial_bytes 表示已经发生了几个字节,也可以用于记录 payload 发送了多少字节。
● mqh_length_word_complete
用于跟踪是否完整的接收或者发送了所有的数据。
mqh_partial_bytes 记录了读或者写了多少bytes数据,而mqh_expected_bytes 只记录期望读的负载数据的总大小。

● mqh_counterparty_attached

用于记录对手(sender 或者 receiver)是否已经挂载到queue。从而不必要的 mutex 申请。

● mqh_context

shm_mq_handle所在的上下文,所有内存的申请都要在这个上下文内进行。


关键函数分析

ExecInitParallelPlan()

该函数主要初始化并行执行需要的一些基础信息,在并行的发起节点调用,比如Gather、GatherMerge、RemoteFragment(分布式场景下也支持节点内并行)等。

这个函数的核心流程如下:

     

ParallelExecutorInfo *
    
ExecInitParallelPlan(PlanState *planstate, EState *estate,
Bitmapset *sendParams, int nworkers,
int64 tuples_needed)
{
...

/* 序列化执行计划 */
pstmt_data = ExecSerializePlan(planstate->plan, estate);

/* 为返回值申请空间 */
pei = palloc0(sizeof(ParallelExecutorInfo));

/* 创建并行框架上下文 */
pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
pei->pcxt = pcxt;

/* 动态共享内存大小估算,为每个需要传递给background worker的变量估算内存大小,包括执行计划、BufferUsage、tuple queues等。*/
shm_toc_estimate_chunk(&pcxt->estimator, ...);
shm_toc_estimate_keys(&pcxt->estimator, 1);
...

/* 为每个可并行的node估算其需要的共享内存大小 */
ExecParallelEstimate(planstate, &e);

...

/* 为DSA估算空间。DSA的大小可以在执行过程中改变,所以可能会更新的状态放到这个区域。DSA会绑定多个DSM,当DSA大小不够时,可以创建新的DSM。*/
shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
shm_toc_estimate_keys(&pcxt->estimator, 1);

/* 为并行框架建立动态共享内存段,并将Worker需要的状态 copy 到共享内存。*/
InitializeParallelDSM(pcxt);

/* 在DSM中为并行执行需要的状态信息申请共享内存并插入到toc中。*/
shm_toc_allocate(pcxt->toc, ...);
shm_toc_insert(pcxt->toc, ..., ...);
...

/* 为每个Worker创建一个tuple queue,用于leader和Worker之间传递执行结果。*/
pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);

...

/* 遍历planstate中所有的node,为其初始化共享内存,把状态信息拷贝的共享内存。*/
ExecParallelInitializeDSM(planstate, &d);

...

return pei;
}

ExecSerializePlan

用于执行计划的序列化,序列化以后放入共享内存,传递给background worker,再经过反序列化。
static char *
    
ExecSerializePlan(Plan *plan, EState *estate)
{
/* 实际调用copyObjectImpl()对执行计划中的算子、表达式进行深度拷贝,会递归调用一些列以“_copy”开头的函数。*/
plan = copyObject(plan);

/* 复制PlannedStmt,复制background worker必要的信息,最后序列化返回*/
pstmt = makeNode(PlannedStmt);
...
return nodeToString(pstmt);
}

CreateParallelContext

ParallelContext *
    
CreateParallelContext(const char *library_name, const char *function_name, int nworkers);

创建并行框架的上下文,library_name 是要加载的库的名称,通常为“postgres”, function_name 是并行执行函数的名称。

在background worker进程的入口函数ParallelWorkerMain()中会通过这个函数名从library中加载函数并执行,nworkers 是并行执行的进程数。

ExecParallelSetupTupleQueues()

Worker 进程执行完 plan segment 后,结果通过共享内存消息队列传递给 leader 进程,这个函数就是为每个 worker 创建一个共享队列shm_mq_handle

     

static shm_mq_handle **
    
ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
/* 为每个worker的shm_mq_handle申请内存 */
responseq = (shm_mq_handle **)
palloc(pcxt->nworkers * sizeof(shm_mq_handle *));

/* 如果不需要重新初始化,那么就在DSM中为每一个worker的tuple queue申请空间;否则就直接查找。 */
if (!reinitialize)
tqueuespace =
shm_toc_allocate(pcxt->toc, mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
else
tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);

/* 为每个 worker 创建消息队列,并将 leader 进程设置为接收者,然后将 mq、dsm_segment 关联起来。*/
for (i = 0; i < pcxt->nworkers; ++i)
{
shm_mq *mq;

mq = shm_mq_create(tqueuespace +
((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
(Size) PARALLEL_TUPLE_QUEUE_SIZE);

shm_mq_set_receiver(mq, MyProc);
responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
}

/* 插入到toc中, 方便在 background worker 中查表恢复 */
if (!reinitialize)
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);

return responseq;
}

LaunchParallelWorkers()

发起background worker的函数,主要是调用以下代码来完成:RegisterDynamicBackgroundWorker()

     

void
    
LaunchParallelWorkers(ParallelContext *pcxt)
{
BackgroundWorker worker;
...

/* 谁发起worker谁就是leader */
BecomeLockGroupLeader();

...

// 注册worker
for (i = 0; i < pcxt->nworkers; ++i)
{
memcpy(worker.bgw_extra, &i, sizeof(int));
if (!any_registrations_failed &&
RegisterDynamicBackgroundWorker(&worker,
&pcxt->worker[i].bgwhandle))
{
                        /* 注册成功 */
}
else
{
/* 当超过了max_worker_processes的限制,则注册失败。设置any_registrations_failed = true,防止继续注册。
any_registrations_failed = true;
...
}

...

}

RegisterDynamicBackgroundWorker()

这个函数主要是从BackgroundWorkerData中获取一个可用的BackgroundWorkerSlot将其设置为已经占用。

然后给 server 发送一个PMSIGNAL_BACKGROUND_WORKER_CHANGE信号,通知server ,background worker 的状态有变化。

此时 server 遍历BackgroundWorkerSlot找到刚注册的 background worker,为其创建进程。

     

bool
    
RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
BackgroundWorkerHandle **handle)
{
...

for (slotno = 0; slotno < BackgroundWorkerData->total_slots; ++slotno)
{

if (!slot->in_use)
{
...
}
}

/* If we found a slot, tell the postmaster to notice the change. */
if (success)
SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE);

if (success && handle)
{
*handle = palloc(sizeof(BackgroundWorkerHandle));
(*handle)->slot = slotno;
(*handle)->generation = generation;
}
}

ParallelWorkerMain

background worker 进程的入口函数,属于并行框架内固定的函数。
由这个函数调用实际的执行函数,对于select、update、Insert 语句,执行函数就是ParallelQueryMain()对于并行创建索引,执行函数就是_bt_parallel_build_main()
调用CreateParallelContext()创建ParallelContext时,执行函数的名称作为参数传递,例如CreateParallelContext("postgres", "ParallelQueryMain", nworkers)
这个函数的主要任务就是从共享内存中反操作读取信息,准备 background worker 执行需要的环境。

     

void
    
ParallelWorkerMain(Datum main_arg)
{
...

/* attach 共享内存,读取 toc中的内容 */
seg = dsm_attach(DatumGetUInt32(main_arg));
toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));

/* 注册退出时的回调函数 */
on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);

/* 设置错误消息队列,将当前worker设为发送者 */
error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
mq = (shm_mq *) (error_queue_space +
ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
shm_mq_set_sender(mq, MyProc);
mqh = shm_mq_attach(mq, seg, NULL);

/* 从库中查找background worker要执行的函数,例如ParallelQueryMain。*/
entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
library_name = entrypointstate;
function_name = entrypointstate + strlen(library_name) + 1;

entrypt = LookupParallelWorkerFunction(library_name, function_name);

...

/* 多次调用 shm_toc_lookup(shm_toc *toc, uint64 key, bool noError), 从 toc 中读取状态、参数等 */
...

/* 执行 ParallelQueryMain、_bt_parallel_build_main 等。*/
entrypt(seg, toc);

/* 退出并行模式 */
ExitParallelMode();

PopActiveSnapshot();

EndParallelWorkerTransaction();

DetachSession();
...
}

ParallelQueryMain

并行 query 的入口函数,在ParallelWorkerMain()中被调用。

     

void
    
ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
...

/* 设置tuple的接收者 */
receiver = ExecParallelGetReceiver(seg, toc);

/* 反序列化 PlannedStmt,ParamListInfo,SQL,并创建 QueryDesc */
queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);

...

ExecutorStart(queryDesc, fpes->eflags);

...

/* 初始化 PlanState,根据node类型调用不同的ExecXXXInitializeWorker();*/
ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);

...

ExecutorRun(queryDesc,
ForwardScanDirection,
fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
true);

....

ExecutorEnd(queryDesc);

...
}


性能分析与优化

使用并行框架能提升执行的效率,但是也带来了额外的开销,经过实际的测试,这个开销大概在5 ~ 10毫秒,也就是说启动平行框架需要5毫秒以上。其中这个开销的主要有以下几部分组成:

1.  序列化

序列化包括 plan,planstate,snap,GUC、library 等。GUC 的序列化耗时最高。
GUC序列化耗时包括两部分,一部分是遍历所有的 GUC 参数,估算参数占用的内存大小,一部分是将所有参数序列化。因为 TDSQL for PG 的参数有超过 800 个,内存大小估算耗时大概 100 微秒,序列化耗时更高。
GUC参数占用内存的大小是不变的。因此可以不用每次启动并行框架时计算一次,可以放在系统启动阶段时完成,比如放到build_guc_variables()函数中。
因为每次执行的参数可能会不一样,所以不能在系统启动阶段完成,可以在系统启动时序列化,启动并行框架时判断参数是否有变化,如果有就重新序列化并保存。
2.  动态共享内存的申请
动态共享内存的申请主要耗时点在函数dsm_create()并行框架初始化过程中有两个地方调用这个函数。
一个是 GetSessionDsmHandle() 中调用,用来申请 session 内部进程共享的内存,每个session只需要调用一次;一次是为并行上下文申请共享内存。每一次调用100微秒以上。
3.  进程间数据传输
background worker 进程将结果传递给 leader 进程的耗时也不可避免,跟数据量成正比。
调整shmq 的缓冲区大小并不能提升性能。因此需要结合优化器把这部分代价也加入到执行计划之中。


-- 更多精彩 --

技术干货丨TDSQL 列存引擎 LibraDB 计算模型的设计与思考


技术干货丨 TDSQL for MySQL DDL执行框架


分布式数据库时代,需要什么样的产品?



浏览 99
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报