这篇文章主要讲解了“PostgreSQL中GetSnapshotData的处理过程是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“PostgreSQL中GetSnapshotData的处理过程是什么”吧!
一、数据结构
全局/静态变量
static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b,
void *arg);
static pairingheap RegisteredSnapshots = {&xmin_cmp, NULL, NULL};
bool FirstSnapshotSet = false;
static Snapshot FirstXactSnapshot = NULL;
static SnapshotData CurrentSnapshotData = {HeapTupleSatisfiesMVCC};
static SnapshotData SecondarySnapshotData = {HeapTupleSatisfiesMVCC};
SnapshotData CatalogSnapshotData = {HeapTupleSatisfiesMVCC};
//指向有效的快照
static Snapshot CurrentSnapshot = NULL;
static Snapshot SecondarySnapshot = NULL;
static Snapshot CatalogSnapshot = NULL;
static Snapshot HistoricSnapshot = NULL;
TransactionId TransactionXmin = FirstNormalTransactionId;
TransactionId RecentXmin = FirstNormalTransactionId;
TransactionId RecentGlobalXmin = InvalidTransactionId;
TransactionId RecentGlobalDataXmin = InvalidTransactionId;
static HTAB *tuplecid_data = NULL;
MyPgXact
当前的事务信息.
//是否auto vacuum worker?
#define PROC_IS_AUTOVACUUM 0x01
//正在运行lazy vacuum
#define PROC_IN_VACUUM 0x02
//正在运行analyze
#define PROC_IN_ANALYZE 0x04
//只能通过auto vacuum设置
#define PROC_VACUUM_FOR_WRAPAROUND 0x08
//在事务外部正在执行逻辑解码
#define PROC_IN_LOGICAL_DECODING 0x10
//保留用于procarray
#define PROC_RESERVED 0x20
//在EOXact时用于重置标记的MASK
#define PROC_VACUUM_STATE_MASK \
(PROC_IN_VACUUM | PROC_IN_ANALYZE | PROC_VACUUM_FOR_WRAPAROUND)
typedef struct PGXACT
{
//当前的顶层事务ID(非子事务)
//出于优化的目的,只读事务并不会分配事务号(xid = 0)
TransactionId xid;
//在启动事务时,当前正在执行的最小事务号XID,但不包括LAZY VACUUM
//vacuum不能清除删除事务号xid >= xmin的元组
TransactionId xmin;
//vacuum相关的标记
uint8 vacuumFlags;
bool overflowed;
bool delayChkpt;
uint8 nxids;
} PGXACT;
extern PGDLLIMPORT struct PGXACT *MyPgXact;
Snapshot
SnapshotData结构体指针,SnapshotData结构体可表达的信息囊括了所有可能的快照.
有以下几种不同类型的快照:
1.常规的MVCC快照
2.在恢复期间的MVCC快照(处于Hot-Standby模式)
3.在逻辑解码过程中使用的历史MVCC快照
4.作为参数传递给HeapTupleSatisfiesDirty()函数的快照
5.作为参数传递给HeapTupleSatisfiesNonVacuumable()函数的快照
6.用于在没有成员访问情况下SatisfiesAny、Toast和Self的快照
//SnapshotData结构体指针
typedef struct SnapshotData *Snapshot;
//无效的快照
#define InvalidSnapshot ((Snapshot) NULL)
//测试函数
typedef bool (*SnapshotSatisfiesFunc) (HeapTuple htup,
Snapshot snapshot, Buffer buffer);
//常见的有:
//HeapTupleSatisfiesMVCC:判断元组对某一快照版本是否有效
//HeapTupleSatisfiesUpdate:判断元组是否可更新(同时更新同一个元组)
//HeapTupleSatisfiesDirty:判断当前元组是否存在脏数据
//HeapTupleSatisfiesSelf:判断tuple对自身信息是否有效
//HeapTupleSatisfiesToast:判断是否TOAST表
//HeapTupleSatisfiesVacuum:判断元组是否能被VACUUM删除
//HeapTupleSatisfiesAny:所有元组都可见
//HeapTupleSatisfiesHistoricMVCC:用于CATALOG 表
typedef struct SnapshotData
{
//测试tuple是否可见的函数
SnapshotSatisfiesFunc satisfies;
//XID ∈ [2,min)是可见的
TransactionId xmin;
//XID ∈ [xmax,∞)是不可见的
TransactionId xmax;
TransactionId *xip;
//xip数组中的元素个数
uint32 xcnt;
TransactionId *subxip;
//subxip数组元素个数
int32 subxcnt;
//是否溢出?
bool suboverflowed;
//在Recovery期间的快照?
bool takenDuringRecovery;
//如为静态快照,则该值为F
bool copied;
//在自身的事务中,CID < curcid是可见的
CommandId curcid;
uint32 speculativeToken;
//在ActiveSnapshot栈中的引用计数
uint32 active_count;
//在RegisteredSnapshots中的引用计数
uint32 regd_count;
//RegisteredSnapshots堆中的链接
pairingheap_node ph_node;
//快照"拍摄"时间戳
TimestampTz whenTaken;
//拍照时WAL stream中的位置
XLogRecPtr lsn;
} SnapshotData;
ShmemVariableCache
VariableCache是共享内存中的一种数据结构,用于跟踪OID和XID分配状态。
ShmemVariableCache是VariableCache结构体指针.
typedef struct VariableCacheData
{
//下一个待分配的OID
Oid nextOid;
//在必须执行XLOG work前可用OIDs
uint32 oidCount;
//下一个待分配的事务ID
TransactionId nextXid;
//集群范围内最小datfrozenxid
TransactionId oldestXid;
//在该XID开始强制执行autovacuum
TransactionId xidVacLimit;
//在该XID开始提出警告
TransactionId xidWarnLimit;
//在该XID开外,拒绝生成下一个XID
TransactionId xidStopLimit;
//"世界末日"XID,需回卷
TransactionId xidWrapLimit;
//持有最小datfrozenxid的DB
Oid oldestXidDB;
TransactionId oldestCommitTsXid;
TransactionId newestCommitTsXid;
TransactionId latestCompletedXid;
//clog中最古老的XID
TransactionId oldestClogXid;
} VariableCacheData;
//结构体指针
typedef VariableCacheData *VariableCache;
//共享内存中的指针(通过shmem.c设置)
VariableCache ShmemVariableCache = NULL;
二、源码解读
GetSnapshotData函数返回快照信息.
重点是构造xmin : xmax : xip_list,其实现逻辑简单总结如下:
1.获取xmax = ShmemVariableCache->latestCompletedXid + 1;
2.遍历全局procArray数组,构建快照信息
2.1 获取进程相应的事务信息pgxact
2.2 获取进程事务ID(pgxact->xid),取最小的xid作为xmin(不包括0)
2.3 把xid放入快照->xip数组中(不包括本进程所在的事务id)
Snapshot
GetSnapshotData(Snapshot snapshot)
{
ProcArrayStruct *arrayP = procArray;//进程数组
TransactionId xmin;//xmin
TransactionId xmax;//xmax
TransactionId globalxmin;//全局xmin
int index;
int count = 0;
int subcount = 0;
bool suboverflowed = false;
TransactionId replication_slot_xmin = InvalidTransactionId;
TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
Assert(snapshot != NULL);
if (snapshot->xip == NULL)
{
snapshot->xip = (TransactionId *)
malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
if (snapshot->xip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
Assert(snapshot->subxip == NULL);
snapshot->subxip = (TransactionId *)
malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
if (snapshot->subxip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
LWLockAcquire(ProcArrayLock, LW_SHARED);
//xmax = latestCompletedXid + 1
//已完结事务号 + 1
xmax = ShmemVariableCache->latestCompletedXid;
Assert(TransactionIdIsNormal(xmax));
TransactionIdAdvance(xmax);// + 1
//初始化xmin为xmax
globalxmin = xmin = xmax;
//是否处于恢复过程中?
snapshot->takenDuringRecovery = RecoveryInProgress();
if (!snapshot->takenDuringRecovery)
{
//不是,正常运行中
int *pgprocnos = arrayP->pgprocnos;//进程数
int numProcs;
numProcs = arrayP->numProcs;
for (index = 0; index < numProcs; index++)//遍历procArray数组
{
int pgprocno = pgprocnos[index];//allPgXact[]索引
PGXACT *pgxact = &allPgXact[pgprocno];//获取PGXACT
TransactionId xid;//事务id
if (pgxact->vacuumFlags &
(PROC_IN_LOGICAL_DECODING | PROC_IN_VACUUM))
continue;
//更新globalxmin为最小有效的xmin
xid = UINT32_ACCESS_ONCE(pgxact->xmin);//获取进程事务的xmin
if (TransactionIdIsNormal(xid) &&
NormalTransactionIdPrecedes(xid, globalxmin))
globalxmin = xid;
//只提取一次xid -- 查看函数GetNewTransactionId
xid = UINT32_ACCESS_ONCE(pgxact->xid);
if (!TransactionIdIsNormal(xid)
|| !NormalTransactionIdPrecedes(xid, xmax))
continue;
if (NormalTransactionIdPrecedes(xid, xmin))
//xid 小于 xmin,设置为xid
xmin = xid;
if (pgxact == MyPgXact)
continue;//跳过本事务
//添加XID到快照中
snapshot->xip[count++] = xid;
if (!suboverflowed)
{
if (pgxact->overflowed)
suboverflowed = true;
else
{
int nxids = pgxact->nxids;
if (nxids > 0)
{
PGPROC *proc = &allProcs[pgprocno];
pg_read_barrier();
memcpy(snapshot->subxip + subcount,
(void *) proc->subxids.xids,
nxids * sizeof(TransactionId));
subcount += nxids;
}
}
}
}
}
else
{
subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
xmax);
if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
suboverflowed = true;
}
replication_slot_xmin = procArray->replication_slot_xmin;
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
if (!TransactionIdIsValid(MyPgXact->xmin))
MyPgXact->xmin = TransactionXmin = xmin;
LWLockRelease(ProcArrayLock);
if (TransactionIdPrecedes(xmin, globalxmin))
globalxmin = xmin;
//更新全局变量
RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
if (!TransactionIdIsNormal(RecentGlobalXmin))
RecentGlobalXmin = FirstNormalTransactionId;
//检查是否存在正在请求更旧xmin的复制slot
if (TransactionIdIsValid(replication_slot_xmin) &&
NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
RecentGlobalXmin = replication_slot_xmin;
//比该xid小的非catalog表可被vacuum进程清除
RecentGlobalDataXmin = RecentGlobalXmin;
if (TransactionIdIsNormal(replication_slot_catalog_xmin) &&
NormalTransactionIdPrecedes(replication_slot_catalog_xmin, RecentGlobalXmin))
RecentGlobalXmin = replication_slot_catalog_xmin;
RecentXmin = xmin;
snapshot->xmin = xmin;
snapshot->xmax = xmax;
snapshot->xcnt = count;
snapshot->subxcnt = subcount;
snapshot->suboverflowed = suboverflowed;
//当前命令id
snapshot->curcid = GetCurrentCommandId(false);
snapshot->active_count = 0;
snapshot->regd_count = 0;
snapshot->copied = false;
if (old_snapshot_threshold < 0)
{
snapshot->lsn = InvalidXLogRecPtr;
snapshot->whenTaken = 0;
}
else
{
snapshot->lsn = GetXLogInsertRecPtr();
snapshot->whenTaken = GetSnapshotCurrentTimestamp();
MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
}
//返回快照
return snapshot;
}
三、跟踪分析
执行简单查询,可触发获取快照逻辑.
16:35:08 (xdb@[local]:5432)testdb=# begin;
BEGIN
16:35:13 (xdb@[local]:5432)testdb=#* select 1;
启动gdb,设置断点
(gdb) b GetSnapshotData
Breakpoint 1 at 0x89aef3: file procarray.c, line 1519.
(gdb) c
Continuing.
Breakpoint 1, GetSnapshotData (snapshot=0xf9be60 <CurrentSnapshotData>) at procarray.c:1519
1519 ProcArrayStruct *arrayP = procArray;
(gdb)
输入参数snapshot,实质是全局变量CurrentSnapshotData
(gdb) p *snapshot
$1 = {satisfies = 0xa9310d <HeapTupleSatisfiesMVCC>, xmin = 2354, xmax = 2358, xip = 0x24c7e40, xcnt = 1,
subxip = 0x251dfa0, subxcnt = 0, suboverflowed = false, takenDuringRecovery = false, copied = false, curcid = 0,
speculativeToken = 0, active_count = 0, regd_count = 0, ph_node = {first_child = 0x0, next_sibling = 0x0,
prev_or_parent = 0x0}, whenTaken = 0, lsn = 0}
查看共享内存(ShmemVariableCache)中的信息.
nextXID = 2358,下一个待分配的事务ID = 2358.
(gdb) p *ShmemVariableCache
$2 = {nextOid = 42605, oidCount = 8183, nextXid = 2358, oldestXid = 561, xidVacLimit = 200000561,
xidWarnLimit = 2136484208, xidStopLimit = 2146484208, xidWrapLimit = 2147484208, oldestXidDB = 16400,
oldestCommitTsXid = 0, newestCommitTsXid = 0, latestCompletedXid = 2357, oldestClogXid = 561}
(gdb)
获取全局进程数组procArray,赋值->arrayP.
初始化相关变量.
(gdb) n
1524 int count = 0;
(gdb) n
1525 int subcount = 0;
(gdb)
1526 bool suboverflowed = false;
(gdb)
1527 volatile TransactionId replication_slot_xmin = InvalidTransactionId;
(gdb)
1528 volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
(gdb)
1530 Assert(snapshot != NULL);
(gdb)
1543 if (snapshot->xip == NULL)
(gdb)
查看进程数组信息和allPgXact[]数组编号(arrayP->pgprocnos数组).
allPgXact定义:static PGXACT *allPgXact;
(gdb) p *arrayP
$3 = {numProcs = 5, maxProcs = 112, maxKnownAssignedXids = 7280, numKnownAssignedXids = 0, tailKnownAssignedXids = 0,
headKnownAssignedXids = 0, known_assigned_xids_lck = 0 '\000', lastOverflowedXid = 0, replication_slot_xmin = 0,
replication_slot_catalog_xmin = 0, pgprocnos = 0x7f8765d9a3a8}
(gdb) p arrayP->pgprocnos[0]
$4 = 97
(gdb) p arrayP->pgprocnos[1]
$5 = 98
(gdb) p arrayP->pgprocnos[2]
$6 = 99
(gdb) p arrayP->pgprocnos[3]
$7 = 103
(gdb) p arrayP->pgprocnos[4]
$9 = 111
加锁,获取/修改相关信息
(gdb)
1568 LWLockAcquire(ProcArrayLock, LW_SHARED);
计算xmax
(gdb) n
1571 xmax = ShmemVariableCache->latestCompletedXid;
(gdb)
1572 Assert(TransactionIdIsNormal(xmax));
(gdb) p xmax
$10 = 2357
(gdb) n
1573 TransactionIdAdvance(xmax);
(gdb)
1576 globalxmin = xmin = xmax;
(gdb)
1578 snapshot->takenDuringRecovery = RecoveryInProgress();
(gdb) p xmax
$11 = 2358
判断是否处于恢复状态,当前不是恢复状态,进入相应的处理逻辑
(gdb) n
1580 if (!snapshot->takenDuringRecovery)
(gdb) p snapshot->takenDuringRecovery
$13 = false
(gdb) n
1582 int *pgprocnos = arrayP->pgprocnos;
(gdb)
获取进程数和PGXACT索引数组,准备遍历
(gdb) n
1590 numProcs = arrayP->numProcs;
(gdb)
1591 for (index = 0; index < numProcs; index++)
(gdb)
(gdb) p *pgprocnos
$14 = 97
(gdb) p numProcs
$15 = 5
(gdb)
获取pgxact信息
(gdb) n
1593 int pgprocno = pgprocnos[index];
(gdb)
1594 volatile PGXACT *pgxact = &allPgXact[pgprocno];
(gdb)
1601 if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
(gdb)
1605 if (pgxact->vacuumFlags & PROC_IN_VACUUM)
(gdb)
1609 xid = pgxact->xmin;
(gdb) p *pgxact
$16 = {xid = 0, xmin = 0, vacuumFlags = 0 '\000', overflowed = false, delayChkpt = false, nxids = 0 '\000'}
(gdb)
不是正常的xid,下一个pgxact
(gdb) n
1610 if (TransactionIdIsNormal(xid) &&
(gdb)
1615 xid = pgxact->xid;
(gdb)
1623 if (!TransactionIdIsNormal(xid)
(gdb) p xid
$17 = 0
(gdb) n
1625 continue;
(gdb)
下一个xid = 2355,正常的事务ID
(gdb)
1591 for (index = 0; index < numProcs; index++)
(gdb)
1593 int pgprocno = pgprocnos[index];
(gdb)
1594 volatile PGXACT *pgxact = &allPgXact[pgprocno];
(gdb)
1601 if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
(gdb) p *pgxact
$18 = {xid = 2355, xmin = 0, vacuumFlags = 0 '\000', overflowed = false, delayChkpt = false, nxids = 0 '\000'}
(gdb)
进行处理
(gdb) n
1605 if (pgxact->vacuumFlags & PROC_IN_VACUUM)
(gdb)
1609 xid = pgxact->xmin;
(gdb)
1610 if (TransactionIdIsNormal(xid) &&
(gdb)
1615 xid = pgxact->xid;
(gdb)
1623 if (!TransactionIdIsNormal(xid)
(gdb)
1624 || !NormalTransactionIdPrecedes(xid, xmax))
(gdb)
1631 if (NormalTransactionIdPrecedes(xid, xmin))
(gdb) p xid
$19 = 2355
(gdb) p xmin
$20 = 2358
(gdb) n
1632 xmin = xid;
(gdb)
1633 if (pgxact == MyPgXact)
(gdb)
这是同一个xact,处理下一个xact
(gdb)
1633 if (pgxact == MyPgXact)
(gdb) p pgxact
$21 = (volatile PGXACT *) 0x7f8765d9a218
(gdb) p MyPgXact
$22 = (struct PGXACT *) 0x7f8765d9a218
(gdb) n
1634 continue;
(gdb)
下一个是2354
...
(gdb) p *pgxact
$23 = {xid = 2354, xmin = 0, vacuumFlags = 0 '\000', overflowed = false, delayChkpt = false, nxids = 0 '\000'}
(gdb)
xmin调整为2354
1631 if (NormalTransactionIdPrecedes(xid, xmin))
(gdb)
1632 xmin = xid;
(gdb)
1633 if (pgxact == MyPgXact)
(gdb) p xmin
$24 = 2354
(gdb)
写入到xip_list中
1637 snapshot->xip[count++] = xid;
(gdb)
1654 if (!suboverflowed)
(gdb)
(gdb) p count
$25 = 1
继续循环,完成5个pgxact的遍历
1591 for (index = 0; index < numProcs; index++)
(gdb)
1715 replication_slot_xmin = procArray->replication_slot_xmin;
(gdb)
无复制信息
(gdb)
1715 replication_slot_xmin = procArray->replication_slot_xmin;
(gdb) p procArray->replication_slot_xmin
$28 = 0
(gdb) n
1716 replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
(gdb)
1718 if (!TransactionIdIsValid(MyPgXact->xmin))
调整本进程的事务信息
(gdb) n
1719 MyPgXact->xmin = TransactionXmin = xmin;
(gdb) p MyPgXact->xmin
$29 = 0
(gdb) n
释放锁
1721 LWLockRelease(ProcArrayLock);
(gdb)
1728 if (TransactionIdPrecedes(xmin, globalxmin))
(gdb)
调整全局xmin
(gdb) p xmin
$30 = 2354
(gdb) p globalxmin
$31 = 2358
(gdb) n
1729 globalxmin = xmin;
(gdb)
更新其他信息
(gdb)
1732 RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
(gdb) p RecentGlobalXmin
$32 = 2354
(gdb) p vacuum_defer_cleanup_age
$33 = 0
(gdb) n
1733 if (!TransactionIdIsNormal(RecentGlobalXmin))
(gdb)
1737 if (TransactionIdIsValid(replication_slot_xmin) &&
(gdb)
1742 RecentGlobalDataXmin = RecentGlobalXmin;
(gdb) p RecentGlobalXmin
$34 = 2354
(gdb) n
1748 if (TransactionIdIsNormal(replication_slot_catalog_xmin) &&
(gdb)
填充snapshot域字段信息
(gdb)
1752 RecentXmin = xmin;
(gdb)
1754 snapshot->xmin = xmin;
(gdb)
1755 snapshot->xmax = xmax;
(gdb)
1756 snapshot->xcnt = count;
(gdb)
1757 snapshot->subxcnt = subcount;
(gdb)
1758 snapshot->suboverflowed = suboverflowed;
(gdb)
1760 snapshot->curcid = GetCurrentCommandId(false);
(gdb)
1766 snapshot->active_count = 0;
(gdb)
1767 snapshot->regd_count = 0;
(gdb)
1768 snapshot->copied = false;
(gdb)
1770 if (old_snapshot_threshold < 0)
(gdb)
1776 snapshot->lsn = InvalidXLogRecPtr;
(gdb)
1777 snapshot->whenTaken = 0;
(gdb)
1791 return snapshot;
(gdb)
返回snapshot
(gdb) p snapshot
$35 = (Snapshot) 0xf9be60 <CurrentSnapshotData>
(gdb) p *snapshot
$36 = {satisfies = 0xa9310d <HeapTupleSatisfiesMVCC>, xmin = 2354, xmax = 2358, xip = 0x24c7e40, xcnt = 1,
subxip = 0x251dfa0, subxcnt = 0, suboverflowed = false, takenDuringRecovery = false, copied = false, curcid = 0,
speculativeToken = 0, active_count = 0, regd_count = 0, ph_node = {first_child = 0x0, next_sibling = 0x0,
prev_or_parent = 0x0}, whenTaken = 0, lsn = 0}
(gdb)
注意:snapshot->satisfies函数在初始化该全局变量已设置为HeapTupleSatisfiesMVCC.
感谢各位的阅读,以上就是“PostgreSQL中GetSnapshotData的处理过程是什么”的内容了,经过本文的学习后,相信大家对PostgreSQL中GetSnapshotData的处理过程是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!