Browse Source

use async diesel as ORM (#7044)

fakeshadow 3 years ago
parent
commit
9b849f4238

+ 14 - 7
frameworks/Rust/xitca-web/Cargo.toml

@@ -21,7 +21,8 @@ xitca-web = "0.1"
 ahash = { version = "0.7.6", features = ["compile-time-rng"] }
 ahash = { version = "0.7.6", features = ["compile-time-rng"] }
 atoi = "0.4.0"
 atoi = "0.4.0"
 core_affinity = "0.5.10"
 core_affinity = "0.5.10"
-diesel = { version = "1.4.8", features = ["postgres"] }
+diesel = { git = "https://github.com/diesel-rs/diesel.git", rev = "37ec18f46ced2d6e9197414156fdb705d7a61426", default-features = false }
+diesel-async = { version = "0.1.0", features = ["postgres"], default-features = false }
 futures-util = { version = "0.3.18", default-features = false, features = ["alloc"] }
 futures-util = { version = "0.3.18", default-features = false, features = ["alloc"] }
 mimalloc = { version = "0.1.27", default-features = false }
 mimalloc = { version = "0.1.27", default-features = false }
 rand = { version = "0.8", default-features = false, features = ["min_const_gen", "small_rng"] }
 rand = { version = "0.8", default-features = false, features = ["min_const_gen", "small_rng"] }
@@ -39,9 +40,15 @@ codegen-units = 1
 panic = "abort"
 panic = "abort"
 
 
 [patch.crates-io]
 [patch.crates-io]
-xitca-http = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "958e3c2205e6036c7e0cbf4813d08ddf6029fd1d" }
-xitca-http-codegen = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "958e3c2205e6036c7e0cbf4813d08ddf6029fd1d" }
-xitca-io = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "958e3c2205e6036c7e0cbf4813d08ddf6029fd1d" }
-xitca-server = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "958e3c2205e6036c7e0cbf4813d08ddf6029fd1d" }
-xitca-service = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "958e3c2205e6036c7e0cbf4813d08ddf6029fd1d" }
-xitca-web = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "958e3c2205e6036c7e0cbf4813d08ddf6029fd1d" }
+xitca-http = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "59827177f6c319c6fa9940fe5f146754fff90aad" }
+xitca-http-codegen = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "59827177f6c319c6fa9940fe5f146754fff90aad" }
+xitca-io = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "59827177f6c319c6fa9940fe5f146754fff90aad" }
+xitca-server = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "59827177f6c319c6fa9940fe5f146754fff90aad" }
+xitca-service = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "59827177f6c319c6fa9940fe5f146754fff90aad" }
+xitca-web = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "59827177f6c319c6fa9940fe5f146754fff90aad" }
+
+diesel = { git = "https://github.com/diesel-rs/diesel.git", rev = "37ec18f46ced2d6e9197414156fdb705d7a61426" }
+diesel-async = { git = "https://github.com/weiznich/diesel_async.git", rev = "06b3416826dbc8ce404f6d613daea989b23549ca" }
+
+# this is not right. but not much can be done when the maintainer is unactive.
+sailfish = { git = "https://github.com/jdrouet/sailfish", rev = "7d2b59247eaab10b67311d6c1c7d50a7d751d791" }

+ 90 - 67
frameworks/Rust/xitca-web/src/db_diesel.rs

@@ -1,12 +1,11 @@
-use std::{error::Error, fmt, future::Future, io, time::Duration};
+use std::{cell::RefCell, error::Error, fmt, future::Future, io, time::Duration};
 
 
-use diesel::prelude::*;
+use diesel::prelude::{ConnectionError, ExpressionMethods, QueryDsl};
+use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
+use futures_util::stream::{FuturesUnordered, TryStreamExt};
 use rand::{rngs::SmallRng, Rng, SeedableRng};
 use rand::{rngs::SmallRng, Rng, SeedableRng};
 use tang_rs::{Manager, ManagerFuture, ManagerTimeout, Pool};
 use tang_rs::{Manager, ManagerFuture, ManagerTimeout, Pool};
-use tokio::{
-    task::spawn_blocking,
-    time::{sleep, Sleep},
-};
+use tokio::time::{sleep, Sleep};
 
 
 use super::ser::{Fortune, Fortunes, World};
 use super::ser::{Fortune, Fortunes, World};
 
 
@@ -15,14 +14,17 @@ type DbResult<T> = Result<T, Box<dyn Error + Send + Sync + 'static>>;
 pub struct DieselPoolManager(String);
 pub struct DieselPoolManager(String);
 
 
 impl Manager for DieselPoolManager {
 impl Manager for DieselPoolManager {
-    type Connection = (PgConnection, SmallRng);
+    type Connection = AsyncPgConnection;
     type Error = DieselPoolError;
     type Error = DieselPoolError;
     type Timeout = Sleep;
     type Timeout = Sleep;
     type TimeoutError = ();
     type TimeoutError = ();
 
 
     fn connect(&self) -> ManagerFuture<Result<Self::Connection, Self::Error>> {
     fn connect(&self) -> ManagerFuture<Result<Self::Connection, Self::Error>> {
-        let conn = PgConnection::establish(self.0.as_str());
-        Box::pin(async move { Ok((conn?, SmallRng::from_entropy())) })
+        let url = self.0.clone();
+        Box::pin(async move {
+            let conn = AsyncPgConnection::establish(url.as_str()).await?;
+            Ok(conn)
+        })
     }
     }
 
 
     fn is_valid<'a>(
     fn is_valid<'a>(
@@ -85,8 +87,10 @@ impl From<()> for DieselPoolError {
     }
     }
 }
 }
 
 
-#[derive(Clone)]
-pub struct DieselPool(Pool<DieselPoolManager>);
+pub struct DieselPool {
+    rng: RefCell<SmallRng>,
+    pool: Pool<DieselPoolManager>,
+}
 
 
 pub async fn create(config: &str) -> io::Result<DieselPool> {
 pub async fn create(config: &str) -> io::Result<DieselPool> {
     let pool = tang_rs::Builder::new()
     let pool = tang_rs::Builder::new()
@@ -99,98 +103,117 @@ pub async fn create(config: &str) -> io::Result<DieselPool> {
         .await
         .await
         .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
         .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
 
 
-    Ok(DieselPool(pool))
+    Ok(DieselPool {
+        rng: RefCell::new(SmallRng::from_entropy()),
+        pool,
+    })
 }
 }
 
 
 impl DieselPool {
 impl DieselPool {
     pub async fn get_world(&self) -> DbResult<World> {
     pub async fn get_world(&self) -> DbResult<World> {
-        let mut conn = self.0.get_owned().await?;
+        let mut conn = self.pool.get().await?;
 
 
-        spawn_blocking(move || {
-            use crate::schema::world::dsl::*;
+        let random_id = self.rng.borrow_mut().gen_range(1..10_001);
 
 
-            let (c, rng) = &mut *conn;
+        use crate::schema::world::dsl::*;
 
 
-            let random_id = rng.gen_range(1..10_001);
-            let w = world
-                .filter(id.eq(random_id))
-                .load::<World>(c)?
-                .pop()
-                .unwrap();
+        let w = world
+            .filter(id.eq(random_id))
+            .load::<World>(&mut *conn)
+            .await?
+            .pop()
+            .unwrap();
 
 
-            Ok(w)
-        })
-        .await?
+        Ok(w)
     }
     }
 
 
     pub async fn get_worlds(&self, num: u16) -> DbResult<Vec<World>> {
     pub async fn get_worlds(&self, num: u16) -> DbResult<Vec<World>> {
-        let mut conn = self.0.get_owned().await?;
+        let worlds = {
+            let mut rng = self.rng.borrow_mut();
+            (0..num)
+                .map(|_| {
+                    let w_id = (rng.gen::<u32>() % 10_000 + 1) as i32;
 
 
-        spawn_blocking(move || {
-            use crate::schema::world::dsl::*;
+                    async move {
+                        let mut conn = self.pool.get().await?;
 
 
-            let (c, rng) = &mut *conn;
+                        use crate::schema::world::dsl::*;
 
 
-            (0..num)
-                .map(|_| {
-                    let w_id = rng.gen_range(1..10_001);
-                    let w = world.filter(id.eq(w_id)).load::<World>(c)?.pop().unwrap();
-                    Ok(w)
+                        let w = world
+                            .filter(id.eq(w_id))
+                            .load::<World>(&mut *conn)
+                            .await?
+                            .pop()
+                            .unwrap();
+
+                        Ok(w)
+                    }
                 })
                 })
-                .collect()
-        })
-        .await?
+                .collect::<FuturesUnordered<_>>()
+        };
+
+        worlds.try_collect().await
     }
     }
 
 
     pub async fn update(&self, num: u16) -> DbResult<Vec<World>> {
     pub async fn update(&self, num: u16) -> DbResult<Vec<World>> {
-        let mut conn = self.0.get_owned().await?;
+        use crate::schema::world::dsl::*;
 
 
-        spawn_blocking(move || {
-            use crate::schema::world::dsl::*;
+        let worlds = {
+            let mut rng = self.rng.borrow_mut();
+            (0..num)
+                .map(|_| {
+                    let w_id = rng.gen_range::<i32, _>(1..10_001);
+                    let new_id = rng.gen_range::<i32, _>(1..10_001);
 
 
-            let (c, rng) = &mut *conn;
+                    async move {
+                        let mut conn = self.pool.get().await?;
 
 
-            let mut worlds = (0..num)
-                .map(|_| {
-                    let w_id: i32 = rng.gen_range(1..10_001);
-                    let mut w = world.filter(id.eq(w_id)).load::<World>(c)?.pop().unwrap();
-                    w.randomnumber = rng.gen_range(1..10_001);
-                    Ok(w)
+                        let mut w = world
+                            .filter(id.eq(w_id))
+                            .load::<World>(&mut *conn)
+                            .await?
+                            .pop()
+                            .unwrap();
+
+                        w.randomnumber = new_id;
+
+                        DbResult::Ok(w)
+                    }
                 })
                 })
-                .collect::<DbResult<Vec<_>>>()?;
+                .collect::<FuturesUnordered<_>>()
+        };
+
+        let mut worlds = worlds.try_collect::<Vec<_>>().await?;
 
 
-            worlds.sort_by_key(|w| w.id);
+        worlds.sort_by_key(|w| w.id);
 
 
-            c.transaction::<_, diesel::result::Error, _>(|| {
+        let mut conn = self.pool.get().await?;
+
+        conn.transaction(move |conn| {
+            Box::pin(async move {
                 for w in &worlds {
                 for w in &worlds {
                     diesel::update(world)
                     diesel::update(world)
                         .filter(id.eq(w.id))
                         .filter(id.eq(w.id))
                         .set(randomnumber.eq(w.randomnumber))
                         .set(randomnumber.eq(w.randomnumber))
-                        .execute(c)?;
+                        .execute(conn)
+                        .await?;
                 }
                 }
-                Ok(())
-            })?;
-
-            Ok(worlds)
+                Ok(worlds)
+            })
         })
         })
-        .await?
+        .await
     }
     }
 
 
     pub async fn tell_fortune(&self) -> DbResult<Fortunes> {
     pub async fn tell_fortune(&self) -> DbResult<Fortunes> {
-        let mut conn = self.0.get_owned().await?;
+        let mut conn = self.pool.get().await?;
 
 
-        spawn_blocking(move || {
-            use crate::schema::fortune::dsl::*;
+        use crate::schema::fortune::dsl::*;
 
 
-            let (c, _) = &mut *conn;
+        let mut items = fortune.load::<Fortune>(&mut *conn).await?;
 
 
-            let mut items = fortune.load::<Fortune>(c)?;
+        items.push(Fortune::new(0, "Additional fortune added at request time."));
+        items.sort_by(|it, next| it.message.cmp(&next.message));
 
 
-            items.push(Fortune::new(0, "Additional fortune added at request time."));
-            items.sort_by(|it, next| it.message.cmp(&next.message));
-
-            Ok(Fortunes::new(items))
-        })
-        .await?
+        Ok(Fortunes::new(items))
     }
     }
 }
 }

+ 5 - 3
frameworks/Rust/xitca-web/src/main.rs

@@ -24,9 +24,9 @@ use xitca_http::{
     http::{
     http::{
         self,
         self,
         header::{CONTENT_TYPE, SERVER},
         header::{CONTENT_TYPE, SERVER},
-        IntoResponse,
+        IntoResponse, Method,
     },
     },
-    util::service::get,
+    util::service::Route,
     HttpServiceBuilder,
     HttpServiceBuilder,
 };
 };
 use xitca_server::Builder;
 use xitca_server::Builder;
@@ -56,7 +56,9 @@ async fn main() -> io::Result<()> {
             .disable_vectored_write()
             .disable_vectored_write()
             .max_request_headers::<8>();
             .max_request_headers::<8>();
 
 
-        HttpServiceBuilder::h1(get(http)).config(config)
+        let route = Route::new(http).methods([Method::GET]);
+
+        HttpServiceBuilder::h1(route).config(config)
     };
     };
 
 
     Builder::new()
     Builder::new()

+ 5 - 5
frameworks/Rust/xitca-web/src/main_diesel.rs

@@ -37,8 +37,8 @@ async fn main() -> io::Result<()> {
 
 
     HttpServer::new(move || {
     HttpServer::new(move || {
         App::with_async_state(move || async move {
         App::with_async_state(move || async move {
-            let pool = create(config).await.unwrap();
-            AppState::new(pool)
+            let pool = create(config).await.map_err(|_| ())?;
+            Ok(AppState::new(pool))
         })
         })
         .service(fn_service(handle))
         .service(fn_service(handle))
     })
     })
@@ -50,7 +50,7 @@ async fn main() -> io::Result<()> {
 }
 }
 
 
 async fn handle(req: &mut WebRequest<'_, State>) -> HandleResult {
 async fn handle(req: &mut WebRequest<'_, State>) -> HandleResult {
-    let inner = req.request_mut();
+    let inner = req.req_mut();
 
 
     match (inner.method(), inner.uri().path()) {
     match (inner.method(), inner.uri().path()) {
         (&Method::GET, "/plaintext") => plain_text(req),
         (&Method::GET, "/plaintext") => plain_text(req),
@@ -86,7 +86,7 @@ async fn fortunes(req: &mut WebRequest<'_, State>) -> HandleResult {
 }
 }
 
 
 async fn queries(req: &mut WebRequest<'_, State>) -> HandleResult {
 async fn queries(req: &mut WebRequest<'_, State>) -> HandleResult {
-    let num = req.request_mut().uri().query().parse_query();
+    let num = req.req_mut().uri().query().parse_query();
 
 
     match req.state().client().get_worlds(num).await {
     match req.state().client().get_worlds(num).await {
         Ok(worlds) => _json(req, worlds.as_slice()),
         Ok(worlds) => _json(req, worlds.as_slice()),
@@ -95,7 +95,7 @@ async fn queries(req: &mut WebRequest<'_, State>) -> HandleResult {
 }
 }
 
 
 async fn updates(req: &mut WebRequest<'_, State>) -> HandleResult {
 async fn updates(req: &mut WebRequest<'_, State>) -> HandleResult {
-    let num = req.request_mut().uri().query().parse_query();
+    let num = req.req_mut().uri().query().parse_query();
 
 
     match req.state().client().update(num).await {
     match req.state().client().update(num).await {
         Ok(worlds) => _json(req, worlds.as_slice()),
         Ok(worlds) => _json(req, worlds.as_slice()),

+ 2 - 2
frameworks/Rust/xitca-web/xitca-web-diesel.dockerfile

@@ -1,9 +1,9 @@
-FROM rust:1.56
+FROM rust:1.58
 
 
 ADD ./ /xitca-web
 ADD ./ /xitca-web
 WORKDIR /xitca-web
 WORKDIR /xitca-web
 
 
-RUN rustup default nightly-2021-11-27
+RUN rustup default nightly-2022-01-26
 RUN cargo clean
 RUN cargo clean
 RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin xitca-web-diesel
 RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin xitca-web-diesel
 
 

+ 2 - 2
frameworks/Rust/xitca-web/xitca-web.dockerfile

@@ -1,9 +1,9 @@
-FROM rust:1.56
+FROM rust:1.58
 
 
 ADD ./ /xitca-web
 ADD ./ /xitca-web
 WORKDIR /xitca-web
 WORKDIR /xitca-web
 
 
-RUN rustup default nightly-2021-11-27
+RUN rustup default nightly-2022-01-26
 RUN cargo clean
 RUN cargo clean
 RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin xitca-web
 RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin xitca-web