Browse Source

ntex: upgrade to v0.2 (#6316)

* upgrade to ntex v0.2

* trigger build

* update postgres driver
Nikolay Kim 4 years ago
parent
commit
8a73cb089e

+ 8 - 8
frameworks/Rust/ntex/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "ntex"
-version = "0.1.0"
+version = "0.2.0"
 edition = "2018"
 
 [[bin]]
@@ -20,19 +20,19 @@ name = "ntex-sailfish"
 path = "src/main_sailfish.rs"
 
 [dependencies]
-ntex = "0.1.21"
+ntex = "0.2.0-b.5"
 mimalloc = { version = "0.1.21", default-features = false }
 yarte = { version = "0.14", features = ["bytes-buf", "json"] }
-env_logger = "0.7"
+env_logger = "0.8"
 random-fast-rng = "0.1.1"
 bytes = "0.5.6"
 atoi = "0.3.2"
-num_cpus = "1.0"
-futures = "0.3.4"
+num_cpus = "1.13"
+futures = "0.3.12"
 http = "0.2"
-smallvec = "1.4.0"
-simd-json = "0.3.14"
-simd-json-derive = "0.1.9"
+smallvec = "1.6.1"
+simd-json = "0.3.23"
+simd-json-derive = "0.1.15"
 serde = { version = "1.0", features = ["derive"] }
 log = { version = "0.4", features = ["release_max_level_off"] }
 tokio = "=0.2.6"

+ 1 - 1
frameworks/Rust/ntex/ntex-db.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.46
+FROM rust:1.49
 
 # Disable simd at jsonescape
 ENV CARGO_CFG_JSONESCAPE_DISABLE_AUTO_SIMD=

+ 1 - 1
frameworks/Rust/ntex/ntex-raw.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.46
+FROM rust:1.49
 
 # Disable simd at jsonescape
 ENV CARGO_CFG_JSONESCAPE_DISABLE_AUTO_SIMD=

+ 1 - 1
frameworks/Rust/ntex/ntex-sailfish.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.46
+FROM rust:1.49
 
 RUN apt-get update -yqq && apt-get install -yqq cmake g++
 

+ 1 - 1
frameworks/Rust/ntex/ntex.dockerfile

@@ -1,4 +1,4 @@
-FROM rust:1.46
+FROM rust:1.49
 
 # Disable simd at jsonescape
 ENV CARGO_CFG_JSONESCAPE_DISABLE_AUTO_SIMD=

+ 1 - 2
frameworks/Rust/ntex/src/main_db.rs

@@ -38,8 +38,7 @@ impl Service for App {
     }
 
     fn call(&self, req: Request) -> Self::Future {
-        let path = req.path();
-        match path {
+        match req.path() {
             "/db" => {
                 let h_srv = self.hdr_srv.clone();
                 let h_ct = self.hdr_ctjson.clone();

+ 53 - 90
frameworks/Rust/ntex/src/main_raw.rs

@@ -1,15 +1,13 @@
 #[global_allocator]
 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
-use std::future::Future;
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
+use std::{
+    cell::RefCell, future::Future, io, pin::Pin, rc::Rc, task::Context, task::Poll,
+};
 
-use bytes::BytesMut;
-use ntex::codec::{AsyncRead, AsyncWrite, Decoder};
 use ntex::fn_service;
-use ntex::http::{h1, Request};
+use ntex::framed::{ReadTask, State, WriteTask};
+use ntex::http::h1;
 use ntex::rt::net::TcpStream;
 use yarte::Serialize;
 
@@ -27,100 +25,61 @@ pub struct Message {
 }
 
 struct App {
-    io: TcpStream,
-    read_buf: BytesMut,
-    write_buf: BytesMut,
-    write_pos: usize,
+    state: State,
     codec: h1::Codec,
 }
 
-impl App {
-    fn handle_request(&mut self, req: Request) {
-        match req.path() {
-            "/json" => {
-                self.write_buf.extend_from_slice(JSON);
-                self.codec.set_date_header(&mut self.write_buf);
-                Message {
-                    message: "Hello, World!",
-                }
-                .to_bytes_mut(&mut self.write_buf);
-            }
-            "/plaintext" => {
-                self.write_buf.extend_from_slice(PLAIN);
-                self.codec.set_date_header(&mut self.write_buf);
-                self.write_buf.extend_from_slice(BODY);
-            }
-            _ => {
-                self.write_buf.extend_from_slice(HTTPNFOUND);
-                self.write_buf.extend_from_slice(HDR_SERVER);
-            }
-        }
-    }
-}
-
 impl Future for App {
     type Output = Result<(), ()>;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let this = self.as_mut().get_mut();
-
-        if !this.write_buf.is_empty() {
-            let len = this.write_buf.len();
-            let mut written = this.write_pos;
-            while written < len {
-                match Pin::new(&mut this.io).poll_write(cx, &this.write_buf[written..]) {
-                    Poll::Pending => {
-                        break;
-                    }
-                    Poll::Ready(Ok(n)) => {
-                        if n == 0 {
-                            return Poll::Ready(Ok(()));
-                        } else {
-                            written += n;
-                        }
-                    }
-                    Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
-                }
-            }
-            if written == len {
-                this.write_pos = 0;
-                unsafe { this.write_buf.set_len(0) }
-            } else if written > 0 {
-                this.write_pos = written;
-                return Poll::Pending;
-            }
-        }
-
-        if this.read_buf.capacity() - this.read_buf.len() < 4096 {
-            this.read_buf.reserve(32_768);
+        if !this.state.is_open() {
+            this.state.close();
+            return Poll::Ready(Ok(()));
         }
 
+        let mut updated = false;
         loop {
-            let read = Pin::new(&mut this.io).poll_read_buf(cx, &mut this.read_buf);
-            match read {
-                Poll::Pending => break,
-                Poll::Ready(Ok(n)) => {
-                    if n == 0 {
-                        return Poll::Ready(Ok(()));
+            match this.state.decode_item(&this.codec) {
+                Ok(Some((req, _))) => {
+                    match req.path() {
+                        "/json" => this.state.with_write_buf(|buf| {
+                            buf.extend_from_slice(JSON);
+                            this.codec.set_date_header(buf);
+                            Message {
+                                message: "Hello, World!",
+                            }
+                            .to_bytes_mut(buf);
+                        }),
+                        "/plaintext" => this.state.with_write_buf(|buf| {
+                            buf.extend_from_slice(PLAIN);
+                            this.codec.set_date_header(buf);
+                            buf.extend_from_slice(BODY);
+                        }),
+                        _ => this.state.with_write_buf(|buf| {
+                            buf.extend_from_slice(HTTPNFOUND);
+                            buf.extend_from_slice(HDR_SERVER);
+                        }),
                     }
+                    updated = true;
                 }
-                Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
-            }
-        }
-
-        loop {
-            match this.codec.decode(&mut this.read_buf) {
-                Ok(Some(h1::Message::Item(req))) => this.handle_request(req),
                 Ok(None) => break,
-                _ => return Poll::Ready(Err(())),
+                _ => {
+                    this.state.close();
+                    return Poll::Ready(Err(()));
+                }
             }
         }
-
-        if !this.write_buf.is_empty() {
-            self.poll(cx)
-        } else {
-            Poll::Pending
+        if updated {
+            this.state.dsp_flush_write_data(cx.waker());
+        }
+        if !this.state.is_read_ready() {
+            this.state.dsp_read_more_data(cx.waker());
+        } else if !updated {
+            this.state.dsp_register_task(cx.waker());
         }
+        Poll::Pending
     }
 }
 
@@ -132,12 +91,16 @@ async fn main() -> io::Result<()> {
     ntex::server::build()
         .backlog(1024)
         .bind("techempower", "0.0.0.0:8080", || {
-            fn_service(|io: TcpStream| App {
-                io,
-                read_buf: BytesMut::with_capacity(32_768),
-                write_buf: BytesMut::with_capacity(32_768),
-                write_pos: 0,
-                codec: h1::Codec::default(),
+            fn_service(|io: TcpStream| {
+                let state = State::new();
+                let io = Rc::new(RefCell::new(io));
+                ntex::rt::spawn(ReadTask::new(io.clone(), state.clone()));
+                ntex::rt::spawn(WriteTask::new(io, state.clone()));
+
+                App {
+                    state,
+                    codec: h1::Codec::default(),
+                }
             })
         })?
         .start()