Forráskód Böngészése

Actix: optimizae async pg connection (#3987)

* optimizae async pg connection

* remove pre-allocation

* use rust 1.28
Nikolay Kim 7 éve
szülő
commit
83f9cc045e

A különbségek nem kerülnek megjelenítésre, a fájl túl nagy
+ 181 - 180
frameworks/Rust/actix/Cargo.lock


+ 1 - 1
frameworks/Rust/actix/Cargo.toml

@@ -36,7 +36,7 @@ actix = "0.7"
 actix-web = { version="0.7", default-features = false }
 actix-web = { version="0.7", default-features = false }
 
 
 phf = "0.7.22"
 phf = "0.7.22"
-tokio-postgres = { git="https://github.com/sfackler/rust-postgres.git" }
+tokio-postgres = { git="https://github.com/fafhrd91/rust-postgres.git" }
 
 
 [patch.crates-io]
 [patch.crates-io]
 tokio-io = { git="https://github.com/tokio-rs/tokio.git" }
 tokio-io = { git="https://github.com/tokio-rs/tokio.git" }

+ 1 - 1
frameworks/Rust/actix/actix-diesel.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.27
+FROM rust:1.28
 
 
 ADD ./ /actix
 ADD ./ /actix
 WORKDIR /actix
 WORKDIR /actix

+ 1 - 1
frameworks/Rust/actix/actix-pg.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.27
+FROM rust:1.28
 
 
 ADD ./ /actix
 ADD ./ /actix
 WORKDIR /actix
 WORKDIR /actix

+ 1 - 1
frameworks/Rust/actix/actix-raw.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.27
+FROM rust:1.28
 
 
 ADD ./ /actix
 ADD ./ /actix
 WORKDIR /actix
 WORKDIR /actix

+ 1 - 1
frameworks/Rust/actix/actix.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.27
+FROM rust:1.28
 
 
 ADD ./ /actix
 ADD ./ /actix
 WORKDIR /actix
 WORKDIR /actix

+ 1 - 1
frameworks/Rust/actix/src/db_pg.rs

@@ -36,7 +36,7 @@ impl PgConnection {
 
 
             hs.map_err(|_| panic!("can not connect to postgresql"))
             hs.map_err(|_| panic!("can not connect to postgresql"))
                 .into_actor(&act)
                 .into_actor(&act)
-                .and_then(|(mut cl, conn), act, ctx| {
+                .and_then(|(cl, conn), act, ctx| {
                     ctx.wait(
                     ctx.wait(
                         cl.prepare("SELECT id, message FROM fortune")
                         cl.prepare("SELECT id, message FROM fortune")
                             .map_err(|_| ())
                             .map_err(|_| ())

+ 154 - 0
frameworks/Rust/actix/src/db_pg_direct.rs

@@ -0,0 +1,154 @@
+use std::io;
+
+use actix::prelude::*;
+use futures::{stream, Future, Stream};
+use rand::{thread_rng, Rng};
+use tokio_postgres::{connect, Client, Statement, TlsMode};
+
+use models::{Fortune, World};
+
+/// Postgres interface
+pub struct PgConnection {
+    cl: Client,
+    fortune: Statement,
+    world: Statement,
+    update: Statement,
+}
+
+impl PgConnection {
+    pub fn connect(db_url: &str) -> impl Future<Item=PgConnection, Error=()> {
+        let hs = connect(db_url.parse().unwrap(), TlsMode::None);
+
+        hs.map_err(|_| panic!("can not connect to postgresql"))
+            .and_then(|(cl, conn)| {
+                Arbiter::spawn(conn.map_err(|e| panic!("{}", e)));
+                cl.prepare("SELECT id, message FROM fortune")
+                    .map_err(|_| ())
+                    .and_then(move |fortune| {
+                        cl.prepare("SELECT id, randomnumber FROM world WHERE id=$1")
+                            .map_err(|_| ())
+                            .and_then(move |world| {
+                                let st = (fortune, world);
+                                cl.prepare("SELECT id FROM world WHERE id=$1")
+                                    .map_err(|_| ())
+                                    .and_then(|update| {
+
+                                        Ok(PgConnection {
+                                            cl,
+                                            fortune: st.0,
+                                            world: st.1,
+                                            update
+                                        })
+                                    })
+                            })
+                    })
+            })
+    }
+}
+
+impl PgConnection {
+
+    pub fn get_world(&self) -> impl Future<Item=World, Error=io::Error> {
+        let random_id = thread_rng().gen_range::<i32>(1, 10_000);
+
+        self.cl
+            .query(&self.world, &[&random_id])
+            .into_future()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.0))
+            .and_then(|(row, _)| {
+                let row = row.unwrap();
+                Ok(World {
+                    id: row.get(0),
+                    randomnumber: row.get(1),
+                })
+            })
+    }
+
+    pub fn get_worlds(&self, num: usize) -> impl Future<Item=Vec<World>, Error=io::Error> {
+        let mut worlds = Vec::with_capacity(num);
+        for _ in 0..num {
+            let w_id: i32 = thread_rng().gen_range(1, 10_000);
+            worlds.push(
+                self.cl
+                    .query(&self.world, &[&w_id])
+                    .into_future()
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.0))
+                    .and_then(|(row, _)| {
+                        let row = row.unwrap();
+                        Ok(World {
+                            id: row.get(0),
+                            randomnumber: row.get(1),
+                        })
+                    }),
+            );
+        }
+
+        stream::futures_unordered(worlds).collect()
+    }
+
+    pub fn update(&self, num: usize) -> impl Future<Item=Vec<World>, Error=io::Error> {
+        let mut worlds = Vec::with_capacity(num);
+        for _ in 0..num {
+            let id: i32 = thread_rng().gen_range(1, 10_000);
+            let w_id: i32 =thread_rng().gen_range(1, 10_000);
+            worlds.push(
+                self.cl
+                    .query(&self.update, &[&w_id])
+                    .into_future()
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.0))
+                    .and_then(move |(row, _)| {
+                        let row = row.unwrap();
+                        Ok(World {
+                            id: row.get(0),
+                            randomnumber: id,
+                        })
+                    }),
+            );
+        }
+
+        let cl = self.cl.clone();
+        stream::futures_unordered(worlds)
+            .collect()
+            .and_then(move |mut worlds| {
+                let mut update = String::with_capacity(120 + 6 * num as usize);
+                update
+                    .push_str("UPDATE world SET randomnumber = temp.randomnumber FROM (VALUES ");
+
+                for w in &worlds {
+                    update.push_str(&format!("({}, {}),", w.id, w.randomnumber));
+                }
+                worlds.sort_by_key(|w| w.id);
+
+                update.pop();
+                update
+                    .push_str(" ORDER BY 1) AS temp(id, randomnumber) WHERE temp.id = world.id");
+
+                cl.batch_execute(&update)
+                  .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+                  .and_then(|_| Ok(worlds))
+            })
+    }
+
+    pub fn tell_fortune(&self) -> impl Future<Item=Vec<Fortune>, Error=io::Error> {
+        let mut items = Vec::new();
+        items.push(Fortune {
+            id: 0,
+            message: "Additional fortune added at request time.".to_string(),
+        });
+
+        self.cl
+            .query(&self.fortune, &[])
+            .fold(items, move |mut items, row| {
+                items.push(Fortune {
+                    id: row.get(0),
+                    message: row.get(1),
+                });
+                Ok::<_, io::Error>(items)
+            })
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+            .and_then(|mut items| {
+                items.sort_by(|it, next| it.message.cmp(&next.message));
+                Ok(items)
+            })
+    }
+}

+ 39 - 30
frameworks/Rust/actix/src/main_raw.rs

@@ -15,7 +15,9 @@ extern crate url;
 extern crate diesel;
 extern crate diesel;
 extern crate tokio_postgres;
 extern crate tokio_postgres;
 
 
-use std::mem;
+use std::{mem, io};
+use std::cell::RefCell;
+use std::rc::Rc;
 
 
 use actix::prelude::*;
 use actix::prelude::*;
 use actix_web::server::{
 use actix_web::server::{
@@ -24,12 +26,13 @@ use actix_web::server::{
 use actix_web::Error;
 use actix_web::Error;
 use askama::Template;
 use askama::Template;
 use futures::{Async, Future, Poll};
 use futures::{Async, Future, Poll};
+use rand::{thread_rng, Rng};
 
 
-mod db_pg;
+mod db_pg_direct;
 mod models;
 mod models;
 mod utils;
 mod utils;
 
 
-use db_pg::{PgConnection, RandomWorld, RandomWorlds, TellFortune, UpdateWorld};
+use db_pg_direct::PgConnection;
 use utils::{Message, StackWriter, Writer as JsonWriter};
 use utils::{Message, StackWriter, Writer as JsonWriter};
 
 
 const HTTPOK: &[u8] = b"HTTP/1.1 200 OK\r\n";
 const HTTPOK: &[u8] = b"HTTP/1.1 200 OK\r\n";
@@ -40,7 +43,7 @@ const HDR_CTHTML: &[u8] = b"Content-Type: text/html; charset=utf-8";
 const BODY: &[u8] = b"Hello, World!";
 const BODY: &[u8] = b"Hello, World!";
 
 
 struct App {
 struct App {
-    db: Addr<PgConnection>,
+    dbs: Rc<RefCell<Vec<PgConnection>>>,
 }
 }
 
 
 impl HttpHandler for App {
 impl HttpHandler for App {
@@ -53,26 +56,26 @@ impl HttpHandler for App {
                 10 if path == "/plaintext" => return Ok(Box::new(Plaintext)),
                 10 if path == "/plaintext" => return Ok(Box::new(Plaintext)),
                 5 if path == "/json" => return Ok(Box::new(Json)),
                 5 if path == "/json" => return Ok(Box::new(Json)),
                 3 if path == "/db" => {
                 3 if path == "/db" => {
-                    return Ok(Box::new(World {
-                        fut: self.db.send(RandomWorld),
-                    }))
+                    if let Some(db) = thread_rng().choose(&*self.dbs.borrow()) {
+                        return Ok(Box::new(World {fut: Box::new(db.get_world())}))
+                    }
                 }
                 }
                 8 if path == "/fortune" => {
                 8 if path == "/fortune" => {
-                    return Ok(Box::new(Fortune {
-                        fut: self.db.send(TellFortune),
-                    }));
+                    if let Some(db) = thread_rng().choose(&*self.dbs.borrow()) {
+                        return Ok(Box::new(Fortune {fut: Box::new(db.tell_fortune())}));
+                    }
                 }
                 }
                 8 if path == "/queries" => {
                 8 if path == "/queries" => {
                     let q = utils::get_query_param(req.uri());
                     let q = utils::get_query_param(req.uri());
-                    return Ok(Box::new(Queries {
-                        fut: self.db.send(RandomWorlds(q)),
-                    }));
+                    if let Some(db) = thread_rng().choose(&*self.dbs.borrow()) {
+                        return Ok(Box::new(Queries {fut: Box::new(db.get_worlds(q as usize))}));
+                    }
                 }
                 }
                 8 if path == "/updates" => {
                 8 if path == "/updates" => {
                     let q = utils::get_query_param(req.uri());
                     let q = utils::get_query_param(req.uri());
-                    return Ok(Box::new(Updates {
-                        fut: self.db.send(UpdateWorld(q)),
-                    }));
+                    if let Some(db) = thread_rng().choose(&*self.dbs.borrow()) {
+                        return Ok(Box::new(Updates {fut: Box::new(db.update(q as usize))}));
+                    }
                 }
                 }
                 _ => (),
                 _ => (),
             }
             }
@@ -122,7 +125,7 @@ impl HttpHandlerTask for Json {
 }
 }
 
 
 struct Fortune {
 struct Fortune {
-    fut: actix::dev::Request<PgConnection, TellFortune>,
+    fut: Box<Future<Item=Vec<models::Fortune>, Error=io::Error>>,
 }
 }
 
 
 #[derive(Template)]
 #[derive(Template)]
@@ -134,7 +137,7 @@ struct FortuneTemplate<'a> {
 impl HttpHandlerTask for Fortune {
 impl HttpHandlerTask for Fortune {
     fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
     fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
         match self.fut.poll() {
         match self.fut.poll() {
-            Ok(Async::Ready(Ok(rows))) => {
+            Ok(Async::Ready(rows)) => {
                 let mut body: [u8; 2048] = unsafe { mem::uninitialized() };
                 let mut body: [u8; 2048] = unsafe { mem::uninitialized() };
                 let len = {
                 let len = {
                     let mut writer = StackWriter(&mut body, 0);
                     let mut writer = StackWriter(&mut body, 0);
@@ -156,20 +159,19 @@ impl HttpHandlerTask for Fortune {
                 Ok(Async::Ready(true))
                 Ok(Async::Ready(true))
             }
             }
             Ok(Async::NotReady) => Ok(Async::NotReady),
             Ok(Async::NotReady) => Ok(Async::NotReady),
-            Ok(Async::Ready(Err(e))) => Err(e.into()),
             Err(e) => Err(e.into()),
             Err(e) => Err(e.into()),
         }
         }
     }
     }
 }
 }
 
 
 struct World {
 struct World {
-    fut: actix::dev::Request<PgConnection, RandomWorld>,
+    fut: Box<Future<Item=models::World, Error=io::Error>>,
 }
 }
 
 
 impl HttpHandlerTask for World {
 impl HttpHandlerTask for World {
     fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
     fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
         match self.fut.poll() {
         match self.fut.poll() {
-            Ok(Async::Ready(Ok(row))) => {
+            Ok(Async::Ready(row)) => {
                 let mut body: [u8; 48] = unsafe { mem::uninitialized() };
                 let mut body: [u8; 48] = unsafe { mem::uninitialized() };
                 let len = {
                 let len = {
                     let mut writer = StackWriter(&mut body, 0);
                     let mut writer = StackWriter(&mut body, 0);
@@ -190,20 +192,19 @@ impl HttpHandlerTask for World {
                 Ok(Async::Ready(true))
                 Ok(Async::Ready(true))
             }
             }
             Ok(Async::NotReady) => Ok(Async::NotReady),
             Ok(Async::NotReady) => Ok(Async::NotReady),
-            Ok(Async::Ready(Err(e))) => Err(e.into()),
             Err(e) => Err(e.into()),
             Err(e) => Err(e.into()),
         }
         }
     }
     }
 }
 }
 
 
 struct Queries {
 struct Queries {
-    fut: actix::dev::Request<PgConnection, RandomWorlds>,
+    fut: Box<Future<Item=Vec<models::World>, Error=io::Error>>,
 }
 }
 
 
 impl HttpHandlerTask for Queries {
 impl HttpHandlerTask for Queries {
     fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
     fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
         match self.fut.poll() {
         match self.fut.poll() {
-            Ok(Async::Ready(Ok(worlds))) => {
+            Ok(Async::Ready(worlds)) => {
                 let mut body: [u8; 24576] = unsafe { mem::uninitialized() };
                 let mut body: [u8; 24576] = unsafe { mem::uninitialized() };
                 let len = {
                 let len = {
                     let mut writer = StackWriter(&mut body, 0);
                     let mut writer = StackWriter(&mut body, 0);
@@ -224,20 +225,19 @@ impl HttpHandlerTask for Queries {
                 Ok(Async::Ready(true))
                 Ok(Async::Ready(true))
             }
             }
             Ok(Async::NotReady) => Ok(Async::NotReady),
             Ok(Async::NotReady) => Ok(Async::NotReady),
-            Ok(Async::Ready(Err(e))) => Err(e.into()),
             Err(e) => Err(e.into()),
             Err(e) => Err(e.into()),
         }
         }
     }
     }
 }
 }
 
 
 struct Updates {
 struct Updates {
-    fut: actix::dev::Request<PgConnection, UpdateWorld>,
+    fut: Box<Future<Item=Vec<models::World>, Error=io::Error>>,
 }
 }
 
 
 impl HttpHandlerTask for Updates {
 impl HttpHandlerTask for Updates {
     fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
     fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
         match self.fut.poll() {
         match self.fut.poll() {
-            Ok(Async::Ready(Ok(worlds))) => {
+            Ok(Async::Ready(worlds)) => {
                 let mut body: [u8; 24576] = unsafe { mem::uninitialized() };
                 let mut body: [u8; 24576] = unsafe { mem::uninitialized() };
                 let len = {
                 let len = {
                     let mut writer = StackWriter(&mut body, 0);
                     let mut writer = StackWriter(&mut body, 0);
@@ -258,7 +258,6 @@ impl HttpHandlerTask for Updates {
                 Ok(Async::Ready(true))
                 Ok(Async::Ready(true))
             }
             }
             Ok(Async::NotReady) => Ok(Async::NotReady),
             Ok(Async::NotReady) => Ok(Async::NotReady),
-            Ok(Async::Ready(Err(e))) => Err(e.into()),
             Err(e) => Err(e.into()),
             Err(e) => Err(e.into()),
         }
         }
     }
     }
@@ -270,9 +269,19 @@ fn main() {
 
 
     // start http server
     // start http server
     HttpServer::new(move || {
     HttpServer::new(move || {
-        let db = PgConnection::connect(db_url);
+        let dbs = Rc::new(RefCell::new(Vec::new()));
+
+        for _ in 0..3 {
+            let db = dbs.clone();
+            Arbiter::spawn(
+                PgConnection::connect(db_url)
+                    .and_then(move |conn| {
+                        db.borrow_mut().push(conn);
+                        Ok(())
+                    }));
+        }
 
 
-        vec![App { db }]
+        vec![App { dbs }]
     }).backlog(8192)
     }).backlog(8192)
         .bind("0.0.0.0:8080")
         .bind("0.0.0.0:8080")
         .unwrap()
         .unwrap()

Nem az összes módosított fájl került megjelenítésre, mert túl sok fájl változott