本节介绍了PostgreSQL提交事务的具体实现逻辑,主要解析了函数CommitTransaction->RecordTransactionCommit的实现逻辑。
一、数据结构
TransactionState
事务状态结构体
typedef enum TransState
{
TRANS_DEFAULT,
TRANS_START,
TRANS_INPROGRESS,
TRANS_COMMIT,
TRANS_ABORT,
TRANS_PREPARE
} TransState;
typedef enum TBlockState
{
TBLOCK_DEFAULT,
TBLOCK_STARTED,
TBLOCK_BEGIN,
TBLOCK_INPROGRESS,
TBLOCK_IMPLICIT_INPROGRESS,
TBLOCK_PARALLEL_INPROGRESS,
TBLOCK_END,
TBLOCK_ABORT,
TBLOCK_ABORT_END,
TBLOCK_ABORT_PENDING,
TBLOCK_PREPARE,
TBLOCK_SUBBEGIN,
TBLOCK_SUBINPROGRESS,
TBLOCK_SUBRELEASE,
TBLOCK_SUBCOMMIT,
TBLOCK_SUBABORT,
TBLOCK_SUBABORT_END,
TBLOCK_SUBABORT_PENDING,
TBLOCK_SUBRESTART,
TBLOCK_SUBABORT_RESTART
} TBlockState;
typedef struct TransactionStateData
{
//事务ID
TransactionId transactionId;
//子事务ID
SubTransactionId subTransactionId;
//保存点名称
char *name;
//保存点级别
int savepointLevel;
//低级别的事务状态
TransState state;
//高级别的事务状态
TBlockState blockState;
//事务嵌套深度
int nestingLevel;
//GUC上下文嵌套深度
int gucNestLevel;
//事务生命周期上下文
MemoryContext curTransactionContext;
//查询资源
ResourceOwner curTransactionOwner;
//按XID顺序保存的已提交的子事务ID
TransactionId *childXids;
//childXids数组大小
int nChildXids;
//分配的childXids数组空间
int maxChildXids;
//上一个CurrentUserId
Oid prevUser;
//上一个SecurityRestrictionContext
int prevSecContext;
//上一事务是否只读?
bool prevXactReadOnly;
//是否处于Recovery?
bool startedInRecovery;
//XID是否已保存在WAL Record中?
bool didLogXid;
//Enter/ExitParallelMode计数器
int parallelModeLevel;
//父事务状态
struct TransactionStateData *parent;
} TransactionStateData;
//结构体指针
typedef TransactionStateData *TransactionState;
二、源码解读
RecordTransactionCommit函数,在WAL Record中记录COMMIT Record,返回最新的XID,如果xact没有XID,则返回InvalidTransactionId。.
static TransactionId
RecordTransactionCommit(void)
{
TransactionId xid = GetTopTransactionIdIfAny();//获取XID
bool markXidCommitted = TransactionIdIsValid(xid);//标记
TransactionId latestXid = InvalidTransactionId;//最后的XID
int nrels;
RelFileNode *rels;
int nchildren;
TransactionId *children;
int nmsgs = 0;
SharedInvalidationMessage *invalMessages = NULL;
bool RelcacheInitFileInval = false;
bool wrote_xlog;
//为WAL Record的commit record准备数据.
nrels = smgrGetPendingDeletes(true, &rels);
nchildren = xactGetCommittedChildren(&children);
if (XLogStandbyInfoActive())
nmsgs = xactGetCommittedInvalidationMessages(&invalMessages,
&RelcacheInitFileInval);
wrote_xlog = (XactLastRecEnd != 0);
if (!markXidCommitted)
{
if (nrels != 0)
elog(ERROR, "cannot commit a transaction that deleted files but has no xid");
//没有child XIDs,AssignTransactionId会强制实现此逻辑.
Assert(nchildren == 0);
if (nmsgs != 0)
{
LogStandbyInvalidations(nmsgs, invalMessages,
RelcacheInitFileInval);
wrote_xlog = true;
}
if (!wrote_xlog)
goto cleanup;
}
else
{
bool replorigin;
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
replorigin_session_origin != DoNotReplicateId);
//通知bufmgr和smgr准备提交
BufmgrCommit();
START_CRIT_SECTION();
MyPgXact->delayChkpt = true;
SetCurrentTransactionStopTimestamp();
XactLogCommitRecord(xactStopTimestamp,
nchildren, children, nrels, rels,
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
MyXactFlags,
InvalidTransactionId, NULL );
if (replorigin)
//为该复制源向前移动LSNs
replorigin_session_advance(replorigin_session_origin_lsn,
XactLastRecEnd);
if (!replorigin || replorigin_session_origin_timestamp == 0)
replorigin_session_origin_timestamp = xactStopTimestamp;
TransactionTreeSetCommitTsData(xid, nchildren, children,
replorigin_session_origin_timestamp,
replorigin_session_origin, false);
}
if ((wrote_xlog && markXidCommitted &&
synchronous_commit > SYNCHRONOUS_COMMIT_OFF) ||
forceSyncCommit || nrels > 0)
{
XLogFlush(XactLastRecEnd);
if (markXidCommitted)
TransactionIdCommitTree(xid, nchildren, children);
}
else
{
//异步提交
XLogSetAsyncXactLSN(XactLastRecEnd);
if (markXidCommitted)
TransactionIdAsyncCommitTree(xid, nchildren, children, XactLastRecEnd);
}
if (markXidCommitted)
{
MyPgXact->delayChkpt = false;
END_CRIT_SECTION();
}
//如持有子XIDs,计算最后的latestXid
latestXid = TransactionIdLatest(xid, nchildren, children);
if (wrote_xlog && markXidCommitted)
SyncRepWaitForLSN(XactLastRecEnd, true);
//记录最后commit记录的位置
XactLastCommitEnd = XactLastRecEnd;
//重置XactLastRecEnd直至下个事务写入数据.
XactLastRecEnd = 0;
cleanup:
//清除本地数据
if (rels)
pfree(rels);
//返回XID
return latestXid;
}
三、跟踪分析
插入数据,执行commit
10:57:56 (xdb@[local]:5432)testdb=# begin;
BEGIN
10:57:59 (xdb@[local]:5432)testdb=#* insert into t_session1 values(1);
INSERT 0 1
10:58:01 (xdb@[local]:5432)testdb=#* commit;
启动gdb,设置断点
(gdb) b RecordTransactionCommit
Breakpoint 2 at 0x547528: file xact.c, line 1141.
(gdb) c
Continuing.
Breakpoint 2, RecordTransactionCommit () at xact.c:1141
1141 TransactionId xid = GetTopTransactionIdIfAny();
(gdb)
查看调用栈
(gdb) bt
#0 RecordTransactionCommit () at xact.c:1141
#1 0x00000000005483f2 in CommitTransaction () at xact.c:2070
#2 0x0000000000549078 in CommitTransactionCommand () at xact.c:2831
#3 0x00000000008c8ea9 in finish_xact_command () at postgres.c:2523
#4 0x00000000008c6b5d in exec_simple_query (query_string=0x2c97ec8 "commit;") at postgres.c:1170
#5 0x00000000008cae70 in PostgresMain (argc=1, argv=0x2cc3dc8, dbname=0x2cc3c30 "testdb", username=0x2c94ba8 "xdb")
at postgres.c:4182
#6 0x000000000082642b in BackendRun (port=0x2cb9c00) at postmaster.c:4361
#7 0x0000000000825b8f in BackendStartup (port=0x2cb9c00) at postmaster.c:4033
#8 0x0000000000821f1c in ServerLoop () at postmaster.c:1706
#9 0x00000000008217b4 in PostmasterMain (argc=1, argv=0x2c92b60) at postmaster.c:1379
#10 0x00000000007488ef in main (argc=1, argv=0x2c92b60) at main.c:228
(gdb)
获取事务ID
(gdb) p xid
$3 = 2411
(gdb)
设置其他变量,markXidCommitted —> True
(gdb) n
1143 TransactionId latestXid = InvalidTransactionId;
(gdb)
1148 int nmsgs = 0;
(gdb)
1149 SharedInvalidationMessage *invalMessages = NULL;
(gdb)
1150 bool RelcacheInitFileInval = false;
(gdb)
1154 nrels = smgrGetPendingDeletes(true, &rels);
(gdb)
1155 nchildren = xactGetCommittedChildren(&children);
(gdb)
1156 if (XLogStandbyInfoActive())
(gdb)
1159 wrote_xlog = (XactLastRecEnd != 0);
(gdb)
1165 if (!markXidCommitted)
(gdb) p latestXid
$4 = 0
(gdb) p markXidCommitted
$5 = true
(gdb) p nrels
$6 = 0
(gdb) p nchildren
$7 = 0
(gdb) p wrote_xlog
$8 = true
(gdb)
markXidCommitted为T,进入相应的处理逻辑.
开始进入提交关键部分并插入commit XLOG记录。
(gdb) n
1214 replorigin = (replorigin_session_origin != InvalidRepOriginId &&
(gdb)
1221 BufmgrCommit();
(gdb) p replorigin
$9 = false
(gdb)
进入提交部分,设置当前事务时间戳
(gdb) n
1240 START_CRIT_SECTION();
(gdb)
1241 MyPgXact->delayChkpt = true;
(gdb)
1243 SetCurrentTransactionStopTimestamp();
(gdb) p *MyPgXact
$10 = {xid = 2411, xmin = 0, vacuumFlags = 0 '\000', overflowed = false, delayChkpt = true, nxids = 0 '\000'}
(gdb)
插入XLOG
(gdb) n
1245 XactLogCommitRecord(xactStopTimestamp,
(gdb)
1252 if (replorigin)
(gdb)
设置提交事务数据
(gdb)
1267 if (!replorigin || replorigin_session_origin_timestamp == 0)
(gdb)
1268 replorigin_session_origin_timestamp = xactStopTimestamp;
(gdb)
1270 TransactionTreeSetCommitTsData(xid, nchildren, children,
(gdb)
1300 if ((wrote_xlog && markXidCommitted &&
(gdb)
同步刷新XLOG
(gdb)
1301 synchronous_commit > SYNCHRONOUS_COMMIT_OFF) ||
(gdb)
1300 if ((wrote_xlog && markXidCommitted &&
(gdb)
1304 XLogFlush(XactLastRecEnd);
(gdb)
1309 if (markXidCommitted)
(gdb)
更新CLOG,如果我们在上面已写入了COMMIT WAL Record.
(gdb)
1310 TransactionIdCommitTree(xid, nchildren, children);
(gdb)
1309 if (markXidCommitted)
(gdb)
退出提交关键区域
(gdb)
1340 if (markXidCommitted)
(gdb)
1342 MyPgXact->delayChkpt = false;
(gdb)
1343 END_CRIT_SECTION();
(gdb)
计算最后的latestXid
(gdb)
1347 latestXid = TransactionIdLatest(xid, nchildren, children);
(gdb) n
1358 if (wrote_xlog && markXidCommitted)
(gdb) p latestXid
$11 = 2411
(gdb)
记录最后commit记录的位置
(gdb) n
1359 SyncRepWaitForLSN(XactLastRecEnd, true);
(gdb)
1362 XactLastCommitEnd = XactLastRecEnd;
(gdb)
1365 XactLastRecEnd = 0;
(gdb)
1368 if (rels)
(gdb)
1371 return latestXid;
(gdb) p XactLastCommitEnd
$12 = 5522364896
(gdb)
返回,完成调用
(gdb) n
1372 }
(gdb)
CommitTransaction () at xact.c:2087
2087 TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
(gdb)
DONE!
四、参考资料
How Postgres Makes Transactions Atomic
PG Source Code
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
软考中级精品资料免费领
- 历年真题答案解析
- 备考技巧名师总结
- 高频考点精准押题
- 资料下载
- 历年真题
193.9 KB下载数265
191.63 KB下载数245
143.91 KB下载数1148
183.71 KB下载数642
644.84 KB下载数2756
相关文章
发现更多好内容- Java 多线程批量处理的方法究竟有哪些?(java多线程批量处理的方法是什么)
- Java 中 BigDecimal 的详细介绍与实用使用方法(java中BigDecimal的介绍及使用)
- 如何通过 JavaScript 事件循环来优化代码?(JavaScript 事件循环如何优化代码)
- 如何用 Java 解析 XML 并获取标签属性值?(java怎么解析xml获取标签属性值)
- Java 中实现 MapReduce 的具体方法有哪些?(java实现mapreduce的方法是什么)
- 如何让 Java 的 settimeout 与线程池协同工作?(Java settimeout怎样与线程池配合)
- Java 中对象数组的定义及使用方式有哪些?(Java对象数组定义与用法有哪些)
- Java ClassLoader 的使用方法究竟是什么?(java classloader的使用方法是什么)
- Java 中 Bimap 的适用场景具体有哪些?(Bimap在Java中的适用场景有哪些)
- Java 和 Golang 在性能方面有哪些差异?(Java与Golang的性能差异)