Nikolay Kim 4 лет назад
Родитель
Сommit
7b8a12062d

+ 2 - 2
frameworks/Rust/ntex/Cargo.toml

@@ -16,12 +16,12 @@ name = "ntex-raw"
 path = "src/main_raw.rs"
 
 [dependencies]
-ntex = "0.3.12"
+ntex = "0.3.15"
 mimalloc = { version = "0.1.25", default-features = false }
 snmalloc-rs = { version = "0.2.26", features = ["1mib", "native-cpu"] }
 yarte = { version = "0.15", features = ["bytes-buf", "json"] }
 env_logger = "0.8"
-random-fast-rng = "0.1.1"
+nanorand = { version = "0.5", default-features = false, features = ["std", "wyrand"] }
 bytes = "1.0"
 atoi = "0.4.0"
 num_cpus = "1.13"

+ 26 - 19
frameworks/Rust/ntex/src/db.rs

@@ -1,8 +1,8 @@
 use std::{borrow::Cow, cell::RefCell, fmt::Write as FmtWrite};
 
 use bytes::{Bytes, BytesMut};
-use futures::{stream::futures_unordered::FuturesUnordered, Future, FutureExt, StreamExt};
-use random_fast_rng::{FastRng, Random};
+use futures::{Future, FutureExt};
+use nanorand::{WyRand, RNG};
 use smallvec::SmallVec;
 use tokio_postgres::types::ToSql;
 use tokio_postgres::{connect, Client, NoTls, Statement};
@@ -25,7 +25,7 @@ pub struct PgConnection {
     cl: Client,
     fortune: Statement,
     world: Statement,
-    rng: RefCell<FastRng>,
+    rng: RefCell<WyRand>,
     updates: Vec<Statement>,
 }
 
@@ -62,14 +62,14 @@ impl PgConnection {
             fortune,
             world,
             updates,
-            rng: RefCell::new(FastRng::new()),
+            rng: RefCell::new(WyRand::new()),
         }
     }
 }
 
 impl PgConnection {
     pub fn get_world(&self) -> impl Future<Output = Bytes> {
-        let random_id = (self.rng.borrow_mut().get_u32() % 10_000 + 1) as i32;
+        let random_id = (self.rng.borrow_mut().generate::<u32>() % 10_000 + 1) as i32;
         self.cl.query(&self.world, &[&random_id]).map(|rows| {
             let rows = rows.unwrap();
             World {
@@ -81,29 +81,33 @@ impl PgConnection {
     }
 
     pub fn get_worlds(&self, num: u16) -> impl Future<Output = Vec<World>> {
-        let worlds = FuturesUnordered::new();
+        let mut futs = Vec::with_capacity(num as usize);
         let mut rng = self.rng.borrow_mut();
         for _ in 0..num {
-            let w_id = (rng.get_u32() % 10_000 + 1) as i32;
-            worlds.push(self.cl.query(&self.world, &[&w_id]).map(|res| {
-                let rows = res.unwrap();
-                World {
+            let w_id = (rng.generate::<u32>() % 10_000 + 1) as i32;
+            futs.push(self.cl.query(&self.world, &[&w_id]));
+        }
+
+        async move {
+            let mut worlds: Vec<World> = Vec::with_capacity(num as usize);
+            for q in futs {
+                let rows = q.await.unwrap();
+                worlds.push(World {
                     id: rows[0].get(0),
                     randomnumber: rows[0].get(1),
-                }
-            }));
+                })
+            }
+            worlds
         }
-
-        worlds.collect()
     }
 
     pub fn update(&self, num: u16) -> impl Future<Output = Vec<World>> {
-        let worlds = FuturesUnordered::new();
+        let mut futs = Vec::with_capacity(num as usize);
         let mut rng = self.rng.borrow_mut();
         for _ in 0..num {
-            let id = (rng.get_u32() % 10_000 + 1) as i32;
-            let w_id = (rng.get_u32() % 10_000 + 1) as i32;
-            worlds.push(self.cl.query(&self.world, &[&w_id]).map(move |res| {
+            let id = (rng.generate::<u32>() % 10_000 + 1) as i32;
+            let w_id = (rng.generate::<u32>() % 10_000 + 1) as i32;
+            futs.push(self.cl.query(&self.world, &[&w_id]).map(move |res| {
                 let rows = res.unwrap();
                 World {
                     id: rows[0].get(0),
@@ -115,7 +119,10 @@ impl PgConnection {
         let cl = self.cl.clone();
         let st = self.updates[(num as usize) - 1].clone();
         async move {
-            let worlds: Vec<World> = worlds.collect().await;
+            let mut worlds: Vec<World> = Vec::with_capacity(num as usize);
+            for q in futs {
+                worlds.push(q.await);
+            }
 
             let mut params: Vec<&dyn ToSql> = Vec::with_capacity(num as usize * 3);
             for w in &worlds {

+ 1 - 1
frameworks/Rust/ntex/src/main.rs

@@ -1,5 +1,5 @@
 #[global_allocator]
-static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use bytes::Bytes;
 use ntex::http::header::{HeaderValue, CONTENT_TYPE, SERVER};

+ 29 - 8
frameworks/Rust/ntex/src/main_db.rs

@@ -1,7 +1,7 @@
 #[global_allocator]
 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
-use std::{pin::Pin, task::Context, task::Poll};
+use std::{cell::Cell, pin::Pin, task::Context, task::Poll};
 
 use bytes::BytesMut;
 use futures::future::{ok, Future, FutureExt};
@@ -14,7 +14,23 @@ use yarte::Serialize;
 mod db;
 mod utils;
 
-struct App(db::PgConnection);
+struct App {
+    c1: db::PgConnection,
+    c2: db::PgConnection,
+    next: Cell<bool>,
+}
+
+impl App {
+    fn get_db(&self) -> &db::PgConnection {
+        if self.next.get() {
+            self.next.set(!self.next.get());
+            &self.c1
+        } else {
+            self.next.set(!self.next.get());
+            &self.c2
+        }
+    }
+}
 
 impl Service for App {
     type Request = Request;
@@ -29,13 +45,13 @@ impl Service for App {
 
     fn call(&self, req: Request) -> Self::Future {
         match req.path() {
-            "/db" => Box::pin(self.0.get_world().map(|body| {
+            "/db" => Box::pin(self.get_db().get_world().map(|body| {
                 Ok(HttpResponse::Ok()
                     .header(SERVER, HeaderValue::from_static("N"))
                     .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
                     .body(body))
             })),
-            "/fortunes" => Box::pin(self.0.tell_fortune().map(|body| {
+            "/fortunes" => Box::pin(self.get_db().tell_fortune().map(|body| {
                 Ok(HttpResponse::Ok()
                     .header(SERVER, HeaderValue::from_static("N"))
                     .header(
@@ -45,7 +61,7 @@ impl Service for App {
                     .body(body))
             })),
             "/query" => Box::pin(
-                self.0
+                self.get_db()
                     .get_worlds(utils::get_query_param(req.uri().query()))
                     .map(|worlds| {
                         Ok(HttpResponse::Ok()
@@ -55,7 +71,7 @@ impl Service for App {
                     }),
             ),
             "/update" => Box::pin(
-                self.0
+                self.get_db()
                     .update(utils::get_query_param(req.uri().query()))
                     .map(|worlds| {
                         Ok(HttpResponse::Ok()
@@ -84,7 +100,13 @@ impl ServiceFactory for AppFactory {
         const DB_URL: &str =
             "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
 
-        Box::pin(async move { Ok(App(db::PgConnection::connect(DB_URL).await)) })
+        Box::pin(async move {
+            Ok(App {
+                next: Cell::new(true),
+                c1: db::PgConnection::connect(DB_URL).await,
+                c2: db::PgConnection::connect(DB_URL).await,
+            })
+        })
     }
 }
 
@@ -103,7 +125,6 @@ async fn main() -> std::io::Result<()> {
                 .h1(AppFactory)
                 .tcp()
         })?
-        .workers((num_cpus::get() as f32 * 1.2) as usize)
         .start()
         .await
 }