这篇文章主要介绍“PostgreSQL中ReceiveXlogStream有什么作用”。在日常操作中,相信很多人对这个问题存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答“PostgreSQL中ReceiveXlogStream有什么作用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
本节简单介绍了PostgreSQL的备份工具pg_basebackup源码中实际执行备份逻辑的BaseBackup中对WAL数据进行备份的实现函数StartLogStreamer->LogStreamerMain及其主要的实现函数ReceiveXlogStream.
一、数据结构
logstreamer_param
WAL data streamer参数.
// Parameters for the WAL data streamer.
typedef struct
{
// Background connection
PGconn *bgconn;
// Start position
XLogRecPtr startptr;
// Directory or tar file, depending on the mode in use
char xlog[MAXPGPATH];
// System identifier
char *sysidentifier;
// Timeline
int timeline;
} logstreamer_param;
StreamCtl
接收xlog流数据时的全局参数
// Global parameters used while receiving an xlog stream.
typedef struct StreamCtl
{
// Position at which streaming starts
XLogRecPtr startpos;
// Timeline to stream from
TimeLineID timeline;
// System identifier
char *sysidentifier;
// Timeout for standby status messages
int standby_message_timeout;
// Synchronous mode: whether WAL data is flushed immediately on write
bool synchronous;
// Mark segments as done in the archived data
bool mark_done;
// Flush to disk to ensure a consistent on-disk state
bool do_sync;
// Streaming stops when this callback returns true
stream_stop_callback stream_stop;
// If valid, watch this socket for input and check the result of stream_stop()
pgsocket stop_socket;
// How WAL is written
WalWriteMethod *walmethod;
// Suffix appended to partially received files
char *partial_suffix;
// Replication slot in use, or NULL if none
char *replication_slot;
} StreamCtl;
二、源码解读
LogStreamerMain
WAL流复制主函数,用于fork后的子进程调用
/*
 * Main routine of the WAL streamer (called in the child process after fork).
 *
 * Builds a StreamCtl from `param` and the file-level settings, receives the
 * WAL stream through ReceiveXlogStream(), then finishes the WAL write method,
 * closes the background connection and frees the write method.
 *
 * Returns 0 on success, 1 if receiving the stream or finishing the WAL files
 * fails.
 */
static int
LogStreamerMain(logstreamer_param *param)
{
	StreamCtl	stream;

	in_log_streamer = true;

	// Start from an all-zero control block, then fill in every setting.
	MemSet(&stream, 0, sizeof(stream));

	// Values handed over by the caller.
	stream.sysidentifier = param->sysidentifier;
	stream.timeline = param->timeline;
	stream.startpos = param->startptr;

	// Values taken from file-level settings.
	stream.replication_slot = replication_slot;
	stream.standby_message_timeout = standby_message_timeout;
	stream.do_sync = do_sync;

	// Fixed choices for this streamer.
	stream.partial_suffix = NULL;
	stream.mark_done = true;
	stream.synchronous = false;
	stream.stream_stop = reached_end_position;

	// Only non-Windows builds get a socket to watch for the stop signal.
#ifdef WIN32
	stream.stop_socket = PGINVALID_SOCKET;
#else
	stream.stop_socket = bgpipe[0];
#endif

	// Plain format ('p') writes into a directory; any other format uses tar.
	if (format != 'p')
		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
	else
		stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);

	// Receive the WAL data.
	if (!ReceiveXlogStream(param->bgconn, &stream))
		return 1;

	if (!stream.walmethod->finish())
	{
		fprintf(stderr,
				_("%s: could not finish writing WAL files: %s\n"),
				progname, strerror(errno));
		return 1;
	}

	// Close the connection.
	PQfinish(param->bgconn);

	// Release the write method that matches the output format.
	if (format != 'p')
		FreeWalTarMethod();
	else
		FreeWalDirectoryMethod();

	// Free the memory of the write method itself.
	pg_free(stream.walmethod);

	return 0;
}
ReceiveXlogStream
在指定的开始位置接收log stream
/*
 * Receive a log stream starting at stream->startpos on stream->timeline.
 *
 * Steps:
 *  1. Check the server version and decide whether flush positions are
 *     reported (always with a replication slot, otherwise only in
 *     synchronous mode).
 *  2. If stream->sysidentifier is set, run IDENTIFY_SYSTEM and verify the
 *     system identifier and that the starting timeline is not beyond the
 *     timeline the server reports.
 *  3. Loop: fetch the timeline history file if it does not exist yet, issue
 *     START_REPLICATION, and hand the copy stream to HandleCopyStream().
 *     A PGRES_TUPLES_OK result means the server switched timelines, so the
 *     loop restarts on the new timeline; PGRES_COMMAND_OK means the stream
 *     ended and the stream_stop callback decides whether that is success.
 *
 * Returns true when the stream_stop callback reports that the stop point has
 * been reached, false on any error. On the error path the currently open WAL
 * file (if any) is closed with CLOSE_NO_RENAME.
 */
bool
ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
{
	char		query[128];
	char		slotcmd[128];
	PGresult   *res;
	XLogRecPtr	stoppos;

	if (!CheckServerVersionForStreaming(conn))
		return false;

	if (stream->replication_slot != NULL)
	{
		// A slot is in use: always report the flush position and name the
		// slot in the START_REPLICATION command.
		reportFlushPosition = true;
		snprintf(slotcmd, sizeof(slotcmd), "SLOT \"%s\" ", stream->replication_slot);
	}
	else
	{
		// No slot: report the flush position only in synchronous mode.
		if (stream->synchronous)
			reportFlushPosition = true;
		else
			reportFlushPosition = false;
		slotcmd[0] = 0;			// empty string
	}

	if (stream->sysidentifier != NULL)
	{
		// Verify that the system identifier has not changed.
		res = PQexec(conn, "IDENTIFY_SYSTEM");
		if (PQresultStatus(res) != PGRES_TUPLES_OK)
		{
			fprintf(stderr,
					_("%s: could not send replication command \"%s\": %s"),
					progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
			PQclear(res);
			return false;
		}
		if (PQntuples(res) != 1 || PQnfields(res) < 3)
		{
			fprintf(stderr,
					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
					progname, PQntuples(res), PQnfields(res), 1, 3);
			PQclear(res);
			return false;
		}
		if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
		{
			fprintf(stderr,
					_("%s: system identifier does not match between base backup and streaming connection\n"),
					progname);
			PQclear(res);
			return false;
		}
		// The requested timeline must not be later than the one in column 1
		// of the IDENTIFY_SYSTEM result.
		if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
		{
			fprintf(stderr,
					_("%s: starting timeline %u is not present in the server\n"),
					progname, stream->timeline);
			PQclear(res);
			return false;
		}
		PQclear(res);
	}

	// Nothing has been flushed beyond the start position yet.
	lastFlushPosition = stream->startpos;

	while (1)
	{
		if (!existsTimeLineHistoryFile(stream))
		{
			// The history file for this timeline is missing: fetch it.
			snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
			res = PQexec(conn, query);
			if (PQresultStatus(res) != PGRES_TUPLES_OK)
			{
				fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
						progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
				PQclear(res);
				return false;
			}

			// NOTE(review): an unexpected result shape is only reported here;
			// execution still continues to writeTimeLineHistoryFile() below.
			if (PQnfields(res) != 2 || PQntuples(res) != 1)
			{
				fprintf(stderr,
						_("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
						progname, PQntuples(res), PQnfields(res), 1, 2);
			}

			// Write the history file to disk (columns 0 and 1 of the result).
			writeTimeLineHistoryFile(stream,
									 PQgetvalue(res, 0, 0),
									 PQgetvalue(res, 0, 1));
			PQclear(res);
		}

		// Stop before (re)starting replication if the callback says so.
		if (stream->stream_stop(stream->startpos, stream->timeline, false))
			return true;

		// Initiate the replication stream at the requested position.
		snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
				 slotcmd,
				 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
				 stream->timeline);
		res = PQexec(conn, query);
		if (PQresultStatus(res) != PGRES_COPY_BOTH)
		{
			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
					progname, "START_REPLICATION", PQresultErrorMessage(res));
			PQclear(res);
			return false;
		}
		PQclear(res);

		// Stream the WAL; stoppos receives the position where it stopped.
		res = HandleCopyStream(conn, stream, &stoppos);
		if (res == NULL)
			goto error;

		if (PQresultStatus(res) == PGRES_TUPLES_OK)
		{
			uint32		newtimeline;	// next timeline reported by the server
			bool		parsed;			// whether the result set was parsed

			// Read the end-of-streaming result: next timeline and its start.
			parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
			PQclear(res);
			if (!parsed)
				goto error;

			// Sanity checks on the values the server returned.
			if (newtimeline <= stream->timeline)
			{
				// The next timeline must be greater than the current one.
				fprintf(stderr,
						_("%s: server reported unexpected next timeline %u, following timeline %u\n"),
						progname, newtimeline, stream->timeline);
				goto error;
			}
			if (stream->startpos > stoppos)
			{
				// The new start position must not lie beyond the stop position.
				fprintf(stderr,
						_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
						progname,
						stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
						newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
				goto error;
			}

			// Read the final result, which should be command completion.
			res = PQgetResult(conn);
			if (PQresultStatus(res) != PGRES_COMMAND_OK)
			{
				fprintf(stderr,
						_("%s: unexpected termination of replication stream: %s"),
						progname, PQresultErrorMessage(res));
				PQclear(res);
				goto error;
			}
			PQclear(res);

			// Switch to the new timeline and restart from startpos minus its
			// offset within the segment.
			stream->timeline = newtimeline;
			stream->startpos = stream->startpos -
				XLogSegmentOffset(stream->startpos, WalSegSz);
			continue;
		}
		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
		{
			PQclear(res);

			// The stream ended: success only if the stop point was reached.
			if (stream->stream_stop(stoppos, stream->timeline, false))
				return true;
			else
			{
				fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
						progname);
				goto error;
			}
		}
		else
		{
			// Any other status is an error reported by the server.
			fprintf(stderr,
					_("%s: unexpected termination of replication stream: %s"),
					progname, PQresultErrorMessage(res));
			PQclear(res);
			goto error;
		}
	}

error:
	// Close the open WAL file, if any, without renaming it.
	if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
				progname, current_walfile_name, stream->walmethod->getlasterror());
	walfile = NULL;
	return false;
}
/*
 * Main loop of the copy stream started by START_REPLICATION.
 *
 * Repeatedly checks the stop condition, syncs and sends feedback where
 * needed, and processes every message delivered by CopyStreamReceive():
 * 'k' messages go to ProcessKeepaliveMsg(), 'w' messages go to
 * ProcessXLogDataMsg().
 *
 * Returns the PGresult produced by HandleEndOfCopyStream() once the stream
 * has ended, or NULL on error. On the error path the receive buffer is freed.
 * `stoppos` is passed through to CheckCopyStreamStop() and
 * HandleEndOfCopyStream().
 */
static PGresult *
HandleCopyStream(PGconn *conn, StreamCtl *stream,
				 XLogRecPtr *stoppos)
{
	char	   *copybuf = NULL;
	TimestampTz last_status = -1;
	XLogRecPtr	blockpos = stream->startpos;

	still_sending = true;

	while (1)
	{
		int			r;
		TimestampTz now;
		long		sleeptime;

		// Check whether streaming should stop at the current position.
		if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
			goto error;

		now = feGetCurrentTimestamp();

		// Synchronous mode: if data beyond the last flush position has been
		// received and a WAL file is open, sync it and send feedback.
		if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
		{
			if (stream->walmethod->sync(walfile) != 0)
			{
				fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
						progname, current_walfile_name, stream->walmethod->getlasterror());
				goto error;
			}
			lastFlushPosition = blockpos;

			if (!sendFeedback(conn, blockpos, now, false))
				goto error;
			last_status = now;
		}

		// Send periodic feedback once the standby message timeout has been
		// exceeded since the last status message.
		if (still_sending && stream->standby_message_timeout > 0 &&
			feTimestampDifferenceExceeds(last_status, now,
										 stream->standby_message_timeout))
		{
			if (!sendFeedback(conn, blockpos, now, false))
				goto error;
			last_status = now;
		}

		// The computed sleep time is passed to CopyStreamReceive() as timeout.
		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
												 last_status);

		// Receive the next message from the stream.
		r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
		while (r != 0)
		{
			if (r == -1)
				goto error;		// receive error
			if (r == -2)
			{
				// The stream has ended (or failed): collect the final result.
				PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);

				if (res == NULL)
					goto error;
				else
					return res;
			}

			// Dispatch on the message type in the first byte.
			if (copybuf[0] == 'k')
			{
				if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
										 &last_status))
					goto error;
			}
			else if (copybuf[0] == 'w')
			{
				if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
					goto error;

				// blockpos may have advanced: re-check the stop condition.
				if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
					goto error;
			}
			else
			{
				fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
						progname, copybuf[0]);
				goto error;
			}

			// Fetch any further message without waiting (timeout 0).
			r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
		}
	}

error:
	if (copybuf != NULL)
		PQfreemem(copybuf);
	return NULL;
}
/*
 * Decide whether streaming should stop at `blockpos` and, if so, end the
 * copy from the client side.
 *
 * While still sending, the stream_stop callback is consulted. If it asks to
 * stop, the current WAL file is closed, a copy-end packet is sent and
 * flushed, and still_sending is cleared.
 *
 * Returns false if closing the WAL file or sending the copy-end packet
 * fails, true otherwise. `stoppos` is not used by this function.
 */
static bool
CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
					XLogRecPtr *stoppos)
{
	// Nothing to do once the copy-end packet has already been sent.
	if (!still_sending)
		return true;

	// Keep streaming until the callback asks to stop.
	if (!stream->stream_stop(blockpos, stream->timeline, false))
		return true;

	if (!close_walfile(stream, blockpos))
		return false;

	if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
	{
		fprintf(stderr, _("%s: could not send copy-end packet: %s"),
				progname, PQerrorMessage(conn));
		return false;
	}

	still_sending = false;
	return true;
}
/*
 * Receive one CopyData message from the stream.
 *
 * Any buffer previously stored in *buffer is freed first and *buffer is
 * reset to NULL. If no message is immediately available, the function waits
 * in CopyStreamPoll() with the given timeout and stop socket, consumes the
 * input and tries once more.
 *
 * Return value:
 *   > 0  length of the received message; *buffer points to it
 *   0    no message available (also returned when CopyStreamPoll() yields 0)
 *   -1   error while receiving or reading the data
 *   -2   PQgetCopyData() returned -1 (end of stream or error)
 * A value <= 0 returned by CopyStreamPoll() is passed through unchanged.
 */
static int
CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
				  char **buffer)
{
	char	   *copybuf = NULL;
	int			rawlen;

	// Release the buffer handed out by the previous call.
	if (*buffer != NULL)
		PQfreemem(*buffer);
	*buffer = NULL;

	// Try to receive a CopyData message without blocking.
	rawlen = PQgetCopyData(conn, &copybuf, 1);
	if (rawlen == 0)
	{
		int			ret;

		// Nothing available yet: wait for input, bounded by `timeout`.
		ret = CopyStreamPoll(conn, timeout, stop_socket);
		if (ret <= 0)
			return ret;

		// Input is pending on the connection: read it.
		if (PQconsumeInput(conn) == 0)
		{
			fprintf(stderr,
					_("%s: could not receive data from WAL stream: %s"),
					progname, PQerrorMessage(conn));
			return -1;
		}

		// Try again now that some input has been consumed.
		rawlen = PQgetCopyData(conn, &copybuf, 1);
		if (rawlen == 0)
			return 0;
	}

	if (rawlen == -1)			// end of streaming or error
		return -2;
	if (rawlen == -2)
	{
		fprintf(stderr, _("%s: could not read COPY data: %s"),
				progname, PQerrorMessage(conn));
		return -1;
	}

	// Hand the received message to the caller.
	*buffer = copybuf;
	return rawlen;
}
三、跟踪分析
备份命令
pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
启动gdb跟踪(跟踪fork的子进程)
[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law. Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>...
Reading symbols from /appdb/xdb/pg11.2/bin/pg_basebackup...done.
(gdb) set args -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
(gdb) set follow-fork-mode child
(gdb) b LogStreamerMain
Breakpoint 1 at 0x403c51: file pg_basebackup.c, line 490.
(gdb) r
Starting program: /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Password:
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/5A000028 on timeline 16
pg_basebackup: starting background WAL receiver
pg_basebackup: created temporary replication slot "pg_basebackup_1604"
[New process 2036]
[Thread debugging using libthread_db enabled]backup/backup_label )
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7ffff7fe7840 (LWP 2036)]
Breakpoint 1, LogStreamerMain (param=0x629db0) at pg_basebackup.c:490
490 in_log_streamer = true;
305153/305153 kB (100%), 1/1 tablespace )
pg_basebackup: write-ahead log end point: 0/5A0000F8
pg_basebackup: waiting for background process to finish streaming ...
(gdb)
输入参数
(gdb) n
492 MemSet(&stream, 0, sizeof(stream));
(gdb) p *param
$1 = {bgconn = 0x62a280, startptr = 1509949440, xlog = "/data/backup/pg_wal", '\000' <repeats 1004 times>,
sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb)
设置StreamCtl结构体
(gdb) n
493 stream.startpos = param->startptr;
(gdb)
494 stream.timeline = param->timeline;
(gdb)
495 stream.sysidentifier = param->sysidentifier;
(gdb)
496 stream.stream_stop = reached_end_position;
(gdb)
498 stream.stop_socket = bgpipe[0];
(gdb)
502 stream.standby_message_timeout = standby_message_timeout;
(gdb)
503 stream.synchronous = false;
(gdb)
504 stream.do_sync = do_sync;
(gdb)
505 stream.mark_done = true;
(gdb)
506 stream.partial_suffix = NULL;
(gdb)
507 stream.replication_slot = replication_slot;
(gdb)
509 if (format == 'p')
(gdb)
510 stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
(gdb)
进入ReceiveXlogStream函数
(gdb)
514 if (!ReceiveXlogStream(param->bgconn, &stream))
(gdb) step
ReceiveXlogStream (conn=0x62a280, stream=0x7fffffffda30) at receivelog.c:458
458 if (!CheckServerVersionForStreaming(conn))
(gdb)
(gdb) n
472 if (stream->replication_slot != NULL)
(gdb) p *stream
$2 = {startpos = 1509949440, timeline = 16, sysidentifier = 0x61f1a0 "6666964067616600474",
standby_message_timeout = 10000, synchronous = false, mark_done = true, do_sync = true,
stream_stop = 0x403953 <reached_end_position>, stop_socket = 8, walmethod = 0x632b10, partial_suffix = 0x0,
replication_slot = 0x62a1e0 "pg_basebackup_1604"}
(gdb)
判断系统标识符和时间线
(gdb) n
474 reportFlushPosition = true;
(gdb)
475 sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
(gdb)
486 if (stream->sysidentifier != NULL)
(gdb)
489 res = PQexec(conn, "IDENTIFY_SYSTEM");
(gdb)
490 if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb)
498 if (PQntuples(res) != 1 || PQnfields(res) < 3)
(gdb)
506 if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
(gdb) p PQgetvalue(res, 0, 0)
$3 = 0x633500 "6666964067616600474"
(gdb) n
514 if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
(gdb)
522 PQclear(res);
(gdb) p PQgetvalue(res, 0, 1)
$4 = 0x633514 "16"
(gdb)
不存在时间线history文件,生成history文件
(gdb) n
529 lastFlushPosition = stream->startpos;
(gdb)
539 if (!existsTimeLineHistoryFile(stream))
(gdb)
541 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
(gdb)
542 res = PQexec(conn, query);
(gdb)
543 if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb)
556 if (PQnfields(res) != 2 || PQntuples(res) != 1)
(gdb)
564 writeTimeLineHistoryFile(stream,
(gdb)
568 PQclear(res);
(gdb)
调用START_REPLICATION命令初始化
(gdb)
575 if (stream->stream_stop(stream->startpos, stream->timeline, false))
(gdb) n
579 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb)
581 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
(gdb)
579 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb)
581 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
(gdb)
579 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb)
583 res = PQexec(conn, query);
(gdb)
584 if (PQresultStatus(res) != PGRES_COPY_BOTH)
(gdb)
591 PQclear(res);
(gdb)
执行命令,处理stream WAL,完成调用
595 if (res == NULL)
(gdb) p *res
$5 = {ntups = 0, numAttributes = 0, attDescs = 0x0, tuples = 0x0, tupArrSize = 0, numParameters = 0, paramDescs = 0x0,
resultStatus = PGRES_COMMAND_OK,
cmdStatus = "START_STREAMING\000\000\000\000\000\270\027u\367\377\177\000\000P/c\000\000\000\000\000CT\000\000\001", '\000' <repeats 19 times>, "\200\000\000", binary = 0, noticeHooks = {noticeRec = 0x7ffff7b9eaa4 <defaultNoticeReceiver>,
noticeRecArg = 0x0, noticeProc = 0x7ffff7b9eaf9 <defaultNoticeProcessor>, noticeProcArg = 0x0}, events = 0x0,
nEvents = 0, client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x0,
curOffset = 0, spaceLeft = 0}
(gdb) n
608 if (PQresultStatus(res) == PGRES_TUPLES_OK)
(gdb)
666 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
(gdb)
668 PQclear(res);
(gdb)
676 if (stream->stream_stop(stoppos, stream->timeline, false))
(gdb)
677 return true;
(gdb)
702 }
(gdb)
LogStreamerMain (param=0x629db0) at pg_basebackup.c:523
523 if (!stream.walmethod->finish())
(gdb)
到此,关于“PostgreSQL中ReceiveXlogStream有什么作用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!