Browse Source

use prep statement for update bench (#5369)

Nikolay Kim 5 years ago
parent
commit
171948f26a

+ 29 - 9
frameworks/Rust/actix/src/db_pg.rs

@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 use std::fmt::Write;
 use std::io;
 
@@ -7,6 +8,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use rand::rngs::SmallRng;
 use rand::{Rng, SeedableRng};
+use tokio_postgres::types::ToSql;
 use tokio_postgres::{connect, Client, NoTls, Statement};
 
 use crate::models::World;
@@ -18,6 +20,7 @@ pub struct PgConnection {
     fortune: Statement,
     world: Statement,
     rng: SmallRng,
+    updates: HashMap<u16, Statement>,
 }
 
 impl Actor for PgConnection {
@@ -33,11 +36,30 @@ impl PgConnection {
 
         let fortune = cl.prepare("SELECT * FROM fortune").await.unwrap();
         let world = cl.prepare("SELECT * FROM world WHERE id=$1").await.unwrap();
+        let mut updates = HashMap::new();
+        for num in 1..=500u16 {
+            let mut pl = 1;
+            let mut q = String::new();
+            q.push_str("UPDATE world SET randomnumber = CASE id ");
+            for _ in 1..=num {
+                let _ = write!(&mut q, "when ${} then ${} ", pl, pl + 1);
+                pl += 2;
+            }
+            q.push_str("ELSE randomnumber END WHERE id IN (");
+            for _ in 1..=num {
+                let _ = write!(&mut q, "${},", pl);
+                pl += 1;
+            }
+            q.pop();
+            q.push(')');
+            updates.insert(num, cl.prepare(&q).await.unwrap());
+        }
 
         Ok(PgConnection::create(move |_| PgConnection {
             cl,
             fortune,
             world,
+            updates,
             rng: SmallRng::from_entropy(),
         }))
     }
@@ -61,7 +83,7 @@ impl Handler<RandomWorld> for PgConnection {
                 .await
                 .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?;
 
-            let mut body = BytesMut::with_capacity(33);
+            let mut body = BytesMut::with_capacity(40);
             serde_json::to_writer(
                 Writer(&mut body),
                 &World {
@@ -136,22 +158,20 @@ impl Handler<UpdateWorld> for PgConnection {
         }
 
         let cl = self.cl.clone();
+        let st = self.updates.get(&msg.0).unwrap().clone();
         Box::pin(async move {
             let worlds: Vec<World> = worlds.try_collect().await?;
 
-            let mut q = String::with_capacity(100 + 24 * msg.0 as usize);
-            q.push_str("UPDATE world SET randomnumber = CASE id ");
+            let mut params: Vec<&dyn ToSql> = Vec::with_capacity(msg.0 as usize * 3);
             for w in &worlds {
-                let _ = write!(&mut q, "when {} then {} ", w.id, w.randomnumber);
+                params.push(&w.id);
+                params.push(&w.randomnumber);
             }
-            q.push_str("ELSE randomnumber END WHERE id IN (");
             for w in &worlds {
-                let _ = write!(&mut q, "{},", w.id);
+                params.push(&w.id);
             }
-            q.pop();
-            q.push(')');
 
-            cl.simple_query(&q)
+            cl.query(&st, &params)
                 .await
                 .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?;
 

+ 32 - 13
frameworks/Rust/actix/src/db_pg_direct.rs

@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 use std::fmt::Write;
 use std::io;
 
@@ -7,6 +8,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
 use futures::{Future, FutureExt, StreamExt, TryStreamExt};
 use rand::rngs::SmallRng;
 use rand::{Rng, SeedableRng};
+use tokio_postgres::types::ToSql;
 use tokio_postgres::{connect, Client, NoTls, Statement};
 
 use crate::models::World;
@@ -18,6 +20,7 @@ pub struct PgConnection {
     fortune: Statement,
     world: Statement,
     rng: SmallRng,
+    updates: HashMap<u16, Statement>,
 }
 
 impl PgConnection {
@@ -28,12 +31,31 @@ impl PgConnection {
         actix_rt::spawn(conn.map(|_| ()));
 
         let fortune = cl.prepare("SELECT * FROM fortune").await.unwrap();
+        let mut updates = HashMap::new();
+        for num in 1..=500u16 {
+            let mut pl = 1;
+            let mut q = String::new();
+            q.push_str("UPDATE world SET randomnumber = CASE id ");
+            for _ in 1..=num {
+                let _ = write!(&mut q, "when ${} then ${} ", pl, pl + 1);
+                pl += 2;
+            }
+            q.push_str("ELSE randomnumber END WHERE id IN (");
+            for _ in 1..=num {
+                let _ = write!(&mut q, "${},", pl);
+                pl += 1;
+            }
+            q.pop();
+            q.push(')');
+            updates.insert(num, cl.prepare(&q).await.unwrap());
+        }
         let world = cl.prepare("SELECT * FROM world WHERE id=$1").await.unwrap();
 
         PgConnection {
             cl,
             fortune,
             world,
+            updates,
             rng: SmallRng::from_entropy(),
         }
     }
@@ -49,7 +71,7 @@ impl PgConnection {
                 Error::from(io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
             })?;
 
-            let mut body = BytesMut::with_capacity(33);
+            let mut body = BytesMut::with_capacity(40);
             serde_json::to_writer(
                 Writer(&mut body),
                 &World {
@@ -90,7 +112,7 @@ impl PgConnection {
 
     pub fn update(
         &mut self,
-        num: usize,
+        num: u16,
     ) -> impl Future<Output = Result<Vec<World>, io::Error>> {
         let worlds = FuturesUnordered::new();
         for _ in 0..num {
@@ -114,23 +136,20 @@ impl PgConnection {
         }
 
         let cl = self.cl.clone();
+        let st = self.updates.get(&num).unwrap().clone();
         async move {
             let worlds: Vec<World> = worlds.try_collect().await?;
 
-            let mut update = String::with_capacity(120 + 6 * num as usize);
-            update.push_str(
-                "UPDATE world SET randomnumber = temp.randomnumber FROM (VALUES ",
-            );
-
+            let mut params: Vec<&dyn ToSql> = Vec::with_capacity(num as usize * 3);
             for w in &worlds {
-                let _ = write!(&mut update, "({}, {}),", w.id, w.randomnumber);
+                params.push(&w.id);
+                params.push(&w.randomnumber);
+            }
+            for w in &worlds {
+                params.push(&w.id);
             }
-            update.pop();
-            update.push_str(
-                " ORDER BY 1) AS temp(id, randomnumber) WHERE temp.id = world.id",
-            );
 
-            cl.simple_query(&update)
+            cl.query(&st, &params)
                 .await
                 .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?;
 

+ 1 - 1
frameworks/Rust/actix/src/main_platform.rs

@@ -100,7 +100,7 @@ impl Service for App {
                 })
             }
             "/updates" => {
-                let q = utils::get_query_param(req.uri().query().unwrap_or("")) as usize;
+                let q = utils::get_query_param(req.uri().query().unwrap_or(""));
                 let h_srv = self.hdr_srv.clone();
                 let h_ct = self.hdr_ctjson.clone();
                 let fut = self.db.update(q);

+ 6 - 8
frameworks/Rust/actix/src/main_raw.rs

@@ -83,7 +83,7 @@ impl Future for App {
             }
         }
 
-        if this.write_buf.capacity() - this.write_buf.len() < 512 {
+        if this.write_buf.capacity() - this.write_buf.len() <= 512 {
             this.write_buf.reserve(32_768);
         }
 
@@ -101,13 +101,6 @@ impl Future for App {
             while written < len {
                 match Pin::new(&mut this.io).poll_write(cx, &this.write_buf[written..]) {
                     Poll::Pending => {
-                        if written > 0 {
-                            if written == len {
-                                unsafe { this.write_buf.set_len(0) }
-                            } else {
-                                this.write_buf.advance(written);
-                            }
-                        }
                         break;
                     }
                     Poll::Ready(Ok(n)) => {
@@ -120,6 +113,11 @@ impl Future for App {
                     Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
                 }
             }
+            if written == len {
+                unsafe { this.write_buf.set_len(0) }
+            } else if written > 0 {
+                this.write_buf.advance(written);
+            }
         }
         Poll::Pending
     }