tcp_server.rs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. // You can run this example from the root of the mio repo:
  2. // cargo run --example tcp_server --features="os-poll net"
  3. use mio::event::Event;
  4. use mio::net::{TcpListener, TcpStream};
  5. use mio::{Events, Interest, Poll, Registry, Token};
  6. use std::collections::HashMap;
  7. use std::io::{self, Read, Write};
  8. use std::str::from_utf8;
  9. // Token reserved for the listening socket; tokens for accepted connections start at `SERVER.0 + 1`.
  10. const SERVER: Token = Token(0);
  11. // Greeting bytes written to a connection when it reports a writable event.
  12. const DATA: &[u8] = b"Hello world!\n";
  13. #[cfg(not(target_os = "wasi"))]
  14. fn main() -> io::Result<()> {
  15. env_logger::init();
  16. // Create a poll instance.
  17. let mut poll = Poll::new()?;
  18. // Create storage for events.
  19. let mut events = Events::with_capacity(128);
  20. // Setup the TCP server socket.
  21. let addr = "127.0.0.1:9000".parse().unwrap();
  22. let mut server = TcpListener::bind(addr)?;
  23. // Register the server with poll we can receive events for it.
  24. poll.registry()
  25. .register(&mut server, SERVER, Interest::READABLE)?;
  26. // Map of `Token` -> `TcpStream`.
  27. let mut connections = HashMap::new();
  28. // Unique token for each incoming connection.
  29. let mut unique_token = Token(SERVER.0 + 1);
  30. println!("You can connect to the server using `nc`:");
  31. println!(" $ nc 127.0.0.1 9000");
  32. println!("You'll see our welcome message and anything you type will be printed here.");
  33. loop {
  34. poll.poll(&mut events, None)?;
  35. for event in events.iter() {
  36. match event.token() {
  37. SERVER => loop {
  38. // Received an event for the TCP server socket, which
  39. // indicates we can accept an connection.
  40. let (mut connection, address) = match server.accept() {
  41. Ok((connection, address)) => (connection, address),
  42. Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
  43. // If we get a `WouldBlock` error we know our
  44. // listener has no more incoming connections queued,
  45. // so we can return to polling and wait for some
  46. // more.
  47. break;
  48. }
  49. Err(e) => {
  50. // If it was any other kind of error, something went
  51. // wrong and we terminate with an error.
  52. return Err(e);
  53. }
  54. };
  55. println!("Accepted connection from: {}", address);
  56. let token = next(&mut unique_token);
  57. poll.registry().register(
  58. &mut connection,
  59. token,
  60. Interest::READABLE.add(Interest::WRITABLE),
  61. )?;
  62. connections.insert(token, connection);
  63. },
  64. token => {
  65. // Maybe received an event for a TCP connection.
  66. let done = if let Some(connection) = connections.get_mut(&token) {
  67. handle_connection_event(poll.registry(), connection, event)?
  68. } else {
  69. // Sporadic events happen, we can safely ignore them.
  70. false
  71. };
  72. if done {
  73. if let Some(mut connection) = connections.remove(&token) {
  74. poll.registry().deregister(&mut connection)?;
  75. }
  76. }
  77. }
  78. }
  79. }
  80. }
  81. }
  82. fn next(current: &mut Token) -> Token {
  83. let next = current.0;
  84. current.0 += 1;
  85. Token(next)
  86. }
  87. /// Returns `true` if the connection is done.
  88. fn handle_connection_event(
  89. registry: &Registry,
  90. connection: &mut TcpStream,
  91. event: &Event,
  92. ) -> io::Result<bool> {
  93. if event.is_writable() {
  94. // We can (maybe) write to the connection.
  95. match connection.write(DATA) {
  96. // We want to write the entire `DATA` buffer in a single go. If we
  97. // write less we'll return a short write error (same as
  98. // `io::Write::write_all` does).
  99. Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
  100. Ok(_) => {
  101. // After we've written something we'll reregister the connection
  102. // to only respond to readable events.
  103. registry.reregister(connection, event.token(), Interest::READABLE)?
  104. }
  105. // Would block "errors" are the OS's way of saying that the
  106. // connection is not actually ready to perform this I/O operation.
  107. Err(ref err) if would_block(err) => {}
  108. // Got interrupted (how rude!), we'll try again.
  109. Err(ref err) if interrupted(err) => {
  110. return handle_connection_event(registry, connection, event)
  111. }
  112. // Other errors we'll consider fatal.
  113. Err(err) => return Err(err),
  114. }
  115. }
  116. if event.is_readable() {
  117. let mut connection_closed = false;
  118. let mut received_data = vec![0; 4096];
  119. let mut bytes_read = 0;
  120. // We can (maybe) read from the connection.
  121. loop {
  122. match connection.read(&mut received_data[bytes_read..]) {
  123. Ok(0) => {
  124. // Reading 0 bytes means the other side has closed the
  125. // connection or is done writing, then so are we.
  126. connection_closed = true;
  127. break;
  128. }
  129. Ok(n) => {
  130. bytes_read += n;
  131. if bytes_read == received_data.len() {
  132. received_data.resize(received_data.len() + 1024, 0);
  133. }
  134. }
  135. // Would block "errors" are the OS's way of saying that the
  136. // connection is not actually ready to perform this I/O operation.
  137. Err(ref err) if would_block(err) => break,
  138. Err(ref err) if interrupted(err) => continue,
  139. // Other errors we'll consider fatal.
  140. Err(err) => return Err(err),
  141. }
  142. }
  143. if bytes_read != 0 {
  144. let received_data = &received_data[..bytes_read];
  145. if let Ok(str_buf) = from_utf8(received_data) {
  146. println!("Received data: {}", str_buf.trim_end());
  147. } else {
  148. println!("Received (none UTF-8) data: {:?}", received_data);
  149. }
  150. }
  151. if connection_closed {
  152. println!("Connection closed");
  153. return Ok(true);
  154. }
  155. }
  156. Ok(false)
  157. }
  158. fn would_block(err: &io::Error) -> bool {
  159. err.kind() == io::ErrorKind::WouldBlock
  160. }
  161. fn interrupted(err: &io::Error) -> bool {
  162. err.kind() == io::ErrorKind::Interrupted
  163. }
  164. #[cfg(target_os = "wasi")]
  165. fn main() {
  166. panic!("can't bind to an address with wasi")
  167. }