Browse Source

optimize fortunes benchmark (#4043)

Nikolay Kim 7 years ago
parent
commit
f86dee7927

File diff suppressed because it is too large
+ 269 - 220
frameworks/Rust/actix/Cargo.lock


+ 1 - 1
frameworks/Rust/actix/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "actix"
-version = "0.7.1"
+version = "0.7.2"
 build = "build.rs"
 
 [[bin]]

+ 1 - 1
frameworks/Rust/actix/actix-diesel.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.28
+FROM rust:1.29.0
 
 ADD ./ /actix
 WORKDIR /actix

+ 1 - 1
frameworks/Rust/actix/actix-pg.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.28
+FROM rust:1.29.0
 
 ADD ./ /actix
 WORKDIR /actix

+ 1 - 1
frameworks/Rust/actix/actix-raw.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.28
+FROM rust:1.29.0
 
 ADD ./ /actix
 WORKDIR /actix

+ 1 - 1
frameworks/Rust/actix/actix.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.28
+FROM rust:1.29.0
 
 ADD ./ /actix
 WORKDIR /actix

+ 2 - 4
frameworks/Rust/actix/src/db_pg.rs

@@ -68,8 +68,7 @@ impl PgConnection {
                     act.cl = Some(cl);
                     Arbiter::spawn(conn.map_err(|e| panic!("{}", e)));
                     fut::ok(())
-                })
-                .wait(ctx);
+                }).wait(ctx);
 
             act
         })
@@ -228,8 +227,7 @@ impl Handler<TellFortune> for PgConnection {
                         message: row.get(1),
                     });
                     Ok::<_, io::Error>(items)
-                })
-                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+                }).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
                 .and_then(|mut items| {
                     items.sort_by(|it, next| it.message.cmp(&next.message));
                     Ok(items)

+ 19 - 16
frameworks/Rust/actix/src/db_pg_direct.rs

@@ -16,7 +16,7 @@ pub struct PgConnection {
 }
 
 impl PgConnection {
-    pub fn connect(db_url: &str) -> impl Future<Item=PgConnection, Error=()> {
+    pub fn connect(db_url: &str) -> impl Future<Item = PgConnection, Error = ()> {
         let hs = connect(db_url.parse().unwrap(), TlsMode::None);
 
         hs.map_err(|_| panic!("can not connect to postgresql"))
@@ -32,12 +32,11 @@ impl PgConnection {
                                 cl.prepare("SELECT id FROM world WHERE id=$1")
                                     .map_err(|_| ())
                                     .and_then(|update| {
-
                                         Ok(PgConnection {
                                             cl,
                                             fortune: st.0,
                                             world: st.1,
-                                            update
+                                            update,
                                         })
                                     })
                             })
@@ -47,8 +46,7 @@ impl PgConnection {
 }
 
 impl PgConnection {
-
-    pub fn get_world(&self) -> impl Future<Item=World, Error=io::Error> {
+    pub fn get_world(&self) -> impl Future<Item = World, Error = io::Error> {
         let random_id = thread_rng().gen_range::<i32>(1, 10_001);
 
         self.cl
@@ -64,7 +62,9 @@ impl PgConnection {
             })
     }
 
-    pub fn get_worlds(&self, num: usize) -> impl Future<Item=Vec<World>, Error=io::Error> {
+    pub fn get_worlds(
+        &self, num: usize,
+    ) -> impl Future<Item = Vec<World>, Error = io::Error> {
         let mut worlds = Vec::with_capacity(num);
         for _ in 0..num {
             let w_id: i32 = thread_rng().gen_range(1, 10_001);
@@ -86,7 +86,9 @@ impl PgConnection {
         stream::futures_unordered(worlds).collect()
     }
 
-    pub fn update(&self, num: usize) -> impl Future<Item=Vec<World>, Error=io::Error> {
+    pub fn update(
+        &self, num: usize,
+    ) -> impl Future<Item = Vec<World>, Error = io::Error> {
         let mut worlds = Vec::with_capacity(num);
         for _ in 0..num {
             let id: i32 = thread_rng().gen_range(1, 10_001);
@@ -111,8 +113,9 @@ impl PgConnection {
             .collect()
             .and_then(move |mut worlds| {
                 let mut update = String::with_capacity(120 + 6 * num as usize);
-                update
-                    .push_str("UPDATE world SET randomnumber = temp.randomnumber FROM (VALUES ");
+                update.push_str(
+                    "UPDATE world SET randomnumber = temp.randomnumber FROM (VALUES ",
+                );
 
                 for w in &worlds {
                     update.push_str(&format!("({}, {}),", w.id, w.randomnumber));
@@ -120,16 +123,17 @@ impl PgConnection {
                 worlds.sort_by_key(|w| w.id);
 
                 update.pop();
-                update
-                    .push_str(" ORDER BY 1) AS temp(id, randomnumber) WHERE temp.id = world.id");
+                update.push_str(
+                    " ORDER BY 1) AS temp(id, randomnumber) WHERE temp.id = world.id",
+                );
 
                 cl.batch_execute(&update)
-                  .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
-                  .and_then(|_| Ok(worlds))
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+                    .and_then(|_| Ok(worlds))
             })
     }
 
-    pub fn tell_fortune(&self) -> impl Future<Item=Vec<Fortune>, Error=io::Error> {
+    pub fn tell_fortune(&self) -> impl Future<Item = Vec<Fortune>, Error = io::Error> {
         let mut items = Vec::new();
         items.push(Fortune {
             id: 0,
@@ -144,8 +148,7 @@ impl PgConnection {
                     message: row.get(1),
                 });
                 Ok::<_, io::Error>(items)
-            })
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+            }).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
             .and_then(|mut items| {
                 items.sort_by(|it, next| it.message.cmp(&next.message));
                 Ok(items)

+ 3 - 3
frameworks/Rust/actix/src/main.rs

@@ -44,9 +44,9 @@ fn main() {
             .resource("/json", |r| r.f(json))
             .resource("/plaintext", |r| r.f(plaintext))
     }).backlog(8192)
-        .bind("0.0.0.0:8080")
-        .unwrap()
-        .start();
+    .bind("0.0.0.0:8080")
+    .unwrap()
+    .start();
 
     println!("Started http server: 127.0.0.1:8080");
     let _ = sys.run();

+ 7 - 11
frameworks/Rust/actix/src/main_diesel.rs

@@ -50,8 +50,7 @@ fn world_row(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
                     .body(body))
             }
             Err(_) => Ok(HttpResponse::InternalServerError().into()),
-        })
-        .responder()
+        }).responder()
 }
 
 fn queries(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
@@ -79,8 +78,7 @@ fn queries(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
             } else {
                 Ok(HttpResponse::InternalServerError().into())
             }
-        })
-        .responder()
+        }).responder()
 }
 
 fn updates(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
@@ -109,8 +107,7 @@ fn updates(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
             } else {
                 Ok(HttpResponse::InternalServerError().into())
             }
-        })
-        .responder()
+        }).responder()
 }
 
 #[derive(Template)]
@@ -136,8 +133,7 @@ fn fortune(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
                     .body(res))
             }
             Err(_) => Ok(HttpResponse::InternalServerError().into()),
-        })
-        .responder()
+        }).responder()
 }
 
 fn main() {
@@ -164,9 +160,9 @@ fn main() {
             .resource("/queries", |r| r.route().f(queries))
             .resource("/updates", |r| r.route().f(updates))
     }).backlog(8192)
-        .bind("0.0.0.0:8080")
-        .unwrap()
-        .start();
+    .bind("0.0.0.0:8080")
+    .unwrap()
+    .start();
 
     println!("Started http server: 127.0.0.1:8080");
     let _ = sys.run();

+ 7 - 11
frameworks/Rust/actix/src/main_pg.rs

@@ -49,8 +49,7 @@ fn world_row(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
                     .body(body))
             }
             Err(_) => Ok(HttpResponse::InternalServerError().into()),
-        })
-        .responder()
+        }).responder()
 }
 
 fn queries(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
@@ -75,8 +74,7 @@ fn queries(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
             } else {
                 Ok(HttpResponse::InternalServerError().into())
             }
-        })
-        .responder()
+        }).responder()
 }
 
 fn updates(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
@@ -100,8 +98,7 @@ fn updates(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
             } else {
                 Ok(HttpResponse::InternalServerError().into())
             }
-        })
-        .responder()
+        }).responder()
 }
 
 #[derive(Template)]
@@ -127,8 +124,7 @@ fn fortune(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
                     .body(res))
             }
             Err(_) => Ok(HttpResponse::InternalServerError().into()),
-        })
-        .responder()
+        }).responder()
 }
 
 fn main() {
@@ -145,9 +141,9 @@ fn main() {
             .resource("/fortune", |r| r.route().f(fortune))
             .resource("/updates", |r| r.route().f(updates))
     }).backlog(8192)
-        .bind("0.0.0.0:8080")
-        .unwrap()
-        .start();
+    .bind("0.0.0.0:8080")
+    .unwrap()
+    .start();
 
     println!("Started http server: 127.0.0.1:8080");
     let _ = sys.run();

+ 77 - 35
frameworks/Rust/actix/src/main_raw.rs

@@ -13,14 +13,14 @@ extern crate url;
 extern crate diesel;
 extern crate tokio_postgres;
 
-use std::{mem, io};
 use std::cell::RefCell;
-use std::rc::Rc;
 use std::io::Write;
+use std::rc::Rc;
+use std::{io, mem};
 
 use actix::prelude::*;
 use actix_web::server::{
-    self, HttpHandler, HttpHandlerTask, HttpServer, Request, Writer,
+    self, HttpHandler, HttpHandlerTask, HttpServer, KeepAlive, Request, Writer,
 };
 use actix_web::Error;
 use futures::{Async, Future, Poll};
@@ -31,7 +31,7 @@ mod models;
 mod utils;
 
 use db_pg_direct::PgConnection;
-use utils::{Message, StackWriter, Writer as JsonWriter, escape};
+use utils::{escape, Message, StackWriter, Writer as JsonWriter};
 
 const HTTPOK: &[u8] = b"HTTP/1.1 200 OK\r\n";
 const HDR_SERVER: &[u8] = b"Server: Actix\r\n";
@@ -42,6 +42,7 @@ const BODY: &[u8] = b"Hello, World!";
 
 struct App {
     dbs: Rc<RefCell<Vec<PgConnection>>>,
+    useall: bool,
 }
 
 impl HttpHandler for App {
@@ -54,25 +55,51 @@ impl HttpHandler for App {
                 10 if path == "/plaintext" => return Ok(Box::new(Plaintext)),
                 5 if path == "/json" => return Ok(Box::new(Json)),
                 3 if path == "/db" => {
-                    if let Some(db) = thread_rng().choose(&*self.dbs.borrow()) {
-                        return Ok(Box::new(World {fut: Box::new(db.get_world())}))
+                    if self.useall {
+                        if let Some(db) = thread_rng().choose(&*self.dbs.borrow()) {
+                            return Ok(Box::new(World {
+                                fut: db.get_world(),
+                            }));
+                        }
+                    } else {
+                        return Ok(Box::new(World {
+                            fut: self.dbs.borrow()[0].get_world(),
+                        }));
                     }
                 }
                 8 if path == "/fortune" => {
                     if let Some(db) = thread_rng().choose(&*self.dbs.borrow()) {
-                        return Ok(Box::new(Fortune {fut: Box::new(db.tell_fortune())}));
+                        return Ok(Box::new(Fortune {
+                            fut: db.tell_fortune(),
+                        }));
                     }
                 }
                 8 if path == "/queries" => {
                     let q = utils::get_query_param(req.uri());
-                    if let Some(db) = thread_rng().choose(&*self.dbs.borrow()) {
-                        return Ok(Box::new(Queries {fut: Box::new(db.get_worlds(q as usize))}));
+                    if self.useall {
+                        if let Some(db) = thread_rng().choose(&*self.dbs.borrow()) {
+                            return Ok(Box::new(Queries {
+                                fut: db.get_worlds(q as usize),
+                            }));
+                        }
+                    } else {
+                        return Ok(Box::new(Queries {
+                            fut: self.dbs.borrow()[0].get_worlds(q as usize),
+                        }));
                     }
                 }
                 8 if path == "/updates" => {
                     let q = utils::get_query_param(req.uri());
-                    if let Some(db) = thread_rng().choose(&*self.dbs.borrow()) {
-                        return Ok(Box::new(Updates {fut: Box::new(db.update(q as usize))}));
+                    if self.useall {
+                        if let Some(db) = thread_rng().choose(&*self.dbs.borrow()) {
+                            return Ok(Box::new(Updates {
+                                fut: db.update(q as usize),
+                            }));
+                        }
+                    } else {
+                        return Ok(Box::new(Updates {
+                            fut: self.dbs.borrow()[0].update(q as usize),
+                        }));
                     }
                 }
                 _ => (),
@@ -122,8 +149,8 @@ impl HttpHandlerTask for Json {
     }
 }
 
-struct Fortune {
-    fut: Box<Future<Item=Vec<models::Fortune>, Error=io::Error>>,
+struct Fortune<F> {
+    fut: F,
 }
 
 const FORTUNES_START: &[u8] = b"<!DOCTYPE html><html><head><title>Fortunes</title></head><body><table><tr><th>id</th><th>message</th></tr>";
@@ -132,8 +159,10 @@ const FORTUNES_COLUMN: &[u8] = b"</td><td>";
 const FORTUNES_ROW_END: &[u8] = b"</td></tr>";
 const FORTUNES_END: &[u8] = b"</table></body></html>";
 
-
-impl HttpHandlerTask for Fortune {
+impl<F> HttpHandlerTask for Fortune<F>
+where
+    F: Future<Item = Vec<models::Fortune>, Error = io::Error>,
+{
     fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
         match self.fut.poll() {
             Ok(Async::Ready(rows)) => {
@@ -170,11 +199,14 @@ impl HttpHandlerTask for Fortune {
     }
 }
 
-struct World {
-    fut: Box<Future<Item=models::World, Error=io::Error>>,
+struct World<F> {
+    fut: F,
 }
 
-impl HttpHandlerTask for World {
+impl<F> HttpHandlerTask for World<F>
+where
+    F: Future<Item = models::World, Error = io::Error>,
+{
     fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
         match self.fut.poll() {
             Ok(Async::Ready(row)) => {
@@ -203,11 +235,14 @@ impl HttpHandlerTask for World {
     }
 }
 
-struct Queries {
-    fut: Box<Future<Item=Vec<models::World>, Error=io::Error>>,
+struct Queries<F> {
+    fut: F,
 }
 
-impl HttpHandlerTask for Queries {
+impl<F> HttpHandlerTask for Queries<F>
+where
+    F: Future<Item = Vec<models::World>, Error = io::Error>,
+{
     fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
         match self.fut.poll() {
             Ok(Async::Ready(worlds)) => {
@@ -236,11 +271,14 @@ impl HttpHandlerTask for Queries {
     }
 }
 
-struct Updates {
-    fut: Box<Future<Item=Vec<models::World>, Error=io::Error>>,
+struct Updates<F> {
+    fut: F,
 }
 
-impl HttpHandlerTask for Updates {
+impl<F> HttpHandlerTask for Updates<F>
+where
+    F: Future<Item = Vec<models::World>, Error = io::Error>,
+{
     fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
         match self.fut.poll() {
             Ok(Async::Ready(worlds)) => {
@@ -277,19 +315,23 @@ fn main() {
     HttpServer::new(move || {
         let dbs = Rc::new(RefCell::new(Vec::new()));
 
-        let db = dbs.clone();
-        Arbiter::spawn(
-            PgConnection::connect(db_url)
-                .and_then(move |conn| {
-                    db.borrow_mut().push(conn);
-                    Ok(())
-                }));
+        for _ in 0..3 {
+            let db = dbs.clone();
+            Arbiter::spawn(PgConnection::connect(db_url).and_then(move |conn| {
+                db.borrow_mut().push(conn);
+                Ok(())
+            }));
+        }
 
-        vec![App { dbs }]
+        vec![App {
+            dbs,
+            useall: num_cpus::get() > 4,
+        }]
     }).backlog(8192)
-        .bind("0.0.0.0:8080")
-        .unwrap()
-        .start();
+    .keep_alive(KeepAlive::Os)
+    .bind("0.0.0.0:8080")
+    .unwrap()
+    .start();
 
     println!("Started http server: 127.0.0.1:8080");
     let _ = sys.run();

+ 1 - 1
frameworks/Rust/actix/src/utils.rs

@@ -110,4 +110,4 @@ pub fn escape<T: io::Write>(writer: &mut T, s: String) {
     if last_pos < bytes.len() - 1 {
         let _ = writer.write(&bytes[last_pos..]);
     }
-}
+}

Some files were not shown because too many files changed in this diff