main.rs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. /*
  2. * Rust signaling server example for libdatachannel
  3. * Copyright (c) 2020 Paul-Louis Ageneau
  4. *
  5. * This program is free software: you can redistribute it and/or modify it
  6. * under the terms of the GNU Affero General Public License as published
  7. * by the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU Affero General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU Affero General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. */
  18. extern crate tokio;
  19. extern crate tungstenite;
  20. extern crate futures_util;
  21. extern crate futures_channel;
  22. extern crate json;
  23. use std::env;
  24. use std::collections::HashMap;
  25. use std::sync::{Arc, Mutex};
  26. use tokio::net::{TcpListener, TcpStream};
  27. use tungstenite::protocol::Message;
  28. use tungstenite::handshake::server::{Request, Response};
  29. use futures_util::{future, pin_mut, StreamExt};
  30. use futures_util::stream::TryStreamExt;
  31. use futures_channel::mpsc;
  32. type Id = String;
  33. type Tx = mpsc::UnboundedSender<Message>;
  34. type ClientsMap = Arc<Mutex<HashMap<Id, Tx>>>;
  35. async fn handle(clients: ClientsMap, stream: TcpStream) {
  36. let mut client_id = Id::new();
  37. let callback = |req: &Request, response: Response| {
  38. let path: &str = req.uri().path();
  39. let tokens: Vec<&str> = path.split('/').collect();
  40. client_id = tokens[1].to_string();
  41. return Ok(response);
  42. };
  43. let websocket = tokio_tungstenite::accept_hdr_async(stream, callback)
  44. .await.expect("WebSocket handshake failed");
  45. println!("Client {} connected", &client_id);
  46. let (tx, rx) = mpsc::unbounded();
  47. clients.lock().unwrap().insert(client_id.clone(), tx);
  48. let (outgoing, incoming) = websocket.split();
  49. let forward = rx.map(Ok).forward(outgoing);
  50. let process = incoming.try_for_each(|msg| {
  51. if msg.is_text() {
  52. let text = msg.to_text().unwrap();
  53. println!("Client {} << {}", &client_id, &text);
  54. // Parse
  55. let mut content = json::parse(text).unwrap();
  56. let remote_id = content["id"].to_string();
  57. let mut locked = clients.lock().unwrap();
  58. match locked.get_mut(&remote_id) {
  59. Some(remote) => {
  60. // Format
  61. content.insert("id", client_id.clone()).unwrap();
  62. let text = json::stringify(content);
  63. // Send to remote
  64. println!("Client {} >> {}", &remote_id, &text);
  65. remote.unbounded_send(Message::text(text)).unwrap();
  66. },
  67. _ => println!("Client {} not found", &remote_id),
  68. }
  69. }
  70. future::ok(())
  71. });
  72. pin_mut!(process, forward);
  73. future::select(process, forward).await;
  74. println!("Client {} disconnected", &client_id);
  75. clients.lock().unwrap().remove(&client_id);
  76. }
  77. #[tokio::main]
  78. async fn main() -> Result<(), std::io::Error> {
  79. let service = env::args().nth(1).unwrap_or("8000".to_string());
  80. let endpoint = format!("127.0.0.1:{}", service);
  81. let mut listener = TcpListener::bind(endpoint)
  82. .await.expect("Listener binding failed");
  83. let clients = ClientsMap::new(Mutex::new(HashMap::new()));
  84. while let Ok((stream, _)) = listener.accept().await {
  85. tokio::spawn(handle(clients.clone(), stream));
  86. }
  87. return Ok(())
  88. }