
hyper: upgrade to hyper 0.12 (#3824)

Sean McArthur 7 years ago
parent commit 12c196f61d

frameworks/Rust/hyper/Cargo.toml  (+8, -3)

@@ -1,6 +1,6 @@
 [package]
 name = "hyper-techempower"
-version = "0.4.0"
+version = "0.5.0"
 authors = [
     "Steve Klabnik <[email protected]>",
     "Alexander Polyakov <[email protected]>",
@@ -9,12 +9,17 @@ authors = [
 
 [dependencies]
 futures = "0.1"
-hyper = { version = "0.11.26", default-features = false }
+# Disable default runtime, so that tokio-core can be used instead.
+# See below for why...
+hyper = { version = "0.12", default-features = false }
+net2 = "0.2"
 num_cpus = "1.2"
 serde = "1.0"
 serde_json = "1.0"
 serde_derive = "1.0"
-#tokio = "0.1"
+# tokio-core 0.1.11 is used because it's technically faster
+# in these specific benchmarks, as work-stealing executors
+# aren't needed.
 tokio-core = "=0.1.11"
 tokio-io = "=0.1.4"
 

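For contrast with the `default-features = false` choice above: hyper 0.12's default feature set bundles its own tokio-based runtime, and the equivalent service on that runtime would look roughly like the sketch below. This is not part of the commit, only an illustration of what is being opted out of; the default runtime's multi-threaded, work-stealing executor is exactly what the comment above says these benchmarks don't need.

    extern crate hyper;

    use hyper::{Body, Response, Server};
    use hyper::rt::{self, Future};
    use hyper::service::service_fn_ok;

    fn main() {
        let addr = ([0, 0, 0, 0], 8080).into();

        // One service per connection, as with `service_fn_ok` in the diff below.
        let new_svc = || {
            service_fn_ok(|_req| Response::new(Body::from("Hello, world!")))
        };

        // `Server` drives the accept loop on hyper's bundled tokio runtime,
        // instead of the hand-rolled tokio-core loop this commit uses.
        let server = Server::bind(&addr)
            .serve(new_svc)
            .map_err(|e| eprintln!("server error: {}", e));

        rt::run(server);
    }
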
frameworks/Rust/hyper/hyper.dockerfile  (+2, -2)

@@ -1,4 +1,4 @@
-FROM rust:1.25
+FROM rust:1.26
 
 ADD ./ /hyper
 WORKDIR /hyper
@@ -6,4 +6,4 @@ WORKDIR /hyper
 RUN cargo clean
 RUN RUSTFLAGS="-C target-cpu=native" cargo build --release
 
-CMD ["./target/release/hyper-techempower", "pipeline"]
+CMD ["./target/release/hyper-techempower"]

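The dropped `pipeline` argument follows from the `main.rs` changes below: with hyper 0.12, the old `Http::pipeline(bool)` switch (which the binary exposed as a CLI flag) becomes `Http::pipeline_flush(bool)`, set unconditionally in code. A rough sketch of what remains of the old `configure()` helper, for illustration only:

    extern crate hyper;

    use hyper::server::conn::Http;

    fn configure() -> Http {
        // hyper 0.12: pipeline flushing is a builder option rather than a
        // runtime switch, so the "pipeline" CLI argument is gone.
        let mut http = Http::new();
        http.pipeline_flush(true);
        http
    }

    fn main() {
        let _http = configure();
    }
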
frameworks/Rust/hyper/src/main.rs  (+110, -89)

@@ -1,23 +1,25 @@
 extern crate futures;
 extern crate hyper;
+extern crate net2;
 extern crate num_cpus;
 #[macro_use]
 extern crate serde_derive;
 extern crate serde_json;
-//extern crate tokio;
+extern crate tokio_core;
 
-use std::env;
+use std::io;
 use std::net::SocketAddr;
-use std::process;
+use std::thread;
 
-use futures::{future}; //, Future, Stream};
+use futures::{Future, Stream};
 
-use hyper::Method::Get;
-use hyper::header::{ContentLength, ContentType, Server};
-use hyper::StatusCode::NotFound;
-use hyper::server::{Http, Service, Request, Response};
+use hyper::{Body, Response, StatusCode};
+use hyper::header::{CONTENT_LENGTH, CONTENT_TYPE, SERVER, HeaderValue};
+use hyper::server::conn::Http;
+use hyper::service::service_fn_ok;
 
-//use tokio::net::TcpListener;
+use tokio_core::reactor::{Core, Handle};
+use tokio_core::net::TcpListener;
 
 static HELLO_WORLD: &'static [u8] = b"Hello, world!";
 
@@ -26,102 +28,121 @@ struct JsonResponse<'a> {
     message: &'a str,
 }
 
-struct TechEmpower;
-
-impl Service for TechEmpower {
-    type Request = Request;
-    type Response = Response;
-    type Error = hyper::Error;
-    type Future = future::FutureResult<Response, hyper::Error>;
-
-    fn call(&self, req: Request) -> Self::Future {
-        let response = match (req.method(), req.path()) {
-            (&Get, "/plaintext") => {
-                Response::new()
-                    .with_header(ContentLength(HELLO_WORLD.len() as u64))
-                    .with_header(ContentType::text())
-                    .with_body(HELLO_WORLD)
-            }
-            (&Get, "/json") => {
-                let rep = JsonResponse { message: "Hello, world!" };
-                let rep_body = serde_json::to_vec(&rep).unwrap();
-                Response::new()
-                    .with_header(ContentLength(rep_body.len() as u64))
-                    .with_header(ContentType::json())
-                    .with_body(rep_body)
-            }
-            _ => Response::new().with_status(NotFound),
-        };
-        future::ok(response.with_header(Server::new("hyper")))
-    }
-}
-
-fn configure() -> Http {
-    let pipeline = {
-        let mut args = env::args();
-        args.next().expect("first arg is this binary");
-
-        args.next()
-            .map(|arg| {
-                if arg == "pipeline" {
-                    true
-                } else {
-                    eprintln!("unknown second argument: {:?}", arg);
-                    process::exit(1);
+fn server_thread() {
+    // Configure HTTP options
+    let mut http = Http::new();
+    http.pipeline_flush(true);
+
+    // It seems most of the other benchmarks create static header values
+    // for performance, so just play by the same rules here...
+    let plaintext_len = HeaderValue::from_static("13");
+    let plaintext_ct = HeaderValue::from_static("text/plain");
+    let json_len = HeaderValue::from_static("27");
+    let json_ct = HeaderValue::from_static("application/json");
+    let server_header = HeaderValue::from_static("hyper");
+
+    // This will create our `Service` to handle an individual connection.
+    let new_svc = move || {
+        // Gotta clone these to be able to move into the Service...
+        let plaintext_len = plaintext_len.clone();
+        let plaintext_ct = plaintext_ct.clone();
+        let json_len = json_len.clone();
+        let json_ct = json_ct.clone();
+        let server_header = server_header.clone();
+
+        // This is the `Service` that will handle the connection.
+        // `service_fn_ok` is a helper to convert a function that
+        // returns a Response into a `Service`.
+        service_fn_ok(move |req| {
+            let (req, _body) = req.into_parts();
+            // For speed, reuse the allocated header map from the request,
+            // instead of allocating a new one. Because.
+            let mut headers = req.headers;
+            headers.clear();
+
+            let body = match req.uri.path() {
+                // Apparently, other benchmarks don't check the method, so we
+                // don't either. Yay?
+                "/plaintext" => {
+                    headers.insert(CONTENT_LENGTH, plaintext_len.clone());
+                    headers.insert(CONTENT_TYPE, plaintext_ct.clone());
+                    Body::from(HELLO_WORLD)
+                }
+                "/json" => {
+                    let rep = JsonResponse { message: "Hello, world!" };
+                    let rep_body = serde_json::to_vec(&rep).unwrap();
+                    headers.insert(CONTENT_LENGTH, json_len.clone());
+                    headers.insert(CONTENT_TYPE, json_ct.clone());
+                    Body::from(rep_body)
                 }
-            })
-            .unwrap_or(false)
+                _ => {
+                    let mut res = Response::new(Body::empty());
+                    *res.status_mut() = StatusCode::NOT_FOUND;
+                    *res.headers_mut() = headers;
+                    return res;
+                },
+            };
+
+            headers.insert(SERVER, server_header.clone());
+
+            let mut res = Response::new(body);
+            *res.headers_mut() = headers;
+            res
+        })
     };
 
-    // Set our hyper options
-    // (pipeline is desired for the plaintext route)
-    let mut http = Http::<hyper::Chunk>::new();
-    http.pipeline(pipeline);
-    http
-}
-
-fn main() {
-    // Check for some runtime configuration
-    let http = configure();
-
-    // Bind to 0.0.0.0:8080
-    let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
-
-    let srv = http
-        .bind(&addr, || Ok(TechEmpower))
-        .expect("bind to address");
-
-    println!("Listening on http://{}", srv.local_addr().unwrap());
-
-    // Use newer hyper dispatcher
-    srv.run_threads(num_cpus::get());
-}
-
-/* This is the future, but there's still a few blockers in new tokio,
- * so disable this for now while we work them out.
-fn main() {
-    // Check for some runtime configuration
-    let http = configure();
+    // Our event loop...
+    let mut core = Core::new().expect("core");
+    let handle = core.handle();
 
     // Bind to 0.0.0.0:8080
     let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
-    let tcp = TcpListener::bind(&addr)
+    let tcp = reuse_listener(&addr, &handle)
         .expect("couldn't bind to addr");
 
     // For every accepted connection, spawn an HTTP task
     let server = tcp.incoming()
-        .for_each(move |sock| {
+        .for_each(move |(sock, _addr)| {
             let _ = sock.set_nodelay(true);
-            let conn = http.serve_connection(sock, TechEmpower)
+            let conn = http.serve_connection(sock, new_svc())
                 .map_err(|e| eprintln!("connection error: {}", e));
 
-            tokio::spawn(conn);
+            handle.spawn(conn);
 
             Ok(())
         })
         .map_err(|e| eprintln!("accept error: {}", e));
 
-    println!("Listening on http://{}", addr);
-    tokio::run(server);
+    core.run(server).expect("server");
+}
+
+
+fn reuse_listener(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
+    let builder = match *addr {
+        SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
+        SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
+    };
+
+    #[cfg(unix)]
+    {
+        use net2::unix::UnixTcpBuilderExt;
+        if let Err(e) = builder.reuse_port(true) {
+            eprintln!("error setting SO_REUSEPORT: {}", e);
+        }
+    }
+
+    builder.reuse_address(true)?;
+    builder.bind(addr)?;
+    builder.listen(1024).and_then(|l| {
+        TcpListener::from_listener(l, addr, handle)
+    })
+}
+
+fn main() {
+    // Spawn a thread for each available core, minus one, since we'll
+    // reuse the main thread as a server thread as well.
+    for _ in 1..num_cpus::get() {
+        thread::spawn(server_thread);
+    }
+    server_thread();
 }
-*/
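
The structural change in `main.rs` is the accept model: instead of one listener shared across hyper's dispatcher threads (`run_threads`), each thread now owns a single-threaded tokio-core reactor and its own `SO_REUSEPORT` listener, and the kernel balances incoming connections across them. A minimal standalone sketch of that listener pattern with `net2` (illustration only, using a blocking `std::net` accept loop in place of hyper and tokio-core):

    extern crate net2;
    extern crate num_cpus;

    use std::io::Write;
    use std::net::TcpListener;
    use std::thread;

    fn reuse_listener(addr: &str) -> std::io::Result<TcpListener> {
        let builder = net2::TcpBuilder::new_v4()?;

        // SO_REUSEPORT lets every thread bind the same address; without it,
        // the second bind() would fail with "address in use".
        #[cfg(unix)]
        {
            use net2::unix::UnixTcpBuilderExt;
            builder.reuse_port(true)?;
        }

        builder.reuse_address(true)?;
        builder.bind(addr)?;
        builder.listen(1024)
    }

    fn main() {
        let mut threads = Vec::new();
        // One accept loop per core, each with its own listener on port 8080.
        for _ in 0..num_cpus::get() {
            threads.push(thread::spawn(|| {
                let listener = reuse_listener("0.0.0.0:8080").expect("bind");
                for stream in listener.incoming() {
                    if let Ok(mut stream) = stream {
                        let _ = stream.write_all(
                            b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n");
                    }
                }
            }));
        }
        for t in threads {
            let _ = t.join();
        }
    }

Against the real binary built from this diff, `curl http://127.0.0.1:8080/plaintext` and `curl http://127.0.0.1:8080/json` exercise the two routes.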