文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

PostgreSQL 源码解读(8)- 插入数据#7(ExecutePlan)

2024-04-02 19:55

关注

本文简单介绍了PG插入数据部分的源码,主要内容包括ExecutePlan函数的实现逻辑,该函数位于execMain.c中。

一、基础信息

ExecutePlan函数使用的数据结构、宏定义以及依赖的函数等。
数据结构/宏定义
1、ScanDirection

//枚举变量,扫描的方向,向后/不需要移动/向前三种
 
 typedef enum ScanDirection
 {
     BackwardScanDirection = -1,
     NoMovementScanDirection = 0,
     ForwardScanDirection = 1
 } ScanDirection;

2、DestReceiver

//目标端接收器
//包括启动/关闭/销毁/接收数据的函数以及接收器的命令目标类型
 
 typedef struct _DestReceiver DestReceiver;
 
 struct _DestReceiver
 {
     
     bool        (*receiveSlot) (TupleTableSlot *slot,
                                 DestReceiver *self);//接收Slot的函数指针
     
     void        (*rStartup) (DestReceiver *self,
                              int operation,
                              TupleDesc typeinfo);//Startup函数指针
     void        (*rShutdown) (DestReceiver *self);//Shutdown函数指针
     
     void        (*rDestroy) (DestReceiver *self);//Destroy函数指针
     
     CommandDest mydest;//
     
 };

 
 typedef enum
 {
     DestNone,                   
     DestDebug,                  
     DestRemote,                 
     DestRemoteExecute,          
     DestRemoteSimple,           
     DestSPI,                    
     DestTuplestore,             
     DestIntoRel,                
     DestCopyOut,                
     DestSQLFunction,            
     DestTransientRel,           
     DestTupleQueue              
 } CommandDest;

3、ResetPerTupleExprContext

//重置per-output-tuple exprcontext
 
 #define ResetPerTupleExprContext(estate) \
     do { \
         if ((estate)->es_per_tuple_exprcontext) \
             ResetExprContext((estate)->es_per_tuple_exprcontext); \
     } while (0)

依赖的函数
1、EnterParallelMode

//进入并行模式函数以及其相关的数据结构/子函数等
 
 void
 EnterParallelMode(void)
 {
     TransactionState s = CurrentTransactionState;
 
     Assert(s->parallelModeLevel >= 0);
 
     ++s->parallelModeLevel;
 }

 
 typedef struct TransactionStateData
 {
     TransactionId transactionId;    
     SubTransactionId subTransactionId;  
     char       *name;           
     int         savepointLevel; 
     TransState  state;          
     TBlockState blockState;     
     int         nestingLevel;   
     int         gucNestLevel;   
     MemoryContext curTransactionContext;    
     ResourceOwner curTransactionOwner;  
     TransactionId *childXids;   
     int         nChildXids;     
     int         maxChildXids;   
     Oid         prevUser;       
     int         prevSecContext; 
     bool        prevXactReadOnly;   
     bool        startedInRecovery;  
     bool        didLogXid;      
     int         parallelModeLevel;  
     struct TransactionStateData *parent;    
 } TransactionStateData;
 
 typedef TransactionStateData *TransactionState;

 
 typedef enum TransState
 {
     TRANS_DEFAULT,              
     TRANS_START,                
     TRANS_INPROGRESS,           
     TRANS_COMMIT,               
     TRANS_ABORT,                
     TRANS_PREPARE               
 } TransState;
 

 typedef enum TBlockState
 {
     
     TBLOCK_DEFAULT,             
     TBLOCK_STARTED,             
 
     
     TBLOCK_BEGIN,               
     TBLOCK_INPROGRESS,          
     TBLOCK_IMPLICIT_INPROGRESS, 
     TBLOCK_PARALLEL_INPROGRESS, 
     TBLOCK_END,                 
     TBLOCK_ABORT,               
     TBLOCK_ABORT_END,           
     TBLOCK_ABORT_PENDING,       
     TBLOCK_PREPARE,             
 
     
     TBLOCK_SUBBEGIN,            
     TBLOCK_SUBINPROGRESS,       
     TBLOCK_SUBRELEASE,          
     TBLOCK_SUBCOMMIT,           
     TBLOCK_SUBABORT,            
     TBLOCK_SUBABORT_END,        
     TBLOCK_SUBABORT_PENDING,    
     TBLOCK_SUBRESTART,          
     TBLOCK_SUBABORT_RESTART     
 } TBlockState;

 typedef struct MemoryContextMethods
 {
     void       *(*alloc) (MemoryContext context, Size size);
     
     void        (*free_p) (MemoryContext context, void *pointer);
     void       *(*realloc) (MemoryContext context, void *pointer, Size size);
     void        (*reset) (MemoryContext context);
     void        (*delete_context) (MemoryContext context);
     Size        (*get_chunk_space) (MemoryContext context, void *pointer);
     bool        (*is_empty) (MemoryContext context);
     void        (*stats) (MemoryContext context,
                           MemoryStatsPrintFunc printfunc, void *passthru,
                           MemoryContextCounters *totals);
 #ifdef MEMORY_CONTEXT_CHECKING
     void        (*check) (MemoryContext context);
 #endif
 } MemoryContextMethods;

 
 typedef void (*MemoryContextCallbackFunction) (void *arg);
 
 typedef struct MemoryContextCallback
 {
     MemoryContextCallbackFunction func; 
     void       *arg;            
     struct MemoryContextCallback *next; 
 } MemoryContextCallback;
 

 typedef struct MemoryContextData
 {
     NodeTag     type;           
     
     bool        isReset;        
     bool        allowInCritSection; 
     const MemoryContextMethods *methods;    
     MemoryContext parent;       
     MemoryContext firstchild;   
     MemoryContext prevchild;    
     MemoryContext nextchild;    
     const char *name;           
     const char *ident;          
     MemoryContextCallback *reset_cbs;   
 } MemoryContextData;
 
 


 typedef struct MemoryContextData *MemoryContext;


 
 typedef struct ResourceOwnerData
 {
     ResourceOwner parent;       
     ResourceOwner firstchild;   
     ResourceOwner nextchild;    
     const char *name;           
 
     
     ResourceArray bufferarr;    
     ResourceArray catrefarr;    
     ResourceArray catlistrefarr;    
     ResourceArray relrefarr;    
     ResourceArray planrefarr;   
     ResourceArray tupdescarr;   
     ResourceArray snapshotarr;  
     ResourceArray filearr;      
     ResourceArray dsmarr;       
     ResourceArray jitarr;       
 
     
     int         nlocks;         
     LOCALLOCK  *locks[MAX_RESOWNER_LOCKS];  
 }           ResourceOwnerData;
 
 

 typedef struct ResourceOwnerData *ResourceOwner;
 
 typedef uint32 TransactionId;

2、ExecShutdownNode
//关闭资源

 
 bool
 ExecShutdownNode(PlanState *node)
 {
     if (node == NULL)
         return false;
 
     check_stack_depth();
 
     planstate_tree_walker(node, ExecShutdownNode, NULL);
 
     
     if (node->instrument && node->instrument->running)
         InstrStartNode(node->instrument);
 
     switch (nodeTag(node))
     {
         case T_GatherState:
             ExecShutdownGather((GatherState *) node);
             break;
         case T_ForeignScanState:
             ExecShutdownForeignScan((ForeignScanState *) node);
             break;
         case T_CustomScanState:
             ExecShutdownCustomScan((CustomScanState *) node);
             break;
         case T_GatherMergeState:
             ExecShutdownGatherMerge((GatherMergeState *) node);
             break;
         case T_HashState:
             ExecShutdownHash((HashState *) node);
             break;
         case T_HashJoinState:
             ExecShutdownHashJoin((HashJoinState *) node);
             break;
         default:
             break;
     }
 
     
     if (node->instrument && node->instrument->running)
         InstrStopNode(node->instrument, 0);
 
     return false;
 }

3、ExitParallelMode

//退出并行模式
 
 void
 ExitParallelMode(void)
 {
     TransactionState s = CurrentTransactionState;
 
     Assert(s->parallelModeLevel > 0);
     Assert(s->parallelModeLevel > 1 || !ParallelContextActive());
 
     --s->parallelModeLevel;
 }

二、源码解读



static void
ExecutePlan(EState *estate,
            PlanState *planstate,
            bool use_parallel_mode,
            CmdType operation,
            bool sendTuples,
            uint64 numberTuples,
            ScanDirection direction,
            DestReceiver *dest,
            bool execute_once)
{
    TupleTableSlot *slot;//存储Tuple的Slot
    uint64      current_tuple_count;//Tuple计数器

    
    current_tuple_count = 0;//初始化

    
    estate->es_direction = direction;//扫描方向

    
    if (!execute_once)
        use_parallel_mode = false;//不使用并行模式

    estate->es_use_parallel_mode = use_parallel_mode;
    if (use_parallel_mode)
        EnterParallelMode();//如果并行执行,进入并行模式

    
    for (;;)
    {
        
        ResetPerTupleExprContext(estate);//重置 per-output-tuple exprcontext

        
        slot = ExecProcNode(planstate);//执行处理单元

        
        if (TupIsNull(slot))
        {
            
            (void) ExecShutdownNode(planstate);//如果slot返回为空,已完成处理,可以释放资源了
            break;
        }

        
        if (estate->es_junkFilter != NULL)
            slot = ExecFilterJunk(estate->es_junkFilter, slot);//去掉“无价值”的属性

        
        if (sendTuples)//如果需要传输结果集
        {
            
            if (!dest->receiveSlot(slot, dest))//传输,如果接收完成,则退出
                break;
        }

        
        if (operation == CMD_SELECT)
            (estate->es_processed)++;//统计计数

        
        current_tuple_count++;//统计返回的Tuple数
        if (numberTuples && numberTuples == current_tuple_count)
        {
            
            (void) ExecShutdownNode(planstate);//释放资源后退出循环
            break;
        }
    }

    if (use_parallel_mode)
        ExitParallelMode();//退出并行模式
}


三、跟踪分析

插入测试数据:

testdb=# -- #7 ExecutePlan
testdb=# -- 获取pid
testdb=# select pg_backend_pid();
 pg_backend_pid 
----------------
           3294
(1 row)
testdb=# -- 插入1行
testdb=# insert into t_insert values(15,'ExecutePlan','ExecutePlan','ExecutePlan');
(挂起)

启动gdb,跟踪调试:

[root@localhost ~]# gdb -p 3294
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
Copyright (C) 2013 Free Software Foundation, Inc.
...
(gdb) b ExecutePlan
Breakpoint 1 at 0x692df1: file execMain.c, line 1697.
(gdb) c
Continuing.

Breakpoint 1, ExecutePlan (estate=0x2cca440, planstate=0x2cca790, use_parallel_mode=false, operation=CMD_INSERT, sendTuples=false, numberTuples=0, direction=ForwardScanDirection, dest=0x2cd0888, 
    execute_once=true) at execMain.c:1697
1697        current_tuple_count = 0;
(gdb) 
#查看输入参数
#1、Executor状态 estate
(gdb) p *estate
$1 = {type = T_EState, es_direction = ForwardScanDirection, es_snapshot = 0x2c3f920, es_crosscheck_snapshot = 0x0, es_range_table = 0x2cd0768, es_plannedstmt = 0x2c1d478, 
  es_sourceText = 0x2c1bef0 "insert into t_insert values(15,'ExecutePlan','ExecutePlan','ExecutePlan');", es_junkFilter = 0x0, es_output_cid = 0, es_result_relations = 0x2cca680, 
  es_num_result_relations = 1, es_result_relation_info = 0x0, es_root_result_relations = 0x0, es_num_root_result_relations = 0, es_tuple_routing_result_relations = 0x0, es_trig_target_relations = 0x0, 
  es_trig_tuple_slot = 0x2ccb750, es_trig_oldtup_slot = 0x0, es_trig_newtup_slot = 0x0, es_param_list_info = 0x0, es_param_exec_vals = 0x2cca650, es_queryEnv = 0x0, es_query_cxt = 0x2cca330, 
  es_tupleTable = 0x2ccb000, es_rowMarks = 0x0, es_processed = 0, es_lastoid = 0, es_top_eflags = 0, es_instrument = 0, es_finished = false, es_exprcontexts = 0x2ccac50, es_subplanstates = 0x0, 
  es_auxmodifytables = 0x0, es_per_tuple_exprcontext = 0x0, es_epqTuple = 0x0, es_epqTupleSet = 0x0, es_epqScanDone = 0x0, es_use_parallel_mode = false, es_query_dsa = 0x0, es_jit_flags = 0, 
  es_jit = 0x0}
#执行期间构造的snapshot
(gdb) p *(estate->es_snapshot)
$2 = {satisfies = 0x9f73fc <HeapTupleSatisfiesMVCC>, xmin = 1612872, xmax = 1612872, xip = 0x0, xcnt = 0, subxip = 0x0, subxcnt = 0, suboverflowed = false, takenDuringRecovery = false, copied = true, 
  curcid = 0, speculativeToken = 0, active_count = 1, regd_count = 2, ph_node = {first_child = 0xe7bac0 <CatalogSnapshotData+64>, next_sibling = 0x0, prev_or_parent = 0x0}, whenTaken = 0, lsn = 0}
(gdb) 
#执行计划对应的Statement
#commandType = CMD_INSERT,插入操作
#hasReturning =false,没有返回值
(gdb) p *(estate->es_range_table)
$3 = {type = T_List, length = 1, head = 0x2cd0748, tail = 0x2cd0748}
(gdb) p *(estate->es_plannedstmt)
$4 = {type = T_PlannedStmt, commandType = CMD_INSERT, queryId = 0, hasReturning = false, hasModifyingCTE = false, canSetTag = true, transientPlan = false, dependsOnRole = false, 
  parallelModeNeeded = false, jitFlags = 0, planTree = 0x2c1cff8, rtable = 0x2cd0768, resultRelations = 0x2cd0808, nonleafResultRelations = 0x0, rootResultRelations = 0x0, subplans = 0x0, 
  rewindPlanIDs = 0x0, rowMarks = 0x0, relationOids = 0x2cd07b8, invalItems = 0x0, paramExecTypes = 0x2c43698, utilityStmt = 0x0, stmt_location = 0, stmt_len = 73}
(gdb) 
#SQL语句
#es_sourceText = 0x2c1bef0 "insert into t_insert values(15,'ExecutePlan','ExecutePlan','ExecutePlan');"
#junkFilter为NULL(0x0)
#结果Relation相关信息
(gdb) p *(estate->es_result_relations)
$5 = {type = T_ResultRelInfo, ri_RangeTableIndex = 1, ri_RelationDesc = 0x7f3c13f49b78, ri_NumIndices = 1, ri_IndexRelationDescs = 0x2ccafd0, ri_IndexRelationInfo = 0x2ccafe8, ri_TrigDesc = 0x0, 
  ri_TrigFunctions = 0x0, ri_TrigWhenExprs = 0x0, ri_TrigInstrument = 0x0, ri_FdwRoutine = 0x0, ri_FdwState = 0x0, ri_usesFdwDirectModify = false, ri_WithCheckOptions = 0x0, 
  ri_WithCheckOptionExprs = 0x0, ri_ConstraintExprs = 0x0, ri_junkFilter = 0x0, ri_returningList = 0x0, ri_projectReturning = 0x0, ri_onConflictArbiterIndexes = 0x0, ri_onConflict = 0x0, 
  ri_PartitionCheck = 0x0, ri_PartitionCheckExpr = 0x0, ri_PartitionRoot = 0x0, ri_PartitionReadyForRouting = false}
#es_trig_tuple_slot
gdb) p *(estate->es_trig_tuple_slot)
$6 = {type = T_TupleTableSlot, tts_isempty = true, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false, tts_tuple = 0x0, tts_tupleDescriptor = 0x0, tts_mcxt = 0x2cca330, 
  tts_buffer = 0, tts_nvalid = 0, tts_values = 0x0, tts_isnull = 0x0, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, 
    t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = false}
#estate中的其他变量
(gdb) p *(estate->es_param_exec_vals)
$7 = {execPlan = 0x0, value = 0, isnull = false}
(gdb) p *(estate->es_query_cxt)
$8 = {type = T_AllocSetContext, isReset = false, allowInCritSection = false, methods = 0xb8c720 <AllocSetMethods>, parent = 0x2c3f3d0, firstchild = 0x2ccc340, prevchild = 0x0, nextchild = 0x0, 
  name = 0xb1a840 "ExecutorState", ident = 0x0, reset_cbs = 0x0}
(gdb) p *(estate->es_tupleTable)
$9 = {type = T_List, length = 3, head = 0x2ccb240, tail = 0x2ccb7e0}
(gdb) p *(estate->es_exprcontexts)
$10 = {type = T_List, length = 1, head = 0x2ccafb0, tail = 0x2ccafb0}
#2、planstate
(gdb) p *planstate
$14 = {type = T_ModifyTableState, plan = 0x2c1cff8, state = 0x2cca440, ExecProcNode = 0x69a78b <ExecProcNodeFirst>, ExecProcNodeReal = 0x6c2485 <ExecModifyTable>, instrument = 0x0, 
  worker_instrument = 0x0, qual = 0x0, lefttree = 0x0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0, ps_ResultTupleSlot = 0x2ccb6a0, ps_ExprContext = 0x0, ps_ProjInfo = 0x0, 
  scandesc = 0x0}
#plan
(gdb) p *(planstate->plan)
$15 = {type = T_ModifyTable, startup_cost = 0, total_cost = 0.01, plan_rows = 1, plan_width = 298, parallel_aware = false, parallel_safe = false, plan_node_id = 0, targetlist = 0x0, qual = 0x0, 
  lefttree = 0x0, righttree = 0x0, initPlan = 0x0, extParam = 0x0, allParam = 0x0}
#PlanState中的EState
(gdb) p *(planstate->state)
$16 = {type = T_EState, es_direction = ForwardScanDirection, es_snapshot = 0x2c3f920, es_crosscheck_snapshot = 0x0, es_range_table = 0x2cd0768, es_plannedstmt = 0x2c1d478, 
  es_sourceText = 0x2c1bef0 "insert into t_insert values(15,'ExecutePlan','ExecutePlan','ExecutePlan');", es_junkFilter = 0x0, es_output_cid = 0, es_result_relations = 0x2cca680, 
  es_num_result_relations = 1, es_result_relation_info = 0x0, es_root_result_relations = 0x0, es_num_root_result_relations = 0, es_tuple_routing_result_relations = 0x0, es_trig_target_relations = 0x0, 
  es_trig_tuple_slot = 0x2ccb750, es_trig_oldtup_slot = 0x0, es_trig_newtup_slot = 0x0, es_param_list_info = 0x0, es_param_exec_vals = 0x2cca650, es_queryEnv = 0x0, es_query_cxt = 0x2cca330, 
  es_tupleTable = 0x2ccb000, es_rowMarks = 0x0, es_processed = 0, es_lastoid = 0, es_top_eflags = 0, es_instrument = 0, es_finished = false, es_exprcontexts = 0x2ccac50, es_subplanstates = 0x0, 
  es_auxmodifytables = 0x0, es_per_tuple_exprcontext = 0x0, es_epqTuple = 0x0, es_epqTupleSet = 0x0, es_epqScanDone = 0x0, es_use_parallel_mode = false, es_query_dsa = 0x0, es_jit_flags = 0, 
  es_jit = 0x0}
#ExecProcNode=ExecProcNodeFirst
#ExecProcNodeReal =ExecModifyTable
#3、use_parallel_mode
(gdb) p use_parallel_mode
$17 = false #非并行模式
#4、operation
(gdb) p operation
$18 = CMD_INSERT #插入操作
#5、sendTuples
(gdb) p sendTuples
$19 = false
#6、numberTuples
(gdb) p numberTuples
$20 = 0
#7、direction
(gdb) p direction
$21 = ForwardScanDirection
#8、dest
(gdb) p *dest
$22 = {receiveSlot = 0x4857ad <printtup>, rStartup = 0x485196 <printtup_startup>, rShutdown = 0x485bad <printtup_shutdown>, rDestroy = 0x485c21 <printtup_destroy>, mydest = DestRemote}
#9、execute_once
(gdb) p execute_once
$23 = true
#----------------------------------------------------
#继续执行
(gdb) next
1702        estate->es_direction = direction;
(gdb) 
1708        if (!execute_once)
(gdb) 
1711        estate->es_use_parallel_mode = use_parallel_mode;
(gdb) 
1712        if (use_parallel_mode)
(gdb) 
1721            ResetPerTupleExprContext(estate);
(gdb) 
1726            slot = ExecProcNode(planstate);
(gdb) 
1732            if (TupIsNull(slot))
(gdb) p *slot
Cannot access memory at address 0x0
#返回的slot为NULL
(gdb) next
1735                (void) ExecShutdownNode(planstate);
(gdb) 
1736                break;
(gdb) 
1787        if (use_parallel_mode)
(gdb) 
1789    }
(gdb) 
standard_ExecutorRun (queryDesc=0x2c3f4e0, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:377
377     if (sendTuples)
#DONE!

四、小结

1、统一的处理模型:先前也提过,INSERT/UPDATE/DELETE/SELECT等,均采用统一的处理模式进行处理;
2、重要的数据结构:PlanState和EState,在整个执行过程中,非常重要的状态信息;
3、并行模式:并行模式的处理,看起来只是开启&打开,后续值得探究一番。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 资料下载
  • 历年真题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯