Browse Source

Use single connection for db bench (#4654)

Nikolay Kim 6 years ago
parent
commit
dfc9bbbf36

+ 3 - 3
frameworks/Rust/actix/Cargo.toml

@@ -20,9 +20,9 @@ name = "actix-pg"
 path = "src/main_pg.rs"
 path = "src/main_pg.rs"
 
 
 [dependencies]
 [dependencies]
-actix = { version="0.8.0-alpha.2", features=["http"] }
-actix-web = { version="1.0.0-alpha.4", default-features = false }
-actix-http = { version="0.1.0-alpha.4", default-features = false }
+actix = { version="0.8.0-alpha.3", features=["http"] }
+actix-web = { version="1.0.0-alpha.5", default-features = false }
+actix-http = { version="0.1.0-alpha.5", default-features = false }
 actix-rt = "0.2.2"
 actix-rt = "0.2.2"
 actix-server = "0.4.2"
 actix-server = "0.4.2"
 actix-service = "0.3.6"
 actix-service = "0.3.6"

+ 29 - 19
frameworks/Rust/actix/src/db_pg_direct.rs

@@ -1,11 +1,14 @@
 use std::io;
 use std::io;
 
 
+use actix_http::Error;
+use bytes::{Bytes, BytesMut};
 use futures::future::join_all;
 use futures::future::join_all;
 use futures::{stream, Future, Stream};
 use futures::{stream, Future, Stream};
 use rand::{thread_rng, Rng, ThreadRng};
 use rand::{thread_rng, Rng, ThreadRng};
 use tokio_postgres::{connect, Client, NoTls, Statement};
 use tokio_postgres::{connect, Client, NoTls, Statement};
 
 
 use crate::models::{Fortune, World};
 use crate::models::{Fortune, World};
+use crate::utils::{Writer, SIZE};
 
 
 /// Postgres interface
 /// Postgres interface
 pub struct PgConnection {
 pub struct PgConnection {
@@ -43,19 +46,27 @@ impl PgConnection {
 }
 }
 
 
 impl PgConnection {
 impl PgConnection {
-    pub fn get_world(&mut self) -> impl Future<Item = World, Error = io::Error> {
+    pub fn get_world(&mut self) -> impl Future<Item = Bytes, Error = Error> {
         let random_id = self.rng.gen_range::<i32>(1, 10_001);
         let random_id = self.rng.gen_range::<i32>(1, 10_001);
 
 
         self.cl
         self.cl
             .query(&self.world, &[&random_id])
             .query(&self.world, &[&random_id])
             .into_future()
             .into_future()
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e.0)))
-            .and_then(|(row, _)| {
+            .map_err(|e| {
+                Error::from(io::Error::new(io::ErrorKind::Other, format!("{:?}", e.0)))
+            })
+            .map(|(row, _)| {
                 let row = row.unwrap();
                 let row = row.unwrap();
-                Ok(World {
-                    id: row.get(0),
-                    randomnumber: row.get(1),
-                })
+                let mut body = BytesMut::with_capacity(SIZE);
+                serde_json::to_writer(
+                    Writer(&mut body),
+                    &World {
+                        id: row.get(0),
+                        randomnumber: row.get(1),
+                    },
+                )
+                .unwrap();
+                body.freeze()
             })
             })
     }
     }
 
 
@@ -73,12 +84,12 @@ impl PgConnection {
                     .map_err(|e| {
                     .map_err(|e| {
                         io::Error::new(io::ErrorKind::Other, format!("{:?}", e.0))
                         io::Error::new(io::ErrorKind::Other, format!("{:?}", e.0))
                     })
                     })
-                    .and_then(|(row, _)| {
+                    .map(|(row, _)| {
                         let row = row.unwrap();
                         let row = row.unwrap();
-                        Ok(World {
+                        World {
                             id: row.get(0),
                             id: row.get(0),
                             randomnumber: row.get(1),
                             randomnumber: row.get(1),
-                        })
+                        }
                     }),
                     }),
             );
             );
         }
         }
@@ -101,12 +112,12 @@ impl PgConnection {
                     .map_err(|e| {
                     .map_err(|e| {
                         io::Error::new(io::ErrorKind::Other, format!("{:?}", e.0))
                         io::Error::new(io::ErrorKind::Other, format!("{:?}", e.0))
                     })
                     })
-                    .and_then(move |(row, _)| {
+                    .map(move |(row, _)| {
                         let row = row.unwrap();
                         let row = row.unwrap();
-                        Ok(World {
+                        World {
                             id: row.get(0),
                             id: row.get(0),
                             randomnumber: id,
                             randomnumber: id,
-                        })
+                        }
                     }),
                     }),
             );
             );
         }
         }
@@ -133,18 +144,17 @@ impl PgConnection {
                     .map_err(|e| {
                     .map_err(|e| {
                         io::Error::new(io::ErrorKind::Other, format!("{:?}", e))
                         io::Error::new(io::ErrorKind::Other, format!("{:?}", e))
                     })
                     })
-                    .and_then(|_| Ok(worlds))
+                    .map(|_| worlds)
             })
             })
     }
     }
 
 
     pub fn tell_fortune(
     pub fn tell_fortune(
         &mut self,
         &mut self,
     ) -> impl Future<Item = Vec<Fortune>, Error = io::Error> {
     ) -> impl Future<Item = Vec<Fortune>, Error = io::Error> {
-        let mut items = Vec::new();
-        items.push(Fortune {
+        let items = vec![Fortune {
             id: 0,
             id: 0,
             message: "Additional fortune added at request time.".to_string(),
             message: "Additional fortune added at request time.".to_string(),
-        });
+        }];
 
 
         self.cl
         self.cl
             .query(&self.fortune, &[])
             .query(&self.fortune, &[])
@@ -157,9 +167,9 @@ impl PgConnection {
                 Ok::<_, io::Error>(items)
                 Ok::<_, io::Error>(items)
             })
             })
             .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
             .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
-            .and_then(|mut items| {
+            .map(|mut items| {
                 items.sort_by(|it, next| it.message.cmp(&next.message));
                 items.sort_by(|it, next| it.message.cmp(&next.message));
-                Ok(items)
+                items
             })
             })
     }
     }
 }
 }

+ 15 - 20
frameworks/Rust/actix/src/main_platform.rs

@@ -17,6 +17,7 @@ use actix_service::{NewService, Service};
 use bytes::{BufMut, Bytes, BytesMut};
 use bytes::{BufMut, Bytes, BytesMut};
 use futures::future::{join_all, ok, Either, FutureResult};
 use futures::future::{join_all, ok, Either, FutureResult};
 use futures::{Async, Future, Poll};
 use futures::{Async, Future, Poll};
+use serde_json::to_writer;
 
 
 mod db_pg_direct;
 mod db_pg_direct;
 mod models;
 mod models;
@@ -45,7 +46,6 @@ struct App {
 enum Db {
 enum Db {
     All,
     All,
     Multi,
     Multi,
-    Update,
 }
 }
 
 
 impl App {
 impl App {
@@ -56,7 +56,6 @@ impl App {
                     self.next = (self.next + 1) % 5;
                     self.next = (self.next + 1) % 5;
                     &mut self.dbs[self.next]
                     &mut self.dbs[self.next]
                 }
                 }
-                Db::Update => &mut self.dbs[0],
                 Db::Multi => {
                 Db::Multi => {
                     self.next = (self.next + 1) % 2;
                     self.next = (self.next + 1) % 2;
                     &mut self.dbs[self.next]
                     &mut self.dbs[self.next]
@@ -100,8 +99,7 @@ impl Service for App {
                     message: "Hello, World!",
                     message: "Hello, World!",
                 };
                 };
                 let mut body = BytesMut::with_capacity(SIZE);
                 let mut body = BytesMut::with_capacity(SIZE);
-                serde_json::to_writer(Writer(&mut body), &message).unwrap();
-
+                to_writer(Writer(&mut body), &message).unwrap();
                 let mut res =
                 let mut res =
                     Response::with_body(StatusCode::OK, Body::Bytes(body.freeze()));
                     Response::with_body(StatusCode::OK, Body::Bytes(body.freeze()));
                 let hdrs = res.headers_mut();
                 let hdrs = res.headers_mut();
@@ -110,19 +108,16 @@ impl Service for App {
                 Either::A(ok(res))
                 Either::A(ok(res))
             }
             }
             3 if path == "/db" => {
             3 if path == "/db" => {
-                let fut = self.get_db(Db::All).get_world();
+                let fut = self.dbs[0].get_world();
                 let h_srv = self.hdr_srv.clone();
                 let h_srv = self.hdr_srv.clone();
                 let h_ct = self.hdr_ctjson.clone();
                 let h_ct = self.hdr_ctjson.clone();
 
 
-                Either::B(Box::new(fut.from_err().and_then(move |row| {
-                    let mut body = BytesMut::with_capacity(31);
-                    serde_json::to_writer(Writer(&mut body), &row).unwrap();
-                    let mut res =
-                        Response::with_body(StatusCode::OK, Body::Bytes(body.freeze()));
+                Either::B(Box::new(fut.map(move |body| {
+                    let mut res = Response::with_body(StatusCode::OK, Body::Bytes(body));
                     let hdrs = res.headers_mut();
                     let hdrs = res.headers_mut();
                     hdrs.insert(SERVER, h_srv);
                     hdrs.insert(SERVER, h_srv);
                     hdrs.insert(CONTENT_TYPE, h_ct);
                     hdrs.insert(CONTENT_TYPE, h_ct);
-                    Ok(res)
+                    res
                 })))
                 })))
             }
             }
             8 if path == "/fortune" => {
             8 if path == "/fortune" => {
@@ -130,7 +125,7 @@ impl Service for App {
                 let h_srv = self.hdr_srv.clone();
                 let h_srv = self.hdr_srv.clone();
                 let h_ct = self.hdr_cthtml.clone();
                 let h_ct = self.hdr_cthtml.clone();
 
 
-                Either::B(Box::new(fut.from_err().and_then(move |fortunes| {
+                Either::B(Box::new(fut.from_err().map(move |fortunes| {
                     let mut body = BytesMut::with_capacity(2048);
                     let mut body = BytesMut::with_capacity(2048);
                     let mut writer = Writer(&mut body);
                     let mut writer = Writer(&mut body);
                     let _ = writer.0.put_slice(FORTUNES_START);
                     let _ = writer.0.put_slice(FORTUNES_START);
@@ -152,7 +147,7 @@ impl Service for App {
                     let hdrs = res.headers_mut();
                     let hdrs = res.headers_mut();
                     hdrs.insert(SERVER, h_srv);
                     hdrs.insert(SERVER, h_srv);
                     hdrs.insert(CONTENT_TYPE, h_ct);
                     hdrs.insert(CONTENT_TYPE, h_ct);
-                    Ok(res)
+                    res
                 })))
                 })))
             }
             }
             8 if path == "/queries" => {
             8 if path == "/queries" => {
@@ -161,32 +156,32 @@ impl Service for App {
                 let h_srv = self.hdr_srv.clone();
                 let h_srv = self.hdr_srv.clone();
                 let h_ct = self.hdr_ctjson.clone();
                 let h_ct = self.hdr_ctjson.clone();
 
 
-                Either::B(Box::new(fut.from_err().and_then(move |worlds| {
+                Either::B(Box::new(fut.from_err().map(move |worlds| {
                     let mut body = BytesMut::with_capacity(35 * worlds.len());
                     let mut body = BytesMut::with_capacity(35 * worlds.len());
-                    serde_json::to_writer(Writer(&mut body), &worlds).unwrap();
+                    to_writer(Writer(&mut body), &worlds).unwrap();
                     let mut res =
                     let mut res =
                         Response::with_body(StatusCode::OK, Body::Bytes(body.freeze()));
                         Response::with_body(StatusCode::OK, Body::Bytes(body.freeze()));
                     let hdrs = res.headers_mut();
                     let hdrs = res.headers_mut();
                     hdrs.insert(SERVER, h_srv);
                     hdrs.insert(SERVER, h_srv);
                     hdrs.insert(CONTENT_TYPE, h_ct);
                     hdrs.insert(CONTENT_TYPE, h_ct);
-                    Ok(res)
+                    res
                 })))
                 })))
             }
             }
             8 if path == "/updates" => {
             8 if path == "/updates" => {
                 let q = utils::get_query_param(req.uri().query().unwrap_or("")) as usize;
                 let q = utils::get_query_param(req.uri().query().unwrap_or("")) as usize;
-                let fut = self.get_db(Db::Update).update(q);
+                let fut = self.dbs[0].update(q);
                 let h_srv = self.hdr_srv.clone();
                 let h_srv = self.hdr_srv.clone();
                 let h_ct = self.hdr_ctjson.clone();
                 let h_ct = self.hdr_ctjson.clone();
 
 
-                Either::B(Box::new(fut.from_err().and_then(move |worlds| {
+                Either::B(Box::new(fut.from_err().map(move |worlds| {
                     let mut body = BytesMut::with_capacity(35 * worlds.len());
                     let mut body = BytesMut::with_capacity(35 * worlds.len());
-                    serde_json::to_writer(Writer(&mut body), &worlds).unwrap();
+                    to_writer(Writer(&mut body), &worlds).unwrap();
                     let mut res =
                     let mut res =
                         Response::with_body(StatusCode::OK, Body::Bytes(body.freeze()));
                         Response::with_body(StatusCode::OK, Body::Bytes(body.freeze()));
                     let hdrs = res.headers_mut();
                     let hdrs = res.headers_mut();
                     hdrs.insert(SERVER, h_srv);
                     hdrs.insert(SERVER, h_srv);
                     hdrs.insert(CONTENT_TYPE, h_ct);
                     hdrs.insert(CONTENT_TYPE, h_ct);
-                    Ok(res)
+                    res
                 })))
                 })))
             }
             }
             _ => Either::A(ok(Response::new(http::StatusCode::NOT_FOUND))),
             _ => Either::A(ok(Response::new(http::StatusCode::NOT_FOUND))),