Browse Source

ntex optimizations (#9406)

* cpu affinity

* ntex optimizations

* remove smallvec

* wip
Nikolay Kim 8 months ago
parent
commit
d078293eff

+ 10 - 8
frameworks/Rust/ntex/Cargo.toml

@@ -34,11 +34,11 @@ default = []
 tokio = ["ntex/tokio"]
 
 # compio runtime
-compio = ["ntex/compio", ]
+compio = ["ntex/compio"]
 
 [dependencies]
-ntex = "2.4"
-ntex-compio = "0.1.2"
+ntex = "2.8"
+ntex-compio = "0.2"
 ntex-bytes = { version = "0.1.21", features=["simd"] }
 mimalloc = { version = "0.1.25", default-features = false }
 snmalloc-rs = { version = "0.3.3", features = ["native-cpu"] }
@@ -47,16 +47,18 @@ buf-min = { version = "0.7", features = ["ntex-bytes"] }
 env_logger = "0.11"
 nanorand = { version = "0.7", default-features = false, features = ["std", "wyrand", "tls"] }
 atoi = "2.0"
-num_cpus = "1.16"
-smallvec = "1.13"
+core_affinity = "0.8"
 futures = "0.3"
-serde = { version = "1.0", features = ["derive"] }
-serde_json = "1.0"
+sonic-rs = "0.3.16"
+serde = { version = "1", features = ["derive"] }
+serde_json = "1"
 log = { version = "0.4", features = ["release_max_level_off"] }
-compio-driver = { version = "0.4", features = ["io-uring", "io-uring-socket"]}
 tok_io = {version = "1", package = "tokio" }
 tokio-postgres = { git="https://github.com/fafhrd91/postgres.git", branch="ntex-2" }
 
+[target.'cfg(target_os = "linux")'.dependencies]
+compio-driver = { version = "*", features = ["io-uring"]}
+
 [profile.release]
 opt-level = 3
 codegen-units = 1

+ 36 - 31
frameworks/Rust/ntex/src/db.rs

@@ -2,26 +2,31 @@
 use std::{borrow::Cow, cell::RefCell, fmt::Write as FmtWrite};
 
 use nanorand::{Rng, WyRand};
-use ntex::util::{BufMut, Bytes, BytesMut};
-use smallvec::SmallVec;
+use ntex::util::{Bytes, BytesMut};
 use tokio_postgres::types::ToSql;
 use tokio_postgres::{connect, Client, Statement};
-use yarte::{ywrite_html, Serialize};
+use yarte::TemplateBytesTrait;
 
 use super::utils;
 
-#[derive(Copy, Clone, Serialize, Debug)]
+#[derive(Copy, Clone, Debug, sonic_rs::Serialize)]
 pub struct World {
     pub id: i32,
     pub randomnumber: i32,
 }
 
-#[derive(Serialize, Debug)]
+#[derive(Debug, sonic_rs::Serialize)]
 pub struct Fortune {
     pub id: i32,
     pub message: Cow<'static, str>,
 }
 
+#[derive(yarte::TemplateBytes)]
+#[template(path = "fortune.hbs")]
+pub struct FortunesTemplate<'a> {
+    pub fortunes: &'a Vec<Fortune>,
+}
+
 /// Postgres interface
 pub struct PgConnection {
     cl: Client,
@@ -30,6 +35,7 @@ pub struct PgConnection {
     rng: WyRand,
     updates: Vec<Statement>,
     buf: RefCell<BytesMut>,
+    fbuf: RefCell<Vec<Fortune>>,
 }
 
 impl PgConnection {
@@ -69,6 +75,7 @@ impl PgConnection {
             updates,
             rng: WyRand::new(),
             buf: RefCell::new(BytesMut::with_capacity(10 * 1024 * 1024)),
+            fbuf: RefCell::new(Vec::with_capacity(64)),
         }
     }
 }
@@ -81,23 +88,26 @@ impl PgConnection {
 
         let mut body = self.buf.borrow_mut();
         utils::reserve(&mut body, 1024);
-        World {
-            id: row.get(0),
-            randomnumber: row.get(1),
-        }
-        .to_bytes_mut(&mut *body);
+        sonic_rs::to_writer(
+            utils::BytesWriter(&mut body),
+            &World {
+                id: row.get(0),
+                randomnumber: row.get(1),
+            },
+        )
+        .unwrap();
         body.split().freeze()
     }
 
     pub async fn get_worlds(&self, num: usize) -> Bytes {
         let mut rng = self.rng.clone();
-        let mut queries = SmallVec::<[_; 32]>::new();
+        let mut queries = Vec::with_capacity(num);
         (0..num).for_each(|_| {
             let w_id = (rng.generate::<u32>() % 10_000 + 1) as i32;
             queries.push(self.cl.query_one(&self.world, &[&w_id]));
         });
 
-        let mut worlds = SmallVec::<[_; 32]>::new();
+        let mut worlds = Vec::with_capacity(num);
         for fut in queries {
             let row = fut.await.unwrap();
             worlds.push(World {
@@ -108,25 +118,19 @@ impl PgConnection {
 
         let mut body = self.buf.borrow_mut();
         utils::reserve(&mut body, 2 * 1024);
-        body.put_u8(b'[');
-        worlds.iter().for_each(|w| {
-            w.to_bytes_mut(&mut *body);
-            body.put_u8(b',');
-        });
-        let idx = body.len() - 1;
-        body[idx] = b']';
+        sonic_rs::to_writer(utils::BytesWriter(&mut body), &worlds[..]).unwrap();
         body.split().freeze()
     }
 
     pub async fn update(&self, num: usize) -> Bytes {
         let mut rng = nanorand::tls_rng();
-        let mut queries = SmallVec::<[_; 32]>::new();
+        let mut queries = Vec::with_capacity(num);
         (0..num).for_each(|_| {
             let w_id = (rng.generate::<u32>() % 10_000 + 1) as i32;
             queries.push(self.cl.query_one(&self.world, &[&w_id]));
         });
 
-        let mut worlds = SmallVec::<[_; 32]>::new();
+        let mut worlds = Vec::with_capacity(num);
         for fut in queries.into_iter() {
             let row = fut.await.unwrap();
             worlds.push(World {
@@ -147,23 +151,18 @@ impl PgConnection {
 
         let mut body = self.buf.borrow_mut();
         utils::reserve(&mut body, 2 * 1024);
-        body.put_u8(b'[');
-        worlds.iter().for_each(|w| {
-            w.to_bytes_mut(&mut *body);
-            body.put_u8(b',');
-        });
-        let idx = body.len() - 1;
-        body[idx] = b']';
+        sonic_rs::to_writer(utils::BytesWriter(&mut body), &worlds[..]).unwrap();
         body.split().freeze()
     }
 
     pub async fn tell_fortune(&self) -> Bytes {
         let rows = self.cl.query_raw(&self.fortune, &[]).await.unwrap();
 
-        let mut fortunes: SmallVec<[_; 32]> = smallvec::smallvec![Fortune {
+        let mut fortunes = self.fbuf.borrow_mut();
+        fortunes.push(Fortune {
             id: 0,
             message: Cow::Borrowed("Additional fortune added at request time."),
-        }];
+        });
         fortunes.extend(rows.iter().map(|row| Fortune {
             id: row.get(0),
             message: Cow::Owned(row.get(1)),
@@ -172,7 +171,13 @@ impl PgConnection {
 
         let mut body = std::mem::replace(&mut *self.buf.borrow_mut(), BytesMut::new());
         utils::reserve(&mut body, 4 * 1024);
-        ywrite_html!(body, "{{> fortune }}");
+
+        FortunesTemplate {
+            fortunes: &*fortunes,
+        }
+        .write_call(&mut body);
+        fortunes.clear();
+
         let result = body.split().freeze();
         let _ = std::mem::replace(&mut *self.buf.borrow_mut(), body);
         result

+ 24 - 7
frameworks/Rust/ntex/src/main.rs

@@ -2,8 +2,8 @@
 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use ntex::http::header::{CONTENT_TYPE, SERVER};
-use ntex::{http, time::Seconds, util::BytesMut, util::PoolId, web};
-use yarte::Serialize;
+use ntex::{http, time::Seconds, util::BytesMut, util::PoolId, util::Ready, web};
+use sonic_rs::Serialize;
 
 mod utils;
 
@@ -15,10 +15,13 @@ pub struct Message {
 #[web::get("/json")]
 async fn json() -> web::HttpResponse {
     let mut body = BytesMut::with_capacity(utils::SIZE);
-    Message {
-        message: "Hello, World!",
-    }
-    .to_bytes_mut(&mut body);
+    sonic_rs::to_writer(
+        utils::BytesWriter(&mut body),
+        &Message {
+            message: "Hello, World!",
+        },
+    )
+    .unwrap();
 
     let mut response = web::HttpResponse::with_body(http::StatusCode::OK, body.into());
     response.headers_mut().insert(SERVER, utils::HDR_SERVER);
@@ -45,6 +48,10 @@ async fn plaintext() -> web::HttpResponse {
 async fn main() -> std::io::Result<()> {
     println!("Started http server: 127.0.0.1:8080");
 
+    let cores = core_affinity::get_core_ids().unwrap();
+    let total_cores = cores.len();
+    let cores = std::sync::Arc::new(std::sync::Mutex::new(cores));
+
     // start http server
     ntex::server::build()
         .backlog(1024)
@@ -60,7 +67,17 @@ async fn main() -> std::io::Result<()> {
                 .payload_read_rate(Seconds::ZERO, Seconds::ZERO, 0)
                 .h1(web::App::new().service(json).service(plaintext).finish())
         })?
-        .workers(num_cpus::get())
+        .configure(move |cfg| {
+            let cores = cores.clone();
+            cfg.on_worker_start(move |_| {
+                if let Some(core) = cores.lock().unwrap().pop() {
+                    core_affinity::set_for_current(core);
+                }
+                Ready::<_, &str>::Ok(())
+            });
+            Ok(())
+        })?
+        .workers(total_cores)
         .run()
         .await
 }

+ 16 - 3
frameworks/Rust/ntex/src/main_db.rs

@@ -1,13 +1,12 @@
 #[cfg(not(target_os = "macos"))]
 #[global_allocator]
 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
-// static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 
 use ntex::http::header::{CONTENT_TYPE, SERVER};
 use ntex::http::{HttpService, KeepAlive, Request, Response, StatusCode};
 use ntex::service::{Service, ServiceCtx, ServiceFactory};
 use ntex::web::{Error, HttpResponse};
-use ntex::{time::Seconds, util::PoolId};
+use ntex::{time::Seconds, util::PoolId, util::Ready};
 
 mod db;
 mod utils;
@@ -83,6 +82,10 @@ impl ServiceFactory<Request> for AppFactory {
 async fn main() -> std::io::Result<()> {
     println!("Starting http server: 127.0.0.1:8080");
 
+    let cores = core_affinity::get_core_ids().unwrap();
+    let total_cores = cores.len();
+    let cores = std::sync::Arc::new(std::sync::Mutex::new(cores));
+
     ntex::server::build()
         .backlog(1024)
         .bind("techempower", "0.0.0.0:8080", |cfg| {
@@ -97,7 +100,17 @@ async fn main() -> std::io::Result<()> {
                 .payload_read_rate(Seconds::ZERO, Seconds::ZERO, 0)
                 .h1(AppFactory)
         })?
-        .workers(num_cpus::get())
+        .configure(move |cfg| {
+            let cores = cores.clone();
+            cfg.on_worker_start(move |_| {
+                if let Some(core) = cores.lock().unwrap().pop() {
+                    core_affinity::set_for_current(core);
+                }
+                Ready::<_, &str>::Ok(())
+            });
+            Ok(())
+        })?
+        .workers(total_cores)
         .run()
         .await
 }

+ 25 - 7
frameworks/Rust/ntex/src/main_plt.rs

@@ -3,8 +3,9 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use std::{future::Future, io, pin::Pin, task::Context, task::Poll};
 
-use ntex::{fn_service, http::h1, io::Io, io::RecvError, util::ready, util::PoolId};
-use yarte::Serialize;
+use ntex::util::{ready, PoolId, Ready};
+use ntex::{fn_service, http::h1, io::Io, io::RecvError};
+use sonic_rs::Serialize;
 
 mod utils;
 
@@ -42,10 +43,13 @@ impl Future for App {
                                     buf.extend_from_slice(JSON);
                                     this.codec.set_date_header(buf);
 
-                                    Message {
-                                        message: "Hello, World!",
-                                    }
-                                    .to_bytes_mut(buf);
+                                    sonic_rs::to_writer(
+                                        utils::BytesWriter(buf),
+                                        &Message {
+                                            message: "Hello, World!",
+                                        },
+                                    )
+                                    .unwrap();
                                 }
                                 "/plaintext" => {
                                     buf.extend_from_slice(PLAIN);
@@ -75,6 +79,10 @@ impl Future for App {
 async fn main() -> io::Result<()> {
     println!("Started http server: 127.0.0.1:8080");
 
+    let cores = core_affinity::get_core_ids().unwrap();
+    let total_cores = cores.len();
+    let cores = std::sync::Arc::new(std::sync::Mutex::new(cores));
+
     // start http server
     ntex::server::build()
         .backlog(1024)
@@ -88,7 +96,17 @@ async fn main() -> io::Result<()> {
                 codec: h1::Codec::default(),
             })
         })?
-        .workers(num_cpus::get())
+        .configure(move |cfg| {
+            let cores = cores.clone();
+            cfg.on_worker_start(move |_| {
+                if let Some(core) = cores.lock().unwrap().pop() {
+                    core_affinity::set_for_current(core);
+                }
+                Ready::<_, &str>::Ok(())
+            });
+            Ok(())
+        })?
+        .workers(total_cores)
         .run()
         .await
 }

+ 35 - 1
frameworks/Rust/ntex/src/utils.rs

@@ -1,8 +1,9 @@
 #![allow(dead_code)]
-use std::cmp;
+use std::{cmp, io, io::Write, mem::MaybeUninit, slice::from_raw_parts_mut};
 
 use atoi::FromRadix10;
 use ntex::{http::header::HeaderValue, util::BufMut, util::Bytes, util::BytesMut};
+use sonic_rs::writer::WriteExt;
 
 pub const HDR_SERVER: HeaderValue = HeaderValue::from_static("N");
 pub const HDR_JSON_CONTENT_TYPE: HeaderValue = HeaderValue::from_static("application/json");
@@ -30,3 +31,36 @@ pub fn reserve(buf: &mut BytesMut, lw: usize) {
         buf.reserve(HW);
     }
 }
+
+pub struct BytesWriter<'a>(pub &'a mut BytesMut);
+
+impl<'a> Write for BytesWriter<'a> {
+    fn write(&mut self, src: &[u8]) -> Result<usize, io::Error> {
+        self.0.extend_from_slice(src);
+        Ok(src.len())
+    }
+
+    fn flush(&mut self) -> Result<(), io::Error> {
+        Ok(())
+    }
+}
+
+impl<'a> WriteExt for BytesWriter<'a> {
+    #[inline(always)]
+    fn reserve_with(&mut self, additional: usize) -> Result<&mut [MaybeUninit<u8>], io::Error> {
+        self.0.reserve(additional);
+
+        unsafe {
+            let ptr = self.0.as_mut_ptr().add(self.0.len()) as *mut MaybeUninit<u8>;
+            Ok(from_raw_parts_mut(ptr, additional))
+        }
+    }
+
+    #[inline(always)]
+    unsafe fn flush_len(&mut self, additional: usize) {
+        unsafe {
+            let new_len = self.0.len() + additional;
+            self.0.set_len(new_len);
+        }
+    }
+}

+ 3 - 3
frameworks/Rust/ntex/templates/fortune.hbs

@@ -1,5 +1,5 @@
 <!DOCTYPE html><html><head><title>Fortunes</title></head><body><table><tr><th>id</th><th>message</th></tr>
-      {{~# each fortunes ~}}
-      <tr><td>{{id}}</td><td>{{message}}</td></tr>
-      {{~/each ~}}
+{{~# each fortunes ~}}
+<tr><td>{{id}}</td><td>{{message}}</td></tr>
+{{~/each ~}}
 </table></body></html>

+ 0 - 12
frameworks/Rust/ntex/templates/fortune.html

@@ -1,12 +0,0 @@
-<!DOCTYPE html>
-<html>
-  <head><title>Fortunes</title></head>
-  <body>
-    <table>
-      <tr><th>id</th><th>message</th></tr>
-      {% for item in items %}
-      <tr><td>{{item.id}}</td><td>{{item.message}}</td></tr>
-      {% endfor %}
-    </table>
-  </body>
-</html>

+ 0 - 10
frameworks/Rust/ntex/templates/fortune.stpl

@@ -1,10 +0,0 @@
-<!DOCTYPE html>
-<html>
-  <head><title>Fortunes</title></head>
-  <body>
-    <table>
-      <tr><th>id</th><th>message</th></tr>
-      <% for item in items { %><tr><td><%= item.id %></td><td><%= &*item.message %></td></tr><% } %>
-    </table>
-  </body>
-</html>