123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- /*
- * Rust signaling server example for libdatachannel
- * Copyright (c) 2020 Paul-Louis Ageneau
- *
- * This program is free software: you can redistribute it and/or modify it
- * under the terms of the GNU Affero General Public License as published
- * by the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
- extern crate tokio;
- extern crate tungstenite;
- extern crate futures_util;
- extern crate futures_channel;
- extern crate json;
- use std::env;
- use std::collections::HashMap;
- use std::sync::{Arc, Mutex};
- use tokio::net::{TcpListener, TcpStream};
- use tungstenite::protocol::Message;
- use tungstenite::handshake::server::{Request, Response};
- use futures_util::{future, pin_mut, StreamExt};
- use futures_util::stream::TryStreamExt;
- use futures_channel::mpsc;
- type Id = String;
- type Tx = mpsc::UnboundedSender<Message>;
- type ClientsMap = Arc<Mutex<HashMap<Id, Tx>>>;
- async fn handle(clients: ClientsMap, stream: TcpStream) {
- let mut client_id = Id::new();
- let callback = |req: &Request, response: Response| {
- let path: &str = req.uri().path();
- let tokens: Vec<&str> = path.split('/').collect();
- client_id = tokens[1].to_string();
- return Ok(response);
- };
- let websocket = tokio_tungstenite::accept_hdr_async(stream, callback)
- .await.expect("WebSocket handshake failed");
- println!("Client {} connected", &client_id);
- let (tx, rx) = mpsc::unbounded();
- clients.lock().unwrap().insert(client_id.clone(), tx);
- let (outgoing, incoming) = websocket.split();
- let forward = rx.map(Ok).forward(outgoing);
- let process = incoming.try_for_each(|msg| {
- if msg.is_text() {
- let text = msg.to_text().unwrap();
- println!("Client {} << {}", &client_id, &text);
- // Parse
- let mut content = json::parse(text).unwrap();
- let remote_id = content["id"].to_string();
- let mut locked = clients.lock().unwrap();
- match locked.get_mut(&remote_id) {
- Some(remote) => {
- // Format
- content.insert("id", client_id.clone()).unwrap();
- let text = json::stringify(content);
- // Send to remote
- println!("Client {} >> {}", &remote_id, &text);
- remote.unbounded_send(Message::text(text)).unwrap();
- },
- _ => println!("Client {} not found", &remote_id),
- }
- }
- future::ok(())
- });
- pin_mut!(process, forward);
- future::select(process, forward).await;
- println!("Client {} disconnected", &client_id);
- clients.lock().unwrap().remove(&client_id);
- }
- #[tokio::main]
- async fn main() -> Result<(), std::io::Error> {
- let service = env::args().nth(1).unwrap_or("8000".to_string());
- let endpoint = format!("127.0.0.1:{}", service);
- let mut listener = TcpListener::bind(endpoint)
- .await.expect("Listener binding failed");
- let clients = ClientsMap::new(Mutex::new(HashMap::new()));
- while let Ok((stream, _)) = listener.accept().await {
- tokio::spawn(handle(clients.clone(), stream));
- }
- return Ok(())
- }
|