Browse Source

salvo: update version 0.22 (#7325)

* salvo: update version 0.22

* salvo: remvoe num_cpus

* remove commented code

* fix compile error

* use mimalloc for main
Chrislearn Young 3 years ago
parent
commit
e2ac68a597

+ 2 - 4
frameworks/Rust/salvo/Cargo.toml

@@ -21,14 +21,12 @@ async-trait = "0.1.51"
 diesel = { version = "1.4", features = ["postgres", "r2d2"] }
 futures = "0.3"
 markup = "0.12"
-num_cpus = "1.13.0"
-# mimalloc = { version = "0.1.25", default-features = false }
+mimalloc = { version = "0.1.25", default-features = false }
 once_cell = "1.5.2"
 rand = { version = "0.8.3", features = ["min_const_gen", "small_rng"] }
 random-fast-rng = "0.1.1"
-salvo = { version = "0.21", features = ["anyhow"] }
+salvo = { version = "0.22", features = ["anyhow"] }
 serde = { version = "1.0", features = ["derive"] }
-serde_derive = "1.0.125"
 serde_json = "1.0.64"
 smallvec = "1.6.1"
 snmalloc-rs = { version = "0.2.24", features = ["1mib", "native-cpu"] }

+ 9 - 12
frameworks/Rust/salvo/src/main.rs

@@ -1,16 +1,12 @@
-// #[global_allocator]
-// static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
-// #[global_allocator]
-// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
-
-#[macro_use]
-extern crate serde_derive;
-extern crate serde_json;
+#[global_allocator]
+static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use std::sync::Arc;
+use std::thread::available_parallelism;
 
 use salvo::http::header::{self, HeaderValue};
 use salvo::prelude::*;
+use serde::Serialize;
 
 mod server;
 
@@ -31,8 +27,9 @@ async fn json(res: &mut Response) {
 #[fn_handler]
 async fn plaintext(res: &mut Response) {
     res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
-    res.headers_mut().insert(header::CONTENT_TYPE, HeaderValue::from_static("text/plain"));
-    res.write_body(HELLO_WORLD);
+    res.headers_mut()
+        .insert(header::CONTENT_TYPE, HeaderValue::from_static("text/plain"));
+    res.write_body(HELLO_WORLD).ok();
 }
 
 fn main() {
@@ -42,7 +39,7 @@ fn main() {
             .push(Router::with_path("json").get(json)),
     );
 
-    for _ in 1..num_cpus::get() {
+    for _ in 1..available_parallelism().map(|n| n.get()).unwrap_or(16) {
         let router = router.clone();
         std::thread::spawn(move || {
             let rt = tokio::runtime::Builder::new_current_thread()
@@ -52,6 +49,7 @@ fn main() {
             rt.block_on(serve(router));
         });
     }
+    println!("Started http server: 127.0.0.1:8080");
     let rt = tokio::runtime::Builder::new_current_thread()
         .enable_all()
         .build()
@@ -60,7 +58,6 @@ fn main() {
 }
 
 async fn serve(router: Arc<Router>) {
-    println!("Started http server: 127.0.0.1:8080");
     server::builder()
         .http1_pipeline_flush(true)
         .serve(Service::new(router))

+ 8 - 7
frameworks/Rust/salvo/src/main_diesel.rs

@@ -9,6 +9,7 @@ extern crate diesel;
 use std::cmp;
 use std::fmt::Write;
 use std::sync::Arc;
+use std::thread::available_parallelism;
 
 use anyhow::Error;
 use diesel::prelude::*;
@@ -58,7 +59,7 @@ async fn world_row(res: &mut Response) -> Result<(), Error> {
 
 #[fn_handler]
 async fn queries(req: &mut Request, res: &mut Response) -> Result<(), Error> {
-    let count = req.get_query::<usize>("q").unwrap_or(1);
+    let count = req.query::<usize>("q").unwrap_or(1);
     let count = cmp::min(500, cmp::max(1, count));
     let mut worlds = Vec::with_capacity(count);
     let mut rng = SmallRng::from_entropy();
@@ -75,7 +76,7 @@ async fn queries(req: &mut Request, res: &mut Response) -> Result<(), Error> {
 
 #[fn_handler]
 async fn cached_queries(req: &mut Request, res: &mut Response) -> Result<(), Error> {
-    let count = req.get_query::<usize>("q").unwrap_or(1);
+    let count = req.query::<usize>("q").unwrap_or(1);
     let count = cmp::min(500, cmp::max(1, count));
     let mut worlds = Vec::with_capacity(count);
     let mut rng = SmallRng::from_entropy();
@@ -93,7 +94,7 @@ async fn cached_queries(req: &mut Request, res: &mut Response) -> Result<(), Err
 
 #[fn_handler]
 async fn updates(req: &mut Request, res: &mut Response) -> Result<(), Error> {
-    let count = req.get_query::<usize>("q").unwrap_or(1);
+    let count = req.query::<usize>("q").unwrap_or(1);
     let count = cmp::min(500, cmp::max(1, count));
     let conn = connect()?;
     let mut worlds = Vec::with_capacity(count);
@@ -176,12 +177,12 @@ fn main() {
             .push(Router::with_path("cached_queries").get(cached_queries))
             .push(Router::with_path("updates").get(updates)),
     );
-    let cpus = num_cpus::get();
+    let size = available_parallelism().map(|n| n.get()).unwrap_or(16);
     DB_POOL
-        .set(build_pool(&DB_URL, cpus as u32).expect(&format!("Error connecting to {}", &DB_URL)))
+        .set(build_pool(&DB_URL, size as u32).expect(&format!("Error connecting to {}", &DB_URL)))
         .ok();
     populate_cache().expect("error cache worlds");
-    for _ in 1..cpus {
+    for _ in 1..size {
         let router = router.clone();
         std::thread::spawn(move || {
             let rt = tokio::runtime::Builder::new_current_thread()
@@ -191,6 +192,7 @@ fn main() {
             rt.block_on(serve(router));
         });
     }
+    println!("Starting http server: 127.0.0.1:8080");
     let rt = tokio::runtime::Builder::new_current_thread()
         .enable_all()
         .build()
@@ -199,6 +201,5 @@ fn main() {
 }
 
 async fn serve(router: Arc<Router>) {
-    println!("Starting http server: 127.0.0.1:8080");
     server::builder().serve(Service::new(router)).await.unwrap();
 }

+ 6 - 5
frameworks/Rust/salvo/src/main_pg.rs

@@ -7,6 +7,7 @@ use std::cmp;
 use std::collections::HashMap;
 use std::fmt::Write;
 use std::io;
+use std::thread::available_parallelism;
 
 use anyhow::Error;
 use async_trait::async_trait;
@@ -212,7 +213,7 @@ impl WorldsHandler {
 #[async_trait]
 impl Handler for WorldsHandler {
     async fn handle(&self, req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) {
-        let count = req.get_query::<u16>("q").unwrap_or(1);
+        let count = req.query::<u16>("q").unwrap_or(1);
         let count = cmp::min(500, cmp::max(1, count));
         res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
         let worlds = self.conn.get_worlds(count).await.unwrap();
@@ -234,7 +235,7 @@ impl UpdatesHandler {
 #[async_trait]
 impl Handler for UpdatesHandler {
     async fn handle(&self, req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) {
-        let count = req.get_query::<u16>("q").unwrap_or(1);
+        let count = req.query::<u16>("q").unwrap_or(1);
         let count = cmp::min(500, cmp::max(1, count));
         res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S"));
         let worlds = self.conn.update(count).await.unwrap();
@@ -265,7 +266,7 @@ impl Handler for FortunesHandler {
 
 #[fn_handler]
 async fn cached_queries(req: &mut Request, res: &mut Response) -> Result<(), Error> {
-    let count = req.get_query::<usize>("q").unwrap_or(1);
+    let count = req.query::<usize>("q").unwrap_or(1);
     let count = cmp::min(500, cmp::max(1, count));
     let mut worlds = Vec::with_capacity(count);
     let mut rng = SmallRng::from_entropy();
@@ -296,7 +297,7 @@ fn main() {
     rt.block_on(async {
         populate_cache().await.expect("error cache worlds");
     });
-    for _ in 1..num_cpus::get() {
+    for _ in 1..available_parallelism().map(|n| n.get()).unwrap_or(16) {
         std::thread::spawn(move || {
             let rt = tokio::runtime::Builder::new_current_thread()
                 .enable_all()
@@ -305,11 +306,11 @@ fn main() {
             rt.block_on(serve());
         });
     }
+    println!("Started http server: 127.0.0.1:8080");
     rt.block_on(serve());
 }
 
 async fn serve() {
-    println!("Started http server: 127.0.0.1:8080");
     let router = Router::new()
         .push(Router::with_path("db").get(WorldHandler::new().await))
         .push(Router::with_path("fortunes").get(FortunesHandler::new().await))