#![cfg(not(target_arch = "wasm32"))] use std::convert::Infallible; use std::future::Future; use std::net; use std::sync::Arc; use std::time::Duration; use futures_util::FutureExt; use http::{Request, Response}; use hyper::service::service_fn; use hyper::Body; use tokio::net::TcpListener; use tokio::select; use tokio::sync::oneshot; /// This server, unlike [`super::server::Server`], allows for delaying the /// specified amount of time after each TCP connection is established. This is /// useful for testing the behavior of the client when the server is slow. /// /// For example, in case of HTTP/2, once the TCP/TLS connection is established, /// both endpoints are supposed to send a preface and an initial `SETTINGS` /// frame (See [RFC9113 3.4] for details). What if these frames are delayed for /// whatever reason? This server allows for testing such scenarios. /// /// [RFC9113 3.4]: https://www.rfc-editor.org/rfc/rfc9113.html#name-http-2-connection-preface pub struct Server { addr: net::SocketAddr, shutdown_tx: Option>, server_terminated_rx: oneshot::Receiver<()>, } impl Server { pub async fn new(func: F1, apply_config: F2, delay: Duration) -> Self where F1: Fn(Request) -> Fut + Clone + Send + 'static, Fut: Future> + Send + 'static, F2: FnOnce(hyper::server::conn::Http) -> hyper::server::conn::Http + Send + 'static, { let (shutdown_tx, shutdown_rx) = oneshot::channel(); let (server_terminated_tx, server_terminated_rx) = oneshot::channel(); let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = tcp_listener.local_addr().unwrap(); tokio::spawn(async move { let http = Arc::new(apply_config(hyper::server::conn::Http::new())); tokio::spawn(async move { let (connection_shutdown_tx, connection_shutdown_rx) = oneshot::channel(); let connection_shutdown_rx = connection_shutdown_rx.shared(); let mut shutdown_rx = std::pin::pin!(shutdown_rx); let mut handles = Vec::new(); loop { select! { _ = shutdown_rx.as_mut() => { connection_shutdown_tx.send(()).unwrap(); break; } res = tcp_listener.accept() => { let (stream, _) = res.unwrap(); let handle = tokio::spawn({ let connection_shutdown_rx = connection_shutdown_rx.clone(); let http = http.clone(); let func = func.clone(); async move { tokio::time::sleep(delay).await; let mut conn = std::pin::pin!(http.serve_connection( stream, service_fn(move |req| { let fut = func(req); async move { Ok::<_, Infallible>(fut.await) }}) )); select! { _ = conn.as_mut() => {} _ = connection_shutdown_rx => { conn.as_mut().graceful_shutdown(); conn.await.unwrap(); } } } }); handles.push(handle); } } } futures_util::future::join_all(handles).await; server_terminated_tx.send(()).unwrap(); }); }); Self { addr, shutdown_tx: Some(shutdown_tx), server_terminated_rx, } } pub async fn shutdown(mut self) { if let Some(tx) = self.shutdown_tx.take() { let _ = tx.send(()); } self.server_terminated_rx.await.unwrap(); } pub fn addr(&self) -> net::SocketAddr { self.addr } }