Browse Source

update rand, change update query (#5363)

Nikolay Kim 5 years ago
parent
commit
0d4554867d

+ 2 - 2
frameworks/Rust/actix/Cargo.toml

@@ -33,13 +33,13 @@ actix-server = "1.0.0"
 actix-service = "1.0.1"
 actix-service = "1.0.1"
 jemallocator = "0.3.2"
 jemallocator = "0.3.2"
 askama = "0.8"
 askama = "0.8"
-markup = "0.3.1"
+markup = "0.4.1"
 yarte = "0.3"
 yarte = "0.3"
 serde = "1.0"
 serde = "1.0"
 serde_json = "1.0"
 serde_json = "1.0"
 serde_derive = "1.0"
 serde_derive = "1.0"
 env_logger = "0.7"
 env_logger = "0.7"
-rand = "0.5"
+rand = { version = "0.7", features = ["small_rng"] }
 bytes = "0.5.3"
 bytes = "0.5.3"
 num_cpus = "1.0"
 num_cpus = "1.0"
 futures = "0.3.1"
 futures = "0.3.1"

+ 5 - 4
frameworks/Rust/actix/src/db.rs

@@ -5,13 +5,14 @@ use actix::prelude::*;
 use diesel;
 use diesel;
 use diesel::prelude::*;
 use diesel::prelude::*;
 use diesel::result::Error;
 use diesel::result::Error;
-use rand::{thread_rng, Rng, ThreadRng};
+use rand::rngs::SmallRng;
+use rand::{Rng, SeedableRng};
 
 
 use crate::models;
 use crate::models;
 
 
 pub struct DbExecutor {
 pub struct DbExecutor {
     conn: PgConnection,
     conn: PgConnection,
-    rng: ThreadRng,
+    rng: SmallRng,
 }
 }
 
 
 impl Actor for DbExecutor {
 impl Actor for DbExecutor {
@@ -23,7 +24,7 @@ impl DbExecutor {
         DbExecutor {
         DbExecutor {
             conn: PgConnection::establish(db_url)
             conn: PgConnection::establish(db_url)
                 .expect(&format!("Error connecting to {}", db_url)),
                 .expect(&format!("Error connecting to {}", db_url)),
-            rng: thread_rng(),
+            rng: SmallRng::from_entropy(),
         }
         }
     }
     }
 }
 }
@@ -92,7 +93,7 @@ impl Handler<UpdateWorld> for DbExecutor {
 
 
         let mut worlds = Vec::with_capacity(msg.0 as usize);
         let mut worlds = Vec::with_capacity(msg.0 as usize);
         for _ in 0..msg.0 {
         for _ in 0..msg.0 {
-            let w_id = self.rng.gen_range::<i32>(1, 10_001);
+            let w_id: i32 = self.rng.gen_range(1, 10_001);
             let mut w = match world.filter(id.eq(w_id)).load::<models::World>(&self.conn)
             let mut w = match world.filter(id.eq(w_id)).load::<models::World>(&self.conn)
             {
             {
                 Ok(mut items) => items.pop().unwrap(),
                 Ok(mut items) => items.pop().unwrap(),

+ 25 - 32
frameworks/Rust/actix/src/db_pg.rs

@@ -5,7 +5,8 @@ use actix::prelude::*;
 use bytes::{Bytes, BytesMut};
 use bytes::{Bytes, BytesMut};
 use futures::stream::futures_unordered::FuturesUnordered;
 use futures::stream::futures_unordered::FuturesUnordered;
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use futures::{FutureExt, StreamExt, TryStreamExt};
-use rand::{thread_rng, Rng, ThreadRng};
+use rand::rngs::SmallRng;
+use rand::{Rng, SeedableRng};
 use tokio_postgres::{connect, Client, NoTls, Statement};
 use tokio_postgres::{connect, Client, NoTls, Statement};
 
 
 use crate::models::World;
 use crate::models::World;
@@ -16,7 +17,7 @@ pub struct PgConnection {
     cl: Client,
     cl: Client,
     fortune: Statement,
     fortune: Statement,
     world: Statement,
     world: Statement,
-    rng: ThreadRng,
+    rng: SmallRng,
 }
 }
 
 
 impl Actor for PgConnection {
 impl Actor for PgConnection {
@@ -28,19 +29,16 @@ impl PgConnection {
         let (cl, conn) = connect(db_url, NoTls)
         let (cl, conn) = connect(db_url, NoTls)
             .await
             .await
             .expect("can not connect to postgresql");
             .expect("can not connect to postgresql");
-        actix_rt::spawn(conn.map(|res| panic!("{:?}", res)));
+        actix_rt::spawn(conn.map(|_| ()));
 
 
-        let fortune = cl.prepare("SELECT id, message FROM fortune").await.unwrap();
-        let world = cl
-            .prepare("SELECT id, randomnumber FROM world WHERE id=$1")
-            .await
-            .unwrap();
+        let fortune = cl.prepare("SELECT * FROM fortune").await.unwrap();
+        let world = cl.prepare("SELECT * FROM world WHERE id=$1").await.unwrap();
 
 
         Ok(PgConnection::create(move |_| PgConnection {
         Ok(PgConnection::create(move |_| PgConnection {
             cl,
             cl,
             fortune,
             fortune,
             world,
             world,
-            rng: thread_rng(),
+            rng: SmallRng::from_entropy(),
         }))
         }))
     }
     }
 }
 }
@@ -55,7 +53,7 @@ impl Handler<RandomWorld> for PgConnection {
     type Result = ResponseFuture<Result<Bytes, io::Error>>;
     type Result = ResponseFuture<Result<Bytes, io::Error>>;
 
 
     fn handle(&mut self, _: RandomWorld, _: &mut Self::Context) -> Self::Result {
     fn handle(&mut self, _: RandomWorld, _: &mut Self::Context) -> Self::Result {
-        let random_id = self.rng.gen_range::<i32>(1, 10_001);
+        let random_id = (self.rng.gen::<u32>() % 10_000 + 1) as i32;
         let fut = self.cl.query_one(&self.world, &[&random_id]);
         let fut = self.cl.query_one(&self.world, &[&random_id]);
 
 
         Box::pin(async move {
         Box::pin(async move {
@@ -90,7 +88,7 @@ impl Handler<RandomWorlds> for PgConnection {
     fn handle(&mut self, msg: RandomWorlds, _: &mut Self::Context) -> Self::Result {
     fn handle(&mut self, msg: RandomWorlds, _: &mut Self::Context) -> Self::Result {
         let worlds = FuturesUnordered::new();
         let worlds = FuturesUnordered::new();
         for _ in 0..msg.0 {
         for _ in 0..msg.0 {
-            let w_id: i32 = self.rng.gen_range(1, 10_001);
+            let w_id = (self.rng.gen::<u32>() % 10_000 + 1) as i32;
             worlds.push(
             worlds.push(
                 self.cl
                 self.cl
                     .query_one(&self.world, &[&w_id])
                     .query_one(&self.world, &[&w_id])
@@ -122,21 +120,17 @@ impl Handler<UpdateWorld> for PgConnection {
     fn handle(&mut self, msg: UpdateWorld, _: &mut Self::Context) -> Self::Result {
     fn handle(&mut self, msg: UpdateWorld, _: &mut Self::Context) -> Self::Result {
         let worlds = FuturesUnordered::new();
         let worlds = FuturesUnordered::new();
         for _ in 0..msg.0 {
         for _ in 0..msg.0 {
-            let id: i32 = self.rng.gen_range(1, 10_001);
-            let w_id: i32 = self.rng.gen_range(1, 10_001);
+            let id = (self.rng.gen::<u32>() % 10_000 + 1) as i32;
+            let w_id = (self.rng.gen::<u32>() % 10_000 + 1) as i32;
             worlds.push(self.cl.query_one(&self.world, &[&w_id]).map(
             worlds.push(self.cl.query_one(&self.world, &[&w_id]).map(
                 move |res| match res {
                 move |res| match res {
                     Err(e) => {
                     Err(e) => {
                         Err(io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
                         Err(io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
                     }
                     }
-                    Ok(row) => {
-                        let mut world = World {
-                            id: row.get(0),
-                            randomnumber: row.get(1),
-                        };
-                        world.randomnumber = id;
-                        Ok(world)
-                    }
+                    Ok(row) => Ok(World {
+                        id: row.get(0),
+                        randomnumber: id,
+                    }),
                 },
                 },
             ));
             ));
         }
         }
@@ -145,20 +139,19 @@ impl Handler<UpdateWorld> for PgConnection {
         Box::pin(async move {
         Box::pin(async move {
             let worlds: Vec<World> = worlds.try_collect().await?;
             let worlds: Vec<World> = worlds.try_collect().await?;
 
 
-            let mut update = String::with_capacity(120 + 6 * msg.0 as usize);
-            update.push_str(
-                "UPDATE world SET randomnumber = temp.randomnumber FROM (VALUES ",
-            );
-
+            let mut q = String::with_capacity(100 + 24 * msg.0 as usize);
+            q.push_str("UPDATE world SET randomnumber = CASE id ");
             for w in &worlds {
             for w in &worlds {
-                let _ = write!(&mut update, "({}, {}),", w.id, w.randomnumber);
+                let _ = write!(&mut q, "when {} then {} ", w.id, w.randomnumber);
             }
             }
-            update.pop();
-            update.push_str(
-                " ORDER BY 1) AS temp(id, randomnumber) WHERE temp.id = world.id",
-            );
+            q.push_str("ELSE randomnumber END WHERE id IN (");
+            for w in &worlds {
+                let _ = write!(&mut q, "{},", w.id);
+            }
+            q.pop();
+            q.push(')');
 
 
-            cl.simple_query(&update)
+            cl.simple_query(&q)
                 .await
                 .await
                 .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?;
                 .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?;
 
 

+ 11 - 13
frameworks/Rust/actix/src/db_pg_direct.rs

@@ -5,7 +5,8 @@ use actix_http::Error;
 use bytes::{Bytes, BytesMut};
 use bytes::{Bytes, BytesMut};
 use futures::stream::futures_unordered::FuturesUnordered;
 use futures::stream::futures_unordered::FuturesUnordered;
 use futures::{Future, FutureExt, StreamExt, TryStreamExt};
 use futures::{Future, FutureExt, StreamExt, TryStreamExt};
-use rand::{thread_rng, Rng, ThreadRng};
+use rand::rngs::SmallRng;
+use rand::{Rng, SeedableRng};
 use tokio_postgres::{connect, Client, NoTls, Statement};
 use tokio_postgres::{connect, Client, NoTls, Statement};
 
 
 use crate::models::World;
 use crate::models::World;
@@ -16,7 +17,7 @@ pub struct PgConnection {
     cl: Client,
     cl: Client,
     fortune: Statement,
     fortune: Statement,
     world: Statement,
     world: Statement,
-    rng: ThreadRng,
+    rng: SmallRng,
 }
 }
 
 
 impl PgConnection {
 impl PgConnection {
@@ -24,26 +25,23 @@ impl PgConnection {
         let (cl, conn) = connect(db_url, NoTls)
         let (cl, conn) = connect(db_url, NoTls)
             .await
             .await
             .expect("can not connect to postgresql");
             .expect("can not connect to postgresql");
-        actix_rt::spawn(conn.map(|res| panic!("{:?}", res)));
+        actix_rt::spawn(conn.map(|_| ()));
 
 
-        let fortune = cl.prepare("SELECT id, message FROM fortune").await.unwrap();
-        let world = cl
-            .prepare("SELECT id, randomnumber FROM world WHERE id=$1")
-            .await
-            .unwrap();
+        let fortune = cl.prepare("SELECT * FROM fortune").await.unwrap();
+        let world = cl.prepare("SELECT * FROM world WHERE id=$1").await.unwrap();
 
 
         PgConnection {
         PgConnection {
             cl,
             cl,
             fortune,
             fortune,
             world,
             world,
-            rng: thread_rng(),
+            rng: SmallRng::from_entropy(),
         }
         }
     }
     }
 }
 }
 
 
 impl PgConnection {
 impl PgConnection {
     pub fn get_world(&mut self) -> impl Future<Output = Result<Bytes, Error>> {
     pub fn get_world(&mut self) -> impl Future<Output = Result<Bytes, Error>> {
-        let random_id = self.rng.gen_range::<i32>(1, 10_001);
+        let random_id = (self.rng.gen::<u32>() % 10_000 + 1) as i32;
         let fut = self.cl.query_one(&self.world, &[&random_id]);
         let fut = self.cl.query_one(&self.world, &[&random_id]);
 
 
         async move {
         async move {
@@ -71,7 +69,7 @@ impl PgConnection {
     ) -> impl Future<Output = Result<Vec<World>, io::Error>> {
     ) -> impl Future<Output = Result<Vec<World>, io::Error>> {
         let worlds = FuturesUnordered::new();
         let worlds = FuturesUnordered::new();
         for _ in 0..num {
         for _ in 0..num {
-            let w_id: i32 = self.rng.gen_range(1, 10_001);
+            let w_id = (self.rng.gen::<u32>() % 10_000 + 1) as i32;
             worlds.push(
             worlds.push(
                 self.cl
                 self.cl
                     .query_one(&self.world, &[&w_id])
                     .query_one(&self.world, &[&w_id])
@@ -96,8 +94,8 @@ impl PgConnection {
     ) -> impl Future<Output = Result<Vec<World>, io::Error>> {
     ) -> impl Future<Output = Result<Vec<World>, io::Error>> {
         let worlds = FuturesUnordered::new();
         let worlds = FuturesUnordered::new();
         for _ in 0..num {
         for _ in 0..num {
-            let id: i32 = self.rng.gen_range(1, 10_001);
-            let w_id: i32 = self.rng.gen_range(1, 10_001);
+            let id = (self.rng.gen::<u32>() % 10_000 + 1) as i32;
+            let w_id = (self.rng.gen::<u32>() % 10_000 + 1) as i32;
             worlds.push(self.cl.query_one(&self.world, &[&w_id]).map(
             worlds.push(self.cl.query_one(&self.world, &[&w_id]).map(
                 move |res| match res {
                 move |res| match res {
                     Err(e) => {
                     Err(e) => {

+ 1 - 4
frameworks/Rust/actix/src/main_pg.rs

@@ -117,18 +117,15 @@ async fn main() -> std::io::Result<()> {
     const DB_URL: &str =
     const DB_URL: &str =
         "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
         "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
 
 
-    let addr = PgConnection::connect(DB_URL).await?;
-
     // start http server
     // start http server
     Server::build()
     Server::build()
         .backlog(1024)
         .backlog(1024)
         .bind("techempower", "0.0.0.0:8080", move || {
         .bind("techempower", "0.0.0.0:8080", move || {
-            let addr = addr.clone();
             HttpService::build()
             HttpService::build()
                 .keep_alive(KeepAlive::Os)
                 .keep_alive(KeepAlive::Os)
                 .h1(map_config(
                 .h1(map_config(
                     App::new()
                     App::new()
-                        .data(addr)
+                        .data_factory(|| PgConnection::connect(DB_URL))
                         .service(web::resource("/db").to(world_row))
                         .service(web::resource("/db").to(world_row))
                         .service(web::resource("/queries").to(queries))
                         .service(web::resource("/queries").to(queries))
                         .service(web::resource("/fortune").to(fortune))
                         .service(web::resource("/fortune").to(fortune))

+ 8 - 12
frameworks/Rust/actix/src/main_raw.rs

@@ -39,8 +39,7 @@ struct App {
 
 
 impl App {
 impl App {
     fn handle_request(&mut self, req: Request) {
     fn handle_request(&mut self, req: Request) {
-        let path = req.path();
-        match path {
+        match req.path() {
             "/json" => {
             "/json" => {
                 let message = Message {
                 let message = Message {
                     message: "Hello, World!",
                     message: "Hello, World!",
@@ -69,7 +68,7 @@ impl Future for App {
         let this = self.get_mut();
         let this = self.get_mut();
 
 
         loop {
         loop {
-            if this.read_buf.capacity() - this.read_buf.len() < 4096 {
+            if this.read_buf.capacity() - this.read_buf.len() < 512 {
                 this.read_buf.reserve(32_768);
                 this.read_buf.reserve(32_768);
             }
             }
             let read = Pin::new(&mut this.io).poll_read_buf(cx, &mut this.read_buf);
             let read = Pin::new(&mut this.io).poll_read_buf(cx, &mut this.read_buf);
@@ -84,7 +83,7 @@ impl Future for App {
             }
             }
         }
         }
 
 
-        if this.write_buf.capacity() - this.write_buf.len() < 8192 {
+        if this.write_buf.capacity() - this.write_buf.len() < 512 {
             this.write_buf.reserve(32_768);
             this.write_buf.reserve(32_768);
         }
         }
 
 
@@ -103,7 +102,11 @@ impl Future for App {
                 match Pin::new(&mut this.io).poll_write(cx, &this.write_buf[written..]) {
                 match Pin::new(&mut this.io).poll_write(cx, &this.write_buf[written..]) {
                     Poll::Pending => {
                     Poll::Pending => {
                         if written > 0 {
                         if written > 0 {
-                            this.write_buf.advance(written);
+                            if written == len {
+                                unsafe { this.write_buf.set_len(0) }
+                            } else {
+                                this.write_buf.advance(written);
+                            }
                         }
                         }
                         break;
                         break;
                     }
                     }
@@ -117,13 +120,6 @@ impl Future for App {
                     Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
                     Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
                 }
                 }
             }
             }
-            if written > 0 {
-                if written == len {
-                    unsafe { this.write_buf.set_len(0) }
-                } else {
-                    this.write_buf.advance(written);
-                }
-            }
         }
         }
         Poll::Pending
         Poll::Pending
     }
     }

+ 0 - 43
frameworks/Rust/actix/src/utils.rs

@@ -39,49 +39,6 @@ pub fn get_query_param(query: &str) -> u16 {
     cmp::min(500, cmp::max(1, q))
     cmp::min(500, cmp::max(1, q))
 }
 }
 
 
-fn escapable(b: u8) -> bool {
-    match b {
-        b'<' | b'>' | b'&' | b'"' | b'\'' | b'/' => true,
-        _ => false,
-    }
-}
-
-pub fn escape(writer: &mut Writer, s: String) {
-    let bytes = s.as_bytes();
-    let mut last_pos = 0;
-    for (idx, b) in s.as_bytes().iter().enumerate() {
-        if escapable(*b) {
-            let _ = writer.0.put_slice(&bytes[last_pos..idx]);
-
-            last_pos = idx + 1;
-            match *b {
-                b'<' => {
-                    let _ = writer.0.put_slice(b"&lt;");
-                }
-                b'>' => {
-                    let _ = writer.0.put_slice(b"&gt;");
-                }
-                b'&' => {
-                    let _ = writer.0.put_slice(b"&amp;");
-                }
-                b'"' => {
-                    let _ = writer.0.put_slice(b"&quot;");
-                }
-                b'\'' => {
-                    let _ = writer.0.put_slice(b"&#x27;");
-                }
-                b'/' => {
-                    let _ = writer.0.put_slice(b"&#x2f;");
-                }
-                _ => panic!("incorrect indexing"),
-            }
-        }
-    }
-    if last_pos < bytes.len() - 1 {
-        let _ = writer.0.put_slice(&bytes[last_pos..]);
-    }
-}
-
 markup::define! {
 markup::define! {
     FortunesTemplate(fortunes: Vec<Fortune>) {
     FortunesTemplate(fortunes: Vec<Fortune>) {
         {markup::doctype()}
         {markup::doctype()}