util.js 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. const process = require('process')
  2. const postgres = require('@pg')
  3. const { constants } = postgres
  4. const { BinaryInt } = constants
  5. /**
  6. * Generate a Bulk Update SQL statement definition For a given table, identity
  7. * column and column to be updated, it will generate a single SQL
  8. * statement to update all fields in one statement
  9. *
  10. * @param {string} table - The name of the table
  11. * @param {string} field - The name of the field we want to update
  12. * @param {string} id - The name of the id field
  13. * @param {string} updates - The number of rows to update in the statement
  14. * @param {string} type - The name of the table
  15. */
  16. function generateBulkUpdate (table, field, id, updates = 5, formats = [BinaryInt]) {
  17. function getIds (count) {
  18. const updates = []
  19. for (let i = 1; i < (count * 2); i += 2) {
  20. updates.push(`$${i}`)
  21. }
  22. return updates.join(',')
  23. }
  24. function getClauses (count) {
  25. const clauses = []
  26. for (let i = 1; i < (count * 2); i += 2) {
  27. clauses.push(`when $${i} then $${i + 1}`)
  28. }
  29. return clauses.join('\n')
  30. }
  31. const sql = []
  32. sql.push(`update ${table} set ${field} = CASE ${id}`)
  33. sql.push(getClauses(updates))
  34. sql.push(`else ${field}`)
  35. sql.push(`end where ${id} in (${getIds(updates)})`)
  36. return {
  37. formats,
  38. fields: [],
  39. name: `bulk.${updates}`,
  40. portal: '',
  41. params: Array(updates * 2).fill(0),
  42. sql: sql.join('\n'),
  43. sync: true
  44. }
  45. }
  46. /**
  47. * Utility function to generate an array of N values populated with provided
  48. * map function. There seems to be no simpler/quicker way to do this in JS.
  49. * @param {string} n - Size of the array to create
  50. * @param {string} field - The map function which will create each array value
  51. */
  52. function sprayer (max = 100) {
  53. const ar = [0]
  54. for (let i = 0; i < max; i++) {
  55. ar[i + 1] = (new Array(i + 1)).fill(1)
  56. }
  57. max += 1
  58. return (n, fn) => ar[n % max].map(fn)
  59. }
  60. function sortByMessage (arr) {
  61. const n = arr.length
  62. for (let i = 1; i < n; i++) {
  63. const c = arr[i]
  64. let j = i - 1
  65. while ((j > -1) && (c.message < arr[j].message)) {
  66. arr[j + 1] = arr[j]
  67. j--
  68. }
  69. arr[j + 1] = c
  70. }
  71. return arr
  72. }
  73. function spawn (main) {
  74. if (just.env()['WORKER']) return main()
  75. const { watch, launch } = process
  76. const processes = []
  77. const cpus = parseInt(just.env().CPUS || just.sys.cpus, 10)
  78. for (let i = 0; i < cpus; i++) {
  79. just.sys.setenv('WORKER', i)
  80. //const proc = launch(just.args[0], ['--trace-gc', ...just.args.slice(1)])
  81. const proc = launch(just.args[0], just.args.slice(1))
  82. processes.push(proc)
  83. proc.stats = { user: 0, system: 0 }
  84. }
  85. return Promise.all(processes.map(p => watch(p)))
  86. }
  87. const updates = new Map()
  88. function getUpdateQuery (count, pg, formats = [BinaryInt]) {
  89. const query = updates.get(count)
  90. if (query) return query
  91. const promise = pg.compile(generateBulkUpdate('world', 'randomnumber', 'id', count, formats))
  92. updates.set(count, promise)
  93. return promise
  94. }
  95. class Clock {
  96. constructor () {
  97. this.slots = new Map()
  98. }
  99. unset (callback, repeat = 1000) {
  100. const current = this.slots.get(repeat)
  101. if (!current) return
  102. current.callbacks = current.callbacks.filter(cb => cb !== callback)
  103. if (!current.callbacks.length) {
  104. just.clearTimeout(current.timer)
  105. this.slots.delete(repeat)
  106. }
  107. }
  108. set (callback, repeat = 1000) {
  109. let current = this.slots.get(repeat)
  110. if (current) {
  111. current.callbacks.push(callback)
  112. return
  113. }
  114. current = {
  115. callbacks: [callback],
  116. timer: just.setInterval(() => current.callbacks.forEach(cb => cb()), repeat)
  117. }
  118. this.slots.set(repeat, current)
  119. }
  120. }
  121. module.exports = {
  122. sprayer,
  123. spawn,
  124. sortByMessage,
  125. generateBulkUpdate,
  126. getUpdateQuery,
  127. Clock
  128. }