Browse Source

Actix: use async postgresql driver (#3958)

* upgrade actix to latest stable

* use async postgresql driver

* fix dbb updates test
Nikolay Kim 7 years ago
parent
commit
66c3634f92

File diff suppressed because it is too large
+ 321 - 199
frameworks/Rust/actix/Cargo.lock


+ 9 - 3
frameworks/Rust/actix/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "actix"
-version = "0.7.0"
+version = "0.7.1"
 build = "build.rs"
 
 [[bin]]
@@ -31,10 +31,16 @@ futures = "0.1"
 http = "0.1"
 diesel = { version = "1.2", features = ["postgres"] }
 url = { version="1.7", features=["query_encoding"] }
-postgres = "0.15"
 
 actix = "0.7"
-actix-web = { git = "https://github.com/actix/actix-web.git", default-features = false }
+actix-web = { version="0.7", default-features = false }
+
+phf = "0.7.22"
+tokio-postgres = { git="https://github.com/sfackler/rust-postgres.git" }
+
+[patch.crates-io]
+tokio-io = { git="https://github.com/tokio-rs/tokio.git" }
+tokio-uds = { git="https://github.com/tokio-rs/tokio.git" }
 
 [build-dependencies]
 askama = "0.7"

+ 160 - 76
frameworks/Rust/actix/src/db_pg.rs

@@ -1,33 +1,81 @@
+use std::io;
+
+use actix::fut;
 use actix::prelude::*;
-use postgres::{Connection, TlsMode};
+use futures::{stream, Future, Stream};
 use rand::{thread_rng, Rng, ThreadRng};
-use std::io;
+use tokio_postgres::{connect, Client, Statement, TlsMode};
 
 use models::{Fortune, World};
 
 /// Postgres interface
 pub struct PgConnection {
-    conn: Connection,
+    cl: Option<Client>,
+    fortune: Option<Statement>,
+    world: Option<Statement>,
+    update: Option<Statement>,
     rng: ThreadRng,
 }
 
 impl Actor for PgConnection {
-    type Context = SyncContext<Self>;
+    type Context = Context<Self>;
 }
 
 impl PgConnection {
-    pub fn new(db_url: &str) -> PgConnection {
-        let conn = Connection::connect(db_url, TlsMode::None)
-            .expect(&format!("Error connecting to {}", db_url));
-        PgConnection {
-            conn,
-            rng: thread_rng(),
-        }
+    pub fn connect(db_url: &str) -> Addr<PgConnection> {
+        let hs = connect(db_url.parse().unwrap(), TlsMode::None);
+
+        PgConnection::create(move |ctx| {
+            let act = PgConnection {
+                cl: None,
+                fortune: None,
+                world: None,
+                update: None,
+                rng: thread_rng(),
+            };
+
+            hs.map_err(|_| panic!("can not connect to postgresql"))
+                .into_actor(&act)
+                .and_then(|(mut cl, conn), act, ctx| {
+                    ctx.wait(
+                        cl.prepare("SELECT id, message FROM fortune")
+                            .map_err(|_| ())
+                            .into_actor(act)
+                            .and_then(|st, act, _| {
+                                act.fortune = Some(st);
+                                fut::ok(())
+                            }),
+                    );
+                    ctx.wait(
+                        cl.prepare("SELECT id, randomnumber FROM world WHERE id=$1")
+                            .map_err(|_| ())
+                            .into_actor(act)
+                            .and_then(|st, act, _| {
+                                act.world = Some(st);
+                                fut::ok(())
+                            }),
+                    );
+                    ctx.wait(
+                        cl.prepare("SELECT id FROM world WHERE id=$1")
+                            .map_err(|_| ())
+                            .into_actor(act)
+                            .and_then(|st, act, _| {
+                                act.update = Some(st);
+                                fut::ok(())
+                            }),
+                    );
+
+                    act.cl = Some(cl);
+                    Arbiter::spawn(conn.map_err(|e| panic!("{}", e)));
+                    fut::ok(())
+                })
+                .wait(ctx);
+
+            act
+        })
     }
 }
 
-unsafe impl Send for PgConnection {}
-
 pub struct RandomWorld;
 
 impl Message for RandomWorld {
@@ -35,21 +83,26 @@ impl Message for RandomWorld {
 }
 
 impl Handler<RandomWorld> for PgConnection {
-    type Result = io::Result<World>;
+    type Result = ResponseFuture<World, io::Error>;
 
     fn handle(&mut self, _: RandomWorld, _: &mut Self::Context) -> Self::Result {
-        let random_world = self
-            .conn
-            .prepare_cached("SELECT id, randomnumber FROM world WHERE id=$1")
-            .unwrap();
-
         let random_id = self.rng.gen_range::<i32>(1, 10_000);
-        let rows = &random_world.query(&[&random_id]).unwrap();
-        let row = rows.get(0);
-        Ok(World {
-            id: row.get(0),
-            randomnumber: row.get(1),
-        })
+
+        Box::new(
+            self.cl
+                .as_mut()
+                .unwrap()
+                .query(self.world.as_ref().unwrap(), &[&random_id])
+                .into_future()
+                .map_err(|e| io::Error::new(io::ErrorKind::Other, e.0))
+                .and_then(|(row, _)| {
+                    let row = row.unwrap();
+                    Ok(World {
+                        id: row.get(0),
+                        randomnumber: row.get(1),
+                    })
+                }),
+        )
     }
 }
 
@@ -60,25 +113,30 @@ impl Message for RandomWorlds {
 }
 
 impl Handler<RandomWorlds> for PgConnection {
-    type Result = io::Result<Vec<World>>;
+    type Result = ResponseFuture<Vec<World>, io::Error>;
 
     fn handle(&mut self, msg: RandomWorlds, _: &mut Self::Context) -> Self::Result {
-        let random_world = self
-            .conn
-            .prepare_cached("SELECT id, randomnumber FROM world WHERE id=$1")
-            .unwrap();
-
         let mut worlds = Vec::with_capacity(msg.0 as usize);
         for _ in 0..msg.0 {
             let w_id: i32 = self.rng.gen_range(1, 10_000);
-            let rows = &random_world.query(&[&w_id]).unwrap();
-            let row = rows.get(0);
-            worlds.push(World {
-                id: row.get(0),
-                randomnumber: row.get(1),
-            });
+            worlds.push(
+                self.cl
+                    .as_mut()
+                    .unwrap()
+                    .query(self.world.as_ref().unwrap(), &[&w_id])
+                    .into_future()
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.0))
+                    .and_then(|(row, _)| {
+                        let row = row.unwrap();
+                        Ok(World {
+                            id: row.get(0),
+                            randomnumber: row.get(1),
+                        })
+                    }),
+            );
         }
-        Ok(worlds)
+
+        Box::new(stream::futures_unordered(worlds).collect())
     }
 }
 
@@ -89,36 +147,57 @@ impl Message for UpdateWorld {
 }
 
 impl Handler<UpdateWorld> for PgConnection {
-    type Result = io::Result<Vec<World>>;
+    type Result = ResponseActFuture<Self, Vec<World>, io::Error>;
 
     fn handle(&mut self, msg: UpdateWorld, _: &mut Self::Context) -> Self::Result {
-        let get_world = self
-            .conn
-            .prepare_cached("SELECT id FROM world WHERE id=$1")
-            .unwrap();
-        let mut update = String::with_capacity(120 + 6 * msg.0 as usize);
-        update
-            .push_str("UPDATE world SET randomnumber = temp.randomnumber FROM (VALUES ");
-
         let mut worlds = Vec::with_capacity(msg.0 as usize);
         for _ in 0..msg.0 {
-            let random_id = self.rng.gen_range::<i32>(1, 10_000);
-            let rows = &get_world.query(&[&random_id]).unwrap();
-            let w = World {
-                id: rows.get(0).get(0),
-                randomnumber: self.rng.gen_range(1, 10_000),
-            };
-            update.push_str(&format!("({}, {}),", w.id, w.randomnumber));
-            worlds.push(w);
+            let id: i32 = self.rng.gen_range(1, 10_000);
+            let w_id: i32 = self.rng.gen_range(1, 10_000);
+            worlds.push(
+                self.cl
+                    .as_mut()
+                    .unwrap()
+                    .query(self.update.as_ref().unwrap(), &[&w_id])
+                    .into_future()
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.0))
+                    .and_then(move |(row, _)| {
+                        let row = row.unwrap();
+                        Ok(World {
+                            id: row.get(0),
+                            randomnumber: id,
+                        })
+                    }),
+            );
         }
-        worlds.sort_by_key(|w| w.id);
-
-        update.pop();
-        update
-            .push_str(" ORDER BY 1) AS temp(id, randomnumber) WHERE temp.id = world.id");
-        self.conn.execute(&update, &[]).unwrap();
 
-        Ok(worlds)
+        Box::new(
+            stream::futures_unordered(worlds)
+                .collect()
+                .into_actor(self)
+                .and_then(move |mut worlds, act, _| {
+                    let mut update = String::with_capacity(120 + 6 * msg.0 as usize);
+                    update
+                        .push_str("UPDATE world SET randomnumber = temp.randomnumber FROM (VALUES ");
+
+                    for w in &worlds {
+                        update.push_str(&format!("({}, {}),", w.id, w.randomnumber));
+                    }
+                    worlds.sort_by_key(|w| w.id);
+
+                    update.pop();
+                    update
+                        .push_str(" ORDER BY 1) AS temp(id, randomnumber) WHERE temp.id = world.id");
+
+                    act.cl
+                        .as_mut()
+                        .unwrap()
+                        .batch_execute(&update)
+                        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+                        .into_actor(act)
+                        .and_then(|_, _, _| fut::ok(worlds))
+                }),
+        )
     }
 }
 
@@ -129,27 +208,32 @@ impl Message for TellFortune {
 }
 
 impl Handler<TellFortune> for PgConnection {
-    type Result = io::Result<Vec<Fortune>>;
+    type Result = ResponseFuture<Vec<Fortune>, io::Error>;
 
     fn handle(&mut self, _: TellFortune, _: &mut Self::Context) -> Self::Result {
-        let fortune = self
-            .conn
-            .prepare_cached("SELECT id, message FROM fortune")
-            .unwrap();
-
         let mut items = Vec::with_capacity(16);
         items.push(Fortune {
             id: 0,
             message: "Additional fortune added at request time.".to_string(),
         });
 
-        for row in &fortune.query(&[])? {
-            items.push(Fortune {
-                id: row.get(0),
-                message: row.get(1),
-            });
-        }
-        items.sort_by(|it, next| it.message.cmp(&next.message));
-        Ok(items)
+        Box::new(
+            self.cl
+                .as_mut()
+                .unwrap()
+                .query(self.fortune.as_ref().unwrap(), &[])
+                .fold(items, move |mut items, row| {
+                    items.push(Fortune {
+                        id: row.get(0),
+                        message: row.get(1),
+                    });
+                    Ok::<_, io::Error>(items)
+                })
+                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+                .and_then(|mut items| {
+                    items.sort_by(|it, next| it.message.cmp(&next.message));
+                    Ok(items)
+                }),
+        )
     }
 }

+ 5 - 16
frameworks/Rust/actix/src/main_pg.rs

@@ -3,10 +3,10 @@ extern crate actix_web;
 extern crate bytes;
 extern crate futures;
 extern crate num_cpus;
-extern crate postgres;
 extern crate rand;
 extern crate serde;
 extern crate serde_json;
+extern crate tokio_postgres;
 extern crate url;
 #[macro_use]
 extern crate serde_derive;
@@ -22,7 +22,6 @@ use actix_web::{
 use askama::Template;
 use bytes::BytesMut;
 use futures::Future;
-use postgres::{Connection, TlsMode};
 
 mod db_pg;
 mod models;
@@ -136,22 +135,11 @@ fn main() {
     let sys = System::new("techempower");
     let db_url = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
 
-    // Avoid triggering "FATAL: the database system is starting up" error from
-    // postgres.
-    {
-        if Connection::connect(db_url, TlsMode::None).is_err() {
-            std::thread::sleep(std::time::Duration::from_secs(5));
-        }
-    }
-
-    // Start db executor actors
-    let addr = SyncArbiter::start(num_cpus::get() * 3, move || {
-        db_pg::PgConnection::new(db_url)
-    });
-
     // start http server
     server::new(move || {
-        App::with_state(State { db: addr.clone() })
+        let addr = PgConnection::connect(db_url);
+
+        App::with_state(State { db: addr })
             .resource("/db", |r| r.route().f(world_row))
             .resource("/queries", |r| r.route().f(queries))
             .resource("/fortune", |r| r.route().f(fortune))
@@ -159,6 +147,7 @@ fn main() {
     }).backlog(8192)
         .bind("0.0.0.0:8080")
         .unwrap()
+        .workers(1)
         .start();
 
     println!("Started http server: 127.0.0.1:8080");

+ 6 - 17
frameworks/Rust/actix/src/main_raw.rs

@@ -9,11 +9,11 @@ extern crate serde_derive;
 #[macro_use]
 extern crate askama;
 extern crate num_cpus;
-extern crate postgres;
 extern crate rand;
 extern crate url;
 #[macro_use]
 extern crate diesel;
+extern crate tokio_postgres;
 
 use std::mem;
 
@@ -24,7 +24,6 @@ use actix_web::server::{
 use actix_web::Error;
 use askama::Template;
 use futures::{Async, Future, Poll};
-use postgres::{Connection, TlsMode};
 
 mod db_pg;
 mod models;
@@ -269,22 +268,12 @@ fn main() {
     let sys = System::new("techempower");
     let db_url = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
 
-    // Avoid triggering "FATAL: the database system is starting up" error from
-    // postgres.
-    {
-        if Connection::connect(db_url, TlsMode::None).is_err() {
-            std::thread::sleep(std::time::Duration::from_secs(5));
-        }
-    }
-
-    // Start db executor actors
-    let addr = SyncArbiter::start(num_cpus::get() * 3, move || {
-        db_pg::PgConnection::new(db_url)
-    });
-
     // start http server
-    HttpServer::new(move || vec![App { db: addr.clone() }])
-        .backlog(8192)
+    HttpServer::new(move || {
+        let db = PgConnection::connect(db_url);
+
+        vec![App { db }]
+    }).backlog(8192)
         .bind("0.0.0.0:8080")
         .unwrap()
         .start();

Some files were not shown because too many files changed in this diff