|
@@ -6,7 +6,7 @@ use std::io;
|
|
|
use std::pin::Pin;
|
|
|
use std::task::{Context, Poll};
|
|
|
|
|
|
-use bytes::{Buf, BytesMut};
|
|
|
+use bytes::BytesMut;
|
|
|
use ntex::codec::{AsyncRead, AsyncWrite, Decoder};
|
|
|
use ntex::fn_service;
|
|
|
use ntex::http::{h1, Request};
|
|
@@ -30,6 +30,7 @@ struct App {
|
|
|
io: TcpStream,
|
|
|
read_buf: BytesMut,
|
|
|
write_buf: BytesMut,
|
|
|
+ write_pos: usize,
|
|
|
codec: h1::Codec,
|
|
|
}
|
|
|
|
|
@@ -60,13 +61,41 @@ impl App {
|
|
|
impl Future for App {
|
|
|
type Output = Result<(), ()>;
|
|
|
|
|
|
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
|
- let this = self.get_mut();
|
|
|
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
|
+ let this = self.as_mut().get_mut();
|
|
|
|
|
|
- loop {
|
|
|
- if this.read_buf.capacity() - this.read_buf.len() < 512 {
|
|
|
- this.read_buf.reserve(32_768);
|
|
|
+ if !this.write_buf.is_empty() {
|
|
|
+ let len = this.write_buf.len();
|
|
|
+ let mut written = this.write_pos;
|
|
|
+ while written < len {
|
|
|
+ match Pin::new(&mut this.io).poll_write(cx, &this.write_buf[written..]) {
|
|
|
+ Poll::Pending => {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ Poll::Ready(Ok(n)) => {
|
|
|
+ if n == 0 {
|
|
|
+ return Poll::Ready(Ok(()));
|
|
|
+ } else {
|
|
|
+ written += n;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if written == len {
|
|
|
+ this.write_pos = 0;
|
|
|
+ unsafe { this.write_buf.set_len(0) }
|
|
|
+ } else if written > 0 {
|
|
|
+ this.write_pos = written;
|
|
|
+ return Poll::Pending
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ if this.read_buf.capacity() - this.read_buf.len() < 4096 {
|
|
|
+ this.read_buf.reserve(32_768);
|
|
|
+ }
|
|
|
+
|
|
|
+ loop {
|
|
|
let read = Pin::new(&mut this.io).poll_read_buf(cx, &mut this.read_buf);
|
|
|
match read {
|
|
|
Poll::Pending => break,
|
|
@@ -79,10 +108,6 @@ impl Future for App {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if this.write_buf.capacity() - this.write_buf.len() <= 512 {
|
|
|
- this.write_buf.reserve(32_768);
|
|
|
- }
|
|
|
-
|
|
|
loop {
|
|
|
match this.codec.decode(&mut this.read_buf) {
|
|
|
Ok(Some(h1::Message::Item(req))) => this.handle_request(req),
|
|
@@ -92,30 +117,10 @@ impl Future for App {
|
|
|
}
|
|
|
|
|
|
if !this.write_buf.is_empty() {
|
|
|
- let len = this.write_buf.len();
|
|
|
- let mut written = 0;
|
|
|
- while written < len {
|
|
|
- match Pin::new(&mut this.io).poll_write(cx, &this.write_buf[written..]) {
|
|
|
- Poll::Pending => {
|
|
|
- break;
|
|
|
- }
|
|
|
- Poll::Ready(Ok(n)) => {
|
|
|
- if n == 0 {
|
|
|
- return Poll::Ready(Ok(()));
|
|
|
- } else {
|
|
|
- written += n;
|
|
|
- }
|
|
|
- }
|
|
|
- Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
|
|
|
- }
|
|
|
- }
|
|
|
- if written == len {
|
|
|
- unsafe { this.write_buf.set_len(0) }
|
|
|
- } else if written > 0 {
|
|
|
- this.write_buf.advance(written);
|
|
|
- }
|
|
|
+ self.poll(cx)
|
|
|
+ } else {
|
|
|
+ Poll::Pending
|
|
|
}
|
|
|
- Poll::Pending
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -131,6 +136,7 @@ async fn main() -> io::Result<()> {
|
|
|
io,
|
|
|
read_buf: BytesMut::with_capacity(32_768),
|
|
|
write_buf: BytesMut::with_capacity(32_768),
|
|
|
+ write_pos: 0,
|
|
|
codec: h1::Codec::default(),
|
|
|
})
|
|
|
})?
|