2
0

slot.h 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. /*-------------------------------------------------------------------------
  2. * slot.h
  3. * Replication slot management.
  4. *
  5. * Copyright (c) 2012-2022, PostgreSQL Global Development Group
  6. *
  7. *-------------------------------------------------------------------------
  8. */
  9. #ifndef SLOT_H
  10. #define SLOT_H
  11. #include "access/xlog.h"
  12. #include "access/xlogreader.h"
  13. #include "storage/condition_variable.h"
  14. #include "storage/lwlock.h"
  15. #include "storage/shmem.h"
  16. #include "storage/spin.h"
  17. #include "replication/walreceiver.h"
  /*
   * How a replication slot behaves when it is released or when the server
   * crashes.
   *
   * RS_PERSISTENT slots are crash-safe and are not dropped when released.
   * RS_EPHEMERAL slots are dropped when released or after a restart.
   * RS_TEMPORARY slots are dropped at the end of a session or on error.
   *
   * RS_EPHEMERAL doubles as the not-quite-ready state used while a persistent
   * slot is being created; calling ReplicationSlotPersist() turns such a slot
   * into a persistent one.  For a slot that should simply go away at the end
   * of a session, RS_TEMPORARY is the appropriate choice.
   *
   * The numeric values are written out explicitly because a field of this
   * type is a member of ReplicationSlotPersistentData, the on-disk data of a
   * slot.
   */
  typedef enum ReplicationSlotPersistency
  {
      RS_PERSISTENT = 0,
      RS_EPHEMERAL = 1,
      RS_TEMPORARY = 2
  } ReplicationSlotPersistency;
  37. /*
  38. * On-Disk data of a replication slot, preserved across restarts.
  39. */
  40. typedef struct ReplicationSlotPersistentData
  41. {
  42. /* The slot's identifier */
  43. NameData name;
  44. /* database the slot is active on */
  45. Oid database;
  46. /*
  47. * The slot's behaviour when being dropped (or restored after a crash).
  48. */
  49. ReplicationSlotPersistency persistency;
  50. /*
  51. * xmin horizon for data
  52. *
  53. * NB: This may represent a value that hasn't been written to disk yet;
  54. * see notes for effective_xmin, below.
  55. */
  56. TransactionId xmin;
  57. /*
  58. * xmin horizon for catalog tuples
  59. *
  60. * NB: This may represent a value that hasn't been written to disk yet;
  61. * see notes for effective_xmin, below.
  62. */
  63. TransactionId catalog_xmin;
  64. /* oldest LSN that might be required by this replication slot */
  65. XLogRecPtr restart_lsn;
  66. /* restart_lsn is copied here when the slot is invalidated */
  67. XLogRecPtr invalidated_at;
  68. /*
  69. * Oldest LSN that the client has acked receipt for. This is used as the
  70. * start_lsn point in case the client doesn't specify one, and also as a
  71. * safety measure to jump forwards in case the client specifies a
  72. * start_lsn that's further in the past than this value.
  73. */
  74. XLogRecPtr confirmed_flush;
  75. /*
  76. * LSN at which we enabled two_phase commit for this slot or LSN at which
  77. * we found a consistent point at the time of slot creation.
  78. */
  79. XLogRecPtr two_phase_at;
  80. /*
  81. * Allow decoding of prepared transactions?
  82. */
  83. bool two_phase;
  84. /* plugin name */
  85. NameData plugin;
  86. } ReplicationSlotPersistentData;
  87. /*
  88. * Shared memory state of a single replication slot.
  89. *
  90. * The in-memory data of replication slots follows a locking model based
  91. * on two linked concepts:
  92. * - A replication slot's in_use flag is switched when added or discarded using
  93. * the LWLock ReplicationSlotControlLock, which needs to be hold in exclusive
  94. * mode when updating the flag by the backend owning the slot and doing the
  95. * operation, while readers (concurrent backends not owning the slot) need
  96. * to hold it in shared mode when looking at replication slot data.
  97. * - Individual fields are protected by mutex where only the backend owning
  98. * the slot is authorized to update the fields from its own slot. The
  99. * backend owning the slot does not need to take this lock when reading its
  100. * own fields, while concurrent backends not owning this slot should take the
  101. * lock when reading this slot's data.
  102. */
  103. typedef struct ReplicationSlot
  104. {
  105. /* lock, on same cacheline as effective_xmin */
  106. slock_t mutex;
  107. /* is this slot defined */
  108. bool in_use;
  109. /* Who is streaming out changes for this slot? 0 in unused slots. */
  110. pid_t active_pid;
  111. /* any outstanding modifications? */
  112. bool just_dirtied;
  113. bool dirty;
  114. /*
  115. * For logical decoding, it's extremely important that we never remove any
  116. * data that's still needed for decoding purposes, even after a crash;
  117. * otherwise, decoding will produce wrong answers. Ordinary streaming
  118. * replication also needs to prevent old row versions from being removed
  119. * too soon, but the worst consequence we might encounter there is
  120. * unwanted query cancellations on the standby. Thus, for logical
  121. * decoding, this value represents the latest xmin that has actually been
  122. * written to disk, whereas for streaming replication, it's just the same
  123. * as the persistent value (data.xmin).
  124. */
  125. TransactionId effective_xmin;
  126. TransactionId effective_catalog_xmin;
  127. /* data surviving shutdowns and crashes */
  128. ReplicationSlotPersistentData data;
  129. /* is somebody performing io on this slot? */
  130. LWLock io_in_progress_lock;
  131. /* Condition variable signaled when active_pid changes */
  132. ConditionVariable active_cv;
  133. /* all the remaining data is only used for logical slots */
  134. /*
  135. * When the client has confirmed flushes >= candidate_xmin_lsn we can
  136. * advance the catalog xmin. When restart_valid has been passed,
  137. * restart_lsn can be increased.
  138. */
  139. TransactionId candidate_catalog_xmin;
  140. XLogRecPtr candidate_xmin_lsn;
  141. XLogRecPtr candidate_restart_valid;
  142. XLogRecPtr candidate_restart_lsn;
  143. } ReplicationSlot;
  /*
   * A slot whose data.database is InvalidOid is physical; every other slot is
   * logical.  Each macro evaluates its argument exactly once.
   */
  #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
  #define SlotIsLogical(slot) (!SlotIsPhysical(slot))
  146. /*
  147. * Shared memory control area for all of replication slots.
  148. */
  149. typedef struct ReplicationSlotCtlData
  150. {
  151. /*
  152. * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
  153. * reason you can't do that in an otherwise-empty struct.
  154. */
  155. ReplicationSlot replication_slots[1];
  156. } ReplicationSlotCtlData;
  157. /*
  158. * Pointers to shared memory
  159. */
  160. extern PGDLLIMPORT ReplicationSlotCtlData *ReplicationSlotCtl;
  161. extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
  162. /* GUCs */
  163. extern PGDLLIMPORT int max_replication_slots;
  164. /* shmem initialization functions */
  165. extern Size ReplicationSlotsShmemSize(void);
  166. extern void ReplicationSlotsShmemInit(void);
  167. /* management of individual slots */
  168. extern void ReplicationSlotCreate(const char *name, bool db_specific,
  169. ReplicationSlotPersistency p, bool two_phase);
  170. extern void ReplicationSlotPersist(void);
  171. extern void ReplicationSlotDrop(const char *name, bool nowait);
  172. extern void ReplicationSlotAcquire(const char *name, bool nowait);
  173. extern void ReplicationSlotRelease(void);
  174. extern void ReplicationSlotCleanup(void);
  175. extern void ReplicationSlotSave(void);
  176. extern void ReplicationSlotMarkDirty(void);
  177. /* misc stuff */
  178. extern void ReplicationSlotInitialize(void);
  179. extern bool ReplicationSlotValidateName(const char *name, int elevel);
  180. extern void ReplicationSlotReserveWal(void);
  181. extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
  182. extern void ReplicationSlotsComputeRequiredLSN(void);
  183. extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
  184. extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
  185. extern void ReplicationSlotsDropDBSlots(Oid dboid);
  186. extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
  187. extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
  188. extern int ReplicationSlotIndex(ReplicationSlot *slot);
  189. extern bool ReplicationSlotName(int index, Name name);
  190. extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
  191. extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
  192. extern void StartupReplicationSlots(void);
  193. extern void CheckPointReplicationSlots(void);
  194. extern void CheckSlotRequirements(void);
  195. extern void CheckSlotPermissions(void);
  196. #endif /* SLOT_H */