Browse Source

ntex updates (#7899)

* cores affinity

* cached queries

* wip
Nikolay Kim 2 years ago
parent
commit
aa7c55e8f1

+ 1 - 0
frameworks/Rust/ntex/Cargo.toml

@@ -39,6 +39,7 @@ async-std = ["ntex/async-std"]
 [dependencies]
 [dependencies]
 ntex = "0.6.1"
 ntex = "0.6.1"
 ntex-bytes = { version = "0.1.19", features=["simd"] }
 ntex-bytes = { version = "0.1.19", features=["simd"] }
+core_affinity = "0.8"
 mimalloc = { version = "0.1.25", default-features = false }
 mimalloc = { version = "0.1.25", default-features = false }
 snmalloc-rs = { version = "0.3.3", features = ["native-cpu"] }
 snmalloc-rs = { version = "0.3.3", features = ["native-cpu"] }
 yarte = { version = "0.15", features = ["bytes-buf", "json"] }
 yarte = { version = "0.15", features = ["bytes-buf", "json"] }

+ 102 - 100
frameworks/Rust/ntex/benchmark_config.json

@@ -2,116 +2,118 @@
   "framework": "ntex",
   "framework": "ntex",
   "tests": [{
   "tests": [{
     "default": {
     "default": {
-      "json_url": "/json",
-      "plaintext_url": "/plaintext",
-      "port": 8080,
-      "approach": "Realistic",
-      "classification": "Micro",
-      "database": "Postgres",
-      "framework": "ntex",
-      "language": "Rust",
-      "orm": "Raw",
-      "platform": "None",
-      "webserver": "ntex",
-      "os": "Linux",
-      "database_os": "Linux",
-      "display_name": "ntex [tokio]",
-      "notes": "",
-      "versus": ""
+        "json_url": "/json",
+        "plaintext_url": "/plaintext",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "Postgres",
+        "framework": "ntex",
+        "language": "Rust",
+        "orm": "Raw",
+        "platform": "None",
+        "webserver": "ntex",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "ntex [tokio]",
+        "notes": "",
+        "versus": ""
     },
     },
     "astd": {
     "astd": {
-      "json_url": "/json",
-      "plaintext_url": "/plaintext",
-      "port": 8080,
-      "approach": "Realistic",
-      "classification": "Micro",
-      "database": "Postgres",
-      "framework": "ntex",
-      "language": "Rust",
-      "orm": "Raw",
-      "platform": "None",
-      "webserver": "ntex",
-      "os": "Linux",
-      "database_os": "Linux",
-      "display_name": "ntex [async-std]",
-      "notes": "",
-      "versus": ""
+        "json_url": "/json",
+        "plaintext_url": "/plaintext",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "Postgres",
+        "framework": "ntex",
+        "language": "Rust",
+        "orm": "Raw",
+        "platform": "None",
+        "webserver": "ntex",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "ntex [async-std]",
+        "notes": "",
+        "versus": ""
     },
     },
     "db": {
     "db": {
-      "fortune_url": "/fortunes",
-      "db_url": "/db",
-      "query_url": "/query?q=",
-      "update_url": "/update?q=",
-      "port": 8080,
-      "approach": "Realistic",
-      "classification": "Micro",
-      "database": "Postgres",
-      "framework": "ntex",
-      "language": "Rust",
-      "orm": "Raw",
-      "platform": "None",
-      "webserver": "ntex",
-      "os": "Linux",
-      "database_os": "Linux",
-      "display_name": "ntex [tokio,db]",
-      "notes": "",
-      "versus": ""
+        "fortune_url": "/fortunes",
+        "db_url": "/db",
+        "query_url": "/query?q=",
+        "update_url": "/update?q=",
+        "cached_query_url": "/cached_query?q=",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "Postgres",
+        "framework": "ntex",
+        "language": "Rust",
+        "orm": "Raw",
+        "platform": "None",
+        "webserver": "ntex",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "ntex [tokio,db]",
+        "notes": "",
+        "versus": ""
     },
     },
     "db-astd": {
     "db-astd": {
-      "fortune_url": "/fortunes",
-      "db_url": "/db",
-      "query_url": "/query?q=",
-      "update_url": "/update?q=",
-      "port": 8080,
-      "approach": "Realistic",
-      "classification": "Micro",
-      "database": "Postgres",
-      "framework": "ntex",
-      "language": "Rust",
-      "orm": "Raw",
-      "platform": "None",
-      "webserver": "ntex",
-      "os": "Linux",
-      "database_os": "Linux",
-      "display_name": "ntex [async-std,db]",
-      "notes": "",
-      "versus": ""
+        "fortune_url": "/fortunes",
+        "db_url": "/db",
+        "query_url": "/query?q=",
+        "update_url": "/update?q=",
+        "cached_query_url": "/cached_query?q=",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "Postgres",
+        "framework": "ntex",
+        "language": "Rust",
+        "orm": "Raw",
+        "platform": "None",
+        "webserver": "ntex",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "ntex [async-std,db]",
+        "notes": "",
+        "versus": ""
     },
     },
     "plt": {
     "plt": {
-      "json_url": "/json",
-      "plaintext_url": "/plaintext",
-      "port": 8080,
-      "approach": "Realistic",
-      "classification": "Platform",
-      "database": "Postgres",
-      "framework": "ntex",
-      "language": "Rust",
-      "orm": "Raw",
-      "platform": "None",
-      "webserver": "ntex",
-      "os": "Linux",
-      "database_os": "Linux",
-      "display_name": "ntex [tokio,platform]",
-      "notes": "",
-      "versus": ""
+        "json_url": "/json",
+        "plaintext_url": "/plaintext",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Platform",
+        "database": "Postgres",
+        "framework": "ntex",
+        "language": "Rust",
+        "orm": "Raw",
+        "platform": "None",
+        "webserver": "ntex",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "ntex [tokio,platform]",
+        "notes": "",
+        "versus": ""
     },
     },
     "plt-astd": {
     "plt-astd": {
-      "json_url": "/json",
-      "plaintext_url": "/plaintext",
-      "port": 8080,
-      "approach": "Realistic",
-      "classification": "Platform",
-      "database": "Postgres",
-      "framework": "ntex",
-      "language": "Rust",
-      "orm": "Raw",
-      "platform": "None",
-      "webserver": "ntex",
-      "os": "Linux",
-      "database_os": "Linux",
-      "display_name": "ntex [async-std,platform]",
-      "notes": "",
-      "versus": ""
+        "json_url": "/json",
+        "plaintext_url": "/plaintext",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Platform",
+        "database": "Postgres",
+        "framework": "ntex",
+        "language": "Rust",
+        "orm": "Raw",
+        "platform": "None",
+        "webserver": "ntex",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "ntex [async-std,platform]",
+        "notes": "",
+        "versus": ""
     }
     }
   }]
   }]
 }
 }

+ 2 - 0
frameworks/Rust/ntex/config.toml

@@ -31,6 +31,7 @@ versus = ""
 urls.db = "/db"
 urls.db = "/db"
 urls.query = "/query?q="
 urls.query = "/query?q="
 urls.update = "/update?q="
 urls.update = "/update?q="
+urls.cached_query = "/cached_query?q="
 urls.fortune = "/fortunes"
 urls.fortune = "/fortunes"
 approach = "Realistic"
 approach = "Realistic"
 classification = "Micro"
 classification = "Micro"
@@ -47,6 +48,7 @@ urls.db = "/db"
 urls.query = "/query?q="
 urls.query = "/query?q="
 urls.update = "/update?q="
 urls.update = "/update?q="
 urls.fortune = "/fortunes"
 urls.fortune = "/fortunes"
+urls.cached_query = "/cached_query?q="
 approach = "Realistic"
 approach = "Realistic"
 classification = "Micro"
 classification = "Micro"
 database = "Postgres"
 database = "Postgres"

+ 27 - 0
frameworks/Rust/ntex/src/db.rs

@@ -29,6 +29,7 @@ pub struct PgConnection {
     rng: WyRand,
     rng: WyRand,
     updates: Vec<Statement>,
     updates: Vec<Statement>,
     buf: RefCell<BytesMut>,
     buf: RefCell<BytesMut>,
+    cached: Vec<World>,
 }
 }
 
 
 impl PgConnection {
 impl PgConnection {
@@ -61,11 +62,21 @@ impl PgConnection {
         }
         }
         let world = cl.prepare("SELECT * FROM world WHERE id=$1").await.unwrap();
         let world = cl.prepare("SELECT * FROM world WHERE id=$1").await.unwrap();
 
 
+        let all_worlds = cl.prepare("SELECT * FROM world ORDER by id").await.unwrap();
+        let mut cached = Vec::new();
+        for row in cl.query_raw(&all_worlds, &[]).await.unwrap() {
+            cached.push(World {
+                id: row.get(0),
+                randomnumber: row.get(1),
+            });
+        }
+
         PgConnection {
         PgConnection {
             cl,
             cl,
             fortune,
             fortune,
             world,
             world,
             updates,
             updates,
+            cached,
             rng: WyRand::new(),
             rng: WyRand::new(),
             buf: RefCell::new(BytesMut::with_capacity(65535)),
             buf: RefCell::new(BytesMut::with_capacity(65535)),
         }
         }
@@ -156,6 +167,22 @@ impl PgConnection {
         body.split().freeze()
         body.split().freeze()
     }
     }
 
 
+    pub fn cached_query(&self, num: usize) -> Bytes {
+        let mut rng = nanorand::tls_rng();
+
+        let mut body = self.buf.borrow_mut();
+        utils::reserve(&mut body);
+        body.put_u8(b'[');
+        (0..num).for_each(|_| {
+            let w_id = rng.generate::<usize>() % 10_000;
+            self.cached[w_id].to_bytes_mut(&mut *body);
+            body.put_u8(b',');
+        });
+        let idx = body.len() - 1;
+        body[idx] = b']';
+        body.split().freeze()
+    }
+
     pub async fn tell_fortune(&self) -> Bytes {
     pub async fn tell_fortune(&self) -> Bytes {
         let fut = self.cl.query_raw(&self.fortune, &[]);
         let fut = self.cl.query_raw(&self.fortune, &[]);
 
 

+ 17 - 0
frameworks/Rust/ntex/src/main.rs

@@ -1,6 +1,8 @@
 #[global_allocator]
 #[global_allocator]
 static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 
 
+use std::sync::{Arc, Mutex};
+
 use ntex::http::header::{CONTENT_TYPE, SERVER};
 use ntex::http::header::{CONTENT_TYPE, SERVER};
 use ntex::{http, time::Seconds, util::BytesMut, util::PoolId, web};
 use ntex::{http, time::Seconds, util::BytesMut, util::PoolId, web};
 use yarte::Serialize;
 use yarte::Serialize;
@@ -45,9 +47,23 @@ async fn plaintext() -> web::HttpResponse {
 async fn main() -> std::io::Result<()> {
 async fn main() -> std::io::Result<()> {
     println!("Started http server: 127.0.0.1:8080");
     println!("Started http server: 127.0.0.1:8080");
 
 
+    let cores = core_affinity::get_core_ids().unwrap();
+    let total_cores = cores.len();
+    let cores = Arc::new(Mutex::new(cores));
+
     // start http server
     // start http server
     ntex::server::build()
     ntex::server::build()
         .backlog(1024)
         .backlog(1024)
+        .configure(move |cfg| {
+            let cores = cores.clone();
+            cfg.on_worker_start(move |_| {
+                if let Some(core) = cores.lock().unwrap().pop() {
+                    // Pin this worker to a single CPU core.
+                    core_affinity::set_for_current(core);
+                }
+                std::future::ready(Ok::<_, &'static str>(()))
+            })
+        })?
         .bind("techempower", "0.0.0.0:8080", |cfg| {
         .bind("techempower", "0.0.0.0:8080", |cfg| {
             cfg.memory_pool(PoolId::P1);
             cfg.memory_pool(PoolId::P1);
             PoolId::P1.set_read_params(65535, 2048);
             PoolId::P1.set_read_params(65535, 2048);
@@ -58,6 +74,7 @@ async fn main() -> std::io::Result<()> {
                 .client_timeout(Seconds(0))
                 .client_timeout(Seconds(0))
                 .h1(web::App::new().service(json).service(plaintext).finish())
                 .h1(web::App::new().service(json).service(plaintext).finish())
         })?
         })?
+        .workers(total_cores)
         .run()
         .run()
         .await
         .await
 }
 }

+ 34 - 3
frameworks/Rust/ntex/src/main_db.rs

@@ -2,11 +2,13 @@
 #[global_allocator]
 #[global_allocator]
 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 
+use std::sync::{Arc, Mutex};
+
 use ntex::http::header::{CONTENT_TYPE, SERVER};
 use ntex::http::header::{CONTENT_TYPE, SERVER};
 use ntex::http::{HttpService, KeepAlive, Request, Response, StatusCode};
 use ntex::http::{HttpService, KeepAlive, Request, Response, StatusCode};
 use ntex::service::{Service, ServiceFactory};
 use ntex::service::{Service, ServiceFactory};
 use ntex::web::{Error, HttpResponse};
 use ntex::web::{Error, HttpResponse};
-use ntex::{time::Seconds, util::PoolId, util::BoxFuture};
+use ntex::{time::Seconds, util::BoxFuture, util::PoolId};
 
 
 mod db;
 mod db;
 mod utils;
 mod utils;
@@ -38,7 +40,8 @@ impl Service<Request> for App {
                     Ok(res)
                     Ok(res)
                 }
                 }
                 "/query" => {
                 "/query" => {
-                    let worlds = self.0
+                    let worlds = self
+                        .0
                         .get_worlds(utils::get_query_param(req.uri().query()))
                         .get_worlds(utils::get_query_param(req.uri().query()))
                         .await;
                         .await;
                     let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
                     let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
@@ -48,7 +51,20 @@ impl Service<Request> for App {
                     Ok(res)
                     Ok(res)
                 }
                 }
                 "/update" => {
                 "/update" => {
-                    let worlds = self.0.update(utils::get_query_param(req.uri().query())).await;
+                    let worlds = self
+                        .0
+                        .update(utils::get_query_param(req.uri().query()))
+                        .await;
+                    let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
+                    res.headers_mut().insert(SERVER, utils::HDR_SERVER);
+                    res.headers_mut()
+                        .insert(CONTENT_TYPE, utils::HDR_JSON_CONTENT_TYPE);
+                    Ok(res)
+                }
+                "/cached_query" => {
+                    let worlds = self
+                        .0
+                        .cached_query(utils::get_query_param(req.uri().query()));
                     let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
                     let mut res = HttpResponse::with_body(StatusCode::OK, worlds.into());
                     res.headers_mut().insert(SERVER, utils::HDR_SERVER);
                     res.headers_mut().insert(SERVER, utils::HDR_SERVER);
                     res.headers_mut()
                     res.headers_mut()
@@ -82,8 +98,22 @@ impl ServiceFactory<Request> for AppFactory {
 async fn main() -> std::io::Result<()> {
 async fn main() -> std::io::Result<()> {
     println!("Starting http server: 127.0.0.1:8080");
     println!("Starting http server: 127.0.0.1:8080");
 
 
+    let cores = core_affinity::get_core_ids().unwrap();
+    let total_cores = cores.len();
+    let cores = Arc::new(Mutex::new(cores));
+
     ntex::server::build()
     ntex::server::build()
         .backlog(1024)
         .backlog(1024)
+        .configure(move |cfg| {
+            let cores = cores.clone();
+            cfg.on_worker_start(move |_| {
+                if let Some(core) = cores.lock().unwrap().pop() {
+                    // Pin this worker to a single CPU core.
+                    core_affinity::set_for_current(core);
+                }
+                std::future::ready(Ok::<_, &'static str>(()))
+            })
+        })?
         .bind("techempower", "0.0.0.0:8080", |cfg| {
         .bind("techempower", "0.0.0.0:8080", |cfg| {
             cfg.memory_pool(PoolId::P1);
             cfg.memory_pool(PoolId::P1);
             PoolId::P1.set_read_params(65535, 2048);
             PoolId::P1.set_read_params(65535, 2048);
@@ -94,6 +124,7 @@ async fn main() -> std::io::Result<()> {
                 .client_timeout(Seconds(0))
                 .client_timeout(Seconds(0))
                 .h1(AppFactory)
                 .h1(AppFactory)
         })?
         })?
+        .workers(total_cores)
         .run()
         .run()
         .await
         .await
 }
 }

+ 16 - 1
frameworks/Rust/ntex/src/main_plt.rs

@@ -1,6 +1,6 @@
 #[global_allocator]
 #[global_allocator]
 static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
-use std::{future::Future, io, pin::Pin, task::Context, task::Poll};
+use std::{future::Future, io, pin::Pin, sync::Arc, sync::Mutex, task::Context, task::Poll};
 
 
 use ntex::{fn_service, http::h1, io::Io, io::RecvError, util::ready, util::PoolId};
 use ntex::{fn_service, http::h1, io::Io, io::RecvError, util::ready, util::PoolId};
 use yarte::Serialize;
 use yarte::Serialize;
@@ -74,9 +74,23 @@ impl Future for App {
 async fn main() -> io::Result<()> {
 async fn main() -> io::Result<()> {
     println!("Started http server: 127.0.0.1:8080");
     println!("Started http server: 127.0.0.1:8080");
 
 
+    let cores = core_affinity::get_core_ids().unwrap();
+    let total_cores = cores.len();
+    let cores = Arc::new(Mutex::new(cores));
+
     // start http server
     // start http server
     ntex::server::build()
     ntex::server::build()
         .backlog(1024)
         .backlog(1024)
+        .configure(move |cfg| {
+            let cores = cores.clone();
+            cfg.on_worker_start(move |_| {
+                if let Some(core) = cores.lock().unwrap().pop() {
+                    // Pin this worker to a single CPU core.
+                    core_affinity::set_for_current(core);
+                }
+                std::future::ready(Ok::<_, &'static str>(()))
+            })
+        })?
         .bind("techempower", "0.0.0.0:8080", |cfg| {
         .bind("techempower", "0.0.0.0:8080", |cfg| {
             cfg.memory_pool(PoolId::P1);
             cfg.memory_pool(PoolId::P1);
             PoolId::P1.set_read_params(65535, 2048);
             PoolId::P1.set_read_params(65535, 2048);
@@ -87,6 +101,7 @@ async fn main() -> io::Result<()> {
                 codec: h1::Codec::default(),
                 codec: h1::Codec::default(),
             })
             })
         })?
         })?
+        .workers(total_cores)
         .run()
         .run()
         .await
         .await
 }
 }