Browse Source

ntex: upgrade to 0.2 and cleanups (#6398)

Nikolay Kim 4 years ago
parent
commit
75e7efcbbf

+ 1 - 1
frameworks/Rust/ntex/Cargo.toml

@@ -16,7 +16,7 @@ name = "ntex-raw"
 path = "src/main_raw.rs"
 path = "src/main_raw.rs"
 
 
 [dependencies]
 [dependencies]
-ntex = "0.2.0-b.12"
+ntex = "0.2.1"
 snmalloc-rs = { version = "0.2.24", features = ["1mib", "native-cpu"] }
 snmalloc-rs = { version = "0.2.24", features = ["1mib", "native-cpu"] }
 yarte = { version = "0.14", features = ["bytes-buf", "json"] }
 yarte = { version = "0.14", features = ["bytes-buf", "json"] }
 env_logger = "0.8"
 env_logger = "0.8"

+ 1 - 1
frameworks/Rust/ntex/rustfmt.toml

@@ -1,2 +1,2 @@
-max_width = 89
+max_width = 98
 reorder_imports = true
 reorder_imports = true

+ 29 - 52
frameworks/Rust/ntex/src/db.rs

@@ -1,9 +1,7 @@
-use std::{borrow::Cow, cell::RefCell, fmt::Write as FmtWrite, io};
+use std::{borrow::Cow, cell::RefCell, fmt::Write as FmtWrite};
 
 
 use bytes::{Bytes, BytesMut};
 use bytes::{Bytes, BytesMut};
-use futures::stream::futures_unordered::FuturesUnordered;
-use futures::{Future, FutureExt, StreamExt, TryStreamExt};
-use ntex::web::Error;
+use futures::{stream::futures_unordered::FuturesUnordered, Future, FutureExt, StreamExt};
 use random_fast_rng::{FastRng, Random};
 use random_fast_rng::{FastRng, Random};
 use smallvec::SmallVec;
 use smallvec::SmallVec;
 use tokio_postgres::types::ToSql;
 use tokio_postgres::types::ToSql;
@@ -70,27 +68,19 @@ impl PgConnection {
 }
 }
 
 
 impl PgConnection {
 impl PgConnection {
-    pub fn get_world(&self) -> impl Future<Output = Result<Bytes, Error>> {
+    pub fn get_world(&self) -> impl Future<Output = Bytes> {
         let random_id = (self.rng.borrow_mut().get_u32() % 10_000 + 1) as i32;
         let random_id = (self.rng.borrow_mut().get_u32() % 10_000 + 1) as i32;
-        let fut = self.cl.query_one(&self.world, &[&random_id]);
-
-        async move {
-            let row = fut.await.map_err(|e| {
-                Error::from(io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
-            })?;
-
-            Ok(World {
+        self.cl.query_one(&self.world, &[&random_id]).map(|row| {
+            let row = row.unwrap();
+            World {
                 id: row.get(0),
                 id: row.get(0),
                 randomnumber: row.get(1),
                 randomnumber: row.get(1),
             }
             }
-            .to_bytes::<BytesMut>(40))
-        }
+            .to_bytes::<BytesMut>(40)
+        })
     }
     }
 
 
-    pub fn get_worlds(
-        &self,
-        num: usize,
-    ) -> impl Future<Output = Result<Vec<World>, io::Error>> {
+    pub fn get_worlds(&self, num: u16) -> impl Future<Output = Vec<World>> {
         let worlds = FuturesUnordered::new();
         let worlds = FuturesUnordered::new();
         let mut rng = self.rng.borrow_mut();
         let mut rng = self.rng.borrow_mut();
         for _ in 0..num {
         for _ in 0..num {
@@ -99,49 +89,41 @@ impl PgConnection {
                 self.cl
                 self.cl
                     .query_one(&self.world, &[&w_id])
                     .query_one(&self.world, &[&w_id])
                     .map(|res| match res {
                     .map(|res| match res {
-                        Err(e) => {
-                            Err(io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
-                        }
-                        Ok(row) => Ok(World {
+                        Err(_) => panic!(),
+                        Ok(row) => World {
                             id: row.get(0),
                             id: row.get(0),
                             randomnumber: row.get(1),
                             randomnumber: row.get(1),
-                        }),
+                        },
                     }),
                     }),
             );
             );
         }
         }
 
 
-        worlds.try_collect()
+        worlds.collect()
     }
     }
 
 
-    pub fn update(
-        &self,
-        num: u16,
-    ) -> impl Future<Output = Result<Vec<World>, io::Error>> {
+    pub fn update(&self, num: u16) -> impl Future<Output = Vec<World>> {
         let worlds = FuturesUnordered::new();
         let worlds = FuturesUnordered::new();
         let mut rng = self.rng.borrow_mut();
         let mut rng = self.rng.borrow_mut();
         for _ in 0..num {
         for _ in 0..num {
             let id = (rng.get_u32() % 10_000 + 1) as i32;
             let id = (rng.get_u32() % 10_000 + 1) as i32;
             let w_id = (rng.get_u32() % 10_000 + 1) as i32;
             let w_id = (rng.get_u32() % 10_000 + 1) as i32;
-            worlds.push(self.cl.query_one(&self.world, &[&w_id]).map(
-                move |res| match res {
-                    Err(e) => {
-                        Err(io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))
-                    }
-                    Ok(row) => {
-                        let world = World {
+            worlds.push(
+                self.cl
+                    .query_one(&self.world, &[&w_id])
+                    .map(move |res| match res {
+                        Err(_) => panic!(),
+                        Ok(row) => World {
                             id: row.get(0),
                             id: row.get(0),
                             randomnumber: id,
                             randomnumber: id,
-                        };
-                        Ok(world)
-                    }
-                },
-            ));
+                        },
+                    }),
+            );
         }
         }
 
 
         let cl = self.cl.clone();
         let cl = self.cl.clone();
         let st = self.updates[(num as usize) - 1].clone();
         let st = self.updates[(num as usize) - 1].clone();
         async move {
         async move {
-            let worlds: Vec<World> = worlds.try_collect().await?;
+            let worlds: Vec<World> = worlds.collect().await;
 
 
             let mut params: Vec<&dyn ToSql> = Vec::with_capacity(num as usize * 3);
             let mut params: Vec<&dyn ToSql> = Vec::with_capacity(num as usize * 3);
             for w in &worlds {
             for w in &worlds {
@@ -156,27 +138,22 @@ impl PgConnection {
                 .await
                 .await
                 .map_err(|e| log::error!("{:?}", e));
                 .map_err(|e| log::error!("{:?}", e));
 
 
-            Ok(worlds)
+            worlds
         }
         }
     }
     }
 
 
-    pub fn tell_fortune(&self) -> impl Future<Output = Result<Bytes, io::Error>> {
+    pub fn tell_fortune(&self) -> impl Future<Output = Bytes> {
         let fut = self.cl.query_raw(&self.fortune, &[]);
         let fut = self.cl.query_raw(&self.fortune, &[]);
 
 
         async move {
         async move {
-            let mut stream = fut
-                .await
-                .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?;
-
+            let mut stream = fut.await.unwrap();
             let mut fortunes: SmallVec<[_; 32]> = smallvec::smallvec![Fortune {
             let mut fortunes: SmallVec<[_; 32]> = smallvec::smallvec![Fortune {
                 id: 0,
                 id: 0,
                 message: Cow::Borrowed("Additional fortune added at request time."),
                 message: Cow::Borrowed("Additional fortune added at request time."),
             }];
             }];
 
 
             while let Some(row) = stream.next().await {
             while let Some(row) = stream.next().await {
-                let row = row.map_err(|e| {
-                    io::Error::new(io::ErrorKind::Other, format!("{:?}", e))
-                })?;
+                let row = row.unwrap();
                 fortunes.push(Fortune {
                 fortunes.push(Fortune {
                     id: row.get(0),
                     id: row.get(0),
                     message: Cow::Owned(row.get(1)),
                     message: Cow::Owned(row.get(1)),
@@ -188,7 +165,7 @@ impl PgConnection {
             let mut buf = Vec::with_capacity(2048);
             let mut buf = Vec::with_capacity(2048);
             ywrite_html!(buf, "{{> fortune }}");
             ywrite_html!(buf, "{{> fortune }}");
 
 
-            Ok(Bytes::from(buf))
+            Bytes::from(buf)
         }
         }
     }
     }
 }
 }

+ 16 - 34
frameworks/Rust/ntex/src/main.rs

@@ -2,53 +2,37 @@
 static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 
 
 use bytes::Bytes;
 use bytes::Bytes;
+use ntex::http::header::{HeaderValue, CONTENT_TYPE, SERVER};
 use ntex::{http, web};
 use ntex::{http, web};
 use yarte::Serialize;
 use yarte::Serialize;
 
 
 mod utils;
 mod utils;
-use utils::SIZE;
 
 
 #[derive(Serialize)]
 #[derive(Serialize)]
 pub struct Message {
 pub struct Message {
     pub message: &'static str,
     pub message: &'static str,
 }
 }
 
 
+#[web::get("/json")]
 async fn json() -> web::HttpResponse {
 async fn json() -> web::HttpResponse {
-    let mut body = Vec::with_capacity(SIZE);
+    let mut body = Vec::with_capacity(utils::SIZE);
     Message {
     Message {
         message: "Hello, World!",
         message: "Hello, World!",
     }
     }
     .to_bytes_mut(&mut body);
     .to_bytes_mut(&mut body);
 
 
-    let mut res = web::HttpResponse::with_body(
-        http::StatusCode::OK,
-        http::body::Body::Bytes(Bytes::from(body)),
-    );
-    res.headers_mut().insert(
-        http::header::SERVER,
-        http::header::HeaderValue::from_static("N"),
-    );
-    res.headers_mut().insert(
-        http::header::CONTENT_TYPE,
-        http::header::HeaderValue::from_static("application/json"),
-    );
-    res
+    web::HttpResponse::Ok()
+        .header(SERVER, HeaderValue::from_static("N"))
+        .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
+        .body(body)
 }
 }
 
 
+#[web::get("/plaintext")]
 async fn plaintext() -> web::HttpResponse {
 async fn plaintext() -> web::HttpResponse {
-    let mut res = web::HttpResponse::with_body(
-        http::StatusCode::OK,
-        http::body::Body::Bytes(Bytes::from_static(b"Hello, World!")),
-    );
-    res.headers_mut().insert(
-        http::header::SERVER,
-        http::header::HeaderValue::from_static("N"),
-    );
-    res.headers_mut().insert(
-        http::header::CONTENT_TYPE,
-        http::header::HeaderValue::from_static("text/plain"),
-    );
-    res
+    web::HttpResponse::Ok()
+        .header(SERVER, HeaderValue::from_static("N"))
+        .header(CONTENT_TYPE, HeaderValue::from_static("text/plain"))
+        .body(Bytes::from_static(b"Hello, World!"))
 }
 }
 
 
 #[ntex::main]
 #[ntex::main]
@@ -63,12 +47,10 @@ async fn main() -> std::io::Result<()> {
                 .keep_alive(http::KeepAlive::Os)
                 .keep_alive(http::KeepAlive::Os)
                 .client_timeout(0)
                 .client_timeout(0)
                 .disconnect_timeout(0)
                 .disconnect_timeout(0)
-                .h1(ntex::map_config(
-                    web::App::new()
-                        .service(web::resource("/json").to(json))
-                        .service(web::resource("/plaintext").to(plaintext)),
-                    |_| web::dev::AppConfig::default(),
-                ))
+                .h1(web::App::new()
+                    .service(json)
+                    .service(plaintext)
+                    .with_config(web::dev::AppConfig::default()))
                 .tcp()
                 .tcp()
         })?
         })?
         .start()
         .start()

+ 41 - 89
frameworks/Rust/ntex/src/main_db.rs

@@ -1,28 +1,20 @@
 #[global_allocator]
 #[global_allocator]
 static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 
 
-use std::{future::Future, pin::Pin, task::Context, task::Poll};
+use std::{pin::Pin, task::Context, task::Poll};
 
 
 use bytes::BytesMut;
 use bytes::BytesMut;
-use futures::future::ok;
-use ntex::http::body::Body;
+use futures::future::{ok, Future, FutureExt};
 use ntex::http::header::{HeaderValue, CONTENT_TYPE, SERVER};
 use ntex::http::header::{HeaderValue, CONTENT_TYPE, SERVER};
-use ntex::http::{HttpService, KeepAlive, Request, Response, StatusCode};
+use ntex::http::{HttpService, KeepAlive, Request, Response};
 use ntex::service::{Service, ServiceFactory};
 use ntex::service::{Service, ServiceFactory};
-use ntex::web::Error;
+use ntex::web::{Error, HttpResponse};
 use yarte::Serialize;
 use yarte::Serialize;
 
 
 mod db;
 mod db;
 mod utils;
 mod utils;
 
 
-use crate::db::PgConnection;
-
-struct App {
-    db: PgConnection,
-    hdr_srv: HeaderValue,
-    hdr_ctjson: HeaderValue,
-    hdr_cthtml: HeaderValue,
-}
+struct App(db::PgConnection);
 
 
 impl Service for App {
 impl Service for App {
     type Request = Request;
     type Request = Request;
@@ -37,78 +29,46 @@ impl Service for App {
 
 
     fn call(&self, req: Request) -> Self::Future {
     fn call(&self, req: Request) -> Self::Future {
         match req.path() {
         match req.path() {
-            "/db" => {
-                let h_srv = self.hdr_srv.clone();
-                let h_ct = self.hdr_ctjson.clone();
-                let fut = self.db.get_world();
-
-                Box::pin(async move {
-                    let body = fut.await?;
-                    let mut res = Response::with_body(StatusCode::OK, Body::Bytes(body));
-                    let hdrs = res.headers_mut();
-                    hdrs.insert(SERVER, h_srv);
-                    hdrs.insert(CONTENT_TYPE, h_ct);
-                    Ok(res)
-                })
-            }
-            "/fortunes" => {
-                let h_srv = self.hdr_srv.clone();
-                let h_ct = self.hdr_cthtml.clone();
-                let fut = self.db.tell_fortune();
-
-                Box::pin(async move {
-                    let body = fut.await?;
-                    let mut res = Response::with_body(StatusCode::OK, Body::Bytes(body));
-                    let hdrs = res.headers_mut();
-                    hdrs.insert(SERVER, h_srv);
-                    hdrs.insert(CONTENT_TYPE, h_ct);
-                    Ok(res)
-                })
-            }
-            "/query" => {
-                let q = utils::get_query_param(req.uri().query().unwrap_or("")) as usize;
-                let h_srv = self.hdr_srv.clone();
-                let h_ct = self.hdr_ctjson.clone();
-                let fut = self.db.get_worlds(q);
-
-                Box::pin(async move {
-                    let worlds = fut.await?;
-                    let size = 35 * worlds.len();
-                    let mut res = Response::with_body(
-                        StatusCode::OK,
-                        Body::Bytes(worlds.to_bytes::<BytesMut>(size)),
-                    );
-                    let hdrs = res.headers_mut();
-                    hdrs.insert(SERVER, h_srv);
-                    hdrs.insert(CONTENT_TYPE, h_ct);
-                    Ok(res)
-                })
-            }
-            "/update" => {
-                let q = utils::get_query_param(req.uri().query().unwrap_or(""));
-                let h_srv = self.hdr_srv.clone();
-                let h_ct = self.hdr_ctjson.clone();
-                let fut = self.db.update(q);
-
-                Box::pin(async move {
-                    let worlds = fut.await?;
-                    let size = 35 * worlds.len();
-                    let mut res = Response::with_body(
-                        StatusCode::OK,
-                        Body::Bytes(worlds.to_bytes::<BytesMut>(size)),
-                    );
-                    let hdrs = res.headers_mut();
-                    hdrs.insert(SERVER, h_srv);
-                    hdrs.insert(CONTENT_TYPE, h_ct);
-                    Ok(res)
-                })
-            }
+            "/db" => Box::pin(self.0.get_world().map(|body| {
+                Ok(HttpResponse::Ok()
+                    .header(SERVER, HeaderValue::from_static("N"))
+                    .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
+                    .body(body))
+            })),
+            "/fortunes" => Box::pin(self.0.tell_fortune().map(|body| {
+                Ok(HttpResponse::Ok()
+                    .header(SERVER, HeaderValue::from_static("N"))
+                    .header(
+                        CONTENT_TYPE,
+                        HeaderValue::from_static("text/html; charset=utf-8"),
+                    )
+                    .body(body))
+            })),
+            "/query" => Box::pin(
+                self.0
+                    .get_worlds(utils::get_query_param(req.uri().query()))
+                    .map(|worlds| {
+                        Ok(HttpResponse::Ok()
+                            .header(SERVER, HeaderValue::from_static("N"))
+                            .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
+                            .body(worlds.to_bytes::<BytesMut>(35 * worlds.len())))
+                    }),
+            ),
+            "/update" => Box::pin(
+                self.0
+                    .update(utils::get_query_param(req.uri().query()))
+                    .map(|worlds| {
+                        Ok(HttpResponse::Ok()
+                            .header(SERVER, HeaderValue::from_static("N"))
+                            .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
+                            .body(worlds.to_bytes::<BytesMut>(35 * worlds.len())))
+                    }),
+            ),
             _ => Box::pin(ok(Response::new(http::StatusCode::NOT_FOUND))),
             _ => Box::pin(ok(Response::new(http::StatusCode::NOT_FOUND))),
         }
         }
     }
     }
 }
 }
 
 
-#[derive(Clone)]
 struct AppFactory;
 struct AppFactory;
 
 
 impl ServiceFactory for AppFactory {
 impl ServiceFactory for AppFactory {
@@ -124,15 +84,7 @@ impl ServiceFactory for AppFactory {
         const DB_URL: &str =
         const DB_URL: &str =
             "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
             "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
 
 
-        Box::pin(async move {
-            let db = PgConnection::connect(DB_URL).await;
-            Ok(App {
-                db,
-                hdr_srv: HeaderValue::from_static("N"),
-                hdr_ctjson: HeaderValue::from_static("application/json"),
-                hdr_cthtml: HeaderValue::from_static("text/html; charset=utf-8"),
-            })
-        })
+        Box::pin(async move { Ok(App(db::PgConnection::connect(DB_URL).await)) })
     }
     }
 }
 }
 
 

+ 5 - 5
frameworks/Rust/ntex/src/main_raw.rs

@@ -1,9 +1,7 @@
 #[global_allocator]
 #[global_allocator]
 static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 
 
-use std::{
-    cell::RefCell, future::Future, io, pin::Pin, rc::Rc, task::Context, task::Poll,
-};
+use std::{cell::RefCell, future::Future, io, pin::Pin, rc::Rc, task::Context, task::Poll};
 
 
 use ntex::fn_service;
 use ntex::fn_service;
 use ntex::framed::{ReadTask, State, WriteTask};
 use ntex::framed::{ReadTask, State, WriteTask};
@@ -13,8 +11,10 @@ use yarte::Serialize;
 
 
 mod utils;
 mod utils;
 
 
-const JSON: &[u8] = b"HTTP/1.1 200 OK\r\nServer: N\r\nContent-Type: application/json\r\nContent-Length: 27\r\n";
-const PLAIN: &[u8] = b"HTTP/1.1 200 OK\r\nServer: N\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n";
+const JSON: &[u8] =
+    b"HTTP/1.1 200 OK\r\nServer: N\r\nContent-Type: application/json\r\nContent-Length: 27\r\n";
+const PLAIN: &[u8] =
+    b"HTTP/1.1 200 OK\r\nServer: N\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n";
 const HTTPNFOUND: &[u8] = b"HTTP/1.1 400 OK\r\n";
 const HTTPNFOUND: &[u8] = b"HTTP/1.1 400 OK\r\n";
 const HDR_SERVER: &[u8] = b"Server: N\r\n";
 const HDR_SERVER: &[u8] = b"Server: N\r\n";
 const BODY: &[u8] = b"Hello, World!";
 const BODY: &[u8] = b"Hello, World!";

+ 2 - 1
frameworks/Rust/ntex/src/utils.rs

@@ -5,7 +5,8 @@ use atoi::FromRadix10;
 
 
 pub const SIZE: usize = 27;
 pub const SIZE: usize = 27;
 
 
-pub fn get_query_param(query: &str) -> u16 {
+pub fn get_query_param(query: Option<&str>) -> u16 {
+    let query = query.unwrap_or("");
     let q = if let Some(pos) = query.find('q') {
     let q = if let Some(pos) = query.find('q') {
         u16::from_radix_10(query.split_at(pos + 2).1.as_ref()).0
         u16::from_radix_10(query.split_at(pos + 2).1.as_ref()).0
     } else {
     } else {