main.rs 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. use futures::stream::futures_unordered::FuturesUnordered;
  2. use futures::{FutureExt, StreamExt};
  3. use rand::distributions::{Distribution, Uniform};
  4. use rand::rngs::SmallRng;
  5. use rand::SeedableRng;
  6. use serde::Serialize;
  7. use sqlx::postgres::PgPool;
  8. use sqlx::FromRow;
  9. use std::cell::RefCell;
  10. use warp::http::header;
  11. use warp::{Filter, Rejection, Reply};
  12. use yarte::Template;
  13. const DATABASE_URL: &str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
  14. // SmallRng is not a CSPRNG. It's used specifically to match performance of
  15. // benchmarks in other programming languages where the default RNG algorithm
  16. // is not a CSPRNG. Most Rust programs will likely want to use
  17. // rand::thread_rng instead which is more convenient to use and safer.
  18. thread_local!(static RNG: RefCell<SmallRng> = RefCell::new(SmallRng::from_entropy()));
  19. fn with_rng<T>(f: impl FnOnce(&mut SmallRng) -> T) -> T {
  20. RNG.with(|rng| f(&mut rng.borrow_mut()))
  21. }
  22. #[derive(Serialize)]
  23. struct Message {
  24. message: &'static str,
  25. }
  26. fn json() -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
  27. warp::path!("json").map(|| {
  28. warp::reply::json(&Message {
  29. message: "Hello, world!",
  30. })
  31. })
  32. }
  33. fn plaintext() -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
  34. warp::path!("plaintext").map(|| "Hello, World!")
  35. }
  36. #[derive(Serialize, FromRow)]
  37. pub struct World {
  38. pub id: i32,
  39. pub randomnumber: i32,
  40. }
  41. impl World {
  42. async fn get_by_id(pool: &PgPool, id: i32) -> Self {
  43. sqlx::query_as("SELECT id, randomnumber FROM world WHERE id=$1")
  44. .bind(id)
  45. .fetch_one(pool)
  46. .await
  47. .unwrap()
  48. }
  49. }
  50. fn db(pool: &'static PgPool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
  51. let between = Uniform::from(1..=10_000);
  52. warp::path!("db").and_then(move || async move {
  53. let id = with_rng(|rng| between.sample(rng));
  54. Ok::<_, Rejection>(warp::reply::json(&World::get_by_id(pool, id).await))
  55. })
  56. }
  57. fn queries(pool: &'static PgPool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
  58. let between = Uniform::from(1..=10_000);
  59. let clamped = warp::path!(u32).map(|queries: u32| queries.clamp(1, 500));
  60. warp::path!("queries" / ..)
  61. .and(clamped.or(warp::any().map(|| 1)).unify())
  62. .and_then(move |queries| {
  63. with_rng(|rng| {
  64. (0..queries)
  65. .map(|_| World::get_by_id(pool, between.sample(rng)))
  66. .collect::<FuturesUnordered<_>>()
  67. .collect::<Vec<_>>()
  68. .map(|worlds| Ok::<_, Rejection>(warp::reply::json(&worlds)))
  69. })
  70. })
  71. }
  72. #[derive(Template)]
  73. #[template(path = "fortune")]
  74. struct FortunesYarteTemplate {
  75. fortunes: Vec<Fortune>,
  76. }
  77. #[derive(FromRow)]
  78. pub struct Fortune {
  79. pub id: i32,
  80. pub message: String,
  81. }
  82. fn fortune(pool: &'static PgPool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
  83. warp::path!("fortunes").and_then(move || async move {
  84. let mut fortunes = sqlx::query_as("SELECT id, message FROM fortune")
  85. .fetch_all(pool)
  86. .await
  87. .unwrap();
  88. fortunes.push(Fortune {
  89. id: 0,
  90. message: "Additional fortune added at request time.".into(),
  91. });
  92. fortunes.sort_by(|a, b| a.message.cmp(&b.message));
  93. Ok::<_, Rejection>(warp::reply::html(
  94. FortunesYarteTemplate { fortunes }.call().unwrap(),
  95. ))
  96. })
  97. }
  98. fn update(pool: &'static PgPool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
  99. let between = Uniform::from(1..=10_000);
  100. let clamped = warp::path!(u32).map(|queries: u32| queries.clamp(1, 500));
  101. warp::path!("updates" / ..)
  102. .and(clamped.or(warp::any().map(|| 1)).unify())
  103. .and_then(move |queries| async move {
  104. let mut worlds = with_rng(|rng| {
  105. (0..queries)
  106. .map(|_| World::get_by_id(pool, between.sample(rng)))
  107. .collect::<FuturesUnordered<_>>()
  108. .collect::<Vec<_>>()
  109. })
  110. .await;
  111. with_rng(|rng| {
  112. for world in &mut worlds {
  113. world.randomnumber = between.sample(rng);
  114. }
  115. });
  116. let mut transaction = pool.begin().await.unwrap();
  117. for world in &worlds {
  118. sqlx::query("UPDATE world SET randomnumber = $1 WHERE id = $2")
  119. .bind(world.randomnumber)
  120. .bind(world.id)
  121. .execute(&mut transaction)
  122. .await
  123. .unwrap();
  124. }
  125. transaction.commit().await.unwrap();
  126. Ok::<_, Rejection>(warp::reply::json(&worlds))
  127. })
  128. }
  129. fn routes(pool: &'static PgPool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
  130. json()
  131. .or(plaintext())
  132. .or(db(pool))
  133. .or(queries(pool))
  134. .or(fortune(pool))
  135. .or(update(pool))
  136. .map(|reply| warp::reply::with_header(reply, header::SERVER, "warp"))
  137. }
  138. #[tokio::main]
  139. async fn main() -> Result<(), sqlx::Error> {
  140. let pool = Box::leak(Box::new(PgPool::connect(DATABASE_URL).await?));
  141. warp::serve(routes(pool)).run(([0, 0, 0, 0], 8080)).await;
  142. Ok(())
  143. }