Browse Source

[may_minihttp] update db client logic (#7850)

Xudong Huang 2 years ago
parent
commit
c0c11e3400

+ 1 - 1
frameworks/Rust/may-minihttp/may-minihttp.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.65
+FROM rust:1.66
 
 RUN apt-get update -yqq && apt-get install -yqq cmake g++
 

+ 40 - 44
frameworks/Rust/may-minihttp/src/main.rs

@@ -15,7 +15,6 @@ use yarte::{ywrite_html, Serialize};
 
 mod utils {
     use atoi::FromRadix10;
-    use may_postgres::types::ToSql;
 
     pub fn get_query_param(query: &str) -> u16 {
         let q = if let Some(pos) = query.find("?q") {
@@ -25,12 +24,6 @@ mod utils {
         };
         q.clamp(1, 500)
     }
-
-    pub fn slice_iter<'a>(
-        s: &'a [&'a (dyn ToSql + Sync)],
-    ) -> impl ExactSizeIterator<Item = &'a dyn ToSql> + 'a {
-        s.iter().map(|s| *s as _)
-    }
 }
 
 #[derive(Serialize)]
@@ -52,16 +45,15 @@ pub struct Fortune<'a> {
 
 struct PgConnectionPool {
     idx: AtomicUsize,
-    clients: Vec<Arc<PgConnection>>,
+    clients: Vec<PgConnection>,
 }
 
 impl PgConnectionPool {
-    fn new(db_url: &str, size: usize) -> PgConnectionPool {
-        let mut clients = Vec::with_capacity(size);
-        for _ in 0..size {
-            let client = PgConnection::new(db_url);
-            clients.push(Arc::new(client));
-        }
+    fn new(db_url: &'static str, size: usize) -> PgConnectionPool {
+        let clients = (0..size)
+            .map(|_| std::thread::spawn(move || PgConnection::new(db_url)))
+            .collect::<Vec<_>>();
+        let clients = clients.into_iter().map(|t| t.join().unwrap()).collect();
 
         PgConnectionPool {
             idx: AtomicUsize::new(0),
@@ -69,20 +61,28 @@ impl PgConnectionPool {
         }
     }
 
-    fn get_connection(&self) -> (Arc<PgConnection>, usize) {
+    fn get_connection(&self) -> PgConnection {
         let idx = self.idx.fetch_add(1, Ordering::Relaxed);
         let len = self.clients.len();
-        (self.clients[idx % len].clone(), idx)
+        let connection = &self.clients[idx % len];
+        PgConnection {
+            client: connection.client.clone(),
+            statement: connection.statement.clone(),
+        }
     }
 }
 
-struct PgConnection {
-    client: Client,
+struct PgStatement {
     world: Statement,
     fortune: Statement,
     updates: Vec<Statement>,
 }
 
+struct PgConnection {
+    client: Client,
+    statement: Arc<PgStatement>,
+}
+
 impl PgConnection {
     fn new(db_url: &str) -> Self {
         let client = may_postgres::connect(db_url).unwrap();
@@ -103,7 +103,7 @@ impl PgConnection {
             }
             q.push_str("ELSE randomnumber END WHERE id IN (");
             for _ in 1..=num {
-                let _ = write!(&mut q, "${},", pl);
+                let _ = write!(&mut q, "${pl},");
                 pl += 1;
             }
             q.pop();
@@ -111,18 +111,19 @@ impl PgConnection {
             updates.push(client.prepare(&q).unwrap());
         }
 
-        PgConnection {
-            client,
+        let statement = Arc::new(PgStatement {
             world,
             fortune,
             updates,
-        }
+        });
+
+        PgConnection { client, statement }
     }
 
     fn get_world(&self, random_id: i32) -> Result<WorldRow, may_postgres::Error> {
         let mut q = self
             .client
-            .query_raw(&self.world, utils::slice_iter(&[&random_id]))?;
+            .query_raw(&self.statement.world, [&random_id as _])?;
         match q.next().transpose()? {
             Some(row) => Ok(WorldRow {
                 id: row.get(0),
@@ -142,7 +143,7 @@ impl PgConnection {
             let random_id = (rand.generate::<u32>() % 10_000 + 1) as i32;
             queries.push(
                 self.client
-                    .query_raw(&self.world, utils::slice_iter(&[&random_id]))?,
+                    .query_raw(&self.statement.world, [&random_id as _])?,
             );
         }
 
@@ -169,7 +170,7 @@ impl PgConnection {
             let random_id = (rand.generate::<u32>() % 10_000 + 1) as i32;
             queries.push(
                 self.client
-                    .query_raw(&self.world, utils::slice_iter(&[&random_id]))?,
+                    .query_raw(&self.statement.world, [&random_id as _])?,
             );
         }
 
@@ -185,7 +186,7 @@ impl PgConnection {
             }
         }
 
-        let mut params: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(num * 3);
+        let mut params: Vec<&(dyn ToSql)> = Vec::with_capacity(num * 3);
         for w in &worlds {
             params.push(&w.id);
             params.push(&w.randomnumber);
@@ -194,23 +195,20 @@ impl PgConnection {
             params.push(&w.id);
         }
 
-        self.client.query(&self.updates[num - 1], &params)?;
+        self.client
+            .query_raw(&self.statement.updates[num - 1], params)?;
         Ok(worlds)
     }
 
     fn tell_fortune(&self, buf: &mut BytesMut) -> Result<(), may_postgres::Error> {
-        let rows = self
-            .client
-            .query_raw(&self.fortune, utils::slice_iter(&[]))?;
-
-        let all_rows = rows.map(|r| r.unwrap()).collect::<Vec<_>>();
-        let mut fortunes = all_rows
-            .iter()
-            .map(|r| Fortune {
-                id: r.get(0),
-                message: r.get(1),
-            })
-            .collect::<Vec<_>>();
+        let rows = self.client.query_raw(&self.statement.fortune, [])?;
+
+        let all_rows = Vec::from_iter(rows.map(|r| r.unwrap()));
+        let mut fortunes = Vec::with_capacity(all_rows.capacity() + 1);
+        fortunes.extend(all_rows.iter().map(|r| Fortune {
+            id: r.get(0),
+            message: r.get(1),
+        }));
         fortunes.push(Fortune {
             id: 0,
             message: "Additional fortune added at request time.",
@@ -225,7 +223,7 @@ impl PgConnection {
 }
 
 struct Techempower {
-    db: Arc<PgConnection>,
+    db: PgConnection,
     rng: WyRand,
 }
 
@@ -282,16 +280,14 @@ impl HttpServiceFactory for HttpServer {
     type Service = Techempower;
 
     fn new_service(&self) -> Self::Service {
-        let (db, _idx) = self.db_pool.get_connection();
+        let db = self.db_pool.get_connection();
         let rng = WyRand::new();
         Techempower { db, rng }
     }
 }
 
 fn main() {
-    may::config()
-        .set_pool_capacity(10000)
-        .set_stack_size(0x1000);
+    may::config().set_pool_capacity(1000).set_stack_size(0x1000);
     println!("Starting http server: 127.0.0.1:8080");
     let server = HttpServer {
         db_pool: PgConnectionPool::new(