  /*-------------------------------------------------------------------------
   *
   * hashjoin.h
   *    internal structures for hash joins
   *
   *
   * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
   * Portions Copyright (c) 1994, Regents of the University of California
   *
   * src/include/executor/hashjoin.h
   *
   *-------------------------------------------------------------------------
   */
  #ifndef HASHJOIN_H
  #define HASHJOIN_H

  #include "nodes/execnodes.h"
  #include "port/atomics.h"
  #include "storage/barrier.h"
  #include "storage/buffile.h"
  #include "storage/lwlock.h"

  /* ----------------------------------------------------------------
   *              hash-join hash table structures
   *
   * Each active hashjoin has a HashJoinTable control block, which is
   * palloc'd in the executor's per-query context.  All other storage needed
   * for the hashjoin is kept in private memory contexts, two for each hashjoin.
   * This makes it easy and fast to release the storage when we don't need it
   * anymore.  (Exception: data associated with the temp files lives in the
   * per-query context too, since we always call buffile.c in that context.)
   *
   * The hashtable contexts are made children of the per-query context, ensuring
   * that they will be discarded at end of statement even if the join is
   * aborted early by an error.  (Likewise, any temporary files we make will
   * be cleaned up by the virtual file manager in event of an error.)
   *
   * Storage that should live through the entire join is allocated from the
   * "hashCxt", while storage that is only wanted for the current batch is
   * allocated in the "batchCxt".  By resetting the batchCxt at the end of
   * each batch, we free all the per-batch storage reliably and without tedium.
   *
   * During first scan of inner relation, we get its tuples from executor.
   * If nbatch > 1 then tuples that don't belong in first batch get saved
   * into inner-batch temp files.  The same statements apply for the
   * first scan of the outer relation, except we write tuples to outer-batch
   * temp files.  After finishing the first scan, we do the following for
   * each remaining batch:
   *  1. Read tuples from inner batch file, load into hash buckets.
   *  2. Read tuples from outer batch file, match to hash buckets and output.
   *
   * It is possible to increase nbatch on the fly if the in-memory hash table
   * gets too big.  The hash-value-to-batch computation is arranged so that this
   * can only cause a tuple to go into a later batch than previously thought,
   * never into an earlier batch.  When we increase nbatch, we rescan the hash
   * table and dump out any tuples that are now of a later batch to the correct
   * inner batch file.  Subsequently, while reading either inner or outer batch
   * files, we might find tuples that no longer belong to the current batch;
   * if so, we just dump them out to the correct batch file.
   * ----------------------------------------------------------------
   */

  60. /* these are in nodes/execnodes.h: */
  61. /* typedef struct HashJoinTupleData *HashJoinTuple; */
  62. /* typedef struct HashJoinTableData *HashJoinTable; */
  63. typedef struct HashJoinTupleData
  64. {
  65. /* link to next tuple in same bucket */
  66. union
  67. {
  68. struct HashJoinTupleData *unshared;
  69. dsa_pointer shared;
  70. } next;
  71. uint32 hashvalue; /* tuple's hash code */
  72. /* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
  73. } HashJoinTupleData;
  74. #define HJTUPLE_OVERHEAD MAXALIGN(sizeof(HashJoinTupleData))
  75. #define HJTUPLE_MINTUPLE(hjtup) \
  76. ((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
  77. /*
  78. * If the outer relation's distribution is sufficiently nonuniform, we attempt
  79. * to optimize the join by treating the hash values corresponding to the outer
  80. * relation's MCVs specially. Inner relation tuples matching these hash
  81. * values go into the "skew" hashtable instead of the main hashtable, and
  82. * outer relation tuples with these hash values are matched against that
  83. * table instead of the main one. Thus, tuples with these hash values are
  84. * effectively handled as part of the first batch and will never go to disk.
  85. * The skew hashtable is limited to SKEW_HASH_MEM_PERCENT of the total memory
  86. * allowed for the join; while building the hashtables, we decrease the number
  87. * of MCVs being specially treated if needed to stay under this limit.
  88. *
  89. * Note: you might wonder why we look at the outer relation stats for this,
  90. * rather than the inner. One reason is that the outer relation is typically
  91. * bigger, so we get more I/O savings by optimizing for its most common values.
  92. * Also, for similarly-sized relations, the planner prefers to put the more
  93. * uniformly distributed relation on the inside, so we're more likely to find
  94. * interesting skew in the outer relation.
  95. */
  96. typedef struct HashSkewBucket
  97. {
  98. uint32 hashvalue; /* common hash value */
  99. HashJoinTuple tuples; /* linked list of inner-relation tuples */
  100. } HashSkewBucket;
  /* Size of one HashSkewBucket, rounded up to a MAXALIGN boundary. */
  #define SKEW_BUCKET_OVERHEAD  MAXALIGN(sizeof(HashSkewBucket))

  /*
   * Sentinel skew bucket number.
   * NOTE(review): presumably means "this hash value has no skew bucket";
   * confirm against the lookup code.
   */
  #define INVALID_SKEW_BUCKET_NO  (-1)

  /* Percentage of the join's total memory allowance the skew table may use. */
  #define SKEW_HASH_MEM_PERCENT  2

  /*
   * NOTE(review): presumably the minimum fraction of the outer relation that
   * the MCVs must cover before the skew optimization is attempted -- confirm
   * against the code that sets up the skew hashtable.
   */
  #define SKEW_MIN_OUTER_FRACTION  0.01

  105. /*
  106. * To reduce palloc overhead, the HashJoinTuples for the current batch are
  107. * packed in 32kB buffers instead of pallocing each tuple individually.
  108. */
  109. typedef struct HashMemoryChunkData
  110. {
  111. int ntuples; /* number of tuples stored in this chunk */
  112. size_t maxlen; /* size of the chunk's tuple buffer */
  113. size_t used; /* number of buffer bytes already used */
  114. /* pointer to the next chunk (linked list) */
  115. union
  116. {
  117. struct HashMemoryChunkData *unshared;
  118. dsa_pointer shared;
  119. } next;
  120. /*
  121. * The chunk's tuple buffer starts after the HashMemoryChunkData struct,
  122. * at offset HASH_CHUNK_HEADER_SIZE (which must be maxaligned). Note that
  123. * that offset is not included in "maxlen" or "used".
  124. */
  125. } HashMemoryChunkData;
  typedef struct HashMemoryChunkData *HashMemoryChunk;

  /* Chunk size in bytes (32kB); the constant has type long. */
  #define HASH_CHUNK_SIZE  (32 * 1024L)

  /* Chunk header size, rounded up so the tuple buffer starts MAXALIGN'd. */
  #define HASH_CHUNK_HEADER_SIZE  MAXALIGN(sizeof(HashMemoryChunkData))

  /* Start of the tuple buffer that follows the header of chunk "hc". */
  #define HASH_CHUNK_DATA(hc)  (((char *) (hc)) + HASH_CHUNK_HEADER_SIZE)

  /* tuples exceeding HASH_CHUNK_THRESHOLD bytes are put in their own chunk */
  #define HASH_CHUNK_THRESHOLD  (HASH_CHUNK_SIZE / 4)

  132. /*
  133. * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
  134. * object in shared memory to coordinate access to it. Since they are
  135. * followed by variable-sized objects, they are arranged in contiguous memory
  136. * but not accessed directly as an array.
  137. */
  138. typedef struct ParallelHashJoinBatch
  139. {
  140. dsa_pointer buckets; /* array of hash table buckets */
  141. Barrier batch_barrier; /* synchronization for joining this batch */
  142. dsa_pointer chunks; /* chunks of tuples loaded */
  143. size_t size; /* size of buckets + chunks in memory */
  144. size_t estimated_size; /* size of buckets + chunks while writing */
  145. size_t ntuples; /* number of tuples loaded */
  146. size_t old_ntuples; /* number of tuples before repartitioning */
  147. bool space_exhausted;
  148. /*
  149. * Variable-sized SharedTuplestore objects follow this struct in memory.
  150. * See the accessor macros below.
  151. */
  152. } ParallelHashJoinBatch;
  /*
   * Layout of one element of the shared batch array:
   *
   *   [ParallelHashJoinBatch][inner SharedTuplestore][outer SharedTuplestore]
   *
   * Each of the three parts is padded to a MAXALIGN boundary, and both
   * tuplestores are sized by sts_estimate(nparticipants).
   */

  /* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */
  #define ParallelHashJoinBatchInner(batch)  \
      ((SharedTuplestore *)  \
       ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))

  /* Accessor for outer batch tuplestore following a ParallelHashJoinBatch. */
  #define ParallelHashJoinBatchOuter(batch, nparticipants)  \
      ((SharedTuplestore *)  \
       ((char *) ParallelHashJoinBatchInner(batch) +  \
        MAXALIGN(sts_estimate(nparticipants))))

  /* Total size of a ParallelHashJoinBatch and tuplestores. */
  #define EstimateParallelHashJoinBatch(hashtable)  \
      (MAXALIGN(sizeof(ParallelHashJoinBatch)) +  \
       MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)

  /*
   * Accessor for the nth ParallelHashJoinBatch given the base.
   *
   * CAUTION: this macro is not hygienic.  Its expansion refers to an
   * identifier named "hashtable" that is NOT one of its parameters, so a
   * variable of that name (with a valid parallel_state) must be in scope at
   * every call site.  The two-argument form is kept because changing it would
   * break existing callers.
   */
  #define NthParallelHashJoinBatch(base, n)  \
      ((ParallelHashJoinBatch *)  \
       ((char *) (base) +  \
        EstimateParallelHashJoinBatch(hashtable) * (n)))

  171. /*
  172. * Each backend requires a small amount of per-batch state to interact with
  173. * each ParallelHashJoinBatch.
  174. */
  175. typedef struct ParallelHashJoinBatchAccessor
  176. {
  177. ParallelHashJoinBatch *shared; /* pointer to shared state */
  178. /* Per-backend partial counters to reduce contention. */
  179. size_t preallocated; /* pre-allocated space for this backend */
  180. size_t ntuples; /* number of tuples */
  181. size_t size; /* size of partition in memory */
  182. size_t estimated_size; /* size of partition on disk */
  183. size_t old_ntuples; /* how many tuples before repartitioning? */
  184. bool at_least_one_chunk; /* has this backend allocated a chunk? */
  185. bool done; /* flag to remember that a batch is done */
  186. SharedTuplestoreAccessor *inner_tuples;
  187. SharedTuplestoreAccessor *outer_tuples;
  188. } ParallelHashJoinBatchAccessor;
  /*
   * While hashing the inner relation, any participant might determine that it's
   * time to increase the number of buckets to reduce the load factor or batches
   * to reduce the memory size.  This is indicated by setting the growth flag to
   * these values.
   *
   * The enumerators rely on default numbering, so PHJ_GROWTH_OK is 0.
   */
  typedef enum ParallelHashGrowth
  {
      /* The current dimensions are sufficient. */
      PHJ_GROWTH_OK,
      /* The load factor is too high, so we need to add buckets. */
      PHJ_GROWTH_NEED_MORE_BUCKETS,
      /* The memory budget would be exhausted, so we need to repartition. */
      PHJ_GROWTH_NEED_MORE_BATCHES,
      /* Repartitioning didn't help last time, so don't try to do that again. */
      PHJ_GROWTH_DISABLED
  } ParallelHashGrowth;

  206. /*
  207. * The shared state used to coordinate a Parallel Hash Join. This is stored
  208. * in the DSM segment.
  209. */
  210. typedef struct ParallelHashJoinState
  211. {
  212. dsa_pointer batches; /* array of ParallelHashJoinBatch */
  213. dsa_pointer old_batches; /* previous generation during repartition */
  214. int nbatch; /* number of batches now */
  215. int old_nbatch; /* previous number of batches */
  216. int nbuckets; /* number of buckets */
  217. ParallelHashGrowth growth; /* control batch/bucket growth */
  218. dsa_pointer chunk_work_queue; /* chunk work queue */
  219. int nparticipants;
  220. size_t space_allowed;
  221. size_t total_tuples; /* total number of inner tuples */
  222. LWLock lock; /* lock protecting the above */
  223. Barrier build_barrier; /* synchronization for the build phases */
  224. Barrier grow_batches_barrier;
  225. Barrier grow_buckets_barrier;
  226. pg_atomic_uint32 distributor; /* counter for load balancing */
  227. SharedFileSet fileset; /* space for shared temporary files */
  228. } ParallelHashJoinState;
  /* The phases for building batches, used by build_barrier. */
  #define PHJ_BUILD_ELECTING              0
  #define PHJ_BUILD_ALLOCATING            1
  #define PHJ_BUILD_HASHING_INNER         2
  #define PHJ_BUILD_HASHING_OUTER         3
  #define PHJ_BUILD_RUNNING               4
  #define PHJ_BUILD_DONE                  5

  /* The phases for probing each batch, used by batch_barrier. */
  #define PHJ_BATCH_ELECTING              0
  #define PHJ_BATCH_ALLOCATING            1
  #define PHJ_BATCH_LOADING               2
  #define PHJ_BATCH_PROBING               3
  #define PHJ_BATCH_DONE                  4

  /*
   * The phases of batch growth while hashing, for grow_batches_barrier.
   *
   * These phases repeat, so a raw phase number is reduced with
   * PHJ_GROW_BATCHES_PHASE(); its modulus 5 is the number of phases listed
   * here and must be kept in step with them.
   */
  #define PHJ_GROW_BATCHES_ELECTING       0
  #define PHJ_GROW_BATCHES_ALLOCATING     1
  #define PHJ_GROW_BATCHES_REPARTITIONING 2
  #define PHJ_GROW_BATCHES_DECIDING       3
  #define PHJ_GROW_BATCHES_FINISHING      4
  #define PHJ_GROW_BATCHES_PHASE(n)       ((n) % 5)   /* circular phases */

  /*
   * The phases of bucket growth while hashing, for grow_buckets_barrier.
   *
   * Also circular: the modulus 3 in PHJ_GROW_BUCKETS_PHASE() is the number of
   * phases listed here.
   */
  #define PHJ_GROW_BUCKETS_ELECTING       0
  #define PHJ_GROW_BUCKETS_ALLOCATING     1
  #define PHJ_GROW_BUCKETS_REINSERTING    2
  #define PHJ_GROW_BUCKETS_PHASE(n)       ((n) % 3)   /* circular phases */

  254. typedef struct HashJoinTableData
  255. {
  256. int nbuckets; /* # buckets in the in-memory hash table */
  257. int log2_nbuckets; /* its log2 (nbuckets must be a power of 2) */
  258. int nbuckets_original; /* # buckets when starting the first hash */
  259. int nbuckets_optimal; /* optimal # buckets (per batch) */
  260. int log2_nbuckets_optimal; /* log2(nbuckets_optimal) */
  261. /* buckets[i] is head of list of tuples in i'th in-memory bucket */
  262. union
  263. {
  264. /* unshared array is per-batch storage, as are all the tuples */
  265. struct HashJoinTupleData **unshared;
  266. /* shared array is per-query DSA area, as are all the tuples */
  267. dsa_pointer_atomic *shared;
  268. } buckets;
  269. bool keepNulls; /* true to store unmatchable NULL tuples */
  270. bool skewEnabled; /* are we using skew optimization? */
  271. HashSkewBucket **skewBucket; /* hashtable of skew buckets */
  272. int skewBucketLen; /* size of skewBucket array (a power of 2!) */
  273. int nSkewBuckets; /* number of active skew buckets */
  274. int *skewBucketNums; /* array indexes of active skew buckets */
  275. int nbatch; /* number of batches */
  276. int curbatch; /* current batch #; 0 during 1st pass */
  277. int nbatch_original; /* nbatch when we started inner scan */
  278. int nbatch_outstart; /* nbatch when we started outer scan */
  279. bool growEnabled; /* flag to shut off nbatch increases */
  280. double totalTuples; /* # tuples obtained from inner plan */
  281. double partialTuples; /* # tuples obtained from inner plan by me */
  282. double skewTuples; /* # tuples inserted into skew tuples */
  283. /*
  284. * These arrays are allocated for the life of the hash join, but only if
  285. * nbatch > 1. A file is opened only when we first write a tuple into it
  286. * (otherwise its pointer remains NULL). Note that the zero'th array
  287. * elements never get used, since we will process rather than dump out any
  288. * tuples of batch zero.
  289. */
  290. BufFile **innerBatchFile; /* buffered virtual temp file per batch */
  291. BufFile **outerBatchFile; /* buffered virtual temp file per batch */
  292. /*
  293. * Info about the datatype-specific hash functions for the datatypes being
  294. * hashed. These are arrays of the same length as the number of hash join
  295. * clauses (hash keys).
  296. */
  297. FmgrInfo *outer_hashfunctions; /* lookup data for hash functions */
  298. FmgrInfo *inner_hashfunctions; /* lookup data for hash functions */
  299. bool *hashStrict; /* is each hash join operator strict? */
  300. Oid *collations;
  301. Size spaceUsed; /* memory space currently used by tuples */
  302. Size spaceAllowed; /* upper limit for space used */
  303. Size spacePeak; /* peak space used */
  304. Size spaceUsedSkew; /* skew hash table's current space usage */
  305. Size spaceAllowedSkew; /* upper limit for skew hashtable */
  306. MemoryContext hashCxt; /* context for whole-hash-join storage */
  307. MemoryContext batchCxt; /* context for this-batch-only storage */
  308. /* used for dense allocation of tuples (into linked chunks) */
  309. HashMemoryChunk chunks; /* one list for the whole batch */
  310. /* Shared and private state for Parallel Hash. */
  311. HashMemoryChunk current_chunk; /* this backend's current chunk */
  312. dsa_area *area; /* DSA area to allocate memory from */
  313. ParallelHashJoinState *parallel_state;
  314. ParallelHashJoinBatchAccessor *batches;
  315. dsa_pointer current_chunk_shared;
  316. } HashJoinTableData;
  317. #endif /* HASHJOIN_H */