|
@@ -23,7 +23,7 @@ use std::{
|
|
use futures_util::stream::Stream;
|
|
use futures_util::stream::Stream;
|
|
use xitca_http::{
|
|
use xitca_http::{
|
|
body::Once,
|
|
body::Once,
|
|
- date::{DateTime, DateTimeService},
|
|
|
|
|
|
+ date::DateTimeService,
|
|
h1::proto::context::Context,
|
|
h1::proto::context::Context,
|
|
http::{
|
|
http::{
|
|
self,
|
|
self,
|
|
@@ -34,7 +34,8 @@ use xitca_http::{
|
|
util::service::context::{Context as Ctx, ContextBuilder},
|
|
util::service::context::{Context as Ctx, ContextBuilder},
|
|
};
|
|
};
|
|
use xitca_io::{
|
|
use xitca_io::{
|
|
- bytes::{Buf, Bytes, BytesMut, PagedBytesMut},
|
|
|
|
|
|
+ bytes::{Bytes, BytesMut, PagedBytesMut},
|
|
|
|
+ io_uring::IoBuf,
|
|
net::TcpStream,
|
|
net::TcpStream,
|
|
};
|
|
};
|
|
use xitca_service::{fn_service, middleware::UncheckedReady, Service, ServiceExt};
|
|
use xitca_service::{fn_service, middleware::UncheckedReady, Service, ServiceExt};
|
|
@@ -126,15 +127,17 @@ where
|
|
{
|
|
{
|
|
type Response = Http1IOUService<S::Response>;
|
|
type Response = Http1IOUService<S::Response>;
|
|
type Error = S::Error;
|
|
type Error = S::Error;
|
|
- type Future<'f> = impl Future<Output = Result<Self::Response, Self::Error>> + 'f where Self: 'f, (): 'f ;
|
|
|
|
|
|
+ type Future<'f> = impl Future<Output = Result<Self::Response, Self::Error>> + 'f
|
|
|
|
+ where
|
|
|
|
+ Self: 'f,
|
|
|
|
+ (): 'f;
|
|
|
|
|
|
fn call<'s>(&'s self, _: ()) -> Self::Future<'s>
|
|
fn call<'s>(&'s self, _: ()) -> Self::Future<'s>
|
|
where
|
|
where
|
|
(): 's,
|
|
(): 's,
|
|
{
|
|
{
|
|
async {
|
|
async {
|
|
- let service = self.service.call(()).await?;
|
|
|
|
- Ok(Http1IOUService {
|
|
|
|
|
|
+ self.service.call(()).await.map(|service| Http1IOUService {
|
|
service,
|
|
service,
|
|
date: DateTimeService::new(),
|
|
date: DateTimeService::new(),
|
|
})
|
|
})
|
|
@@ -155,7 +158,10 @@ where
|
|
{
|
|
{
|
|
type Response = ();
|
|
type Response = ();
|
|
type Error = io::Error;
|
|
type Error = io::Error;
|
|
- type Future<'f> = impl Future<Output = Result<Self::Response, Self::Error>> + 'f where Self: 'f, TcpStream: 'f ;
|
|
|
|
|
|
+ type Future<'f> = impl Future<Output = Result<Self::Response, Self::Error>> + 'f
|
|
|
|
+ where
|
|
|
|
+ Self: 'f,
|
|
|
|
+ TcpStream: 'f;
|
|
|
|
|
|
fn call<'s>(&'s self, stream: TcpStream) -> Self::Future<'s>
|
|
fn call<'s>(&'s self, stream: TcpStream) -> Self::Future<'s>
|
|
where
|
|
where
|
|
@@ -166,115 +172,48 @@ where
|
|
let mut paged = PagedBytesMut::new();
|
|
let mut paged = PagedBytesMut::new();
|
|
let mut write_buf = BytesMut::with_capacity(4096);
|
|
let mut write_buf = BytesMut::with_capacity(4096);
|
|
|
|
|
|
- #[cfg(feature = "io-uring")]
|
|
|
|
- {
|
|
|
|
- use tokio_uring::buf::IoBuf;
|
|
|
|
-
|
|
|
|
- let std = stream.into_std()?;
|
|
|
|
- let stream = tokio_uring::net::TcpStream::from_std(std);
|
|
|
|
-
|
|
|
|
- 'io: loop {
|
|
|
|
- let mut buf = paged.into_inner();
|
|
|
|
|
|
+ let std = stream.into_std()?;
|
|
|
|
+ let stream = tokio_uring::net::TcpStream::from_std(std);
|
|
|
|
|
|
- let len = buf.len();
|
|
|
|
- let rem = buf.capacity() - len;
|
|
|
|
|
|
+ loop {
|
|
|
|
+ let mut buf = paged.into_inner();
|
|
|
|
|
|
- if rem < 4096 {
|
|
|
|
- buf.reserve(4096 - rem);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- let (res, buf) = stream.read(buf.slice(len..)).await;
|
|
|
|
- let n = res?;
|
|
|
|
- if n == 0 {
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- paged = PagedBytesMut::from(buf.into_inner());
|
|
|
|
-
|
|
|
|
- request_handler(&mut ctx, &self.service, &mut paged, &mut write_buf).await;
|
|
|
|
-
|
|
|
|
- while !write_buf.is_empty() {
|
|
|
|
- let (res, mut w) = stream.write(write_buf).await;
|
|
|
|
- let n = res?;
|
|
|
|
- if n == 0 {
|
|
|
|
- break 'io;
|
|
|
|
- }
|
|
|
|
- w.advance(n);
|
|
|
|
- write_buf = w;
|
|
|
|
- }
|
|
|
|
|
|
+ let len = buf.len();
|
|
|
|
+ let rem = buf.capacity() - len;
|
|
|
|
+ if rem < 4096 {
|
|
|
|
+ buf.reserve(4096 - rem);
|
|
}
|
|
}
|
|
|
|
|
|
- stream.shutdown(std::net::Shutdown::Both)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- #[cfg(not(feature = "io-uring"))]
|
|
|
|
- {
|
|
|
|
- use xitca_io::{
|
|
|
|
- bytes::BufRead,
|
|
|
|
- io::{AsyncIo, Interest},
|
|
|
|
- };
|
|
|
|
-
|
|
|
|
- let mut stream = stream;
|
|
|
|
-
|
|
|
|
- 'io: loop {
|
|
|
|
- let interest = if write_buf.is_empty() {
|
|
|
|
- Interest::READABLE
|
|
|
|
- } else {
|
|
|
|
- Interest::READABLE | Interest::WRITABLE
|
|
|
|
- };
|
|
|
|
-
|
|
|
|
- let ready = stream.ready(interest).await?;
|
|
|
|
-
|
|
|
|
- if ready.is_readable() {
|
|
|
|
- paged.do_io(&mut stream)?;
|
|
|
|
- request_handler(&mut ctx, &self.service, &mut paged, &mut write_buf).await;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if ready.is_writable() {
|
|
|
|
- 'write: loop {
|
|
|
|
- match io::Write::write(&mut stream, &write_buf) {
|
|
|
|
- Ok(0) => break 'io,
|
|
|
|
- Ok(n) => {
|
|
|
|
- write_buf.advance(n);
|
|
|
|
- if write_buf.is_empty() {
|
|
|
|
- break 'write;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break 'write,
|
|
|
|
- Err(e) => return Err(e),
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ let (res, buf) = stream.read(buf.slice(len..)).await;
|
|
|
|
+ if res? == 0 {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ paged = PagedBytesMut::from(buf.into_inner());
|
|
|
|
+
|
|
|
|
+ while let Some((req, _)) = ctx.decode_head::<{ usize::MAX }>(&mut paged).unwrap() {
|
|
|
|
+ let (parts, body) = self.service.call(req).await.unwrap().into_parts();
|
|
|
|
+ let mut encoder = ctx.encode_head(parts, &body, &mut write_buf).unwrap();
|
|
|
|
+ let mut body = pin!(body);
|
|
|
|
+ while let Some(chunk) = poll_fn(|cx| body.as_mut().poll_next(cx)).await {
|
|
|
|
+ let chunk = chunk.unwrap();
|
|
|
|
+ encoder.encode(chunk, &mut write_buf);
|
|
}
|
|
}
|
|
|
|
+ encoder.encode_eof(&mut write_buf);
|
|
}
|
|
}
|
|
|
|
|
|
- Ok(())
|
|
|
|
|
|
+ let (res, b) = stream.write_all(write_buf).await;
|
|
|
|
+ res?;
|
|
|
|
+ write_buf = b;
|
|
|
|
+ write_buf.clear();
|
|
}
|
|
}
|
|
- }
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
|
|
|
|
-async fn request_handler<D, S, const L: usize>(
|
|
|
|
- ctx: &mut Context<'_, D, L>,
|
|
|
|
- service: &S,
|
|
|
|
- paged: &mut PagedBytesMut<4096>,
|
|
|
|
- write_buf: &mut BytesMut,
|
|
|
|
-) where
|
|
|
|
- D: DateTime,
|
|
|
|
- S: Service<Request, Response = Response>,
|
|
|
|
- S::Error: fmt::Debug,
|
|
|
|
-{
|
|
|
|
- while let Some((req, _)) = ctx.decode_head::<{ usize::MAX }>(paged).unwrap() {
|
|
|
|
- let (parts, body) = service.call(req).await.unwrap().into_parts();
|
|
|
|
- let mut encoder = ctx.encode_head(parts, &body, write_buf).unwrap();
|
|
|
|
- let mut body = pin!(body);
|
|
|
|
- while let Some(chunk) = poll_fn(|cx| body.as_mut().poll_next(cx)).await {
|
|
|
|
- let chunk = chunk.unwrap();
|
|
|
|
- encoder.encode(chunk, write_buf);
|
|
|
|
|
|
+ stream.shutdown(std::net::Shutdown::Both)
|
|
}
|
|
}
|
|
- encoder.encode_eof(write_buf);
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
type Request = http::Request<RequestExt<()>>;
|
|
type Request = http::Request<RequestExt<()>>;
|
|
|
|
+
|
|
type Response = http::Response<Once<Bytes>>;
|
|
type Response = http::Response<Once<Bytes>>;
|
|
|
|
|
|
struct State {
|
|
struct State {
|