Browse Source

ntex optimizations (#7741)

* ntex db bench optimizations

* fix
Nikolay Kim 2 years ago
parent
commit
b05202e3ad

+ 1 - 1
frameworks/Rust/ntex/Cargo.toml

@@ -37,7 +37,7 @@ tokio = ["ntex/tokio"]
 async-std = ["ntex/async-std"]
 
 [dependencies]
-ntex = "0.5.28"
+ntex = "0.5.30"
 mimalloc = { version = "0.1.25", default-features = false }
 snmalloc-rs = { version = "0.3.3", features = ["native-cpu"] }
 yarte = { version = "0.15", features = ["bytes-buf", "json"] }

+ 65 - 50
frameworks/Rust/ntex/src/db.rs

@@ -1,4 +1,5 @@
-use std::{borrow::Cow, fmt::Write as FmtWrite};
+#![allow(clippy::uninit_vec)]
+use std::{borrow::Cow, cell::RefCell, fmt::Write as FmtWrite, rc::Rc};
 
 use futures::{Future, FutureExt};
 use nanorand::{Rng, WyRand};
@@ -8,6 +9,8 @@ use tokio_postgres::types::ToSql;
 use tokio_postgres::{connect, Client, Statement};
 use yarte::{ywrite_html, Serialize};
 
+use super::utils;
+
 #[derive(Copy, Clone, Serialize, Debug)]
 pub struct World {
     pub id: i32,
@@ -27,6 +30,7 @@ pub struct PgConnection {
     world: Statement,
     rng: WyRand,
     updates: Vec<Statement>,
+    buf: Rc<RefCell<BytesMut>>,
 }
 
 impl PgConnection {
@@ -63,36 +67,42 @@ impl PgConnection {
             world,
             updates,
             rng: WyRand::new(),
+            buf: Rc::new(RefCell::new(BytesMut::with_capacity(65535))),
         }
     }
 }
 
 impl PgConnection {
     pub fn get_world(&self) -> impl Future<Output = Bytes> {
+        let buf = self.buf.clone();
         let random_id = (self.rng.clone().generate::<u32>() % 10_000 + 1) as i32;
-        self.cl.query_one(&self.world, &[&random_id]).map(|row| {
-            let row = row.unwrap();
-            let mut body = BytesMut::with_capacity(64);
-            World {
-                id: row.get(0),
-                randomnumber: row.get(1),
-            }
-            .to_bytes_mut(&mut body);
-            body.freeze()
-        })
+        self.cl
+            .query_one(&self.world, &[&random_id])
+            .map(move |row| {
+                let row = row.unwrap();
+                let mut body = buf.borrow_mut();
+                utils::reserve(&mut body);
+                World {
+                    id: row.get(0),
+                    randomnumber: row.get(1),
+                }
+                .to_bytes_mut(&mut *body);
+                body.split().freeze()
+            })
     }
 
     pub fn get_worlds(&self, num: usize) -> impl Future<Output = Bytes> {
-        let mut futs = Vec::with_capacity(num);
+        let buf = self.buf.clone();
         let mut rng = self.rng.clone();
-        for _ in 0..num {
+        let mut queries = SmallVec::<[_; 32]>::new();
+        (0..num).for_each(|_| {
             let w_id = (rng.generate::<u32>() % 10_000 + 1) as i32;
-            futs.push(self.cl.query_one(&self.world, &[&w_id]));
-        }
+            queries.push(self.cl.query_one(&self.world, &[&w_id]));
+        });
 
         async move {
-            let mut worlds = Vec::with_capacity(num);
-            for fut in futs {
+            let mut worlds = SmallVec::<[_; 32]>::new();
+            for fut in queries {
                 let row = fut.await.unwrap();
                 worlds.push(World {
                     id: row.get(0),
@@ -100,36 +110,39 @@ impl PgConnection {
                 })
             }
 
-            let mut buf = BytesMut::with_capacity(48 * num);
-            buf.put_u8(b'[');
+            let mut body = buf.borrow_mut();
+            utils::reserve(&mut body);
+
+            body.put_u8(b'[');
             worlds.iter().for_each(|w| {
-                w.to_bytes_mut(&mut buf);
-                buf.put_u8(b',');
+                w.to_bytes_mut(&mut *body);
+                body.put_u8(b',');
             });
-            let idx = buf.len() - 1;
-            buf[idx] = b']';
-            buf.freeze()
+            let idx = body.len() - 1;
+            body[idx] = b']';
+            body.split().freeze()
         }
     }
 
     pub fn update(&self, num: usize) -> impl Future<Output = Bytes> {
-        let mut futs = Vec::with_capacity(num);
+        let buf = self.buf.clone();
         let mut rng = self.rng.clone();
-        for _ in 0..num {
+        let mut queries = SmallVec::<[_; 32]>::new();
+        (0..num).for_each(|_| {
             let w_id = (rng.generate::<u32>() % 10_000 + 1) as i32;
-            futs.push(self.cl.query_one(&self.world, &[&w_id]));
-        }
+            queries.push(self.cl.query_one(&self.world, &[&w_id]));
+        });
 
         let cl = self.cl.clone();
         let st = self.updates[num - 1].clone();
         let base = num * 2;
         async move {
-            let mut worlds = Vec::with_capacity(num);
+            let mut worlds = SmallVec::<[_; 32]>::new();
             let mut params_data: Vec<i32> = Vec::with_capacity(num * 3);
             unsafe {
                 params_data.set_len(num * 3);
             }
-            for (idx, fut) in futs.into_iter().enumerate() {
+            for (idx, fut) in queries.into_iter().enumerate() {
                 let q = fut.await.unwrap();
                 let id = (rng.generate::<u32>() % 10_000 + 1) as i32;
                 let wid = q.get(0);
@@ -144,30 +157,29 @@ impl PgConnection {
                 });
             }
 
-            ntex::rt::spawn(async move {
-                let mut params: Vec<&dyn ToSql> = Vec::with_capacity(num as usize * 3);
-                for i in params_data.iter() {
-                    params.push(i);
-                }
-                let _ = cl
-                    .query(&st, &params)
-                    .await
-                    .map_err(|e| log::error!("{:?}", e));
-            });
+            let mut params: Vec<&dyn ToSql> = Vec::with_capacity(num as usize * 3);
+            for i in params_data.iter() {
+                params.push(i);
+            }
+            let _ = cl
+                .query(&st, &params)
+                .await;
 
-            let mut buf = BytesMut::with_capacity(48 * num);
-            buf.put_u8(b'[');
+            let mut body = buf.borrow_mut();
+            utils::reserve(&mut body);
+            body.put_u8(b'[');
             worlds.iter().for_each(|w| {
-                w.to_bytes_mut(&mut buf);
-                buf.put_u8(b',');
+                w.to_bytes_mut(&mut *body);
+                body.put_u8(b',');
             });
-            let idx = buf.len() - 1;
-            buf[idx] = b']';
-            buf.freeze()
+            let idx = body.len() - 1;
+            body[idx] = b']';
+            body.split().freeze()
         }
     }
 
     pub fn tell_fortune(&self) -> impl Future<Output = Bytes> {
+        let buf = self.buf.clone();
         let fut = self.cl.query_raw(&self.fortune, &[]);
 
         async move {
@@ -186,10 +198,13 @@ impl PgConnection {
 
             fortunes.sort_by(|it, next| it.message.cmp(&next.message));
 
-            let mut buf = BytesMut::with_capacity(2048);
-            ywrite_html!(buf, "{{> fortune }}");
+            let mut body = std::mem::replace(&mut *buf.borrow_mut(), BytesMut::new());
+            utils::reserve(&mut body);
+            ywrite_html!(body, "{{> fortune }}");
 
-            buf.freeze()
+            let result = body.split().freeze();
+            let _ = std::mem::replace(&mut *buf.borrow_mut(), body);
+            result
         }
     }
 }

+ 4 - 4
frameworks/Rust/ntex/src/main.rs

@@ -21,10 +21,10 @@ async fn json() -> web::HttpResponse {
     .to_bytes_mut(&mut body);
 
     let mut response = web::HttpResponse::with_body(http::StatusCode::OK, body.into());
-    response.headers_mut().append(SERVER, utils::HDR_SERVER);
+    response.headers_mut().insert(SERVER, utils::HDR_SERVER);
     response
         .headers_mut()
-        .append(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
+        .insert(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
     response
 }
 
@@ -34,10 +34,10 @@ async fn plaintext() -> web::HttpResponse {
         http::StatusCode::OK,
         http::body::Body::Bytes(utils::BODY_PLAIN_TEXT),
     );
-    response.headers_mut().append(SERVER, utils::HDR_SERVER);
+    response.headers_mut().insert(SERVER, utils::HDR_SERVER);
     response
         .headers_mut()
-        .append(CONTENT_TYPE, utils::HDR_TEXT_CONTENT_TYPE);
+        .insert(CONTENT_TYPE, utils::HDR_TEXT_CONTENT_TYPE);
     response
 }
 

+ 8 - 8
frameworks/Rust/ntex/src/main_db.rs

@@ -30,16 +30,16 @@ impl Service<Request> for App {
         match req.path() {
             "/db" => Box::pin(self.0.get_world().map(|body| {
                 let mut res = HttpResponse::with_body(StatusCode::OK, body.into());
-                res.headers_mut().append(SERVER, utils::HDR_SERVER);
+                res.headers_mut().insert(SERVER, utils::HDR_SERVER);
                 res.headers_mut()
-                    .append(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
+                    .insert(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
                 Ok(res)
             })),
             "/fortunes" => Box::pin(self.0.tell_fortune().map(|body| {
                 let mut res = HttpResponse::with_body(StatusCode::OK, body.into());
-                res.headers_mut().append(SERVER, utils::HDR_SERVER);
+                res.headers_mut().insert(SERVER, utils::HDR_SERVER);
                 res.headers_mut()
-                    .append(CONTENT_TYPE, utils::HDR_HTML_CONTENT_TYPE);
+                    .insert(CONTENT_TYPE, utils::HDR_HTML_CONTENT_TYPE);
                 Ok(res)
             })),
             "/query" => Box::pin(
@@ -47,9 +47,9 @@ impl Service<Request> for App {
                     .get_worlds(utils::get_query_param(req.uri().query()))
                     .map(|worlds| {
                         let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
-                        res.headers_mut().append(SERVER, utils::HDR_SERVER);
+                        res.headers_mut().insert(SERVER, utils::HDR_SERVER);
                         res.headers_mut()
-                            .append(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
+                            .insert(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
                         Ok(res)
                     }),
             ),
@@ -58,9 +58,9 @@ impl Service<Request> for App {
                     .update(utils::get_query_param(req.uri().query()))
                     .map(|worlds| {
                         let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
-                        res.headers_mut().append(SERVER, utils::HDR_SERVER);
+                        res.headers_mut().insert(SERVER, utils::HDR_SERVER);
                         res.headers_mut()
-                            .append(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
+                            .insert(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
                         Ok(res)
                     }),
             ),

+ 2 - 9
frameworks/Rust/ntex/src/main_plt.rs

@@ -2,9 +2,7 @@
 static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 use std::{future::Future, io, pin::Pin, task::Context, task::Poll};
 
-use ntex::{
-    fn_service, http::h1, io::Io, io::RecvError, util::ready, util::BufMut, util::PoolId,
-};
+use ntex::{fn_service, http::h1, io::Io, io::RecvError, util::ready, util::PoolId};
 use yarte::Serialize;
 
 mod utils;
@@ -37,12 +35,7 @@ impl Future for App {
                 Ok((req, _)) => {
                     let _ = this.io.with_write_buf(|buf| {
                         buf.with_bytes_mut(|buf| {
-                            // make sure we've got room
-                            let remaining = buf.remaining_mut();
-                            if remaining < 1024 {
-                                buf.reserve(65535 - remaining);
-                            }
-
+                            utils::reserve(buf);
                             match req.path() {
                                 "/json" => {
                                     buf.extend_from_slice(JSON);

+ 8 - 1
frameworks/Rust/ntex/src/utils.rs

@@ -2,7 +2,7 @@
 use std::cmp;
 
 use atoi::FromRadix10;
-use ntex::{http::header::HeaderValue, util::Bytes};
+use ntex::{http::header::HeaderValue, util::BufMut, util::Bytes, util::BytesMut};
 
 pub const HDR_SERVER: HeaderValue = HeaderValue::from_static("N");
 pub const HDR_JSON_CONTENT_TYPE: HeaderValue = HeaderValue::from_static("application/json");
@@ -22,3 +22,10 @@ pub fn get_query_param(query: Option<&str>) -> usize {
     };
     cmp::min(500, cmp::max(1, q) as usize)
 }
+
+pub fn reserve(buf: &mut BytesMut) {
+    let remaining = buf.remaining_mut();
+    if remaining < 1024 {
+        buf.reserve(65535 - remaining);
+    }
+}