2
0

walreceiver.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
/*-------------------------------------------------------------------------
 *
 * walreceiver.h
 *	  Exports from replication/walreceiver.c and
 *	  replication/walreceiverfuncs.c, plus the libpqwalreceiver hook
 *	  interface.
 *
 * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
 *
 * src/include/replication/walreceiver.h
 *
 *-------------------------------------------------------------------------
 */
  12. #ifndef _WALRECEIVER_H
  13. #define _WALRECEIVER_H
  14. #include "access/xlog.h"
  15. #include "access/xlogdefs.h"
  16. #include "getaddrinfo.h" /* for NI_MAXHOST */
  17. #include "pgtime.h"
  18. #include "port/atomics.h"
  19. #include "replication/logicalproto.h"
  20. #include "replication/walsender.h"
  21. #include "storage/condition_variable.h"
  22. #include "storage/latch.h"
  23. #include "storage/spin.h"
  24. #include "utils/tuplestore.h"
  25. /* user-settable parameters */
  26. extern PGDLLIMPORT int wal_receiver_status_interval;
  27. extern PGDLLIMPORT int wal_receiver_timeout;
  28. extern PGDLLIMPORT bool hot_standby_feedback;
  29. /*
  30. * MAXCONNINFO: maximum size of a connection string.
  31. *
  32. * XXX: Should this move to pg_config_manual.h?
  33. */
  34. #define MAXCONNINFO 1024
  35. /* Can we allow the standby to accept replication connection from another standby? */
  36. #define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
  37. /*
  38. * Values for WalRcv->walRcvState.
  39. */
  40. typedef enum
  41. {
  42. WALRCV_STOPPED, /* stopped and mustn't start up again */
  43. WALRCV_STARTING, /* launched, but the process hasn't
  44. * initialized yet */
  45. WALRCV_STREAMING, /* walreceiver is streaming */
  46. WALRCV_WAITING, /* stopped streaming, waiting for orders */
  47. WALRCV_RESTARTING, /* asked to restart streaming */
  48. WALRCV_STOPPING /* requested to stop, but still running */
  49. } WalRcvState;
  50. /* Shared memory area for management of walreceiver process */
  51. typedef struct
  52. {
  53. /*
  54. * PID of currently active walreceiver process, its current state and
  55. * start time (actually, the time at which it was requested to be
  56. * started).
  57. */
  58. pid_t pid;
  59. WalRcvState walRcvState;
  60. ConditionVariable walRcvStoppedCV;
  61. pg_time_t startTime;
  62. /*
  63. * receiveStart and receiveStartTLI indicate the first byte position and
  64. * timeline that will be received. When startup process starts the
  65. * walreceiver, it sets these to the point where it wants the streaming to
  66. * begin.
  67. */
  68. XLogRecPtr receiveStart;
  69. TimeLineID receiveStartTLI;
  70. /*
  71. * flushedUpto-1 is the last byte position that has already been received,
  72. * and receivedTLI is the timeline it came from. At the first startup of
  73. * walreceiver, these are set to receiveStart and receiveStartTLI. After
  74. * that, walreceiver updates these whenever it flushes the received WAL to
  75. * disk.
  76. */
  77. XLogRecPtr flushedUpto;
  78. TimeLineID receivedTLI;
  79. /*
  80. * latestChunkStart is the starting byte position of the current "batch"
  81. * of received WAL. It's actually the same as the previous value of
  82. * flushedUpto before the last flush to disk. Startup process can use
  83. * this to detect whether it's keeping up or not.
  84. */
  85. XLogRecPtr latestChunkStart;
  86. /*
  87. * Time of send and receive of any message received.
  88. */
  89. TimestampTz lastMsgSendTime;
  90. TimestampTz lastMsgReceiptTime;
  91. /*
  92. * Latest reported end of WAL on the sender
  93. */
  94. XLogRecPtr latestWalEnd;
  95. TimestampTz latestWalEndTime;
  96. /*
  97. * connection string; initially set to connect to the primary, and later
  98. * clobbered to hide security-sensitive fields.
  99. */
  100. char conninfo[MAXCONNINFO];
  101. /*
  102. * Host name (this can be a host name, an IP address, or a directory path)
  103. * and port number of the active replication connection.
  104. */
  105. char sender_host[NI_MAXHOST];
  106. int sender_port;
  107. /*
  108. * replication slot name; is also used for walreceiver to connect with the
  109. * primary
  110. */
  111. char slotname[NAMEDATALEN];
  112. /*
  113. * If it's a temporary replication slot, it needs to be recreated when
  114. * connecting.
  115. */
  116. bool is_temp_slot;
  117. /* set true once conninfo is ready to display (obfuscated pwds etc) */
  118. bool ready_to_display;
  119. /*
  120. * Latch used by startup process to wake up walreceiver after telling it
  121. * where to start streaming (after setting receiveStart and
  122. * receiveStartTLI), and also to tell it to send apply feedback to the
  123. * primary whenever specially marked commit records are applied. This is
  124. * normally mapped to procLatch when walreceiver is running.
  125. */
  126. Latch *latch;
  127. slock_t mutex; /* locks shared variables shown above */
  128. /*
  129. * Like flushedUpto, but advanced after writing and before flushing,
  130. * without the need to acquire the spin lock. Data can be read by another
  131. * process up to this point, but shouldn't be used for data integrity
  132. * purposes.
  133. */
  134. pg_atomic_uint64 writtenUpto;
  135. /*
  136. * force walreceiver reply? This doesn't need to be locked; memory
  137. * barriers for ordering are sufficient. But we do need atomic fetch and
  138. * store semantics, so use sig_atomic_t.
  139. */
  140. sig_atomic_t force_reply; /* used as a bool */
  141. } WalRcvData;
  142. extern PGDLLIMPORT WalRcvData *WalRcv;
  143. typedef struct
  144. {
  145. bool logical; /* True if this is logical replication stream,
  146. * false if physical stream. */
  147. char *slotname; /* Name of the replication slot or NULL. */
  148. XLogRecPtr startpoint; /* LSN of starting point. */
  149. union
  150. {
  151. struct
  152. {
  153. TimeLineID startpointTLI; /* Starting timeline */
  154. } physical;
  155. struct
  156. {
  157. uint32 proto_version; /* Logical protocol version */
  158. List *publication_names; /* String list of publications */
  159. bool binary; /* Ask publisher to use binary */
  160. bool streaming; /* Streaming of large transactions */
  161. bool twophase; /* Streaming of two-phase transactions at
  162. * prepare time */
  163. } logical;
  164. } proto;
  165. } WalRcvStreamOptions;
  166. struct WalReceiverConn;
  167. typedef struct WalReceiverConn WalReceiverConn;
  168. /*
  169. * Status of walreceiver query execution.
  170. *
  171. * We only define statuses that are currently used.
  172. */
  173. typedef enum
  174. {
  175. WALRCV_ERROR, /* There was error when executing the query. */
  176. WALRCV_OK_COMMAND, /* Query executed utility or replication
  177. * command. */
  178. WALRCV_OK_TUPLES, /* Query returned tuples. */
  179. WALRCV_OK_COPY_IN, /* Query started COPY FROM. */
  180. WALRCV_OK_COPY_OUT, /* Query started COPY TO. */
  181. WALRCV_OK_COPY_BOTH /* Query started COPY BOTH replication
  182. * protocol. */
  183. } WalRcvExecStatus;
  184. /*
  185. * Return value for walrcv_exec, returns the status of the execution and
  186. * tuples if any.
  187. */
  188. typedef struct WalRcvExecResult
  189. {
  190. WalRcvExecStatus status;
  191. int sqlstate;
  192. char *err;
  193. Tuplestorestate *tuplestore;
  194. TupleDesc tupledesc;
  195. } WalRcvExecResult;
  196. /* WAL receiver - libpqwalreceiver hooks */
  197. /*
  198. * walrcv_connect_fn
  199. *
  200. * Establish connection to a cluster. 'logical' is true if the
  201. * connection is logical, and false if the connection is physical.
  202. * 'appname' is a name associated to the connection, to use for example
  203. * with fallback_application_name or application_name. Returns the
  204. * details about the connection established, as defined by
  205. * WalReceiverConn for each WAL receiver module. On error, NULL is
  206. * returned with 'err' including the error generated.
  207. */
  208. typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
  209. bool logical,
  210. const char *appname,
  211. char **err);
  212. /*
  213. * walrcv_check_conninfo_fn
  214. *
  215. * Parse and validate the connection string given as of 'conninfo'.
  216. */
  217. typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
  218. /*
  219. * walrcv_get_conninfo_fn
  220. *
  221. * Returns a user-displayable conninfo string. Note that any
  222. * security-sensitive fields should be obfuscated.
  223. */
  224. typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
  225. /*
  226. * walrcv_get_senderinfo_fn
  227. *
  228. * Provide information of the WAL sender this WAL receiver is connected
  229. * to, as of 'sender_host' for the host of the sender and 'sender_port'
  230. * for its port.
  231. */
  232. typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
  233. char **sender_host,
  234. int *sender_port);
  235. /*
  236. * walrcv_identify_system_fn
  237. *
  238. * Run IDENTIFY_SYSTEM on the cluster connected to and validate the
  239. * identity of the cluster. Returns the system ID of the cluster
  240. * connected to. 'primary_tli' is the timeline ID of the sender.
  241. */
  242. typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
  243. TimeLineID *primary_tli);
  244. /*
  245. * walrcv_server_version_fn
  246. *
  247. * Returns the version number of the cluster connected to.
  248. */
  249. typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
  250. /*
  251. * walrcv_readtimelinehistoryfile_fn
  252. *
  253. * Fetch from cluster the timeline history file for timeline 'tli'.
  254. * Returns the name of the timeline history file as of 'filename', its
  255. * contents as of 'content' and its 'size'.
  256. */
  257. typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
  258. TimeLineID tli,
  259. char **filename,
  260. char **content,
  261. int *size);
  262. /*
  263. * walrcv_startstreaming_fn
  264. *
  265. * Start streaming WAL data from given streaming options. Returns true
  266. * if the connection has switched successfully to copy-both mode and false
  267. * if the server received the command and executed it successfully, but
  268. * didn't switch to copy-mode.
  269. */
  270. typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
  271. const WalRcvStreamOptions *options);
  272. /*
  273. * walrcv_endstreaming_fn
  274. *
  275. * Stop streaming of WAL data. Returns the next timeline ID of the cluster
  276. * connected to in 'next_tli', or 0 if there was no report.
  277. */
  278. typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
  279. TimeLineID *next_tli);
  280. /*
  281. * walrcv_receive_fn
  282. *
  283. * Receive a message available from the WAL stream. 'buffer' is a pointer
  284. * to a buffer holding the message received. Returns the length of the data,
  285. * 0 if no data is available yet ('wait_fd' is a socket descriptor which can
  286. * be waited on before a retry), and -1 if the cluster ended the COPY.
  287. */
  288. typedef int (*walrcv_receive_fn) (WalReceiverConn *conn,
  289. char **buffer,
  290. pgsocket *wait_fd);
  291. /*
  292. * walrcv_send_fn
  293. *
  294. * Send a message of size 'nbytes' to the WAL stream with 'buffer' as
  295. * contents.
  296. */
  297. typedef void (*walrcv_send_fn) (WalReceiverConn *conn,
  298. const char *buffer,
  299. int nbytes);
  300. /*
  301. * walrcv_create_slot_fn
  302. *
  303. * Create a new replication slot named 'slotname'. 'temporary' defines
  304. * if the slot is temporary. 'snapshot_action' defines the behavior wanted
  305. * for an exported snapshot (see replication protocol for more details).
  306. * 'lsn' includes the LSN position at which the created slot became
  307. * consistent. Returns the name of the exported snapshot for a logical
  308. * slot, or NULL for a physical slot.
  309. */
  310. typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
  311. const char *slotname,
  312. bool temporary,
  313. bool two_phase,
  314. CRSSnapshotAction snapshot_action,
  315. XLogRecPtr *lsn);
  316. /*
  317. * walrcv_get_backend_pid_fn
  318. *
  319. * Returns the PID of the remote backend process.
  320. */
  321. typedef pid_t (*walrcv_get_backend_pid_fn) (WalReceiverConn *conn);
  322. /*
  323. * walrcv_exec_fn
  324. *
  325. * Send generic queries (and commands) to the remote cluster. 'nRetTypes'
  326. * is the expected number of returned attributes, and 'retTypes' an array
  327. * including their type OIDs. Returns the status of the execution and
  328. * tuples if any.
  329. */
  330. typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
  331. const char *query,
  332. const int nRetTypes,
  333. const Oid *retTypes);
  334. /*
  335. * walrcv_disconnect_fn
  336. *
  337. * Disconnect with the cluster.
  338. */
  339. typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
  340. typedef struct WalReceiverFunctionsType
  341. {
  342. walrcv_connect_fn walrcv_connect;
  343. walrcv_check_conninfo_fn walrcv_check_conninfo;
  344. walrcv_get_conninfo_fn walrcv_get_conninfo;
  345. walrcv_get_senderinfo_fn walrcv_get_senderinfo;
  346. walrcv_identify_system_fn walrcv_identify_system;
  347. walrcv_server_version_fn walrcv_server_version;
  348. walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
  349. walrcv_startstreaming_fn walrcv_startstreaming;
  350. walrcv_endstreaming_fn walrcv_endstreaming;
  351. walrcv_receive_fn walrcv_receive;
  352. walrcv_send_fn walrcv_send;
  353. walrcv_create_slot_fn walrcv_create_slot;
  354. walrcv_get_backend_pid_fn walrcv_get_backend_pid;
  355. walrcv_exec_fn walrcv_exec;
  356. walrcv_disconnect_fn walrcv_disconnect;
  357. } WalReceiverFunctionsType;
  358. extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
  359. #define walrcv_connect(conninfo, logical, appname, err) \
  360. WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)
  361. #define walrcv_check_conninfo(conninfo) \
  362. WalReceiverFunctions->walrcv_check_conninfo(conninfo)
  363. #define walrcv_get_conninfo(conn) \
  364. WalReceiverFunctions->walrcv_get_conninfo(conn)
  365. #define walrcv_get_senderinfo(conn, sender_host, sender_port) \
  366. WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
  367. #define walrcv_identify_system(conn, primary_tli) \
  368. WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
  369. #define walrcv_server_version(conn) \
  370. WalReceiverFunctions->walrcv_server_version(conn)
  371. #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
  372. WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
  373. #define walrcv_startstreaming(conn, options) \
  374. WalReceiverFunctions->walrcv_startstreaming(conn, options)
  375. #define walrcv_endstreaming(conn, next_tli) \
  376. WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
  377. #define walrcv_receive(conn, buffer, wait_fd) \
  378. WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
  379. #define walrcv_send(conn, buffer, nbytes) \
  380. WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
  381. #define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
  382. WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
  383. #define walrcv_get_backend_pid(conn) \
  384. WalReceiverFunctions->walrcv_get_backend_pid(conn)
  385. #define walrcv_exec(conn, exec, nRetTypes, retTypes) \
  386. WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
  387. #define walrcv_disconnect(conn) \
  388. WalReceiverFunctions->walrcv_disconnect(conn)
  389. static inline void
  390. walrcv_clear_result(WalRcvExecResult *walres)
  391. {
  392. if (!walres)
  393. return;
  394. if (walres->err)
  395. pfree(walres->err);
  396. if (walres->tuplestore)
  397. tuplestore_end(walres->tuplestore);
  398. if (walres->tupledesc)
  399. FreeTupleDesc(walres->tupledesc);
  400. pfree(walres);
  401. }
  402. /* prototypes for functions in walreceiver.c */
  403. extern void WalReceiverMain(void) pg_attribute_noreturn();
  404. extern void ProcessWalRcvInterrupts(void);
  405. /* prototypes for functions in walreceiverfuncs.c */
  406. extern Size WalRcvShmemSize(void);
  407. extern void WalRcvShmemInit(void);
  408. extern void ShutdownWalRcv(void);
  409. extern bool WalRcvStreaming(void);
  410. extern bool WalRcvRunning(void);
  411. extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
  412. const char *conninfo, const char *slotname,
  413. bool create_temp_slot);
  414. extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
  415. extern XLogRecPtr GetWalRcvWriteRecPtr(void);
  416. extern int GetReplicationApplyDelay(void);
  417. extern int GetReplicationTransferLatency(void);
  418. extern void WalRcvForceReply(void);
  419. #endif /* _WALRECEIVER_H */