|
@@ -1,5 +1,3 @@
|
|
|
-#![feature(generic_associated_types, type_alias_impl_trait)]
|
|
|
-
|
|
|
#[global_allocator]
|
|
|
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
|
|
|
|
|
@@ -8,178 +6,181 @@ mod ser;
|
|
|
mod util;
|
|
|
|
|
|
use std::{
|
|
|
+ cell::{RefCell, RefMut},
|
|
|
convert::Infallible,
|
|
|
error::Error,
|
|
|
- future::ready,
|
|
|
- io,
|
|
|
+ fmt::Debug,
|
|
|
sync::{Arc, Mutex},
|
|
|
};
|
|
|
|
|
|
-use serde::Serialize;
|
|
|
use xitca_http::{
|
|
|
body::ResponseBody,
|
|
|
- bytes::Bytes,
|
|
|
+ bytes::BufMutWriter,
|
|
|
+ bytes::BytesMut,
|
|
|
config::HttpServiceConfig,
|
|
|
h1::RequestBody,
|
|
|
http::{
|
|
|
self,
|
|
|
+ const_header_value::{JSON, TEXT, TEXT_HTML_UTF8},
|
|
|
header::{CONTENT_TYPE, SERVER},
|
|
|
- IntoResponse, Method,
|
|
|
+ IntoResponse,
|
|
|
+ },
|
|
|
+ request,
|
|
|
+ util::{
|
|
|
+ middleware::TcpConfig,
|
|
|
+ service::{
|
|
|
+ context::{object::ContextObjectConstructor, Context, ContextBuilder},
|
|
|
+ route::get,
|
|
|
+ GenericRouter,
|
|
|
+ },
|
|
|
},
|
|
|
- util::service::Route,
|
|
|
HttpServiceBuilder,
|
|
|
};
|
|
|
-use xitca_server::Builder;
|
|
|
+use xitca_service::{fn_service, BuildServiceExt, Service};
|
|
|
|
|
|
use self::db::Client;
|
|
|
use self::ser::Message;
|
|
|
-use self::util::{
|
|
|
- internal, not_found, AppState, QueryParse, JSON_HEADER_VALUE, SERVER_HEADER_VALUE,
|
|
|
- TEXT_HEADER_VALUE,
|
|
|
-};
|
|
|
-
|
|
|
-type Request = xitca_http::Request<RequestBody>;
|
|
|
+use self::util::{QueryParse, SERVER_HEADER_VALUE};
|
|
|
|
|
|
type Response = http::Response<ResponseBody>;
|
|
|
+type Request = request::Request<RequestBody>;
|
|
|
+type State = AppState<Client>;
|
|
|
+
|
|
|
+type Ctx<'a> = Context<'a, Request, State>;
|
|
|
|
|
|
-#[tokio::main(flavor = "current_thread")]
|
|
|
-async fn main() -> io::Result<()> {
|
|
|
- let cores = core_affinity::get_core_ids().unwrap_or_else(Vec::new);
|
|
|
+fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
|
+ let cores = core_affinity::get_core_ids().unwrap_or_default();
|
|
|
let cores = Arc::new(Mutex::new(cores));
|
|
|
|
|
|
- let factory = || {
|
|
|
- let http = Http {
|
|
|
- config: "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world",
|
|
|
- };
|
|
|
+ let db_url = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
|
|
|
|
|
|
+ let builder = || {
|
|
|
let config = HttpServiceConfig::new()
|
|
|
.disable_vectored_write()
|
|
|
.max_request_headers::<8>();
|
|
|
|
|
|
- let route = Route::new(http).methods([Method::GET]);
|
|
|
+ let router = GenericRouter::with_custom_object::<ContextObjectConstructor<_, _>>()
|
|
|
+ .insert("/plaintext", get(fn_service(plain_text)))
|
|
|
+ .insert("/json", get(fn_service(json)))
|
|
|
+ .insert("/db", get(fn_service(db)))
|
|
|
+ .insert("/fortunes", get(fn_service(fortunes)))
|
|
|
+ .insert("/queries", get(fn_service(queries)))
|
|
|
+ .insert("/updates", get(fn_service(updates)))
|
|
|
+ .enclosed_fn(middleware_fn);
|
|
|
+
|
|
|
+ let service = ContextBuilder::new(|| async {
|
|
|
+ let client = db::create(db_url).await;
|
|
|
+ Ok::<_, Infallible>(State::new(client))
|
|
|
+ })
|
|
|
+ .service(router);
|
|
|
+
|
|
|
+ let tcp_config = TcpConfig::new().set_nodelay(true);
|
|
|
|
|
|
- HttpServiceBuilder::h1(route).config(config)
|
|
|
+ HttpServiceBuilder::h1(service)
|
|
|
+ .config(config)
|
|
|
+ .enclosed(tcp_config)
|
|
|
};
|
|
|
|
|
|
- Builder::new()
|
|
|
- .on_worker_start(move || {
|
|
|
- if let Some(core) = cores.lock().unwrap().pop() {
|
|
|
- core_affinity::set_for_current(core);
|
|
|
- }
|
|
|
- ready(())
|
|
|
- })
|
|
|
- .bind("xitca-web", "0.0.0.0:8080", factory)?
|
|
|
- .build()
|
|
|
- .await
|
|
|
-}
|
|
|
+ let task = async {
|
|
|
+ xitca_server::Builder::new()
|
|
|
+ .on_worker_start(move || {
|
|
|
+ if let Some(core) = cores.lock().unwrap().pop() {
|
|
|
+ core_affinity::set_for_current(core);
|
|
|
+ }
|
|
|
+ async {}
|
|
|
+ })
|
|
|
+ .bind("xitca-web", "0.0.0.0:8080", builder)?
|
|
|
+ .build()
|
|
|
+ .await
|
|
|
+ .map_err(Into::into)
|
|
|
+ };
|
|
|
|
|
|
-#[derive(Clone)]
|
|
|
-struct Http {
|
|
|
- config: &'static str,
|
|
|
+ tokio::runtime::Builder::new_current_thread()
|
|
|
+ .enable_all()
|
|
|
+ .build()?
|
|
|
+ .block_on(task)
|
|
|
}
|
|
|
|
|
|
-struct HttpService {
|
|
|
- state: AppState<Client>,
|
|
|
+async fn middleware_fn<S, E>(service: &S, req: Ctx<'_>) -> Result<Response, Infallible>
|
|
|
+where
|
|
|
+ S: for<'r, 'c> Service<Ctx<'c>, Response = Response, Error = E>,
|
|
|
+ E: Debug,
|
|
|
+{
|
|
|
+ let mut res = service.call(req).await.unwrap();
|
|
|
+ res.headers_mut().append(SERVER, SERVER_HEADER_VALUE);
|
|
|
+ Ok(res)
|
|
|
}
|
|
|
|
|
|
-#[xitca_http_codegen::service_impl]
|
|
|
-impl HttpService {
|
|
|
- async fn new_service(http: &Http, _: ()) -> Result<Self, ()> {
|
|
|
- let client = db::create(http.config).await;
|
|
|
-
|
|
|
- Ok(HttpService {
|
|
|
- state: AppState::new(client),
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- async fn ready(&self) -> Result<(), Infallible> {
|
|
|
- Ok(())
|
|
|
- }
|
|
|
-
|
|
|
- async fn call(&self, req: Request) -> Result<Response, Infallible> {
|
|
|
- match req.uri().path() {
|
|
|
- "/plaintext" => self.plain_text(req),
|
|
|
- "/json" => self.json(req),
|
|
|
- "/db" => self.db(req).await,
|
|
|
- "/fortunes" => self.fortunes(req).await,
|
|
|
- "/queries" => self.queries(req).await,
|
|
|
- "/updates" => self.updates(req).await,
|
|
|
- _ => not_found(),
|
|
|
- }
|
|
|
- }
|
|
|
+async fn plain_text(ctx: Ctx<'_>) -> Result<Response, Box<dyn Error>> {
|
|
|
+ let (req, _) = ctx.into_parts();
|
|
|
+ let mut res = req.into_response("Hello, World!");
|
|
|
+ res.headers_mut().append(CONTENT_TYPE, TEXT);
|
|
|
+ Ok(res)
|
|
|
}
|
|
|
|
|
|
-impl HttpService {
|
|
|
- fn plain_text(&self, req: Request) -> Result<Response, Infallible> {
|
|
|
- let mut res = req.into_response("Hello, World!");
|
|
|
-
|
|
|
- res.headers_mut().append(SERVER, SERVER_HEADER_VALUE);
|
|
|
- res.headers_mut().append(CONTENT_TYPE, TEXT_HEADER_VALUE);
|
|
|
-
|
|
|
- Ok(res)
|
|
|
- }
|
|
|
+async fn json(ctx: Ctx<'_>) -> Result<Response, Box<dyn Error>> {
|
|
|
+ let (req, state) = ctx.into_parts();
|
|
|
+ _json(req, state, &Message::new())
|
|
|
+}
|
|
|
|
|
|
- #[inline]
|
|
|
- fn json(&self, req: Request) -> Result<Response, Infallible> {
|
|
|
- self._json(req, &Message::new())
|
|
|
- }
|
|
|
+async fn db(ctx: Ctx<'_>) -> Result<Response, Box<dyn Error>> {
|
|
|
+ let (req, state) = ctx.into_parts();
|
|
|
+ let world = state.client().get_world().await?;
|
|
|
+ _json(req, state, &world)
|
|
|
+}
|
|
|
|
|
|
- async fn db(&self, req: Request) -> Result<Response, Infallible> {
|
|
|
- match self.state.client().get_world().await {
|
|
|
- Ok(ref world) => self._json(req, world),
|
|
|
- Err(_) => internal(),
|
|
|
- }
|
|
|
- }
|
|
|
+async fn fortunes(ctx: Ctx<'_>) -> Result<Response, Box<dyn Error>> {
|
|
|
+ let (req, state) = ctx.into_parts();
|
|
|
+ use sailfish::TemplateOnce;
|
|
|
+ let fortunes = state.client().tell_fortune().await?.render_once()?;
|
|
|
+ let mut res = req.into_response(fortunes);
|
|
|
+ res.headers_mut().append(CONTENT_TYPE, TEXT_HTML_UTF8);
|
|
|
+ Ok(res)
|
|
|
+}
|
|
|
|
|
|
- async fn fortunes(&self, req: Request) -> Result<Response, Infallible> {
|
|
|
- match self._fortunes().await {
|
|
|
- Ok(body) => {
|
|
|
- let mut res = req.into_response(body);
|
|
|
+async fn queries(ctx: Ctx<'_>) -> Result<Response, Box<dyn Error>> {
|
|
|
+ let (req, state) = ctx.into_parts();
|
|
|
+ let num = req.uri().query().parse_query();
|
|
|
+ let worlds = state.client().get_worlds(num).await?;
|
|
|
+ _json(req, state, worlds.as_slice())
|
|
|
+}
|
|
|
|
|
|
- res.headers_mut().append(SERVER, util::SERVER_HEADER_VALUE);
|
|
|
- res.headers_mut()
|
|
|
- .append(CONTENT_TYPE, util::HTML_HEADER_VALUE);
|
|
|
+async fn updates(ctx: Ctx<'_>) -> Result<Response, Box<dyn Error>> {
|
|
|
+ let (req, state) = ctx.into_parts();
|
|
|
+ let num = req.uri().query().parse_query();
|
|
|
+ let worlds = state.client().update(num).await?;
|
|
|
+ _json(req, state, worlds.as_slice())
|
|
|
+}
|
|
|
|
|
|
- Ok(res)
|
|
|
- }
|
|
|
- Err(_) => internal(),
|
|
|
- }
|
|
|
- }
|
|
|
+fn _json<S>(req: Request, state: &State, value: &S) -> Result<Response, Box<dyn Error>>
|
|
|
+where
|
|
|
+ S: ?Sized + serde::Serialize,
|
|
|
+{
|
|
|
+ let mut buf = state.write_buf();
|
|
|
+ simd_json::to_writer(BufMutWriter(&mut *buf), value).unwrap();
|
|
|
+ let body = buf.split().freeze();
|
|
|
+ let mut res = req.into_response(body);
|
|
|
+ res.headers_mut().append(CONTENT_TYPE, JSON);
|
|
|
+ Ok(res)
|
|
|
+}
|
|
|
|
|
|
- async fn queries(&self, req: Request) -> Result<Response, Infallible> {
|
|
|
- let num = req.uri().query().parse_query();
|
|
|
- match self.state.client().get_worlds(num).await {
|
|
|
- Ok(worlds) => self._json(req, worlds.as_slice()),
|
|
|
- Err(_) => internal(),
|
|
|
- }
|
|
|
- }
|
|
|
+struct AppState<C> {
|
|
|
+ client: C,
|
|
|
+ // a re-usable buffer for write response data.
|
|
|
+ write_buf: RefCell<BytesMut>,
|
|
|
+}
|
|
|
|
|
|
- async fn updates(&self, req: Request) -> Result<Response, Infallible> {
|
|
|
- let num = req.uri().query().parse_query();
|
|
|
- match self.state.client().update(num).await {
|
|
|
- Ok(worlds) => self._json(req, worlds.as_slice()),
|
|
|
- Err(_) => internal(),
|
|
|
- }
|
|
|
+impl<C> AppState<C> {
|
|
|
+ fn new(client: C) -> Self {
|
|
|
+ let write_buf = RefCell::new(BytesMut::new());
|
|
|
+ Self { client, write_buf }
|
|
|
}
|
|
|
|
|
|
- async fn _fortunes(&self) -> Result<Bytes, Box<dyn Error>> {
|
|
|
- use sailfish::TemplateOnce;
|
|
|
- let fortunes = self.state.client().tell_fortune().await?.render_once()?;
|
|
|
- Ok(fortunes.into())
|
|
|
+ fn write_buf(&self) -> RefMut<'_, BytesMut> {
|
|
|
+ self.write_buf.borrow_mut()
|
|
|
}
|
|
|
|
|
|
- fn _json<S>(&self, req: Request, value: &S) -> Result<Response, Infallible>
|
|
|
- where
|
|
|
- S: ?Sized + Serialize,
|
|
|
- {
|
|
|
- let mut writer = self.state.writer();
|
|
|
- simd_json::to_writer(&mut writer, value).unwrap();
|
|
|
- let body = writer.take();
|
|
|
-
|
|
|
- let mut res = req.into_response(body);
|
|
|
- res.headers_mut().append(SERVER, SERVER_HEADER_VALUE);
|
|
|
- res.headers_mut().append(CONTENT_TYPE, JSON_HEADER_VALUE);
|
|
|
-
|
|
|
- Ok(res)
|
|
|
+ fn client(&self) -> &C {
|
|
|
+ &self.client
|
|
|
}
|
|
|
}
|