Browse Source

Upgrade hyperlane (#9748)

* feat: update & fortune

* feat: randomNumber

* feat: max row 500

* feat: update

* feat: Fortune

* feat: fortunes

* feat: update

* feat: update

* feat: update

* feat: update

* feat: fortunes

* feat: fortunes

* feat: fortunes

* feat: fortunes

* feat: fortunes

* feat: fortunes

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: update

* feat: cache

* feat: cache

* feat: config

* feat: config

* feat: config

* feat: v4.36.1

* docs: readme

* feat: remove dyn

* docs: readme

* feat: lock

* feat: lock

* feat: lock

* feat: lock

* feat: db pool

* feat: db pool

* feat: lock

* feat: lock

* feat: db

* feat: db

* feat: db

* feat: db

* feat: db

* feat: db

* feat: db

* feat: db

* feat: rand

* feat: rand

* feat: rand

* feat: rand

* feat: rand

* feat: port

* feat: port

* feat: port

* feat: db

* feat: db

* feat: db

* feat: lock

* feat: lock

* feat: v4.41.0

* feat: v4.42.0

* Merge remote-tracking branch 'upstream/master'

* Merge remote-tracking branch 'upstream/master'

* feat: inline

* feat: dockerfile

* feat: v4.52.1

* feat: remove key

* remove: log

* remove: log

* feat: async

* remove: empty loop

* feat: utf8

* change: pool_size

* remove: utf8

* feat: log

* feat: log

* feat: v3.14.1

* feat: 4.56.3

* feat: 4.56.4

* feat: 4.56.5

* feat:  rename

* Merge branch 'master' of github.com:TechEmpower/FrameworkBenchmarks

* feat: speed

* feat: speed

* lock: toml

* lock: toml

* lock: toml

* lock: toml

* lock: toml

* lock: toml

* lock: toml

* lock: toml

* lock: toml

* lock: toml

* lock: toml

* lock: toml

* lock: toml

* feat: runtime

* feat: runtime

* feat: runtime
尤雨东 4 months ago
parent
commit
e04fbbbdf9

+ 5 - 4
frameworks/Rust/hyperlane/Cargo.toml

@@ -18,10 +18,11 @@ exclude = [
 ]
 
 [dependencies]
-hyperlane = "4.56.5"
-rand = "0.9.0"
-serde = "1.0.219"
-sqlx = { version = "0.8.3", features = ["runtime-tokio", "postgres"] }
+hyperlane = "=4.65.0"
+num_cpus = "=1.16.0"
+rand = "=0.9.0"
+serde = "=1.0.219"
+sqlx = { version = "=0.8.3", features = ["runtime-tokio", "postgres"] }
 
 [profile.dev]
 incremental = false

+ 39 - 43
frameworks/Rust/hyperlane/src/db.rs

@@ -1,18 +1,12 @@
 use crate::*;
 
-pub async fn get_db_connection() -> DbPoolConnection {
-    if let Some(db_pool) = DB.get() {
-        return db_pool.clone();
-    };
-    let db_pool: DbPoolConnection = connection_db().await;
-    DB.set(db_pool.clone())
-        .expect("Failed to initialize DB_POOL");
-    db_pool
+pub fn get_db_connection() -> &'static DbPoolConnection {
+    &DB
 }
 
 #[cfg(feature = "dev")]
 pub async fn create_database() {
-    let db_pool: DbPoolConnection = get_db_connection().await;
+    let db_pool: &DbPoolConnection = get_db_connection();
     let _ = query(&format!("CREATE DATABASE {};", DATABASE_NAME))
         .execute(&db_pool)
         .await;
@@ -20,7 +14,7 @@ pub async fn create_database() {
 
 #[cfg(feature = "dev")]
 pub async fn create_table() {
-    let db_pool: DbPoolConnection = get_db_connection().await;
+    let db_pool: &DbPoolConnection = get_db_connection();
     let _ = query(&format!(
         "CREATE TABLE IF NOT EXISTS {} (
             id SERIAL PRIMARY KEY, randomNumber INT NOT NULL
@@ -41,7 +35,7 @@ pub async fn create_table() {
 
 #[cfg(feature = "dev")]
 pub async fn insert_records() {
-    let db_pool: DbPoolConnection = get_db_connection().await;
+    let db_pool: &DbPoolConnection = get_db_connection();
     let row: PgRow = query(&format!("SELECT COUNT(*) FROM {}", TABLE_NAME_WORLD))
         .fetch_one(&db_pool)
         .await
@@ -76,21 +70,21 @@ pub async fn insert_records() {
     let _ = query(&sql).execute(&db_pool).await;
 }
 
-pub async fn init_cache() {
+pub async fn init_cache() -> Vec<QueryRow> {
     let mut res: Vec<QueryRow> = Vec::with_capacity(RANDOM_MAX as usize);
-    let db_pool: DbPoolConnection = get_db_connection().await;
+    let db_pool: &DbPoolConnection = get_db_connection();
     let sql: String = format!(
         "SELECT id, randomNumber FROM {} LIMIT {}",
         TABLE_NAME_WORLD, RANDOM_MAX
     );
-    if let Ok(rows) = query(&sql).fetch_all(&db_pool).await {
+    if let Ok(rows) = query(&sql).fetch_all(db_pool).await {
         for row in rows {
             let id: i32 = row.get(KEY_ID);
             let random_number: i32 = row.get(KEY_RANDOM_NUMBER);
             res.push(QueryRow::new(id, random_number));
         }
     }
-    let _ = CACHE.set(res);
+    res
 }
 
 pub async fn connection_db() -> DbPoolConnection {
@@ -106,18 +100,10 @@ pub async fn connection_db() -> DbPoolConnection {
             DATABASE_NAME
         ),
     };
-    let pool_size: u32 = (get_thread_count() << 2).max(10).min(100) as u32;
-    let max_pool_size: u32 = option_env!("POSTGRES_MAX_POOL_SIZE")
-        .unwrap_or(&pool_size.to_string())
-        .parse::<u32>()
-        .unwrap_or(pool_size);
-    let min_pool_size: u32 = option_env!("POSTGRES_MIN_POOL_SIZE")
-        .unwrap_or(&pool_size.to_string())
-        .parse::<u32>()
-        .unwrap_or(pool_size);
+    let pool_size: u32 = num_cpus::get() as u32;
     let pool: DbPoolConnection = PgPoolOptions::new()
-        .max_connections(max_pool_size)
-        .min_connections(min_pool_size)
+        .max_connections(100)
+        .min_connections(pool_size)
         .max_lifetime(None)
         .test_before_acquire(false)
         .idle_timeout(None)
@@ -130,10 +116,10 @@ pub async fn connection_db() -> DbPoolConnection {
 pub async fn get_update_data(
     limit: Queries,
 ) -> (String, Vec<QueryRow>, Vec<Queries>, Vec<Queries>) {
-    let db_pool: DbPoolConnection = get_db_connection().await;
+    let db_pool: &DbPoolConnection = get_db_connection();
     let mut query_res_list: Vec<QueryRow> = Vec::with_capacity(limit as usize);
-    let rows: Vec<QueryRow> = get_some_row_id(limit, &db_pool).await;
-    let mut sql = format!("UPDATE {} SET randomNumber = CASE id ", TABLE_NAME_WORLD);
+    let rows: Vec<QueryRow> = get_some_row_id(limit, db_pool).await;
+    let mut sql: String = format!("UPDATE {} SET randomNumber = CASE id ", TABLE_NAME_WORLD);
     let mut id_list: Vec<i32> = Vec::with_capacity(rows.len());
     let mut value_list: Vec<String> = Vec::with_capacity(rows.len() * 2);
     let mut random_numbers: Vec<i32> = Vec::with_capacity(rows.len());
@@ -159,14 +145,13 @@ pub async fn get_update_data(
 }
 
 pub async fn init_db() {
-    get_db_connection().await;
     #[cfg(feature = "dev")]
     {
         create_database().await;
         create_table().await;
         insert_records().await;
     }
-    init_cache().await;
+    black_box(init_cache().await);
 }
 
 pub async fn random_world_row(db_pool: &DbPoolConnection) -> QueryRow {
@@ -176,7 +161,7 @@ pub async fn random_world_row(db_pool: &DbPoolConnection) -> QueryRow {
 
 pub async fn query_world_row(db_pool: &DbPoolConnection, id: Queries) -> QueryRow {
     let sql: String = format!(
-        "SELECT id, randomNumber FROM {} WHERE id = {} LIMIT 1",
+        "SELECT id, randomNumber FROM {} WHERE id = {}",
         TABLE_NAME_WORLD, id
     );
     if let Ok(rows) = query(&sql).fetch_one(db_pool).await {
@@ -187,7 +172,7 @@ pub async fn query_world_row(db_pool: &DbPoolConnection, id: Queries) -> QueryRo
 }
 
 pub async fn update_world_rows(limit: Queries) -> Vec<QueryRow> {
-    let db_pool: DbPoolConnection = get_db_connection().await;
+    let db_pool: &DbPoolConnection = get_db_connection();
     let (sql, data, id_list, random_numbers) = get_update_data(limit).await;
     let mut query_builder: query::Query<'_, Postgres, postgres::PgArguments> = query(&sql);
     for (id, random_number) in id_list.iter().zip(random_numbers.iter()) {
@@ -196,23 +181,34 @@ pub async fn update_world_rows(limit: Queries) -> Vec<QueryRow> {
     for id in &id_list {
         query_builder = query_builder.bind(id);
     }
-    let _ = query_builder.execute(&db_pool).await;
+    let _ = query_builder.execute(db_pool).await;
     data
 }
 
 pub async fn all_world_row() -> Vec<PgRow> {
-    let db_pool: DbPoolConnection = get_db_connection().await;
+    let db_pool: &DbPoolConnection = get_db_connection();
     let sql: String = format!("SELECT id, message FROM {}", TABLE_NAME_FORTUNE);
-    let res: Vec<PgRow> = query(&sql).fetch_all(&db_pool).await.unwrap_or_default();
+    let res: Vec<PgRow> = query(&sql).fetch_all(db_pool).await.unwrap_or_default();
     return res;
 }
 
 pub async fn get_some_row_id(limit: Queries, db_pool: &DbPoolConnection) -> Vec<QueryRow> {
-    let futures: Vec<_> = (0..limit)
-        .map(|_| async {
-            let id: i32 = get_random_id();
-            query_world_row(db_pool, id).await
-        })
-        .collect();
-    join_all(futures).await
+    let semaphore: Arc<Semaphore> = Arc::new(Semaphore::new(32));
+    let mut tasks: Vec<JoinHandle<QueryRow>> = Vec::with_capacity(limit as usize);
+    for _ in 0..limit {
+        let _ = semaphore.clone().acquire_owned().await.map(|permit| {
+            let db_pool: DbPoolConnection = db_pool.clone();
+            tasks.push(spawn(async move {
+                let id: i32 = get_random_id();
+                let res: QueryRow = query_world_row(&db_pool, id).await;
+                drop(permit);
+                res
+            }));
+        });
+    }
+    join_all(tasks)
+        .await
+        .into_iter()
+        .filter_map(Result::ok)
+        .collect()
 }

+ 2 - 2
frameworks/Rust/hyperlane/src/lazy.rs

@@ -1,4 +1,4 @@
 use crate::*;
 
-pub static DB: OnceCell<DbPoolConnection> = OnceCell::new();
-pub static CACHE: OnceCell<Vec<QueryRow>> = OnceCell::new();
+pub static DB: Lazy<DbPoolConnection> = Lazy::new(|| block_on(async { connection_db().await }));
+pub static CACHE: Lazy<Vec<QueryRow>> = Lazy::new(|| block_on(async { init_cache().await }));

+ 6 - 7
frameworks/Rust/hyperlane/src/main.rs

@@ -10,11 +10,12 @@ pub(crate) mod utils;
 
 pub(crate) use constant::*;
 pub(crate) use db::*;
-pub(crate) use futures::future::join_all;
 pub(crate) use hyperlane::{
-    once_cell::sync::OnceCell,
+    futures::{executor::block_on, future::join_all},
+    once_cell::sync::Lazy,
     serde::*,
     serde_json::{Value, json},
+    tokio::{spawn, sync::Semaphore, task::JoinHandle},
     *,
 };
 pub(crate) use lazy::*;
@@ -27,12 +28,10 @@ pub(crate) use sqlx::{
     postgres::{PgPoolOptions, PgRow},
     *,
 };
-pub(crate) use std::fmt;
+pub(crate) use std::{fmt, hint::black_box, sync::Arc};
 pub(crate) use r#type::*;
 pub(crate) use utils::*;
 
-#[tokio::main]
-async fn main() {
-    init_db().await;
-    run_server().await;
+fn main() {
+    run_server();
 }

+ 2 - 2
frameworks/Rust/hyperlane/src/request_middleware.rs

@@ -1,7 +1,7 @@
 use crate::*;
 
-pub async fn request(controller_data: ControllerData) {
-    let _ = controller_data
+pub async fn request(ctx: Context) {
+    let _ = ctx
         .set_response_header(CONNECTION, CONNECTION_KEEP_ALIVE)
         .await
         .set_response_header(CONTENT_TYPE, APPLICATION_JSON)

+ 2 - 2
frameworks/Rust/hyperlane/src/response_middleware.rs

@@ -1,5 +1,5 @@
 use crate::*;
 
-pub async fn response(controller_data: ControllerData) {
-    let _ = controller_data.send().await;
+pub async fn response(ctx: Context) {
+    let _ = ctx.send().await;
 }

+ 22 - 29
frameworks/Rust/hyperlane/src/route.rs

@@ -1,46 +1,46 @@
 use crate::*;
 
-pub async fn json(controller_data: ControllerData) {
+pub async fn json(ctx: Context) {
     let json: Value = json!({
         "message": RESPONSEDATA_STR
     });
-    let _ = controller_data
+    let _ = ctx
         .set_response_body(serde_json::to_string(&json).unwrap_or_default())
         .await;
 }
 
-pub async fn plaintext(controller_data: ControllerData) {
-    let _ = controller_data
+pub async fn plaintext(ctx: Context) {
+    let _ = ctx
         .set_response_header(CONTENT_TYPE, TEXT_PLAIN)
         .await
         .set_response_body(RESPONSEDATA_BIN)
         .await;
 }
 
-pub async fn db(controller_data: ControllerData) {
-    let db_connection: DbPoolConnection = get_db_connection().await;
-    let query_row: QueryRow = random_world_row(&db_connection).await;
-    let _ = controller_data
+pub async fn db(ctx: Context) {
+    let db_connection: &DbPoolConnection = get_db_connection();
+    let query_row: QueryRow = random_world_row(db_connection).await;
+    let _ = ctx
         .set_response_body(serde_json::to_string(&query_row).unwrap_or_default())
         .await;
 }
 
-pub async fn queries(controller_data: ControllerData) {
-    let queries: Queries = controller_data
+pub async fn queries(ctx: Context) {
+    let queries: Queries = ctx
         .get_request_query("q")
         .await
         .and_then(|queries| queries.parse::<Queries>().ok())
         .unwrap_or_default()
         .min(ROW_LIMIT as Queries)
         .max(1);
-    let db_pool: DbPoolConnection = get_db_connection().await;
-    let data: Vec<QueryRow> = get_some_row_id(queries, &db_pool).await;
-    let _ = controller_data
+    let db_pool: &DbPoolConnection = get_db_connection();
+    let data: Vec<QueryRow> = get_some_row_id(queries, db_pool).await;
+    let _ = ctx
         .set_response_body(serde_json::to_string(&data).unwrap_or_default())
         .await;
 }
 
-pub async fn fortunes(controller_data: ControllerData) {
+pub async fn fortunes(ctx: Context) {
     let all_rows: Vec<PgRow> = all_world_row().await;
     let mut fortunes_list: Vec<Fortunes> = all_rows
         .iter()
@@ -56,15 +56,14 @@ pub async fn fortunes(controller_data: ControllerData) {
     ));
     fortunes_list.sort_by(|it, next| it.message.cmp(&next.message));
     let res: String = FortunesTemplate::new(fortunes_list).to_string();
-    controller_data
-        .set_response_header(CONTENT_TYPE, content_type_charset(TEXT_HTML, UTF8))
+    ctx.set_response_header(CONTENT_TYPE, content_type_charset(TEXT_HTML, UTF8))
         .await
         .set_response_body(res)
         .await;
 }
 
-pub async fn updates(controller_data: ControllerData) {
-    let queries: Queries = controller_data
+pub async fn updates(ctx: Context) {
+    let queries: Queries = ctx
         .get_request_query("q")
         .await
         .and_then(|queries| queries.parse::<Queries>().ok())
@@ -72,27 +71,21 @@ pub async fn updates(controller_data: ControllerData) {
         .min(ROW_LIMIT as Queries)
         .max(1);
     let res: Vec<QueryRow> = update_world_rows(queries).await;
-    let _ = controller_data
+    let _ = ctx
         .set_response_body(serde_json::to_string(&res).unwrap_or_default())
         .await;
 }
 
-pub async fn cached_queries(controller_data: ControllerData) {
-    let count: Queries = controller_data
+pub async fn cached_queries(ctx: Context) {
+    let count: Queries = ctx
         .get_request_query("c")
         .await
         .and_then(|queries| queries.parse::<Queries>().ok())
         .unwrap_or_default()
         .min(ROW_LIMIT as Queries)
         .max(1);
-    let res: Vec<QueryRow> = CACHE
-        .get()
-        .unwrap_or(&Vec::new())
-        .iter()
-        .take(count as usize)
-        .cloned()
-        .collect();
-    let _ = controller_data
+    let res: Vec<QueryRow> = CACHE.iter().take(count as usize).cloned().collect();
+    let _ = ctx
         .set_response_body(serde_json::to_string(&res).unwrap_or_default())
         .await;
 }

+ 25 - 2
frameworks/Rust/hyperlane/src/server.rs

@@ -1,9 +1,23 @@
 use crate::*;
+use tokio::runtime::{Builder, Runtime};
 
-pub async fn run_server() {
+fn runtime() -> Runtime {
+    Builder::new_multi_thread()
+        .worker_threads(get_thread_count())
+        .thread_stack_size(2097152)
+        .max_blocking_threads(5120)
+        .max_io_events_per_tick(5120)
+        .enable_all()
+        .build()
+        .unwrap()
+}
+
+async fn init_server() {
     let server: Server = Server::new();
     server.host("0.0.0.0").await;
     server.port(8080).await;
+    server.disable_linger().await;
+    server.disable_nodelay().await;
     server.disable_log().await;
     server.disable_inner_log().await;
     server.disable_inner_print().await;
@@ -18,5 +32,14 @@ pub async fn run_server() {
     server.route("/upda", updates).await;
     server.request_middleware(request).await;
     server.response_middleware(response).await;
-    server.listen().await;
+    server.listen().await.unwrap();
+}
+
+async fn init() {
+    init_db().await;
+    init_server().await;
+}
+
+pub fn run_server() {
+    runtime().block_on(init());
 }