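This section continues the walkthrough of PostgreSQL's walsender background process, focusing on the exec_replication_command and StartReplication functions in the call stack.
The call stack is as follows: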
(gdb) bt
#0 0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1 0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0,
nevents=1) at latch.c:1048
#2 0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1,
wait_event_info=83886092) at latch.c:1000
#3 0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999,
wait_event_info=83886092) at latch.c:385
#4 0x000000000085405b in WalSndLoop (send_data=0x8547fe <XLogSendPhysical>) at walsender.c:2229
#5 0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6 0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
at walsender.c:1539
#7 0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
at postgres.c:4178
#8 0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9 0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228
I. Data Structures
StringInfo
The StringInfoData struct holds information about an extensible string.
/*-------------------------
 * StringInfoData holds information about an extensible string.
 *      data    is the current buffer for the string (allocated with palloc).
 *      len     is the current string length.  There is guaranteed to be
 *              a terminating '\0' at data[len], although this is not very
 *              useful when the string holds binary data rather than text.
 *      maxlen  is the allocated size in bytes of 'data', i.e. the maximum
 *              string size (including the terminating '\0' char) that we can
 *              currently store in 'data' without having to reallocate
 *              more space.  We must always have maxlen > len.
 *      cursor  is initialized to zero by makeStringInfo or initStringInfo,
 *              but is not otherwise touched by the stringinfo.c routines.
 *              Some routines use it to scan through a StringInfo.
 *-------------------------
 */
typedef struct StringInfoData
{
    char       *data;
    int         len;
    int         maxlen;
    int         cursor;
} StringInfoData;

typedef StringInfoData *StringInfo;
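The invariants described in the comment above (a terminating '\0' at data[len], and maxlen > len at all times) can be illustrated with a small stand-alone sketch. This is not the PostgreSQL implementation: DemoStringInfo, demo_init and demo_append are hypothetical names, malloc/realloc stand in for palloc/repalloc, and the doubling growth policy is an assumption made for illustration. The initial size of 1024 bytes matches the maxlen value seen in the gdb session later in this article.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for StringInfoData; uses malloc, not palloc. */
typedef struct DemoStringInfo
{
    char       *data;       /* buffer, always NUL-terminated at data[len] */
    int         len;        /* current length, excluding the terminator */
    int         maxlen;     /* allocated size of data, in bytes */
    int         cursor;     /* scan position; never touched by demo_append */
} DemoStringInfo;

/* Start with an empty 1024-byte buffer. */
static void
demo_init(DemoStringInfo *s)
{
    s->maxlen = 1024;
    s->data = malloc(s->maxlen);
    assert(s->data != NULL);
    s->data[0] = '\0';
    s->len = 0;
    s->cursor = 0;
}

/* Append n bytes, doubling the buffer until maxlen > len + n holds. */
static void
demo_append(DemoStringInfo *s, const char *bytes, int n)
{
    int         newmax = s->maxlen;

    while (newmax <= s->len + n)
        newmax *= 2;
    if (newmax != s->maxlen)
    {
        s->data = realloc(s->data, newmax);
        assert(s->data != NULL);
        s->maxlen = newmax;
    }
    memcpy(s->data + s->len, bytes, n);
    s->len += n;
    s->data[s->len] = '\0';     /* keep the terminator invariant */
    assert(s->maxlen > s->len);
}
```

Calling demo_init on a DemoStringInfo and then demo_append(&s, "abc", 3) leaves len at 3 and maxlen at 1024; only an append that would not fit, terminator included, triggers a reallocation.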
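II. Source Code Walkthrough
exec_replication_command
exec_replication_command executes an incoming replication command. It returns true if cmd_string is recognized as a WalSender command, and false otherwise.
Its main logic is:
1. Perform initialization and validation
2. Switch the memory context
3. Initialize the replication scanner
4. Perform transaction-related checks
5. Initialize the input/output message buffers
6. Execute the command according to its type
6.1 For T_StartReplicationCmd, call StartReplication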
/*
 * Execute an incoming replication command.
 *
 * Returns true if the cmd_string was recognized as WalSender command, false
 * if not.
 */
bool
exec_replication_command(const char *cmd_string)
{
    int         parse_rc;
    Node       *cmd_node;
    MemoryContext cmd_context;
    MemoryContext old_context;

    /*
     * If WAL sender has been told that shutdown is getting close, switch its
     * status accordingly to handle the next replication commands correctly.
     */
    if (got_STOPPING)
        WalSndSetState(WALSNDSTATE_STOPPING);

    /*
     * Throw error if in stopping mode.  We need prevent commands that could
     * generate WAL while the shutdown checkpoint is being written.  To be
     * safe, we just prohibit all new commands.
     */
    if (MyWalSnd->state == WALSNDSTATE_STOPPING)
        ereport(ERROR,
                (errmsg("cannot execute new commands while WAL sender is in stopping mode")));

    /*
     * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
     * command arrives. Clean up the old stuff if there's anything.
     */
    SnapBuildClearExportedSnapshot();

    /* check for interrupts */
    CHECK_FOR_INTERRUPTS();

    /* per-command memory context */
    cmd_context = AllocSetContextCreate(CurrentMemoryContext,
                                        "Replication command context",
                                        ALLOCSET_DEFAULT_SIZES);
    old_context = MemoryContextSwitchTo(cmd_context);

    /* initialize the replication scanner and parse the command */
    replication_scanner_init(cmd_string);
    parse_rc = replication_yyparse();
    if (parse_rc != 0)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 (errmsg_internal("replication command parser returned %d",
                                  parse_rc))));

    cmd_node = replication_parse_result;

    /*
     * Log replication command if log_replication_commands is enabled. Even
     * when it's disabled, log the command with DEBUG1 level for backward
     * compatibility. Note that SQL commands are not logged here, and will be
     * logged later if log_statement is enabled.
     */
    if (cmd_node->type != T_SQLCmd)
        ereport(log_replication_commands ? LOG : DEBUG1,
                (errmsg("received replication command: %s", cmd_string)));

    /*
     * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
     * called outside of transaction the snapshot should be cleared here.
     */
    if (!IsTransactionBlock())
        SnapBuildClearExportedSnapshot();

    /*
     * For aborted transactions, don't allow anything except pure SQL, the
     * exec_simple_query() will handle it correctly.
     */
    if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
        ereport(ERROR,
                (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
                 errmsg("current transaction is aborted, "
                        "commands ignored until end of transaction block")));

    CHECK_FOR_INTERRUPTS();

    /*
     * Allocate buffers that will be used for each outgoing and incoming
     * message.  We do this just once per command to reduce palloc overhead.
     */
    initStringInfo(&output_message);
    initStringInfo(&reply_message);
    initStringInfo(&tmpbuf);

    /* Report to pgstat that this process is running */
    pgstat_report_activity(STATE_RUNNING, NULL);

    /* dispatch on the command type */
    switch (cmd_node->type)
    {
        case T_IdentifySystemCmd:
            /* IDENTIFY_SYSTEM */
            IdentifySystem();
            break;

        case T_BaseBackupCmd:
            /* BASE_BACKUP */
            PreventInTransactionBlock(true, "BASE_BACKUP");
            SendBaseBackup((BaseBackupCmd *) cmd_node);
            break;

        case T_CreateReplicationSlotCmd:
            /* create a replication slot */
            CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
            break;

        case T_DropReplicationSlotCmd:
            /* drop a replication slot */
            DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
            break;

        case T_StartReplicationCmd:
            /* START_REPLICATION */
            {
                StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;

                PreventInTransactionBlock(true, "START_REPLICATION");

                if (cmd->kind == REPLICATION_KIND_PHYSICAL)
                    StartReplication(cmd);
                else
                    StartLogicalReplication(cmd);
                break;
            }

        case T_TimeLineHistoryCmd:
            /* TIMELINE_HISTORY: send the timeline history file */
            PreventInTransactionBlock(true, "TIMELINE_HISTORY");
            SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
            break;

        case T_VariableShowStmt:
            /* SHOW: report a configuration parameter */
            {
                DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
                VariableShowStmt *n = (VariableShowStmt *) cmd_node;

                GetPGVariable(n->name, dest);
            }
            break;

        case T_SQLCmd:
            /* plain SQL command */
            if (MyDatabaseId == InvalidOid)
                ereport(ERROR,
                        (errmsg("cannot execute SQL commands in WAL sender for physical replication")));

            /* Report to pgstat that this process is now idle */
            pgstat_report_activity(STATE_IDLE, NULL);

            /* Tell the caller that this wasn't a WalSender command. */
            return false;

        default:
            /* anything else is unexpected */
            elog(ERROR, "unrecognized replication command node tag: %u",
                 cmd_node->type);
    }

    /* done: switch back to the original memory context */
    MemoryContextSwitchTo(old_context);
    MemoryContextDelete(cmd_context);

    /* Send CommandComplete message */
    EndCommand("SELECT", DestRemote);

    /* Report to pgstat that this process is now idle */
    pgstat_report_activity(STATE_IDLE, NULL);

    return true;
}
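The boolean return value matters to the caller: when exec_replication_command returns false, the command was plain SQL and still has to be run through the ordinary query path. The sketch below illustrates that contract only. All demo_* names are hypothetical, the keyword prefix match is a crude stand-in for the real replication grammar, and the caller shape is an assumption modelled on the "Tell the caller that this wasn't a WalSender command" comment above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Counters so the effect of each call can be observed. */
static int  demo_replication_calls = 0;
static int  demo_sql_calls = 0;

/*
 * Hypothetical stand-in for exec_replication_command: a prefix match on
 * known keywords replaces the real parser.  Returns true if handled.
 */
static bool
demo_exec_replication_command(const char *cmd)
{
    static const char *const keywords[] = {
        "IDENTIFY_SYSTEM", "BASE_BACKUP", "CREATE_REPLICATION_SLOT",
        "DROP_REPLICATION_SLOT", "START_REPLICATION", "TIMELINE_HISTORY",
        "SHOW"
    };
    size_t      i;

    for (i = 0; i < sizeof(keywords) / sizeof(keywords[0]); i++)
    {
        if (strncmp(cmd, keywords[i], strlen(keywords[i])) == 0)
        {
            demo_replication_calls++;
            return true;
        }
    }
    return false;               /* not a walsender command */
}

/* Hypothetical stand-in for the ordinary SQL path. */
static void
demo_exec_simple_query(const char *cmd)
{
    (void) cmd;
    demo_sql_calls++;
}

/* Caller: try the replication path first, fall back to SQL on false. */
static void
demo_handle_query(const char *cmd, bool am_walsender)
{
    if (am_walsender)
    {
        if (!demo_exec_replication_command(cmd))
            demo_exec_simple_query(cmd);
    }
    else
        demo_exec_simple_query(cmd);
}
```

With am_walsender set, "START_REPLICATION 0/5D000000 TIMELINE 16" is consumed by the replication path, while "SELECT 1" falls through to the SQL path.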
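StartReplication
StartReplication handles the START_REPLICATION command.
Its main logic is:
1. Perform initialization and validation
2. Select the timeline
3. Enter COPY mode
3.1 Set the state
3.2 Send the CopyBothResponse message and start streaming
3.3 Initialize related variables, such as the shared memory status
3.4 Enter the main loop (WalSndLoop)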
/*
 * Handle START_REPLICATION command.
 *
 * At the moment, this never returns, but an ereport(ERROR) will take us back
 * to the main loop.
 */
static void
StartReplication(StartReplicationCmd *cmd)
{
    StringInfoData buf;
    XLogRecPtr  FlushPtr;

    /* timeline check: IDENTIFY_SYSTEM must have been run first */
    if (ThisTimeLineID == 0)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));

    /*
     * We assume here that we're logging enough information in the WAL for
     * log-shipping, since this is checked in PostmasterMain().
     *
     * NOTE: wal_level can only change at shutdown, so in most cases it is
     * difficult for there to be WAL data that we can still see that was
     * written at wal_level='minimal'.
     */

    if (cmd->slotname)
    {
        ReplicationSlotAcquire(cmd->slotname, true);
        /* SlotIsLogical(slot) is defined as (slot->data.database != InvalidOid) */
        if (SlotIsLogical(MyReplicationSlot))
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     (errmsg("cannot use a logical replication slot for physical replication"))));
    }

    /*
     * Select the timeline. If it was given explicitly by the client, use
     * that. Otherwise use the timeline of the last replayed record, which is
     * kept in ThisTimeLineID.
     */
    if (am_cascading_walsender)
    {
        /* this also updates ThisTimeLineID */
        FlushPtr = GetStandbyFlushRecPtr();
    }
    else
        FlushPtr = GetFlushRecPtr();

    if (cmd->timeline != 0)
    {
        XLogRecPtr  switchpoint;

        sendTimeLine = cmd->timeline;
        if (sendTimeLine == ThisTimeLineID)
        {
            sendTimeLineIsHistoric = false;
            sendTimeLineValidUpto = InvalidXLogRecPtr;
        }
        else
        {
            List       *timeLineHistory;

            sendTimeLineIsHistoric = true;

            /*
             * Check that the timeline the client requested exists, and the
             * requested start location is on that timeline.
             */
            timeLineHistory = readTimeLineHistory(ThisTimeLineID);
            switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
                                         &sendTimeLineNextTLI);
            list_free_deep(timeLineHistory);

            /*
             * Found the requested timeline in the history. Check that
             * requested startpoint is on that timeline in our history.
             *
             * This is quite loose on purpose. We only check that we didn't
             * fork off the requested timeline before the switchpoint. We
             * don't check that we switched *to* it before the requested
             * starting point. This is because the client can legitimately
             * request to start replication from the beginning of the WAL
             * segment that contains switchpoint, but on the new timeline, so
             * that it doesn't end up with a partial segment. If you ask for
             * too old a starting point, you'll get an error later when we
             * fail to find the requested WAL segment in pg_wal.
             *
             * XXX: we could be more strict here and only allow a startpoint
             * that's older than the switchpoint, if it's still in the same
             * WAL segment.
             */
            if (!XLogRecPtrIsInvalid(switchpoint) &&
                switchpoint < cmd->startpoint)
            {
                ereport(ERROR,
                        (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
                                (uint32) (cmd->startpoint >> 32),
                                (uint32) (cmd->startpoint),
                                cmd->timeline),
                         errdetail("This server's history forked from timeline %u at %X/%X.",
                                   cmd->timeline,
                                   (uint32) (switchpoint >> 32),
                                   (uint32) (switchpoint))));
            }
            sendTimeLineValidUpto = switchpoint;
        }
    }
    else
    {
        sendTimeLine = ThisTimeLineID;
        sendTimeLineValidUpto = InvalidXLogRecPtr;
        sendTimeLineIsHistoric = false;
    }

    streamingDoneSending = streamingDoneReceiving = false;

    /* If there is nothing to stream, don't even enter COPY mode */
    if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
    {
        /*
         * When we first start replication the standby will be behind the
         * primary. For some applications, for example synchronous
         * replication, it is important to have a clear state for this initial
         * catchup mode, so we can trigger actions when we change streaming
         * state later. We may stay in this state for a long time, which is
         * exactly why we want to be able to monitor whether or not we are
         * still here.
         */
        WalSndSetState(WALSNDSTATE_CATCHUP);

        /* Send a CopyBothResponse message, and start streaming */
        pq_beginmessage(&buf, 'W'); /* 'W' is the CopyBothResponse message type */
        pq_sendbyte(&buf, 0);
        pq_sendint16(&buf, 0);
        pq_endmessage(&buf);
        pq_flush();

        /*
         * Don't allow a request to stream from a future point in WAL that
         * hasn't been flushed to disk in this server yet.
         */
        if (FlushPtr < cmd->startpoint)
        {
            ereport(ERROR,
                    (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
                            (uint32) (cmd->startpoint >> 32),
                            (uint32) (cmd->startpoint),
                            (uint32) (FlushPtr >> 32),
                            (uint32) (FlushPtr))));
        }

        /* Start streaming from the requested point */
        sentPtr = cmd->startpoint;

        /* Initialize shared memory status, too */
        SpinLockAcquire(&MyWalSnd->mutex);
        MyWalSnd->sentPtr = sentPtr;
        SpinLockRelease(&MyWalSnd->mutex);

        SyncRepInitConfig();

        /* Main loop of walsender */
        replication_active = true;

        WalSndLoop(XLogSendPhysical);

        /* the loop has finished: mark replication as inactive */
        replication_active = false;
        if (got_STOPPING)
            proc_exit(0);       /* exit the process */
        WalSndSetState(WALSNDSTATE_STARTUP);

        Assert(streamingDoneSending && streamingDoneReceiving);
    }

    if (cmd->slotname)
        ReplicationSlotRelease();

    /*
     * Copy is finished now. Send a single-row result set indicating the next
     * timeline.
     */
    if (sendTimeLineIsHistoric)
    {
        char        startpos_str[8 + 1 + 8 + 1];
        DestReceiver *dest;
        TupOutputState *tstate;
        TupleDesc   tupdesc;
        Datum       values[2];
        bool        nulls[2];

        snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
                 (uint32) (sendTimeLineValidUpto >> 32),
                 (uint32) sendTimeLineValidUpto);

        dest = CreateDestReceiver(DestRemoteSimple);
        MemSet(nulls, false, sizeof(nulls));

        /*
         * Need a tuple descriptor representing two columns. int8 may seem
         * like a surprising data type for this, but in theory int4 would not
         * be wide enough for this, as TimeLineID is unsigned.
         */
        tupdesc = CreateTemplateTupleDesc(2);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
                                  INT8OID, -1, 0);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
                                  TEXTOID, -1, 0);

        /* prepare for projection of tuple */
        tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);

        values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
        values[1] = CStringGetTextDatum(startpos_str);

        /* send it to dest */
        do_tup_output(tstate, values, nulls);

        end_tup_output(tstate);
    }

    /* Send CommandComplete message */
    pq_puttextmessage('C', "START_STREAMING");
}
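The timeline selection in StartReplication reduces to a small decision function once the switchpoint lookup is factored out. The following is a minimal sketch under stated assumptions: DemoTimelineChoice and demo_choose_timeline are hypothetical names, the switchpoint is passed in as a parameter instead of being read from the timeline history file, 0 plays the role of InvalidXLogRecPtr, and the start_ok flag replaces the ereport(ERROR) path.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical summary of the variables StartReplication sets. */
typedef struct DemoTimelineChoice
{
    uint32_t    send_tli;       /* timeline to stream from */
    bool        is_historic;    /* requested timeline is not the current one */
    uint64_t    valid_upto;     /* switchpoint; 0 means "no limit" */
    bool        start_ok;       /* false where the real code raises ERROR */
    bool        enter_copy;     /* true if there is something to stream */
} DemoTimelineChoice;

static DemoTimelineChoice
demo_choose_timeline(uint32_t requested_tli, uint32_t this_tli,
                     uint64_t switchpoint, uint64_t startpoint)
{
    /* Default: stream the server's current timeline, with no upper limit. */
    DemoTimelineChoice c = {this_tli, false, 0, true, false};

    if (requested_tli != 0 && requested_tli != this_tli)
    {
        c.send_tli = requested_tli;
        c.is_historic = true;
        c.valid_upto = switchpoint;

        /* The history forked before the requested starting point. */
        if (switchpoint != 0 && switchpoint < startpoint)
            c.start_ok = false;
    }

    /* Nothing to stream if the start is at or past the switchpoint. */
    c.enter_copy = c.start_ok &&
        (!c.is_historic || startpoint < c.valid_upto);
    return c;
}
```

For the session traced in this article (requested timeline 16, current timeline 16), the function reports a non-historic timeline and enter_copy is true, which is the path that reaches WalSndLoop.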
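III. Trace Analysis
On the primary node, attach gdb to the postmaster, set a breakpoint on exec_replication_command, then start the standby node to hit the breakpoint.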
[xdb@localhost ~]$ ps -ef|grep postgres
xdb 1339 1 2 14:45 pts/0 00:00:00 /appdb/xdb/pg11.2/bin/postgres
[xdb@localhost ~]$ gdb -p 1339
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
...
(gdb) set follow-fork-mode child
(gdb) b exec_replication_command
Breakpoint 1 at 0x852fd2: file walsender.c, line 1438.
(gdb) c
Continuing.
[New process 1356]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7f5df9d2d8c0 (LWP 1356)]
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "IDENTIFY_SYSTEM") at walsender.c:1438
1438 if (got_STOPPING)
(gdb)
The first command is IDENTIFY_SYSTEM; the second command, START_REPLICATION, is the one we want to trace.
(gdb) c
Continuing.
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "START_REPLICATION 0/5D000000 TIMELINE 16") at walsender.c:1438
1438 if (got_STOPPING)
(gdb)
1. Perform initialization and validation
(gdb) n
1446 if (MyWalSnd->state == WALSNDSTATE_STOPPING)
(gdb)
1454 SnapBuildClearExportedSnapshot();
(gdb) p *MyWalSnd
$1 = {pid = 1356, state = WALSNDSTATE_STARTUP, sentPtr = 0, needreload = false, write = 0, flush = 0, apply = 0,
writeLag = -1, flushLag = -1, applyLag = -1, mutex = 0 '\000', latch = 0x7f5dee92c4d4, sync_standby_priority = 0}
(gdb) n
1456 CHECK_FOR_INTERRUPTS();
(gdb)
2. Switch the memory context
(gdb)
1458 cmd_context = AllocSetContextCreate(CurrentMemoryContext,
(gdb)
1461 old_context = MemoryContextSwitchTo(cmd_context);
(gdb)
3. Initialize the replication scanner
(gdb)
1463 replication_scanner_init(cmd_string);
(gdb) n
1464 parse_rc = replication_yyparse();
(gdb)
1465 if (parse_rc != 0)
(gdb) p parse_rc
$3 = 0
(gdb)
(gdb) n
1471 cmd_node = replication_parse_result;
(gdb)
(gdb)
1479 if (cmd_node->type != T_SQLCmd)
(gdb) n
1480 ereport(log_replication_commands ? LOG : DEBUG1,
(gdb) p cmd_node
$4 = (Node *) 0x1df4710
(gdb) p *cmd_node
$5 = {type = T_StartReplicationCmd}
(gdb)
4. Perform transaction-related checks
(gdb) n
1487 if (!IsTransactionBlock())
(gdb)
1488 SnapBuildClearExportedSnapshot();
(gdb)
1494 if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
(gdb)
1500 CHECK_FOR_INTERRUPTS();
(gdb)
5. Initialize the input/output message buffers
(gdb)
1506 initStringInfo(&output_message);
(gdb)
1507 initStringInfo(&reply_message);
(gdb)
1508 initStringInfo(&tmpbuf);
(gdb)
1511 pgstat_report_activity(STATE_RUNNING, NULL);
6. Execute the command according to its type
6.1 For T_StartReplicationCmd, call StartReplication
(gdb) n
1513 switch (cmd_node->type)
(gdb)
1534 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
(gdb)
1536 PreventInTransactionBlock(true, "START_REPLICATION");
(gdb)
1538 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
(gdb)
1539 StartReplication(cmd);
Step into StartReplication
1539 StartReplication(cmd);
(gdb) step
StartReplication (cmd=0x1df4710) at walsender.c:532
532 if (ThisTimeLineID == 0)
(gdb)
1. Perform initialization and validation
(gdb) n
546 if (cmd->slotname)
(gdb)
560 if (am_cascading_walsender)
(gdb)
2. Select the timeline
(gdb) n
568 if (cmd->timeline != 0)
(gdb)
572 sendTimeLine = cmd->timeline;
(gdb)
573 if (sendTimeLine == ThisTimeLineID)
(gdb)
575 sendTimeLineIsHistoric = false;
(gdb) p FlushPtr
$9 = 1560397696
(gdb) n
576 sendTimeLineValidUpto = InvalidXLogRecPtr;
(gdb)
634 streamingDoneSending = streamingDoneReceiving = false;
(gdb) p sendTimeLine
$10 = 16
(gdb) p ThisTimeLineID
$11 = 16
(gdb) p *cmd
$12 = {type = T_StartReplicationCmd, kind = REPLICATION_KIND_PHYSICAL, slotname = 0x0, timeline = 16,
startpoint = 1560281088, options = 0x0}
(gdb)
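gdb prints XLogRecPtr values in decimal, while the replication command and the server's error messages use the %X/%X form (high 32 bits, a slash, low 32 bits). The conversion can be sketched as follows; demo_parse_lsn and demo_format_lsn are hypothetical helper names, not PostgreSQL functions. With them, startpoint = 1560281088 reads as 0/5D000000 (the value in the START_REPLICATION command) and FlushPtr = 1560397696 reads as 0/5D01C780.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Parse "hi/lo" (both hexadecimal) into a 64-bit WAL position. */
static bool
demo_parse_lsn(const char *text, uint64_t *lsn)
{
    uint32_t    hi;
    uint32_t    lo;

    if (sscanf(text, "%" SCNx32 "/%" SCNx32, &hi, &lo) != 2)
        return false;
    *lsn = ((uint64_t) hi << 32) | lo;
    return true;
}

/* Format a 64-bit WAL position the way the server prints it: %X/%X. */
static void
demo_format_lsn(uint64_t lsn, char *out, size_t outlen)
{
    snprintf(out, outlen, "%" PRIX32 "/%" PRIX32,
             (uint32_t) (lsn >> 32), (uint32_t) lsn);
}
```

Since FlushPtr (0/5D01C780) is ahead of the requested startpoint (0/5D000000), the later check FlushPtr < cmd->startpoint does not raise an error in this session.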
3. Enter COPY mode
(gdb) n
637 if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
(gdb)
3.1 Set the state
648 WalSndSetState(WALSNDSTATE_CATCHUP);
(gdb) p sendTimeLineValidUpto
$13 = 0
(gdb) p cmd->startpoint
$14 = 1560281088
(gdb)
3.2 Send the CopyBothResponse message and start streaming
(gdb) n
651 pq_beginmessage(&buf, 'W');
(gdb)
652 pq_sendbyte(&buf, 0);
(gdb)
653 pq_sendint16(&buf, 0);
(gdb)
654 pq_endmessage(&buf);
(gdb) p buf
$15 = {data = 0x1df53b0 "", len = 3, maxlen = 1024, cursor = 87}
(gdb) p buf->data
$16 = 0x1df53b0 ""
(gdb) x/hb buf->data
0x1df53b0: 0
(gdb) x/32hb buf->data
0x1df53b0: 0 0 0 127 127 127 127 127
0x1df53b8: 127 127 127 127 127 127 127 127
0x1df53c0: 127 127 127 127 127 127 127 127
0x1df53c8: 127 127 127 127 127 127 127 127
(gdb)
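The buffer dump above shows only the message body: len = 3 covers the one format byte plus the two-byte column count, and cursor = 87 is the ASCII code of 'W', the message type that pq_beginmessage was given. On the wire, a protocol message is the type byte, a 4-byte big-endian length that counts itself, and then the body. The sketch below assembles those bytes for this particular CopyBothResponse; demo_build_copy_both_response is a hypothetical helper written for illustration, not part of pqformat.c.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Build the wire form of a CopyBothResponse with format 0 and zero columns.
 * Returns the number of bytes written to out.
 */
static size_t
demo_build_copy_both_response(uint8_t *out, size_t outlen)
{
    /* body: int8 overall format = 0, int16 number of columns = 0 */
    const uint8_t body[3] = {0, 0, 0};
    /* the length word counts its own 4 bytes plus the body */
    uint32_t    length = (uint32_t) (4 + sizeof(body));

    assert(outlen >= 1 + (size_t) length);

    out[0] = 'W';                           /* message type */
    out[1] = (uint8_t) (length >> 24);      /* length, big-endian */
    out[2] = (uint8_t) (length >> 16);
    out[3] = (uint8_t) (length >> 8);
    out[4] = (uint8_t) length;
    memcpy(out + 5, body, sizeof(body));
    return 1 + (size_t) length;
}
```

The result is eight bytes: 'W', then 00 00 00 07, then the three zero bytes that gdb shows at the start of buf->data.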
3.3 Initialize related variables, such as the shared memory status
(gdb) n
655 pq_flush();
(gdb)
661 if (FlushPtr < cmd->startpoint)
(gdb) p FlushPtr
$17 = 1560397696
(gdb) p cmd->startpoint
$18 = 1560281088
(gdb) n
672 sentPtr = cmd->startpoint;
(gdb)
675 SpinLockAcquire(&MyWalSnd->mutex);
(gdb)
676 MyWalSnd->sentPtr = sentPtr;
(gdb)
677 SpinLockRelease(&MyWalSnd->mutex);
(gdb)
679 SyncRepInitConfig();
(gdb)
682 replication_active = true;
3.4 Enter the main loop (WalSndLoop)
(gdb)
684 WalSndLoop(XLogSendPhysical);
(gdb)
DONE!
IV. References
PG Source Code