PG中UPDATE源码分析
本文主要描述SQL中UPDATE语句的源码分析,代码为PG13.3版本。
整体流程分析
以update dtea set id = 1;
这条最简单的Update语句进行源码分析(dtea不是分区表,不考虑并行等,没有建立任何索引),帮助我们理解update的大致流程。
SQL流程如下:
parser(语法解析,生成语法解析树UpdateStmt,检查是否有语法层面的错误)
analyze(语义分析, UpdateStmt转为查询树Query, 会查系统表检查有无语义方面的错误)
rewrite(规则重写, 根据规则rules重写查询树Query, 根据事先存储在系统表中的规则进行重写,没有的话不进行重写,另外加一句,视图的实现是根据规则系统实现的,也是在这里需要进行处理)
optimizer(优化器:逻辑优化、物理优化、生成执行计划, 由Query生成对应的执行计划PlannedStmt, 基于代价的优化器,由最佳路径Path生成最佳执行计划Plan)
executor(执行器,会有各种算子,依据执行计划进行处理,火山模型,一次一元组)
storage(存储引擎)。中间还有事务处理。事务处理部分的代码这里不再进行分析,免得将问题复杂化。存储引擎那部分也不进行分析,重点关注解析、优化、执行这三部分。
对应的代码:
exec_simple_query(const char *query_string)
// ------- 解析器部分--------------
--> pg_parse_query(query_string); //生成语法解析树
--> pg_analyze_and_rewrite(parsetree, query_string,NULL, 0, NULL); // 生成查询树Query
--> parse_analyze(parsetree, query_string, paramTypes, numParams,queryEnv); // 语义分析
--> pg_rewrite_query(query); // 规则重写
// --------优化器----------
--> pg_plan_queries()
//-------- 执行器----------
--> PortalStart(portal, NULL, 0, InvalidSnapshot);
--> PortalRun(portal,FETCH_ALL,true,true,receiver,receiver,&qc); // 执行器执行
--> PortalDrop(portal, false);
解析部分——生成语法解析树UpdateStmt
关键数据结构:UpdateStmt
、RangeVar
、ResTarget
:
typedef struct UpdateStmt
{
NodeTag type;
RangeVar *relation;
List *targetList; // 对应语句中的set id = 0;信息在这里
Node *whereClause;
List *fromClause;
List *returningList;
WithClause *withClause;
} UpdateStmt;
// dtea 表
typedef struct RangeVar
{
NodeTag type;
char *catalogname;
char *schemaname;
char *relname;
bool inh;
char relpersistence;
Alias *alias;
int location;
} RangeVar;
// set id = 0; 经transformTargetList() -> transformTargetEntry,会转为TargetEntry
typedef struct ResTarget
{
NodeTag type;
char *name; // id column
List *indirection;
Node *val; // = 1表达式节点存在这里
int location;
} ResTarget;
用户输入的update语句update dtea set id = 1
由字符串会转为可由数据库理解的内部数据结构语法解析树UpdateStmt
。执行逻辑在pg_parse_query(query_string);
中,需要理解flex与bison。
gram.y中Update语法的定义:
//结合这条语句分析 update dtea set id = 0;
UpdateStmt: opt_with_clause UPDATE relation_expr_opt_alias
SET set_clause_list from_clause where_or_current_clause returning_clause
{
UpdateStmt *n = makeNode(UpdateStmt);
n->relation = $3;
n->targetList = $5;
n->fromClause = $6;
n->whereClause = $7;
n->returningList = $8;
n->withClause = $1;
$$ = (Node *)n;
}
;
set_clause_list:
set_clause { $$ = $1; }
| set_clause_list ',' set_clause { $$ = list_concat($1,$3); }
;
// 对应的是 set id = 0
set_clause: // id = 0
set_target '=' a_expr
{
$1->val = (Node *) $3;
$$ = list_make1($1);
}
| '(' set_target_list ')' '=' a_expr
{
int ncolumns = list_length($2);
int i = 1;
ListCell *col_cell;
foreach(col_cell, $2)
{
ResTarget *res_col = (ResTarget *) lfirst(col_cell);
MultiAssignRef *r = makeNode(MultiAssignRef);
r->source = (Node *) $5;
r->colno = i;
r->ncolumns = ncolumns;
res_col->val = (Node *) r;
i++;
}
$$ = $2;
}
;
set_target:
ColId opt_indirection
{
$$ = makeNode(ResTarget);
$$->name = $1;
$$->indirection = check_indirection($2, yyscanner);
$$->val = NULL;
$$->location = @1;
}
;
set_target_list:
set_target { $$ = list_make1($1); }
| set_target_list ',' set_target { $$ = lappend($1,$3); }
;
解析部分——生成查询树Query
生成了UpdateStmt
后, 会经由parse_analyze
语义分析,生成查询树Query
,以供后续优化器生成执行计划。主要代码在src/backent/parser/analyze.c
中
analyze.c : transform the raw parse tree into a query tree
parse_analyze()
--> transformTopLevelStmt(pstate, parseTree);
--> transformOptionalSelectInto(pstate, parseTree->stmt);
--> transformStmt(pstate, parseTree);
// transforms an update statement
--> transformUpdateStmt(pstate, (UpdateStmt *) parseTree); // 实际由UpdateStmt转为Query的处理函数
具体的我们看一下transformUpdateStmt
函数实现:
static Query *transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) {
Query *qry = makeNode(Query);
ParseNamespaceItem *nsitem;
Node *qual;
qry->commandType = CMD_UPDATE;
pstate->p_is_insert = false;
if (stmt->withClause) {
qry->hasRecursive = stmt->withClause->recursive;
qry->cteList = transformWithClause(pstate, stmt->withClause);
qry->hasModifyingCTE = pstate->p_hasModifyingCTE;
}
qry->resultRelation = setTargetTable(pstate, stmt->relation, stmt->relation->inh, true, ACL_UPDATE);
nsitem = pstate->p_target_nsitem;
nsitem->p_lateral_only = true;
nsitem->p_lateral_ok = false;
transformFromClause(pstate, stmt->fromClause);
nsitem->p_lateral_only = false;
nsitem->p_lateral_ok = true;
qual = transformWhereClause(pstate, stmt->whereClause,EXPR_KIND_WHERE, "WHERE");
qry->returningList = transformReturningList(pstate, stmt->returningList);
qry->targetList = transformUpdateTargetList(pstate, stmt->targetList); // 处理SQL语句中的 set id =1
qry->rtable = pstate->p_rtable;
qry->jointree = makeFromExpr(pstate->p_joinlist, qual);
qry->hasTargetSRFs = pstate->p_hasTargetSRFs;
qry->hasSubLinks = pstate->p_hasSubLinks;
assign_query_collations(pstate, qry);
return qry;
}
这里面要重点关注一下transformTargetList
,会将抽象语法树中的ResTarget
转为查询器的TargetEntry
。
typedef struct TargetEntry
{
Expr xpr;
Expr *expr;
AttrNumber resno;
char *resname;
Index ressortgroupref;
Oid resorigtbl;
AttrNumber resorigcol;
bool resjunk;
} TargetEntry;
对于其内部处理可参考源码
src/backend/parser
中的相关处理,这里不再细述。需要重点阅读一下README,PG源码中所有的README都是非常好的资料,一定要认真读。
优化器——生成执行计划
这块的内容很多,主要的逻辑是先进行逻辑优化,比如子查询、子链接、常量表达式、选择下推等等的处理,因为我们要分析的这条语句十分简单,所以逻辑优化的这部分都没有涉及到。物理优化,涉及到选择率,代价估计,索引扫描还是顺序扫描,选择那种连接方式,应用动态规划呢还是基因算法,选择nestloop-join、merge-join还是hash-join等。因为我们这个表没有建索引,更新单表也不涉及到多表连接,所以物理优化这块涉及的也不多。路径生成,生成最佳路径,再由最佳路径生成执行计划。
在路径生成这块,最基础的是对表的扫描方式,比如顺序扫描、索引扫描,再往上是连接方式,采用那种连接方式,再往上是比如排序、Limit等路径......,由底向上生成路径。我们要分析的语句很简单,没有其他处理,就顺序扫描再更新就可以了。
这里先不考虑并行执行计划。我们先看一下其执行计划结果:
postgres@postgres=# explain update dtea set id = 0;
QUERY PLAN
--------------------------------------------------------------
Update on dtea (cost=0.00..19.00 rows=900 width=68)
-> Seq Scan on dtea (cost=0.00..19.00 rows=900 width=68)
(2 rows)
下面我们分析一下其执行计划的生成流程:
// 由查询树Query--> Path --> Plan (PlannedStmt)
pg_plan_queries()
--> pg_plan_query()
--> planner()
--> standard_planner(Query *parse, const char *query_string, int cursorOptions,ParamListInfo boundParams)
// 由Query---> PlannerInfo
--> subquery_planner(glob, parse, NULL,false, tuple_fraction); // 涉及到很多逻辑优化的内容,很多不列出
--> pull_up_sublinks(root);
--> pull_up_subqueries(root); // 这里只列出几个重要的逻辑优化内容,其他的不再列出......
// 如果是update/delete分区表继承表则走inheritance_planner(),其他情况走grouping_planner()
--> inheritance_planner() // update/delete分区表继承表的情况
--> grouping_planner()
--> grouping_planner() // 非分区表、继承表的情况
--> preprocess_targetlist(root); // update虽然只更新一列,但是插入一条新元组的时候,需要知道其他列信息.
--> rewriteTargetListUD(parse, target_rte, target_relation);
--> expand_targetlist()
--> query_planner(root, standard_qp_callback, &qp_extra); // 重要
--> add_base_rels_to_query()
--> deconstruct_jointree(root);
--> add_other_rels_to_query(root); // 展开分区表到PlannerInfo中的相关字段中
--> expand_inherited_rtentry()
--> expand_planner_arrays(root, num_live_parts);
--> make_one_rel(root, joinlist);
--> set_base_rel_sizes(root);
--> set_rel_size();
--> set_append_rel_size(root, rel, rti, rte); // 如果是分区表或者继承走这里,否则走下面
--> set_rel_size(root, childrel, childRTindex, childRTE); // 处理子分区表
--> set_plain_rel_size(root, rel, rte);
--> set_plain_rel_size() // 如果不是分区表或者继承
--> set_baserel_size_estimates()
--> set_base_rel_pathlists(root);
--> set_rel_pathlist(root, rel, rti, root->simple_rte_array[rti]);
--> set_append_rel_pathlist(root, rel, rti, rte); // 生成各分区表的访问路径
--> make_rel_from_joinlist(root, joinlist);// 动态规划还是基因规划
--> standard_join_search() // 动态规划
--> geqo() // 基因规划与动态规划二选一
--> apply_scanjoin_target_to_paths()
--> create_modifytable_path()
// 由PlannerInfo---> RelOptInfo
--> fetch_upper_rel(root, UPPERREL_FINAL, NULL);
// 由RelOptInfo---> Path
--> get_cheapest_fractional_path(final_rel, tuple_fraction);
// 由 PlannerInfo+Path ---> Plan
--> create_plan(root, best_path);
// 后续处理,由Plan ---> PlannedStmt
核心数据结构:PlannedStmt、PlannerInfo、RelOptInfo(存储访问路径及其代价)、Path
Path:所有的路径都继承自Path,所以这个比较重要。
typedef struct Path
{
NodeTag type;
NodeTag pathtype;
RelOptInfo *parent;
PathTarget *pathtarget;
ParamPathInfo *param_info;
bool parallel_aware;
bool parallel_safe;
int parallel_workers;
double rows;
Cost startup_cost;
Cost total_cost;
List *pathkeys;
} Path;
typedef struct ModifyTablePath
{
Path path; // 可以看到ModifyTablePath继承自Path
CmdType operation;
bool canSetTag;
Index nominalRelation;
Index rootRelation;
bool partColsUpdated;
List *resultRelations;
List *subpaths;
List *subroots;
List *withCheckOptionLists;
List *returningLists;
List *rowMarks;
OnConflictExpr *onconflict;
int epqParam;
} ModifyTablePath;
生成update执行路径,最终都是要生成ModifyTablePath,本例中路径生成过程:Path-->ProjectionPath-->ModifyTablePath,也就是先顺序扫描表,再修改表。后面由路径生成执行计划。
ModifyTablePath *create_modifytable_path(PlannerInfo *root, RelOptInfo *rel,
CmdType operation, bool canSetTag,
Index nominalRelation, Index rootRelation,
bool partColsUpdated,
List *resultRelations, List *subpaths,
List *subroots,
List *withCheckOptionLists, List *returningLists,
List *rowMarks, OnConflictExpr *onconflict,
int epqParam)
{
ModifyTablePath *pathnode = makeNode(ModifyTablePath);
double total_size;
ListCell *lc;
Assert(list_length(resultRelations) == list_length(subpaths));
Assert(list_length(resultRelations) == list_length(subroots));
Assert(withCheckOptionLists == NIL || list_length(resultRelations) == list_length(withCheckOptionLists));
Assert(returningLists == NIL || list_length(resultRelations) == list_length(returningLists));
pathnode->path.pathtype = T_ModifyTable;
pathnode->path.parent = rel;
pathnode->path.pathtarget = rel->reltarget;
pathnode->path.param_info = NULL;
pathnode->path.parallel_aware = false;
pathnode->path.parallel_safe = false;
pathnode->path.parallel_workers = 0;
pathnode->path.pathkeys = NIL;
pathnode->path.startup_cost = 0;
pathnode->path.total_cost = 0;
pathnode->path.rows = 0;
total_size = 0;
foreach(lc, subpaths)
{
Path *subpath = (Path *) lfirst(lc);
if (lc == list_head(subpaths))
pathnode->path.startup_cost = subpath->startup_cost;
pathnode->path.total_cost += subpath->total_cost;
pathnode->path.rows += subpath->rows;
total_size += subpath->pathtarget->width * subpath->rows;
}
if (pathnode->path.rows > 0)
total_size /= pathnode->path.rows;
pathnode->path.pathtarget->width = rint(total_size);
pathnode->operation = operation;
pathnode->canSetTag = canSetTag;
pathnode->nominalRelation = nominalRelation;
pathnode->rootRelation = rootRelation;
pathnode->partColsUpdated = partColsUpdated;
pathnode->resultRelations = resultRelations;
pathnode->subpaths = subpaths;
pathnode->subroots = subroots;
pathnode->withCheckOptionLists = withCheckOptionLists;
pathnode->returningLists = returningLists;
pathnode->rowMarks = rowMarks;
pathnode->onconflict = onconflict;
pathnode->epqParam = epqParam;
return pathnode;
}
现在我们生成了最优的update路径,需要由路径生成执行计划:
Plan *create_plan(PlannerInfo *root, Path *best_path)
{
Plan *plan;
Assert(root->plan_params == NIL);
root->curOuterRels = NULL;
root->curOuterParams = NIL;
plan = create_plan_recurse(root, best_path, CP_EXACT_TLIST); // 实际实现是在这里
if (!IsA(plan, ModifyTable))
apply_tlist_labeling(plan->targetlist, root->processed_tlist);
SS_attach_initplans(root, plan);
if (root->curOuterParams != NIL)
elog(ERROR, "failed to assign all NestLoopParams to plan nodes");
root->plan_params = NIL;
return plan;
}
// 由最佳路径生成最佳执行计划
static ModifyTable *create_modifytable_plan(PlannerInfo *root, ModifyTablePath *best_path)
{
ModifyTable *plan;
List *subplans = NIL;
ListCell *subpaths,
*subroots;
forboth(subpaths, best_path->subpaths, subroots, best_path->subroots)
{
Path *subpath = (Path *) lfirst(subpaths);
PlannerInfo *subroot = (PlannerInfo *) lfirst(subroots);
Plan *subplan;
subplan = create_plan_recurse(subroot, subpath, CP_EXACT_TLIST);
apply_tlist_labeling(subplan->targetlist, subroot->processed_tlist);
subplans = lappend(subplans, subplan);
}
plan = make_modifytable(root,best_path->operation,best_path->canSetTag,
best_path->nominalRelation,best_path->rootRelation,
best_path->partColsUpdated,best_path->resultRelations,
subplans,best_path->subroots,best_path->withCheckOptionLists,
best_path->returningLists,best_path->rowMarks,
best_path->onconflict,best_path->epqParam);
copy_generic_path_info(&plan->plan, &best_path->path);
return plan;
}
最终的执行计划是ModifyTable:
typedef struct ModifyTable
{
Plan plan;
CmdType operation;
bool canSetTag;
Index nominalRelation;
Index rootRelation;
bool partColsUpdated;
List *resultRelations;
int resultRelIndex;
int rootResultRelIndex;
List *plans;
List *withCheckOptionLists;
List *returningLists;
List *fdwPrivLists;
Bitmapset *fdwDirectModifyPlans;
List *rowMarks;
int epqParam;
OnConflictAction onConflictAction;
List *arbiterIndexes;
List *onConflictSet;
Node *onConflictWhere;
Index exclRelRTI;
List *exclRelTlist;
} ModifyTable;
执行器
根据上面的执行计划,去执行。主要是各种算子的实现,其中要理解执行器的运行原理,主要是火山模型,一次一元组。我们看一下其调用过程。
CreatePortal("", true, true);
PortalDefineQuery(portal,NULL,query_string,commandTag,plantree_list,NULL);
PortalStart(portal, NULL, 0, InvalidSnapshot);
PortalRun(portal,FETCH_ALL,true,true,receiver,receiver,&qc);
--> PortalRunMulti()
--> ProcessQuery()
--> ExecutorStart(queryDesc, 0);
--> standard_ExecutorStart()
--> estate = CreateExecutorState(); // 创建EState
--> estate->es_output_cid = GetCurrentCommandId(true); // 获得cid,后面更新的时候要用
--> InitPlan(queryDesc, eflags);
--> ExecInitNode(plan, estate, eflags);
--> ExecInitModifyTable() // 初始化ModifyTableState
--> ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
--> standard_ExecutorRun()
--> ExecutePlan()
--> ExecProcNode(planstate); // 一次一元组 火山模型
--> node->ExecProcNode(node);
--> ExecProcNodeFirst(PlanState *node)
--> node->ExecProcNode(node);
--> ExecModifyTable(PlanState *pstate)
--> ExecUpdate()
--> table_tuple_update(Relation rel, ......)
--> rel->rd_tableam->tuple_update()
--> heapam_tuple_update(Relation relation, ......)
--> heap_update(relation, otid, tuple, cid, ......)
--> ExecutorFinish(queryDesc);
--> ExecutorEnd(queryDesc);
PortalDrop(portal, false);
关键数据结构:
// ModifyTableState information
typedef struct ModifyTableState
{
PlanState ps;
CmdType operation;
bool canSetTag;
bool mt_done;
PlanState **mt_plans;
int mt_nplans;
int mt_whichplan;
TupleTableSlot **mt_scans;
ResultRelInfo *resultRelInfo;
ResultRelInfo *rootResultRelInfo;
List **mt_arowmarks;
EPQState mt_epqstate;
bool fireBSTriggers;
TupleTableSlot *mt_root_tuple_slot;
struct PartitionTupleRouting *mt_partition_tuple_routing;
struct TransitionCaptureState *mt_transition_capture;
struct TransitionCaptureState *mt_oc_transition_capture;
TupleConversionMap **mt_per_subplan_tupconv_maps;
} ModifyTableState;
核心执行算子实现:
static TupleTableSlot *ExecModifyTable(PlanState *pstate)
{
ModifyTableState *node = castNode(ModifyTableState, pstate);
PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
EState *estate = node->ps.state;
CmdType operation = node->operation;
ResultRelInfo *saved_resultRelInfo;
ResultRelInfo *resultRelInfo;
PlanState *subplanstate;
JunkFilter *junkfilter;
TupleTableSlot *slot;
TupleTableSlot *planSlot;
ItemPointer tupleid;
ItemPointerData tuple_ctid;
HeapTupleData oldtupdata;
HeapTuple oldtuple;
CHECK_FOR_INTERRUPTS();
if (estate->es_epq_active != NULL)
elog(ERROR, "ModifyTable should not be called during EvalPlanQual");
if (node->mt_done)
return NULL;
if (node->fireBSTriggers)
{
fireBSTriggers(node);
node->fireBSTriggers = false;
}
resultRelInfo = node->resultRelInfo + node->mt_whichplan;
subplanstate = node->mt_plans[node->mt_whichplan];
junkfilter = resultRelInfo->ri_junkFilter;
saved_resultRelInfo = estate->es_result_relation_info;
estate->es_result_relation_info = resultRelInfo;
for (;;)
{
ResetPerTupleExprContext(estate);
if (pstate->ps_ExprContext)
ResetExprContext(pstate->ps_ExprContext);
planSlot = ExecProcNode(subplanstate);
if (TupIsNull(planSlot))
{
node->mt_whichplan++; // 分区表的update,每个分区分布对应一个subplan,当执行完一个分区再执行下一个分区
if (node->mt_whichplan < node->mt_nplans)
{
resultRelInfo++;
subplanstate = node->mt_plans[node->mt_whichplan];
junkfilter = resultRelInfo->ri_junkFilter;
estate->es_result_relation_info = resultRelInfo;
EvalPlanQualSetPlan(&node->mt_epqstate, subplanstate->plan, node->mt_arowmarks[node->mt_whichplan]);
if (node->mt_transition_capture != NULL) {
node->mt_transition_capture->tcs_map = tupconv_map_for_subplan(node, node->mt_whichplan);
}
if (node->mt_oc_transition_capture != NULL) {
node->mt_oc_transition_capture->tcs_map = tupconv_map_for_subplan(node, node->mt_whichplan);
}
continue;
}
else
break;
}
if (node->mt_scans[node->mt_whichplan]->tts_ops != planSlot->tts_ops) {
ExecCopySlot(node->mt_scans[node->mt_whichplan], planSlot);
planSlot = node->mt_scans[node->mt_whichplan];
}
if (resultRelInfo->ri_usesFdwDirectModify)
{
Assert(resultRelInfo->ri_projectReturning);
slot = ExecProcessReturning(resultRelInfo->ri_projectReturning, RelationGetRelid(resultRelInfo->ri_RelationDesc), NULL, planSlot);
estate->es_result_relation_info = saved_resultRelInfo;
return slot;
}
EvalPlanQualSetSlot(&node->mt_epqstate, planSlot);
slot = planSlot;
tupleid = NULL;
oldtuple = NULL;
if (junkfilter != NULL)
{
if (operation == CMD_UPDATE || operation == CMD_DELETE)
{
char relkind;
Datum datum;
bool isNull;
relkind = resultRelInfo->ri_RelationDesc->rd_rel->relkind;
if (relkind == RELKIND_RELATION || relkind == RELKIND_MATVIEW)
{
datum = ExecGetJunkAttribute(slot,junkfilter->jf_junkAttNo,&isNull);
if (isNull)
elog(ERROR, "ctid is NULL");
tupleid = (ItemPointer) DatumGetPointer(datum);
tuple_ctid = *tupleid;
tupleid = &tuple_ctid;
}
else if (AttributeNumberIsValid(junkfilter->jf_junkAttNo))
{
datum = ExecGetJunkAttribute(slot,junkfilter->jf_junkAttNo,&isNull);
if (isNull)
elog(ERROR, "wholerow is NULL");
oldtupdata.t_data = DatumGetHeapTupleHeader(datum);
oldtupdata.t_len = HeapTupleHeaderGetDatumLength(oldtupdata.t_data);
ItemPointerSetInvalid(&(oldtupdata.t_self));
oldtupdata.t_tableOid = (relkind == RELKIND_VIEW) ? InvalidOid : RelationGetRelid(resultRelInfo->ri_RelationDesc);
oldtuple = &oldtupdata;
}
else
Assert(relkind == RELKIND_FOREIGN_TABLE);
}
if (operation != CMD_DELETE)
slot = ExecFilterJunk(junkfilter, slot);
}
switch (operation)
{
case CMD_INSERT:
if (proute)
slot = ExecPrepareTupleRouting(node, estate, proute, resultRelInfo, slot);
slot = ExecInsert(node, slot, planSlot, NULL, estate->es_result_relation_info, estate, node->canSetTag);
if (proute)
estate->es_result_relation_info = resultRelInfo;
break;
case CMD_UPDATE:
slot = ExecUpdate(node, tupleid, oldtuple, slot, planSlot,
&node->mt_epqstate, estate, node->canSetTag);
break;
case CMD_DELETE:
slot = ExecDelete(node, tupleid, oldtuple, planSlot,
&node->mt_epqstate, estate,
true, node->canSetTag, false , NULL, NULL);
break;
default:
elog(ERROR, "unknown operation");
break;
}
if (slot) {
estate->es_result_relation_info = saved_resultRelInfo;
return slot;
}
}
estate->es_result_relation_info = saved_resultRelInfo;
fireASTriggers(node);
node->mt_done = true;
return NULL;
}
我们看一下具体执行Update的实现
```c++
static TupleTableSlot *
ExecUpdate(ModifyTableState *mtstate,
ItemPointer tupleid,
HeapTuple oldtuple,
TupleTableSlot *slot,
TupleTableSlot *planSlot,
EPQState *epqstate,
EState *estate,
bool canSetTag)
{
ResultRelInfo *resultRelInfo;
Relation resultRelationDesc;
TM_Result result;
TM_FailureData tmfd;
List *recheckIndexes = NIL;
TupleConversionMap *saved_tcs_map = NULL;
if (IsBootstrapProcessingMode())
elog(ERROR, "cannot UPDATE during bootstrap");
ExecMaterializeSlot(slot);
resultRelInfo = estate->es_result_relation_info;
resultRelationDesc = resultRelInfo->ri_RelationDesc;
if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_before_row)
{
if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo, tupleid, oldtuple, slot))
return NULL;
}
if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_instead_row)
{
if (!ExecIRUpdateTriggers(estate, resultRelInfo, oldtuple, slot))
return NULL;
}
else if (resultRelInfo->ri_FdwRoutine)
{
if (resultRelationDesc->rd_att->constr && resultRelationDesc->rd_att->constr->has_generated_stored)
ExecComputeStoredGenerated(estate, slot, CMD_UPDATE);
slot = resultRelInfo->ri_FdwRoutine->ExecForeignUpdate(estate, resultRelInfo, slot, planSlot);
if (slot == NULL)
return NULL;
slot->tts_tableOid = RelationGetRelid(resultRelationDesc);
}
else
{
LockTupleMode lockmode;
bool partition_constraint_failed;
bool update_indexes;
slot->tts_tableOid = RelationGetRelid(resultRelationDesc);
if (resultRelationDesc->rd_att->constr && resultRelationDesc->rd_att->constr->has_generated_stored)
ExecComputeStoredGenerated(estate, slot, CMD_UPDATE);
lreplace:;
ExecMaterializeSlot(slot);
partition_constraint_failed = resultRelInfo->ri_PartitionCheck && !ExecPartitionCheck(resultRelInfo, slot, estate, false);
if (!partition_constraint_failed && resultRelInfo->ri_WithCheckOptions != NIL)
{
ExecWithCheckOptions(WCO_RLS_UPDATE_CHECK, resultRelInfo, slot, estate);
}
if (partition_constraint_failed)
{
bool tuple_deleted;
TupleTableSlot *ret_slot;
TupleTableSlot *orig_slot = slot;
TupleTableSlot *epqslot = NULL;
PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
int map_index;
TupleConversionMap *tupconv_map;
if (((ModifyTable *) mtstate->ps.plan)->onConflictAction == ONCONFLICT_UPDATE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("invalid ON UPDATE specification"),
errdetail("The result tuple would appear in a different partition than the original tuple.")));
if (proute == NULL)
ExecPartitionCheckEmitError(resultRelInfo, slot, estate);
ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate, estate, false, false , true , &tuple_deleted, &epqslot);
if (!tuple_deleted)
{
if (TupIsNull(epqslot))
return NULL;
else
{
slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);
goto lreplace;
}
}
if (mtstate->mt_transition_capture)
saved_tcs_map = mtstate->mt_transition_capture->tcs_map;
map_index = resultRelInfo - mtstate->resultRelInfo;
Assert(map_index >= 0 && map_index < mtstate->mt_nplans);
tupconv_map = tupconv_map_for_subplan(mtstate, map_index);
if (tupconv_map != NULL)
slot = execute_attr_map_slot(tupconv_map->attrMap, slot, mtstate->mt_root_tuple_slot);
Assert(mtstate->rootResultRelInfo != NULL);
slot = ExecPrepareTupleRouting(mtstate, estate, proute, mtstate->rootResultRelInfo, slot);
ret_slot = ExecInsert(mtstate, slot, planSlot,
orig_slot, resultRelInfo,
estate, canSetTag);
estate->es_result_relation_info = resultRelInfo;
if (mtstate->mt_transition_capture)
{
mtstate->mt_transition_capture->tcs_original_insert_tuple = NULL;
mtstate->mt_transition_capture->tcs_map = saved_tcs_map;
}
return ret_slot;
}
if (resultRelationDesc->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
result = table_tuple_update(resultRelationDesc, tupleid, slot, estate->es_output_cid,
estate->es_snapshot, estate->es_crosscheck_snapshot, true ,&tmfd, &lockmode, &update_indexes);
switch (result)
{
case TM_SelfModified:
if (tmfd.cmax != estate->es_output_cid)
ereport(ERROR,(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
return NULL;
case TM_Ok:
break;
case TM_Updated:
{
TupleTableSlot *inputslot;
TupleTableSlot *epqslot;
if (IsolationUsesXactSnapshot())
ereport(ERROR,(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),errmsg("could not serialize access due to concurrent update")));
inputslot = EvalPlanQualSlot(epqstate, resultRelationDesc,resultRelInfo->ri_RangeTableIndex);
result = table_tuple_lock(resultRelationDesc, tupleid, estate->es_snapshot,inputslot, estate->es_output_cid, lockmode, LockWaitBlock, TUPLE_LOCK_FLAG_FIND_LAST_VERSION,&tmfd);
switch (result)
{
case TM_Ok:
Assert(tmfd.traversed);
epqslot = EvalPlanQual(epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex, inputslot);
if (TupIsNull(epqslot))
return NULL;
slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot);
goto lreplace;
case TM_Deleted:
return NULL;
case TM_SelfModified:
if (tmfd.cmax != estate->es_output_cid)
ereport(ERROR,(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
errmsg("tuple to be updated was already modified by an operation triggered by the current command"),errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
return NULL;
default:
elog(ERROR, "unexpected table_tuple_lock status: %u", result);
return NULL;
}
}
break;
case TM_Deleted:
if (IsolationUsesXactSnapshot())
ereport(ERROR,(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),errmsg("could not serialize access due to concurrent delete")));
return NULL;
default:
elog(ERROR, "unrecognized table_tuple_update status: %u",
result);
return NULL;
}
if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL, NIL);
}
if (canSetTag)
(estate->es_processed)++;
ExecARUpdateTriggers(estate, resultRelInfo, tupleid, oldtuple, slot,recheckIndexes,mtstate->operation == CMD_INSERT ?mtstate->mt_oc_transition_capture : mtstate->mt_transition_capture);
list_free(recheckIndexes);
if (resultRelInfo->ri_WithCheckOptions != NIL)
ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);
if (resultRelInfo->ri_projectReturning)
return ExecProcessReturning(resultRelInfo->ri_projectReturning,RelationGetRelid(resultRelationDesc),slot, planSlot);
return NULL;
}
再往下就是涉及到存储引擎的部分了,我们重点看一下其对外的接口输入参数。重点是这4个参数:
relation - table to be modified (caller must hold suitable lock) (要更新的那个表)
otid - TID of old tuple to be replaced (要更新的元组ID,对应的是老的元组,更新后相当于是插入一条新元组,老元组的tid值要更新为新的tid值)
slot - newly constructed tuple data to store (新元组的值)
cid - update command ID (used for visibility test, and stored into cmax/cmin if successful) (cid值,事务相关) 执行器层面的更新算子是建立在存储引擎提供的底层table_tuple_update接口之上的。是我们编写ExecUpdate以及ExecModifyTable的基础。
static inline TM_Result
table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, CommandId cid,
Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, bool *update_indexes)
{
return rel->rd_tableam->tuple_update(rel, otid, slot, cid,
snapshot, crosscheck, wait, tmfd, lockmode, update_indexes);
}
事务
这一块主要是要理解PG中update语句并不是原地更新元组,而是插入一条新元组。因为PG实现MVCC与Mysql,Oracle的实现方式有所不同,并不是通过undo日志实现的,相当于把undo日志记录到了原有的表中,并不是单独存放在一个地方。具体的不再细述,内容太多了,以后再分析事务部分。
好了,内容很多,分析源码的时候,涉及到的知识点以及逻辑是非常多的,我们最好每次分析只抓一个主干,不然每个都分析,最后就会比较乱。就先分析到这里吧。
总结
到此这篇关于Postgres中UPDATE更新语句源码分析的文章就介绍到这了,更多相关Postgres中UPDATE源码内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!