Browse Source

ntex: update to v0.3.6 (#6423)

* upgrade to ntex-0.3.5

* cleanups
Nikolay Kim 4 years ago
parent
commit
e98df8af4c

+ 3 - 7
frameworks/Rust/ntex/Cargo.toml

@@ -16,8 +16,8 @@ name = "ntex-raw"
 path = "src/main_raw.rs"
 
 [dependencies]
-ntex = "0.3.3"
-snmalloc-rs = { version = "0.2.24", features = ["1mib", "native-cpu"] }
+ntex = "0.3.6"
+snmalloc-rs = { version = "0.2.25", features = ["1mib", "native-cpu"] }
 yarte = { version = "0.15", features = ["bytes-buf", "json"] }
 env_logger = "0.8"
 random-fast-rng = "0.1.1"
@@ -28,17 +28,13 @@ futures = "0.3.13"
 http = "0.2"
 smallvec = "1.6.1"
 simd-json = "0.3.24"
-
 simd-json-derive = "0.1.15"
 serde = { version = "1.0", features = ["derive"] }
 log = { version = "0.4", features = ["release_max_level_off"] }
 tokio = "1"
-tokio-postgres = { git="https://github.com/fafhrd91/postgres.git", branch="ntex-0.3" }
+tokio-postgres = { git="https://github.com/fafhrd91/postgres.git" }
 
 [profile.release]
 lto = true
 opt-level = 3
 codegen-units = 1
-
-[patch.crates-io]
-ntex = { git="https://github.com/ntex-rs/ntex.git", branch="tfb-0.3" }

+ 20 - 29
frameworks/Rust/ntex/src/db.rs

@@ -70,11 +70,11 @@ impl PgConnection {
 impl PgConnection {
     pub fn get_world(&self) -> impl Future<Output = Bytes> {
         let random_id = (self.rng.borrow_mut().get_u32() % 10_000 + 1) as i32;
-        self.cl.query_one(&self.world, &[&random_id]).map(|row| {
-            let row = row.unwrap();
+        self.cl.query(&self.world, &[&random_id]).map(|rows| {
+            let rows = rows.unwrap();
             World {
-                id: row.get(0),
-                randomnumber: row.get(1),
+                id: rows[0].get(0),
+                randomnumber: rows[0].get(1),
             }
             .to_bytes::<BytesMut>(40)
         })
@@ -85,17 +85,13 @@ impl PgConnection {
         let mut rng = self.rng.borrow_mut();
         for _ in 0..num {
             let w_id = (rng.get_u32() % 10_000 + 1) as i32;
-            worlds.push(
-                self.cl
-                    .query_one(&self.world, &[&w_id])
-                    .map(|res| match res {
-                        Err(_) => panic!(),
-                        Ok(row) => World {
-                            id: row.get(0),
-                            randomnumber: row.get(1),
-                        },
-                    }),
-            );
+            worlds.push(self.cl.query(&self.world, &[&w_id]).map(|res| {
+                let rows = res.unwrap();
+                World {
+                    id: rows[0].get(0),
+                    randomnumber: rows[0].get(1),
+                }
+            }));
         }
 
         worlds.collect()
@@ -107,17 +103,13 @@ impl PgConnection {
         for _ in 0..num {
             let id = (rng.get_u32() % 10_000 + 1) as i32;
             let w_id = (rng.get_u32() % 10_000 + 1) as i32;
-            worlds.push(
-                self.cl
-                    .query_one(&self.world, &[&w_id])
-                    .map(move |res| match res {
-                        Err(_) => panic!(),
-                        Ok(row) => World {
-                            id: row.get(0),
-                            randomnumber: id,
-                        },
-                    }),
-            );
+            worlds.push(self.cl.query(&self.world, &[&w_id]).map(move |res| {
+                let rows = res.unwrap();
+                World {
+                    id: rows[0].get(0),
+                    randomnumber: id,
+                }
+            }));
         }
 
         let cl = self.cl.clone();
@@ -146,14 +138,13 @@ impl PgConnection {
         let fut = self.cl.query_raw(&self.fortune, &[]);
 
         async move {
-            let mut stream = fut.await.unwrap();
+            let rows = fut.await.unwrap();
             let mut fortunes: SmallVec<[_; 32]> = smallvec::smallvec![Fortune {
                 id: 0,
                 message: Cow::Borrowed("Additional fortune added at request time."),
             }];
 
-            while let Some(row) = stream.next().await {
-                let row = row.unwrap();
+            for row in rows {
                 fortunes.push(Fortune {
                     id: row.get(0),
                     message: Cow::Owned(row.get(1)),

+ 2 - 0
frameworks/Rust/ntex/src/main.rs

@@ -47,6 +47,8 @@ async fn main() -> std::io::Result<()> {
                 .keep_alive(http::KeepAlive::Os)
                 .client_timeout(0)
                 .disconnect_timeout(0)
+                .read_high_watermark(65535)
+                .write_high_watermark(65535)
                 .h1(web::App::new()
                     .service(json)
                     .service(plaintext)

+ 2 - 0
frameworks/Rust/ntex/src/main_db.rs

@@ -99,6 +99,8 @@ async fn main() -> std::io::Result<()> {
                 .keep_alive(KeepAlive::Os)
                 .client_timeout(0)
                 .disconnect_timeout(0)
+                .read_high_watermark(65535)
+                .write_high_watermark(65535)
                 .h1(AppFactory)
                 .tcp()
         })?

+ 30 - 22
frameworks/Rust/ntex/src/main_raw.rs

@@ -43,25 +43,33 @@ impl Future for App {
         loop {
             match this.state.decode_item(&this.codec) {
                 Ok(Some((req, _))) => {
-                    match req.path() {
-                        "/json" => this.state.with_write_buf(|buf| {
-                            buf.extend_from_slice(JSON);
-                            this.codec.set_date_header(buf);
-                            Message {
-                                message: "Hello, World!",
+                    this.state.with_write_buf(|buf| {
+                        // make sure we've got room
+                        let remaining = buf.capacity() - buf.len();
+                        if remaining < 1024 {
+                            buf.reserve(65535 - remaining);
+                        }
+
+                        match req.path() {
+                            "/json" => {
+                                buf.extend_from_slice(JSON);
+                                this.codec.set_date_header(buf);
+                                Message {
+                                    message: "Hello, World!",
+                                }
+                                .to_bytes_mut(buf);
+                            }
+                            "/plaintext" => {
+                                buf.extend_from_slice(PLAIN);
+                                this.codec.set_date_header(buf);
+                                buf.extend_from_slice(BODY);
                             }
-                            .to_bytes_mut(buf);
-                        }),
-                        "/plaintext" => this.state.with_write_buf(|buf| {
-                            buf.extend_from_slice(PLAIN);
-                            this.codec.set_date_header(buf);
-                            buf.extend_from_slice(BODY);
-                        }),
-                        _ => this.state.with_write_buf(|buf| {
-                            buf.extend_from_slice(HTTPNFOUND);
-                            buf.extend_from_slice(HDR_SERVER);
-                        }),
-                    }
+                            _ => {
+                                buf.extend_from_slice(HTTPNFOUND);
+                                buf.extend_from_slice(HDR_SERVER);
+                            }
+                        }
+                    });
                     updated = true;
                 }
                 Ok(None) => break,
@@ -74,10 +82,10 @@ impl Future for App {
         if updated {
             this.state.dsp_restart_write_task();
         }
-        if !this.state.is_read_ready() {
-            this.state.dsp_read_more_data(cx.waker());
-        } else {
+        if this.state.is_read_ready() {
             this.state.dsp_register_task(cx.waker());
+        } else {
+            this.state.dsp_read_more_data(cx.waker());
         }
         Poll::Pending
     }
@@ -92,7 +100,7 @@ async fn main() -> io::Result<()> {
         .backlog(1024)
         .bind("techempower", "0.0.0.0:8080", || {
             fn_service(|io: TcpStream| {
-                let state = State::new().disconnect_timeout(0);
+                let state = State::with_params(65535, 65535, 1024, 0);
                 let io = Rc::new(RefCell::new(io));
                 ntex::rt::spawn(ReadTask::new(io.clone(), state.clone()));
                 ntex::rt::spawn(WriteTask::new(io, state.clone()));