本文简单介绍了PG插入数据部分的源码,主要内容包括ExecutorRun函数和standard_ExecutorRun函数的实现逻辑,这两个函数均位于execMain.c文件中。
值得一提的是:
1、解读方式:采用自底向上的方式,也就是从调用栈(调用栈请参加第一篇文章)的底层往上逐层解读,建议按此顺序阅读;
2、问题处理:上面几篇解读并不深入,或者说只是浮于表面,但随着调用栈的逐步解读,信息会慢慢浮现,需要耐心和坚持
一、基础信息
ExecutorRun、standard_ExecutorRun函数使用的数据结构、宏定义以及依赖的函数等。
数据结构/宏定义
1、QueryDesc
//查询结构体
//结构体中包含了执行查询所需要的所有信息
typedef struct QueryDesc
{
CmdType operation;
PlannedStmt *plannedstmt;
const char *sourceText;
Snapshot snapshot;
Snapshot crosscheck_snapshot;
DestReceiver *dest;
ParamListInfo params;
QueryEnvironment *queryEnv;
int instrument_options;
TupleDesc tupDesc;
EState *estate;
PlanState *planstate;
bool already_executed;
struct Instrumentation *totaltime;
} QueryDesc;
//快照指针
typedef struct SnapshotData *Snapshot;
#define InvalidSnapshot ((Snapshot) NULL)
typedef bool (*SnapshotSatisfiesFunc) (HeapTuple htup,
Snapshot snapshot, Buffer buffer);
typedef struct SnapshotData
{
SnapshotSatisfiesFunc satisfies;
TransactionId xmin;
TransactionId xmax;
TransactionId *xip;
uint32 xcnt;
TransactionId *subxip;
int32 subxcnt;
bool suboverflowed;
bool takenDuringRecovery;
bool copied;
CommandId curcid;
uint32 speculativeToken;
uint32 active_count;
uint32 regd_count;
pairingheap_node ph_node;
TimestampTz whenTaken;
XLogRecPtr lsn;
} SnapshotData;//存储快照的数据结构
//已Planned的Statement
//也就是说已生成了执行计划的语句
typedef struct PlannedStmt
{
NodeTag type;
CmdType commandType;
uint64 queryId;
bool hasReturning;
bool hasModifyingCTE;
bool canSetTag;
bool transientPlan;
bool dependsOnRole;
bool parallelModeNeeded;
int jitFlags;
struct Plan *planTree;
List *rtable;
List *resultRelations;
List *nonleafResultRelations;
List *rootResultRelations;
List *subplans;
Bitmapset *rewindPlanIDs;
List *rowMarks;
List *relationOids;
List *invalItems;
List *paramExecTypes;
Node *utilityStmt;
int stmt_location;
int stmt_len;
} PlannedStmt;
//参数列表信息
typedef struct ParamListInfoData
{
ParamFetchHook paramFetch;
void *paramFetchArg;
ParamCompileHook paramCompile;
void *paramCompileArg;
ParserSetupHook parserSetup;
void *parserSetupArg;
int numParams;
ParamExternData params[FLEXIBLE_ARRAY_MEMBER];
} ParamListInfoData;
typedef struct ParamListInfoData *ParamListInfo;
//查询环境,使用List存储相关信息
struct QueryEnvironment
{
List *namedRelList;
};
//TODO
typedef struct Instrumentation
{
bool need_timer;
bool need_bufusage;
bool running;
instr_time starttime;
instr_time counter;
double firsttuple;
double tuplecount;
BufferUsage bufusage_start;
double startup;
double total;
double ntuples;
double ntuples2;
double nloops;
double nfiltered1;
double nfiltered2;
BufferUsage bufusage;
} Instrumentation;
依赖的函数
1、InstrStartNode
void
InstrStartNode(Instrumentation *instr)
{
if (instr->need_timer)
{
if (INSTR_TIME_IS_ZERO(instr->starttime))
INSTR_TIME_SET_CURRENT(instr->starttime);
else
elog(ERROR, "InstrStartNode called twice in a row");
}
if (instr->need_bufusage)
instr->bufusage_start = pgBufferUsage;
}
2、ScanDirectionIsNoMovement
//简单判断
#define ScanDirectionIsNoMovement(direction) \
((bool) ((direction) == NoMovementScanDirection))
3、ExecutePlan
//上一节已解读
4、InstrStopNode
//TODO Instrumentation 的理解
void
InstrStopNode(Instrumentation *instr, double nTuples)
{
instr_time endtime;
instr->tuplecount += nTuples;
if (instr->need_timer)
{
if (INSTR_TIME_IS_ZERO(instr->starttime))
elog(ERROR, "InstrStopNode called without start");
INSTR_TIME_SET_CURRENT(endtime);
INSTR_TIME_ACCUM_DIFF(instr->counter, endtime, instr->starttime);
INSTR_TIME_SET_ZERO(instr->starttime);
}
if (instr->need_bufusage)
BufferUsageAccumDiff(&instr->bufusage,
&pgBufferUsage, &instr->bufusage_start);
if (!instr->running)
{
instr->running = true;
instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
}
}
5、MemoryContextSwitchTo
#ifndef FRONTEND
static inline MemoryContext
MemoryContextSwitchTo(MemoryContext context)
{
MemoryContext old = CurrentMemoryContext;
CurrentMemoryContext = context;
return old;
}
#endif
二、源码解读
void
ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count,
bool execute_once)
{
if (ExecutorRun_hook)//如果有钩子函数,则执行钩子函数
(*ExecutorRun_hook) (queryDesc, direction, count, execute_once);
else//否则执行标准函数
standard_ExecutorRun(queryDesc, direction, count, execute_once);
}
//标准函数
void
standard_ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count, bool execute_once)
{
EState *estate;//执行器状态信息
CmdType operation;//命令类型,这里是INSERT
DestReceiver *dest;//目标接收器
bool sendTuples;//是否需要传输Tuples
MemoryContext oldcontext;//原内存上下文(PG自己的内存管理器)
Assert(queryDesc != NULL);
estate = queryDesc->estate;//获取执行器状态
Assert(estate != NULL);
Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);//切换至当前查询上下文,切换前保存原上下文
if (queryDesc->totaltime)//需要计时?如Oracle在sqlplus中设置set timing on的计时
InstrStartNode(queryDesc->totaltime);//
operation = queryDesc->operation;//操作类型
dest = queryDesc->dest;//目标端
estate->es_processed = 0;//进度
estate->es_lastoid = InvalidOid;//最后一个Oid
sendTuples = (operation == CMD_SELECT ||
queryDesc->plannedstmt->hasReturning);//查询语句或者需要返回值的才需要传输Tuples
if (sendTuples)
dest->rStartup(dest, operation, queryDesc->tupDesc);//启动目标端的接收器
if (!ScanDirectionIsNoMovement(direction))//需要扫描
{
if (execute_once && queryDesc->already_executed)
elog(ERROR, "can't re-execute query flagged for single execution");
queryDesc->already_executed = true;
ExecutePlan(estate,
queryDesc->planstate,
queryDesc->plannedstmt->parallelModeNeeded,
operation,
sendTuples,
count,
direction,
dest,
execute_once);//执行
}
if (sendTuples)
dest->rShutdown(dest);//关闭目标端的接收器
if (queryDesc->totaltime)
InstrStopNode(queryDesc->totaltime, estate->es_processed);//完成计时
MemoryContextSwitchTo(oldcontext);//执行完毕,切换回原内存上下文
}
三、跟踪分析
插入测试数据:
testdb=# -- #8 ExecutorRun&standard_ExecutorRun
testdb=# -- 获取pid
testdb=# select pg_backend_pid();
pg_backend_pid
----------------
1529
(1 row)
testdb=# -- 插入1行
testdb=# insert into t_insert values(16,'ExecutorRun/standard_ExecutorRun','ExecutorRun/standard_ExecutorRun','ExecutorRun/standard_ExecutorRun');
(挂起)
启动gdb,跟踪调试:
[root@localhost ~]# gdb -p 3294
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
Copyright (C) 2013 Free Software Foundation, Inc.
...
(gdb) b standard_ExecutorRun
Breakpoint 1 at 0x690d09: file execMain.c, line 322.
(gdb) c
Continuing.
Breakpoint 1, standard_ExecutorRun (queryDesc=0x2c2d4e0, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:322
322 estate = queryDesc->estate;
#查看参数
#1、queryDesc
(gdb) p *queryDesc
$1 = {operation = CMD_INSERT, plannedstmt = 0x2cc1488,
sourceText = 0x2c09ef0 "insert into t_insert values(16,'ExecutorRun/standard_ExecutorRun','ExecutorRun/standard_ExecutorRun','ExecutorRun/standard_ExecutorRun');", snapshot = 0x2c866e0,
crosscheck_snapshot = 0x0, dest = 0x2cc15e8, params = 0x0, queryEnv = 0x0, instrument_options = 0, tupDesc = 0x2c309d0, estate = 0x2c2f900, planstate = 0x2c2fc50, already_executed = false,
totaltime = 0x0}
(gdb) p *(queryDesc->plannedstmt)
$2 = {type = T_PlannedStmt, commandType = CMD_INSERT, queryId = 0, hasReturning = false, hasModifyingCTE = false, canSetTag = true, transientPlan = false, dependsOnRole = false,
parallelModeNeeded = false, jitFlags = 0, planTree = 0x2cc10f8, rtable = 0x2cc13b8, resultRelations = 0x2cc1458, nonleafResultRelations = 0x0, rootResultRelations = 0x0, subplans = 0x0,
rewindPlanIDs = 0x0, rowMarks = 0x0, relationOids = 0x2cc1408, invalItems = 0x0, paramExecTypes = 0x2c2f590, utilityStmt = 0x0, stmt_location = 0, stmt_len = 136}
(gdb) p *(queryDesc->snapshot)
$3 = {satisfies = 0x9f73fc <HeapTupleSatisfiesMVCC>, xmin = 1612874, xmax = 1612874, xip = 0x0, xcnt = 0, subxip = 0x0, subxcnt = 0, suboverflowed = false, takenDuringRecovery = false, copied = true,
curcid = 0, speculativeToken = 0, active_count = 1, regd_count = 2, ph_node = {first_child = 0x0, next_sibling = 0x0, prev_or_parent = 0x0}, whenTaken = 0, lsn = 0}
(gdb) p *(queryDesc->dest)
$4 = {receiveSlot = 0x4857ad <printtup>, rStartup = 0x485196 <printtup_startup>, rShutdown = 0x485bad <printtup_shutdown>, rDestroy = 0x485c21 <printtup_destroy>, mydest = DestRemote}
(gdb) p *(queryDesc->tupDesc)
$5 = {natts = 0, tdtypeid = 2249, tdtypmod = -1, tdhasoid = false, tdrefcount = -1, constr = 0x0, attrs = 0x2c309f0}
(gdb) p *(queryDesc->estate)
$6 = {type = T_EState, es_direction = ForwardScanDirection, es_snapshot = 0x2c866e0, es_crosscheck_snapshot = 0x0, es_range_table = 0x2cc13b8, es_plannedstmt = 0x2cc1488,
es_sourceText = 0x2c09ef0 "insert into t_insert values(16,'ExecutorRun/standard_ExecutorRun','ExecutorRun/standard_ExecutorRun','ExecutorRun/standard_ExecutorRun');", es_junkFilter = 0x0,
es_output_cid = 0, es_result_relations = 0x2c2fb40, es_num_result_relations = 1, es_result_relation_info = 0x0, es_root_result_relations = 0x0, es_num_root_result_relations = 0,
es_tuple_routing_result_relations = 0x0, es_trig_target_relations = 0x0, es_trig_tuple_slot = 0x2c30ab0, es_trig_oldtup_slot = 0x0, es_trig_newtup_slot = 0x0, es_param_list_info = 0x0,
es_param_exec_vals = 0x2c2fb10, es_queryEnv = 0x0, es_query_cxt = 0x2c2f7f0, es_tupleTable = 0x2c30500, es_rowMarks = 0x0, es_processed = 0, es_lastoid = 0, es_top_eflags = 0, es_instrument = 0,
es_finished = false, es_exprcontexts = 0x2c2feb0, es_subplanstates = 0x0, es_auxmodifytables = 0x0, es_per_tuple_exprcontext = 0x0, es_epqTuple = 0x0, es_epqTupleSet = 0x0, es_epqScanDone = 0x0,
es_use_parallel_mode = false, es_query_dsa = 0x0, es_jit_flags = 0, es_jit = 0x0}
(gdb) p *(queryDesc->planstate)
$7 = {type = T_ModifyTableState, plan = 0x2cc10f8, state = 0x2c2f900, ExecProcNode = 0x69a78b <ExecProcNodeFirst>, ExecProcNodeReal = 0x6c2485 <ExecModifyTable>, instrument = 0x0,
worker_instrument = 0x0, qual = 0x0, lefttree = 0x0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0, ps_ResultTupleSlot = 0x2c30a00, ps_ExprContext = 0x0, ps_ProjInfo = 0x0,
scandesc = 0x0}
#2、direction
(gdb) p direction
$8 = ForwardScanDirection
#3、count
(gdb) p count
$9 = 0
#4、execute_once
(gdb) p execute_once
$10 = true
#单步调试执行
(gdb) next
330 oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
(gdb)
333 if (queryDesc->totaltime)
#MemoryContext是PG中很重要的内存管理数据结构,需深入理解
(gdb) p *oldcontext
$11 = {type = T_AllocSetContext, isReset = false, allowInCritSection = false, methods = 0xb8c720 <AllocSetMethods>, parent = 0x2c6f380, firstchild = 0x2c2f7f0, prevchild = 0x0, nextchild = 0x0,
name = 0xb8d2f1 "PortalContext", ident = 0x2c72e98 "", reset_cbs = 0x0}
(gdb) p *(estate->es_query_cxt)
$12 = {type = T_AllocSetContext, isReset = false, allowInCritSection = false, methods = 0xb8c720 <AllocSetMethods>, parent = 0x2c2d3d0, firstchild = 0x2cbce60, prevchild = 0x0, nextchild = 0x0,
name = 0xb1a840 "ExecutorState", ident = 0x0, reset_cbs = 0x0}
(gdb) next
339 operation = queryDesc->operation;
(gdb)
340 dest = queryDesc->dest;
(gdb)
345 estate->es_processed = 0;
(gdb)
346 estate->es_lastoid = InvalidOid;
(gdb)
348 sendTuples = (operation == CMD_SELECT ||
(gdb)
349 queryDesc->plannedstmt->hasReturning);
(gdb)
348 sendTuples = (operation == CMD_SELECT ||
(gdb)
351 if (sendTuples)
(gdb)
357 if (!ScanDirectionIsNoMovement(direction))
(gdb)
359 if (execute_once && queryDesc->already_executed)
(gdb)
361 queryDesc->already_executed = true;
(gdb)
363 ExecutePlan(estate,
(gdb)
365 queryDesc->plannedstmt->parallelModeNeeded,
(gdb)
363 ExecutePlan(estate,
(gdb)
377 if (sendTuples)
(gdb)
380 if (queryDesc->totaltime)
(gdb)
383 MemoryContextSwitchTo(oldcontext);
(gdb)
384 }
(gdb)
ExecutorRun (queryDesc=0x2c2d4e0, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:307
307 }
(gdb)
#DONE!
四、小结
1、PG的扩展性:PG提供了钩子函数,可以对ExecutorRun进行Hack;
2、重要的数据结构:MemoryContext,内存上下文,需深入理解。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
软考中级精品资料免费领
- 历年真题答案解析
- 备考技巧名师总结
- 高频考点精准押题
- 资料下载
- 历年真题
193.9 KB下载数265
191.63 KB下载数245
143.91 KB下载数1148
183.71 KB下载数642
644.84 KB下载数2756
相关文章
发现更多好内容- Java 和 Golang 在性能方面有哪些差异?(Java与Golang的性能差异)
- Java 中带参方法和无参方法的差异究竟体现在哪些方面?(java有参和无参的区别是什么)
- 如何在 Java 中创建 Date 对象?(java怎么创建date对象)
- 如何利用 Java Milo 开展网络编程?(如何使用Java Milo进行网络编程)
- 如何高效使用Redis客户端进行故障排查
- 如何使用 getresources 获取文件系统资源?(getresources如何获取文件系统资源)
- 如何利用 Java 的多线程提升效率?(Java的多线程如何提高效率 )
- Java 中 file.exists 方法在不同操作系统下的表现及差异(java file.exists在不同操作系统下)
- 在 Java 中,究竟什么是线程安全?(java中什么是线程安全)
- Java 中 toString 方法该如何使用呢?(java中tostring方法如何使用)