Kaynağa Gözat

ntex: various fixes (#7838)

Nikolay Kim 2 yıl önce
ebeveyn
işleme
8dcd4e8a3e

+ 1 - 2
frameworks/Rust/ntex/Cargo.toml

@@ -42,11 +42,10 @@ mimalloc = { version = "0.1.25", default-features = false }
 snmalloc-rs = { version = "0.3.3", features = ["native-cpu"] }
 yarte = { version = "0.15", features = ["bytes-buf", "json"] }
 buf-min = { version = "0.7", features = ["ntex-bytes"] }
-env_logger = "0.9"
+env_logger = "0.10"
 nanorand = { version = "0.7", default-features = false, features = ["std", "wyrand"] }
 atoi = "2.0"
 num_cpus = "1.13"
-futures = "0.3"
 smallvec = "1.6.1"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"

+ 84 - 109
frameworks/Rust/ntex/src/db.rs

@@ -1,7 +1,6 @@
 #![allow(clippy::uninit_vec)]
-use std::{borrow::Cow, cell::RefCell, fmt::Write as FmtWrite, rc::Rc};
+use std::{borrow::Cow, cell::RefCell, fmt::Write as FmtWrite};
 
-use futures::{Future, FutureExt};
 use nanorand::{Rng, WyRand};
 use ntex::util::{BufMut, Bytes, BytesMut};
 use smallvec::SmallVec;
@@ -30,7 +29,7 @@ pub struct PgConnection {
     world: Statement,
     rng: WyRand,
     updates: Vec<Statement>,
-    buf: Rc<RefCell<BytesMut>>,
+    buf: RefCell<BytesMut>,
 }
 
 impl PgConnection {
@@ -38,7 +37,9 @@ impl PgConnection {
         let (cl, conn) = connect(db_url)
             .await
             .expect("can not connect to postgresql");
-        ntex::rt::spawn(conn.map(|_| ()));
+        ntex::rt::spawn(async move {
+            let _ = conn.await;
+        });
 
         let fortune = cl.prepare("SELECT * FROM fortune").await.unwrap();
         let mut updates = Vec::new();
@@ -67,32 +68,28 @@ impl PgConnection {
             world,
             updates,
             rng: WyRand::new(),
-            buf: Rc::new(RefCell::new(BytesMut::with_capacity(65535))),
+            buf: RefCell::new(BytesMut::with_capacity(65535)),
         }
     }
 }
 
 impl PgConnection {
-    pub fn get_world(&self) -> impl Future<Output = Bytes> {
-        let buf = self.buf.clone();
+    pub async fn get_world(&self) -> Bytes {
         let random_id = (self.rng.clone().generate::<u32>() % 10_000 + 1) as i32;
-        self.cl
-            .query_one(&self.world, &[&random_id])
-            .map(move |row| {
-                let row = row.unwrap();
-                let mut body = buf.borrow_mut();
-                utils::reserve(&mut body);
-                World {
-                    id: row.get(0),
-                    randomnumber: row.get(1),
-                }
-                .to_bytes_mut(&mut *body);
-                body.split().freeze()
-            })
+
+        let row = self.cl.query_one(&self.world, &[&random_id]).await.unwrap();
+
+        let mut body = self.buf.borrow_mut();
+        utils::reserve(&mut body);
+        World {
+            id: row.get(0),
+            randomnumber: row.get(1),
+        }
+        .to_bytes_mut(&mut *body);
+        body.split().freeze()
     }
 
-    pub fn get_worlds(&self, num: usize) -> impl Future<Output = Bytes> {
-        let buf = self.buf.clone();
+    pub async fn get_worlds(&self, num: usize) -> Bytes {
         let mut rng = self.rng.clone();
         let mut queries = SmallVec::<[_; 32]>::new();
         (0..num).for_each(|_| {
@@ -100,32 +97,28 @@ impl PgConnection {
             queries.push(self.cl.query_one(&self.world, &[&w_id]));
         });
 
-        async move {
-            let mut worlds = SmallVec::<[_; 32]>::new();
-            for fut in queries {
-                let row = fut.await.unwrap();
-                worlds.push(World {
-                    id: row.get(0),
-                    randomnumber: row.get(1),
-                })
-            }
-
-            let mut body = buf.borrow_mut();
-            utils::reserve(&mut body);
-
-            body.put_u8(b'[');
-            worlds.iter().for_each(|w| {
-                w.to_bytes_mut(&mut *body);
-                body.put_u8(b',');
-            });
-            let idx = body.len() - 1;
-            body[idx] = b']';
-            body.split().freeze()
+        let mut worlds = SmallVec::<[_; 32]>::new();
+        for fut in queries {
+            let row = fut.await.unwrap();
+            worlds.push(World {
+                id: row.get(0),
+                randomnumber: row.get(1),
+            })
         }
+
+        let mut body = self.buf.borrow_mut();
+        utils::reserve(&mut body);
+        body.put_u8(b'[');
+        worlds.iter().for_each(|w| {
+            w.to_bytes_mut(&mut *body);
+            body.put_u8(b',');
+        });
+        let idx = body.len() - 1;
+        body[idx] = b']';
+        body.split().freeze()
     }
 
-    pub fn update(&self, num: usize) -> impl Future<Output = Bytes> {
-        let buf = self.buf.clone();
+    pub async fn update(&self, num: usize) -> Bytes {
         let mut rng = self.rng.clone();
         let mut queries = SmallVec::<[_; 32]>::new();
         (0..num).for_each(|_| {
@@ -133,78 +126,60 @@ impl PgConnection {
             queries.push(self.cl.query_one(&self.world, &[&w_id]));
         });
 
-        let cl = self.cl.clone();
-        let st = self.updates[num - 1].clone();
-        let base = num * 2;
-        async move {
-            let mut worlds = SmallVec::<[_; 32]>::new();
-            let mut params_data: Vec<i32> = Vec::with_capacity(num * 3);
-            unsafe {
-                params_data.set_len(num * 3);
-            }
-            for (idx, fut) in queries.into_iter().enumerate() {
-                let q = fut.await.unwrap();
-                let id = (rng.generate::<u32>() % 10_000 + 1) as i32;
-                let wid = q.get(0);
-                let randomnumber = id;
-
-                params_data[idx * 2] = wid;
-                params_data[idx * 2 + 1] = randomnumber;
-                params_data[base + idx] = wid;
-                worlds.push(World {
-                    id: wid,
-                    randomnumber,
-                });
-            }
-
-            let mut params: Vec<&dyn ToSql> = Vec::with_capacity(num as usize * 3);
-            for i in params_data.iter() {
-                params.push(i);
-            }
-            let _ = cl
-                .query(&st, &params)
-                .await;
-
-            let mut body = buf.borrow_mut();
-            utils::reserve(&mut body);
-            body.put_u8(b'[');
-            worlds.iter().for_each(|w| {
-                w.to_bytes_mut(&mut *body);
-                body.put_u8(b',');
+        let mut worlds = SmallVec::<[_; 32]>::new();
+        for fut in queries.into_iter() {
+            let row = fut.await.unwrap();
+            worlds.push(World {
+                id: row.get(0),
+                randomnumber: (rng.generate::<u32>() % 10_000 + 1) as i32,
             });
-            let idx = body.len() - 1;
-            body[idx] = b']';
-            body.split().freeze()
         }
+
+        let mut params: Vec<&dyn ToSql> = Vec::with_capacity(num * 3);
+        for w in &worlds {
+            params.push(&w.id);
+            params.push(&w.randomnumber);
+        }
+        for w in &worlds {
+            params.push(&w.id);
+        }
+        let _ = self.cl.query(&self.updates[num - 1], &params).await;
+
+        let mut body = self.buf.borrow_mut();
+        utils::reserve(&mut body);
+        body.put_u8(b'[');
+        worlds.iter().for_each(|w| {
+            w.to_bytes_mut(&mut *body);
+            body.put_u8(b',');
+        });
+        let idx = body.len() - 1;
+        body[idx] = b']';
+        body.split().freeze()
     }
 
-    pub fn tell_fortune(&self) -> impl Future<Output = Bytes> {
-        let buf = self.buf.clone();
+    pub async fn tell_fortune(&self) -> Bytes {
         let fut = self.cl.query_raw(&self.fortune, &[]);
 
-        async move {
-            let rows = fut.await.unwrap();
-            let mut fortunes: SmallVec<[_; 32]> = smallvec::smallvec![Fortune {
-                id: 0,
-                message: Cow::Borrowed("Additional fortune added at request time."),
-            }];
-
-            for row in rows {
-                fortunes.push(Fortune {
-                    id: row.get(0),
-                    message: Cow::Owned(row.get(1)),
-                });
-            }
+        let rows = fut.await.unwrap();
+        let mut fortunes: SmallVec<[_; 32]> = smallvec::smallvec![Fortune {
+            id: 0,
+            message: Cow::Borrowed("Additional fortune added at request time."),
+        }];
 
-            fortunes.sort_by(|it, next| it.message.cmp(&next.message));
+        for row in rows {
+            fortunes.push(Fortune {
+                id: row.get(0),
+                message: Cow::Owned(row.get(1)),
+            });
+        }
 
-            let mut body = std::mem::replace(&mut *buf.borrow_mut(), BytesMut::new());
-            utils::reserve(&mut body);
-            ywrite_html!(body, "{{> fortune }}");
+        fortunes.sort_by(|it, next| it.message.cmp(&next.message));
 
-            let result = body.split().freeze();
-            let _ = std::mem::replace(&mut *buf.borrow_mut(), body);
-            result
-        }
+        let mut body = std::mem::replace(&mut *self.buf.borrow_mut(), BytesMut::new());
+        utils::reserve(&mut body);
+        ywrite_html!(body, "{{> fortune }}");
+        let result = body.split().freeze();
+        let _ = std::mem::replace(&mut *self.buf.borrow_mut(), body);
+        result
     }
 }

+ 3 - 3
frameworks/Rust/ntex/src/main.rs

@@ -1,5 +1,5 @@
 #[global_allocator]
-static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
+static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 
 use ntex::http::header::{CONTENT_TYPE, SERVER};
 use ntex::{http, time::Seconds, util::BytesMut, util::PoolId, web};
@@ -50,8 +50,8 @@ async fn main() -> std::io::Result<()> {
         .backlog(1024)
         .bind("techempower", "0.0.0.0:8080", |cfg| {
             cfg.memory_pool(PoolId::P1);
-            PoolId::P1.set_read_params(65535, 1024);
-            PoolId::P1.set_write_params(65535, 1024);
+            PoolId::P1.set_read_params(65535, 2048);
+            PoolId::P1.set_write_params(65535, 2048);
 
             http::HttpService::build()
                 .keep_alive(http::KeepAlive::Os)

+ 44 - 43
frameworks/Rust/ntex/src/main_db.rs

@@ -2,9 +2,8 @@
 #[global_allocator]
 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
-use std::{pin::Pin, task::Context, task::Poll};
+use std::{future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
 
-use futures::future::{ok, Future, FutureExt};
 use ntex::http::header::{CONTENT_TYPE, SERVER};
 use ntex::http::{HttpService, KeepAlive, Request, Response, StatusCode};
 use ntex::service::{Service, ServiceFactory};
@@ -14,7 +13,7 @@ use ntex::{time::Seconds, util::PoolId};
 mod db;
 mod utils;
 
-struct App(db::PgConnection);
+struct App(Rc<db::PgConnection>);
 
 impl Service<Request> for App {
     type Response = Response;
@@ -27,45 +26,47 @@ impl Service<Request> for App {
     }
 
     fn call(&self, req: Request) -> Self::Future {
-        match req.path() {
-            "/db" => Box::pin(self.0.get_world().map(|body| {
-                let mut res = HttpResponse::with_body(StatusCode::OK, body.into());
-                res.headers_mut().insert(SERVER, utils::HDR_SERVER);
-                res.headers_mut()
-                    .insert(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
-                Ok(res)
-            })),
-            "/fortunes" => Box::pin(self.0.tell_fortune().map(|body| {
-                let mut res = HttpResponse::with_body(StatusCode::OK, body.into());
-                res.headers_mut().insert(SERVER, utils::HDR_SERVER);
-                res.headers_mut()
-                    .insert(CONTENT_TYPE, utils::HDR_HTML_CONTENT_TYPE);
-                Ok(res)
-            })),
-            "/query" => Box::pin(
-                self.0
-                    .get_worlds(utils::get_query_param(req.uri().query()))
-                    .map(|worlds| {
-                        let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
-                        res.headers_mut().insert(SERVER, utils::HDR_SERVER);
-                        res.headers_mut()
-                            .insert(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
-                        Ok(res)
-                    }),
-            ),
-            "/update" => Box::pin(
-                self.0
-                    .update(utils::get_query_param(req.uri().query()))
-                    .map(|worlds| {
-                        let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
-                        res.headers_mut().insert(SERVER, utils::HDR_SERVER);
-                        res.headers_mut()
-                            .insert(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
-                        Ok(res)
-                    }),
-            ),
-            _ => Box::pin(ok(Response::new(StatusCode::NOT_FOUND))),
-        }
+        let db = self.0.clone();
+
+        Box::pin(async move {
+            match req.path() {
+                "/db" => {
+                    let body = db.get_world().await;
+                    let mut res = HttpResponse::with_body(StatusCode::OK, body.into());
+                    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+                    res.headers_mut()
+                        .insert(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
+                    Ok(res)
+                }
+                "/fortunes" => {
+                    let body = db.tell_fortune().await;
+                    let mut res = HttpResponse::with_body(StatusCode::OK, body.into());
+                    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+                    res.headers_mut()
+                        .insert(CONTENT_TYPE, utils::HDR_HTML_CONTENT_TYPE);
+                    Ok(res)
+                }
+                "/query" => {
+                    let worlds = db
+                        .get_worlds(utils::get_query_param(req.uri().query()))
+                        .await;
+                    let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
+                    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+                    res.headers_mut()
+                        .insert(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
+                    Ok(res)
+                }
+                "/update" => {
+                    let worlds = db.update(utils::get_query_param(req.uri().query())).await;
+                    let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
+                    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+                    res.headers_mut()
+                        .insert(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
+                    Ok(res)
+                }
+                _ => Ok(Response::new(StatusCode::NOT_FOUND)),
+            }
+        })
     }
 }
 
@@ -82,7 +83,7 @@ impl ServiceFactory<Request> for AppFactory {
         const DB_URL: &str =
             "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
 
-        Box::pin(async move { Ok(App(db::PgConnection::connect(DB_URL).await)) })
+        Box::pin(async move { Ok(App(Rc::new(db::PgConnection::connect(DB_URL).await))) })
     }
 }
 

+ 2 - 2
frameworks/Rust/ntex/src/main_plt.rs

@@ -79,8 +79,8 @@ async fn main() -> io::Result<()> {
         .backlog(1024)
         .bind("techempower", "0.0.0.0:8080", |cfg| {
             cfg.memory_pool(PoolId::P1);
-            PoolId::P1.set_read_params(65535, 1024);
-            PoolId::P1.set_write_params(65535, 1024);
+            PoolId::P1.set_read_params(65535, 2048);
+            PoolId::P1.set_write_params(65535, 2048);
 
             fn_service(|io| App {
                 io,