Browse Source

refactor Rust/hyper to futures 0.3 & update toolchain (#7951)

Alexander Polakov 2 years ago
parent
commit
79f2dbf3c9

File diff suppressed because it is too large
+ 213 - 411
frameworks/Rust/hyper/Cargo.lock


+ 5 - 12
frameworks/Rust/hyper/Cargo.toml

@@ -17,24 +17,17 @@ name = "hyper-db-techempower"
 path = "src/main_db.rs"
 
 [dependencies]
-futures = "0.1"
-# Disable default runtime, so that tokio-core can be used instead.
-# See below for why...
-hyper = { version = "0.14", default-features = false }
+# Disable default runtime, so that tokio single thread can be used instead.
+hyper = { version = "0.14", features = ["server", "http1"], default-features = false }
 # Since no logs are allowed anyways, just compile-time turn them all off
 log = { version = "0.4", features = ["release_max_level_off"] }
 markup = "0.3.1"
 net2 = "0.2"
 num_cpus = "1.2"
-serde = "1.0"
+serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
-serde_derive = "1.0"
-# tokio-core 0.1.11 is used because its technically faster
-# in these specific benchmarks, as work-stealing executors
-# aren't needed.
-tokio-core = { git = "https://github.com/seanmonstar/tokio-core", branch = "0.1.11-fix" }
-# Disable postgres' runtime feature as well...
-tokio-postgres = { version = "0.4.0-rc.3", default-features = false }
+tokio = { version = "1", features = ["rt", "net", "time"] }
+tokio-postgres = { version = "0.7", default-features = false, features = ["runtime"] }
 v_htmlescape = "0.10"
 
 [profile.release]

+ 1 - 1
frameworks/Rust/hyper/hyper-db.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.44
+FROM rust:1.67
 
 ADD ./ /hyper
 WORKDIR /hyper

+ 1 - 1
frameworks/Rust/hyper/hyper.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.44
+FROM rust:1.67
 
 ADD ./ /hyper
 WORKDIR /hyper

+ 20 - 37
frameworks/Rust/hyper/src/db.rs

@@ -1,8 +1,7 @@
 use std::net::SocketAddr;
 
-use futures::{Future, Stream};
-use tokio_core::net::TcpStream;
-use tokio_core::reactor::Handle;
+use tokio::net::TcpStream;
+use tokio::runtime::Handle;
 use tokio_postgres::{Client, Config, NoTls, Statement};
 
 pub struct Db {
@@ -15,49 +14,33 @@ pub struct Fortune {
     pub message: String,
 }
 
-pub fn connect(
+pub async fn connect(
     addr: SocketAddr,
     config: Config,
     handle: Handle,
-) -> impl Future<Item = Db, Error = ()> {
-    TcpStream::connect(&addr, &handle)
-        .map_err(|e| panic!("error connecting to postgresql: {}", e))
-        .and_then(move |tcp| {
-            config
-                .connect_raw(tcp, NoTls)
-                .map_err(|e| panic!("error connecting to postgresql: {}", e))
-        })
-        .and_then(move |(mut client, conn)| {
-            handle.spawn(conn.map_err(|e| panic!("postgres connection error: {}", e)));
-
-            client
-                .prepare("SELECT id, message FROM fortune")
-                .map_err(|_| ())
-                .map(move |fortune| Db { client, fortune })
-        })
+) -> Result<Db, Box<dyn std::error::Error>> {
+    let stream = TcpStream::connect(&addr).await?;
+    let (client, conn) = config.connect_raw(stream, NoTls).await?;
+    handle.spawn(conn);
+    let fortune = client.prepare("SELECT id, message FROM fortune").await?;
+    Ok(Db { client, fortune })
 }
 
 impl Db {
-    pub fn tell_fortune(
-        &mut self,
-    ) -> impl Future<Item = Vec<Fortune>, Error = ::tokio_postgres::Error> {
-        let items = vec![Fortune {
+    pub async fn tell_fortune(&self) -> Result<Vec<Fortune>, tokio_postgres::Error> {
+        let mut items = vec![Fortune {
             id: 0,
             message: "Additional fortune added at request time.".to_string(),
         }];
 
-        self.client
-            .query(&self.fortune, &[])
-            .fold(items, move |mut items, row| {
-                items.push(Fortune {
-                    id: row.get(0),
-                    message: row.get(1),
-                });
-                Ok(items)
-            })
-            .map(|mut items| {
-                items.sort_by(|it, next| it.message.cmp(&next.message));
-                items
-            })
+        let rows = self.client.query(&self.fortune, &[]).await?;
+        for row in rows {
+            items.push(Fortune {
+                id: row.get(0),
+                message: row.get(1),
+            });
+        }
+        items.sort_by(|it, next| it.message.cmp(&next.message));
+        Ok(items)
     }
 }

+ 47 - 51
frameworks/Rust/hyper/src/main.rs

@@ -1,21 +1,11 @@
-extern crate futures;
-extern crate hyper;
-extern crate net2;
-extern crate num_cpus;
-#[macro_use]
-extern crate serde_derive;
-extern crate serde_json;
-extern crate tokio_core;
-
-use futures::Future;
-
 use hyper::header::{HeaderValue, CONTENT_LENGTH, CONTENT_TYPE, SERVER};
-use hyper::service::service_fn_ok;
+use hyper::service::service_fn;
 use hyper::{Body, Response, StatusCode};
+use serde::Serialize;
 
 mod server;
 
-static HELLO_WORLD: &'static [u8] = b"Hello, world!";
+static HELLO_WORLD: &[u8] = b"Hello, world!";
 
 #[derive(Serialize)]
 struct JsonResponse<'a> {
@@ -47,49 +37,55 @@ fn main() {
         // This is the `Service` that will handle the connection.
         // `service_fn_ok` is a helper to convert a function that
         // returns a Response into a `Service`.
-        let svc = service_fn_ok(move |req| {
-            let (req, _body) = req.into_parts();
-            // For speed, reuse the allocated header map from the request,
-            // instead of allocating a new one. Because.
-            let mut headers = req.headers;
-            headers.clear();
+        let svc = service_fn(move |req| {
+            // Gotta clone these to be able to move into future...
+            let plaintext_len = plaintext_len.clone();
+            let plaintext_ct = plaintext_ct.clone();
+            let json_len = json_len.clone();
+            let json_ct = json_ct.clone();
+            let server_header = server_header.clone();
+
+            async move {
+                let (req, _body) = req.into_parts();
+                // For speed, reuse the allocated header map from the request,
+                // instead of allocating a new one. Because.
+                let mut headers = req.headers;
+                headers.clear();
 
-            let body = match req.uri.path() {
-                // Apparently, other benchmarks don't check the method, so we
-                // don't either. Yay?
-                "/plaintext" => {
-                    headers.insert(CONTENT_LENGTH, plaintext_len.clone());
-                    headers.insert(CONTENT_TYPE, plaintext_ct.clone());
-                    Body::from(HELLO_WORLD)
-                }
-                "/json" => {
-                    let rep = JsonResponse {
-                        message: "Hello, world!",
-                    };
-                    let rep_body = serde_json::to_vec(&rep).unwrap();
-                    headers.insert(CONTENT_LENGTH, json_len.clone());
-                    headers.insert(CONTENT_TYPE, json_ct.clone());
-                    Body::from(rep_body)
-                }
-                _ => {
-                    let mut res = Response::new(Body::empty());
-                    *res.status_mut() = StatusCode::NOT_FOUND;
-                    *res.headers_mut() = headers;
-                    return res;
-                }
-            };
+                let body = match req.uri.path() {
+                    // Apparently, other benchmarks don't check the method, so we
+                    // don't either. Yay?
+                    "/plaintext" => {
+                        headers.insert(CONTENT_LENGTH, plaintext_len.clone());
+                        headers.insert(CONTENT_TYPE, plaintext_ct.clone());
+                        Body::from(HELLO_WORLD)
+                    }
+                    "/json" => {
+                        let rep = JsonResponse {
+                            message: "Hello, world!",
+                        };
+                        let rep_body = serde_json::to_vec(&rep).unwrap();
+                        headers.insert(CONTENT_LENGTH, json_len.clone());
+                        headers.insert(CONTENT_TYPE, json_ct.clone());
+                        Body::from(rep_body)
+                    }
+                    _ => {
+                        let mut res = Response::new(Body::empty());
+                        *res.status_mut() = StatusCode::NOT_FOUND;
+                        *res.headers_mut() = headers;
+                        return Ok::<_, std::io::Error>(res);
+                    }
+                };
 
-            headers.insert(SERVER, server_header.clone());
+                headers.insert(SERVER, server_header.clone());
 
-            let mut res = Response::new(body);
-            *res.headers_mut() = headers;
-            res
+                let mut res = Response::new(body);
+                *res.headers_mut() = headers;
+                Ok(res)
+            }
         });
 
         // Spawn the `serve_connection` future into the runtime.
-        handle.spawn(
-            http.serve_connection(socket, svc)
-                .map_err(|e| eprintln!("connection error: {}", e)),
-        );
+        handle.spawn(http.serve_connection(socket, svc));
     })
 }

+ 33 - 31
frameworks/Rust/hyper/src/main_db.rs

@@ -1,16 +1,10 @@
-extern crate futures;
-extern crate hyper;
-extern crate net2;
-extern crate tokio_core;
-extern crate tokio_postgres;
-
 use std::fmt::Write;
-use std::net::ToSocketAddrs;
+use std::sync::Arc;
 
-use futures::{future, Future};
 use hyper::header::{HeaderValue, CONTENT_TYPE, SERVER};
 use hyper::service::service_fn;
 use hyper::{Body, Response};
+use std::net::ToSocketAddrs;
 
 mod db;
 mod server;
@@ -22,7 +16,6 @@ fn main() {
         .user("benchmarkdbuser")
         .password("benchmarkdbpass")
         .dbname("hello_world");
-
     let psql_addr = ("tfb-database", 5432)
         .to_socket_addrs()
         .expect("must be able to resolve database hostname")
@@ -35,17 +28,26 @@ fn main() {
 
         let html_ct = HeaderValue::from_static("text/html; charset=utf-8");
         let server_header = HeaderValue::from_static("hyper");
+        let config = psql_config.clone();
 
         // Before handling any requests, we should grab a DB connection.
-        let db_fut =
-            db::connect(psql_addr, psql_config.clone(), handle.clone()).map(move |mut db_conn| {
+        let db_fut = async move {
+            let handle = handle2.clone();
+            let db_conn = db::connect(psql_addr, config, handle).await?;
+            let db_conn = Arc::new(db_conn);
+
+            let html_ct = html_ct.clone();
+            let server_header = server_header.clone();
+
+            // This is the `Service` that will handle the connection.
+            // `service_fn` is a helper to convert a function that
+            // returns a Future<Item=Response> into a `Service`.
+            let svc = service_fn(move |req| {
                 let html_ct = html_ct.clone();
                 let server_header = server_header.clone();
+                let db_conn = db_conn.clone();
 
-                // This is the `Service` that will handle the connection.
-                // `service_fn` is a helper to convert a function that
-                // returns a Future<Item=Response> into a `Service`.
-                let svc = service_fn(move |req| {
+                async move {
                     let (req, _body) = req.into_parts();
                     // For speed, reuse the allocated header map from the request,
                     // instead of allocating a new one. Because.
@@ -57,30 +59,30 @@ fn main() {
 
                     match req.uri.path() {
                         "/fortunes" => {
-                            future::Either::A(db_conn.tell_fortune().map(move |fortunes| {
-                                let mut buf = String::with_capacity(2048);
-                                let _ = write!(&mut buf, "{}", FortunesTemplate { fortunes });
-                                let mut res = Response::new(Body::from(buf));
-                                *res.headers_mut() = headers;
-                                res
-                            }))
+                            let fortunes = db_conn.tell_fortune().await?;
+                            let mut buf = String::with_capacity(2048);
+                            let _ = write!(&mut buf, "{}", FortunesTemplate { fortunes });
+                            let mut res = Response::new(Body::from(buf));
+                            *res.headers_mut() = headers;
+                            Ok::<_, Box<dyn std::error::Error + Send + Sync>>(res)
                         }
                         _ => {
                             let mut res = Response::new(Body::empty());
                             *res.status_mut() = hyper::StatusCode::NOT_FOUND;
                             *res.headers_mut() = headers;
-                            future::Either::B(future::ok(res))
+                            Ok(res)
                         }
                     }
-                });
-
-                // Spawn the `serve_connection` future into the runtime.
-                handle2.spawn(
-                    http.serve_connection(socket, svc)
-                        .map_err(|e| eprintln!("connection error: {}", e)),
-                );
+                }
             });
-        handle.spawn(db_fut);
+            // Spawn the `serve_connection` future into the runtime.
+            handle2.spawn(http.serve_connection(socket, svc));
+            Ok::<_, Box<dyn std::error::Error>>(())
+        };
+
+        handle.spawn(async move {
+            let _ = db_fut.await;
+        });
     });
 }
 

+ 27 - 20
frameworks/Rust/hyper/src/server.rs

@@ -2,14 +2,13 @@ use std::io;
 use std::net::SocketAddr;
 use std::thread;
 
-use futures::{Future, Stream};
 use hyper::server::conn::Http;
-use tokio_core::net::{TcpListener, TcpStream};
-use tokio_core::reactor::{Core, Handle};
+use tokio::net::{TcpListener, TcpStream};
+use tokio::runtime::{Builder as RuntimeBuilder, Handle};
 
 pub(crate) fn run<F>(per_connection: F)
 where
-    F: Fn(TcpStream, &mut Http, &Handle) + Clone + Send + 'static,
+    F: Fn(TcpStream, &mut Http, Handle) + Clone + Send + 'static,
 {
     // Spawn a thread for each available core, minus one, since we'll
     // reuse the main thread as a server thread as well.
@@ -24,33 +23,41 @@ where
 
 fn server_thread<F>(per_connection: F)
 where
-    F: Fn(TcpStream, &mut Http, &Handle) + Send + 'static,
+    F: Fn(TcpStream, &mut Http, Handle) + Send + 'static,
 {
     let mut http = Http::new();
     http.http1_only(true);
 
     // Our event loop...
-    let mut core = Core::new().expect("core");
+    let core = RuntimeBuilder::new_current_thread()
+        .enable_all()
+        .build()
+        .expect("runtime");
     let handle = core.handle();
 
     // Bind to 0.0.0.0:8080
     let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
-    let tcp = reuse_listener(&addr, &handle).expect("couldn't bind to addr");
 
     // For every accepted connection, spawn an HTTP task
-    let server = tcp
-        .incoming()
-        .for_each(move |(sock, _addr)| {
-            let _ = sock.set_nodelay(true);
-            per_connection(sock, &mut http, &handle);
-            Ok(())
-        })
-        .map_err(|e| eprintln!("accept error: {}", e));
+    let server = async move {
+        let tcp = reuse_listener(&addr).expect("couldn't bind to addr");
+        loop {
+            match tcp.accept().await {
+                Ok((sock, _)) => {
+                    let _ = sock.set_nodelay(true);
+                    per_connection(sock, &mut http, handle.clone());
+                }
+                Err(e) => {
+                    log::warn!("accept error: {}", e)
+                }
+            }
+        }
+    };
 
-    core.run(server).expect("server");
+    core.block_on(server);
 }
 
-fn reuse_listener(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
+fn reuse_listener(addr: &SocketAddr) -> io::Result<TcpListener> {
     let builder = match *addr {
         SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
         SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
@@ -66,7 +73,7 @@ fn reuse_listener(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener>
 
     builder.reuse_address(true)?;
     builder.bind(addr)?;
-    builder
-        .listen(1024)
-        .and_then(|l| TcpListener::from_listener(l, addr, handle))
+    let listener = builder.listen(1024)?;
+    listener.set_nonblocking(true)?;
+    TcpListener::from_std(listener)
 }

Some files were not shown because too many files changed in this diff