worker_internal.h 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
/*-------------------------------------------------------------------------
 *
 * worker_internal.h
 *	  Internal headers shared by logical replication workers.
 *
 * Portions Copyright (c) 2016-2022, PostgreSQL Global Development Group
 *
 * src/include/replication/worker_internal.h
 *
 *-------------------------------------------------------------------------
 */
  12. #ifndef WORKER_INTERNAL_H
  13. #define WORKER_INTERNAL_H
  14. #include <signal.h>
  15. #include "access/xlogdefs.h"
  16. #include "catalog/pg_subscription.h"
  17. #include "datatype/timestamp.h"
  18. #include "storage/fileset.h"
  19. #include "storage/lock.h"
  20. #include "storage/spin.h"
  21. typedef struct LogicalRepWorker
  22. {
  23. /* Time at which this worker was launched. */
  24. TimestampTz launch_time;
  25. /* Indicates if this slot is used or free. */
  26. bool in_use;
  27. /* Increased every time the slot is taken by new worker. */
  28. uint16 generation;
  29. /* Pointer to proc array. NULL if not running. */
  30. PGPROC *proc;
  31. /* Database id to connect to. */
  32. Oid dbid;
  33. /* User to use for connection (will be same as owner of subscription). */
  34. Oid userid;
  35. /* Subscription id for the worker. */
  36. Oid subid;
  37. /* Used for initial table synchronization. */
  38. Oid relid;
  39. char relstate;
  40. XLogRecPtr relstate_lsn;
  41. slock_t relmutex;
  42. /*
  43. * Used to create the changes and subxact files for the streaming
  44. * transactions. Upon the arrival of the first streaming transaction, the
  45. * fileset will be initialized, and it will be deleted when the worker
  46. * exits. Under this, separate buffiles would be created for each
  47. * transaction which will be deleted after the transaction is finished.
  48. */
  49. FileSet *stream_fileset;
  50. /* Stats. */
  51. XLogRecPtr last_lsn;
  52. TimestampTz last_send_time;
  53. TimestampTz last_recv_time;
  54. XLogRecPtr reply_lsn;
  55. TimestampTz reply_time;
  56. } LogicalRepWorker;
  57. /* Main memory context for apply worker. Permanent during worker lifetime. */
  58. extern PGDLLIMPORT MemoryContext ApplyContext;
  59. /* libpqreceiver connection */
  60. extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
  61. /* Worker and subscription objects. */
  62. extern PGDLLIMPORT Subscription *MySubscription;
  63. extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
  64. extern PGDLLIMPORT bool in_remote_transaction;
  65. extern void logicalrep_worker_attach(int slot);
  66. extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
  67. bool only_running);
  68. extern List *logicalrep_workers_find(Oid subid, bool only_running);
  69. extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
  70. Oid userid, Oid relid);
  71. extern void logicalrep_worker_stop(Oid subid, Oid relid);
  72. extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
  73. extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
  74. extern int logicalrep_sync_worker_count(Oid subid);
  75. extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
  76. char *originname, int szorgname);
  77. extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
  78. extern bool AllTablesyncsReady(void);
  79. extern void UpdateTwoPhaseState(Oid suboid, char new_state);
  80. extern void process_syncing_tables(XLogRecPtr current_lsn);
  81. extern void invalidate_syncing_table_states(Datum arg, int cacheid,
  82. uint32 hashvalue);
  83. static inline bool
  84. am_tablesync_worker(void)
  85. {
  86. return OidIsValid(MyLogicalRepWorker->relid);
  87. }
  88. #endif /* WORKER_INTERNAL_H */