Browse Source

ntex: various optimizations (#7688)

* ntex: various optimizations

* use query_one
Nikolay Kim 2 years ago
parent
commit
829ecd45c9

+ 1 - 2
frameworks/Rust/ntex/Cargo.toml

@@ -41,14 +41,13 @@ ntex = "0.5.28"
 mimalloc = { version = "0.1.25", default-features = false }
 snmalloc-rs = { version = "0.3.3", features = ["native-cpu"] }
 yarte = { version = "0.15", features = ["bytes-buf", "json"] }
+buf-min = { version = "0.7", features = ["ntex-bytes"] }
 env_logger = "0.9"
 nanorand = { version = "0.7", default-features = false, features = ["std", "wyrand"] }
 atoi = "2.0"
 num_cpus = "1.13"
 futures = "0.3"
 smallvec = "1.6.1"
-simd-json = "0.6.0"
-simd-json-derive = "0.6.3"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 log = { version = "0.4", features = ["release_max_level_off"] }

+ 72 - 54
frameworks/Rust/ntex/src/db.rs

@@ -2,24 +2,19 @@ use std::{borrow::Cow, fmt::Write as FmtWrite};
 
 use futures::{Future, FutureExt};
 use nanorand::{Rng, WyRand};
-use ntex::util::{join_all, Bytes, BytesMut};
+use ntex::util::{BufMut, Bytes, BytesMut};
 use smallvec::SmallVec;
 use tokio_postgres::types::ToSql;
 use tokio_postgres::{connect, Client, Statement};
 use yarte::{ywrite_html, Serialize};
 
-#[cfg(target_os = "macos")]
-use serde_json as simd_json;
-
-use crate::utils::Writer;
-
-#[derive(Copy, Clone, Serialize, Debug, serde::Serialize)]
+#[derive(Copy, Clone, Serialize, Debug)]
 pub struct World {
     pub id: i32,
     pub randomnumber: i32,
 }
 
-#[derive(serde::Serialize, Debug)]
+#[derive(Serialize, Debug)]
 pub struct Fortune {
     pub id: i32,
     pub message: Cow<'static, str>,
@@ -75,77 +70,100 @@ impl PgConnection {
 impl PgConnection {
     pub fn get_world(&self) -> impl Future<Output = Bytes> {
         let random_id = (self.rng.clone().generate::<u32>() % 10_000 + 1) as i32;
-        self.cl.query(&self.world, &[&random_id]).map(|rows| {
-            let rows = rows.unwrap();
-            let mut body = BytesMut::new();
-            simd_json::to_writer(
-                Writer(&mut body),
-                &World {
-                    id: rows[0].get(0),
-                    randomnumber: rows[0].get(1),
-                },
-            )
-            .unwrap();
+        self.cl.query_one(&self.world, &[&random_id]).map(|row| {
+            let row = row.unwrap();
+            let mut body = BytesMut::with_capacity(64);
+            World {
+                id: row.get(0),
+                randomnumber: row.get(1),
+            }
+            .to_bytes_mut(&mut body);
             body.freeze()
         })
     }
 
-    pub fn get_worlds(&self, num: u16) -> impl Future<Output = Vec<World>> {
-        let mut futs = Vec::with_capacity(num as usize);
+    pub fn get_worlds(&self, num: usize) -> impl Future<Output = Bytes> {
+        let mut futs = Vec::with_capacity(num);
         let mut rng = self.rng.clone();
         for _ in 0..num {
             let w_id = (rng.generate::<u32>() % 10_000 + 1) as i32;
-            futs.push(self.cl.query(&self.world, &[&w_id]));
+            futs.push(self.cl.query_one(&self.world, &[&w_id]));
         }
 
         async move {
-            let mut worlds: Vec<World> = Vec::with_capacity(num as usize);
-            for item in join_all(futs).await {
-                let rows = item.unwrap();
+            let mut worlds = Vec::with_capacity(num);
+            for fut in futs {
+                let row = fut.await.unwrap();
                 worlds.push(World {
-                    id: rows[0].get(0),
-                    randomnumber: rows[0].get(1),
+                    id: row.get(0),
+                    randomnumber: row.get(1),
                 })
             }
-            worlds
+
+            let mut buf = BytesMut::with_capacity(48 * num);
+            buf.put_u8(b'[');
+            worlds.iter().for_each(|w| {
+                w.to_bytes_mut(&mut buf);
+                buf.put_u8(b',');
+            });
+            let idx = buf.len() - 1;
+            buf[idx] = b']';
+            buf.freeze()
         }
     }
 
-    pub fn update(&self, num: u16) -> impl Future<Output = Vec<World>> {
-        let mut futs = Vec::with_capacity(num as usize);
+    pub fn update(&self, num: usize) -> impl Future<Output = Bytes> {
+        let mut futs = Vec::with_capacity(num);
         let mut rng = self.rng.clone();
         for _ in 0..num {
             let w_id = (rng.generate::<u32>() % 10_000 + 1) as i32;
-            futs.push(self.cl.query(&self.world, &[&w_id]));
+            futs.push(self.cl.query_one(&self.world, &[&w_id]));
         }
 
         let cl = self.cl.clone();
-        let st = self.updates[(num as usize) - 1].clone();
+        let st = self.updates[num - 1].clone();
+        let base = num * 2;
         async move {
-            let mut worlds: Vec<World> = Vec::with_capacity(num as usize);
-            for q in join_all(futs).await {
-                let q = q.unwrap();
-                let id = (rng.generate::<u32>() % 10_000 + 1) as i32;
-                worlds.push(World {
-                    id: q[0].get(0),
-                    randomnumber: id,
-                })
+            let mut worlds = Vec::with_capacity(num);
+            let mut params_data: Vec<i32> = Vec::with_capacity(num * 3);
+            unsafe {
+                params_data.set_len(num * 3);
             }
+            for (idx, fut) in futs.into_iter().enumerate() {
+                let q = fut.await.unwrap();
+                let id = (rng.generate::<u32>() % 10_000 + 1) as i32;
+                let wid = q.get(0);
+                let randomnumber = id;
 
-            let mut params: Vec<&dyn ToSql> = Vec::with_capacity(num as usize * 3);
-            for w in &worlds {
-                params.push(&w.id);
-                params.push(&w.randomnumber);
-            }
-            for w in &worlds {
-                params.push(&w.id);
+                params_data[idx * 2] = wid;
+                params_data[idx * 2 + 1] = randomnumber;
+                params_data[base + idx] = wid;
+                worlds.push(World {
+                    id: wid,
+                    randomnumber,
+                });
             }
-            let _ = cl
-                .query(&st, &params)
-                .await
-                .map_err(|e| log::error!("{:?}", e));
 
-            worlds
+            ntex::rt::spawn(async move {
+                let mut params: Vec<&dyn ToSql> = Vec::with_capacity(num as usize * 3);
+                for i in params_data.iter() {
+                    params.push(i);
+                }
+                let _ = cl
+                    .query(&st, &params)
+                    .await
+                    .map_err(|e| log::error!("{:?}", e));
+            });
+
+            let mut buf = BytesMut::with_capacity(48 * num);
+            buf.put_u8(b'[');
+            worlds.iter().for_each(|w| {
+                w.to_bytes_mut(&mut buf);
+                buf.put_u8(b',');
+            });
+            let idx = buf.len() - 1;
+            buf[idx] = b']';
+            buf.freeze()
         }
     }
 
@@ -168,10 +186,10 @@ impl PgConnection {
 
             fortunes.sort_by(|it, next| it.message.cmp(&next.message));
 
-            let mut buf = Vec::with_capacity(2048);
+            let mut buf = BytesMut::with_capacity(2048);
             ywrite_html!(buf, "{{> fortune }}");
 
-            Bytes::from(buf)
+            buf.freeze()
         }
     }
 }

+ 4 - 4
frameworks/Rust/ntex/src/main.rs

@@ -2,7 +2,7 @@
 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use ntex::http::header::{CONTENT_TYPE, SERVER};
-use ntex::{http, time::Seconds, util::PoolId, web};
+use ntex::{http, time::Seconds, util::BytesMut, util::PoolId, web};
 use yarte::Serialize;
 
 mod utils;
@@ -14,7 +14,7 @@ pub struct Message {
 
 #[web::get("/json")]
 async fn json() -> web::HttpResponse {
-    let mut body = Vec::with_capacity(utils::SIZE);
+    let mut body = BytesMut::with_capacity(utils::SIZE);
     Message {
         message: "Hello, World!",
     }
@@ -50,8 +50,8 @@ async fn main() -> std::io::Result<()> {
         .backlog(1024)
         .bind("techempower", "0.0.0.0:8080", |cfg| {
             cfg.memory_pool(PoolId::P1);
-            PoolId::P1.set_read_params(65535, 8192);
-            PoolId::P1.set_write_params(65535, 8192);
+            PoolId::P1.set_read_params(65535, 1024);
+            PoolId::P1.set_write_params(65535, 1024);
 
             http::HttpService::build()
                 .keep_alive(http::KeepAlive::Os)

+ 5 - 12
frameworks/Rust/ntex/src/main_db.rs

@@ -9,10 +9,7 @@ use ntex::http::header::{CONTENT_TYPE, SERVER};
 use ntex::http::{HttpService, KeepAlive, Request, Response, StatusCode};
 use ntex::service::{Service, ServiceFactory};
 use ntex::web::{Error, HttpResponse};
-use ntex::{time::Seconds, util::BytesMut, util::PoolId};
-
-#[cfg(target_os = "macos")]
-use serde_json as simd_json;
+use ntex::{time::Seconds, util::PoolId};
 
 mod db;
 mod utils;
@@ -49,9 +46,7 @@ impl Service<Request> for App {
                 self.0
                     .get_worlds(utils::get_query_param(req.uri().query()))
                     .map(|worlds| {
-                        let mut body = BytesMut::with_capacity(35 * worlds.len());
-                        let _ = simd_json::to_writer(crate::utils::Writer(&mut body), &worlds);
-                        let mut res = HttpResponse::with_body(StatusCode::OK, body.into());
+                        let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
                         res.headers_mut().append(SERVER, utils::HDR_SERVER);
                         res.headers_mut()
                             .append(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
@@ -62,9 +57,7 @@ impl Service<Request> for App {
                 self.0
                     .update(utils::get_query_param(req.uri().query()))
                     .map(|worlds| {
-                        let mut body = BytesMut::with_capacity(35 * worlds.len());
-                        let _ = simd_json::to_writer(crate::utils::Writer(&mut body), &worlds);
-                        let mut res = HttpResponse::with_body(StatusCode::OK, body.into());
+                        let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
                         res.headers_mut().append(SERVER, utils::HDR_SERVER);
                         res.headers_mut()
                             .append(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
@@ -101,8 +94,8 @@ async fn main() -> std::io::Result<()> {
         .backlog(1024)
         .bind("techempower", "0.0.0.0:8080", |cfg| {
             cfg.memory_pool(PoolId::P1);
-            PoolId::P1.set_read_params(65535, 8192);
-            PoolId::P1.set_write_params(65535, 8192);
+            PoolId::P1.set_read_params(65535, 2048);
+            PoolId::P1.set_write_params(65535, 2048);
 
             HttpService::build()
                 .keep_alive(KeepAlive::Os)

+ 10 - 12
frameworks/Rust/ntex/src/main_plt.rs

@@ -5,10 +5,9 @@ use std::{future::Future, io, pin::Pin, task::Context, task::Poll};
 use ntex::{
     fn_service, http::h1, io::Io, io::RecvError, util::ready, util::BufMut, util::PoolId,
 };
-mod utils;
+use yarte::Serialize;
 
-#[cfg(target_os = "macos")]
-use serde_json as simd_json;
+mod utils;
 
 const JSON: &[u8] =
     b"HTTP/1.1 200 OK\r\nServer: N\r\nContent-Type: application/json\r\nContent-Length: 27\r\n";
@@ -18,7 +17,7 @@ const HTTPNFOUND: &[u8] = b"HTTP/1.1 400 OK\r\n";
 const HDR_SERVER: &[u8] = b"Server: N\r\n";
 const BODY: &[u8] = b"Hello, World!";
 
-#[derive(serde::Serialize)]
+#[derive(Serialize)]
 pub struct Message {
     pub message: &'static str,
 }
@@ -48,12 +47,11 @@ impl Future for App {
                                 "/json" => {
                                     buf.extend_from_slice(JSON);
                                     this.codec.set_date_header(buf);
-                                    let _ = simd_json::to_writer(
-                                        crate::utils::Writer(buf),
-                                        &Message {
-                                            message: "Hello, World!",
-                                        },
-                                    );
+
+                                    Message {
+                                        message: "Hello, World!",
+                                    }
+                                    .to_bytes_mut(buf);
                                 }
                                 "/plaintext" => {
                                     buf.extend_from_slice(PLAIN);
@@ -88,8 +86,8 @@ async fn main() -> io::Result<()> {
         .backlog(1024)
         .bind("techempower", "0.0.0.0:8080", |cfg| {
             cfg.memory_pool(PoolId::P1);
-            PoolId::P1.set_read_params(65535, 8192);
-            PoolId::P1.set_write_params(65535, 8192);
+            PoolId::P1.set_read_params(65535, 1024);
+            PoolId::P1.set_write_params(65535, 1024);
 
             fn_service(|io| App {
                 io,

+ 4 - 17
frameworks/Rust/ntex/src/utils.rs

@@ -1,9 +1,8 @@
 #![allow(dead_code)]
-use std::{cmp, io};
+use std::cmp;
 
 use atoi::FromRadix10;
-use ntex::http::header::HeaderValue;
-use ntex::util::{BufMut, Bytes, BytesMut};
+use ntex::{http::header::HeaderValue, util::Bytes};
 
 pub const HDR_SERVER: HeaderValue = HeaderValue::from_static("N");
 pub const HDR_JSON_CONTENT_TYPE: HeaderValue = HeaderValue::from_static("application/json");
@@ -14,24 +13,12 @@ pub const BODY_PLAIN_TEXT: Bytes = Bytes::from_static(b"Hello, World!");
 
 pub const SIZE: usize = 27;
 
-pub fn get_query_param(query: Option<&str>) -> u16 {
+pub fn get_query_param(query: Option<&str>) -> usize {
     let query = query.unwrap_or("");
     let q = if let Some(pos) = query.find('q') {
         u16::from_radix_10(query.split_at(pos + 2).1.as_ref()).0
     } else {
         1
     };
-    cmp::min(500, cmp::max(1, q))
-}
-
-pub struct Writer<'a>(pub &'a mut BytesMut);
-
-impl<'a> io::Write for Writer<'a> {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        self.0.put_slice(buf);
-        Ok(buf.len())
-    }
-    fn flush(&mut self) -> io::Result<()> {
-        Ok(())
-    }
+    cmp::min(500, cmp::max(1, q) as usize)
 }