tcp.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  // Bindings taken from the `just` runtime global: syscall wrappers, the
  // shared event loop, and the epoll / socket constants used below.
  const { sys, net } = just
  const { loop } = just.factory
  const { EPOLLIN, EPOLLERR, EPOLLHUP, EPOLLOUT } = just.loop
  const {
    IPPROTO_TCP,
    O_NONBLOCK,
    TCP_NODELAY,
    SO_KEEPALIVE,
    SOMAXCONN,
    AF_INET,
    SOCK_STREAM,
    SOL_SOCKET,
    SO_REUSEADDR,
    SO_REUSEPORT,
    SOCK_NONBLOCK,
    SO_ERROR
  } = net

  // Event mask for a socket that only needs readable / error / hang-up events.
  const readableMask = EPOLLIN | EPOLLERR | EPOLLHUP
  // Same mask plus writable events.
  const readableWritableMask = readableMask | EPOLLOUT
  /**
   * Create a TCP server socket bound to `host:port`.
   *
   * The listening socket is created and bound immediately; call
   * `server.listen()` to start accepting connections on the event loop.
   *
   * Before listening, the caller is expected to set `server.onConnect(socket)`.
   * It is invoked once per accepted connection and must return the buffer that
   * incoming data is received into. On the socket object the caller sets
   * `socket.onData(bytes)` (called with the number of bytes received) and,
   * optionally, `socket.onClose(socket)`.
   *
   * @param {string} [host='127.0.0.1'] address to bind to
   * @param {number} [port=3000] port to bind to
   * @returns {{host: string, port: number, listen: Function}} the server object
   */
  function createServer (host = '127.0.0.1', port = 3000) {
    const server = { host, port }
    // Live client sockets keyed by file descriptor.
    const sockets = {}

    /**
     * Tear down a client socket: notify `onClose`, unregister it from the
     * loop and close the descriptor. Safe to call more than once.
     */
    function closeSocket (sock) {
      const { fd } = sock
      // Only the object currently registered for this fd may close it. This
      // makes repeated close() calls (including from inside onClose) a no-op
      // and protects a descriptor number that has since been reused.
      if (sockets[fd] !== sock) return
      delete sockets[fd]
      try {
        sock.onClose && sock.onClose(sock)
      } finally {
        // Release the descriptor even if the onClose callback throws.
        loop.remove(fd)
        net.close(fd)
      }
    }

    /**
     * Event-loop callback for the listening socket: accepts one connection
     * and wires up its read handler and write helpers.
     */
    function onConnect (fd, event) {
      if (event & EPOLLERR || event & EPOLLHUP) {
        // Error on the listening socket itself: stop listening.
        loop.remove(fd)
        net.close(fd)
        return
      }
      const clientfd = net.accept(fd)
      if (clientfd < 0) {
        // Assumes net.accept mirrors accept(2) and returns a negative value on
        // failure. The listening socket is non-blocking, so EAGAIN only means
        // there is nothing to accept right now.
        const errno = sys.errno()
        if (errno !== net.EAGAIN) {
          just.error(`accept error: ${sys.strerror(errno)} (${errno})`)
        }
        return
      }
      const socket = sockets[clientfd] = { fd: clientfd }
      // Receive buffer; assigned from server.onConnect() below, before the
      // read callback can run.
      let buffer
      net.setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, 0)
      net.setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, 0)
      loop.add(clientfd, (fd, event) => {
        if (event & EPOLLERR || event & EPOLLHUP) {
          return closeSocket(socket)
        }
        const bytes = net.recv(fd, buffer, buffer.offset, buffer.byteLength - buffer.offset)
        if (bytes > 0) {
          socket.onData(bytes)
          return
        }
        if (bytes < 0) {
          const errno = sys.errno()
          // Nothing to read yet on a non-blocking socket; wait for next event.
          if (errno === net.EAGAIN) return
          just.error(`recv error: ${sys.strerror(errno)} (${errno})`)
        }
        // Zero bytes or a hard error: drop the connection.
        closeSocket(socket)
      })
      let flags = sys.fcntl(clientfd, sys.F_GETFL, 0)
      flags |= O_NONBLOCK
      sys.fcntl(clientfd, sys.F_SETFL, flags)
      loop.update(clientfd, readableMask)
      // Send `len` bytes of `buf`; returns the result of net.send. Errors
      // other than EAGAIN and zero-length writes are logged.
      socket.write = (buf, len = buf.byteLength, off = 0) => {
        const written = net.send(clientfd, buf, len, off)
        if (written > 0) {
          return written
        }
        if (written < 0) {
          const errno = sys.errno()
          if (errno === net.EAGAIN) return written
          just.error(`write error (${clientfd}): ${sys.strerror(errno)} (${errno})`)
        }
        if (written === 0) {
          just.error(`zero write ${clientfd}`)
        }
        return written
      }
      socket.writeString = str => net.sendString(clientfd, str)
      socket.close = () => closeSocket(socket)
      buffer = server.onConnect(socket)
      buffer.offset = 0
    }

    /**
     * Start listening and register the accept handler on the event loop.
     * @param {number} [maxconn=SOMAXCONN] backlog passed to net.listen
     * @returns {number} the result of net.listen (0 on success)
     */
    function listen (maxconn = SOMAXCONN) {
      const r = net.listen(sockfd, maxconn)
      if (r === 0) loop.add(sockfd, onConnect)
      return r
    }
    server.listen = listen

    const sockfd = net.socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)
    net.setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, 1)
    net.setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, 1)
    net.bind(sockfd, host, port)
    return server
  }
  /**
   * Create a non-blocking TCP client for `address:port`.
   *
   * Nothing is opened until `sock.connect()` is called. The caller is
   * expected to set, on the returned object:
   *  - `onConnect(err, sock)`: called with an Error when the connection fails
   *    before being established, or with `(null, sock)` once it is writable;
   *    in the success case it must return the buffer that incoming data is
   *    received into.
   *  - `onData(bytes)`: called with the number of bytes received.
   *  - `onClose(sock)` (optional): called when the socket is torn down.
   *
   * @param {string} [address='127.0.0.1'] remote address
   * @param {number} [port=3000] remote port
   * @returns {object} the client socket object (`connect`, `write`,
   *   `writeString`, `close`, `connected`, and `fd` once connecting)
   */
  function createClient (address = '127.0.0.1', port = 3000) {
    const sock = { address, port, connected: false }
    let fd
    // Receive buffer returned by sock.onConnect once the connection is up.
    let buffer
    // True between connect() and closeSocket(); makes closing idempotent.
    let open = false

    /**
     * Notify `onClose`, unregister from the loop and close the descriptor.
     * Does nothing if the socket is not currently open.
     */
    function closeSocket () {
      if (!open) return
      open = false
      // Capture the descriptor: onClose may call connect() again, which
      // replaces `fd` with the new socket.
      const closingFd = fd
      try {
        sock.onClose && sock.onClose(sock)
      } finally {
        loop.remove(closingFd)
        net.close(closingFd)
        // Reset so a later connect() runs the connect-completion setup again.
        sock.connected = false
      }
    }

    /** Receive into the buffer and hand the byte count to `onData`. */
    function handleRead (fd, event) {
      const bytes = net.recv(fd, buffer, buffer.offset, buffer.byteLength - buffer.offset)
      if (bytes > 0) {
        sock.onData(bytes)
        return
      }
      if (bytes < 0) {
        const errno = sys.errno()
        // Nothing to read yet on a non-blocking socket; wait for next event.
        if (errno === net.EAGAIN) return
        just.error(`recv error: ${sys.strerror(errno)} (${errno})`)
      }
      // Zero bytes or a hard error: drop the connection.
      closeSocket()
    }

    /** Report a failed connection attempt through `onConnect(err)`. */
    function handleError (fd, event) {
      const errno = net.getsockopt(fd, SOL_SOCKET, SO_ERROR)
      if (!sock.connected) {
        sock.onConnect(new Error(`${errno} : ${sys.strerror(errno)}`))
      }
    }

    /**
     * First writable event marks the connection as established: configure
     * the socket, stop watching for writability and obtain the read buffer.
     */
    function handleWrite (fd, event) {
      if (!sock.connected) {
        net.setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, 0)
        net.setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, 0)
        let flags = sys.fcntl(fd, sys.F_GETFL, 0)
        flags |= O_NONBLOCK
        sys.fcntl(fd, sys.F_SETFL, flags)
        loop.update(fd, readableMask)
        buffer = sock.onConnect(null, sock)
        buffer.offset = 0
        sock.connected = true
      }
    }

    /** Event-loop callback dispatching error, writable and readable events. */
    function onSocketEvent (fd, event) {
      if (event & EPOLLERR || event & EPOLLHUP) {
        handleError(fd, event)
        closeSocket()
        return
      }
      // Writable must be handled before readable: the first event can carry
      // both flags, and the read buffer only exists after handleWrite ran.
      if (event & EPOLLOUT) {
        handleWrite(fd, event)
      }
      // Skip the read if the connection is not (or no longer) established,
      // e.g. because onConnect closed the socket.
      if (event & EPOLLIN && sock.connected) {
        handleRead(fd, event)
      }
    }

    // Send `len` bytes of `buf`; returns the result of net.send. Errors other
    // than EAGAIN and zero-length writes are logged.
    sock.write = (buf, len = buf.byteLength, off = 0) => {
      const written = net.send(fd, buf, len, off)
      if (written > 0) {
        return written
      }
      if (written < 0) {
        const errno = sys.errno()
        if (errno === net.EAGAIN) return written
        just.error(`write error (${fd}): ${sys.strerror(errno)} (${errno})`)
      }
      if (written === 0) {
        just.error(`zero write ${fd}`)
      }
      return written
    }
    sock.writeString = str => net.sendString(fd, str)
    sock.close = () => closeSocket()

    /**
     * Open a non-blocking socket, register it for readable and writable
     * events and start connecting. Completion is reported via `onConnect`.
     * @returns {object} the client socket object
     */
    function connect () {
      fd = net.socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)
      open = true
      loop.add(fd, onSocketEvent, readableWritableMask)
      net.connect(fd, address, port)
      sock.fd = fd
      return sock
    }
    sock.connect = connect
    return sock
  }
  158. module.exports = { createServer, createClient }