文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

PostgreSQL的后台进程checkpointer分析

2024-04-02 19:55

关注

本篇内容介绍了“PostgreSQL的后台进程checkpointer分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一、数据结构

CheckPoint
CheckPoint XLOG record结构体.


typedef struct CheckPoint
{
    //在开始创建CheckPoint时下一个可用的RecPtr(比如REDO的开始点)
    XLogRecPtr  redo;           
    //当前的时间线
    TimeLineID  ThisTimeLineID; 
    //上一个时间线(如该记录正在开启一条新的时间线,否则等于当前时间线)
    TimeLineID  PrevTimeLineID; 
    //是否full-page-write
    bool        fullPageWrites; 
    //nextXid的高阶位
    uint32      nextXidEpoch;   
    //下一个free的XID
    TransactionId nextXid;      
    //下一个free的OID
    Oid         nextOid;        
    //下一个fredd的MultiXactId
    MultiXactId nextMulti;      
    //下一个空闲的MultiXact偏移
    MultiXactOffset nextMultiOffset;    
    //集群范围内的最小datfrozenxid
    TransactionId oldestXid;    
    //最小datfrozenxid所在的database
    Oid         oldestXidDB;    
    //集群范围内的最小datminmxid
    MultiXactId oldestMulti;    
    //最小datminmxid所在的database
    Oid         oldestMultiDB;  
    //checkpoint的时间戳
    pg_time_t   time;           
    //带有有效提交时间戳的最老Xid
    TransactionId oldestCommitTsXid;    
    //带有有效提交时间戳的最新Xid
    TransactionId newestCommitTsXid;    
    
    TransactionId oldestActiveXid;
} CheckPoint;

#define XLOG_CHECKPOINT_SHUTDOWN        0x00
#define XLOG_CHECKPOINT_ONLINE          0x10
#define XLOG_NOOP                       0x20
#define XLOG_NEXTOID                    0x30
#define XLOG_SWITCH                     0x40
#define XLOG_BACKUP_END                 0x50
#define XLOG_PARAMETER_CHANGE           0x60
#define XLOG_RESTORE_POINT              0x70
#define XLOG_FPW_CHANGE                 0x80
#define XLOG_END_OF_RECOVERY            0x90
#define XLOG_FPI_FOR_HINT               0xA0
#define XLOG_FPI                        0xB0

CheckpointerShmem
checkpointer进程和其他后台进程之间通讯的共享内存结构.


typedef struct
{
    RelFileNode rnode;//表空间/数据库/Relation信息
    ForkNumber  forknum;//fork编号
    BlockNumber segno;          
    
} CheckpointerRequest;
typedef struct
{
    //checkpoint进程的pid(为0则进程未启动)
    pid_t       checkpointer_pid;   
    //用于保护所有的ckpt_*域
    slock_t     ckpt_lck;       
    //在checkpoint启动时计数
    int         ckpt_started;   
    //在checkpoint完成时计数
    int         ckpt_done;      
    //在checkpoint失败时计数
    int         ckpt_failed;    
    //检查点标记,在xlog.h中定义
    int         ckpt_flags;     
    //计数后台进程缓存写的次数
    uint32      num_backend_writes; 
    //计数后台进程fsync调用次数
    uint32      num_backend_fsync;  
    //当前的请求编号
    int         num_requests;   
    //最大的请求编号
    int         max_requests;   
    //请求数组
    CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
} CheckpointerShmemStruct;
//静态变量(CheckpointerShmemStruct结构体指针)
static CheckpointerShmemStruct *CheckpointerShmem;

二、源码解读

CheckpointerMain函数是checkpointer进程的入口.
该函数首先为信号设置控制器(如熟悉Java OO开发,对这样的写法应不陌生),然后创建进程的内存上下文,接着进入循环(forever),在"合适"的时候执行checkpoint.


void
CheckpointerMain(void)
{
    sigjmp_buf  local_sigjmp_buf;
    MemoryContext checkpointer_context;
    CheckpointerShmem->checkpointer_pid = MyProcPid;
    //为信号设置控制器(如熟悉Java OO开发,对这样的写法应不陌生)
    
    //设置标志,读取配置文件
    pqsignal(SIGHUP, ChkptSigHupHandler);   
    //请求checkpoint
    pqsignal(SIGINT, ReqCheckpointHandler); 
    //忽略SIGTERM
    pqsignal(SIGTERM, SIG_IGN); 
    //宕机
    pqsignal(SIGQUIT, chkpt_quickdie);  
    //忽略SIGALRM & SIGPIPE
    pqsignal(SIGALRM, SIG_IGN);
    pqsignal(SIGPIPE, SIG_IGN);
    pqsignal(SIGUSR1, chkpt_sigusr1_handler);
    //请求关闭
    pqsignal(SIGUSR2, ReqShutdownHandler);  
    
    pqsignal(SIGCHLD, SIG_DFL);
    
    //运行SIGQUIT信号
    sigdelset(&BlockSig, SIGQUIT);
    
    last_checkpoint_time = last_xlog_switch_time = (pg_time_t) time(NULL);
    
    checkpointer_context = AllocSetContextCreate(TopMemoryContext,
                                                 "Checkpointer",
                                                 ALLOCSET_DEFAULT_SIZES);
    MemoryContextSwitchTo(checkpointer_context);
    
    if (sigsetjmp(local_sigjmp_buf, 1) != 0)
    {
        
        //没有使用PG_TRY,必须重置错误栈
        error_context_stack = NULL;
        
        //在清除期间必须避免中断
        HOLD_INTERRUPTS();
        
        //在日志中报告错误信息
        EmitErrorReport();
        
        LWLockReleaseAll();
        ConditionVariableCancelSleep();
        pgstat_report_wait_end();
        AbortBufferIO();
        UnlockBuffers();
        ReleaseAuxProcessResources(false);
        AtEOXact_Buffers(false);
        AtEOXact_SMgr();
        AtEOXact_Files(false);
        AtEOXact_HashTables(false);
        
        //通知正在等待的后台进程:checkpoint执行失败
        if (ckpt_active)
        {
            SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
            CheckpointerShmem->ckpt_failed++;
            CheckpointerShmem->ckpt_done = CheckpointerShmem->ckpt_started;
            SpinLockRelease(&CheckpointerShmem->ckpt_lck);
            ckpt_active = false;
        }
        
        MemoryContextSwitchTo(checkpointer_context);
        FlushErrorState();
        
        //在顶层上下文刷新泄漏的数据
        MemoryContextResetAndDeleteChildren(checkpointer_context);
        
        //现在我们可以允许中断了
        RESUME_INTERRUPTS();
        
        pg_usleep(1000000L);
        
        smgrcloseall();
    }
    
    //现在可以处理ereport(ERROR)调用了.
    PG_exception_stack = &local_sigjmp_buf;
    
    PG_SETMASK(&UnBlockSig);
    
    UpdateSharedMemoryConfig();
    
    ProcGlobal->checkpointerLatch = &MyProc->procLatch;
    
    for (;;)
    {
        bool        do_checkpoint = false;//是否执行checkpoint
        int         flags = 0;//标记
        pg_time_t   now;//时间
        int         elapsed_secs;//已消逝的时间
        int         cur_timeout;//timeout时间
        
        ResetLatch(MyLatch);
        
        AbsorbFsyncRequests();
        if (got_SIGHUP)//
        {
            got_SIGHUP = false;
            ProcessConfigFile(PGC_SIGHUP);
            
            UpdateSharedMemoryConfig();
        }
        if (checkpoint_requested)
        {
            //接收到checkpoint请求
            checkpoint_requested = false;//重置标志
            do_checkpoint = true;//需要执行checkpoint
            BgWriterStats.m_requested_checkpoints++;//计数
        }
        if (shutdown_requested)
        {
            //接收到关闭请求
            
            ExitOnAnyError = true;
            
            //关闭数据库
            ShutdownXLOG(0, 0);
            
            //checkpointer在这里正常退出
            proc_exit(0);       
        }
        
        now = (pg_time_t) time(NULL);//当前时间
        elapsed_secs = now - last_checkpoint_time;//已消逝的时间
        if (elapsed_secs >= CheckPointTimeout)
        {
            //超时
            if (!do_checkpoint)
                BgWriterStats.m_timed_checkpoints++;//没有接收到checkpoint请求,进行统计
            do_checkpoint = true;//设置标记
            flags |= CHECKPOINT_CAUSE_TIME;//设置标记
        }
        
        if (do_checkpoint)
        {
            bool        ckpt_performed = false;//设置标记
            bool        do_restartpoint;
            
            do_restartpoint = RecoveryInProgress();
            
            SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
            flags |= CheckpointerShmem->ckpt_flags;
            CheckpointerShmem->ckpt_flags = 0;
            CheckpointerShmem->ckpt_started++;
            SpinLockRelease(&CheckpointerShmem->ckpt_lck);
            
            if (flags & CHECKPOINT_END_OF_RECOVERY)
                do_restartpoint = false;
            
            if (!do_restartpoint &&
                (flags & CHECKPOINT_CAUSE_XLOG) &&
                elapsed_secs < CheckPointWarning)
                ereport(LOG,
                        (errmsg_plural("checkpoints are occurring too frequently (%d second apart)",
                                       "checkpoints are occurring too frequently (%d seconds apart)",
                                       elapsed_secs,
                                       elapsed_secs),
                         errhint("Consider increasing the configuration parameter \"max_wal_size\".")));
            
            ckpt_active = true;
            if (do_restartpoint)
                //执行restartpoint
                ckpt_start_recptr = GetXLogReplayRecPtr(NULL);//获取Redo pint
            else
                //执行checkpoint
                ckpt_start_recptr = GetInsertRecPtr();//获取checkpoint XLOG Record插入的位置
            ckpt_start_time = now;//开始时间
            ckpt_cached_elapsed = 0;//消逝时间
            
            if (!do_restartpoint)
            {
                //执行checkpoint
                CreateCheckPoint(flags);//创建checkpoint
                ckpt_performed = true;//DONE!
            }
            else
                //恢复过程的restartpoint
                ckpt_performed = CreateRestartPoint(flags);
            
            smgrcloseall();
            
            SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
            CheckpointerShmem->ckpt_done = CheckpointerShmem->ckpt_started;
            SpinLockRelease(&CheckpointerShmem->ckpt_lck);
            if (ckpt_performed)
            {
                //已完成checkpoint
                
                last_checkpoint_time = now;
            }
            else
            {
                //
                
                last_checkpoint_time = now - CheckPointTimeout + 15;
            }
            ckpt_active = false;
        }
        
        //在需要的时候,检查archive_timeout并切换xlog文件.
        CheckArchiveTimeout();
        
        pgstat_send_bgwriter();
        
        //重置相关变量
        now = (pg_time_t) time(NULL);
        elapsed_secs = now - last_checkpoint_time;
        if (elapsed_secs >= CheckPointTimeout)
            continue;           
        cur_timeout = CheckPointTimeout - elapsed_secs;
        if (XLogArchiveTimeout > 0 && !RecoveryInProgress())
        {
            elapsed_secs = now - last_xlog_switch_time;
            if (elapsed_secs >= XLogArchiveTimeout)
                continue;       
            cur_timeout = Min(cur_timeout, XLogArchiveTimeout - elapsed_secs);//获得最小休眠时间
        }
        (void) WaitLatch(MyLatch,
                         WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                         cur_timeout * 1000L  ,
                         WAIT_EVENT_CHECKPOINTER_MAIN);//休眠
    }
}
 

pqsigfunc
pqsignal(int signum, pqsigfunc handler)
{
    pqsigfunc   prevfunc;//函数
    if (signum >= PG_SIGNAL_COUNT || signum < 0)
        return SIG_ERR;//验证不通过,返回错误
    prevfunc = pg_signal_array[signum];//获取先前的处理函数
    pg_signal_array[signum] = handler;//注册函数
    return prevfunc;//返回先前注册的函数
}

XLogRecPtr
GetInsertRecPtr(void)
{
    XLogRecPtr  recptr;
    SpinLockAcquire(&XLogCtl->info_lck);
    recptr = XLogCtl->LogwrtRqst.Write;//获取插入位置
    SpinLockRelease(&XLogCtl->info_lck);
    return recptr;
}

三、跟踪分析

创建数据表,插入数据,执行checkpoint

testdb=# drop table t_wal_ckpt;
DROP TABLE
testdb=# create table t_wal_ckpt(c1 int not null,c2  varchar(40),c3 varchar(40));
CREATE TABLE
testdb=# insert into t_wal_ckpt(c1,c2,c3) values(1,'C2-1','C3-1');
INSERT 0 1
testdb=# 
testdb=# checkpoint; --> 第一次checkpoint

更新数据,执行checkpoint.

testdb=# update t_wal_ckpt set c2 = 'C2#'||substr(c2,4,40);
UPDATE 1
testdb=# checkpoint;

启动gdb,设置信号控制

(gdb) handle SIGINT print nostop pass
SIGINT is used by the debugger.
Are you sure you want to change it? (y or n) y
Signal        Stop  Print Pass to program Description
SIGINT        No  Yes Yes   Interrupt
(gdb) 
(gdb) b checkpointer.c:441
Breakpoint 1 at 0x815197: file checkpointer.c, line 441.
(gdb) c
Continuing.
Program received signal SIGINT, Interrupt.
Breakpoint 1, CheckpointerMain () at checkpointer.c:441
441       flags |= CheckpointerShmem->ckpt_flags;
(gdb)

查看共享内存信息CheckpointerShmem

(gdb) p *CheckpointerShmem
$1 = {checkpointer_pid = 1650, ckpt_lck = 1 '\001', ckpt_started = 2, ckpt_done = 2, ckpt_failed = 0, ckpt_flags = 44, 
  num_backend_writes = 0, num_backend_fsync = 0, num_requests = 0, max_requests = 65536, requests = 0x7f2cdda07b28}
(gdb)

设置相关信息CheckpointerShmem

441       flags |= CheckpointerShmem->ckpt_flags;
(gdb) n
442       CheckpointerShmem->ckpt_flags = 0;
(gdb) 
443       CheckpointerShmem->ckpt_started++;
(gdb) 
444       SpinLockRelease(&CheckpointerShmem->ckpt_lck);
(gdb) 
450       if (flags & CHECKPOINT_END_OF_RECOVERY)
(gdb) 
460       if (!do_restartpoint &&
(gdb) 
461         (flags & CHECKPOINT_CAUSE_XLOG) &&
(gdb) 
460       if (!do_restartpoint &&

初始化checkpointer进程在checkpoint过程中需使用的私有变量.
其中ckpt_start_recptr为插入点,即Redo point,5521180544转换为16进制为0x1 49168780

(gdb) 
474       ckpt_active = true;
(gdb) 
475       if (do_restartpoint)
(gdb) 
478         ckpt_start_recptr = GetInsertRecPtr();
(gdb) p XLogCtl->LogwrtRqst
$1 = {Write = 5521180544, Flush = 5521180544}
(gdb) n
479       ckpt_start_time = now;
(gdb) p ckpt_start_recptr
$2 = 5521180544
(gdb) n
480       ckpt_cached_elapsed = 0;
(gdb) 
485       if (!do_restartpoint)
(gdb)

执行checkpoint.OK!

(gdb) 
487         CreateCheckPoint(flags);
(gdb) 
488         ckpt_performed = true;
(gdb)

关闭资源,并设置共享内存中的信息

497       smgrcloseall();
(gdb) 
502       SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
(gdb) 
503       CheckpointerShmem->ckpt_done = CheckpointerShmem->ckpt_started;
(gdb) 
504       SpinLockRelease(&CheckpointerShmem->ckpt_lck);
(gdb) 
506       if (ckpt_performed)
(gdb) p CheckpointerShmem
$3 = (CheckpointerShmemStruct *) 0x7fcecc063b00
(gdb) p *CheckpointerShmem
$4 = {checkpointer_pid = 1697, ckpt_lck = 0 '\000', ckpt_started = 1, ckpt_done = 1, ckpt_failed = 0, ckpt_flags = 0, 
  num_backend_writes = 0, num_backend_fsync = 0, num_requests = 0, max_requests = 65536, requests = 0x7fcecc063b28}
(gdb)

checkpoint请求已清空

(gdb) p CheckpointerShmem->requests[0]
$5 = {rnode = {spcNode = 0, dbNode = 0, relNode = 0}, forknum = MAIN_FORKNUM, segno = 0}

在需要的时候,检查archive_timeout并切换xlog文件.
休眠,直至接收到信号或者需要启动新的checkpoint或xlog文件切换.

(gdb) n
513         last_checkpoint_time = now;
(gdb) 
526       ckpt_active = false;
(gdb) 
530     CheckArchiveTimeout();
(gdb) 
539     pgstat_send_bgwriter();
(gdb) 
545     now = (pg_time_t) time(NULL);
(gdb) 
546     elapsed_secs = now - last_checkpoint_time;
(gdb) 
547     if (elapsed_secs >= CheckPointTimeout)
(gdb) p elapsed_secs
$7 = 1044
(gdb) p CheckPointTimeout
$8 = 900
(gdb) n
548       continue;     

已超时,执行新的checkpoint

(gdb) 
569   }
(gdb) 
352     bool    do_checkpoint = false;
(gdb) 
353     int     flags = 0;
(gdb) n
360     ResetLatch(MyLatch);
(gdb) 
365     AbsorbFsyncRequests();
(gdb) 
367     if (got_SIGHUP)
(gdb) 
385     if (checkpoint_requested)
(gdb) 
391     if (shutdown_requested)
(gdb) 
410     now = (pg_time_t) time(NULL);
(gdb) 
411     elapsed_secs = now - last_checkpoint_time;
(gdb) 
412     if (elapsed_secs >= CheckPointTimeout)
(gdb) p elapsed_secs
$9 = 1131
(gdb) n
414       if (!do_checkpoint)
(gdb) 
415         BgWriterStats.m_timed_checkpoints++;
(gdb) 
416       do_checkpoint = true;
(gdb) 
417       flags |= CHECKPOINT_CAUSE_TIME;
(gdb) 
423     if (do_checkpoint)
(gdb) 
425       bool    ckpt_performed = false;
(gdb) 
433       do_restartpoint = RecoveryInProgress();
(gdb) 
440       SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
(gdb) 
Breakpoint 1, CheckpointerMain () at checkpointer.c:441
441       flags |= CheckpointerShmem->ckpt_flags;
(gdb) 
442       CheckpointerShmem->ckpt_flags = 0;
(gdb) 
443       CheckpointerShmem->ckpt_started++;
(gdb) c
Continuing.

“PostgreSQL的后台进程checkpointer分析”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯