Sfoglia il codice sorgente

Add water-http framework (#10255)

* Add water-http framework

* benchmark_config.json update
Hassan Sharara 1 mese fa
parent
commit
eeda59f533

+ 3 - 0
frameworks/Rust/water-http/.gitignore

@@ -0,0 +1,3 @@
+target
+Cargo.lock
+.idea

+ 45 - 0
frameworks/Rust/water-http/Cargo.toml

@@ -0,0 +1,45 @@
+[package]
+name = "water-http"
+version = "0.1.0"
+edition = "2018"
+
+[dependencies]
+askama = "0.14.0"
+tokio = { version = "1.47.1", features = ["full"] }
+water_http = {git = "https://github.com/HassanSharara/water_http.git", branch = "beta", features = ["use_only_http1"],optional = true , version = "3.0.8-beta.5" }
+smallvec = "1.15.1"
+nanorand = "0.8.0"
+tokio-postgres = "0.7.15"
+sonic-rs = "0.5.5"
+bytes = "1.10.1"
+serde = { version = "1.0.228", features = ["derive","rc"] }
+futures-util = "0.3.31"
+num_cpus = "1.17.0"
+httpdate = "1.0.3"
+parking_lot = "0.12.5"
+yarte = { version = "0.15.7" ,features = ["bytes-buf", "json"] }
+itoa = {version = "1.0.15" ,optional = true}
+
+
+[[bin]]
+name = "plaintext"
+path = "src/plaintext.rs"
+required-features = ["json_plaintext"]
+
+[[bin]]
+name = "json"
+path = "src/json.rs"
+required-features = ["json_plaintext"]
+
+
+[[bin]]
+name = "cache"
+path = "src/cached.rs"
+required-features = ["cache"]
+
+
+[features]
+json_plaintext = ["water_http"]
+db = ["water_http/thread_shared_struct"]
+cache = ["water_http/thread_shared_struct","itoa"]
+all = ["water_http/thread_shared_struct"]

+ 39 - 0
frameworks/Rust/water-http/README

@@ -0,0 +1,39 @@
+🌊 Water HTTP — TechEmpower Benchmarks
+
+Water HTTP is a high-performance Rust web framework built and optimized for the TechEmpower Framework Benchmarks (TFB)
+. It is designed to push the limits of speed, stability, and scalability using pure asynchronous Rust and Tokio’s runtime. Every part of the framework is hand-tuned to achieve predictable latency, minimal allocations, and efficient CPU utilization under extreme concurrency.
+
+This repository contains the official benchmark implementations for all test types, including Plaintext, JSON serialization, Single query, Multiple queries, Fortunes, Database updates, and Cached queries.
+
+⚡ Highlights
+
+🚀 One of the fastest and most stable frameworks in the TechEmpower Benchmarks
+
+🧵 Built entirely on Tokio’s asynchronous runtime — no io_uring or unsafe code
+
+💾 Zero-copy I/O, preallocated buffers, and predictable memory layout
+
+🔒 100% safe Rust with no hidden synchronization overhead
+
+🧱 Designed for consistent high performance at scale
+
+
+🧠 Architecture
+
+Water HTTP is built around a fully asynchronous, event-driven architecture that leverages non-blocking I/O and efficient request processing. The system is designed to eliminate unnecessary locks and minimize latency even under heavy load.
+
+[ tokio::net::TcpListener ]
+         ↓
+   [ connection handler ]
+         ↓
+   [ water::http::parser ]
+         ↓
+   [ application logic ]
+         ↓
+   [ response encoder ]
+This approach ensures optimal throughput and minimal per-request overhead while keeping code simple and safe.
+
+
+🧭 Mission
+
+Water HTTP’s mission is to demonstrate how Rust’s async ecosystem can reach record-breaking performance without compromising simplicity, safety, or maintainability. It shows that carefully engineered async Rust can deliver unmatched speed and reliability in real-world workloads, setting a new standard for what modern web frameworks can achieve.

+ 90 - 0
frameworks/Rust/water-http/benchmark_config.json

@@ -0,0 +1,90 @@
+{
+  "framework": "water-http",
+  "tests": [
+    {
+      "default": {
+        "json_url": "/json",
+        "plaintext_url": "/plaintext",
+        "fortune_url": "/fortunes",
+        "db_url": "/db",
+        "query_url": "/queries?q=",
+        "update_url": "/updates?q=",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "Postgres",
+        "framework": "water_http",
+        "language": "Rust",
+        "orm": "raw",
+        "platform": "Rust",
+        "webserver": "water_http",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "water_http"
+      },
+
+      "db": {
+        "fortune_url": "/fortunes",
+        "db_url": "/db",
+        "query_url": "/queries?q=",
+        "update_url": "/updates?q=",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "Postgres",
+        "framework": "water_http",
+        "language": "Rust",
+        "orm": "raw",
+        "platform": "Rust",
+        "webserver": "water_http",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "water_http"
+      },
+
+      "cached": {
+        "cached_query_url": "/cached-queries?q=",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "database": "Postgres",
+        "framework": "water_http",
+        "language": "Rust",
+        "orm": "raw",
+        "platform": "Rust",
+        "webserver": "water_http",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "water_http"
+      },
+      "json": {
+        "json_url": "/json",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "framework": "water_http",
+        "language": "Rust",
+        "orm": "raw",
+        "platform": "Rust",
+        "webserver": "water_http",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "water_http"
+      },
+      "plaintext": {
+        "plaintext_url": "/plaintext",
+        "port": 8080,
+        "approach": "Realistic",
+        "classification": "Micro",
+        "framework": "water_http",
+        "language": "Rust",
+        "orm": "raw",
+        "platform": "Rust",
+        "webserver": "water_http",
+        "os": "Linux",
+        "database_os": "Linux",
+        "display_name": "water_http"
+      }
+    }
+  ]
+}

+ 305 - 0
frameworks/Rust/water-http/src/cached.rs

@@ -0,0 +1,305 @@
+#![allow(static_mut_refs)]
+use std::io;
+use std::fmt::Arguments;
+use std::io::Write;
+use std::mem::MaybeUninit;
+use std::rc::Rc;
+use std::cell::UnsafeCell;
+use std::collections::HashMap;
+use nanorand::{Rng, WyRand};
+use tokio_postgres::{connect, Client, NoTls};
+use tokio_postgres::types::private::BytesMut;
+use sonic_rs::prelude::WriteExt;
+use std::pin::Pin;
+use tokio::task::LocalSet;
+use water_http::{InitControllersRoot, RunServer, WaterController};
+use water_http::http::{HttpSender, ResponseData};
+use water_http::server::{HttpContext, ServerConfigurations};
+use water_http::http::HttpSenderTrait;
+
+pub struct DbConnectionPool {
+    pub connections: Vec<Rc<PgConnection>>,
+    pub next: UnsafeCell<usize>,
+}
+
+impl DbConnectionPool {
+    /// Get a connection from the pool (round-robin, relaxed ordering)
+    #[inline(always)]
+    pub fn get_connection(&self) -> &Rc<PgConnection> {
+        let n = unsafe{&mut *self.next.get()};
+        *n +=1;
+        let idx = *n % self.connections.len();
+        unsafe { self.connections.get_unchecked(idx) }
+    }
+
+    /// Fill the pool with connections
+    pub async fn fill_pool(&mut self, url: &'static str, size: usize) {
+        let mut tasks = Vec::with_capacity(size);
+        for _ in 0..size {
+            tasks.push(tokio::task::spawn_local(async move {
+                for attempt in 0..5 {
+                    match PgConnection::connect(url).await {
+                        Ok(conn) => {
+
+                            return Ok(conn); },
+                        Err(_) if attempt < 4 => {
+                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+                        }
+                        Err(_) => return Err(()),
+                    }
+                }
+                Err(())
+            }));
+        }
+        for t in tasks {
+            if let Ok(Ok(conn)) = t.await {
+                self.connections.push(Rc::new(conn));
+            }
+        }
+    }
+
+
+}
+
+
+
+
+pub struct PgConnection {
+    cl:Client,
+    _connection_task: tokio::task::JoinHandle<()>,
+}
+
+// Safety: Only used within LocalSet, no cross-thread access
+impl PgConnection {
+    /// Connect to the database
+
+    pub async fn connect(db_url: &str) -> Result<PgConnection, ()> {
+        let (cl, c) = tokio::time::timeout(
+            std::time::Duration::from_secs(5),
+            connect(db_url, NoTls),
+        )
+            .await
+            .map_err(|_| ())?
+            .map_err(|_| ())?;
+
+        let connection_task = tokio::task::spawn_local(async move {
+            let _ = c.await;
+        });
+
+        Ok(PgConnection {
+            _connection_task: connection_task,
+            cl
+        })
+    }
+}
+
+/// Zero-copy writer for BytesMut
+pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut);
+
+impl BytesMuteWriter<'_> {
+
+    #[inline(always)]
+    pub fn extend_from_slice(&mut self,data:&[u8]){
+        self.0.extend_from_slice(data);
+    }
+}
+
+impl Write for BytesMuteWriter<'_> {
+    #[inline(always)]
+    fn write(&mut self, src: &[u8]) -> Result<usize, io::Error> {
+        self.0.extend_from_slice(src);
+        Ok(src.len())
+    }
+
+    #[inline(always)]
+    fn flush(&mut self) -> Result<(), io::Error> {
+        Ok(())
+    }
+}
+
+impl std::fmt::Write for BytesMuteWriter<'_> {
+    #[inline(always)]
+    fn write_str(&mut self, s: &str) -> std::fmt::Result {
+        self.0.extend_from_slice(s.as_bytes());
+        Ok(())
+    }
+
+    #[inline(always)]
+    fn write_char(&mut self, c: char) -> std::fmt::Result {
+        let mut buf = [0u8; 4];
+        self.0.extend_from_slice(c.encode_utf8(&mut buf).as_bytes());
+        Ok(())
+    }
+
+    #[inline(always)]
+    fn write_fmt(&mut self, args: Arguments<'_>) -> std::fmt::Result {
+        std::fmt::write(self, args)
+    }
+}
+
+impl WriteExt for BytesMuteWriter<'_> {
+    #[inline(always)]
+    fn reserve_with(&mut self, additional: usize) -> Result<&mut [MaybeUninit<u8>], io::Error> {
+        self.0.reserve(additional);
+        unsafe {
+            let ptr = self.0.as_mut_ptr().add(self.0.len()) as *mut MaybeUninit<u8>;
+            Ok(std::slice::from_raw_parts_mut(ptr, additional))
+        }
+    }
+
+    #[inline(always)]
+    unsafe fn flush_len(&mut self, additional: usize) -> io::Result<()> {
+        self.0.set_len(self.0.len() + additional);
+        Ok(())
+    }
+}
+
+
+InitControllersRoot! {
+    name:ROOT,
+    holder_type:MainType,
+    shared_type:SH,
+}
+
+pub struct ThreadSharedStruct{
+    writing_buffer:UnsafeCell<BytesMut>,
+    rng:WyRand,
+}
+
+
+impl ThreadSharedStruct {
+
+    #[inline(always)]
+    pub fn get_value(id:i32)->&'static i32{
+        let map = unsafe {CACHED_VALUES.as_ref().unwrap().get(&id)} ;
+        map.unwrap()
+    }
+    pub fn get_cached_queries(&self,num:usize)->&[u8]{
+        let buf = unsafe{&mut *(self.writing_buffer.get())};
+        buf.clear();
+        buf.extend_from_slice(br#"["#);
+        let mut writer = BytesMuteWriter(buf);
+        let mut rn = self.rng.clone();
+        for _ in 0..num {
+            let rd: i32 = (rn.generate::<u32>() & 0x3FFF) as i32 % 10_000 + 1;
+            let v = Self::get_value(rd);
+            writer.extend_from_slice(br"{");
+            _ = write!(writer, r#""id":{},"randomnumber":{}"#, rd, v);
+            writer.extend_from_slice(br"},");
+        }
+        if buf.len() >1  {buf.truncate(buf.len() - 1);}
+        buf.extend_from_slice(b"]");
+        return &buf[..]
+    }
+}
+
+pub type MainType = u8;
+pub type SH = Rc<ThreadSharedStruct>;
+
+
+static mut CACHED_VALUES:Option<HashMap<i32,i32>> = None;
+
+pub  fn run_server(){
+
+    _= std::thread::spawn(
+        ||{
+            let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
+            rt.block_on(async move {
+                const URL:&'static str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
+                // const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower";
+
+                let  mut pool = DbConnectionPool{
+                    connections:Vec::with_capacity( 1
+                    ),
+                    next:0.into(),
+                    // rt:tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(cpu_nums).build().unwrap()
+                };
+
+                let local_set = LocalSet::new();
+
+                _= local_set.run_until(async move {
+                    tokio::task::spawn_local(async move {
+                        pool.fill_pool(URL, 1).await;
+                        let connection = pool.get_connection();
+                        let statement = connection.cl.prepare("SELECT id,randomnumber FROM World").await.unwrap();
+                        let res = connection.cl.query(&statement,&[]).await.unwrap();
+                        let mut map = HashMap::new();
+                        for row in res {
+                            map.insert(row.get(0),row.get(1));
+                        }
+                        unsafe {
+                            let static_map = &mut CACHED_VALUES;
+                            *static_map = Some(map);
+                        }
+                    }).await
+                }).await;
+
+            });
+        }
+    ).join();
+    let cpu_nums = num_cpus::get();
+
+
+    println!("start listening on port 8080 while workers count {cpu_nums}");
+    let mut conf = ServerConfigurations::bind("0.0.0.0",8080);
+    conf.worker_threads_count = cpu_nums * 1 ;
+
+    // let addresses =  (0..cpu_nums).map(|_| {
+    //      ("0.0.0.0".to_string(),8080)
+    //  }).collect::<Vec<(String,u16)>>();
+    //  conf.addresses = addresses;
+    RunServer!(
+        conf,
+        ROOT,
+        EntryController,
+        shared_factory
+    );
+}
+
+fn shared_factory()->Pin<Box<dyn futures_util::Future<Output=SH>>>{
+    Box::pin(async {
+
+        // const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower";
+
+        Rc::new(ThreadSharedStruct{
+            writing_buffer:UnsafeCell::new(BytesMut::with_capacity(100_000)),
+            rng:WyRand::new()
+        })
+    })
+}
+
+pub async fn handle_cached_queries<const HS:usize,const QS:usize>(context:&mut HttpContext<'_,MainType,SH,HS,QS>){
+    let q = context
+        .get_from_path_query("q")
+        .and_then(|v| v.parse::<usize>().ok()) // safely parse
+        .unwrap_or(1)                          // default to 1 if missing or invalid
+        .clamp(1, 500);
+
+    let   connection:SH = context.thread_shared_struct.clone().unwrap().clone();
+    let data = connection.get_cached_queries(q);
+    let mut sender:HttpSender<HS,QS> = context.sender();
+    sender.set_header_ef("Content-Type","application/json");
+    sender.set_header_ef("Server","water");
+    let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+    sender.set_header_ef("Date",date);
+    _= sender.send_data_as_final_response(
+        ResponseData::Slice(data)
+    ).await;
+}
+
+WaterController! {
+    holder -> super::MainType,
+    shared -> super::SH,
+    name -> EntryController,
+    functions -> {
+
+        GET -> "cached-queries" -> query (context) {
+            _=super::handle_cached_queries(context).await;
+        }
+    }
+}
+
+fn main() {
+    run_server();
+}
+

+ 320 - 0
frameworks/Rust/water-http/src/db.rs

@@ -0,0 +1,320 @@
+#![cfg(any(feature = "db",feature = "all"))]
+use std::{borrow::Cow, io};
+use std::fmt::Arguments;
+use std::io::Write;
+use std::mem::MaybeUninit;
+use std::rc::Rc;
+use std::cell::UnsafeCell;
+use bytes::Buf;
+use nanorand::{Rng, WyRand};
+use tokio_postgres::{connect, Client, Statement, NoTls};
+use tokio_postgres::types::private::BytesMut;
+use crate::models::{Fortune, FortuneTemplate, World};
+use sonic_rs::prelude::WriteExt;
+use yarte::TemplateBytesTrait;
+
+/// Database connection pool with thread-local RNG
+pub struct DbConnectionPool {
+    pub connections: Vec<Rc<PgConnection>>,
+    pub next: UnsafeCell<usize>,
+}
+
+impl DbConnectionPool {
+    /// Get a connection from the pool (round-robin, relaxed ordering)
+    #[inline(always)]
+    pub fn get_connection(&self) -> &Rc<PgConnection> {
+        let n = unsafe{&mut *self.next.get()};
+        *n +=1;
+        let idx = *n % self.connections.len();
+        unsafe { self.connections.get_unchecked(idx) }
+    }
+
+    /// Fill the pool with connections
+    pub async fn fill_pool(&mut self, url: &'static str, size: usize) {
+        let mut tasks = Vec::with_capacity(size);
+        for _ in 0..size {
+            tasks.push(tokio::task::spawn_local(async move {
+                for attempt in 0..5 {
+                    match PgConnection::connect(url).await {
+                        Ok(conn) => {
+
+                            return Ok(conn); },
+                        Err(_) if attempt < 4 => {
+                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+                        }
+                        Err(_) => return Err(()),
+                    }
+                }
+                Err(())
+            }));
+        }
+        for t in tasks {
+            if let Ok(Ok(conn)) = t.await {
+                self.connections.push(Rc::new(conn));
+            }
+        }
+    }
+}
+
+
+/// Reusable buffer pool per connection
+struct BufferPool {
+    body: BytesMut,
+    worlds: Vec<World>,
+    numbers: Vec<i32>,
+    fortunes: Vec<Fortune>,
+    fortune_output: Vec<u8>,
+}
+
+impl BufferPool {
+    fn new() -> Self {
+        Self {
+            body: BytesMut::with_capacity(4096),
+            worlds: Vec::with_capacity(501),
+            numbers: Vec::with_capacity(501),
+
+            fortunes: Vec::with_capacity(501),
+            fortune_output: Vec::with_capacity(4096),
+        }
+    }
+
+}
+
+/// PostgreSQL connection wrapper with pre-allocated buffers
+pub struct PgConnection {
+    pub cl: Client,
+    pub fortune: Statement,
+    pub world: Statement,
+    pub updates: Vec<Statement>,
+    rang:WyRand,
+    buffers: UnsafeCell<BufferPool>,
+    _connection_task: tokio::task::JoinHandle<()>,
+}
+
+// Safety: Only used within LocalSet, no cross-thread access
+impl PgConnection {
+    /// Connect to the database
+
+    pub async fn connect(db_url: &str) -> Result<PgConnection, ()> {
+        let (cl, conn) = tokio::time::timeout(
+            std::time::Duration::from_secs(5),
+            connect(db_url, NoTls),
+        )
+            .await
+            .map_err(|_| ())?
+            .map_err(|_| ())?;
+
+        let connection_task = tokio::task::spawn_local(async move {
+            let _ = conn.await;
+        });
+
+        let fortune = cl.prepare("SELECT * FROM fortune").await.map_err(|_| ())?;
+        let world = cl.prepare("SELECT id,randomnumber FROM world WHERE id=$1 LIMIT 1").await.map_err(|_| ())?;
+
+        // Pre-compile update statements for batch sizes 1-500
+        let mut updates = vec![];
+        for num  in 1..=500 {
+            let sql = Self::generate_update_values_stmt(num);
+            updates.push(cl.prepare(&sql).await.unwrap());
+        }
+
+        Ok(PgConnection {
+            cl,
+            fortune,
+            world,
+            updates,
+            buffers: UnsafeCell::new(BufferPool::new()),
+            _connection_task: connection_task,
+            rang: WyRand::new()
+        })
+    }    /// Connect to the database
+
+    #[inline(always)]
+  pub fn generate_update_values_stmt(batch_size: usize) -> String {
+
+        let mut sql = String::from("UPDATE world SET randomNumber = w.r FROM (VALUES ");
+
+        for i in 0..batch_size {
+            let id_param = i * 2 + 1;
+            let val_param = id_param + 1;
+            sql.push_str(&format!("(${}::int, ${}::int),", id_param, val_param));
+        }
+
+        // Remove the trailing comma
+        sql.pop();
+
+        sql.push_str(") AS w(i, r) WHERE world.id = w.i");
+        sql
+    }
+
+    /// Get mutable access to buffers (safe because connection pool ensures single access)
+    #[inline(always)]
+    fn buffers(&self) -> &mut BufferPool {
+        unsafe { &mut *self.buffers.get() }
+    }
+
+    /// Get a single random world - optimized with buffer reuse
+    #[inline]
+    pub async fn get_world(&self) -> &[u8] {
+        let rd = (self.rang.clone().generate::<u32>() % 10_000 + 1) as i32;
+        let row = self.cl.query_one(&self.world, &[&rd]).await.unwrap();
+
+        let buffers = self.buffers();
+        buffers.body.clear();
+
+        sonic_rs::to_writer(
+            BytesMuteWriter(&mut buffers.body),
+            &World {
+                id: row.get(0),
+                randomnumber: row.get(1),
+            },
+        ).unwrap();
+
+        buffers.body.chunk()
+    }
+
+    /// Get multiple random worlds - optimized with buffer reuse
+    pub async fn get_worlds(&self, num: usize) -> &[u8] {
+        let buffers = self.buffers();
+        buffers.worlds.clear();
+        let mut rn = self.rang.clone();
+        for _ in 0..num {
+            let id: i32 = (rn.generate::<u32>() & 0x3FFF) as i32 % 10_000 + 1;
+            let row = self.cl.query_one(&self.world, &[&id]).await.unwrap();
+            buffers.worlds.push(World {
+                id: row.get(0),
+                randomnumber: row.get(1),
+            });
+        }
+        buffers.body.clear();
+        sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap();
+        buffers.body.chunk()
+    }
+    /// Update worlds in batch - optimized with buffer reuse
+    /// Update worlds in batch - optimized with buffer reuse
+
+    /// Update worlds in batch - optimized with buffer reuse
+
+    /// Update worlds in batch - optimized with buffer reuse
+
+    /// Update worlds in batch - optimized with RETURNING clause to minimize reads
+    /// Update worlds - fetch and update each row to handle duplicates correctly
+    /// Update worlds in batch using CASE statement
+    pub async fn update(&self, num: usize) -> &[u8] {
+
+        let buffers = self.buffers();
+        let mut ids:Vec<i32> = Vec::with_capacity(num);
+        let mut rng = self.rang.clone();
+        let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
+            Vec::with_capacity(num * 2);
+        let mut futures =vec![];
+        for _ in 0..num {
+            let w_id = (rng.generate::<u32>() % 10_000 + 1) as i32;
+            ids.push(w_id);
+        }
+        futures.extend(ids.iter().map(|x| async move {self.cl.query_one(&self.world,&[&x]).await}));
+        futures_util::future::join_all(futures).await;
+        ids.sort_unstable();
+        buffers.worlds.clear();
+        for index in 0..num {
+            let s_id = (rng.generate::<u32>() % 10_000 + 1 ) as i32;
+            buffers.worlds.push(World{
+                id:ids[index],
+                randomnumber:s_id
+            });
+            buffers.numbers.push(s_id);
+        }
+        buffers.body.clear();
+        for index in 0..num {
+            params.push(&ids[index]);
+            params.push(&buffers.numbers[index]);
+        }
+
+        _=self.cl.execute(&self.updates[num - 1], &params).await.unwrap();
+        sonic_rs::to_writer(BytesMuteWriter(&mut buffers.body), &buffers.worlds).unwrap();
+        buffers.body.chunk()
+    }
+
+
+    /// Tell fortunes - optimized with buffer reuse
+    pub async fn tell_fortune(&self) -> Result<&[u8], ()> {
+        let res = self.cl.query(&self.fortune, &[]).await.map_err(|_| ())?;
+
+        let buffers = self.buffers();
+        buffers.fortunes.clear();
+        buffers.fortune_output.clear();
+
+        buffers.fortunes.push(Fortune {
+            id: 0,
+            message: Cow::Borrowed("Additional fortune added at request time."),
+        });
+
+        for row in res {
+            buffers.fortunes.push(Fortune {
+                id: row.get(0),
+                message: Cow::Owned(row.get(1)),
+            });
+        }
+
+        buffers.fortunes.sort_unstable_by(|a, b| a.message.cmp(&b.message));
+
+        let template = FortuneTemplate { items: &buffers.fortunes };
+        template.write_call(&mut buffers.fortune_output);
+
+        // Return reference to buffer - zero-copy!
+        Ok(&buffers.fortune_output)
+    }
+}
+
+/// Zero-copy writer for BytesMut
+pub struct BytesMuteWriter<'a>(pub &'a mut BytesMut);
+
+impl Write for BytesMuteWriter<'_> {
+    #[inline(always)]
+    fn write(&mut self, src: &[u8]) -> Result<usize, io::Error> {
+        self.0.extend_from_slice(src);
+        Ok(src.len())
+    }
+
+    #[inline(always)]
+    fn flush(&mut self) -> Result<(), io::Error> {
+        Ok(())
+    }
+}
+
+impl std::fmt::Write for BytesMuteWriter<'_> {
+    #[inline(always)]
+    fn write_str(&mut self, s: &str) -> std::fmt::Result {
+        self.0.extend_from_slice(s.as_bytes());
+        Ok(())
+    }
+
+    #[inline(always)]
+    fn write_char(&mut self, c: char) -> std::fmt::Result {
+        let mut buf = [0u8; 4];
+        self.0.extend_from_slice(c.encode_utf8(&mut buf).as_bytes());
+        Ok(())
+    }
+
+    #[inline(always)]
+    fn write_fmt(&mut self, args: Arguments<'_>) -> std::fmt::Result {
+        std::fmt::write(self, args)
+    }
+}
+
+impl WriteExt for BytesMuteWriter<'_> {
+    #[inline(always)]
+    fn reserve_with(&mut self, additional: usize) -> Result<&mut [MaybeUninit<u8>], io::Error> {
+        self.0.reserve(additional);
+        unsafe {
+            let ptr = self.0.as_mut_ptr().add(self.0.len()) as *mut MaybeUninit<u8>;
+            Ok(std::slice::from_raw_parts_mut(ptr, additional))
+        }
+    }
+
+    #[inline(always)]
+    unsafe fn flush_len(&mut self, additional: usize) -> io::Result<()> {
+        self.0.set_len(self.0.len() + additional);
+        Ok(())
+    }
+}

+ 55 - 0
frameworks/Rust/water-http/src/json.rs

@@ -0,0 +1,55 @@
+use water_http::{InitControllersRoot, RunServer, WaterController};
+use water_http::server::ServerConfigurations;
+
+InitControllersRoot! {
+    name:ROOT,
+    holder_type:MainType,
+}
+
+
+
+pub type MainType = u8;
+
+
+fn main() {
+    run_server();
+}
+
+
+pub    fn run_server(){
+
+    let cpu_nums = num_cpus::get();
+
+
+    println!("start listening on port 8080 while workers count {cpu_nums}");
+    let mut conf = ServerConfigurations::bind("0.0.0.0",8080);
+    conf.worker_threads_count = cpu_nums * 2 ;
+
+    RunServer!(
+        conf,
+        ROOT,
+        EntryController
+    );
+}
+
+const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#;
+
+
+WaterController! {
+    holder -> super::MainType,
+    name -> EntryController,
+    functions -> {
+
+
+         GET => json => j(cx){
+            let mut sender = cx.sender();
+            sender.set_header_ef("Content-Type","application/json");
+            sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+            _=sender.send_data_as_final_response(http::ResponseData::Slice(super::JSON_RESPONSE)).await;
+        }
+
+    }
+}
+

+ 7 - 0
frameworks/Rust/water-http/src/main.rs

@@ -0,0 +1,7 @@
+mod server;
+pub mod models;
+mod db;
+
+fn main() {
+      server::run_server();
+}

+ 25 - 0
frameworks/Rust/water-http/src/models.rs

@@ -0,0 +1,25 @@
+use std::borrow::Cow;
+use sonic_rs::{Serialize};
+// use askama::Template;
+#[derive(Serialize)]
+pub struct World {
+    pub id: i32,
+    pub randomnumber: i32,
+}
+#[derive(Serialize,Debug)]
+pub struct Fortune {
+    pub id: i32,
+    pub message: Cow<'static, str>,
+}
+
+#[derive(yarte::TemplateBytes)]
+#[template(path = "fortune.hbs")]
+pub struct FortuneTemplate<'a>{
+    pub items:&'a Vec<Fortune>
+}
+
+
+
+// pub async fn to(model:FortuneTemplate<'_>){
+//     model.r
+// }

+ 54 - 0
frameworks/Rust/water-http/src/plaintext.rs

@@ -0,0 +1,54 @@
+use water_http::{InitControllersRoot, RunServer, WaterController};
+use water_http::server::ServerConfigurations;
+
+InitControllersRoot! {
+    name:ROOT,
+    holder_type:MainType,
+}
+
+
+
+pub type MainType = u8;
+
+
+fn main() {
+    run_server();
+}
+
+
+pub    fn run_server(){
+
+    let cpu_nums = num_cpus::get();
+
+
+    println!("start listening on port 8080 while workers count {cpu_nums}");
+    let mut conf = ServerConfigurations::bind("0.0.0.0",8080);
+    conf.worker_threads_count = cpu_nums * 1 ;
+
+    RunServer!(
+        conf,
+        ROOT,
+        EntryController
+    );
+}
+
+const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#;
+const P:&'static [u8] = br#"Hello, World!"#;
+
+
+WaterController! {
+    holder -> super::MainType,
+    name -> EntryController,
+    functions -> {
+
+         GET => plaintext => p(cx) {
+            let mut sender = cx.sender();
+            sender.set_header_ef("Content-Type","text/plain; charset=utf-8");
+            sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+           _=sender.send_data_as_final_response(http::ResponseData::Str("Hello, World!")).await;
+        }
+    }
+}
+

+ 290 - 0
frameworks/Rust/water-http/src/server.rs

@@ -0,0 +1,290 @@
+
+use std::pin::Pin;
+use std::rc::Rc;
+use water_http::{InitControllersRoot, RunServer, WaterController};
+use water_http::server::ServerConfigurations;
+use crate::db::{DbConnectionPool};
+InitControllersRoot! {
+    name:ROOT,
+    holder_type:MainType,
+    shared_type:SharedType,
+}
+
+pub struct ThreadSharedStruct{
+    pg_connection: DbConnectionPool
+}
+
+pub type MainType = u8;
+pub type SharedType = Rc<ThreadSharedStruct>;
+
+
+
+
+pub    fn run_server(){
+
+    let cpu_nums = num_cpus::get();
+
+
+    println!("start listening on port 8080 while workers count {cpu_nums}");
+    let mut conf = ServerConfigurations::bind("0.0.0.0",8080);
+    #[cfg(feature = "json_plaintext")]
+    {
+        conf.worker_threads_count = cpu_nums * 2 ;
+    }
+    #[cfg(not(feature = "json_plaintext"))]
+    {
+        conf.worker_threads_count = cpu_nums * 1 ;
+    }
+
+   // let addresses =  (0..cpu_nums).map(|_| {
+   //      ("0.0.0.0".to_string(),8080)
+   //  }).collect::<Vec<(String,u16)>>();
+   //  conf.addresses = addresses;
+    RunServer!(
+        conf,
+        ROOT,
+        EntryController,
+       shared_factory
+    );
+}
+
+fn shared_factory()->Pin<Box<dyn futures_util::Future<Output=SharedType>>>{
+    Box::pin(async {
+
+        // const URL:&'static str = "postgres://postgres:root@localhost:5432/techmpower";
+        const URL:&'static str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
+        let  mut pool = DbConnectionPool{
+            connections:Vec::with_capacity( 1
+            ),
+            next:0.into(),
+            // rt:tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(cpu_nums).build().unwrap()
+        };
+        pool.fill_pool(URL, 1).await;
+        Rc::new(ThreadSharedStruct{
+            pg_connection:pool
+        })
+    })
+}
+
+#[cfg(any(feature = "json_plaintext",feature = "all"))]
+const JSON_RESPONSE:&'static [u8] = br#"{"message":"Hello, World!"}"#;
+#[cfg(any(feature = "json_plaintext",feature = "all"))]
+const P:&'static [u8] = br#"Hello, World!"#;
+
+#[cfg(feature = "all")]
+WaterController! {
+    holder -> super::MainType,
+    shared -> super::SharedType,
+    name -> EntryController,
+    functions -> {
+
+
+         GET => json => j(cx){
+            let mut sender = cx.sender();
+            sender.set_header_ef("Content-Type","application/json");
+            sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+            _=sender.send_data_as_final_response(http::ResponseData::Slice(super::JSON_RESPONSE)).await;
+        }
+
+         GET => plaintext => p(cx){
+            let mut sender = cx.sender();
+            sender.set_header_ef("Content-Type","text/plain; charset=utf-8");
+            sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+           _=sender.send_data_as_final_response(http::ResponseData::Slice(super::P)).await;
+        }
+
+
+        GET -> db -> db (context){
+            let   connection:Shared = context.thread_shared_struct.clone().unwrap().clone();
+            let connection = connection.pg_connection.get_connection();
+            let data =  connection.get_world().await;
+            let mut sender = context.sender();
+            sender.set_header_ef("Content-Type","application/json");
+            sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+            _= sender.send_data_as_final_response(
+                http::ResponseData::Slice(data)
+            ).await;
+        }
+          GET -> queries -> query (context){
+         let q = context
+             .get_from_path_query("q")
+             .and_then(|v| v.parse::<usize>().ok()) // safely parse
+             .unwrap_or(1)                          // default to 1 if missing or invalid
+             .clamp(1, 500);
+
+            let   connection:Shared = context.thread_shared_struct.clone().unwrap().clone();
+            let connection = connection.pg_connection.get_connection();
+            let data =  connection.get_worlds(q).await;
+            let mut sender = context.sender();
+            sender.set_header_ef("Content-Type","application/json");
+            sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+            _= sender.send_data_as_final_response(
+                http::ResponseData::Slice(data)
+            ).await;
+        }
+
+           GET -> updates -> update (context){
+            let q = context
+             .get_from_path_query("q")
+             .and_then(|v| v.parse::<usize>().ok()) // safely parse
+             .unwrap_or(1)                          // default to 1 if missing or invalid
+             .clamp(1, 500);
+            let   connection:Shared = context.thread_shared_struct.clone().unwrap().clone();
+            let connection = connection.pg_connection.get_connection();
+            let data =  connection.update(q).await;
+            let mut sender = context.sender();
+            sender.set_header_ef("Content-Type","application/json");
+            sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+            _= sender.send_data_as_final_response(
+                http::ResponseData::Slice(data)
+            ).await;
+        }
+
+
+        GET -> fortunes -> ft (context){
+
+            let   connection:Shared = context.thread_shared_struct.clone().unwrap().clone();
+            let connection = connection.pg_connection.get_connection();
+            let data = match connection.tell_fortune().await {
+                Ok(r)=>{r},
+                _=>{
+                    _= context.send_str("failed to connect").await;
+                    return
+                }
+            };
+            let mut sender = context.sender();
+            sender.set_header_ef("Content-Type","text/html; charset=UTF-8");
+              sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+            _= sender.send_data_as_final_response(
+                http::ResponseData::Slice(&data)
+            ).await;
+        }
+
+    }
+}
+
+
+
+#[cfg(all(not(feature = "all"),feature = "json_plaintext"))]
+WaterController! {
+    holder -> super::MainType,
+    shared -> super::SharedType,
+    name -> EntryController,
+    functions -> {
+
+
+         GET => json => j(cx){
+            let mut sender = cx.sender();
+            sender.set_header_ef("Content-Type","application/json");
+            sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+            _=sender.send_data_as_final_response(http::ResponseData::Slice(super::JSON_RESPONSE)).await;
+        }
+
+         GET => plaintext => p(cx) {
+            let mut sender = cx.sender();
+            sender.set_header_ef("Content-Type","text/plain; charset=utf-8");
+            sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+           _=sender.send_data_as_final_response(http::ResponseData::Slice(super::P)).await;
+        }
+    }
+}
+
+#[cfg(all(not(feature = "all"),feature = "db"))]
+WaterController! {
+    holder -> super::MainType,
+    shared -> super::SharedType,
+    name -> EntryController,
+    functions -> {
+
+
+        GET -> db -> db (context){
+            let   connection:Shared = context.thread_shared_struct.clone().unwrap().clone();
+            let connection = connection.pg_connection.get_connection();
+            let data =  connection.get_world().await;
+            let mut sender = context.sender();
+            sender.set_header_ef("Content-Type","application/json");
+            sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+            _= sender.send_data_as_final_response(
+                http::ResponseData::Slice(data)
+            ).await;
+        }
+          GET -> queries -> query (context){
+         let q = context
+             .get_from_path_query("q")
+             .and_then(|v| v.parse::<usize>().ok()) // safely parse
+             .unwrap_or(1)                          // default to 1 if missing or invalid
+             .clamp(1, 500);
+
+            let   connection:Shared = context.thread_shared_struct.clone().unwrap().clone();
+            let connection = connection.pg_connection.get_connection();
+            let data =  connection.get_worlds(q).await;
+            let mut sender = context.sender();
+            sender.set_header_ef("Content-Type","application/json");
+            sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+            _= sender.send_data_as_final_response(
+                http::ResponseData::Slice(data)
+            ).await;
+        }
+
+           GET -> updates -> update (context){
+            let q = context
+             .get_from_path_query("q")
+             .and_then(|v| v.parse::<usize>().ok()) // safely parse
+             .unwrap_or(1)                          // default to 1 if missing or invalid
+             .clamp(1, 500);
+            let   connection:Shared = context.thread_shared_struct.clone().unwrap().clone();
+            let connection = connection.pg_connection.get_connection();
+            let data =  connection.update(q).await;
+            let mut sender = context.sender();
+            sender.set_header_ef("Content-Type","application/json");
+            sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+            _= sender.send_data_as_final_response(
+                http::ResponseData::Slice(data)
+            ).await;
+        }
+
+
+        GET -> fortunes -> ft (context){
+
+            let   connection:Shared = context.thread_shared_struct.clone().unwrap().clone();
+            let connection = connection.pg_connection.get_connection();
+            let data = match connection.tell_fortune().await {
+                Ok(r)=>{r},
+                _=>{
+                    _= context.send_str("failed to connect").await;
+                    return
+                }
+            };
+            let mut sender = context.sender();
+            sender.set_header_ef("Content-Type","text/html; charset=UTF-8");
+              sender.set_header_ef("Server","water");
+            let date = httpdate::fmt_http_date(std::time::SystemTime::now());
+            sender.set_header_ef("Date",date);
+            _= sender.send_data_as_final_response(
+                http::ResponseData::Slice(&data)
+            ).await;
+        }
+    }
+}

+ 5 - 0
frameworks/Rust/water-http/templates/fortune.hbs

@@ -0,0 +1,5 @@
+<!DOCTYPE html><html><head><title>Fortunes</title></head><body><table><tr><th>id</th><th>message</th></tr>
+{{~# each items ~}}
+<tr><td>{{id}}</td><td>{{message}}</td></tr>
+{{~/each ~}}
+</table></body></html>

+ 12 - 0
frameworks/Rust/water-http/templates/fortune.html

@@ -0,0 +1,12 @@
+<!DOCTYPE html>
+<html>
+<head><title>Fortunes</title></head>
+<body>
+<table>
+    <tr><th>id</th><th>message</th></tr>
+    {% for item in items %}
+    <tr><td>{{item.id}}</td><td>{{item.message}}</td></tr>
+    {% endfor %}
+</table>
+</body>
+</html>

+ 13 - 0
frameworks/Rust/water-http/water-http-cached.dockerfile

@@ -0,0 +1,13 @@
+FROM rust:latest
+
+RUN apt-get update -yqq && apt-get install -yqq cmake g++
+
+ADD ./ /water
+WORKDIR /water
+
+RUN cargo clean
+RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin cache --features cache
+
+EXPOSE 8080
+
+CMD ./target/release/cache

+ 13 - 0
frameworks/Rust/water-http/water-http-db.dockerfile

@@ -0,0 +1,13 @@
+FROM rust:latest
+
+RUN apt-get update -yqq && apt-get install -yqq cmake g++
+
+ADD ./ /water
+WORKDIR /water
+
+RUN cargo clean
+RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin water-http --features db
+
+EXPOSE 8080
+
+CMD ./target/release/water-http

+ 13 - 0
frameworks/Rust/water-http/water-http-json.dockerfile

@@ -0,0 +1,13 @@
+FROM rust:latest
+
+RUN apt-get update -yqq && apt-get install -yqq cmake g++
+
+ADD ./ /water
+WORKDIR /water
+
+RUN cargo clean
+RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin json --features json_plaintext
+
+EXPOSE 8080
+
+CMD ./target/release/json

+ 13 - 0
frameworks/Rust/water-http/water-http-plaintext.dockerfile

@@ -0,0 +1,13 @@
+FROM rust:latest
+
+RUN apt-get update -yqq && apt-get install -yqq cmake g++
+
+ADD ./ /water
+WORKDIR /water
+
+RUN cargo clean
+RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin plaintext --features json_plaintext
+
+EXPOSE 8080
+
+CMD ./target/release/plaintext

+ 13 - 0
frameworks/Rust/water-http/water-http.dockerfile

@@ -0,0 +1,13 @@
+FROM rust:latest
+
+RUN apt-get update -yqq && apt-get install -yqq cmake g++
+
+ADD ./ /water
+WORKDIR /water
+
+RUN cargo clean
+RUN RUSTFLAGS="-C target-cpu=native" cargo build --release --bin water-http --features all
+
+EXPOSE 8080
+
+CMD ./target/release/water-http