main.rkt 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. #lang racket/base
  2. (require racket/cmdline
  3. racket/match
  4. racket/place
  5. racket/tcp
  6. (prefix-in app: "app.rkt")
  7. "place-tcp-unit.rkt")
  ;; start-place : -> place?
  ;;
  ;; Spawns a worker place and returns its descriptor.  The place services
  ;; control messages received on its place channel `ch`:
  ;;
  ;;   `(init ,pid ,host ,port)  calls `app:start` with the host, the port and
  ;;                             the value built by `make-place-tcp@`, keeps
  ;;                             the result as the stop procedure, and
  ;;                             remembers `pid` for later log lines.
  ;;   `(accept ,in ,out)        forwards the port pair, as a two-element
  ;;                             list, onto the channel handed to
  ;;                             `make-place-tcp@`.
  ;;   `(stop)                   invokes the stop procedure (when one has been
  ;;                             installed) and leaves the message loop, which
  ;;                             ends the place body.
  ;;
  ;; Any other message raises a `match` failure inside the place.
  (define (start-place)
    (place ch
      (define accept-ch (make-channel))
      (define tcp (make-place-tcp@ accept-ch))
      ;; Result of `app:start`; stays #f until an `init` message is handled.
      ;; It is applied with no arguments on `stop`.
      (define stop #f)
      (let loop ([pid #f])
        (match (sync ch)
          [`(stop)
           ;; A `stop` that arrives before `init` must not try to apply #f.
           (when stop
             (stop))
           (eprintf "place ~a stopped~n" pid)]
          [`(init ,pid ,host ,port)
           (set! stop (app:start host port tcp))
           (eprintf "place ~a ready~n" pid)
           (loop pid)]
          [`(accept ,in ,out)
           (channel-put accept-ch (list in out))
           (loop pid)]))))
  ;; Entry point: parses the command line, starts one worker place per unit
  ;; of parallelism, listens on a single TCP socket and hands accepted
  ;; connections to the places in round-robin order.  The server runs until a
  ;; break is received or any place terminates.
  (module+ main
    ;; Command-line configuration.  Defaults: 127.0.0.1, port 8000, and one
    ;; place per processor reported by `processor-count`.
    (define-values (host port parallelism)
      (let ([host "127.0.0.1"]
            [port 8000]
            [parallelism (processor-count)])
        (command-line
         #:once-each
         [("--host" "-H") HOST "the host to listen on" (set! host HOST)]
         [("--port" "-p") PORT "the port to bind to"
          (define port-num (string->number PORT))
          ;; `string->number` also accepts fractions, floats and complex
          ;; numbers; only an exact integer in range is a usable port.
          (unless (and (exact-nonnegative-integer? port-num)
                       (< port-num 65536))
            (eprintf "error: PORT must be an integer between 0 and 65535, inclusive~n")
            (exit 1))
          (set! port port-num)]
         [("--parallelism" "-P") PARALLELISM "the number of parallel places to run"
          (define n-places (string->number PARALLELISM))
          ;; Must be an exact positive integer: it drives `in-range` below
          ;; and scales the listen backlog.
          (unless (exact-positive-integer? n-places)
            (eprintf "error: PARALLELISM must be a positive integer~n")
            (exit 1))
          (set! parallelism n-places)]
         #:args ()
         (values host port parallelism))))

    ;; Start every place and immediately send it its `init` message.
    (define places
      (for/list ([pid (in-range parallelism)])
        (define p (start-place))
        (begin0 p
          (place-channel-put p `(init ,pid ,host ,port)))))

    ;; Ask every place to stop, then wait for all of them to finish.
    (define (stop-places)
      (for ([ch (in-list places)])
        (place-channel-put ch `(stop)))
      (for-each place-wait places))

    ;; Becomes ready as soon as any one of the places has terminated.
    (define place-fail-evt
      (apply choice-evt (map place-dead-evt places)))

    ;; Listen backlog passed to `tcp-listen`, scaled by the number of places.
    (define backlog
      (* parallelism 65 1024))

    (define listener
      (tcp-listen port backlog #t host))

    ;; A value put on this channel asks the listener thread to shut down.
    (define stop-ch (make-channel))

    ;; Accept loop.  Each accepted connection is sent to the next place in
    ;; round-robin order; a stop request or a dead place ends the loop.
    (define listener-thd
      (thread
       (lambda ()
         (define places* (list->vector places))
         (define num-places (vector-length places*))
         (define stop-evt (choice-evt stop-ch place-fail-evt))
         (let loop ([idx 0])
           (sync
            (handle-evt
             listener
             (lambda (_)
               (define-values (in out)
                 (tcp-accept listener))
               (place-channel-put (vector-ref places* idx) `(accept ,in ,out))
               ;; The ports were handed to the place; drop this side's
               ;; handles with `tcp-abandon-port` rather than closing them.
               (tcp-abandon-port out)
               (tcp-abandon-port in)
               (loop (modulo (add1 idx) num-places))))
            (handle-evt
             stop-evt
             (lambda (_)
               (stop-places)
               (tcp-close listener))))))))

    ;; Request shutdown and wait for the listener thread to finish.
    (define (stop)
      ;; The listener thread may already have exited (e.g. after a place
      ;; died), in which case nobody is left to receive on `stop-ch`; also
      ;; wait on the thread itself so this cannot block forever.
      (sync (channel-put-evt stop-ch #t) listener-thd)
      (thread-wait listener-thd))

    ;; Block until the listener thread ends on its own, or shut down cleanly
    ;; when a break arrives.
    (with-handlers ([exn:break?
                     (lambda (_e)
                       (stop))])
      (sync/enable-break never-evt listener-thd)))