本篇内容主要讲解“PostgreSQL中插入数据时与WAL相关的处理逻辑是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PostgreSQL中插入数据时与WAL相关的处理逻辑是什么”吧!
一、数据结构
静态变量
进程中全局共享
static int num_rdatas;
//已分配的空间大小
static int max_rdatas;
//是否调用XLogBeginInsert函数
static bool begininsert_called = false;
static XLogCtlData *XLogCtl = NULL;
static uint8 curinsert_flags = 0;
XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
//用于WALInsertLockAcquire/Release函数
static int MyLockNo = 0;
static bool holdingAllLocks = false;
宏定义
typedef char* Pointer;//指针
typedef Pointer Page;//Page
#define XLOG_HEAP_INSERT 0x00
typedef uint64 XLogRecPtr;
#define PageGetLSN(page) \
PageXLogRecPtrGet(((PageHeader) (page))->pd_lsn)
#define PageSetLSN(page, lsn) \
PageXLogRecPtrSet(((PageHeader) (page))->pd_lsn, lsn)
//存储压缩会后的块镜像所需要的缓存空间大小
#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
//-------------------------------------------------- 锁相关
typedef int slock_t;
typedef uint32 pg_crc32c;
#define SpinLockInit(lock) S_INIT_LOCK(lock)
#define SpinLockAcquire(lock) S_LOCK(lock)
#define SpinLockRelease(lock) S_UNLOCK(lock)
#define SpinLockFree(lock) S_LOCK_FREE(lock)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes) \
((xlogptr) & ((wal_segsz_bytes) - 1))
#define LW_FLAG_HAS_WAITERS ((uint32) 1 << 30)
#define LW_FLAG_RELEASE_OK ((uint32) 1 << 29)
#define LW_FLAG_LOCKED ((uint32) 1 << 28)
#define LW_VAL_EXCLUSIVE ((uint32) 1 << 24)
#define LW_VAL_SHARED 1
#define LW_LOCK_MASK ((uint32) ((1 << 25)-1))
#define LW_SHARED_MASK ((uint32) ((1 << 24)-1))
LWLock
lwlock.c外的代码不应直接操作这个结构的内容,但我们必须声明该结构体以便将LWLocks合并到其他数据结构中。
typedef struct LWLock
{
uint16 tranche;
//独占/非独占locker的状态
pg_atomic_uint32 state;
//正在等待的PGPROCs链表
proclist_head waiters;
#ifdef LOCK_DEBUG//用于DEBUG
//waiters的数量
pg_atomic_uint32 nwaiters;
//锁的最后独占者
struct PGPROC *owner;
#endif
} LWLock;
二、源码解读
heap_insert
主要实现逻辑是插入元组到堆中,其中存在对WAL(XLog)进行处理的部分.
参见PostgreSQL 源码解读(104)- WAL#1(Insert & WAL-heap_insert函数#1)
XLogInsert/XLogInsertRecord
插入一个具有指定的RMID和info字节的XLOG记录,该记录的主体是先前通过XLogRegister*调用注册的数据和缓冲区引用。
参见PostgreSQL 源码解读(106)- WAL#3(Insert & WAL-heap_insert函数#3)
WALInsertLockXXX
包括WALInsertLockAcquireExclusive、WALInsertLockAcquire和WALInsertLockRelease等
//----------------------------------------------------------- WALInsertLockAcquireExclusive
static void
WALInsertLockAcquireExclusive(void)
{
int i;
for (i = 0; i < NUM_XLOGINSERT_LOCKS - 1; i++)//NUM_XLOGINSERT_LOCKS
{
LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
LWLockUpdateVar(&WALInsertLocks[i].l.lock,
&WALInsertLocks[i].l.insertingAt,
PG_UINT64_MAX);
}
//在释放时,变量值重置为0
LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
//设置标记
holdingAllLocks = true;
}
bool
LWLockAcquire(LWLock *lock, LWLockMode mode)
{
PGPROC *proc = MyProc;//PGPROC数据结构
bool result = true;
int extraWaits = 0;
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;
lwstats = get_lwlock_stats_entry(lock);//获得锁的统计入口
#endif
//模式验证
AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
PRINT_LWDEBUG("LWLockAcquire", lock, mode);
#ifdef LWLOCK_STATS
if (mode == LW_EXCLUSIVE)
lwstats->ex_acquire_count++;
else
lwstats->sh_acquire_count++;
#endif
Assert(!(proc == NULL && IsUnderPostmaster));
//确保我们有足够的地方存储锁
if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
elog(ERROR, "too many LWLocks taken");
HOLD_INTERRUPTS();
for (;;)
{
bool mustwait;
mustwait = LWLockAttemptLock(lock, mode);
if (!mustwait)
{
LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
break;
}
//添加到队列中
LWLockQueueSelf(lock, mode);
//在需要的时候,确保可以被唤醒
mustwait = LWLockAttemptLock(lock, mode);
//第二次尝试获取锁,需要取消排队
if (!mustwait)
{
LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
LWLockDequeueSelf(lock);//出列
break;
}
LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
#ifdef LWLOCK_STATS
lwstats->block_count++;//统计
#endif
LWLockReportWaitStart(lock);//报告等待
TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
for (;;)
{
PGSemaphoreLock(proc->sem);
if (!proc->lwWaiting)//如果不是LWLock等待,跳出循环
break;
extraWaits++;//额外的等待
}
//重试,允许LWLockRelease再次释放waiters
pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
#ifdef LOCK_DEBUG
{
//无需等待
uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
Assert(nwaiters < MAX_BACKENDS);
}
#endif
TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
LWLockReportWaitEnd();
LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
//再次循环以再次请求锁
result = false;
}
TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
//获取成功!
//在该后台进程持有的锁链表中添加锁
held_lwlocks[num_held_lwlocks].lock = lock;
held_lwlocks[num_held_lwlocks++].mode = mode;
while (extraWaits-- > 0)
PGSemaphoreUnlock(proc->sem);
return result;
}
static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
{
uint32 old_state;
AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
old_state = pg_atomic_read_u32(&lock->state);
//循环指针我们确定是否可以获得锁位置
while (true)
{
uint32 desired_state;
bool lock_free;
desired_state = old_state;
if (mode == LW_EXCLUSIVE)//独占
{
lock_free = (old_state & LW_LOCK_MASK) == 0;
if (lock_free)
desired_state += LW_VAL_EXCLUSIVE;
}
else
{
//非独占
lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
if (lock_free)
desired_state += LW_VAL_SHARED;
}
if (pg_atomic_compare_exchange_u32(&lock->state,
&old_state, desired_state))
{
if (lock_free)
{
//很好,获取锁!
#ifdef LOCK_DEBUG
if (mode == LW_EXCLUSIVE)
lock->owner = MyProc;
#endif
return false;
}
else
return true;
}
}
pg_unreachable();//正常来说,程序逻辑不应到这里
}
//----------------------------------------------------------- WALInsertLockAcquire
static void
WALInsertLockAcquire(void)
{
bool immed;
static int lockToTry = -1;
if (lockToTry == -1)
lockToTry = MyProc->pgprocno % NUM_XLOGINSERT_LOCKS;
MyLockNo = lockToTry;
immed = LWLockAcquire(&WALInsertLocks[MyLockNo].l.lock, LW_EXCLUSIVE);
if (!immed)
{
lockToTry = (lockToTry + 1) % NUM_XLOGINSERT_LOCKS;
}
}
//----------------------------------------------------------- WALInsertLockRelease
static void
WALInsertLockRelease(void)
{
if (holdingAllLocks)//如持有所有锁
{
int i;
for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
LWLockReleaseClearVar(&WALInsertLocks[i].l.lock,
&WALInsertLocks[i].l.insertingAt,
0);
holdingAllLocks = false;
}
else
{
LWLockReleaseClearVar(&WALInsertLocks[MyLockNo].l.lock,
&WALInsertLocks[MyLockNo].l.insertingAt,
0);
}
}
void
LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
{
LWLockWaitListLock(lock);
*valptr = val;
LWLockWaitListUnlock(lock);
LWLockRelease(lock);
}
static void
LWLockWaitListLock(LWLock *lock)
{
uint32 old_state;
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;
uint32 delays = 0;
lwstats = get_lwlock_stats_entry(lock);
#endif
while (true)
{
//首次尝试直接获取锁
old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
if (!(old_state & LW_FLAG_LOCKED))
break;
//然后在没有原子操作的情况下spin,直到锁释放
{
SpinDelayStatus delayStatus;//SpinDelay状态
init_local_spin_delay(&delayStatus);//初始化
while (old_state & LW_FLAG_LOCKED)//获取Lock
{
perform_spin_delay(&delayStatus);
old_state = pg_atomic_read_u32(&lock->state);
}
#ifdef LWLOCK_STATS
delays += delayStatus.delays;
#endif
finish_spin_delay(&delayStatus);
}
}
#ifdef LWLOCK_STATS
lwstats->spin_delay_count += delays;//延迟计数
#endif
}
static void
LWLockWaitListUnlock(LWLock *lock)
{
uint32 old_state PG_USED_FOR_ASSERTS_ONLY;
old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
Assert(old_state & LW_FLAG_LOCKED);
}
void
LWLockRelease(LWLock *lock)
{
LWLockMode mode;
uint32 oldstate;
bool check_waiters;
int i;
for (i = num_held_lwlocks; --i >= 0;)
if (lock == held_lwlocks[i].lock)
break;
if (i < 0)
elog(ERROR, "lock %s is not held", T_NAME(lock));
mode = held_lwlocks[i].mode;//模式
num_held_lwlocks--;//减一
for (; i < num_held_lwlocks; i++)
held_lwlocks[i] = held_lwlocks[i + 1];
PRINT_LWDEBUG("LWLockRelease", lock, mode);
if (mode == LW_EXCLUSIVE)
oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
else
oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
//舍我其谁!
Assert(!(oldstate & LW_VAL_EXCLUSIVE));
if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
(LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
(oldstate & LW_LOCK_MASK) == 0)
check_waiters = true;
else
check_waiters = false;
if (check_waiters)
{
//XXX: 在commit前清除?
LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
LWLockWakeup(lock);
}
TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
RESUME_INTERRUPTS();
}
到此,相信大家对“PostgreSQL中插入数据时与WAL相关的处理逻辑是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!