Browse Source

Merge pull request #6877 from dragosv/rocket-async-sqlx

Rocket async
Mike Smith 3 years ago
parent
commit
deebdf3637

+ 1 - 0
frameworks/Rust/rocket/.gitignore

@@ -0,0 +1 @@
+.env

+ 21 - 21
frameworks/Rust/rocket/Cargo.toml

@@ -1,26 +1,26 @@
 [package]
 [package]
-name = "rocket"
-version = "0.1.0"
-authors = ["Marcelo Barbosa <[email protected]>", "Brendan Hansknecht <[email protected]>"]
+name = "rocket_techempower"
+version = "0.2.0"
+authors = ["Marcelo Barbosa <[email protected]>", "Brendan Hansknecht <[email protected]>", "Dragos Varovici <[email protected]>"]
 edition = "2018"
 edition = "2018"
 
 
 [dependencies]
 [dependencies]
-diesel = { version = "1.4.5", features = ["postgres", "r2d2"] }
-num_cpus = "1.13.0"
-rand = "0.7.3"
-rocket = "0.4.5"
-serde = "1.0.115"
-serde_json = "1.0.57"
-serde_derive = "1.0.115"
-yarte = "0.12.2"
-lazy_static = "1.4.0"
+num_cpus = { version = "^1.13" }
+rand = { version = "^0.8" }
+yarte = { version = "^0.15" }
+lazy_static = { version = "^1.4" }
+async-stream = { version = "^0.3" }
+async-trait = { version = "0.1" }
+futures = { version = "^0.3" }
+futures-util = { version = "^0.3" }
+rocket = { git = "https://github.com/SergioBenitez/Rocket", features = [
+    "json",
+] }
+sqlx = { version = "^0.5", features = [ "postgres", "macros", "migrate", "sqlite" ] }
+rocket_db_pools = { git = "https://github.com/SergioBenitez/Rocket", features = [ "sqlx_postgres" ] }
+figment = { version = "^0.10" }
+dotenv = { version = "^0.15" }
 
 
-[dependencies.rocket_contrib]
-version = "*"
-default-features = false
-features = ["json"]
-
-[profile.release]
-lto = true
-opt-level = 3
-codegen-units = 1
+serde = { version = "^1" }
+serde_json = { version = "^1" }
+serde_derive = { version = "^1" }

+ 1 - 1
frameworks/Rust/rocket/config.toml

@@ -13,7 +13,7 @@ classification = "Fullstack"
 database = "Postgres"
 database = "Postgres"
 database_os = "Linux"
 database_os = "Linux"
 os = "Linux"
 os = "Linux"
-orm = "Full"
+orm = "Raw"
 platform = "Rust"
 platform = "Rust"
 webserver = "Hyper"
 webserver = "Hyper"
 versus = "None"
 versus = "None"

+ 10 - 0
frameworks/Rust/rocket/db/migrations/20211027024424_create-world-table.sql

@@ -0,0 +1,10 @@
+CREATE TABLE World (
+    id INTEGER PRIMARY KEY,
+    randomnumber INTEGER NOT NULL
+);
+
+INSERT INTO World (id, randomnumber) VALUES (1, 101);
+INSERT INTO World (id, randomnumber) VALUES (2, 102);
+INSERT INTO World (id, randomnumber) VALUES (3, 103);
+INSERT INTO World (id, randomnumber) VALUES (4, 104);
+INSERT INTO World (id, randomnumber) VALUES (5, 105);

+ 17 - 0
frameworks/Rust/rocket/db/migrations/20211028024424_create-fortune-table.sql

@@ -0,0 +1,17 @@
+CREATE TABLE Fortune (
+    id INTEGER PRIMARY KEY,
+    message VARCHAR NOT NULL
+);
+
+INSERT INTO Fortune (id, message) VALUES (1, 'fortune: No such file or directory');
+INSERT INTO Fortune (id, message) VALUES (2, 'A computer scientist is someone who fixes things that aren''t broken.');
+INSERT INTO Fortune (id, message) VALUES (3, 'After enough decimal places, nobody gives a damn.');
+INSERT INTO Fortune (id, message) VALUES (4, 'A bad random number generator: 1, 1, 1, 1, 1, 4.33e+67, 1, 1, 1');
+INSERT INTO Fortune (id, message) VALUES (5, 'A computer program does what you tell it to do, not what you want it to do.');
+INSERT INTO Fortune (id, message) VALUES (6, 'Emacs is a nice operating system, but I prefer UNIX. — Tom Christaensen');
+INSERT INTO Fortune (id, message) VALUES (7, 'Any program that runs right is obsolete.');
+INSERT INTO Fortune (id, message) VALUES (8, 'A list is only as strong as its weakest link. — Donald Knuth');
+INSERT INTO Fortune (id, message) VALUES (9, 'Feature: A bug with seniority.');
+INSERT INTO Fortune (id, message) VALUES (10, 'Computers make very fast, very accurate mistakes.');
+INSERT INTO Fortune (id, message) VALUES (11, '<script>alert("This should not be displayed in a browser alert box.");</script>');
+INSERT INTO Fortune (id, message) VALUES (12, 'フレームワークのベンチマーク');

+ 4 - 6
frameworks/Rust/rocket/rocket.dockerfile

@@ -1,18 +1,16 @@
-FROM rust:1.46.0-slim-buster
+FROM rust:1.55-slim-buster
 
 
-ENV DATABASE_URL=postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world
+ENV ROCKET_BENCHMARK_DATABASE_URL=postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world
 
 
 RUN apt-get update && apt-get install -y --no-install-recommends \
 RUN apt-get update && apt-get install -y --no-install-recommends \
     libpq-dev \
     libpq-dev \
 && rm -rf /var/lib/apt/lists/*
 && rm -rf /var/lib/apt/lists/*
 
 
-RUN rustup toolchain install nightly-2020-08-29 -t x86_64-unknown-linux-gnu --no-self-update --profile minimal
-
 ADD ./ /rocket
 ADD ./ /rocket
 WORKDIR /rocket
 WORKDIR /rocket
 
 
-RUN RUSTFLAGS="-C target-cpu=native" cargo +nightly-2020-08-29 build --release
+RUN RUSTFLAGS="-C target-cpu=native" cargo build --release
 
 
 EXPOSE 8000
 EXPOSE 8000
 
 
-CMD ./target/release/rocket
+CMD ./target/release/rocket_techempower

+ 5 - 0
frameworks/Rust/rocket/src/database.rs

@@ -0,0 +1,5 @@
+use rocket_db_pools::{sqlx, Database};
+
+#[derive(Database)]
+#[database("hello_world")]
+pub struct HelloWorld(sqlx::PgPool);

+ 0 - 35
frameworks/Rust/rocket/src/db.rs

@@ -1,35 +0,0 @@
-use diesel::pg::PgConnection;
-use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
-use rocket::http::Status;
-use rocket::request::{self, FromRequest};
-use rocket::{Outcome, Request, State};
-use std::ops::Deref;
-
-type PgPool = Pool<ConnectionManager<PgConnection>>;
-
-pub struct DbConn(pub PooledConnection<ConnectionManager<PgConnection>>);
-
-impl<'a, 'r> FromRequest<'a, 'r> for DbConn {
-    type Error = ();
-
-    fn from_request(request: &'a Request<'r>) -> request::Outcome<Self, Self::Error> {
-        let pool = request.guard::<State<PgPool>>()?;
-        match pool.get() {
-            Ok(conn) => Outcome::Success(DbConn(conn)),
-            Err(_) => Outcome::Failure((Status::ServiceUnavailable, ())),
-        }
-    }
-}
-
-impl Deref for DbConn {
-    type Target = PgConnection;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-pub fn init_pool() -> PgPool {
-    let manager = ConnectionManager::<PgConnection>::new(env!("DATABASE_URL"));
-    Pool::builder().max_size((num_cpus::get()*16) as u32).build(manager).expect("db pool")
-}

+ 88 - 119
frameworks/Rust/rocket/src/main.rs

@@ -1,103 +1,60 @@
-#![feature(proc_macro_hygiene, decl_macro)]
-
-extern crate rand;
-#[macro_use]
-extern crate rocket;
-extern crate rocket_contrib;
 #[macro_use]
 #[macro_use]
-extern crate diesel;
+extern crate lazy_static;
 #[macro_use]
 #[macro_use]
+extern crate rocket;
 extern crate serde_derive;
 extern crate serde_derive;
-extern crate lazy_static;
-
-use diesel::prelude::*;
-use diesel::result::Error;
-use lazy_static::lazy_static;
-use rand::seq::SliceRandom;
-use rand::thread_rng;
-use rocket::config::{Config, Environment, LoggingLevel};
-use rocket::response::content;
-use rocket_contrib::json::Json;
-use std::sync::Mutex;
-use yarte::Template;
+extern crate dotenv;
 
 
-mod db;
 mod models;
 mod models;
-mod schema;
-
-struct RandomArray {
-    pointer: usize,
-    size: i32,
-    data: Vec<i32>,
-}
-
-impl RandomArray {
-    fn new(size: i32) -> Self {
-        let mut data: Vec<i32> = (1..=size).collect();
-        let mut rng = thread_rng();
-        data.shuffle(&mut rng);
-
-        RandomArray {
-            pointer: 0,
-            size,
-            data,
-        }
-    }
+mod random;
+mod database;
+
+use dotenv::dotenv;
+use std::net::{IpAddr, Ipv4Addr};
+use std::env;
+use rocket::{Rocket, Build};
+use rocket::serde::json::Json;
+use rocket::response::content::RawHtml;
+use rocket::config::{Config, LogLevel};
+use yarte::Template;
+use rocket_db_pools::{sqlx, Database, Connection};
+use sqlx::Acquire;
+use figment::Figment;
 
 
-    fn next(&mut self) -> i32 {
-        if self.pointer >= self.size as usize {
-            self.pointer = 1;
-        } else {
-            self.pointer += 1;
-        }
-        self.data[self.pointer - 1]
-    }
-}
-
-lazy_static! {
-    static ref RANDOM_ARRAY: Mutex<RandomArray> = Mutex::new(RandomArray::new(10000));
-}
-fn random_number() -> i32 {
-    RANDOM_ARRAY
-        .lock()
-        .expect("Failed to lock RANDOM_ARRAY")
-        .next()
-}
+use models::{World, Fortune, Message};
+use database::HelloWorld;
+use random::random_number;
 
 
 #[get("/plaintext")]
 #[get("/plaintext")]
-fn plaintext() -> &'static str {
+async fn plaintext() -> &'static str {
     "Hello, World!"
     "Hello, World!"
 }
 }
 
 
 #[get("/json")]
 #[get("/json")]
-fn json() -> Json<models::Message> {
-    let message = models::Message {
+async fn json() -> Json<models::Message> {
+    let message = Message {
         message: "Hello, World!",
         message: "Hello, World!",
     };
     };
     Json(message)
     Json(message)
 }
 }
 
 
 #[get("/db")]
 #[get("/db")]
-fn db(conn: db::DbConn) -> Json<models::World> {
-    use schema::world::dsl::*;
+async fn db(mut db: Connection<HelloWorld>) -> Json<World> {
+    let number = random_number();
 
 
-    let result = world
-        .filter(id.eq(random_number()))
-        .first::<models::World>(&*conn)
-        .expect("error loading world");
+    let result : World = sqlx::query_as("SELECT id, randomnumber FROM World WHERE id = $1").bind(number)
+        .fetch_one(&mut *db).await.ok().expect("error loading world");
 
 
     Json(result)
     Json(result)
 }
 }
 
 
 #[get("/queries")]
 #[get("/queries")]
-fn queries_empty(conn: db::DbConn) -> Json<Vec<models::World>> {
-    queries(conn, 1)
+async fn queries_empty(db: Connection<HelloWorld>) -> Json<Vec<World>> {
+    queries(db,1).await
 }
 }
 
 
 #[get("/queries?<q>")]
 #[get("/queries?<q>")]
-fn queries(conn: db::DbConn, q: u16) -> Json<Vec<models::World>> {
-    use schema::world::dsl::*;
-
+async fn queries(mut db: Connection<HelloWorld>, q: u16) -> Json<Vec<World>> {
     let q = if q == 0 {
     let q = if q == 0 {
         1
         1
     } else if q > 500 {
     } else if q > 500 {
@@ -110,10 +67,10 @@ fn queries(conn: db::DbConn, q: u16) -> Json<Vec<models::World>> {
 
 
     for _ in 0..q {
     for _ in 0..q {
         let query_id = random_number();
         let query_id = random_number();
-        let result = world
-            .filter(id.eq(query_id))
-            .first::<models::World>(&*conn)
-            .unwrap_or_else(|_| panic!("error loading world, id={}", query_id));
+
+        let result :World = sqlx::query_as("SELECT * FROM World WHERE id = $1").bind(query_id)
+            .fetch_one(&mut *db).await.ok().expect("error loading world");
+
         results.push(result);
         results.push(result);
     }
     }
 
 
@@ -123,25 +80,22 @@ fn queries(conn: db::DbConn, q: u16) -> Json<Vec<models::World>> {
 #[derive(Template)]
 #[derive(Template)]
 #[template(path = "fortunes.html.hbs")]
 #[template(path = "fortunes.html.hbs")]
 pub struct FortunesTemplate<'a> {
 pub struct FortunesTemplate<'a> {
-    pub fortunes: &'a Vec<models::Fortune>,
+    pub fortunes: &'a Vec<Fortune>,
 }
 }
 
 
 #[get("/fortunes")]
 #[get("/fortunes")]
-fn fortunes(conn: db::DbConn) -> content::Html<String> {
-    use schema::fortune::dsl::*;
-
-    let mut fortunes = fortune
-        .load::<models::Fortune>(&*conn)
-        .expect("error loading fortunes");
+async fn fortunes(mut db: Connection<HelloWorld>) -> RawHtml<String> {
+    let mut fortunes: Vec<Fortune> = sqlx::query_as("SELECT * FROM Fortune").fetch_all(&mut *db).await
+        .ok().expect("Could not load Fortunes");
 
 
-    fortunes.push(models::Fortune {
+    fortunes.push(Fortune {
         id: 0,
         id: 0,
         message: "Additional fortune added at request time.".to_string(),
         message: "Additional fortune added at request time.".to_string(),
     });
     });
 
 
     fortunes.sort_by(|a, b| a.message.cmp(&b.message));
     fortunes.sort_by(|a, b| a.message.cmp(&b.message));
 
 
-    content::Html(
+    RawHtml(
         FortunesTemplate {
         FortunesTemplate {
             fortunes: &fortunes,
             fortunes: &fortunes,
         }
         }
@@ -151,14 +105,12 @@ fn fortunes(conn: db::DbConn) -> content::Html<String> {
 }
 }
 
 
 #[get("/updates")]
 #[get("/updates")]
-fn updates_empty(conn: db::DbConn) -> Json<Vec<models::World>> {
-    updates(conn, 1)
+async fn updates_empty(db: Connection<HelloWorld>) -> Json<Vec<World>> {
+    updates(db,1).await
 }
 }
 
 
 #[get("/updates?<q>")]
 #[get("/updates?<q>")]
-fn updates(conn: db::DbConn, q: u16) -> Json<Vec<models::World>> {
-    use schema::world::dsl::*;
-
+async fn updates(mut db: Connection<HelloWorld>, q: u16) -> Json<Vec<World>> {
     let q = if q == 0 {
     let q = if q == 0 {
         1
         1
     } else if q > 500 {
     } else if q > 500 {
@@ -171,39 +123,56 @@ fn updates(conn: db::DbConn, q: u16) -> Json<Vec<models::World>> {
 
 
     for _ in 0..q {
     for _ in 0..q {
         let query_id = random_number();
         let query_id = random_number();
-        let mut result = world
-            .filter(id.eq(query_id))
-            .first::<models::World>(&*conn)
-            .unwrap_or_else(|_| panic!("error loading world, id={}", query_id));
-        result.randomNumber = random_number();
+        let mut result :World = sqlx::query_as("SELECT * FROM World WHERE id = $1").bind(query_id)
+            .fetch_one(&mut *db).await.ok().expect("World was not found");
+
+        result.random_number = random_number();
         results.push(result);
         results.push(result);
     }
     }
 
 
-    let _ = conn.transaction::<(), Error, _>(|| {
-        for w in &results {
-            let _ = diesel::update(world)
-                .filter(id.eq(w.id))
-                .set(randomnumber.eq(w.randomNumber))
-                .execute(&*conn);
-        }
-        Ok(())
-    });
+    let mut pool = db.into_inner();
+    let mut tx = pool.begin().await.ok().expect("could not start transaction");
+
+    for w in &results {
+        sqlx::query("UPDATE World SET randomnumber = $1 WHERE id = $2")
+            .bind(w.random_number).bind(w.id)
+            .execute(&mut tx)
+            .await.ok().expect("Could not update World");
+    }
+
+    tx.commit().await.ok().expect("could not update worlds");
 
 
     Json(results)
     Json(results)
 }
 }
 
 
-fn main() {
-    let mut config = Config::build(Environment::Production)
-        .address("0.0.0.0")
-        .port(8000)
-        .log_level(LoggingLevel::Off)
-        .workers((num_cpus::get() * 16) as u16)
-        .keep_alive(0)
-        .expect("failed to generate config");
-    config
-        .set_secret_key("dY+Rj2ybjGxKetLawKGSWi6EzESKejvENbQ3stffZg0=")
-        .expect("failed to set secret");
-    rocket::custom(config)
+#[launch]
+pub fn launch() -> Rocket<Build> {
+    if cfg!(not(test)) {
+        dotenv().ok();
+    }
+
+    let config = Config {
+        address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
+        port: 8000,
+        keep_alive: 0,
+        log_level: LogLevel::Off,
+        workers: num_cpus::get() * 16,
+        ..Default::default()
+    };
+
+    let database_url = env::var("ROCKET_BENCHMARK_DATABASE_URL").ok()
+        .expect("ROCKET_BENCHMARK_DATABASE_URL environment variable was not set");
+
+    let figment = Figment::from(config)
+        .merge(("databases.hello_world", rocket_db_pools::Config {
+                url: database_url,
+                min_connections: None,
+                max_connections: 100,
+                connect_timeout: 3,
+                idle_timeout: None,
+            }));
+
+    rocket::custom(figment)
         .mount(
         .mount(
             "/",
             "/",
             routes![
             routes![
@@ -217,6 +186,6 @@ fn main() {
                 updates_empty,
                 updates_empty,
             ],
             ],
         )
         )
-        .manage(db::init_pool())
-        .launch();
+        .attach(HelloWorld::init())
 }
 }
+

+ 17 - 6
frameworks/Rust/rocket/src/models.rs

@@ -1,17 +1,28 @@
+use rocket::serde::{Deserialize, Serialize};
+use sqlx::FromRow;
+
 #[derive(Serialize)]
 #[derive(Serialize)]
 pub struct Message {
 pub struct Message {
     pub message: &'static str,
     pub message: &'static str,
 }
 }
 
 
 #[allow(non_snake_case)]
 #[allow(non_snake_case)]
-#[derive(Serialize, Queryable)]
-pub struct World {
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, FromRow)]
+#[serde(crate = "rocket::serde")]
+pub struct Fortune {
     pub id: i32,
     pub id: i32,
-    pub randomNumber: i32,
+    pub message: String
 }
 }
 
 
-#[derive(Serialize, Queryable)]
-pub struct Fortune {
+#[allow(non_snake_case)]
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, FromRow)]
+#[serde(crate = "rocket::serde")]
+pub struct World {
     pub id: i32,
     pub id: i32,
-    pub message: String,
+    #[sqlx(rename = "randomnumber")]
+    #[serde(rename = "randomNumber")]
+    pub random_number: i32
 }
 }
+
+
+

+ 47 - 0
frameworks/Rust/rocket/src/random.rs

@@ -0,0 +1,47 @@
+extern crate lazy_static;
+extern crate rand;
+
+use rand::seq::SliceRandom;
+use rand::thread_rng;
+use std::sync::Mutex;
+
+lazy_static! {
+    static ref RANDOM_ARRAY: Mutex<RandomArray> = Mutex::new(RandomArray::new(10000));
+}
+
+pub fn random_number() -> i32 {
+    RANDOM_ARRAY
+        .lock()
+        .expect("Failed to lock RANDOM_ARRAY")
+        .next()
+}
+
+struct RandomArray {
+    pointer: usize,
+    size: i32,
+    data: Vec<i32>,
+}
+
+impl RandomArray {
+    fn new(size: i32) -> Self {
+        let mut data: Vec<i32> = (1..=size).collect();
+        let mut rng = thread_rng();
+        data.shuffle(&mut rng);
+
+        RandomArray {
+            pointer: 0,
+            size,
+            data,
+        }
+    }
+
+    fn next(&mut self) -> i32 {
+        if self.pointer >= self.size as usize {
+            self.pointer = 1;
+        } else {
+            self.pointer += 1;
+        }
+        self.data[self.pointer - 1]
+    }
+}
+

+ 0 - 15
frameworks/Rust/rocket/src/schema.rs

@@ -1,15 +0,0 @@
-#![allow(non_snake_case)]
-
-table! {
-    world (id) {
-        id -> Integer,
-        randomnumber -> Integer,
-    }
-}
-
-table! {
-    fortune (id) {
-        id -> Integer,
-        message -> Text,
-    }
-}