这篇文章主要介绍“PostgreSQL中exec_simple_query函数的实现逻辑是什么”。在日常操作中,相信很多人在“PostgreSQL中exec_simple_query函数的实现逻辑是什么”这个问题上存在疑惑。小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答“PostgreSQL中exec_simple_query函数的实现逻辑是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
一、源码解读
exec_simple_query函数,顾名思义,用于执行简单“查询”(这里的“查询”是广义的,除SELECT外,也包括INSERT/UPDATE/DELETE等语句)。
/*
 * exec_simple_query
 *
 * Execute the statement(s) contained in query_string.  The string is parsed
 * into a list of raw parse trees; each tree is then analyzed/rewritten,
 * planned, and run through an unnamed portal, with output sent to the
 * destination given by whereToSendOutput.
 *
 * Transaction start/finish, snapshot push/pop and memory-context switches are
 * interleaved with those steps, so the statement order below is significant.
 */
static void
exec_simple_query(const char *query_string)
{
CommandDest dest = whereToSendOutput;// where command output is sent
MemoryContext oldcontext;// memory context to switch back to
List *parsetree_list;// list of raw parse trees
ListCell *parsetree_item;// current cell while iterating parsetree_list
bool save_log_statement_stats = log_statement_stats;// value of log_statement_stats captured at entry
bool was_logged = false;// set once the statement text has been logged
bool use_implicit_block;// whether to wrap the statements in an implicit transaction block
char msec_str[32];
debug_query_string = query_string;
pgstat_report_activity(STATE_RUNNING, query_string);// report activity to the statistics subsystem
TRACE_POSTGRESQL_QUERY_START(query_string);
if (save_log_statement_stats)
ResetUsage();
start_xact_command();// start a transaction command
drop_unnamed_stmt();// drop the unnamed statement, if any
oldcontext = MemoryContextSwitchTo(MessageContext);// switch to MessageContext for parsing
parsetree_list = pg_parse_query(query_string);// parse the query string into a List of RawStmt nodes
if (check_log_statement(parsetree_list))// log the statement text if requested
{
ereport(LOG,
(errmsg("statement: %s", query_string),
errhidestmt(true),
errdetail_execute(parsetree_list)));
was_logged = true;
}
MemoryContextSwitchTo(oldcontext);// switch back to the previous memory context
use_implicit_block = (list_length(parsetree_list) > 1);// more than one parse tree: use an implicit transaction block
foreach(parsetree_item, parsetree_list)// process each raw parse tree in turn
{
RawStmt *parsetree = lfirst_node(RawStmt, parsetree_item);// list elements are RawStmt pointers
bool snapshot_set = false;// whether a snapshot was pushed for analysis/planning
const char *commandTag;// command tag, e.g. "INSERT"
char completionTag[COMPLETION_TAG_BUFSIZE];// completion tag buffer filled by PortalRun
List *querytree_list,// list of Query trees
*plantree_list;// list of plans
Portal portal;// portal used to run the statement
DestReceiver *receiver;// receiver for the results
int16 format;// result format code passed to PortalSetResultFormat
commandTag = CreateCommandTag(parsetree->stmt);// derive the command tag from the statement node
set_ps_display(commandTag, false);
BeginCommand(commandTag, dest);// BeginCommand has an empty body
// In an aborted transaction block, reject anything but a transaction-exit statement.
if (IsAbortedTransactionBlockState() &&
!IsTransactionExitStmt(parsetree->stmt))
ereport(ERROR,
(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
errmsg("current transaction is aborted, "
"commands ignored until end of transaction block"),
errdetail_abort()));
start_xact_command();// make sure a transaction command is in progress
if (use_implicit_block)
BeginImplicitTransactionBlock();// enter the implicit transaction block
CHECK_FOR_INTERRUPTS();
if (analyze_requires_snapshot(parsetree))// does analysis of this statement need a snapshot?
{
PushActiveSnapshot(GetTransactionSnapshot());// push the transaction snapshot as the active one
snapshot_set = true;
}
oldcontext = MemoryContextSwitchTo(MessageContext);// switch to MessageContext for analysis and planning
querytree_list = pg_analyze_and_rewrite(parsetree, query_string,
NULL, 0, NULL);// analyze and rewrite the raw tree into a List of Query
plantree_list = pg_plan_queries(querytree_list,
CURSOR_OPT_PARALLEL_OK, NULL);// plan the queries
if (snapshot_set)
PopActiveSnapshot();// pop the snapshot pushed above
CHECK_FOR_INTERRUPTS();
portal = CreatePortal("", true, true);// create a portal with an empty name
portal->visible = false;
PortalDefineQuery(portal,
NULL,
query_string,
commandTag,
plantree_list,
NULL);// attach the query text, tag and plans to the portal
PortalStart(portal, NULL, 0, InvalidSnapshot);// prepare the portal for PortalRun
// format stays 0 unless this is a FETCH (not MOVE) from a cursor opened with CURSOR_OPT_BINARY
format = 0;
if (IsA(parsetree->stmt, FetchStmt))
{
FetchStmt *stmt = (FetchStmt *) parsetree->stmt;
if (!stmt->ismove)
{
Portal fportal = GetPortalByName(stmt->portalname);
if (PortalIsValid(fportal) &&
(fportal->cursorOptions & CURSOR_OPT_BINARY))
format = 1;
}
}
PortalSetResultFormat(portal, 1, &format);// set the result format code chosen above
receiver = CreateDestReceiver(dest);// create the receiver for this destination
if (dest == DestRemote)
SetRemoteDestReceiverParams(receiver, portal);
MemoryContextSwitchTo(oldcontext);// switch back to the previous memory context
(void) PortalRun(portal,
FETCH_ALL,
true,
true,
receiver,
receiver,
completionTag);// run the portal
receiver->rDestroy(receiver);// destroy the receiver once execution is done
PortalDrop(portal, false);// drop the portal
if (lnext(parsetree_item) == NULL)// this was the last statement in the list
{
if (use_implicit_block)
EndImplicitTransactionBlock();// leave the implicit transaction block
finish_xact_command();// finish the transaction command
}
else if (IsA(parsetree->stmt, TransactionStmt))// transaction control statement in the middle of the list
{
finish_xact_command();
}
else
{
CommandCounterIncrement();// advance the command counter before the next statement
}
EndCommand(completionTag, dest);// report command completion
}
// all statements have been processed
finish_xact_command();
// an empty parse tree list gets NullCommand
if (!parsetree_list)
NullCommand(dest);
// emit duration logging according to check_log_duration's result
switch (check_log_duration(msec_str, was_logged))
{
case 1:
ereport(LOG,
(errmsg("duration: %s ms", msec_str),
errhidestmt(true)));
break;
case 2:
ereport(LOG,
(errmsg("duration: %s ms statement: %s",
msec_str, query_string),
errhidestmt(true),
errdetail_execute(parsetree_list)));
break;
}
if (save_log_statement_stats)
ShowUsage("QUERY STATISTICS");
TRACE_POSTGRESQL_QUERY_DONE(query_string);
debug_query_string = NULL;
}
二、基础信息
exec_simple_query函数使用的数据结构、宏定义以及依赖的函数等。
数据结构/宏定义
1、whereToSendOutput
/* Destination for command output; exec_simple_query copies it into its local "dest". Initialized to DestDebug. */
CommandDest whereToSendOutput = DestDebug;
/*
 * CommandDest
 *
 * Enumerates the possible destinations for command output.  Values are
 * assigned implicitly, starting from 0 in declaration order.
 */
typedef enum
{
	DestNone,
	DestDebug,
	DestRemote,
	DestRemoteExecute,
	DestRemoteSimple,
	DestSPI,
	DestTuplestore,
	DestIntoRel,
	DestCopyOut,
	DestSQLFunction,
	DestTransientRel,
	DestTupleQueue
} CommandDest;
2、RawStmt
/*
 * RawStmt
 *
 * Element type of the list returned by pg_parse_query(): a node wrapping one
 * raw statement tree.
 */
typedef struct RawStmt
{
	NodeTag		type;
	Node	   *stmt;			/* the wrapped statement node */
	int			stmt_location;	/* presumably the statement's start offset in
								 * the source text -- TODO confirm */
	int			stmt_len;		/* presumably the statement's length -- TODO
								 * confirm */
} RawStmt;
3、Query
/*
 * Query
 *
 * Node produced by parse analysis (see transformStmt).  The individual
 * fields are examined in more depth together with query analysis.
 */
typedef struct Query
{
	NodeTag		type;

	CmdType		commandType;	/* transformStmt stores CMD_UTILITY here for
								 * statements it does not transform */
	QuerySource querySource;	/* transformStmt sets QSRC_ORIGINAL */
	uint64		queryId;
	bool		canSetTag;		/* transformStmt sets this to true */
	Node	   *utilityStmt;	/* the statement itself when commandType is
								 * CMD_UTILITY */
	int			resultRelation;

	bool		hasAggs;
	bool		hasWindowFuncs;
	bool		hasTargetSRFs;
	bool		hasSubLinks;
	bool		hasDistinctOn;
	bool		hasRecursive;
	bool		hasModifyingCTE;
	bool		hasForUpdate;
	bool		hasRowSecurity;

	List	   *cteList;
	List	   *rtable;
	FromExpr   *jointree;
	List	   *targetList;
	OverridingKind override;
	OnConflictExpr *onConflict;
	List	   *returningList;
	List	   *groupClause;
	List	   *groupingSets;
	Node	   *havingQual;
	List	   *windowClause;
	List	   *distinctClause;
	List	   *sortClause;
	Node	   *limitOffset;
	Node	   *limitCount;
	List	   *rowMarks;
	Node	   *setOperations;
	List	   *constraintDeps;
	List	   *withCheckOptions;

	/* copied from the originating RawStmt by transformTopLevelStmt */
	int			stmt_location;
	int			stmt_len;
} Query;
4、ParseState
typedef struct ParseState ParseState;

/* Signatures of the hook callbacks stored in ParseState. */
typedef Node *(*PreParseColumnRefHook) (ParseState *pstate, ColumnRef *cref);
typedef Node *(*PostParseColumnRefHook) (ParseState *pstate, ColumnRef *cref, Node *var);
typedef Node *(*ParseParamRefHook) (ParseState *pstate, ParamRef *pref);
typedef Node *(*CoerceParamHook) (ParseState *pstate, Param *param,
								  Oid targetTypeId, int32 targetTypeMod,
								  int location);

/*
 * ParseState
 *
 * Working state used during parse analysis.  Instances are created by
 * make_parsestate(), which zero-fills the struct and then sets the defaults
 * noted on the fields below.
 */
struct ParseState
{
	struct ParseState *parentParseState;	/* parent state, or NULL */
	const char *p_sourcetext;	/* source text; inherited from the parent if
								 * there is one */
	List	   *p_rtable;
	List	   *p_joinexprs;
	List	   *p_joinlist;
	List	   *p_namespace;
	bool		p_lateral_active;
	List	   *p_ctenamespace;
	List	   *p_future_ctes;
	CommonTableExpr *p_parent_cte;
	Relation	p_target_relation;
	RangeTblEntry *p_target_rangetblentry;
	bool		p_is_insert;
	List	   *p_windowdefs;
	ParseExprKind p_expr_kind;
	int			p_next_resno;	/* make_parsestate starts this at 1 */
	List	   *p_multiassign_exprs;
	List	   *p_locking_clause;
	bool		p_locked_from_parent;
	bool		p_resolve_unknowns; /* make_parsestate defaults this to true */
	QueryEnvironment *p_queryEnv;	/* inherited from the parent if any */

	bool		p_hasAggs;
	bool		p_hasWindowFuncs;
	bool		p_hasTargetSRFs;
	bool		p_hasSubLinks;
	bool		p_hasModifyingCTE;
	Node	   *p_last_srf;

	/* hook callbacks and their state; all inherited from the parent if any */
	PreParseColumnRefHook p_pre_columnref_hook;
	PostParseColumnRefHook p_post_columnref_hook;
	ParseParamRefHook p_paramref_hook;
	CoerceParamHook p_coerce_param_hook;
	void	   *p_ref_hook_state;
};
5、RangeTblEntry
/*
 * RTEKind
 *
 * Discriminator stored in RangeTblEntry.rtekind.  Values are assigned
 * implicitly, starting from 0 in declaration order.
 */
typedef enum RTEKind
{
	RTE_RELATION,
	RTE_SUBQUERY,
	RTE_JOIN,
	RTE_FUNCTION,
	RTE_TABLEFUNC,
	RTE_VALUES,
	RTE_CTE,
	RTE_NAMEDTUPLESTORE
} RTEKind;
/*
 * RangeTblEntry
 *
 * One range table entry; rtekind tells which kind of entry this is.
 * NOTE(review): the grouping below follows field names only -- which fields
 * are meaningful for which rtekind should be confirmed against the upstream
 * header.
 */
typedef struct RangeTblEntry
{
	NodeTag		type;

	RTEKind		rtekind;

	Oid			relid;
	char		relkind;
	struct TableSampleClause *tablesample;

	Query	   *subquery;
	bool		security_barrier;

	JoinType	jointype;
	List	   *joinaliasvars;

	List	   *functions;
	bool		funcordinality;

	TableFunc  *tablefunc;

	List	   *values_lists;

	char	   *ctename;
	Index		ctelevelsup;
	bool		self_reference;

	List	   *coltypes;
	List	   *coltypmods;
	List	   *colcollations;

	char	   *enrname;
	double		enrtuples;

	Alias	   *alias;
	Alias	   *eref;
	bool		lateral;
	bool		inh;
	bool		inFromCl;
	AclMode		requiredPerms;
	Oid			checkAsUser;
	Bitmapset  *selectedCols;
	Bitmapset  *insertedCols;
	Bitmapset  *updatedCols;
	List	   *securityQuals;
} RangeTblEntry;
6、TargetEntry
/*
 * TargetEntry
 *
 * One entry of a target list (e.g. Query.targetList).
 */
typedef struct TargetEntry
{
	Expr		xpr;
	Expr	   *expr;			/* the entry's expression */
	AttrNumber	resno;
	char	   *resname;
	Index		ressortgroupref;
	Oid			resorigtbl;
	AttrNumber	resorigcol;
	bool		resjunk;		/* transformInsertStmt skips entries with this
								 * set when building its expression list */
} TargetEntry;
7、全局变量定义
/*
 * Statistics-logging switches, all initialized to false.
 * log_parser_stats gates the ResetUsage()/ShowUsage() calls around parsing
 * and parse analysis; log_statement_stats gates them around a whole simple
 * query in exec_simple_query.
 */
bool log_parser_stats = false;
bool log_planner_stats = false;
bool log_executor_stats = false;
bool log_statement_stats = false;
依赖的函数
1、start_xact_command
static void
start_xact_command(void)
{
if (!xact_started)
{
StartTransactionCommand();//开启事务
xact_started = true;
}
enable_statement_timeout();
}
/*
 * StartTransactionCommand
 *
 * Act on the current transaction block state: start a new transaction when
 * the state is TBLOCK_DEFAULT, do nothing for the in-progress and aborted
 * states, and raise an error for every other state.  Afterwards, switch to
 * CurTransactionContext.
 */
void
StartTransactionCommand(void)
{
	TransactionState xact = CurrentTransactionState;

	switch (xact->blockState)
	{
			/* start a transaction and move to TBLOCK_STARTED */
		case TBLOCK_DEFAULT:
			StartTransaction();
			xact->blockState = TBLOCK_STARTED;
			break;

			/* in-progress and aborted block states: nothing to do here */
		case TBLOCK_INPROGRESS:
		case TBLOCK_IMPLICIT_INPROGRESS:
		case TBLOCK_SUBINPROGRESS:
		case TBLOCK_ABORT:
		case TBLOCK_SUBABORT:
			break;

			/* all remaining states are unexpected at this point */
		case TBLOCK_STARTED:
		case TBLOCK_BEGIN:
		case TBLOCK_PARALLEL_INPROGRESS:
		case TBLOCK_SUBBEGIN:
		case TBLOCK_END:
		case TBLOCK_SUBRELEASE:
		case TBLOCK_SUBCOMMIT:
		case TBLOCK_ABORT_END:
		case TBLOCK_SUBABORT_END:
		case TBLOCK_ABORT_PENDING:
		case TBLOCK_SUBABORT_PENDING:
		case TBLOCK_SUBRESTART:
		case TBLOCK_SUBABORT_RESTART:
		case TBLOCK_PREPARE:
			elog(ERROR, "StartTransactionCommand: unexpected state %s",
				 BlockStateAsString(xact->blockState));
			break;
	}

	/* Leave with the current transaction's memory context active. */
	Assert(CurTransactionContext != NULL);
	MemoryContextSwitchTo(CurTransactionContext);
}
2、drop_unnamed_stmt
/*
 * drop_unnamed_stmt
 *
 * Release the unnamed statement's cached plan source, if there is one.
 * The global pointer is cleared before DropCachedPlan() is called.
 */
static void
drop_unnamed_stmt(void)
{
	CachedPlanSource *stale = unnamed_stmt_psrc;

	if (!stale)
		return;					/* nothing to drop */

	unnamed_stmt_psrc = NULL;
	DropCachedPlan(stale);
}
/* Plan source of the current unnamed statement, or NULL; drop_unnamed_stmt() clears it and drops the plan. */
static CachedPlanSource *unnamed_stmt_psrc = NULL;
3、pg_parse_query
/*
 * pg_parse_query
 *
 * Run the raw parser over query_string and return the resulting list of
 * raw parse trees (RawStmt nodes).  Parser resource usage is reported when
 * log_parser_stats is set.
 */
List *
pg_parse_query(const char *query_string)
{
	List	   *raw_stmts;

	TRACE_POSTGRESQL_QUERY_PARSE_START(query_string);

	if (log_parser_stats)
		ResetUsage();

	raw_stmts = raw_parser(query_string);

	if (log_parser_stats)
		ShowUsage("PARSER STATISTICS");

#ifdef COPY_PARSE_PLAN_TREES
	{
		/* Copy the tree and continue with the copy only if it compares equal. */
		List	   *copied = copyObject(raw_stmts);

		if (equal(copied, raw_stmts))
			raw_stmts = copied;
		else
			elog(WARNING, "copyObject() failed to produce an equal raw parse tree");
	}
#endif

	TRACE_POSTGRESQL_QUERY_PARSE_DONE(query_string);

	return raw_stmts;
}
4、raw_parser
/*
 * raw_parser
 *
 * Perform lexical and grammatical analysis of str.  Returns the list of raw
 * parse trees (elements are RawStmt), or NIL when base_yyparse() reports a
 * nonzero result.
 */
List *
raw_parser(const char *str)
{
	base_yy_extra_type extra;
	core_yyscan_t scanner;
	int			parse_rc;

	/* set up the scanner over the input string */
	scanner = scanner_init(str, &extra.core_yy_extra,
						   ScanKeywords, NumScanKeywords);

	extra.have_lookahead = false;

	/* set up the grammar state, run the parser, then release the scanner */
	parser_init(&extra);
	parse_rc = base_yyparse(scanner);
	scanner_finish(scanner);

	return (parse_rc != 0) ? NIL : extra.parsetree;
}
5、CreateCommandTag
/*
 * CreateCommandTag
 *
 * Return the command tag string for the given node, chosen by its node tag
 * (and, for some node types, by a sub-kind field).  Essentially every
 * command type appears in the switch below.
 *
 * Every returned string is either a literal or comes from a nested call, so
 * the caller gets a pointer it does not have to free here.  Unrecognized
 * inputs yield "???"; an unrecognized node type or commandType additionally
 * emits a WARNING.
 */
const char *
CreateCommandTag(Node *parsetree)
{
const char *tag;
switch (nodeTag(parsetree))
{
// a RawStmt is only a wrapper: use the tag of the statement inside it
case T_RawStmt:
tag = CreateCommandTag(((RawStmt *) parsetree)->stmt);
break;
case T_InsertStmt:
tag = "INSERT";
break;
case T_DeleteStmt:
tag = "DELETE";
break;
case T_UpdateStmt:
tag = "UPDATE";
break;
case T_SelectStmt:
tag = "SELECT";
break;
// transaction control statements: tag depends on stmt->kind
case T_TransactionStmt:
{
TransactionStmt *stmt = (TransactionStmt *) parsetree;
switch (stmt->kind)
{
case TRANS_STMT_BEGIN:
tag = "BEGIN";
break;
case TRANS_STMT_START:
tag = "START TRANSACTION";
break;
case TRANS_STMT_COMMIT:
tag = "COMMIT";
break;
// ROLLBACK and ROLLBACK TO share the same tag
case TRANS_STMT_ROLLBACK:
case TRANS_STMT_ROLLBACK_TO:
tag = "ROLLBACK";
break;
case TRANS_STMT_SAVEPOINT:
tag = "SAVEPOINT";
break;
case TRANS_STMT_RELEASE:
tag = "RELEASE";
break;
case TRANS_STMT_PREPARE:
tag = "PREPARE TRANSACTION";
break;
case TRANS_STMT_COMMIT_PREPARED:
tag = "COMMIT PREPARED";
break;
case TRANS_STMT_ROLLBACK_PREPARED:
tag = "ROLLBACK PREPARED";
break;
default:
tag = "???";
break;
}
}
break;
case T_DeclareCursorStmt:
tag = "DECLARE CURSOR";
break;
case T_ClosePortalStmt:
{
ClosePortalStmt *stmt = (ClosePortalStmt *) parsetree;
// a NULL portal name selects the "ALL" form of the tag
if (stmt->portalname == NULL)
tag = "CLOSE CURSOR ALL";
else
tag = "CLOSE CURSOR";
}
break;
case T_FetchStmt:
{
FetchStmt *stmt = (FetchStmt *) parsetree;
tag = (stmt->ismove) ? "MOVE" : "FETCH";
}
break;
case T_CreateDomainStmt:
tag = "CREATE DOMAIN";
break;
case T_CreateSchemaStmt:
tag = "CREATE SCHEMA";
break;
case T_CreateStmt:
tag = "CREATE TABLE";
break;
case T_CreateTableSpaceStmt:
tag = "CREATE TABLESPACE";
break;
case T_DropTableSpaceStmt:
tag = "DROP TABLESPACE";
break;
case T_AlterTableSpaceOptionsStmt:
tag = "ALTER TABLESPACE";
break;
case T_CreateExtensionStmt:
tag = "CREATE EXTENSION";
break;
case T_AlterExtensionStmt:
tag = "ALTER EXTENSION";
break;
case T_AlterExtensionContentsStmt:
tag = "ALTER EXTENSION";
break;
case T_CreateFdwStmt:
tag = "CREATE FOREIGN DATA WRAPPER";
break;
case T_AlterFdwStmt:
tag = "ALTER FOREIGN DATA WRAPPER";
break;
case T_CreateForeignServerStmt:
tag = "CREATE SERVER";
break;
case T_AlterForeignServerStmt:
tag = "ALTER SERVER";
break;
case T_CreateUserMappingStmt:
tag = "CREATE USER MAPPING";
break;
case T_AlterUserMappingStmt:
tag = "ALTER USER MAPPING";
break;
case T_DropUserMappingStmt:
tag = "DROP USER MAPPING";
break;
case T_CreateForeignTableStmt:
tag = "CREATE FOREIGN TABLE";
break;
case T_ImportForeignSchemaStmt:
tag = "IMPORT FOREIGN SCHEMA";
break;
// DROP: tag depends on the type of object being removed
case T_DropStmt:
switch (((DropStmt *) parsetree)->removeType)
{
case OBJECT_TABLE:
tag = "DROP TABLE";
break;
case OBJECT_SEQUENCE:
tag = "DROP SEQUENCE";
break;
case OBJECT_VIEW:
tag = "DROP VIEW";
break;
case OBJECT_MATVIEW:
tag = "DROP MATERIALIZED VIEW";
break;
case OBJECT_INDEX:
tag = "DROP INDEX";
break;
case OBJECT_TYPE:
tag = "DROP TYPE";
break;
case OBJECT_DOMAIN:
tag = "DROP DOMAIN";
break;
case OBJECT_COLLATION:
tag = "DROP COLLATION";
break;
case OBJECT_CONVERSION:
tag = "DROP CONVERSION";
break;
case OBJECT_SCHEMA:
tag = "DROP SCHEMA";
break;
case OBJECT_TSPARSER:
tag = "DROP TEXT SEARCH PARSER";
break;
case OBJECT_TSDICTIONARY:
tag = "DROP TEXT SEARCH DICTIONARY";
break;
case OBJECT_TSTEMPLATE:
tag = "DROP TEXT SEARCH TEMPLATE";
break;
case OBJECT_TSCONFIGURATION:
tag = "DROP TEXT SEARCH CONFIGURATION";
break;
case OBJECT_FOREIGN_TABLE:
tag = "DROP FOREIGN TABLE";
break;
case OBJECT_EXTENSION:
tag = "DROP EXTENSION";
break;
case OBJECT_FUNCTION:
tag = "DROP FUNCTION";
break;
case OBJECT_PROCEDURE:
tag = "DROP PROCEDURE";
break;
case OBJECT_ROUTINE:
tag = "DROP ROUTINE";
break;
case OBJECT_AGGREGATE:
tag = "DROP AGGREGATE";
break;
case OBJECT_OPERATOR:
tag = "DROP OPERATOR";
break;
case OBJECT_LANGUAGE:
tag = "DROP LANGUAGE";
break;
case OBJECT_CAST:
tag = "DROP CAST";
break;
case OBJECT_TRIGGER:
tag = "DROP TRIGGER";
break;
case OBJECT_EVENT_TRIGGER:
tag = "DROP EVENT TRIGGER";
break;
case OBJECT_RULE:
tag = "DROP RULE";
break;
case OBJECT_FDW:
tag = "DROP FOREIGN DATA WRAPPER";
break;
case OBJECT_FOREIGN_SERVER:
tag = "DROP SERVER";
break;
case OBJECT_OPCLASS:
tag = "DROP OPERATOR CLASS";
break;
case OBJECT_OPFAMILY:
tag = "DROP OPERATOR FAMILY";
break;
case OBJECT_POLICY:
tag = "DROP POLICY";
break;
case OBJECT_TRANSFORM:
tag = "DROP TRANSFORM";
break;
case OBJECT_ACCESS_METHOD:
tag = "DROP ACCESS METHOD";
break;
case OBJECT_PUBLICATION:
tag = "DROP PUBLICATION";
break;
case OBJECT_STATISTIC_EXT:
tag = "DROP STATISTICS";
break;
default:
tag = "???";
}
break;
case T_TruncateStmt:
tag = "TRUNCATE TABLE";
break;
case T_CommentStmt:
tag = "COMMENT";
break;
case T_SecLabelStmt:
tag = "SECURITY LABEL";
break;
case T_CopyStmt:
tag = "COPY";
break;
// the ALTER variants below delegate to AlterObjectTypeCommandTag with their object type
case T_RenameStmt:
tag = AlterObjectTypeCommandTag(((RenameStmt *) parsetree)->renameType);
break;
case T_AlterObjectDependsStmt:
tag = AlterObjectTypeCommandTag(((AlterObjectDependsStmt *) parsetree)->objectType);
break;
case T_AlterObjectSchemaStmt:
tag = AlterObjectTypeCommandTag(((AlterObjectSchemaStmt *) parsetree)->objectType);
break;
case T_AlterOwnerStmt:
tag = AlterObjectTypeCommandTag(((AlterOwnerStmt *) parsetree)->objectType);
break;
case T_AlterTableMoveAllStmt:
tag = AlterObjectTypeCommandTag(((AlterTableMoveAllStmt *) parsetree)->objtype);
break;
case T_AlterTableStmt:
tag = AlterObjectTypeCommandTag(((AlterTableStmt *) parsetree)->relkind);
break;
case T_AlterDomainStmt:
tag = "ALTER DOMAIN";
break;
case T_AlterFunctionStmt:
switch (((AlterFunctionStmt *) parsetree)->objtype)
{
case OBJECT_FUNCTION:
tag = "ALTER FUNCTION";
break;
case OBJECT_PROCEDURE:
tag = "ALTER PROCEDURE";
break;
case OBJECT_ROUTINE:
tag = "ALTER ROUTINE";
break;
default:
tag = "???";
}
break;
case T_GrantStmt:
{
GrantStmt *stmt = (GrantStmt *) parsetree;
tag = (stmt->is_grant) ? "GRANT" : "REVOKE";
}
break;
case T_GrantRoleStmt:
{
GrantRoleStmt *stmt = (GrantRoleStmt *) parsetree;
tag = (stmt->is_grant) ? "GRANT ROLE" : "REVOKE ROLE";
}
break;
case T_AlterDefaultPrivilegesStmt:
tag = "ALTER DEFAULT PRIVILEGES";
break;
// DefineStmt: tag depends on the kind of object being defined
case T_DefineStmt:
switch (((DefineStmt *) parsetree)->kind)
{
case OBJECT_AGGREGATE:
tag = "CREATE AGGREGATE";
break;
case OBJECT_OPERATOR:
tag = "CREATE OPERATOR";
break;
case OBJECT_TYPE:
tag = "CREATE TYPE";
break;
case OBJECT_TSPARSER:
tag = "CREATE TEXT SEARCH PARSER";
break;
case OBJECT_TSDICTIONARY:
tag = "CREATE TEXT SEARCH DICTIONARY";
break;
case OBJECT_TSTEMPLATE:
tag = "CREATE TEXT SEARCH TEMPLATE";
break;
case OBJECT_TSCONFIGURATION:
tag = "CREATE TEXT SEARCH CONFIGURATION";
break;
case OBJECT_COLLATION:
tag = "CREATE COLLATION";
break;
case OBJECT_ACCESS_METHOD:
tag = "CREATE ACCESS METHOD";
break;
default:
tag = "???";
}
break;
case T_CompositeTypeStmt:
tag = "CREATE TYPE";
break;
case T_CreateEnumStmt:
tag = "CREATE TYPE";
break;
case T_CreateRangeStmt:
tag = "CREATE TYPE";
break;
case T_AlterEnumStmt:
tag = "ALTER TYPE";
break;
case T_ViewStmt:
tag = "CREATE VIEW";
break;
case T_CreateFunctionStmt:
if (((CreateFunctionStmt *) parsetree)->is_procedure)
tag = "CREATE PROCEDURE";
else
tag = "CREATE FUNCTION";
break;
case T_IndexStmt:
tag = "CREATE INDEX";
break;
case T_RuleStmt:
tag = "CREATE RULE";
break;
case T_CreateSeqStmt:
tag = "CREATE SEQUENCE";
break;
case T_AlterSeqStmt:
tag = "ALTER SEQUENCE";
break;
case T_DoStmt:
tag = "DO";
break;
case T_CreatedbStmt:
tag = "CREATE DATABASE";
break;
case T_AlterDatabaseStmt:
tag = "ALTER DATABASE";
break;
case T_AlterDatabaseSetStmt:
tag = "ALTER DATABASE";
break;
case T_DropdbStmt:
tag = "DROP DATABASE";
break;
case T_NotifyStmt:
tag = "NOTIFY";
break;
case T_ListenStmt:
tag = "LISTEN";
break;
case T_UnlistenStmt:
tag = "UNLISTEN";
break;
case T_LoadStmt:
tag = "LOAD";
break;
case T_CallStmt:
tag = "CALL";
break;
case T_ClusterStmt:
tag = "CLUSTER";
break;
case T_VacuumStmt:
// the same node type serves both commands; the VACOPT_VACUUM option bit tells them apart
if (((VacuumStmt *) parsetree)->options & VACOPT_VACUUM)
tag = "VACUUM";
else
tag = "ANALYZE";
break;
case T_ExplainStmt:
tag = "EXPLAIN";
break;
case T_CreateTableAsStmt:
switch (((CreateTableAsStmt *) parsetree)->relkind)
{
case OBJECT_TABLE:
if (((CreateTableAsStmt *) parsetree)->is_select_into)
tag = "SELECT INTO";
else
tag = "CREATE TABLE AS";
break;
case OBJECT_MATVIEW:
tag = "CREATE MATERIALIZED VIEW";
break;
default:
tag = "???";
}
break;
case T_RefreshMatViewStmt:
tag = "REFRESH MATERIALIZED VIEW";
break;
case T_AlterSystemStmt:
tag = "ALTER SYSTEM";
break;
case T_VariableSetStmt:
switch (((VariableSetStmt *) parsetree)->kind)
{
case VAR_SET_VALUE:
case VAR_SET_CURRENT:
case VAR_SET_DEFAULT:
case VAR_SET_MULTI:
tag = "SET";
break;
case VAR_RESET:
case VAR_RESET_ALL:
tag = "RESET";
break;
default:
tag = "???";
}
break;
case T_VariableShowStmt:
tag = "SHOW";
break;
case T_DiscardStmt:
switch (((DiscardStmt *) parsetree)->target)
{
case DISCARD_ALL:
tag = "DISCARD ALL";
break;
case DISCARD_PLANS:
tag = "DISCARD PLANS";
break;
case DISCARD_TEMP:
tag = "DISCARD TEMP";
break;
case DISCARD_SEQUENCES:
tag = "DISCARD SEQUENCES";
break;
default:
tag = "???";
}
break;
case T_CreateTransformStmt:
tag = "CREATE TRANSFORM";
break;
case T_CreateTrigStmt:
tag = "CREATE TRIGGER";
break;
case T_CreateEventTrigStmt:
tag = "CREATE EVENT TRIGGER";
break;
case T_AlterEventTrigStmt:
tag = "ALTER EVENT TRIGGER";
break;
case T_CreatePLangStmt:
tag = "CREATE LANGUAGE";
break;
case T_CreateRoleStmt:
tag = "CREATE ROLE";
break;
case T_AlterRoleStmt:
tag = "ALTER ROLE";
break;
case T_AlterRoleSetStmt:
tag = "ALTER ROLE";
break;
case T_DropRoleStmt:
tag = "DROP ROLE";
break;
case T_DropOwnedStmt:
tag = "DROP OWNED";
break;
case T_ReassignOwnedStmt:
tag = "REASSIGN OWNED";
break;
case T_LockStmt:
tag = "LOCK TABLE";
break;
case T_ConstraintsSetStmt:
tag = "SET CONSTRAINTS";
break;
case T_CheckPointStmt:
tag = "CHECKPOINT";
break;
case T_ReindexStmt:
tag = "REINDEX";
break;
case T_CreateConversionStmt:
tag = "CREATE CONVERSION";
break;
case T_CreateCastStmt:
tag = "CREATE CAST";
break;
case T_CreateOpClassStmt:
tag = "CREATE OPERATOR CLASS";
break;
case T_CreateOpFamilyStmt:
tag = "CREATE OPERATOR FAMILY";
break;
case T_AlterOpFamilyStmt:
tag = "ALTER OPERATOR FAMILY";
break;
case T_AlterOperatorStmt:
tag = "ALTER OPERATOR";
break;
case T_AlterTSDictionaryStmt:
tag = "ALTER TEXT SEARCH DICTIONARY";
break;
case T_AlterTSConfigurationStmt:
tag = "ALTER TEXT SEARCH CONFIGURATION";
break;
case T_CreatePolicyStmt:
tag = "CREATE POLICY";
break;
case T_AlterPolicyStmt:
tag = "ALTER POLICY";
break;
case T_CreateAmStmt:
tag = "CREATE ACCESS METHOD";
break;
case T_CreatePublicationStmt:
tag = "CREATE PUBLICATION";
break;
case T_AlterPublicationStmt:
tag = "ALTER PUBLICATION";
break;
case T_CreateSubscriptionStmt:
tag = "CREATE SUBSCRIPTION";
break;
case T_AlterSubscriptionStmt:
tag = "ALTER SUBSCRIPTION";
break;
case T_DropSubscriptionStmt:
tag = "DROP SUBSCRIPTION";
break;
case T_AlterCollationStmt:
tag = "ALTER COLLATION";
break;
case T_PrepareStmt:
tag = "PREPARE";
break;
case T_ExecuteStmt:
tag = "EXECUTE";
break;
case T_CreateStatsStmt:
tag = "CREATE STATISTICS";
break;
case T_DeallocateStmt:
{
DeallocateStmt *stmt = (DeallocateStmt *) parsetree;
// a NULL name selects the "ALL" form of the tag
if (stmt->name == NULL)
tag = "DEALLOCATE ALL";
else
tag = "DEALLOCATE";
}
break;
// already-planned statement: tag depends on commandType
case T_PlannedStmt:
{
PlannedStmt *stmt = (PlannedStmt *) parsetree;
switch (stmt->commandType)
{
case CMD_SELECT:
// with row marks present, the tag reflects the lock strength of the first row mark only
if (stmt->rowMarks != NIL)
{
switch (((PlanRowMark *) linitial(stmt->rowMarks))->strength)
{
case LCS_FORKEYSHARE:
tag = "SELECT FOR KEY SHARE";
break;
case LCS_FORSHARE:
tag = "SELECT FOR SHARE";
break;
case LCS_FORNOKEYUPDATE:
tag = "SELECT FOR NO KEY UPDATE";
break;
case LCS_FORUPDATE:
tag = "SELECT FOR UPDATE";
break;
// note: unlike the T_Query case below, other strengths fall back to plain "SELECT"
default:
tag = "SELECT";
break;
}
}
else
tag = "SELECT";
break;
case CMD_UPDATE:
tag = "UPDATE";
break;
case CMD_INSERT:
tag = "INSERT";
break;
case CMD_DELETE:
tag = "DELETE";
break;
case CMD_UTILITY:
tag = CreateCommandTag(stmt->utilityStmt);
break;
default:
elog(WARNING, "unrecognized commandType: %d",
(int) stmt->commandType);
tag = "???";
break;
}
}
break;
// analyzed (not yet planned) query: tag depends on commandType
case T_Query:
{
Query *stmt = (Query *) parsetree;
switch (stmt->commandType)
{
case CMD_SELECT:
// with row marks present, the tag reflects the lock strength of the first row mark only
if (stmt->rowMarks != NIL)
{
switch (((RowMarkClause *) linitial(stmt->rowMarks))->strength)
{
case LCS_FORKEYSHARE:
tag = "SELECT FOR KEY SHARE";
break;
case LCS_FORSHARE:
tag = "SELECT FOR SHARE";
break;
case LCS_FORNOKEYUPDATE:
tag = "SELECT FOR NO KEY UPDATE";
break;
case LCS_FORUPDATE:
tag = "SELECT FOR UPDATE";
break;
// note: here an unlisted strength yields "???", not "SELECT"
default:
tag = "???";
break;
}
}
else
tag = "SELECT";
break;
case CMD_UPDATE:
tag = "UPDATE";
break;
case CMD_INSERT:
tag = "INSERT";
break;
case CMD_DELETE:
tag = "DELETE";
break;
case CMD_UTILITY:
tag = CreateCommandTag(stmt->utilityStmt);
break;
default:
elog(WARNING, "unrecognized commandType: %d",
(int) stmt->commandType);
tag = "???";
break;
}
}
break;
// unknown node type: warn and return the placeholder tag
default:
elog(WARNING, "unrecognized node type: %d",
(int) nodeTag(parsetree));
tag = "???";
break;
}
return tag;
}
6、BeginCommand
/*
 * BeginCommand
 *
 * Called by exec_simple_query before each statement is processed.  The body
 * is empty, so neither argument is used.
 */
void
BeginCommand(const char *commandTag, CommandDest dest)
{
	/* intentionally empty */
}
7、analyze_requires_snapshot
/*
 * analyze_requires_snapshot
 *
 * Report whether a snapshot must be set before analyzing the given raw
 * statement.  True for INSERT/DELETE/UPDATE/SELECT and for DECLARE CURSOR,
 * EXPLAIN and CREATE TABLE AS; false for every other statement type.
 */
bool
analyze_requires_snapshot(RawStmt *parseTree)
{
	switch (nodeTag(parseTree->stmt))
	{
			/* the four basic data statements */
		case T_InsertStmt:
		case T_DeleteStmt:
		case T_UpdateStmt:
		case T_SelectStmt:
			/* further statement types that also need one */
		case T_DeclareCursorStmt:
		case T_ExplainStmt:
		case T_CreateTableAsStmt:
			return true;

		default:
			return false;
	}
}
8、pg_analyze_and_rewrite
/*
 * pg_analyze_and_rewrite
 *
 * Run parse analysis on one raw parse tree and then rewrite the resulting
 * Query.  Returns the list produced by pg_rewrite_query().  Parse-analysis
 * resource usage is reported when log_parser_stats is set.
 */
List *
pg_analyze_and_rewrite(RawStmt *parsetree, const char *query_string,
					   Oid *paramTypes, int numParams,
					   QueryEnvironment *queryEnv)
{
	Query	   *analyzed;
	List	   *rewritten;

	TRACE_POSTGRESQL_QUERY_REWRITE_START(query_string);

	/* Step 1: parse analysis */
	if (log_parser_stats)
		ResetUsage();

	analyzed = parse_analyze(parsetree, query_string,
							 paramTypes, numParams, queryEnv);

	if (log_parser_stats)
		ShowUsage("PARSE ANALYSIS STATISTICS");

	/* Step 2: query rewrite */
	rewritten = pg_rewrite_query(analyzed);

	TRACE_POSTGRESQL_QUERY_REWRITE_DONE(query_string);

	return rewritten;
}
/*
 * parse_analyze
 *
 * Analyze one raw parse tree and return the resulting Query.  A fresh
 * top-level ParseState is created for the call and freed before returning.
 * sourceText must not be NULL.
 */
Query *
parse_analyze(RawStmt *parseTree, const char *sourceText,
			  Oid *paramTypes, int numParams,
			  QueryEnvironment *queryEnv)
{
	ParseState *ps = make_parsestate(NULL);
	Query	   *analyzed;

	Assert(sourceText != NULL);

	ps->p_sourcetext = sourceText;

	/* Parameter types are only installed when at least one is supplied. */
	if (numParams > 0)
		parse_fixed_parameters(ps, paramTypes, numParams);

	ps->p_queryEnv = queryEnv;

	analyzed = transformTopLevelStmt(ps, parseTree);

	/* Call the post-analysis hook if one is installed. */
	if (post_parse_analyze_hook != NULL)
		post_parse_analyze_hook(ps, analyzed);

	free_parsestate(ps);

	return analyzed;
}
/*
 * make_parsestate
 *
 * Allocate a zero-filled ParseState linked to parentParseState (which may
 * be NULL).  p_next_resno starts at 1 and p_resolve_unknowns at true.  When
 * a parent is given, its source text, hook callbacks, hook state and query
 * environment are copied into the new state.
 */
ParseState *
make_parsestate(ParseState *parentParseState)
{
	ParseState *fresh = palloc0(sizeof(ParseState));

	fresh->parentParseState = parentParseState;

	/* defaults that differ from the zero fill */
	fresh->p_next_resno = 1;
	fresh->p_resolve_unknowns = true;

	if (parentParseState == NULL)
		return fresh;

	/* inherit settings from the parent state */
	fresh->p_sourcetext = parentParseState->p_sourcetext;
	fresh->p_pre_columnref_hook = parentParseState->p_pre_columnref_hook;
	fresh->p_post_columnref_hook = parentParseState->p_post_columnref_hook;
	fresh->p_paramref_hook = parentParseState->p_paramref_hook;
	fresh->p_coerce_param_hook = parentParseState->p_coerce_param_hook;
	fresh->p_ref_hook_state = parentParseState->p_ref_hook_state;
	fresh->p_queryEnv = parentParseState->p_queryEnv;

	return fresh;
}
/*
 * transformTopLevelStmt
 *
 * Transform the statement wrapped in a RawStmt into a Query, then copy the
 * RawStmt's stmt_location and stmt_len onto that Query.
 */
Query *
transformTopLevelStmt(ParseState *pstate, RawStmt *parseTree)
{
	Query	   *qry = transformOptionalSelectInto(pstate, parseTree->stmt);

	qry->stmt_location = parseTree->stmt_location;
	qry->stmt_len = parseTree->stmt_len;

	return qry;
}
/*
 * transformOptionalSelectInto
 *
 * If parseTree is a SelectStmt whose leftmost leaf carries an INTO clause,
 * wrap the whole statement in a CreateTableAsStmt (flagged is_select_into)
 * and move the INTO clause onto that wrapper.  The possibly replaced tree is
 * then handed to transformStmt().
 */
static Query *
transformOptionalSelectInto(ParseState *pstate, Node *parseTree)
{
	if (IsA(parseTree, SelectStmt))
	{
		SelectStmt *leftmost;

		/* Follow the larg links until reaching a node that is not a set operation. */
		for (leftmost = (SelectStmt *) parseTree;
			 leftmost && leftmost->op != SETOP_NONE;
			 leftmost = leftmost->larg)
			;
		Assert(leftmost && IsA(leftmost, SelectStmt) && leftmost->larg == NULL);

		if (leftmost->intoClause)
		{
			CreateTableAsStmt *wrapper = makeNode(CreateTableAsStmt);

			wrapper->query = parseTree;
			wrapper->into = leftmost->intoClause;
			wrapper->relkind = OBJECT_TABLE;
			wrapper->is_select_into = true;

			/* The clause now belongs to the wrapper; detach it from the SELECT. */
			leftmost->intoClause = NULL;

			parseTree = (Node *) wrapper;
		}
	}

	return transformStmt(pstate, parseTree);
}
/*
 * transformStmt
 *
 * Dispatch a statement node to its type-specific transform routine and
 * return the resulting Query.  Statement types without a dedicated routine
 * are wrapped unchanged in a CMD_UTILITY Query.  Whatever path is taken,
 * the result is marked as an original query that can set the command tag.
 */
Query *
transformStmt(ParseState *pstate, Node *parseTree)
{
	Query	   *qry;

#ifdef RAW_EXPRESSION_COVERAGE_TEST
	/* Coverage test build only: run the raw-expression check on the four basic statement types. */
	switch (nodeTag(parseTree))
	{
		case T_SelectStmt:
		case T_InsertStmt:
		case T_UpdateStmt:
		case T_DeleteStmt:
			(void) test_raw_expression_coverage(parseTree, NULL);
			break;
		default:
			break;
	}
#endif

	switch (nodeTag(parseTree))
	{
		case T_InsertStmt:
			qry = transformInsertStmt(pstate, (InsertStmt *) parseTree);
			break;

		case T_DeleteStmt:
			qry = transformDeleteStmt(pstate, (DeleteStmt *) parseTree);
			break;

		case T_UpdateStmt:
			qry = transformUpdateStmt(pstate, (UpdateStmt *) parseTree);
			break;

		case T_SelectStmt:
			{
				SelectStmt *sel = (SelectStmt *) parseTree;

				/* VALUES takes precedence; otherwise set operation vs. plain SELECT */
				if (sel->valuesLists)
					qry = transformValuesClause(pstate, sel);
				else if (sel->op != SETOP_NONE)
					qry = transformSetOperationStmt(pstate, sel);
				else
					qry = transformSelectStmt(pstate, sel);
			}
			break;

		case T_DeclareCursorStmt:
			qry = transformDeclareCursorStmt(pstate,
											 (DeclareCursorStmt *) parseTree);
			break;

		case T_ExplainStmt:
			qry = transformExplainStmt(pstate,
									   (ExplainStmt *) parseTree);
			break;

		case T_CreateTableAsStmt:
			qry = transformCreateTableAsStmt(pstate,
											 (CreateTableAsStmt *) parseTree);
			break;

		case T_CallStmt:
			qry = transformCallStmt(pstate,
									(CallStmt *) parseTree);
			break;

		default:
			/* Any other statement: wrap it unchanged in a utility Query. */
			qry = makeNode(Query);
			qry->commandType = CMD_UTILITY;
			qry->utilityStmt = (Node *) parseTree;
			break;
	}

	qry->querySource = QSRC_ORIGINAL;
	qry->canSetTag = true;

	return qry;
}
static Query *
transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
{
Query *qry = makeNode(Query);
SelectStmt *selectStmt = (SelectStmt *) stmt->selectStmt;
List *exprList = NIL;
bool isGeneralSelect;
List *sub_rtable;
List *sub_namespace;
List *icolumns;
List *attrnos;
RangeTblEntry *rte;
RangeTblRef *rtr;
ListCell *icols;
ListCell *attnos;
ListCell *lc;
bool isOnConflictUpdate;
AclMode targetPerms;
Assert(pstate->p_ctenamespace == NIL);
qry->commandType = CMD_INSERT;
pstate->p_is_insert = true;
if (stmt->withClause)
{
qry->hasRecursive = stmt->withClause->recursive;
qry->cteList = transformWithClause(pstate, stmt->withClause);
qry->hasModifyingCTE = pstate->p_hasModifyingCTE;
}
qry->override = stmt->override;
isOnConflictUpdate = (stmt->onConflictClause &&
stmt->onConflictClause->action == ONCONFLICT_UPDATE);
isGeneralSelect = (selectStmt && (selectStmt->valuesLists == NIL ||
selectStmt->sortClause != NIL ||
selectStmt->limitOffset != NULL ||
selectStmt->limitCount != NULL ||
selectStmt->lockingClause != NIL ||
selectStmt->withClause != NULL));
if (isGeneralSelect)
{
sub_rtable = pstate->p_rtable;
pstate->p_rtable = NIL;
sub_namespace = pstate->p_namespace;
pstate->p_namespace = NIL;
}
else
{
sub_rtable = NIL;
sub_namespace = NIL;
}
targetPerms = ACL_INSERT;
if (isOnConflictUpdate)
targetPerms |= ACL_UPDATE;
qry->resultRelation = setTargetTable(pstate, stmt->relation,
false, false, targetPerms);
icolumns = checkInsertTargets(pstate, stmt->cols, &attrnos);
Assert(list_length(icolumns) == list_length(attrnos));
if (selectStmt == NULL)
{
exprList = NIL;
}
else if (isGeneralSelect)
{
ParseState *sub_pstate = make_parsestate(pstate);
Query *selectQuery;
sub_pstate->p_rtable = sub_rtable;
sub_pstate->p_joinexprs = NIL;
sub_pstate->p_namespace = sub_namespace;
sub_pstate->p_resolve_unknowns = false;
selectQuery = transformStmt(sub_pstate, stmt->selectStmt);
free_parsestate(sub_pstate);
if (!IsA(selectQuery, Query) ||
selectQuery->commandType != CMD_SELECT)
elog(ERROR, "unexpected non-SELECT command in INSERT ... SELECT");
rte = addRangeTableEntryForSubquery(pstate,
selectQuery,
makeAlias("*SELECT*", NIL),
false,
false);
rtr = makeNode(RangeTblRef);
rtr->rtindex = list_length(pstate->p_rtable);
Assert(rte == rt_fetch(rtr->rtindex, pstate->p_rtable));
pstate->p_joinlist = lappend(pstate->p_joinlist, rtr);
exprList = NIL;
foreach(lc, selectQuery->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Expr *expr;
if (tle->resjunk)
continue;
if (tle->expr &&
(IsA(tle->expr, Const) ||IsA(tle->expr, Param)) &&
exprType((Node *) tle->expr) == UNKNOWNOID)
expr = tle->expr;
else
{
Var *var = makeVarFromTargetEntry(rtr->rtindex, tle);
var->location = exprLocation((Node *) tle->expr);
expr = (Expr *) var;
}
exprList = lappend(exprList, expr);
}
exprList = transformInsertRow(pstate, exprList,
stmt->cols,
icolumns, attrnos,
false);
}
else if (list_length(selectStmt->valuesLists) > 1)
{
List *exprsLists = NIL;
List *coltypes = NIL;
List *coltypmods = NIL;
List *colcollations = NIL;
int sublist_length = -1;
bool lateral = false;
Assert(selectStmt->intoClause == NULL);
foreach(lc, selectStmt->valuesLists)
{
List *sublist = (List *) lfirst(lc);
sublist = transformExpressionList(pstate, sublist,
EXPR_KIND_VALUES, true);
if (sublist_length < 0)
{
sublist_length = list_length(sublist);
}
else if (sublist_length != list_length(sublist))
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("VALUES lists must all be the same length"),
parser_errposition(pstate,
exprLocation((Node *) sublist))));
}
sublist = transformInsertRow(pstate, sublist,
stmt->cols,
icolumns, attrnos,
true);
assign_list_collations(pstate, sublist);
exprsLists = lappend(exprsLists, sublist);
}
foreach(lc, (List *) linitial(exprsLists))
{
Node *val = (Node *) lfirst(lc);
coltypes = lappend_oid(coltypes, exprType(val));
coltypmods = lappend_int(coltypmods, exprTypmod(val));
colcollations = lappend_oid(colcollations, InvalidOid);
}
if (list_length(pstate->p_rtable) != 1 &&
contain_vars_of_level((Node *) exprsLists, 0))
lateral = true;
rte = addRangeTableEntryForValues(pstate, exprsLists,
coltypes, coltypmods, colcollations,
NULL, lateral, true);
rtr = makeNode(RangeTblRef);
rtr->rtindex = list_length(pstate->p_rtable);
Assert(rte == rt_fetch(rtr->rtindex, pstate->p_rtable));
pstate->p_joinlist = lappend(pstate->p_joinlist, rtr);
expandRTE(rte, rtr->rtindex, 0, -1, false, NULL, &exprList);
exprList = transformInsertRow(pstate, exprList,
stmt->cols,
icolumns, attrnos,
false);
}
else
{
List *valuesLists = selectStmt->valuesLists;
Assert(list_length(valuesLists) == 1);
Assert(selectStmt->intoClause == NULL);
exprList = transformExpressionList(pstate,
(List *) linitial(valuesLists),
EXPR_KIND_VALUES_SINGLE,
true);
exprList = transformInsertRow(pstate, exprList,
stmt->cols,
icolumns, attrnos,
false);
}
rte = pstate->p_target_rangetblentry;
qry->targetList = NIL;
icols = list_head(icolumns);
attnos = list_head(attrnos);
foreach(lc, exprList)
{
Expr *expr = (Expr *) lfirst(lc);
ResTarget *col;
AttrNumber attr_num;
TargetEntry *tle;
col = lfirst_node(ResTarget, icols);
attr_num = (AttrNumber) lfirst_int(attnos);
tle = makeTargetEntry(expr,
attr_num,
col->name,
false);
qry->targetList = lappend(qry->targetList, tle);
rte->insertedCols = bms_add_member(rte->insertedCols,
attr_num - FirstLowInvalidHeapAttributeNumber);
icols = lnext(icols);
attnos = lnext(attnos);
}
if (stmt->onConflictClause)
qry->onConflict = transformOnConflictClause(pstate,
stmt->onConflictClause);
if (stmt->returningList)
{
pstate->p_namespace = NIL;
addRTEtoQuery(pstate, pstate->p_target_rangetblentry,
false, true, true);
qry->returningList = transformReturningList(pstate,
stmt->returningList);
}
qry->rtable = pstate->p_rtable;
qry->jointree = makeFromExpr(pstate->p_joinlist, NULL);
qry->hasTargetSRFs = pstate->p_hasTargetSRFs;
qry->hasSubLinks = pstate->p_hasSubLinks;
assign_query_collations(pstate, qry);
return qry;
}
9、pg_plan_queries
/*
 * pg_plan_queries
 *      Build a list holding one PlannedStmt for every Query in querytrees.
 *
 * Queries whose commandType is CMD_UTILITY are not handed to the planner;
 * each one is wrapped in a freshly made PlannedStmt that carries over
 * canSetTag, utilityStmt, stmt_location and stmt_len.  Every other query is
 * planned through pg_plan_query() using the caller's cursorOptions and
 * boundParams.
 *
 * Results are appended in input order and the resulting list is returned
 * (NIL when querytrees is empty).
 */
List *
pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams)
{
    List       *planned_stmts = NIL;
    ListCell   *cell;

    foreach(cell, querytrees)
    {
        Query      *query = lfirst_node(Query, cell);
        PlannedStmt *planned;

        if (query->commandType != CMD_UTILITY)
        {
            /* Plannable statement: let the planner do the work. */
            planned = pg_plan_query(query, cursorOptions, boundParams);
        }
        else
        {
            /* Utility statement: just wrap it, nothing to plan. */
            planned = makeNode(PlannedStmt);
            planned->commandType = CMD_UTILITY;
            planned->canSetTag = query->canSetTag;
            planned->utilityStmt = query->utilityStmt;
            planned->stmt_location = query->stmt_location;
            planned->stmt_len = query->stmt_len;
        }

        planned_stmts = lappend(planned_stmts, planned);
    }

    return planned_stmts;
}
/*
 * pg_plan_query
 *      Run the planner on one Query and return the resulting PlannedStmt.
 *
 * querytree, cursorOptions and boundParams are passed unchanged to planner().
 * Returns NULL, without planning, when the query is a utility command.
 */
PlannedStmt *
pg_plan_query(Query *querytree, int cursorOptions, ParamListInfo boundParams)
{
PlannedStmt *plan;
/* Utility commands are not planned here. */
if (querytree->commandType == CMD_UTILITY)
return NULL;
/* An active snapshot is asserted, not established, by this function. */
Assert(ActiveSnapshotSet());
TRACE_POSTGRESQL_QUERY_PLAN_START();
/* When log_planner_stats is on, bracket the planner call with usage reporting. */
if (log_planner_stats)
ResetUsage();
plan = planner(querytree, cursorOptions, boundParams);
if (log_planner_stats)
ShowUsage("PLANNER STATISTICS");
/*
 * Optional compile-time debugging aid: replace the plan with a copyObject()
 * copy of itself.  The equal() comparison is compiled only when NOT_USED is
 * defined; otherwise the assignment to plan below is unconditional.
 */
#ifdef COPY_PARSE_PLAN_TREES
{
PlannedStmt *new_plan = copyObject(plan);
#ifdef NOT_USED
if (!equal(new_plan, plan))
elog(WARNING, "copyObject() failed to produce an equal plan tree");
else
#endif
plan = new_plan;
}
#endif
/* Dump the plan at LOG level when Debug_print_plan is set. */
if (Debug_print_plan)
elog_node_display(LOG, "plan", plan, Debug_pretty_print);
TRACE_POSTGRESQL_QUERY_PLAN_DONE();
return plan;
}
/*
 * planner
 *      Entry point for planning a Query.
 *
 * If planner_hook is set, it is called in place of standard_planner();
 * otherwise standard_planner() is called.  Either way the three arguments
 * are forwarded unchanged and the callee's PlannedStmt is returned as-is.
 */
PlannedStmt *
planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
{
    /* No hook installed: use the built-in planner. */
    if (!planner_hook)
        return standard_planner(parse, cursorOptions, boundParams);

    return (*planner_hook) (parse, cursorOptions, boundParams);
}
10、CreatePortal
/*
 * CreatePortal
 *      Create and register a new portal under the given name.
 *
 * name:      portal name; asserted to be a valid pointer.
 * allowDup:  if false, an existing portal with this name raises an ERROR.
 * dupSilent: if false, a WARNING is emitted before the existing portal
 *            with this name is dropped.
 *
 * Returns the new portal in PORTAL_NEW status with its default field values.
 */
Portal
CreatePortal(const char *name, bool allowDup, bool dupSilent)
{
Portal portal;
AssertArg(PointerIsValid(name));
/* Deal with a pre-existing portal of the same name, if any. */
portal = GetPortalByName(name);
if (PortalIsValid(portal))
{
if (!allowDup)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_CURSOR),
errmsg("cursor \"%s\" already exists", name)));
if (!dupSilent)
ereport(WARNING,
(errcode(ERRCODE_DUPLICATE_CURSOR),
errmsg("closing existing cursor \"%s\"",
name)));
PortalDrop(portal, false);
}
/* Allocate the zero-filled portal struct in TopPortalContext. */
portal = (Portal) MemoryContextAllocZero(TopPortalContext, sizeof *portal);
/* Give the portal its own memory context, created from TopPortalContext. */
portal->portalContext = AllocSetContextCreate(TopPortalContext,
"PortalContext",
ALLOCSET_SMALL_SIZES);
/* Give the portal its own resource owner, created from CurTransactionResourceOwner. */
portal->resowner = ResourceOwnerCreate(CurTransactionResourceOwner,
"Portal");
/* Fill in the fields whose initial value is not zero. */
portal->status = PORTAL_NEW;
portal->cleanup = PortalCleanup;
portal->createSubid = GetCurrentSubTransactionId();
portal->activeSubid = portal->createSubid;
portal->strategy = PORTAL_MULTI_QUERY;
portal->cursorOptions = CURSOR_OPT_NO_SCROLL;
portal->atStart = true;
portal->atEnd = true;
portal->visible = true;
portal->creation_time = GetCurrentStatementStartTimestamp();
/* Register the portal under its name. */
PortalHashTableInsert(portal, name);
/* NOTE(review): portal->name is presumably set by PortalHashTableInsert -- confirm. */
MemoryContextSetIdentifier(portal->portalContext, portal->name);
return portal;
}
11、PortalDefineQuery
/*
 * PortalDefineQuery
 *      Attach query information to a freshly created portal.
 *
 * The portal must be valid and in PORTAL_NEW status; sourceText must not be
 * NULL; commandTag may be NULL only when stmts is NIL (all checked by
 * assertions).  The pointers are stored in the portal as given -- nothing
 * is copied here -- and the portal's status is advanced to PORTAL_DEFINED.
 */
void
PortalDefineQuery(Portal portal,
                  const char *prepStmtName,
                  const char *sourceText,
                  const char *commandTag,
                  List *stmts,
                  CachedPlan *cplan)
{
    /* Validate the portal itself before looking inside it. */
    AssertArg(PortalIsValid(portal));
    AssertState(portal->status == PORTAL_NEW);

    /* Validate the remaining arguments. */
    AssertArg(sourceText != NULL);
    AssertArg(commandTag != NULL || stmts == NIL);

    /* Textual description of the query. */
    portal->prepStmtName = prepStmtName;
    portal->sourceText = sourceText;
    portal->commandTag = commandTag;

    /* Statements to run and the cached plan they belong to, if any. */
    portal->stmts = stmts;
    portal->cplan = cplan;

    portal->status = PORTAL_DEFINED;
}
12、PortalStart
/*
 * PortalStart
 *      Prepare a defined portal for execution.
 *
 * portal:   must be valid and in PORTAL_DEFINED status (asserted).
 * params:   stored in portal->portalParams and, for PORTAL_ONE_SELECT,
 *           passed to CreateQueryDesc().
 * eflags:   executor flags, used only for PORTAL_ONE_SELECT.
 * snapshot: for PORTAL_ONE_SELECT, pushed as the active snapshot when
 *           non-null; otherwise GetTransactionSnapshot() is pushed.
 *
 * On success the portal's strategy and tupDesc are set and its status
 * becomes PORTAL_READY.  On error the portal is marked failed, the saved
 * global state is restored and the error is re-thrown.
 */
void
PortalStart(Portal portal, ParamListInfo params,
int eflags, Snapshot snapshot)
{
Portal saveActivePortal;
ResourceOwner saveResourceOwner;
MemoryContext savePortalContext;
MemoryContext oldContext;
QueryDesc *queryDesc;
int myeflags;
AssertArg(PortalIsValid(portal));
AssertState(portal->status == PORTAL_DEFINED);
/* Remember the global state that is overridden below. */
saveActivePortal = ActivePortal;
saveResourceOwner = CurrentResourceOwner;
savePortalContext = PortalContext;
PG_TRY();
{
/* Make this portal, its resource owner (if any) and its context current. */
ActivePortal = portal;
if (portal->resowner)
CurrentResourceOwner = portal->resowner;
PortalContext = portal->portalContext;
oldContext = MemoryContextSwitchTo(PortalContext);
portal->portalParams = params;
/* Pick the execution strategy from the statement list. */
portal->strategy = ChoosePortalStrategy(portal->stmts);
switch (portal->strategy)
{
case PORTAL_ONE_SELECT:
/* Push a snapshot for the duration of executor startup. */
if (snapshot)
PushActiveSnapshot(snapshot);
else
PushActiveSnapshot(GetTransactionSnapshot());
/* Build a QueryDesc for the first statement, using None_Receiver as destination. */
queryDesc = CreateQueryDesc(linitial_node(PlannedStmt, portal->stmts),
portal->sourceText,
GetActiveSnapshot(),
InvalidSnapshot,
None_Receiver,
params,
portal->queryEnv,
0);
/* Scrollable cursors additionally request rewind and backward-scan support. */
if (portal->cursorOptions & CURSOR_OPT_SCROLL)
myeflags = eflags | EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
else
myeflags = eflags;
ExecutorStart(queryDesc, myeflags);
/* Record the started query and its result descriptor in the portal. */
portal->queryDesc = queryDesc;
portal->tupDesc = queryDesc->tupDesc;
/* Reset the position fields. */
portal->atStart = true;
portal->atEnd = false;
portal->portalPos = 0;
PopActiveSnapshot();
break;
case PORTAL_ONE_RETURNING:
case PORTAL_ONE_MOD_WITH:
/* Only derive tupDesc from the primary statement's plan target list; the executor is not started here. */
{
PlannedStmt *pstmt;
pstmt = PortalGetPrimaryStmt(portal);
portal->tupDesc =
ExecCleanTypeFromTL(pstmt->planTree->targetlist,
false);
}
portal->atStart = true;
portal->atEnd = false;
portal->portalPos = 0;
break;
case PORTAL_UTIL_SELECT:
/* The primary statement is asserted to be a utility command; take tupDesc from it. */
{
PlannedStmt *pstmt = PortalGetPrimaryStmt(portal);
Assert(pstmt->commandType == CMD_UTILITY);
portal->tupDesc = UtilityTupleDescriptor(pstmt->utilityStmt);
}
portal->atStart = true;
portal->atEnd = false;
portal->portalPos = 0;
break;
case PORTAL_MULTI_QUERY:
/* No tuple descriptor is set for this strategy. */
portal->tupDesc = NULL;
break;
}
}
PG_CATCH();
{
/* Error path: mark the portal failed, restore the saved globals, re-throw. */
MarkPortalFailed(portal);
ActivePortal = saveActivePortal;
CurrentResourceOwner = saveResourceOwner;
PortalContext = savePortalContext;
PG_RE_THROW();
}
PG_END_TRY();
/* Success path: switch back to the previous memory context and restore the saved globals. */
MemoryContextSwitchTo(oldContext);
ActivePortal = saveActivePortal;
CurrentResourceOwner = saveResourceOwner;
PortalContext = savePortalContext;
portal->status = PORTAL_READY;
}
13、PortalSetResultFormat
/*
 * PortalSetResultFormat
 *      Fill portal->formats with one format code per result column.
 *
 * Does nothing when the portal has no tuple descriptor.  Otherwise an array
 * of tupDesc->natts entries is allocated in the portal's memory context and
 * filled as follows:
 *   nFormats > 1  -- must equal the column count (else ERROR); the caller's
 *                    codes are copied one-for-one;
 *   nFormats == 1 -- formats[0] is used for every column;
 *   nFormats <= 0 -- every column gets 0.
 */
void
PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
{
    int         ncols;
    int         col;
    int16       fill;

    if (portal->tupDesc == NULL)
        return;

    ncols = portal->tupDesc->natts;
    portal->formats = (int16 *)
        MemoryContextAlloc(portal->portalContext, ncols * sizeof(int16));

    if (nFormats > 1)
    {
        /* Per-column codes: the count has to match exactly. */
        if (nFormats != ncols)
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("bind message has %d result formats but query has %d columns",
                            nFormats, ncols)));
        memcpy(portal->formats, formats, ncols * sizeof(int16));
        return;
    }

    /* Uniform code: the single supplied one, or 0 when none was given. */
    fill = (nFormats > 0) ? formats[0] : 0;
    for (col = 0; col < ncols; col++)
        portal->formats[col] = fill;
}
14、CreateDestReceiver
/*
 * CreateDestReceiver
 *      Return the DestReceiver that corresponds to a CommandDest value.
 *
 * Some destinations share a statically defined receiver; others get one
 * from a Create* constructor called with a placeholder argument (NULL or
 * InvalidOid) or none at all.  DestNone, and any value not listed in the
 * switch, yields the do-nothing receiver.
 */
DestReceiver *
CreateDestReceiver(CommandDest dest)
{
    DestReceiver *receiver = &donothingDR;

    switch (dest)
    {
        case DestNone:
            /* keep the do-nothing receiver */
            break;

        case DestRemote:
        case DestRemoteExecute:
            receiver = printtup_create_DR(dest);
            break;

        case DestRemoteSimple:
            receiver = &printsimpleDR;
            break;

        case DestDebug:
            receiver = &debugtupDR;
            break;

        case DestSPI:
            receiver = &spi_printtupDR;
            break;

        case DestTuplestore:
            receiver = CreateTuplestoreDestReceiver();
            break;

        case DestIntoRel:
            receiver = CreateIntoRelDestReceiver(NULL);
            break;

        case DestCopyOut:
            receiver = CreateCopyDestReceiver();
            break;

        case DestSQLFunction:
            receiver = CreateSQLFunctionDestReceiver();
            break;

        case DestTransientRel:
            receiver = CreateTransientRelDestReceiver(InvalidOid);
            break;

        case DestTupleQueue:
            receiver = CreateTupleQueueDestReceiver(NULL);
            break;
    }

    return receiver;
}
15、printtup_create_DR
/*
 * printtup_create_DR
 *      Allocate and initialise a DR_printtup receiver for the given
 *      destination.
 *
 * The struct is allocated zero-filled with palloc0(), its callbacks are
 * pointed at the printtup routines, and dest is recorded in pub.mydest.
 * sendDescrip is true only when dest is DestRemote.  The result is returned
 * cast to the generic DestReceiver pointer type.
 */
DestReceiver *
printtup_create_DR(CommandDest dest)
{
    DR_printtup *recv = (DR_printtup *) palloc0(sizeof(DR_printtup));

    /* Generic receiver part: callbacks and destination. */
    recv->pub.rStartup = printtup_startup;
    recv->pub.receiveSlot = printtup;
    recv->pub.rShutdown = printtup_shutdown;
    recv->pub.rDestroy = printtup_destroy;
    recv->pub.mydest = dest;

    /* printtup-specific part. */
    recv->sendDescrip = (dest == DestRemote);
    recv->attrinfo = NULL;
    recv->nattrs = 0;
    recv->myinfo = NULL;
    recv->tmpcontext = NULL;

    return (DestReceiver *) recv;
}
16、PortalDrop
/*
 * PortalDrop
 *      Destroy a portal and release everything it holds.
 *
 * portal:      must be valid (asserted); dropping a pinned portal or one in
 *              PORTAL_ACTIVE status raises an ERROR.
 * isTopCommit: when true, the portal's resource owner is released and
 *              deleted here only if the portal is in PORTAL_FAILED status.
 *
 * The portal struct itself is freed, so the pointer must not be used after
 * this call returns.
 */
void
PortalDrop(Portal portal, bool isTopCommit)
{
AssertArg(PortalIsValid(portal));
/* Refuse to drop portals that are pinned or currently active. */
if (portal->portalPinned)
ereport(ERROR,
(errcode(ERRCODE_INVALID_CURSOR_STATE),
errmsg("cannot drop pinned portal \"%s\"", portal->name)));
if (portal->status == PORTAL_ACTIVE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_CURSOR_STATE),
errmsg("cannot drop active portal \"%s\"", portal->name)));
/* Run the cleanup hook, then clear it so it cannot run a second time. */
if (PointerIsValid(portal->cleanup))
{
portal->cleanup(portal);
portal->cleanup = NULL;
}
/* Remove the portal from the hash table and release its cached plan. */
PortalHashTableDelete(portal);
PortalReleaseCachedPlan(portal);
/* Forget holdSnapshot, unregistering it first if a resource owner still exists. */
if (portal->holdSnapshot)
{
if (portal->resowner)
UnregisterSnapshotFromOwner(portal->holdSnapshot,
portal->resowner);
portal->holdSnapshot = NULL;
}
/*
 * Release the resource owner in three phases and delete it, unless this is
 * a top-level commit of a portal that has not failed.
 * NOTE(review): in that skipped case the owner is presumably released by
 * transaction-level cleanup -- not visible here, confirm.
 */
if (portal->resowner &&
(!isTopCommit || portal->status == PORTAL_FAILED))
{
/* A failed portal releases its resources with isCommit = false. */
bool isCommit = (portal->status != PORTAL_FAILED);
ResourceOwnerRelease(portal->resowner,
RESOURCE_RELEASE_BEFORE_LOCKS,
isCommit, false);
ResourceOwnerRelease(portal->resowner,
RESOURCE_RELEASE_LOCKS,
isCommit, false);
ResourceOwnerRelease(portal->resowner,
RESOURCE_RELEASE_AFTER_LOCKS,
isCommit, false);
ResourceOwnerDelete(portal->resowner);
}
/* The pointer is cleared whether or not the owner was deleted above. */
portal->resowner = NULL;
/* End the hold store, if any, while switched into holdContext. */
if (portal->holdStore)
{
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(portal->holdContext);
tuplestore_end(portal->holdStore);
MemoryContextSwitchTo(oldcontext);
portal->holdStore = NULL;
}
/* Delete the hold context (if any) and the portal's own context, then free the struct. */
if (portal->holdContext)
MemoryContextDelete(portal->holdContext);
MemoryContextDelete(portal->portalContext);
pfree(portal);
}
17、EndImplicitTransactionBlock
/*
 * EndImplicitTransactionBlock
 *      Leave an implicit transaction block, if one is in progress.
 *
 * When the current transaction's blockState is TBLOCK_IMPLICIT_INPROGRESS
 * it is set back to TBLOCK_STARTED; in any other state nothing changes.
 */
void
EndImplicitTransactionBlock(void)
{
    TransactionState xact = CurrentTransactionState;

    /* Not in an implicit block: nothing to end. */
    if (xact->blockState != TBLOCK_IMPLICIT_INPROGRESS)
        return;

    xact->blockState = TBLOCK_STARTED;
}
18、finish_xact_command
/*
 * finish_xact_command
 *      Commit the current transaction command, if one was started.
 *
 * The statement timeout is always disabled first.  If xact_started is set,
 * CommitTransactionCommand() is called, the optional memory-context
 * debugging hooks run (when compiled in), and xact_started is cleared.
 */
static void
finish_xact_command(void)
{
    disable_statement_timeout();

    /* No transaction command in progress: we are done. */
    if (!xact_started)
        return;

    CommitTransactionCommand();

#ifdef MEMORY_CONTEXT_CHECKING
    /* Debug build: check memory contexts starting from TopMemoryContext. */
    MemoryContextCheck(TopMemoryContext);
#endif

#ifdef SHOW_MEMORY_STATS
    /* Debug build: print memory context statistics. */
    MemoryContextStats(TopMemoryContext);
#endif

    xact_started = false;
}
19、CommandCounterIncrement
/*
 * CommandCounterIncrement
 *      Advance the current command ID if the present one has been used.
 *
 * Nothing happens when currentCommandIdUsed is false.  Otherwise:
 *   - an ERROR is raised in parallel mode or inside a parallel worker;
 *   - currentCommandId is incremented; if that produces InvalidCommandId
 *     the increment is undone and an ERROR is raised;
 *   - the "used" flag is cleared, the new ID is passed to
 *     SnapshotSetCommandId(), and AtCCI_LocalCache() is called.
 */
void
CommandCounterIncrement(void)
{
    /* The current command ID was never used, so there is no need for a new one. */
    if (!currentCommandIdUsed)
        return;

    if (IsInParallelMode() || IsParallelWorker())
        elog(ERROR, "cannot start commands during a parallel operation");

    currentCommandId++;
    if (currentCommandId == InvalidCommandId)
    {
        /* Step back so the counter does not stay at the invalid value. */
        currentCommandId--;
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("cannot have more than 2^32-2 commands in a transaction")));
    }

    currentCommandIdUsed = false;

    /* Propagate the new command ID, then run the local-cache hook. */
    SnapshotSetCommandId(currentCommandId);
    AtCCI_LocalCache();
}
20、EndCommand
/*
 * EndCommand
 *      Report command completion to the destination.
 *
 * For the three remote destinations a 'C' message carrying commandTag,
 * including its terminating NUL byte, is queued with pq_putmessage().
 * Every other listed destination is a no-op.
 */
void
EndCommand(const char *commandTag, CommandDest dest)
{
    switch (dest)
    {
        case DestNone:
        case DestDebug:
        case DestSPI:
        case DestTuplestore:
        case DestIntoRel:
        case DestCopyOut:
        case DestSQLFunction:
        case DestTransientRel:
        case DestTupleQueue:
            /* Nothing is sent for these destinations. */
            break;

        case DestRemote:
        case DestRemoteExecute:
        case DestRemoteSimple:
            {
                /* Message length counts the trailing NUL as well. */
                size_t      taglen = strlen(commandTag) + 1;

                pq_putmessage('C', commandTag, taglen);
            }
            break;
    }
}
三、跟踪分析
插入测试数据:
testdb=# -- 获取pid
testdb=# select pg_backend_pid();
pg_backend_pid
----------------
1893
(1 row)
testdb=# -- 插入1行
testdb=# insert into t_insert values(22,'exec_simple_query','exec_simple_query','exec_simple_query');
(挂起)
启动gdb,跟踪调试:
[root@localhost ~]# gdb -p 1893
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
Copyright (C) 2013 Free Software Foundation, Inc.
...
(gdb) b exec_simple_query
Breakpoint 1 at 0x84cad8: file postgres.c, line 893.
(gdb) c
Continuing.
Breakpoint 1, exec_simple_query (query_string=0x1508ef0 "insert into t_insert values(22,'exec_simple_query','exec_simple_query','exec_simple_query');") at postgres.c:893
893 CommandDest dest = whereToSendOutput;
#输入参数
#query_string
(gdb) p query_string
$1 = 0x1508ef0 "insert into t_insert values(22,'exec_simple_query','exec_simple_query','exec_simple_query');"
#单步调试
938 oldcontext = MemoryContextSwitchTo(MessageContext);
(gdb) p *MessageContext
$2 = {type = T_AllocSetContext, isReset = false, allowInCritSection = false, methods = 0xb8c720 <AllocSetMethods>, parent = 0x1503ba0, firstchild = 0x0, prevchild = 0x157c3c0, nextchild = 0x15956d0,
name = 0xb4e87c "MessageContext", ident = 0x0, reset_cbs = 0x0}
(gdb) n
944 parsetree_list = pg_parse_query(query_string);
(gdb) p oldcontext
$3 = (MemoryContext) 0x15a8320
(gdb) p *oldcontext
$4 = {type = T_AllocSetContext, isReset = true, allowInCritSection = false, methods = 0xb8c720 <AllocSetMethods>, parent = 0x1503ba0, firstchild = 0x0, prevchild = 0x0, nextchild = 0x157c3c0,
name = 0xa1b4ff "TopTransactionContext", ident = 0x0, reset_cbs = 0x0}
(gdb) step
pg_parse_query (query_string=0x1508ef0 "insert into t_insert values(22,'exec_simple_query','exec_simple_query','exec_simple_query');") at postgres.c:615
615 if (log_parser_stats)
(gdb)
618 raw_parsetree_list = raw_parser(query_string);
#进入raw_parser
(gdb) step
raw_parser (str=0x1508ef0 "insert into t_insert values(22,'exec_simple_query','exec_simple_query','exec_simple_query');") at parser.c:43
43 yyscanner = scanner_init(str, &yyextra.core_yy_extra,
(gdb)
...
61 return yyextra.parsetree;
(gdb) p yyextra
$8 = {core_yy_extra = {scanbuf = 0x1509820 "insert into t_insert values(22,'exec_simple_query','exec_simple_query','exec_simple_query');", scanbuflen = 92, keywords = 0xbb8d40 <ScanKeywords>,
num_keywords = 440, backslash_quote = 2, escape_string_warning = true, standard_conforming_strings = true, literalbuf = 0x1509300 "exec_simple_query", literallen = 17, literalalloc = 1024,
xcdepth = 1087033144, dolqstart = 0x0, utf16_first_part = 16777215, warn_on_first_escape = true, saw_non_ascii = false}, have_lookahead = false, lookahead_token = 32765, lookahead_yylval = {
ival = 10027008, str = 0x300990000 <Address 0x300990000 out of bounds>, keyword = 0x300990000 <Address 0x300990000 out of bounds>}, lookahead_yylloc = 2015867616,
lookahead_end = 0xa1b62b "StartTransaction", lookahead_hold_char = 16 '\020', parsetree = 0x1509d88}
(gdb) p *(yyextra.parsetree)
$10 = {type = T_List, length = 1, head = 0x1509d68, tail = 0x1509d68}
#解析树中的内容
(gdb) p *((RawStmt*)(yyextra.parsetree->head->data.ptr_value))
$25 = {type = T_RawStmt, stmt = 0x1509ce8, stmt_location = 0, stmt_len = 91}
(gdb) p *(((RawStmt*)(yyextra.parsetree->head->data.ptr_value))->stmt)
$27 = {type = T_InsertStmt}
#跳出子函数,重新进入主函数
(gdb)
exec_simple_query (query_string=0x1508ef0 "insert into t_insert values(22,'exec_simple_query','exec_simple_query','exec_simple_query');") at postgres.c:947
947 if (check_log_statement(parsetree_list))
...
#解析树只有一个元素
(gdb) n
974 foreach(parsetree_item, parsetree_list)
(gdb) p list_length(parsetree_list)
$30 = 1
(gdb) n
976 RawStmt *parsetree = lfirst_node(RawStmt, parsetree_item);
(gdb)
977 bool snapshot_set = false;
(gdb) p *parsetree
$31 = {type = T_RawStmt, stmt = 0x1509ce8, stmt_location = 0, stmt_len = 91}
(gdb) p *(parsetree->stmt)
$32 = {type = T_InsertStmt}
#commandTag
(gdb) n
992 commandTag = CreateCommandTag(parsetree->stmt);
(gdb)
994 set_ps_display(commandTag, false);
(gdb) p commandTag
$33 = 0xb50908 "INSERT"
#进入分析&查询重写
1047 querytree_list = pg_analyze_and_rewrite(parsetree, query_string,
(gdb) step
pg_analyze_and_rewrite (parsetree=0x1509d38, query_string=0x1508ef0 "insert into t_insert values(22,'exec_simple_query','exec_simple_query','exec_simple_query');", paramTypes=0x0, numParams=0,
queryEnv=0x0) at postgres.c:663
663 if (log_parser_stats)
...
#分析后的Query数据结构
(gdb) p *query
$34 = {type = T_Query, commandType = CMD_INSERT, querySource = QSRC_ORIGINAL, queryId = 0, canSetTag = true, utilityStmt = 0x0, resultRelation = 1, hasAggs = false, hasWindowFuncs = false,
hasTargetSRFs = false, hasSubLinks = false, hasDistinctOn = false, hasRecursive = false, hasModifyingCTE = false, hasForUpdate = false, hasRowSecurity = false, cteList = 0x0, rtable = 0x150a788,
jointree = 0x152cf40, targetList = 0x152cda8, override = OVERRIDING_NOT_SET, onConflict = 0x0, returningList = 0x0, groupClause = 0x0, groupingSets = 0x0, havingQual = 0x0, windowClause = 0x0,
distinctClause = 0x0, sortClause = 0x0, limitOffset = 0x0, limitCount = 0x0, rowMarks = 0x0, setOperations = 0x0, constraintDeps = 0x0, withCheckOptions = 0x0, stmt_location = 0, stmt_len = 91}
...
#回到主函数
(gdb)
exec_simple_query (query_string=0x1508ef0 "insert into t_insert values(22,'exec_simple_query','exec_simple_query','exec_simple_query');") at postgres.c:1050
1050 plantree_list = pg_plan_queries(querytree_list,
(gdb)
1054 if (snapshot_set)
(gdb)
1055 PopActiveSnapshot();
(gdb)
1058 CHECK_FOR_INTERRUPTS();
(gdb)
1064 portal = CreatePortal("", true, true);
(gdb) p *plantree_list
$36 = {type = T_List, length = 1, head = 0x15c03e8, tail = 0x15c03e8}
(gdb) p (PlannedStmt*)(plantree_list->head->data.ptr_value)
$37 = (PlannedStmt *) 0x150a4a8
(gdb) p *((PlannedStmt*)(plantree_list->head->data.ptr_value))
$38 = {type = T_PlannedStmt, commandType = CMD_INSERT, queryId = 0, hasReturning = false, hasModifyingCTE = false, canSetTag = true, transientPlan = false, dependsOnRole = false,
parallelModeNeeded = false, jitFlags = 0, planTree = 0x150a028, rtable = 0x15c0318, resultRelations = 0x15c03b8, nonleafResultRelations = 0x0, rootResultRelations = 0x0, subplans = 0x0,
rewindPlanIDs = 0x0, rowMarks = 0x0, relationOids = 0x15c0368, invalItems = 0x0, paramExecTypes = 0x152e720, utilityStmt = 0x0, stmt_location = 0, stmt_len = 91}
#Portal
...
(gdb) p *portal
$40 = {name = 0x1571e98 "", prepStmtName = 0x0, portalContext = 0x152c3d0, resowner = 0x1539d10, cleanup = 0x62f15c <PortalCleanup>, createSubid = 1, activeSubid = 1, sourceText = 0x0,
commandTag = 0x0, stmts = 0x0, cplan = 0x0, portalParams = 0x0, queryEnv = 0x0, strategy = PORTAL_MULTI_QUERY, cursorOptions = 4, run_once = false, status = PORTAL_NEW, portalPinned = false,
autoHeld = false, queryDesc = 0x0, tupDesc = 0x0, formats = 0x0, holdStore = 0x0, holdContext = 0x0, holdSnapshot = 0x0, atStart = true, atEnd = true, portalPos = 0, creation_time = 587101481469205,
visible = true}
(gdb) n
1073 PortalDefineQuery(portal,
(gdb)
1083 PortalStart(portal, NULL, 0, InvalidSnapshot);
(gdb)
1091 format = 0;
(gdb)
1092 if (IsA(parsetree->stmt, FetchStmt))
(gdb) p *portal
$41 = {name = 0x1571e98 "", prepStmtName = 0x0, portalContext = 0x152c3d0, resowner = 0x1539d10, cleanup = 0x62f15c <PortalCleanup>, createSubid = 1, activeSubid = 1,
sourceText = 0x1508ef0 "insert into t_insert values(22,'exec_simple_query','exec_simple_query','exec_simple_query');", commandTag = 0xb50908 "INSERT", stmts = 0x15c0408, cplan = 0x0,
portalParams = 0x0, queryEnv = 0x0, strategy = PORTAL_MULTI_QUERY, cursorOptions = 4, run_once = false, status = PORTAL_READY, portalPinned = false, autoHeld = false, queryDesc = 0x0, tupDesc = 0x0,
formats = 0x0, holdStore = 0x0, holdContext = 0x0, holdSnapshot = 0x0, atStart = true, atEnd = true, portalPos = 0, creation_time = 587101481469205, visible = false}
#Receiver
1110 receiver = CreateDestReceiver(dest);
(gdb)
1111 if (dest == DestRemote)
(gdb) p *receiver
$42 = {receiveSlot = 0x4857ad <printtup>, rStartup = 0x485196 <printtup_startup>, rShutdown = 0x485bad <printtup_shutdown>, rDestroy = 0x485c21 <printtup_destroy>, mydest = DestRemote}
(gdb)
#执行
...
1122 (void) PortalRun(portal,
(gdb)
...
#DONE!
(gdb)
PostgresMain (argc=1, argv=0x1532aa8, dbname=0x1532990 "testdb", username=0x1532978 "xdb") at postgres.c:4155
4155 send_ready_for_query = true;
到此,关于“PostgreSQL中exec_simple_query函数的实现逻辑是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好地帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!