|
@@ -10,14 +10,13 @@ mod db;
|
|
mod ser;
|
|
mod ser;
|
|
mod util;
|
|
mod util;
|
|
|
|
|
|
-use std::{convert::Infallible, io};
|
|
|
|
|
|
+use std::io;
|
|
|
|
|
|
use xitca_http::{
|
|
use xitca_http::{
|
|
bytes::BufMutWriter,
|
|
bytes::BufMutWriter,
|
|
h1::dispatcher_unreal::{Dispatcher, Request, Response},
|
|
h1::dispatcher_unreal::{Dispatcher, Request, Response},
|
|
http::StatusCode,
|
|
http::StatusCode,
|
|
};
|
|
};
|
|
-use xitca_io::net::TcpStream;
|
|
|
|
use xitca_service::Service;
|
|
use xitca_service::Service;
|
|
|
|
|
|
use self::{
|
|
use self::{
|
|
@@ -30,49 +29,57 @@ fn main() -> io::Result<()> {
|
|
|
|
|
|
let cores = std::thread::available_parallelism().map(|num| num.get()).unwrap_or(56);
|
|
let cores = std::thread::available_parallelism().map(|num| num.get()).unwrap_or(56);
|
|
|
|
|
|
|
|
+ let mut ids = core_affinity::get_core_ids().unwrap();
|
|
|
|
+
|
|
|
|
+ let worker = move |id: Option<core_affinity::CoreId>| {
|
|
|
|
+ if let Some(id) = id {
|
|
|
|
+ let _ = core_affinity::set_for_current(id);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ tokio::runtime::Builder::new_current_thread()
|
|
|
|
+ .enable_all()
|
|
|
|
+ .build_local(&Default::default())
|
|
|
|
+ .unwrap()
|
|
|
|
+ .block_on(async {
|
|
|
|
+ let socket = tokio::net::TcpSocket::new_v4()?;
|
|
|
|
+ socket.set_reuseaddr(true)?;
|
|
|
|
+ // unrealistic due to following reason:
|
|
|
|
+ // 1. this only works good on unix system.
|
|
|
|
+ // 2. no resource distribution adjustment between sockets on different threads. causing uneven workload
|
|
|
|
+ // where some threads are idle while others busy. resulting in overall increased latency
|
|
|
|
+ socket.set_reuseport(true)?;
|
|
|
|
+ socket.bind(addr)?;
|
|
|
|
+ let listener = socket.listen(1024)?;
|
|
|
|
+
|
|
|
|
+ let client = db::create().await.unwrap();
|
|
|
|
+
|
|
|
|
+ // unrealistic http dispatcher. no spec check. no security feature.
|
|
|
|
+ let service = Dispatcher::new(handler, State::new(client));
|
|
|
|
+
|
|
|
|
+ loop {
|
|
|
|
+ match listener.accept().await {
|
|
|
|
+ Ok((stream, _)) => {
|
|
|
|
+ let service = service.clone();
|
|
|
|
+ tokio::task::spawn_local(async move {
|
|
|
|
+ let _ = service.call(stream.into()).await;
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+ Err(e) => return Err(e),
|
|
|
|
+ };
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+ };
|
|
|
|
+
|
|
let handle = core::iter::repeat_with(|| {
|
|
let handle = core::iter::repeat_with(|| {
|
|
- std::thread::spawn(move || {
|
|
|
|
- tokio::runtime::Builder::new_current_thread()
|
|
|
|
- .enable_all()
|
|
|
|
- .build_local(&Default::default())
|
|
|
|
- .unwrap()
|
|
|
|
- .block_on(async {
|
|
|
|
- let socket = tokio::net::TcpSocket::new_v4()?;
|
|
|
|
- socket.set_reuseaddr(true)?;
|
|
|
|
- // unrealistic due to following reason:
|
|
|
|
- // 1. this only works good on unix system.
|
|
|
|
- // 2. no resource distribution adjustment between sockets on different threads. causing uneven workload
|
|
|
|
- // where some threads are idle while others busy. resulting in overall increased latency
|
|
|
|
- socket.set_reuseport(true)?;
|
|
|
|
- socket.bind(addr)?;
|
|
|
|
- let listener = socket.listen(1024)?;
|
|
|
|
-
|
|
|
|
- let client = db::create().await.unwrap();
|
|
|
|
-
|
|
|
|
- // unrealistic http dispatcher. no spec check. no security feature.
|
|
|
|
- let service = Dispatcher::new(handler, State::new(client));
|
|
|
|
-
|
|
|
|
- loop {
|
|
|
|
- match listener.accept().await {
|
|
|
|
- Ok((stream, _)) => {
|
|
|
|
- let stream = stream.into_std()?;
|
|
|
|
- let stream = TcpStream::from_std(stream)?;
|
|
|
|
- let service = service.clone();
|
|
|
|
- tokio::task::spawn_local(async move {
|
|
|
|
- let _ = service.call(stream).await;
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
- Err(e) => return Err(e),
|
|
|
|
- };
|
|
|
|
- }
|
|
|
|
- })
|
|
|
|
- })
|
|
|
|
|
|
+ let id = ids.pop();
|
|
|
|
+ std::thread::spawn(move || worker(id))
|
|
})
|
|
})
|
|
- .take(cores)
|
|
|
|
|
|
+ .take(cores - 1)
|
|
.collect::<Vec<_>>();
|
|
.collect::<Vec<_>>();
|
|
|
|
|
|
// unrealistic due to no signal handling, not shutdown handling. when killing this process all resources that
|
|
// unrealistic due to no signal handling, not shutdown handling. when killing this process all resources that
|
|
// need clean async shutdown will be leaked.
|
|
// need clean async shutdown will be leaked.
|
|
|
|
+ worker(ids.pop())?;
|
|
for handle in handle {
|
|
for handle in handle {
|
|
handle.join().unwrap()?;
|
|
handle.join().unwrap()?;
|
|
}
|
|
}
|
|
@@ -80,7 +87,7 @@ fn main() -> io::Result<()> {
|
|
Ok(())
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
|
|
-async fn handler<'h>(req: Request<'h>, res: Response<'h>, state: &State<db::Client>) -> Response<'h, 3> {
|
|
|
|
|
|
+async fn handler<'h>(req: Request<'h, State<db::Client>>, res: Response<'h>) -> Response<'h, 3> {
|
|
// unrealistic due to no http method check
|
|
// unrealistic due to no http method check
|
|
match req.path {
|
|
match req.path {
|
|
// unrealistic due to no dynamic path matching
|
|
// unrealistic due to no dynamic path matching
|
|
@@ -91,8 +98,7 @@ async fn handler<'h>(req: Request<'h>, res: Response<'h>, state: &State<db::Clie
|
|
.header("server", "X")
|
|
.header("server", "X")
|
|
// unrealistic content length header.
|
|
// unrealistic content length header.
|
|
.header("content-length", "13")
|
|
.header("content-length", "13")
|
|
- .body_writer(|buf| Ok::<_, Infallible>(buf.extend_from_slice(b"Hello, World!")))
|
|
|
|
- .unwrap()
|
|
|
|
|
|
+ .body_writer(|buf| buf.extend_from_slice(b"Hello, World!"))
|
|
}
|
|
}
|
|
"/json" => res
|
|
"/json" => res
|
|
.status(StatusCode::OK)
|
|
.status(StatusCode::OK)
|
|
@@ -100,12 +106,12 @@ async fn handler<'h>(req: Request<'h>, res: Response<'h>, state: &State<db::Clie
|
|
.header("server", "X")
|
|
.header("server", "X")
|
|
// unrealistic content length header.
|
|
// unrealistic content length header.
|
|
.header("content-length", "27")
|
|
.header("content-length", "27")
|
|
- .body_writer(|buf| serde_json::to_writer(BufMutWriter(buf), &Message::new()))
|
|
|
|
- .unwrap(),
|
|
|
|
|
|
+ .body_writer(|buf| serde_json::to_writer(BufMutWriter(buf), &Message::new()).unwrap()),
|
|
|
|
+
|
|
// all database related categories are unrealistic. please reference db_unrealistic module for detail.
|
|
// all database related categories are unrealistic. please reference db_unrealistic module for detail.
|
|
"/fortunes" => {
|
|
"/fortunes" => {
|
|
use sailfish::TemplateOnce;
|
|
use sailfish::TemplateOnce;
|
|
- let fortunes = state.client.tell_fortune().await.unwrap().render_once().unwrap();
|
|
|
|
|
|
+ let fortunes = req.ctx.client.tell_fortune().await.unwrap().render_once().unwrap();
|
|
res.status(StatusCode::OK)
|
|
res.status(StatusCode::OK)
|
|
.header("content-type", "text/html; charset=utf-8")
|
|
.header("content-type", "text/html; charset=utf-8")
|
|
.header("server", "X")
|
|
.header("server", "X")
|
|
@@ -114,18 +120,18 @@ async fn handler<'h>(req: Request<'h>, res: Response<'h>, state: &State<db::Clie
|
|
"/db" => {
|
|
"/db" => {
|
|
// unrealistic due to no error handling. any db/serialization error will cause process crash.
|
|
// unrealistic due to no error handling. any db/serialization error will cause process crash.
|
|
// the same goes for all following unwraps on database related functions.
|
|
// the same goes for all following unwraps on database related functions.
|
|
- let world = state.client.get_world().await.unwrap();
|
|
|
|
- json_response(res, state, &world)
|
|
|
|
|
|
+ let world = req.ctx.client.get_world().await.unwrap();
|
|
|
|
+ json_response(res, req.ctx, &world)
|
|
}
|
|
}
|
|
p if p.starts_with("/q") => {
|
|
p if p.starts_with("/q") => {
|
|
let num = p["/queries?q=".len()..].parse_query();
|
|
let num = p["/queries?q=".len()..].parse_query();
|
|
- let worlds = state.client.get_worlds(num).await.unwrap();
|
|
|
|
- json_response(res, state, &worlds)
|
|
|
|
|
|
+ let worlds = req.ctx.client.get_worlds(num).await.unwrap();
|
|
|
|
+ json_response(res, req.ctx, &worlds)
|
|
}
|
|
}
|
|
p if p.starts_with("/u") => {
|
|
p if p.starts_with("/u") => {
|
|
let num = p["/updates?q=".len()..].parse_query();
|
|
let num = p["/updates?q=".len()..].parse_query();
|
|
- let worlds = state.client.update(num).await.unwrap();
|
|
|
|
- json_response(res, state, &worlds)
|
|
|
|
|
|
+ let worlds = req.ctx.client.update(num).await.unwrap();
|
|
|
|
+ json_response(res, req.ctx, &worlds)
|
|
}
|
|
}
|
|
_ => res.status(StatusCode::NOT_FOUND).header("server", "X").body(&[]),
|
|
_ => res.status(StatusCode::NOT_FOUND).header("server", "X").body(&[]),
|
|
}
|
|
}
|