Browse Source

Migrate warp-rust to use sqlx (#6354)

In the past, warp-rust has used tokio-postgres which because
it was compatible with async code and there was no real
alternative to it - it wasn't great, but there was nothing
better.

However, since then sqlx was released which provides a nicer
interface than very low-level interface of tokio-postgres
while providing nice features such as built-in connection
pooling, automatic statement preparation and caching,
fetching results directly into structures, and more.
Konrad Borowski 4 years ago
parent
commit
d57aa8983b

+ 1 - 1
frameworks/Rust/warp-rust/Cargo.toml

@@ -8,8 +8,8 @@ edition = "2018"
 futures = "0.3.12"
 rand = { version = "0.8.2", features = ["small_rng"] }
 serde = { version = "1.0.120", features = ["derive"] }
+sqlx = { version = "0.5.1", features = ["runtime-tokio-rustls", "postgres"] }
 tokio = { version = "1.0.2", features = ["macros", "rt-multi-thread"] }
-tokio-postgres = "0.7.0"
 warp = "0.3.0"
 yarte = "0.14.1"
 

+ 6 - 1
frameworks/Rust/warp-rust/README.md

@@ -2,7 +2,7 @@
 
 warp is a composable web server framework based on hyper.
 
-* [API Documentation](https://docs.rs/warp/0.2)
+* [API Documentation](https://docs.rs/warp/0.3)
 
 ### Test Type Implementation Source Code
 
@@ -10,6 +10,7 @@ warp is a composable web server framework based on hyper.
 * [PLAINTEXT](src/main.rs)
 * [DB](src/main.rs)
 * [QUERIES](src/main.rs)
+* [FORTUNES](src/main.rs)
 
 ## Test URLs
 ### JSON
@@ -27,3 +28,7 @@ http://localhost:8080/db
 ### QUERIES
 
 http://localhost:8080/queries/[1...500]
+
+### FORTUNES
+
+http://localhost:8080/fortunes

+ 0 - 58
frameworks/Rust/warp-rust/src/db.rs

@@ -1,58 +0,0 @@
-use serde::Serialize;
-use tokio_postgres::{Client, Error, NoTls, Statement};
-
-const DATABASE_URL: &str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
-
-pub struct Database {
-    client: Client,
-    world: Statement,
-    fortune: Statement,
-}
-
-#[derive(Serialize)]
-pub struct World {
-    pub id: i32,
-    pub randomnumber: i32,
-}
-
-pub struct Fortune {
-    pub id: i32,
-    pub message: String,
-}
-
-impl Database {
-    pub async fn connect() -> Result<Self, Error> {
-        let (client, connection) = tokio_postgres::connect(DATABASE_URL, NoTls).await?;
-        tokio::spawn(async { connection.await.unwrap() });
-        let world = client
-            .prepare("SELECT id, randomnumber FROM world WHERE id=$1")
-            .await?;
-        let fortune = client.prepare("SELECT id, message FROM fortune").await?;
-        Ok(Self {
-            client,
-            world,
-            fortune,
-        })
-    }
-
-    pub async fn get_world_by_id(&self, id: i32) -> World {
-        let row = self.client.query_one(&self.world, &[&id]).await.unwrap();
-        World {
-            id: row.get(0),
-            randomnumber: row.get(1),
-        }
-    }
-
-    pub async fn query_fortunes(&self) -> Vec<Fortune> {
-        self.client
-            .query(&self.fortune, &[])
-            .await
-            .unwrap()
-            .into_iter()
-            .map(|row| Fortune {
-                id: row.get(0),
-                message: row.get(1),
-            })
-            .collect()
-    }
-}

+ 42 - 25
frameworks/Rust/warp-rust/src/main.rs

@@ -1,17 +1,18 @@
-mod db;
-
-use crate::db::{Database, Fortune};
 use futures::stream::futures_unordered::FuturesUnordered;
 use futures::{FutureExt, StreamExt};
 use rand::distributions::{Distribution, Uniform};
 use rand::rngs::SmallRng;
 use rand::SeedableRng;
 use serde::Serialize;
+use sqlx::postgres::PgPool;
+use sqlx::FromRow;
 use std::cell::RefCell;
 use warp::http::header;
 use warp::{Filter, Rejection, Reply};
 use yarte::Template;
 
+const DATABASE_URL: &str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
+
 // SmallRng is not a CSPRNG. It's used specifically to match performance of
 // benchmarks in other programming languages where the default RNG algorithm
 // is not a CSPRNG. Most Rust programs will likely want to use
@@ -39,18 +40,31 @@ fn plaintext() -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
     warp::path!("plaintext").map(|| "Hello, World!")
 }
 
-fn db(database: &'static Database) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+#[derive(Serialize, FromRow)]
+pub struct World {
+    pub id: i32,
+    pub randomnumber: i32,
+}
+
+impl World {
+    async fn get_by_id(pool: &PgPool, id: i32) -> Self {
+        sqlx::query_as("SELECT id, randomnumber FROM world WHERE id=$1")
+            .bind(id)
+            .fetch_one(pool)
+            .await
+            .unwrap()
+    }
+}
+
+fn db(pool: &'static PgPool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
     let between = Uniform::from(1..=10_000);
     warp::path!("db").and_then(move || async move {
         let id = with_rng(|rng| between.sample(rng));
-        let world = database.get_world_by_id(id).await;
-        Ok::<_, Rejection>(warp::reply::json(&world))
+        Ok::<_, Rejection>(warp::reply::json(&World::get_by_id(pool, id).await))
     })
 }
 
-fn queries(
-    database: &'static Database,
-) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+fn queries(pool: &'static PgPool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
     let between = Uniform::from(1..=10_000);
     let clamped = warp::path!(u32).map(|queries: u32| queries.max(1).min(500));
     warp::path!("queries" / ..)
@@ -58,7 +72,7 @@ fn queries(
         .and_then(move |queries| {
             with_rng(|rng| {
                 (0..queries)
-                    .map(|_| database.get_world_by_id(between.sample(rng)))
+                    .map(|_| World::get_by_id(pool, between.sample(rng)))
                     .collect::<FuturesUnordered<_>>()
                     .collect::<Vec<_>>()
                     .map(|worlds| Ok::<_, Rejection>(warp::reply::json(&worlds)))
@@ -72,11 +86,18 @@ struct FortunesYarteTemplate {
     fortunes: Vec<Fortune>,
 }
 
-fn fortune(
-    database: &'static Database,
-) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+#[derive(FromRow)]
+pub struct Fortune {
+    pub id: i32,
+    pub message: String,
+}
+
+fn fortune(pool: &'static PgPool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
     warp::path!("fortunes").and_then(move || async move {
-        let mut fortunes = database.query_fortunes().await;
+        let mut fortunes = sqlx::query_as("SELECT id, message FROM fortune")
+            .fetch_all(pool)
+            .await
+            .unwrap();
         fortunes.push(Fortune {
             id: 0,
             message: "Additional fortune added at request time.".into(),
@@ -88,22 +109,18 @@ fn fortune(
     })
 }
 
-fn routes(
-    database: &'static Database,
-) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+fn routes(pool: &'static PgPool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
     json()
         .or(plaintext())
-        .or(db(database))
-        .or(queries(database))
-        .or(fortune(database))
+        .or(db(pool))
+        .or(queries(pool))
+        .or(fortune(pool))
         .map(|reply| warp::reply::with_header(reply, header::SERVER, "warp"))
 }
 
 #[tokio::main]
-async fn main() -> Result<(), tokio_postgres::Error> {
-    let database = Box::leak(Box::new(Database::connect().await?));
-    warp::serve(routes(database))
-        .run(([0, 0, 0, 0], 8080))
-        .await;
+async fn main() -> Result<(), sqlx::Error> {
+    let pool = Box::leak(Box::new(PgPool::connect(DATABASE_URL).await?));
+    warp::serve(routes(pool)).run(([0, 0, 0, 0], 8080)).await;
     Ok(())
 }