本篇内容介绍了“PostgreSQL中PortalRun->PortalRunSelect函数的实现逻辑是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
一、数据结构
Portal
对于Portals(客户端请求),有几种执行策略,具体取决于要执行什么查询。
(注意:无论什么情况下,一个Portal只执行一个source-SQL查询,因此从用户的角度来看只产生一个结果。
typedef enum PortalStrategy
{
PORTAL_ONE_SELECT,
PORTAL_ONE_RETURNING,
PORTAL_ONE_MOD_WITH,
PORTAL_UTIL_SELECT,
PORTAL_MULTI_QUERY
} PortalStrategy;
typedef enum PortalStatus
{
PORTAL_NEW,
PORTAL_DEFINED,
PORTAL_READY,
PORTAL_ACTIVE,
PORTAL_DONE,
PORTAL_FAILED
} PortalStatus;
typedef struct PortalData *Portal;//结构体指针
typedef struct PortalData
{
const char *name;
const char *prepStmtName;
MemoryContext portalContext;
ResourceOwner resowner;
void (*cleanup) (Portal portal);
SubTransactionId createSubid;
SubTransactionId activeSubid;
//portal将会执行的查询
const char *sourceText;
const char *commandTag;
List *stmts;
CachedPlan *cplan;
ParamListInfo portalParams;
QueryEnvironment *queryEnv;
PortalStrategy strategy;
int cursorOptions;
bool run_once;
PortalStatus status;
bool portalPinned;
bool autoHeld;
//如不为NULL,执行器处于活动状态
QueryDesc *queryDesc;
//如Portal需要返回元组,这是元组的描述
TupleDesc tupDesc;
//列信息的格式码
int16 *formats;
Tuplestorestate *holdStore;
MemoryContext holdContext;
Snapshot holdSnapshot;
bool atStart;//处于开始位置?
bool atEnd;//处于结束位置?
uint64 portalPos;//实际行号
//用于表示的数据,主要由pg_cursors系统视图使用
TimestampTz creation_time;
bool visible;
} PortalData;
#define PortalIsValid(p) PointerIsValid(p)
QueryDesc
QueryDesc封装了执行器执行查询所需的所有内容。
typedef struct QueryDesc
{
//以下变量由CreateQueryDesc函数设置
CmdType operation;
PlannedStmt *plannedstmt;
const char *sourceText;
Snapshot snapshot;
Snapshot crosscheck_snapshot;
DestReceiver *dest;
ParamListInfo params;
QueryEnvironment *queryEnv;
int instrument_options;
//以下变量由ExecutorStart函数设置
TupleDesc tupDesc;
EState *estate;
PlanState *planstate;
//以下变量由ExecutorRun设置
bool already_executed;
//内核设置为NULL,可由插件修改
struct Instrumentation *totaltime;
} QueryDesc;
二、源码解读
PortalRun->PortalRunSelect函数执行以PORTAL_ONE_SELECT模式运行的SQL.
static uint64
PortalRunSelect(Portal portal,
bool forward,
long count,
DestReceiver *dest)
{
QueryDesc *queryDesc;
ScanDirection direction;
uint64 nprocessed;
queryDesc = portal->queryDesc;
//确保queryDescbuweiNULL或者持有提取的数据
Assert(queryDesc || portal->holdStore);
if (queryDesc)
queryDesc->dest = dest;//设置dest
if (forward)//前向
{
if (portal->atEnd || count <= 0)
{
//已到末尾或者行计数小于等于0
direction = NoMovementScanDirection;
count = 0;
}
else
direction = ForwardScanDirection;//前向扫描
//在executor中,count=0意味着提取所有行
if (count == FETCH_ALL)
count = 0;
if (portal->holdStore)
//持有提取后的数据游标
nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
else
{
//没有持有游标(数据)
PushActiveSnapshot(queryDesc->snapshot);//快照入栈
ExecutorRun(queryDesc, direction, (uint64) count,
portal->run_once);//开始执行
nprocessed = queryDesc->estate->es_processed;//结果行数
PopActiveSnapshot();//快照出栈
}
if (!ScanDirectionIsNoMovement(direction))//扫描方向可移动
{
if (nprocessed > 0)//扫描行数>0
portal->atStart = false;
if (count == 0 || nprocessed < (uint64) count)
//count为0或者行数小于传入的计数器
portal->atEnd = true;
portal->portalPos += nprocessed;//位置移动(+处理行数)
}
}
else//非前向(后向)
{
if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)//如游标不可移动,报错
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cursor can only scan forward"),
errhint("Declare it with SCROLL option to enable backward scan.")));
if (portal->atStart || count <= 0)
{
//处于开始或者count小于等于0
direction = NoMovementScanDirection;
count = 0;
}
else
//往后扫描
direction = BackwardScanDirection;
//参见forward=T的注释
if (count == FETCH_ALL)
count = 0;
if (portal->holdStore)
nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
else
{
PushActiveSnapshot(queryDesc->snapshot);
ExecutorRun(queryDesc, direction, (uint64) count,
portal->run_once);
nprocessed = queryDesc->estate->es_processed;
PopActiveSnapshot();
}
if (!ScanDirectionIsNoMovement(direction))
{
if (nprocessed > 0 && portal->atEnd)
{
portal->atEnd = false;
portal->portalPos++;
}
if (count == 0 || nprocessed < (uint64) count)
{
portal->atStart = true;
portal->portalPos = 0;
}
else
{
portal->portalPos -= nprocessed;
}
}
}
return nprocessed;
}
static uint64
RunFromStore(Portal portal, ScanDirection direction, uint64 count,
DestReceiver *dest)
{
uint64 current_tuple_count = 0;
TupleTableSlot *slot;//元组表slot
slot = MakeSingleTupleTableSlot(portal->tupDesc);
dest->rStartup(dest, CMD_SELECT, portal->tupDesc);//目标启动
if (ScanDirectionIsNoMovement(direction))//无法移动
{
//不需要做任何事情
}
else
{
bool forward = ScanDirectionIsForward(direction);//是否前向扫描
for (;;)//循环
{
MemoryContext oldcontext;//内存上下文
bool ok;
oldcontext = MemoryContextSwitchTo(portal->holdContext);//切换至相应的内存上下文
ok = tuplestore_gettupleslot(portal->holdStore, forward, false,
slot);//获取元组
MemoryContextSwitchTo(oldcontext);//切换回原上下文
if (!ok)
break;//如出错,则跳出循环
if (!dest->receiveSlot(slot, dest))
break;
ExecClearTuple(slot);//执行清理
current_tuple_count++;
if (count && count == current_tuple_count)
break;
}
}
dest->rShutdown(dest);//关闭目标端
ExecDropSingleTupleTableSlot(slot);//清除slot
return current_tuple_count;//返回行数
}
void
ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count,
bool execute_once)
{
if (ExecutorRun_hook)
(*ExecutorRun_hook) (queryDesc, direction, count, execute_once);//钩子函数
else
standard_ExecutorRun(queryDesc, direction, count, execute_once);//标准函数
}
void
standard_ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count, bool execute_once)
{
EState *estate;//全局执行状态
CmdType operation;//命令类型
DestReceiver *dest;//接收器
bool sendTuples;//是否需要传输元组
MemoryContext oldcontext;//内存上下文
Assert(queryDesc != NULL);//校验queryDesc不能为NULL
estate = queryDesc->estate;//获取执行状态
Assert(estate != NULL);//执行状态不能为NULL
Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));//eflags标记不能为EXEC_FLAG_EXPLAIN_ONLY
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
//允许全程instrumentation
if (queryDesc->totaltime)
InstrStartNode(queryDesc->totaltime);
operation = queryDesc->operation;
dest = queryDesc->dest;
estate->es_processed = 0;
estate->es_lastoid = InvalidOid;
sendTuples = (operation == CMD_SELECT ||
queryDesc->plannedstmt->hasReturning);
if (sendTuples)//如需发送元组
dest->rStartup(dest, operation, queryDesc->tupDesc);
if (!ScanDirectionIsNoMovement(direction))//如非ScanDirectionIsNoMovement
{
if (execute_once && queryDesc->already_executed)//校验
elog(ERROR, "can't re-execute query flagged for single execution");
queryDesc->already_executed = true;//修改标记
ExecutePlan(estate,
queryDesc->planstate,
queryDesc->plannedstmt->parallelModeNeeded,
operation,
sendTuples,
count,
direction,
dest,
execute_once);//执行Plan
}
if (sendTuples)
dest->rShutdown(dest);
if (queryDesc->totaltime)//收集时间
InstrStopNode(queryDesc->totaltime, estate->es_processed);
MemoryContextSwitchTo(oldcontext);//切换内存上下文
}
三、跟踪分析
测试脚本如下
testdb=# explain select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je
testdb-# from t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je
testdb(# from t_grxx gr inner join t_jfxx jf
testdb(# on gr.dwbh = dw.dwbh
testdb(# and gr.grbh = jf.grbh) grjf
testdb-# order by dw.dwbh;
QUERY PLAN
------------------------------------------------------------------------------------------
Sort (cost=20070.93..20320.93 rows=100000 width=47)
Sort Key: dw.dwbh
-> Hash Join (cost=3754.00..8689.61 rows=100000 width=47)
Hash Cond: ((gr.dwbh)::text = (dw.dwbh)::text)
-> Hash Join (cost=3465.00..8138.00 rows=100000 width=31)
Hash Cond: ((jf.grbh)::text = (gr.grbh)::text)
-> Seq Scan on t_jfxx jf (cost=0.00..1637.00 rows=100000 width=20)
-> Hash (cost=1726.00..1726.00 rows=100000 width=16)
-> Seq Scan on t_grxx gr (cost=0.00..1726.00 rows=100000 width=16)
-> Hash (cost=164.00..164.00 rows=10000 width=20)
-> Seq Scan on t_dwxx dw (cost=0.00..164.00 rows=10000 width=20)
(11 rows)
启动gdb,设置断点,进入PortalRunSelect
(gdb) b PortalRunSelect
Breakpoint 1 at 0x8cc0e8: file pquery.c, line 888.
(gdb) c
Continuing.
Breakpoint 1, PortalRunSelect (portal=0x1af2468, forward=true, count=9223372036854775807, dest=0x1b74668) at pquery.c:888
warning: Source file is more recent than executable.
888 queryDesc = portal->queryDesc;
(gdb)
查看输入参数portal&dest,forward为T表示前向扫描
portal:未命名的Portal,holdStore为NULL,atStart = true, atEnd = false, portalPos = 0
dest:接收器slot为printtup
(gdb) p *portal
$1 = {name = 0x1af5e90 "", prepStmtName = 0x0, portalContext = 0x1b795d0, resowner = 0x1abde80,
cleanup = 0x6711b6 <PortalCleanup>, createSubid = 1, activeSubid = 1,
sourceText = 0x1a8ceb8 "select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je \nfrom t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je \n", ' ' <repeats 24 times>, "from t_grxx gr inner join t_jfxx jf \n", ' ' <repeats 34 times>...,
commandTag = 0xc5eed5 "SELECT", stmts = 0x1b74630, cplan = 0x0, portalParams = 0x0, queryEnv = 0x0,
strategy = PORTAL_ONE_SELECT, cursorOptions = 4, run_once = true, status = PORTAL_ACTIVE, portalPinned = false,
autoHeld = false, queryDesc = 0x1b796e8, tupDesc = 0x1b867d8, formats = 0x1b79780, holdStore = 0x0, holdContext = 0x0,
holdSnapshot = 0x0, atStart = true, atEnd = false, portalPos = 0, creation_time = 595566906253867, visible = false}
(gdb) p *dest
$2 = {receiveSlot = 0x48cc00 <printtup>, rStartup = 0x48c5c1 <printtup_startup>, rShutdown = 0x48d02e <printtup_shutdown>,
rDestroy = 0x48d0a7 <printtup_destroy>, mydest = DestRemote}
校验并设置dest
(gdb) n
891 Assert(queryDesc || portal->holdStore);
(gdb)
899 if (queryDesc)
(gdb)
900 queryDesc->dest = dest;
前向扫描
(gdb) n
913 if (forward)
(gdb)
915 if (portal->atEnd || count <= 0)
进入ExecutorRun
...
(gdb)
932 ExecutorRun(queryDesc, direction, (uint64) count,
(gdb) step
ExecutorRun (queryDesc=0x1b796e8, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:304
warning: Source file is more recent than executable.
304 if (ExecutorRun_hook)
进入standard_ExecutorRun
(gdb) n
307 standard_ExecutorRun(queryDesc, direction, count, execute_once);
(gdb) step
standard_ExecutorRun (queryDesc=0x1b796e8, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:321
321 Assert(queryDesc != NULL);
standard_ExecutorRun->校验并切换上下文
321 Assert(queryDesc != NULL);
(gdb) n
323 estate = queryDesc->estate;
(gdb)
325 Assert(estate != NULL);
(gdb)
326 Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
(gdb)
331 oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
(gdb)
standard_ExecutorRun->变量赋值,判断是否需要传输元组
(gdb)
334 if (queryDesc->totaltime)
(gdb) n
340 operation = queryDesc->operation;
(gdb)
341 dest = queryDesc->dest;
(gdb) p operation
$3 = CMD_SELECT
(gdb) n
346 estate->es_processed = 0;
(gdb)
347 estate->es_lastoid = InvalidOid;
(gdb)
349 sendTuples = (operation == CMD_SELECT ||
(gdb)
352 if (sendTuples)
(gdb)
353 dest->rStartup(dest, operation, queryDesc->tupDesc);
(gdb) p sendTuples
$4 = true
(gdb)
standard_ExecutorRun->执行计划(ExecutePlan函数下节介绍)
(gdb) n
358 if (!ScanDirectionIsNoMovement(direction))
(gdb)
360 if (execute_once && queryDesc->already_executed)
(gdb)
362 queryDesc->already_executed = true;
(gdb)
364 ExecutePlan(estate,
(gdb)
standard_ExecutorRun->关闭资源并切换上下文
(gdb)
378 if (sendTuples)
(gdb) n
379 dest->rShutdown(dest);
(gdb)
381 if (queryDesc->totaltime)
(gdb)
384 MemoryContextSwitchTo(oldcontext);
(gdb)
385 }
(gdb)
standard_ExecutorRun->回到PortalRunSelect
(gdb) n
ExecutorRun (queryDesc=0x1b796e8, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:308
308 }
(gdb)
PortalRunSelect (portal=0x1af2468, forward=true, count=0, dest=0x1b74668) at pquery.c:934
934 nprocessed = queryDesc->estate->es_processed;
快照出栈,修改状态atStart/atEnd等
(gdb) n
935 PopActiveSnapshot();
(gdb)
938 if (!ScanDirectionIsNoMovement(direction))
(gdb)
940 if (nprocessed > 0)
(gdb) p nprocessed
$6 = 99991
(gdb) n
941 portal->atStart = false;
(gdb)
942 if (count == 0 || nprocessed < (uint64) count)
(gdb)
完成调用
(gdb) n
943 portal->atEnd = true;
(gdb) p count
$7 = 0
(gdb) n
944 portal->portalPos += nprocessed;
(gdb)
997 return nprocessed;
(gdb)
998 }
(gdb) n
PortalRun (portal=0x1af2468, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x1b74668, altdest=0x1b74668,
completionTag=0x7ffc5ff58740 "") at pquery.c:780
780 if (completionTag && portal->commandTag)
(gdb) p nprocessed
$8 = 99991
“PostgreSQL中PortalRun->PortalRunSelect函数的实现逻辑是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!