123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472 |
- /*-------------------------------------------------------------------------
- *
- * walreceiver.h
- * Exports from replication/walreceiver.c and replication/walreceiverfuncs.c.
- *
- * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
- *
- * src/include/replication/walreceiver.h
- *
- *-------------------------------------------------------------------------
- */
- #ifndef _WALRECEIVER_H
- #define _WALRECEIVER_H
- #include "access/xlog.h"
- #include "access/xlogdefs.h"
- #include "getaddrinfo.h" /* for NI_MAXHOST */
- #include "pgtime.h"
- #include "port/atomics.h"
- #include "replication/logicalproto.h"
- #include "replication/walsender.h"
- #include "storage/condition_variable.h"
- #include "storage/latch.h"
- #include "storage/spin.h"
- #include "utils/tuplestore.h"
- /* user-settable parameters */
- extern PGDLLIMPORT int wal_receiver_status_interval;
- extern PGDLLIMPORT int wal_receiver_timeout;
- extern PGDLLIMPORT bool hot_standby_feedback;
/*
 * MAXCONNINFO: the maximum size of a connection string.
 *
 * XXX: Should this move to pg_config_manual.h?
 */
#define MAXCONNINFO		1024

/*
 * May this standby accept a replication connection from another standby?
 * Evaluates to true only when EnableHotStandby is set and max_wal_senders
 * is greater than zero.
 */
#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
/*
 * The states that WalRcv->walRcvState can take.
 */
typedef enum
{
	/* stopped and mustn't start up again */
	WALRCV_STOPPED,

	/* launched, but the process hasn't initialized yet */
	WALRCV_STARTING,

	/* walreceiver is streaming */
	WALRCV_STREAMING,

	/* stopped streaming, waiting for orders */
	WALRCV_WAITING,

	/* asked to restart streaming */
	WALRCV_RESTARTING,

	/* requested to stop, but still running */
	WALRCV_STOPPING
} WalRcvState;
- /* Shared memory area for management of walreceiver process */
- typedef struct
- {
- /*
- * PID of currently active walreceiver process, its current state and
- * start time (actually, the time at which it was requested to be
- * started).
- */
- pid_t pid;
- WalRcvState walRcvState;
- ConditionVariable walRcvStoppedCV;
- pg_time_t startTime;
- /*
- * receiveStart and receiveStartTLI indicate the first byte position and
- * timeline that will be received. When startup process starts the
- * walreceiver, it sets these to the point where it wants the streaming to
- * begin.
- */
- XLogRecPtr receiveStart;
- TimeLineID receiveStartTLI;
- /*
- * flushedUpto-1 is the last byte position that has already been received,
- * and receivedTLI is the timeline it came from. At the first startup of
- * walreceiver, these are set to receiveStart and receiveStartTLI. After
- * that, walreceiver updates these whenever it flushes the received WAL to
- * disk.
- */
- XLogRecPtr flushedUpto;
- TimeLineID receivedTLI;
- /*
- * latestChunkStart is the starting byte position of the current "batch"
- * of received WAL. It's actually the same as the previous value of
- * flushedUpto before the last flush to disk. Startup process can use
- * this to detect whether it's keeping up or not.
- */
- XLogRecPtr latestChunkStart;
- /*
- * Time of send and receive of any message received.
- */
- TimestampTz lastMsgSendTime;
- TimestampTz lastMsgReceiptTime;
- /*
- * Latest reported end of WAL on the sender
- */
- XLogRecPtr latestWalEnd;
- TimestampTz latestWalEndTime;
- /*
- * connection string; initially set to connect to the primary, and later
- * clobbered to hide security-sensitive fields.
- */
- char conninfo[MAXCONNINFO];
- /*
- * Host name (this can be a host name, an IP address, or a directory path)
- * and port number of the active replication connection.
- */
- char sender_host[NI_MAXHOST];
- int sender_port;
- /*
- * replication slot name; is also used for walreceiver to connect with the
- * primary
- */
- char slotname[NAMEDATALEN];
- /*
- * If it's a temporary replication slot, it needs to be recreated when
- * connecting.
- */
- bool is_temp_slot;
- /* set true once conninfo is ready to display (obfuscated pwds etc) */
- bool ready_to_display;
- /*
- * Latch used by startup process to wake up walreceiver after telling it
- * where to start streaming (after setting receiveStart and
- * receiveStartTLI), and also to tell it to send apply feedback to the
- * primary whenever specially marked commit records are applied. This is
- * normally mapped to procLatch when walreceiver is running.
- */
- Latch *latch;
- slock_t mutex; /* locks shared variables shown above */
- /*
- * Like flushedUpto, but advanced after writing and before flushing,
- * without the need to acquire the spin lock. Data can be read by another
- * process up to this point, but shouldn't be used for data integrity
- * purposes.
- */
- pg_atomic_uint64 writtenUpto;
- /*
- * force walreceiver reply? This doesn't need to be locked; memory
- * barriers for ordering are sufficient. But we do need atomic fetch and
- * store semantics, so use sig_atomic_t.
- */
- sig_atomic_t force_reply; /* used as a bool */
- } WalRcvData;
- extern PGDLLIMPORT WalRcvData *WalRcv;
- typedef struct
- {
- bool logical; /* True if this is logical replication stream,
- * false if physical stream. */
- char *slotname; /* Name of the replication slot or NULL. */
- XLogRecPtr startpoint; /* LSN of starting point. */
- union
- {
- struct
- {
- TimeLineID startpointTLI; /* Starting timeline */
- } physical;
- struct
- {
- uint32 proto_version; /* Logical protocol version */
- List *publication_names; /* String list of publications */
- bool binary; /* Ask publisher to use binary */
- bool streaming; /* Streaming of large transactions */
- bool twophase; /* Streaming of two-phase transactions at
- * prepare time */
- } logical;
- } proto;
- } WalRcvStreamOptions;
/*
 * Connection handle.  This header only declares the struct, so code that
 * includes it can pass pointers around without seeing the contents.
 */
typedef struct WalReceiverConn WalReceiverConn;
/*
 * Outcome of executing a query through the walreceiver.
 *
 * Only the statuses that are currently used are defined.
 */
typedef enum
{
	/* There was error when executing the query. */
	WALRCV_ERROR,

	/* Query executed utility or replication command. */
	WALRCV_OK_COMMAND,

	/* Query returned tuples. */
	WALRCV_OK_TUPLES,

	/* Query started COPY FROM. */
	WALRCV_OK_COPY_IN,

	/* Query started COPY TO. */
	WALRCV_OK_COPY_OUT,

	/* Query started COPY BOTH replication protocol. */
	WALRCV_OK_COPY_BOTH
} WalRcvExecStatus;
- /*
- * Return value for walrcv_exec, returns the status of the execution and
- * tuples if any.
- */
- typedef struct WalRcvExecResult
- {
- WalRcvExecStatus status;
- int sqlstate;
- char *err;
- Tuplestorestate *tuplestore;
- TupleDesc tupledesc;
- } WalRcvExecResult;
- /* WAL receiver - libpqwalreceiver hooks */
- /*
- * walrcv_connect_fn
- *
- * Establish connection to a cluster. 'logical' is true if the
- * connection is logical, and false if the connection is physical.
- * 'appname' is a name associated to the connection, to use for example
- * with fallback_application_name or application_name. Returns the
- * details about the connection established, as defined by
- * WalReceiverConn for each WAL receiver module. On error, NULL is
- * returned with 'err' including the error generated.
- */
- typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
- bool logical,
- const char *appname,
- char **err);
- /*
- * walrcv_check_conninfo_fn
- *
- * Parse and validate the connection string given as of 'conninfo'.
- */
- typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
- /*
- * walrcv_get_conninfo_fn
- *
- * Returns a user-displayable conninfo string. Note that any
- * security-sensitive fields should be obfuscated.
- */
- typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
- /*
- * walrcv_get_senderinfo_fn
- *
- * Provide information of the WAL sender this WAL receiver is connected
- * to, as of 'sender_host' for the host of the sender and 'sender_port'
- * for its port.
- */
- typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
- char **sender_host,
- int *sender_port);
- /*
- * walrcv_identify_system_fn
- *
- * Run IDENTIFY_SYSTEM on the cluster connected to and validate the
- * identity of the cluster. Returns the system ID of the cluster
- * connected to. 'primary_tli' is the timeline ID of the sender.
- */
- typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
- TimeLineID *primary_tli);
- /*
- * walrcv_server_version_fn
- *
- * Returns the version number of the cluster connected to.
- */
- typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
- /*
- * walrcv_readtimelinehistoryfile_fn
- *
- * Fetch from cluster the timeline history file for timeline 'tli'.
- * Returns the name of the timeline history file as of 'filename', its
- * contents as of 'content' and its 'size'.
- */
- typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
- TimeLineID tli,
- char **filename,
- char **content,
- int *size);
- /*
- * walrcv_startstreaming_fn
- *
- * Start streaming WAL data from given streaming options. Returns true
- * if the connection has switched successfully to copy-both mode and false
- * if the server received the command and executed it successfully, but
- * didn't switch to copy-mode.
- */
- typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
- const WalRcvStreamOptions *options);
- /*
- * walrcv_endstreaming_fn
- *
- * Stop streaming of WAL data. Returns the next timeline ID of the cluster
- * connected to in 'next_tli', or 0 if there was no report.
- */
- typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
- TimeLineID *next_tli);
- /*
- * walrcv_receive_fn
- *
- * Receive a message available from the WAL stream. 'buffer' is a pointer
- * to a buffer holding the message received. Returns the length of the data,
- * 0 if no data is available yet ('wait_fd' is a socket descriptor which can
- * be waited on before a retry), and -1 if the cluster ended the COPY.
- */
- typedef int (*walrcv_receive_fn) (WalReceiverConn *conn,
- char **buffer,
- pgsocket *wait_fd);
- /*
- * walrcv_send_fn
- *
- * Send a message of size 'nbytes' to the WAL stream with 'buffer' as
- * contents.
- */
- typedef void (*walrcv_send_fn) (WalReceiverConn *conn,
- const char *buffer,
- int nbytes);
- /*
- * walrcv_create_slot_fn
- *
- * Create a new replication slot named 'slotname'. 'temporary' defines
- * if the slot is temporary. 'snapshot_action' defines the behavior wanted
- * for an exported snapshot (see replication protocol for more details).
- * 'lsn' includes the LSN position at which the created slot became
- * consistent. Returns the name of the exported snapshot for a logical
- * slot, or NULL for a physical slot.
- */
- typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
- const char *slotname,
- bool temporary,
- bool two_phase,
- CRSSnapshotAction snapshot_action,
- XLogRecPtr *lsn);
- /*
- * walrcv_get_backend_pid_fn
- *
- * Returns the PID of the remote backend process.
- */
- typedef pid_t (*walrcv_get_backend_pid_fn) (WalReceiverConn *conn);
- /*
- * walrcv_exec_fn
- *
- * Send generic queries (and commands) to the remote cluster. 'nRetTypes'
- * is the expected number of returned attributes, and 'retTypes' an array
- * including their type OIDs. Returns the status of the execution and
- * tuples if any.
- */
- typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
- const char *query,
- const int nRetTypes,
- const Oid *retTypes);
- /*
- * walrcv_disconnect_fn
- *
- * Disconnect with the cluster.
- */
- typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
- typedef struct WalReceiverFunctionsType
- {
- walrcv_connect_fn walrcv_connect;
- walrcv_check_conninfo_fn walrcv_check_conninfo;
- walrcv_get_conninfo_fn walrcv_get_conninfo;
- walrcv_get_senderinfo_fn walrcv_get_senderinfo;
- walrcv_identify_system_fn walrcv_identify_system;
- walrcv_server_version_fn walrcv_server_version;
- walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
- walrcv_startstreaming_fn walrcv_startstreaming;
- walrcv_endstreaming_fn walrcv_endstreaming;
- walrcv_receive_fn walrcv_receive;
- walrcv_send_fn walrcv_send;
- walrcv_create_slot_fn walrcv_create_slot;
- walrcv_get_backend_pid_fn walrcv_get_backend_pid;
- walrcv_exec_fn walrcv_exec;
- walrcv_disconnect_fn walrcv_disconnect;
- } WalReceiverFunctionsType;
- extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
/*
 * Convenience wrappers: each macro calls the hook of the same name through
 * WalReceiverFunctions, forwarding its arguments unchanged.  A macro's own
 * name is not re-expanded inside its body, so the identifier after "->"
 * always names the struct member.
 */
#define walrcv_connect(conninfo, logical, appname, err) \
	(WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err))
#define walrcv_check_conninfo(conninfo) \
	(WalReceiverFunctions->walrcv_check_conninfo(conninfo))
#define walrcv_get_conninfo(conn) \
	(WalReceiverFunctions->walrcv_get_conninfo(conn))
#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
	(WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port))
#define walrcv_identify_system(conn, primary_tli) \
	(WalReceiverFunctions->walrcv_identify_system(conn, primary_tli))
#define walrcv_server_version(conn) \
	(WalReceiverFunctions->walrcv_server_version(conn))
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
	(WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size))
#define walrcv_startstreaming(conn, options) \
	(WalReceiverFunctions->walrcv_startstreaming(conn, options))
#define walrcv_endstreaming(conn, next_tli) \
	(WalReceiverFunctions->walrcv_endstreaming(conn, next_tli))
#define walrcv_receive(conn, buffer, wait_fd) \
	(WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd))
#define walrcv_send(conn, buffer, nbytes) \
	(WalReceiverFunctions->walrcv_send(conn, buffer, nbytes))
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
	(WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn))
#define walrcv_get_backend_pid(conn) \
	(WalReceiverFunctions->walrcv_get_backend_pid(conn))
#define walrcv_exec(conn, query, nRetTypes, retTypes) \
	(WalReceiverFunctions->walrcv_exec(conn, query, nRetTypes, retTypes))
#define walrcv_disconnect(conn) \
	(WalReceiverFunctions->walrcv_disconnect(conn))
- static inline void
- walrcv_clear_result(WalRcvExecResult *walres)
- {
- if (!walres)
- return;
- if (walres->err)
- pfree(walres->err);
- if (walres->tuplestore)
- tuplestore_end(walres->tuplestore);
- if (walres->tupledesc)
- FreeTupleDesc(walres->tupledesc);
- pfree(walres);
- }
- /* prototypes for functions in walreceiver.c */
- extern void WalReceiverMain(void) pg_attribute_noreturn();
- extern void ProcessWalRcvInterrupts(void);
- /* prototypes for functions in walreceiverfuncs.c */
- extern Size WalRcvShmemSize(void);
- extern void WalRcvShmemInit(void);
- extern void ShutdownWalRcv(void);
- extern bool WalRcvStreaming(void);
- extern bool WalRcvRunning(void);
- extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
- const char *conninfo, const char *slotname,
- bool create_temp_slot);
- extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
- extern XLogRecPtr GetWalRcvWriteRecPtr(void);
- extern int GetReplicationApplyDelay(void);
- extern int GetReplicationTransferLatency(void);
- extern void WalRcvForceReply(void);
- #endif /* _WALRECEIVER_H */
|