Bladeren bron

just: Use processes instead of threads (#6047)

* Add new JavaScript framework: Just-JS

* fix bug in cached world test

* remove dependency on openssl for quicker build

* fix dns for full run

* make dns more robust

* update framework in config

* bump modules

* remove debug logging and use processes instead of threads

* insertion sort, process monitor
Andrew Johnston 5 jaren geleden
bovenliggende
commit
2351405737

+ 3 - 3
frameworks/JavaScript/just/just.dockerfile

@@ -14,11 +14,11 @@ RUN JUST_HOME=$(pwd) make -C modules/html/ html.so
 FROM debian:stretch-slim
 WORKDIR /app
 RUN mkdir -p /app/lib
-COPY lib/stringify.js lib/connection.js lib/dns.js lib/http.js lib/lookup.js lib/pg.js lib/stats.js lib/tcp.js lib/md5.js ./lib/
+COPY lib/stringify.js lib/connection.js lib/dns.js lib/http.js lib/lookup.js lib/pg.js lib/stats.js lib/tcp.js lib/md5.js lib/monitor.js ./lib/
 COPY techempower.js spawn.js ./
-COPY --from=builder /just-0.0.2/just ./
+COPY --from=builder /just-0.0.2/just /bin/just
 COPY --from=builder /just-0.0.2/modules/picohttp/http.so ./
 COPY --from=builder /just-0.0.2/modules/html/html.so ./
 ENV LD_LIBRARY_PATH=/app
 ENV PGPOOL=1
-CMD ["./just", "spawn.js", "techempower.js"]
+CMD ["just", "spawn.js", "techempower.js"]

+ 0 - 5
frameworks/JavaScript/just/lib/lookup.js

@@ -24,7 +24,6 @@ function readHosts () {
     return { ipv4, ipv6 }
   }
   const hosts = readFile(fileName)
-  just.error(`${fileName}:\n${hosts}`)
   const lines = hosts.split('\n').filter(line => line.trim())
   for (const line of lines) {
     if (line.match(rxComment)) continue
@@ -55,7 +54,6 @@ function readResolv () {
     return results
   }
   const resolv = readFile(fileName)
-  just.error(`${fileName}:\n${resolv}`)
   const lines = resolv.split('\n').filter(line => line.trim())
   for (const line of lines) {
     const match = line.match(rxName)
@@ -72,14 +70,12 @@ function readResolv () {
 function lookup (query = 'www.google.com', onRecord = () => {}, address = dnsServer, port = 53, buf = new ArrayBuffer(65536)) {
   const ip = lookupHosts(query)
   if (ip) {
-    just.error(`found ${ip} for ${query} in /etc/hosts`)
     onRecord(null, ip)
     return
   }
   const ips = readResolv()
   if (ips.length) {
     address = ips[0]
-    just.error(`dns server ${address} found in /etc/resolv.conf`)
   }
   const fd = net.socket(net.AF_INET, net.SOCK_DGRAM | net.SOCK_NONBLOCK, 0)
   net.bind(fd, address, port)
@@ -98,7 +94,6 @@ function lookup (query = 'www.google.com', onRecord = () => {}, address = dnsSer
     }
     const { ip } = message.answer[0]
     const result = `${ip[0]}.${ip[1]}.${ip[2]}.${ip[3]}`
-    just.error(`got ${result} for ${query} from ${address}`)
     loop.remove(fd)
     net.close(fd)
     onRecord(null, result)

+ 125 - 0
frameworks/JavaScript/just/lib/monitor.js

@@ -0,0 +1,125 @@
+const { fs, sys, net } = just
+
+function readStat (pid = sys.pid()) {
+  const buf = new ArrayBuffer(4096)
+  const path = `/proc/${pid}/stat`
+  const fd = fs.open(path)
+  net.seek(fd, 0, net.SEEK_SET)
+  let bytes = net.read(fd, buf)
+  const parts = []
+  while (bytes > 0) {
+    parts.push(buf.readString(bytes))
+    bytes = net.read(fd, buf)
+  }
+  const fields = parts.join('').split(' ')
+  const comm = fields[1]
+  const state = fields[2]
+  const [
+    ppid,
+    pgrp,
+    session,
+    ttyNr,
+    tpgid,
+    flags,
+    minflt,
+    cminflt,
+    majflt,
+    cmajflt,
+    utime,
+    stime,
+    cutime,
+    cstime,
+    priority,
+    nice,
+    numThreads,
+    itrealvalue,
+    starttime,
+    vsize,
+    rssPages,
+    rsslim,
+    startcode,
+    endcode,
+    startstack,
+    kstkesp,
+    kstkeip,
+    signal,
+    blocked,
+    sigignore,
+    sigcatch,
+    wchan,
+    nswap,
+    cnswap,
+    exitSignal,
+    processor,
+    rtPriority,
+    policy,
+    delayacctBlkioTicks,
+    guestTime,
+    cguestTime,
+    startData,
+    endData,
+    startBrk,
+    argStart,
+    argEnd,
+    envStart,
+    envEnd,
+    exitCode
+  ] = fields.slice(3).map(v => Number(v))
+  net.close(fd)
+  return {
+    pid,
+    comm,
+    state,
+    ppid,
+    pgrp,
+    session,
+    ttyNr,
+    tpgid,
+    flags,
+    minflt,
+    cminflt,
+    majflt,
+    cmajflt,
+    utime,
+    stime,
+    cutime,
+    cstime,
+    priority,
+    nice,
+    numThreads,
+    itrealvalue,
+    starttime,
+    vsize,
+    rssPages,
+    rsslim,
+    startcode,
+    endcode,
+    startstack,
+    kstkesp,
+    kstkeip,
+    signal,
+    blocked,
+    sigignore,
+    sigcatch,
+    wchan,
+    nswap,
+    cnswap,
+    exitSignal,
+    processor,
+    rtPriority,
+    policy,
+    delayacctBlkioTicks,
+    guestTime,
+    cguestTime,
+    startData,
+    endData,
+    startBrk,
+    argStart,
+    argEnd,
+    envStart,
+    envEnd,
+    exitCode
+  }
+}
+
+module.exports = { readStat }

+ 3 - 0
frameworks/JavaScript/just/lib/tcp.js

@@ -119,6 +119,9 @@ function createClient (address = '127.0.0.1', port = 3000) {
     if (!sock.connected) {
       net.setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, 0)
       net.setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, 0)
+      let flags = sys.fcntl(fd, sys.F_GETFL, 0)
+      flags |= O_NONBLOCK
+      sys.fcntl(fd, sys.F_SETFL, flags)
       loop.update(fd, readableMask)
       buffer = sock.onConnect(null, sock)
       buffer.offset = 0

+ 34 - 22
frameworks/JavaScript/just/spawn.js

@@ -1,26 +1,38 @@
-const { readFile } = require('fs')
-
-function spawn (source) {
-  const shared = new SharedArrayBuffer(4)
-  const u32 = new Uint32Array(shared)
-  const tid = just.thread.spawn(source, just.args.slice(1), shared)
-  const thread = { tid, u32 }
-  threads.push(thread)
-  return thread
+const { cwd, errno, strerror, spawn } = just.sys
+const path = cwd()
+const [...args] = just.args.slice(2)
+const { socketpair, AF_UNIX, SOCK_STREAM } = just.net
+function createPipe () {
+  const fds = []
+  const r = socketpair(AF_UNIX, SOCK_STREAM, fds)
+  if (r !== 0) throw new Error(`socketpair ${r} errno ${errno()} : ${strerror(errno())}`)
+  return fds
 }
 
-function onTimer () {
-  const { user, system } = just.cpuUsage()
-  const { rss } = just.memoryUsage()
-  let total = 0
-  for (const thread of threads) {
-    total += Atomics.load(thread.u32, 0)
-  }
-  just.error(`threads ${threads.length} total ${total} mem ${rss} cpu (${user.toFixed(2)}/${system.toFixed(2)}) ${(user + system).toFixed(2)} qps/core ${(total / (user + system)).toFixed(2)}`)
+const cpus = parseInt(just.env().CPUS || just.sys.cpus, 10)
+const pids = []
+for (let i = 0; i < cpus; i++) {
+  const stdin = createPipe()
+  const stdout = createPipe()
+  const stderr = createPipe()
+  const pid = spawn('just', path, args, stdin[1], stdout[1], stderr[1])
+  pids.push(pid)
 }
 
-const source = readFile(just.args[2] || 'test.js')
-const cpus = parseInt(just.env().CPUS || just.sys.cpus, 10)
-const threads = []
-for (let i = 0; i < cpus; i++) spawn(source)
-just.setInterval(onTimer, 1000)
+const { readStat } = require('lib/monitor.js')
+const last = { user: 0, system: 0 }
+just.setInterval(() => {
+  const stat = { user: 0, system: 0, rss: 0 }
+  for (const pid of pids) {
+    const { utime, stime, rssPages } = readStat(pid)
+    const rss = Math.floor((rssPages * just.sys.pageSize) / (1024 * 1024))
+    stat.rss += rss
+    stat.user += utime
+    stat.system += stime
+  }
+  const user = stat.user - last.user
+  const system = stat.system - last.system
+  last.user = stat.user
+  last.system = stat.system
+  just.print(`children ${pids.length} rss ${stat.rss} user ${user} system ${system} total ${user + system}`)
+}, 1000)

+ 28 - 44
frameworks/JavaScript/just/techempower.js

@@ -1,7 +1,6 @@
 const { connect, constants } = require('lib/connection.js')
 const { createServer } = require('lib/tcp.js')
 const { createParser } = require('lib/http.js')
-const { start } = require('lib/stats.js')
 const { sjs, attr } = require('lib/stringify.js')
 
 function compile (sock, query) {
@@ -151,7 +150,6 @@ end where id in ($1,$3,$5,$7,$9)
     params: [1]
   })
   clients.push(sock)
-  stats.clients = clients.length
   if (clients.length === poolSize) onPGReady()
 }
 
@@ -187,16 +185,23 @@ function getHTML (rows) {
   return html + FOOTER
 }
 
-function sortByMessage (a, b) {
-  if (a[1] > b[1]) return 1
-  if (a[1] < b[1]) return -1
-  return 0
+function insertionSort (arr) {
+  const n = arr.length
+  for (let i = 1; i < n; i++) {
+    const c = arr[i]
+    let j = i - 1
+    while ((j > -1) && (c[1] < arr[j][1])) {
+      arr[j + 1] = arr[j]
+      j--
+    }
+    arr[j + 1] = c
+  }
+  return arr
 }
 
 const cache = {}
 
 function onHTTPConnect (sock) {
-  stats.conn++
   const client = clients[sock.fd % clients.length]
   const rbuf = new ArrayBuffer(4096)
   const parser = createParser(rbuf)
@@ -214,22 +219,17 @@ function onHTTPConnect (sock) {
   let queries = 0
   let updates = 0
   function onUpdateMulti () {
-    stats.qps++
     const json = JSON.stringify(results)
     sock.writeString(`${rJSON}${json.length}${END}${json}`)
-    stats.rps++
   }
   function onUpdateSingle () {
-    stats.qps++
     updates++
     if (results.length === updates) {
       const json = JSON.stringify(results)
       sock.writeString(`${rJSON}${json.length}${END}${json}`)
-      stats.rps++
     }
   }
   function onUpdates () {
-    stats.qps++
     const [id, randomNumber] = getWorldById.getRows()[0]
     results.push({ id, randomNumber })
     if (results.length === queries) {
@@ -267,13 +267,11 @@ function onHTTPConnect (sock) {
     getWorldById.send()
   }
   function onMulti () {
-    stats.qps++
     const [id, randomNumber] = getWorldById.getRows()[0]
     results.push({ id, randomNumber })
     if (results.length === queries) {
       const json = JSON.stringify(results)
       sock.writeString(`${rJSON}${json.length}${END}${json}`)
-      stats.rps++
       queries = 0
     }
   }
@@ -290,7 +288,6 @@ function onHTTPConnect (sock) {
     getWorldById.send()
   }
   function onCached () {
-    stats.qps++
     const row = getCachedWorldById.getRows()[0]
     const [id, randomNumber] = row
     const world = { id, randomNumber }
@@ -299,7 +296,6 @@ function onHTTPConnect (sock) {
     if (results.length === queries) {
       const json = JSON.stringify(results)
       sock.writeString(`${rJSON}${json.length}${END}${json}`)
-      stats.rps++
       queries = 0
       results.length = 0
     }
@@ -328,7 +324,6 @@ function onHTTPConnect (sock) {
     if (results.length === queries) {
       const json = JSON.stringify(results)
       sock.writeString(`${rJSON}${json.length}${END}${json}`)
-      stats.rps++
       queries = 0
       results.length = 0
       return
@@ -336,17 +331,13 @@ function onHTTPConnect (sock) {
     getCachedWorldById.send()
   }
   function onFortunes () {
-    stats.qps++
-    const html = getHTML([extra, ...allFortunes.getRows()].sort(sortByMessage))
+    const html = getHTML(insertionSort([extra, ...allFortunes.getRows()]))
     sock.writeString(`${rHTML}${utf8Length(html)}${END}${html}`)
-    stats.rps++
   }
   function onSingle () {
-    stats.qps++
     const [id, randomNumber] = getWorldById.getRows()[0]
     const json = sDB({ id, randomNumber })
     sock.writeString(`${rJSON}${json.length}${END}${json}`)
-    stats.rps++
   }
   const queryPath = '/query'
   const updatePath = '/update'
@@ -357,19 +348,13 @@ function onHTTPConnect (sock) {
     '/json': () => {
       const json = sJSON(message)
       sock.writeString(`${rJSON}${json.length}${END}${json}`)
-      stats.rps++
-    },
-    '/fortunes': () => {
-      allFortunes.call(onFortunes)
     },
+    '/fortunes': () => allFortunes.call(onFortunes),
     '/db': () => {
       getWorldById.params[0] = Math.ceil(Math.random() * 10000)
       getWorldById.call(onSingle)
     },
-    '/plaintext': () => {
-      sock.writeString(`${rTEXT}${text.length}${END}${text}`)
-      stats.rps++
-    },
+    '/plaintext': () => sock.writeString(`${rTEXT}${text.length}${END}${text}`),
     default: url => {
       const [path, qs] = url.split(pathSep)
       if (path === queryPath) {
@@ -385,13 +370,11 @@ function onHTTPConnect (sock) {
         return
       }
       sock.writeString(r404)
-      stats.rps++
     }
   }
   parser.onRequests = count => {
     if (count > 1) {
       sock.writeString(`${rTEXT}${text.length}${END}${text}`.repeat(count))
-      stats.rps += count
       return
     }
     const url = parser.url(0)
@@ -400,7 +383,6 @@ function onHTTPConnect (sock) {
   }
   sock.onData = bytes => parser.parse(bytes)
   sock.onClose = () => {
-    stats.conn--
     parser.free()
   }
   return parser.buffer
@@ -408,7 +390,7 @@ function onHTTPConnect (sock) {
 
 function onPGReady () {
   microtasks = false
-  server.listen()
+  just.print(`listen: ${server.listen()}`)
 }
 
 const { utf8Length } = just.sys
@@ -424,24 +406,26 @@ const tfb = {
   pass: 'benchmarkdbpass',
   database: 'hello_world'
 }
-const stats = start()
 let i = poolSize
 const sJSON = sjs({ message: attr('string') })
 const sDB = sjs({ id: attr('number'), randomNumber: attr('number') })
 while (i--) connect(tfb, onPGConnect)
 const { loop } = just.factory
 let microtasks = true
-let rHTML = `HTTP/1.1 200 OK\r\nServer: j\r\nDate: ${stats.time}\r\nContent-Type: text/html; charset=UTF-8\r\nContent-Length: `
-let rTEXT = `HTTP/1.1 200 OK\r\nServer: j\r\nDate: ${stats.time}\r\nContent-Type: text/plain\r\nContent-Length: `
-let rJSON = `HTTP/1.1 200 OK\r\nServer: j\r\nDate: ${stats.time}\r\nContent-Type: application/json\r\nContent-Length: `
-let r404 = `HTTP/1.1 404 Not Found\r\nServer: j\r\nDate: ${stats.time}\r\nContent-Type: text/plain\r\nContent-Length: 0\r\n\r\n`
+
+let time = (new Date()).toUTCString()
+let rHTML = `HTTP/1.1 200 OK\r\nServer: j\r\nDate: ${time}\r\nContent-Type: text/html; charset=UTF-8\r\nContent-Length: `
+let rTEXT = `HTTP/1.1 200 OK\r\nServer: j\r\nDate: ${time}\r\nContent-Type: text/plain\r\nContent-Length: `
+let rJSON = `HTTP/1.1 200 OK\r\nServer: j\r\nDate: ${time}\r\nContent-Type: application/json\r\nContent-Length: `
+let r404 = `HTTP/1.1 404 Not Found\r\nServer: j\r\nDate: ${time}\r\nContent-Type: text/plain\r\nContent-Length: 0\r\n\r\n`
 just.setInterval(() => {
-  rHTML = `HTTP/1.1 200 OK\r\nServer: j\r\nDate: ${stats.time}\r\nContent-Type: text/html; charset=UTF-8\r\nContent-Length: `
-  rTEXT = `HTTP/1.1 200 OK\r\nServer: j\r\nDate: ${stats.time}\r\nContent-Type: text/plain\r\nContent-Length: `
-  rJSON = `HTTP/1.1 200 OK\r\nServer: j\r\nDate: ${stats.time}\r\nContent-Type: application/json\r\nContent-Length: `
-  r404 = `HTTP/1.1 404 Not Found\r\nServer: j\r\nDate: ${stats.time}\r\nContent-Type: text/plain\r\nContent-Length: 0\r\n\r\n`
+  time = (new Date()).toUTCString()
+  rHTML = `HTTP/1.1 200 OK\r\nServer: j\r\nDate: ${time}\r\nContent-Type: text/html; charset=UTF-8\r\nContent-Length: `
+  rTEXT = `HTTP/1.1 200 OK\r\nServer: j\r\nDate: ${time}\r\nContent-Type: text/plain\r\nContent-Length: `
+  rJSON = `HTTP/1.1 200 OK\r\nServer: j\r\nDate: ${time}\r\nContent-Type: application/json\r\nContent-Length: `
+  r404 = `HTTP/1.1 404 Not Found\r\nServer: j\r\nDate: ${time}\r\nContent-Type: text/plain\r\nContent-Length: 0\r\n\r\n`
 }, 100)
 while (1) {
-  loop.poll(-1)
+  if (loop.poll(0) === 0) loop.poll(-1)
   if (microtasks) just.sys.runMicroTasks()
 }