本篇内容主要讲解“PostgreSQL中PortalRunMulti函数和PortalRun函数的实现逻辑是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PostgreSQL中PortalRunMulti函数和PortalRun函数的实现逻辑是什么”吧!
一、基础信息
PortalRun函数和PortalRunMulti函数使用的数据结构、宏定义以及依赖的函数等。
数据结构/宏定义
1、Portal
/* Pointer handle for a portal; this is the type PortalRun takes as its first argument. */
typedef struct PortalData *Portal;

/*
 * PortalData: the state of one portal.
 *
 * Notes tagged [portal.h] restate the field comments of PostgreSQL's
 * src/include/utils/portal.h.  Untagged notes describe how the PortalRun /
 * PortalRunMulti listings in this article use the field.
 */
typedef struct PortalData
{
    /* Bookkeeping */
    const char *name;               /* portal's name [portal.h] */
    const char *prepStmtName;       /* source prepared statement, NULL if none [portal.h] */
    MemoryContext portalContext;    /* PortalRun installs this as PortalContext and
                                     * switches into it while the portal runs */
    ResourceOwner resowner;         /* if non-NULL, becomes CurrentResourceOwner
                                     * for the duration of PortalRun */
    void (*cleanup) (Portal portal);    /* cleanup hook [portal.h] */

    /* Subtransaction tracking */
    SubTransactionId createSubid;   /* the creating subtransaction [portal.h] */
    SubTransactionId activeSubid;   /* last subtransaction with activity [portal.h] */

    /* The query or queries the portal executes */
    const char *sourceText;         /* query text; handed to ProcessQuery */
    const char *commandTag;         /* command tag; copied into completionTag */
    List *stmts;                    /* PlannedStmt nodes walked by PortalRunMulti */
    CachedPlan *cplan;              /* CachedPlan, if stmts came from one [portal.h] */
    ParamListInfo portalParams;     /* parameters handed to ProcessQuery */
    QueryEnvironment *queryEnv;     /* query environment handed to ProcessQuery */

    /* Features/options */
    PortalStrategy strategy;        /* selects the execution path in PortalRun */
    int cursorOptions;              /* DECLARE CURSOR option bits [portal.h] */
    bool run_once;                  /* set by PortalRun from its run_once argument */

    /* Status */
    PortalStatus status;            /* PortalRun sets PORTAL_READY after a
                                     * select-style run */
    bool portalPinned;              /* a pinned portal can't be dropped [portal.h] */
    bool autoHeld;                  /* was automatically converted from pinned
                                     * to held [portal.h] */

    QueryDesc *queryDesc;           /* info needed for executor invocation [portal.h] */
    TupleDesc tupDesc;              /* descriptor for result tuples [portal.h] */
    int16 *formats;                 /* a format code for each column [portal.h] */

    /* Stored results */
    Tuplestorestate *holdStore;     /* PortalRun calls FillPortalStore when this
                                     * is NULL and strategy != PORTAL_ONE_SELECT */
    MemoryContext holdContext;      /* memory containing holdStore [portal.h] */
    Snapshot holdSnapshot;          /* registered snapshot saved by PortalRunMulti
                                     * when setHoldSnapshot is true */

    /* Cursor position */
    bool atStart;
    bool atEnd;                     /* PortalRun returns this for select-style runs */
    uint64 portalPos;

    /* Presentation data */
    TimestampTz creation_time;      /* time at which this portal was defined [portal.h] */
    bool visible;                   /* include this portal in pg_cursors? [portal.h] */
} PortalData;
2、List
/* Forward declaration so List can point at cells before ListCell is defined. */
typedef struct ListCell ListCell;

/*
 * List: header of a singly linked list (each cell carries only a `next`
 * pointer).  The header tracks the element count and both ends of the chain.
 */
typedef struct List
{
    NodeTag type;       /* node tag; T_List, T_IntList or T_OidList per
                         * PostgreSQL's pg_list.h */
    int length;         /* number of cells */
    ListCell *head;     /* first cell */
    ListCell *tail;     /* last cell */
} List;

/*
 * ListCell: one element.  The payload is exactly one of the union members;
 * the lfirst / lfirst_int / lfirst_oid macros read the matching member.
 */
struct ListCell
{
    union
    {
        void *ptr_value;    /* read by lfirst() */
        int int_value;      /* read by lfirst_int() */
        Oid oid_value;      /* read by lfirst_oid() */
    } data;
    ListCell *next;         /* following cell; read by lnext() */
};
3、Snapshot
/* Pointer handle for a snapshot; PortalRunMulti obtains one from GetTransactionSnapshot(). */
typedef struct SnapshotData *Snapshot;

/*
 * SnapshotData: a visibility snapshot.
 *
 * The field notes restate the comments of PostgreSQL's
 * src/include/utils/snapshot.h; none of these fields is read directly by the
 * code shown in this article.
 */
typedef struct SnapshotData
{
    SnapshotSatisfiesFunc satisfies;    /* tuple visibility test function */
    TransactionId xmin;                 /* all XID < xmin are visible to me */
    TransactionId xmax;                 /* all XID >= xmax are invisible to me */
    TransactionId *xip;                 /* in-progress transaction ids */
    uint32 xcnt;                        /* number of entries in xip[] */
    TransactionId *subxip;              /* in-progress subtransaction ids */
    int32 subxcnt;                      /* number of entries in subxip[] */
    bool suboverflowed;                 /* has the subxip array overflowed? */
    bool takenDuringRecovery;           /* recovery-shaped snapshot? */
    bool copied;                        /* false if it's a static snapshot */
    CommandId curcid;                   /* in my xact, CID < curcid are visible */
    uint32 speculativeToken;            /* speculative-insertion token */
    uint32 active_count;                /* refcount on the ActiveSnapshot stack */
    uint32 regd_count;                  /* refcount on RegisteredSnapshots */
    pairingheap_node ph_node;           /* link in the RegisteredSnapshots heap */
    TimestampTz whenTaken;              /* timestamp when snapshot was taken */
    XLogRecPtr lsn;                     /* WAL position when snapshot was taken */
} SnapshotData;
依赖的函数
1、lfirst_*
/*
 * ListCell accessors.  Each expands to a direct member access, so `lc` must
 * be a non-NULL ListCell pointer.
 */
/* Cell following lc. */
#define lnext(lc) ((lc)->next)
/* Payload of lc viewed as a generic pointer. */
#define lfirst(lc) ((lc)->data.ptr_value)
/* Payload of lc viewed as an int. */
#define lfirst_int(lc) ((lc)->data.int_value)
/* Payload of lc viewed as an Oid. */
#define lfirst_oid(lc) ((lc)->data.oid_value)
/* Pointer payload of lc cast to `type *` through castNode (checked in assert builds). */
#define lfirst_node(type,lc) castNode(type, lfirst(lc))
#ifdef USE_ASSERT_CHECKING
/*
 * castNodeImpl: asserts that ptr is NULL or carries the expected node tag,
 * then returns it as a Node pointer.  Only compiled in assert-enabled builds.
 */
static inline Node *
castNodeImpl(NodeTag type, void *ptr)
{
    Assert(ptr == NULL || nodeTag(ptr) == type);
    return (Node *) ptr;
}
/* Checked cast: verifies the node tag T_<_type_> before casting to `_type_ *`. */
#define castNode(_type_, nodeptr) ((_type_ *) castNodeImpl(T_##_type_, nodeptr))
#else
/* Unchecked cast: without assert checking this is a plain pointer cast. */
#define castNode(_type_, nodeptr) ((_type_ *) (nodeptr))
#endif
2、Snapshot相关
//留待MVCC再行解读
GetTransactionSnapshot
RegisterSnapshot
PushCopiedSnapshot
UpdateActiveSnapshotCommandId
PopActiveSnapshot
3、ProcessQuery
//上一节已介绍
4、CommandCounterIncrement
/*
 * CommandCounterIncrement
 *		Move on to the next command id within the current transaction.
 *
 * Does nothing unless the current command id has been flagged as used
 * (currentCommandIdUsed).  Otherwise it bumps currentCommandId, clears the
 * "used" flag, pushes the new id to SnapshotSetCommandId() and calls
 * AtCCI_LocalCache().
 *
 * Raises ERROR when called in parallel mode or from a parallel worker, and
 * when the bump would land on InvalidCommandId.
 */
void
CommandCounterIncrement(void)
{
    /* Guard: an unused command id does not need replacing. */
    if (!currentCommandIdUsed)
        return;

    if (IsInParallelMode() || IsParallelWorker())
        elog(ERROR, "cannot start commands during a parallel operation");

    currentCommandId++;
    if (currentCommandId == InvalidCommandId)
    {
        /* Step back to the last valid id before reporting the failure. */
        currentCommandId--;
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("cannot have more than 2^32-2 commands in a transaction")));
    }

    /* The fresh id has not been used yet. */
    currentCommandIdUsed = false;

    SnapshotSetCommandId(currentCommandId);
    AtCCI_LocalCache();
}
5、MemoryContextDeleteChildren
/*
 * MemoryContextDeleteChildren
 *		Delete every child context of `context`, leaving `context` itself
 *		in place.
 *
 * The first child is re-read on every pass and handed to
 * MemoryContextDelete() until none is left.
 * NOTE(review): termination relies on MemoryContextDelete() unlinking the
 * child from context->firstchild; that function is not shown here.
 */
void
MemoryContextDeleteChildren(MemoryContext context)
{
    AssertArg(MemoryContextIsValid(context));

    for (;;)
    {
        MemoryContext child = context->firstchild;

        if (child == NULL)
            break;
        MemoryContextDelete(child);
    }
}
二、源码解读
1、PortalRun
/*
 * PortalRun
 *		Run the query or queries held by a portal.
 *
 * portal: portal to execute; it is marked active on entry.
 * count: passed to PortalRunSelect (select-style strategies only).
 * isTopLevel: passed through to FillPortalStore / PortalRunMulti.
 * run_once: recorded in portal->run_once.  The Assert rejects running a
 *		portal already flagged run_once with run_once = false.
 * dest: receiver handed to PortalRunSelect / PortalRunMulti.
 * altdest: second receiver, only handed to PortalRunMulti.
 * completionTag: if non-NULL, reset to "" on entry and then filled with the
 *		command completion tag.
 *
 * Returns portal->atEnd for the select-style strategies and true for
 * PORTAL_MULTI_QUERY.  Errors raised while running are re-thrown after the
 * portal has been marked failed and the saved global state restored.
 */
bool
PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
          DestReceiver *dest, DestReceiver *altdest,
          char *completionTag)
{
    bool result;                /* return value */
    uint64 nprocessed;          /* row count reported by PortalRunSelect */
    ResourceOwner saveTopTransactionResourceOwner;  /* TopTransactionResourceOwner on entry */
    MemoryContext saveTopTransactionContext;        /* TopTransactionContext on entry */
    Portal saveActivePortal;    /* ActivePortal on entry */
    ResourceOwner saveResourceOwner;    /* CurrentResourceOwner on entry */
    MemoryContext savePortalContext;    /* PortalContext on entry */
    MemoryContext saveMemoryContext;    /* CurrentMemoryContext on entry */

    AssertArg(PortalIsValid(portal));
    TRACE_POSTGRESQL_QUERY_EXECUTE_START();

    /* Start with an empty completion tag. */
    if (completionTag)
        completionTag[0] = '\0';

    /* Multi-query portals do their own per-statement stats in PortalRunMulti. */
    if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
    {
        elog(DEBUG3, "PortalRun");
        ResetUsage();
    }

    MarkPortalActive(portal);

    /* run_once may be turned on here, but never turned back off. */
    Assert(!portal->run_once || run_once);
    portal->run_once = run_once;

    /* Save the global state that is overwritten below so it can be restored. */
    saveTopTransactionResourceOwner = TopTransactionResourceOwner;
    saveTopTransactionContext = TopTransactionContext;
    saveActivePortal = ActivePortal;
    saveResourceOwner = CurrentResourceOwner;
    savePortalContext = PortalContext;
    saveMemoryContext = CurrentMemoryContext;
    PG_TRY();
    {
        /* Make this portal the active one and run inside its own context. */
        ActivePortal = portal;
        if (portal->resowner)
            CurrentResourceOwner = portal->resowner;
        PortalContext = portal->portalContext;
        MemoryContextSwitchTo(PortalContext);

        switch (portal->strategy)
        {
            case PORTAL_ONE_SELECT:
            case PORTAL_ONE_RETURNING:
            case PORTAL_ONE_MOD_WITH:
            case PORTAL_UTIL_SELECT:
                /*
                 * Every strategy here except PORTAL_ONE_SELECT first fills
                 * the portal's hold store, unless it already exists.
                 */
                if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
                    FillPortalStore(portal, isTopLevel);

                /* Run the select path (second argument is true). */
                nprocessed = PortalRunSelect(portal, true, count, dest);

                /* A "SELECT" tag gets the row count appended; others are copied as-is. */
                if (completionTag && portal->commandTag)
                {
                    if (strcmp(portal->commandTag, "SELECT") == 0)
                        snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
                                 "SELECT " UINT64_FORMAT, nprocessed);
                    else
                        strcpy(completionTag, portal->commandTag);
                }

                /* The portal is no longer active. */
                portal->status = PORTAL_READY;

                /* Done only if the portal is now positioned at its end. */
                result = portal->atEnd;
                break;
            case PORTAL_MULTI_QUERY:    /* e.g. the INSERT statement traced later in this article */
                PortalRunMulti(portal, isTopLevel, false,
                               dest, altdest, completionTag);
                MarkPortalDone(portal);
                /* A multi-query run always reports completion. */
                result = true;
                break;
            default:
                elog(ERROR, "unrecognized portal strategy: %d",
                     (int) portal->strategy);
                result = false;
                break;
        }
    }
    PG_CATCH();
    {
        /* Error path: flag the portal as failed, restore state, re-throw. */
        MarkPortalFailed(portal);

        /*
         * If we were called in the top transaction context, switch to the
         * current TopTransactionContext instead of the saved pointer.
         * NOTE(review): presumably because the command may have replaced the
         * top-level transaction; the reason is not visible in this excerpt.
         */
        if (saveMemoryContext == saveTopTransactionContext)
            MemoryContextSwitchTo(TopTransactionContext);
        else
            MemoryContextSwitchTo(saveMemoryContext);
        ActivePortal = saveActivePortal;
        /* Same treatment for the resource owner. */
        if (saveResourceOwner == saveTopTransactionResourceOwner)
            CurrentResourceOwner = TopTransactionResourceOwner;
        else
            CurrentResourceOwner = saveResourceOwner;
        PortalContext = savePortalContext;
        PG_RE_THROW();
    }
    PG_END_TRY();

    /* Normal path: the same restore sequence as in the error path above. */
    if (saveMemoryContext == saveTopTransactionContext)
        MemoryContextSwitchTo(TopTransactionContext);
    else
        MemoryContextSwitchTo(saveMemoryContext);
    ActivePortal = saveActivePortal;
    if (saveResourceOwner == saveTopTransactionResourceOwner)
        CurrentResourceOwner = TopTransactionResourceOwner;
    else
        CurrentResourceOwner = saveResourceOwner;
    PortalContext = savePortalContext;

    if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
        ShowUsage("EXECUTOR STATISTICS");
    TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
    return result;
}
2、PortalRunMulti
/*
 * PortalRunMulti
 *		Execute every statement in portal->stmts, in list order.
 *
 * portal: portal whose statements are run; its portalContext must be the
 *		current memory context (asserted after each statement).
 * isTopLevel: passed through to PortalRunUtility.
 * setHoldSnapshot: if true, the snapshot taken for the first non-utility
 *		statement is registered and stored in portal->holdSnapshot.
 * dest: receiver for statements that can set the completion tag.
 * altdest: receiver for statements that cannot.
 * completionTag: may be NULL; otherwise receives the completion tag.
 *
 * Non-utility statements go through ProcessQuery, utility statements through
 * PortalRunUtility.
 */
static void
PortalRunMulti(Portal portal,
               bool isTopLevel, bool setHoldSnapshot,
               DestReceiver *dest, DestReceiver *altdest,
               char *completionTag)
{
    bool active_snapshot_set = false;   /* has a snapshot been pushed as active yet? */
    ListCell *stmtlist_item;            /* cursor over portal->stmts */

    /* A DestRemoteExecute receiver is swapped for None_Receiver. */
    if (dest->mydest == DestRemoteExecute)
        dest = None_Receiver;
    if (altdest->mydest == DestRemoteExecute)
        altdest = None_Receiver;

    /* Process the statements one by one. */
    foreach(stmtlist_item, portal->stmts)
    {
        /* The planned statement held in this list cell. */
        PlannedStmt *pstmt = lfirst_node(PlannedStmt, stmtlist_item);

        CHECK_FOR_INTERRUPTS();

        if (pstmt->utilityStmt == NULL)
        {
            /* Not a utility statement: run it through ProcessQuery. */
            TRACE_POSTGRESQL_QUERY_EXECUTE_START();
            if (log_executor_stats)
                ResetUsage();

            /*
             * The first non-utility statement takes the transaction snapshot
             * and pushes a copy as the active snapshot; later ones only
             * update the active snapshot's command id.
             */
            if (!active_snapshot_set)
            {
                Snapshot snapshot = GetTransactionSnapshot();

                /* If asked to, register the snapshot and keep it in the portal. */
                if (setHoldSnapshot)
                {
                    snapshot = RegisterSnapshot(snapshot);
                    portal->holdSnapshot = snapshot;
                }
                PushCopiedSnapshot(snapshot);
                active_snapshot_set = true;
            }
            else
                UpdateActiveSnapshotCommandId();

            /* Run the query. */
            if (pstmt->canSetTag)
            {
                /* Statement may set the tag: primary receiver, real tag buffer. */
                ProcessQuery(pstmt,
                             portal->sourceText,
                             portal->portalParams,
                             portal->queryEnv,
                             dest, completionTag);
            }
            else
            {
                /* Statement may not set the tag: alternate receiver, no tag buffer. */
                ProcessQuery(pstmt,
                             portal->sourceText,
                             portal->portalParams,
                             portal->queryEnv,
                             altdest, NULL);
            }
            if (log_executor_stats)
                ShowUsage("EXECUTOR STATISTICS");
            TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
        }
        else
        {
            /* Utility statement: run it through PortalRunUtility. */
            if (pstmt->canSetTag)
            {
                /* No snapshot may have been pushed by an earlier statement here. */
                Assert(!active_snapshot_set);
                PortalRunUtility(portal, pstmt, isTopLevel, false,
                                 dest, completionTag);
            }
            else
            {
                /* The only utility statement expected without canSetTag is NOTIFY. */
                Assert(IsA(pstmt->utilityStmt, NotifyStmt));
                PortalRunUtility(portal, pstmt, isTopLevel, false,
                                 altdest, NULL);
            }
        }

        /* Advance the command counter between statements, not after the last one. */
        if (lnext(stmtlist_item) != NULL)
            CommandCounterIncrement();

        /* Release per-statement memory: drop all child contexts of the portal context. */
        Assert(portal->portalContext == CurrentMemoryContext);
        MemoryContextDeleteChildren(portal->portalContext);
    }

    /* Pop the active snapshot if one was pushed above. */
    if (active_snapshot_set)
        PopActiveSnapshot();

    /*
     * If the caller supplied a tag buffer and no statement filled it, fall
     * back to the portal's command tag.  A bare SELECT / INSERT / UPDATE /
     * DELETE tag then gets zero counts appended.
     */
    if (completionTag && completionTag[0] == '\0')
    {
        if (portal->commandTag)
            strcpy(completionTag, portal->commandTag);
        if (strcmp(completionTag, "SELECT") == 0)
            sprintf(completionTag, "SELECT 0 0");
        else if (strcmp(completionTag, "INSERT") == 0)
            strcpy(completionTag, "INSERT 0 0");
        else if (strcmp(completionTag, "UPDATE") == 0)
            strcpy(completionTag, "UPDATE 0");
        else if (strcmp(completionTag, "DELETE") == 0)
            strcpy(completionTag, "DELETE 0");
    }
}
三、跟踪分析
插入测试数据:
testdb=# -- 获取pid
testdb=# select pg_backend_pid();
pg_backend_pid
----------------
2551
(1 row)
testdb=# -- 插入1行
testdb=# insert into t_insert values(20,'PortalRun','PortalRun','PortalRun');
启动gdb,跟踪调试:
1、PortalRun
[root@localhost ~]# gdb -p 2551
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
Copyright (C) 2013 Free Software Foundation, Inc.
...
(gdb) b PortalRun
Breakpoint 1 at 0x8528af: file pquery.c, line 707.
(gdb) c
Continuing.
Breakpoint 1, PortalRun (portal=0x2c6f490, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x2ccb4d8, altdest=0x2ccb4d8, completionTag=0x7ffe94ba4940 "") at pquery.c:707
707 if (completionTag)
#查看输入参数
#1、portal
(gdb) p *portal
$1 = {name = 0x2c72e98 "", prepStmtName = 0x0, portalContext = 0x2cc1470, resowner = 0x2c3ad10, cleanup = 0x62f15c <PortalCleanup>, createSubid = 1, activeSubid = 1,
sourceText = 0x2c09ef0 "insert into t_insert values(20,'PortalRun','PortalRun','PortalRun');", commandTag = 0xb50908 "INSERT", stmts = 0x2ccb4a8, cplan = 0x0, portalParams = 0x0, queryEnv = 0x0,
strategy = PORTAL_MULTI_QUERY, cursorOptions = 4, run_once = false, status = PORTAL_READY, portalPinned = false, autoHeld = false, queryDesc = 0x0, tupDesc = 0x0, formats = 0x0, holdStore = 0x0,
holdContext = 0x0, holdSnapshot = 0x0, atStart = true, atEnd = true, portalPos = 0, creation_time = 587033564125509, visible = false}
(gdb) p *(portal->portalContext)
$2 = {type = T_AllocSetContext, isReset = true, allowInCritSection = false, methods = 0xb8c720 <AllocSetMethods>, parent = 0x2c6f380, firstchild = 0x0, prevchild = 0x0, nextchild = 0x0,
name = 0xb8d2f1 "PortalContext", ident = 0x2c72e98 "", reset_cbs = 0x0}
(gdb) p *(portal->resowner)
$3 = {parent = 0x2c35518, firstchild = 0x0, nextchild = 0x0, name = 0xb8d2ff "Portal", bufferarr = {itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0}, catrefarr = {
itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0}, catlistrefarr = {itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0},
relrefarr = {itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0}, planrefarr = {itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0},
tupdescarr = {itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0}, snapshotarr = {itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0, maxitems = 0,
lastidx = 0}, filearr = {itemsarr = 0x0, invalidval = 18446744073709551615, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0}, dsmarr = {itemsarr = 0x0, invalidval = 0, capacity = 0,
nitems = 0, maxitems = 0, lastidx = 0}, jitarr = {itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0}, nlocks = 0, locks = {0x0 <repeats 15 times>}}
(gdb) p *(portal->resowner->parent)
$4 = {parent = 0x0, firstchild = 0x2c3ad10, nextchild = 0x0, name = 0xa1b515 "TopTransaction", bufferarr = {itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0},
catrefarr = {itemsarr = 0x2c3b040, invalidval = 0, capacity = 16, nitems = 0, maxitems = 16, lastidx = 4294967295}, catlistrefarr = {itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0,
maxitems = 0, lastidx = 0}, relrefarr = {itemsarr = 0x2c35728, invalidval = 0, capacity = 16, nitems = 0, maxitems = 16, lastidx = 4294967295}, planrefarr = {itemsarr = 0x0, invalidval = 0,
capacity = 0, nitems = 0, maxitems = 0, lastidx = 0}, tupdescarr = {itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0}, snapshotarr = {itemsarr = 0x0,
invalidval = 0, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0}, filearr = {itemsarr = 0x0, invalidval = 18446744073709551615, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0}, dsmarr = {
itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0}, jitarr = {itemsarr = 0x0, invalidval = 0, capacity = 0, nitems = 0, maxitems = 0, lastidx = 0}, nlocks = 1,
locks = {0x2c28320, 0x0 <repeats 14 times>}}
#2、count
(gdb) p count
$5 = 9223372036854775807
#3、isTopLevel
(gdb) p isTopLevel
$6 = true
#4、run_once
(gdb) p run_once
$7 = true
#5、dest
(gdb) p dest
$8 = (DestReceiver *) 0x2ccb4d8
(gdb) p *dest
$9 = {receiveSlot = 0x4857ad <printtup>, rStartup = 0x485196 <printtup_startup>, rShutdown = 0x485bad <printtup_shutdown>, rDestroy = 0x485c21 <printtup_destroy>, mydest = DestRemote}
(gdb)
#6、altdest
(gdb) p altdest
$10 = (DestReceiver *) 0x2ccb4d8
(gdb) p *altdest
$11 = {receiveSlot = 0x4857ad <printtup>, rStartup = 0x485196 <printtup_startup>, rShutdown = 0x485bad <printtup_shutdown>, rDestroy = 0x485c21 <printtup_destroy>, mydest = DestRemote}
(gdb)
#7、completionTag
(gdb) p completionTag
$12 = 0x7ffe94ba4940 ""
#单步调试
710 if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
(gdb)
720 MarkPortalActive(portal);
(gdb)
724 portal->run_once = run_once;
(gdb)
740 saveTopTransactionResourceOwner = TopTransactionResourceOwner;
(gdb)
741 saveTopTransactionContext = TopTransactionContext;
(gdb)
742 saveActivePortal = ActivePortal;
(gdb)
743 saveResourceOwner = CurrentResourceOwner;
(gdb)
744 savePortalContext = PortalContext;
(gdb)
745 saveMemoryContext = CurrentMemoryContext;
(gdb)
746 PG_TRY();
(gdb)
748 ActivePortal = portal;
(gdb)
749 if (portal->resowner)
(gdb)
750 CurrentResourceOwner = portal->resowner;
(gdb)
751 PortalContext = portal->portalContext;
(gdb)
753 MemoryContextSwitchTo(PortalContext);
(gdb)
755 switch (portal->strategy)
(gdb) p portal->strategy
$13 = PORTAL_MULTI_QUERY
(gdb) next
799 PortalRunMulti(portal, isTopLevel, false,
(gdb)
803 MarkPortalDone(portal);
(gdb)
806 result = true;
(gdb)
807 break;
(gdb)
835 PG_END_TRY();
(gdb)
837 if (saveMemoryContext == saveTopTransactionContext)
(gdb)
838 MemoryContextSwitchTo(TopTransactionContext);
(gdb)
841 ActivePortal = saveActivePortal;
(gdb)
842 if (saveResourceOwner == saveTopTransactionResourceOwner)
(gdb)
843 CurrentResourceOwner = TopTransactionResourceOwner;
(gdb)
846 PortalContext = savePortalContext;
(gdb)
848 if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
(gdb)
853 return result;
(gdb)
854 }
#DONE!
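2、PortalRunMulti
(重新启动gdb,并在psql中执行另一条插入语句:insert into t_insert values(21,'PortalRunMulti','PortalRunMulti','PortalRunMulti');)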
(gdb) b PortalRunMulti
Breakpoint 1 at 0x8533df: file pquery.c, line 1210.
(gdb) c
Continuing.
Breakpoint 1, PortalRunMulti (portal=0x2c6f490, isTopLevel=true, setHoldSnapshot=false, dest=0x2cbe8f8, altdest=0x2cbe8f8, completionTag=0x7ffe94ba4940 "") at pquery.c:1210
1210 bool active_snapshot_set = false;
#输入参数
#1、portal
(gdb) p *portal
$1 = {name = 0x2c72e98 "", prepStmtName = 0x0, portalContext = 0x2c2d3d0, resowner = 0x2c3ad10, cleanup = 0x62f15c <PortalCleanup>, createSubid = 1, activeSubid = 1,
sourceText = 0x2c09ef0 "insert into t_insert values(21,'PortalRunMulti','PortalRunMulti','PortalRunMulti');", commandTag = 0xb50908 "INSERT", stmts = 0x2cbe8c8, cplan = 0x0, portalParams = 0x0,
queryEnv = 0x0, strategy = PORTAL_MULTI_QUERY, cursorOptions = 4, run_once = true, status = PORTAL_ACTIVE, portalPinned = false, autoHeld = false, queryDesc = 0x0, tupDesc = 0x0, formats = 0x0,
holdStore = 0x0, holdContext = 0x0, holdSnapshot = 0x0, atStart = true, atEnd = true, portalPos = 0, creation_time = 587034112962796, visible = false}
(gdb) p *(portal->portalContext)
$2 = {type = T_AllocSetContext, isReset = true, allowInCritSection = false, methods = 0xb8c720 <AllocSetMethods>, parent = 0x2c6f380, firstchild = 0x0, prevchild = 0x0, nextchild = 0x0,
name = 0xb8d2f1 "PortalContext", ident = 0x2c72e98 "", reset_cbs = 0x0}
#2、isTopLevel
(gdb) p isTopLevel
$3 = true
#3、setHoldSnapshot
(gdb) p setHoldSnapshot
$4 = false
#4、dest
(gdb) p *dest
$5 = {receiveSlot = 0x4857ad <printtup>, rStartup = 0x485196 <printtup_startup>, rShutdown = 0x485bad <printtup_shutdown>, rDestroy = 0x485c21 <printtup_destroy>, mydest = DestRemote}
#5、altdest
(gdb) p *altdest
$6 = {receiveSlot = 0x4857ad <printtup>, rStartup = 0x485196 <printtup_startup>, rShutdown = 0x485bad <printtup_shutdown>, rDestroy = 0x485c21 <printtup_destroy>, mydest = DestRemote}
#6、completionTag
(gdb) p *completionTag
$7 = 0 '\000'
#单步调试
...
(gdb) next
1234 PlannedStmt *pstmt = lfirst_node(PlannedStmt, stmtlist_item);
(gdb)
1239 CHECK_FOR_INTERRUPTS();
(gdb) p *pstmt
$12 = {type = T_PlannedStmt, commandType = CMD_INSERT, queryId = 0, hasReturning = false, hasModifyingCTE = false, canSetTag = true, transientPlan = false, dependsOnRole = false,
parallelModeNeeded = false, jitFlags = 0, planTree = 0x2c0aff8, rtable = 0x2cbe7d8, resultRelations = 0x2cbe878, nonleafResultRelations = 0x0, rootResultRelations = 0x0, subplans = 0x0,
rewindPlanIDs = 0x0, rowMarks = 0x0, relationOids = 0x2cbe828, invalItems = 0x0, paramExecTypes = 0x2c31700, utilityStmt = 0x0, stmt_location = 0, stmt_len = 82}
(gdb) next
1241 if (pstmt->utilityStmt == NULL)
(gdb)
1248 if (log_executor_stats)
(gdb)
1257 if (!active_snapshot_set)
(gdb)
1259 Snapshot snapshot = GetTransactionSnapshot();
(gdb)
1262 if (setHoldSnapshot)
(gdb) p *snapshot
$13 = {satisfies = 0x9f73fc <HeapTupleSatisfiesMVCC>, xmin = 1612887, xmax = 1612887, xip = 0x2c2d1c0, xcnt = 0, subxip = 0x2c81c70, subxcnt = 0, suboverflowed = false, takenDuringRecovery = false,
copied = false, curcid = 0, speculativeToken = 0, active_count = 0, regd_count = 0, ph_node = {first_child = 0x0, next_sibling = 0x0, prev_or_parent = 0x0}, whenTaken = 0, lsn = 0}
(gdb)
...
(gdb)
PortalRun (portal=0x2c6f490, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x2cbe8f8, altdest=0x2cbe8f8, completionTag=0x7ffe94ba4940 "INSERT 0 1") at pquery.c:803
803 MarkPortalDone(portal);
#DONE!
到此,相信大家对“PostgreSQL中PortalRunMulti函数和PortalRun函数的实现逻辑是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!