Browse Source

hyper: add DB fortunes test (#4927)

Sean McArthur 6 years ago
parent
commit
9577a891c6

+ 15 - 2
frameworks/Rust/hyper/Cargo.toml

@@ -7,11 +7,22 @@ authors = [
     "Sean McArthur <[email protected]>"
 ]
 
+[[bin]]
+name = "hyper-techempower"
+path = "src/main.rs"
+
+[[bin]]
+name = "hyper-db-techempower"
+path = "src/main_db.rs"
+
 [dependencies]
 futures = "0.1"
 # Disable default runtime, so that tokio-core can be used instead.
 # See below for why...
 hyper = { version = "0.12", default-features = false }
+# Since no logs are allowed anyways, just compile-time turn them all off
+log = { version = "0.4", features = ["release_max_level_off"] }
+markup = "0.3.1"
 net2 = "0.2"
 num_cpus = "1.2"
 serde = "1.0"
@@ -20,8 +31,10 @@ serde_derive = "1.0"
 # tokio-core 0.1.11 is used because its technically faster
 # in these specific benchmarks, as work-stealing executors
 # aren't needed.
-tokio-core = "=0.1.11"
-tokio-io = "=0.1.4"
+tokio-core = { git = "https://github.com/seanmonstar/tokio-core", branch = "0.1.11-fix" }
+# Disable postgres' runtime feature as well...
+tokio-postgres = { version = "0.4.0-rc.3", default-features = false }
+v_htmlescape = "0.4"
 
 [profile.release]
 opt-level = 3

+ 16 - 0
frameworks/Rust/hyper/benchmark_config.json

@@ -17,6 +17,22 @@
       "database_os": "Linux",
       "display_name": "hyper",
       "notes": ""
+    },
+    "db": {
+      "fortune_url": "/fortune",
+      "port": 8080,
+      "approach": "Realistic",
+      "classification": "Micro",
+      "database": "Postgres",
+      "framework": "hyper",
+      "language": "Rust",
+      "orm": "Raw",
+      "platform": "Rust",
+      "webserver": "hyper",
+      "os": "Linux",
+      "database_os": "Linux",
+      "display_name": "hyper",
+      "notes": ""
     }
   }]
 }

+ 9 - 0
frameworks/Rust/hyper/hyper-db.dockerfile

@@ -0,0 +1,9 @@
+FROM rust:1.35
+
+ADD ./ /hyper
+WORKDIR /hyper
+
+RUN cargo clean
+RUN RUSTFLAGS="-C target-cpu=native" cargo build --release
+
+CMD ["./target/release/hyper-db-techempower"]

+ 1 - 1
frameworks/Rust/hyper/hyper.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.29.1
+FROM rust:1.35
 
 ADD ./ /hyper
 WORKDIR /hyper

+ 64 - 0
frameworks/Rust/hyper/src/db.rs

@@ -0,0 +1,64 @@
+use std::net::SocketAddr;
+
+use futures::{Future, Stream};
+use tokio_core::net::TcpStream;
+use tokio_core::reactor::Handle;
+use tokio_postgres::{Client, Config, NoTls, Statement};
+
+pub struct Db {
+    client: Client,
+    fortune: Statement,
+}
+
+pub struct Fortune {
+    pub id: i32,
+    pub message: String,
+}
+
+pub fn connect(addr: SocketAddr, config: Config, handle: Handle) -> impl Future<Item = Db, Error = ()> {
+    TcpStream::connect(&addr, &handle)
+        .map_err(|e| panic!("error connecting to postgresql: {}", e))
+        .and_then(move |tcp| {
+            config
+                .connect_raw(tcp, NoTls)
+                .map_err(|e| panic!("error connecting to postgresql: {}", e))
+        })
+        .and_then(move |(mut client, conn)| {
+            handle.spawn(conn.map_err(|e| panic!("postgres connection error: {}", e)));
+
+            client
+                .prepare("SELECT id, message FROM fortune")
+                .map_err(|_| ())
+                .map(move |fortune| {
+                    Db {
+                        client,
+                        fortune,
+                    }
+                })
+        })
+}
+
+impl Db {
+    pub fn tell_fortune(
+        &mut self,
+    ) -> impl Future<Item = Vec<Fortune>, Error = ::tokio_postgres::Error> {
+        let items = vec![Fortune {
+            id: 0,
+            message: "Additional fortune added at request time.".to_string(),
+        }];
+
+        self.client
+            .query(&self.fortune, &[])
+            .fold(items, move |mut items, row| {
+                items.push(Fortune {
+                    id: row.get(0),
+                    message: row.get(1),
+                });
+                Ok(items)
+            })
+            .map(|mut items| {
+                items.sort_by(|it, next| it.message.cmp(&next.message));
+                items
+            })
+    }
+}

+ 18 - 70
frameworks/Rust/hyper/src/main.rs

@@ -7,19 +7,14 @@ extern crate serde_derive;
 extern crate serde_json;
 extern crate tokio_core;
 
-use std::io;
-use std::net::SocketAddr;
-use std::thread;
 
-use futures::{Future, Stream};
+use futures::Future;
 
 use hyper::{Body, Response, StatusCode};
 use hyper::header::{CONTENT_LENGTH, CONTENT_TYPE, SERVER, HeaderValue};
-use hyper::server::conn::Http;
 use hyper::service::service_fn_ok;
 
-use tokio_core::reactor::{Core, Handle};
-use tokio_core::net::TcpListener;
+mod server;
 
 static HELLO_WORLD: &'static [u8] = b"Hello, world!";
 
@@ -28,11 +23,8 @@ struct JsonResponse<'a> {
     message: &'a str,
 }
 
-fn server_thread() {
-    // Configure HTTP options
-    let mut http = Http::new();
-    http.pipeline_flush(true);
 
+fn main() {
     // It seems most of the other benchmarks create static header values
     // for performance, so just play by the same rules here...
     let plaintext_len = HeaderValue::from_static("13");
@@ -41,8 +33,12 @@ fn server_thread() {
     let json_ct = HeaderValue::from_static("application/json");
     let server_header = HeaderValue::from_static("hyper");
 
-    // This will create our `Service` to handle an individual connection.
-    let new_svc = move || {
+    server::run(move |socket, http, handle| {
+        // This closure is run for each connection...
+
+        // The plaintext benchmarks use pipelined requests.
+        http.pipeline_flush(true);
+
         // Gotta clone these to be able to move into the Service...
         let plaintext_len = plaintext_len.clone();
         let plaintext_ct = plaintext_ct.clone();
@@ -53,7 +49,7 @@ fn server_thread() {
         // This is the `Service` that will handle the connection.
         // `service_fn_ok` is a helper to convert a function that
         // returns a Response into a `Service`.
-        service_fn_ok(move |req| {
+        let svc = service_fn_ok(move |req| {
             let (req, _body) = req.into_parts();
             // For speed, reuse the allocated header map from the request,
             // instead of allocating a new one. Because.
@@ -88,61 +84,13 @@ fn server_thread() {
             let mut res = Response::new(body);
             *res.headers_mut() = headers;
             res
-        })
-    };
-
-    // Our event loop...
-    let mut core = Core::new().expect("core");
-    let handle = core.handle();
-
-    // Bind to 0.0.0.0:8080
-    let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
-    let tcp = reuse_listener(&addr, &handle)
-        .expect("couldn't bind to addr");
-
-    // For every accepted connection, spawn an HTTP task
-    let server = tcp.incoming()
-        .for_each(move |(sock, _addr)| {
-            let _ = sock.set_nodelay(true);
-            let conn = http.serve_connection(sock, new_svc())
-                .map_err(|e| eprintln!("connection error: {}", e));
-
-            handle.spawn(conn);
-
-            Ok(())
-        })
-        .map_err(|e| eprintln!("accept error: {}", e));
-
-    core.run(server).expect("server");
-}
-
-
-fn reuse_listener(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
-    let builder = match *addr {
-        SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
-        SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
-    };
-
-    #[cfg(unix)]
-    {
-        use net2::unix::UnixTcpBuilderExt;
-        if let Err(e) = builder.reuse_port(true) {
-            eprintln!("error setting SO_REUSEPORT: {}", e);
-        }
-    }
-
-    builder.reuse_address(true)?;
-    builder.bind(addr)?;
-    builder.listen(1024).and_then(|l| {
-        TcpListener::from_listener(l, addr, handle)
+        });
+
+        // Spawn the `serve_connection` future into the runtime.
+        handle.spawn(
+            http
+                .serve_connection(socket, svc)
+                .map_err(|e| eprintln!("connection error: {}", e))
+        );
     })
 }
-
-fn main() {
-    // Spawn a thread for each available core, minus one, since we'll
-    // reuse the main thread as a server thread as well.
-    for _ in 1..num_cpus::get() {
-        thread::spawn(server_thread);
-    }
-    server_thread();
-}

+ 114 - 0
frameworks/Rust/hyper/src/main_db.rs

@@ -0,0 +1,114 @@
+extern crate futures;
+extern crate hyper;
+extern crate net2;
+extern crate tokio_core;
+extern crate tokio_postgres;
+
+use std::fmt::Write;
+use std::net::ToSocketAddrs;
+
+use futures::{future, Future};
+use hyper::header::{CONTENT_TYPE, SERVER, HeaderValue};
+use hyper::service::service_fn;
+use hyper::{Body, Response};
+
+mod db;
+mod server;
+
+fn main() {
+    //"postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
+    let mut psql_config = tokio_postgres::Config::new();
+    psql_config
+        .user("benchmarkdbuser")
+        .password("benchmarkdbpass")
+        .dbname("hello_world");
+
+    let psql_addr = ("tfb-database", 5432)
+        .to_socket_addrs()
+        .expect("must be able to resolve database hostname")
+        .next()
+        .expect("database hostname must resolve to an address");
+
+    server::run(move |socket, http, handle| {
+        let http = http.clone();
+        let handle2 = handle.clone();
+
+        let html_ct = HeaderValue::from_static("text/html; charset=utf-8");
+        let server_header = HeaderValue::from_static("hyper");
+
+        // Before handling any requests, we should grab a DB connection.
+        let db_fut = db::connect(psql_addr, psql_config.clone(), handle.clone())
+            .map(move |mut db_conn| {
+                let html_ct = html_ct.clone();
+                let server_header = server_header.clone();
+
+                // This is the `Service` that will handle the connection.
+                // `service_fn` is a helper to convert a function that
+                // returns a Future<Item=Response> into a `Service`.
+                let svc = service_fn(move |req| {
+                    let (req, _body) = req.into_parts();
+                    // For speed, reuse the allocated header map from the request,
+                    // instead of allocating a new one. Because.
+                    let mut headers = req.headers;
+                    headers.clear();
+
+                    headers.insert(CONTENT_TYPE, html_ct.clone());
+                    headers.insert(SERVER, server_header.clone());
+
+                    match req.uri.path() {
+                        "/fortune" => {
+                            future::Either::A(db_conn
+                                .tell_fortune()
+                                .map(move |fortunes| {
+                                    let mut buf = String::with_capacity(2048);
+                                    let _ = write!(&mut buf, "{}", FortunesTemplate {
+                                        fortunes,
+                                    });
+                                    let mut res = Response::new(Body::from(buf));
+                                    *res.headers_mut() = headers;
+                                    res
+                                }))
+                        },
+                        _ => {
+                            let mut res = Response::new(Body::empty());
+                            *res.status_mut() = hyper::StatusCode::NOT_FOUND;
+                            *res.headers_mut() = headers;
+                            future::Either::B(future::ok(res))
+                        },
+                    }
+                });
+
+
+                // Spawn the `serve_connection` future into the runtime.
+                handle2.spawn(
+                    http
+                        .serve_connection(socket, svc)
+                        .map_err(|e| eprintln!("connection error: {}", e))
+                );
+
+            });
+        handle.spawn(db_fut);
+    });
+}
+
+markup::define! {
+    FortunesTemplate(fortunes: Vec<db::Fortune>) {
+        {markup::doctype()}
+        html {
+            head {
+                title { "Fortunes" }
+            }
+            body {
+                table {
+                    tr { th { "id" } th { "message" } }
+                    @for item in {fortunes} {
+                        tr {
+                            td { {item.id} }
+                            td { {markup::raw(v_htmlescape::escape(&item.message))} }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

+ 72 - 0
frameworks/Rust/hyper/src/server.rs

@@ -0,0 +1,72 @@
+use std::io;
+use std::net::SocketAddr;
+use std::thread;
+
+use futures::{Future, Stream};
+use hyper::server::conn::Http;
+use tokio_core::reactor::{Core, Handle};
+use tokio_core::net::{TcpListener, TcpStream};
+
+pub(crate) fn run<F>(per_connection: F)
+where
+    F: Fn(TcpStream, &mut Http, &Handle) + Clone + Send + 'static,
+{
+    // Spawn a thread for each available core, minus one, since we'll
+    // reuse the main thread as a server thread as well.
+    for _ in 1..num_cpus::get() {
+        let per_connection = per_connection.clone();
+        thread::spawn(move || {
+            server_thread(per_connection);
+        });
+    }
+    server_thread(per_connection);
+}
+
+fn server_thread<F>(per_connection: F)
+where
+    F: Fn(TcpStream, &mut Http, &Handle) + Send + 'static,
+{
+    let mut http = Http::new();
+    http.http1_only(true);
+
+    // Our event loop...
+    let mut core = Core::new().expect("core");
+    let handle = core.handle();
+
+    // Bind to 0.0.0.0:8080
+    let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
+    let tcp = reuse_listener(&addr, &handle)
+        .expect("couldn't bind to addr");
+
+    // For every accepted connection, spawn an HTTP task
+    let server = tcp.incoming()
+        .for_each(move |(sock, _addr)| {
+            let _ = sock.set_nodelay(true);
+            per_connection(sock, &mut http, &handle);
+            Ok(())
+        })
+        .map_err(|e| eprintln!("accept error: {}", e));
+
+    core.run(server).expect("server");
+}
+
+fn reuse_listener(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
+    let builder = match *addr {
+        SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
+        SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
+    };
+
+    #[cfg(unix)]
+    {
+        use net2::unix::UnixTcpBuilderExt;
+        if let Err(e) = builder.reuse_port(true) {
+            eprintln!("error setting SO_REUSEPORT: {}", e);
+        }
+    }
+
+    builder.reuse_address(true)?;
+    builder.bind(addr)?;
+    builder.listen(1024).and_then(|l| {
+        TcpListener::from_listener(l, addr, handle)
+    })
+}