diff --git a/src/client/client.rs b/src/client/client.rs index d213b99c8c..fd61699f84 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -162,7 +162,7 @@ where Version::HTTP_10 => { if is_http_connect { warn!("CONNECT is not allowed for HTTP/1.0"); - return ResponseFuture::new(Box::new(future::err( + return ResponseFuture::new(Box::pin(future::err( crate::Error::new_user_unsupported_request_method(), ))); } @@ -179,35 +179,33 @@ where let pool_key = match extract_domain(req.uri_mut(), is_http_connect) { Ok(s) => s, Err(err) => { - return ResponseFuture::new(Box::new(future::err(err))); + return ResponseFuture::new(Box::pin(future::err(err))); } }; - ResponseFuture::new(Box::new(self.retryably_send_request(req, pool_key))) + ResponseFuture::new(Box::pin(self.clone().retryably_send_request(req, pool_key))) } - fn retryably_send_request( - &self, - req: Request, + async fn retryably_send_request( + self, + mut req: Request, pool_key: PoolKey, - ) -> impl Future>> { - let client = self.clone(); + ) -> crate::Result> { let uri = req.uri().clone(); - let mut send_fut = client.send_request(req, pool_key.clone()); - future::poll_fn(move |cx| loop { - match ready!(Pin::new(&mut send_fut).poll(cx)) { - Ok(resp) => return Poll::Ready(Ok(resp)), - Err(ClientError::Normal(err)) => return Poll::Ready(Err(err)), + loop { + req = match self.send_request(req, pool_key.clone()).await { + Ok(resp) => return Ok(resp), + Err(ClientError::Normal(err)) => return Err(err), Err(ClientError::Canceled { connection_reused, mut req, reason, }) => { - if !client.config.retry_canceled_requests || !connection_reused { + if !self.config.retry_canceled_requests || !connection_reused { // if client disabled, don't retry // a fresh connection means we definitely can't retry - return Poll::Ready(Err(reason)); + return Err(reason); } trace!( @@ -215,115 +213,112 @@ where reason ); *req.uri_mut() = uri.clone(); - send_fut = client.send_request(req, pool_key.clone()); + req } } - }) + } } - fn send_request( + async fn send_request( &self, mut req: Request, pool_key: PoolKey, - ) -> impl Future, ClientError>> + Unpin { - let conn = self.connection_for(pool_key); + ) -> Result, ClientError> { + let mut pooled = self.connection_for(pool_key).await?; + + if pooled.is_http1() { + if self.config.set_host { + let uri = req.uri().clone(); + req.headers_mut().entry(HOST).or_insert_with(|| { + let hostname = uri.host().expect("authority implies host"); + if let Some(port) = uri.port() { + let s = format!("{}:{}", hostname, port); + HeaderValue::from_str(&s) + } else { + HeaderValue::from_str(hostname) + } + .expect("uri host is valid header value") + }); + } - let set_host = self.config.set_host; - let executor = self.conn_builder.exec.clone(); - conn.and_then(move |mut pooled| { - if pooled.is_http1() { - if set_host { - let uri = req.uri().clone(); - req.headers_mut().entry(HOST).or_insert_with(|| { - let hostname = uri.host().expect("authority implies host"); - if let Some(port) = uri.port() { - let s = format!("{}:{}", hostname, port); - HeaderValue::from_str(&s) - } else { - HeaderValue::from_str(hostname) - } - .expect("uri host is valid header value") - }); - } + // CONNECT always sends authority-form, so check it first... + if req.method() == Method::CONNECT { + authority_form(req.uri_mut()); + } else if pooled.conn_info.is_proxied { + absolute_form(req.uri_mut()); + } else { + origin_form(req.uri_mut()); + }; + } else if req.method() == Method::CONNECT { + debug!("client does not support CONNECT requests over HTTP2"); + return Err(ClientError::Normal( + crate::Error::new_user_unsupported_request_method(), + )); + } - // CONNECT always sends authority-form, so check it first... - if req.method() == Method::CONNECT { - authority_form(req.uri_mut()); - } else if pooled.conn_info.is_proxied { - absolute_form(req.uri_mut()); - } else { - origin_form(req.uri_mut()); - }; - } else if req.method() == Method::CONNECT { - debug!("client does not support CONNECT requests over HTTP2"); - return Either::Left(future::err(ClientError::Normal( - crate::Error::new_user_unsupported_request_method(), - ))); + let fut = pooled + .send_request_retryable(req) + .map_err(ClientError::map_with_reused(pooled.is_reused())); + + // If the Connector included 'extra' info, add to Response... + let extra_info = pooled.conn_info.extra.clone(); + let fut = fut.map_ok(move |mut res| { + if let Some(extra) = extra_info { + extra.set(res.extensions_mut()); } + res + }); - let fut = pooled - .send_request_retryable(req) - .map_err(ClientError::map_with_reused(pooled.is_reused())); + // As of futures@0.1.21, there is a race condition in the mpsc + // channel, such that sending when the receiver is closing can + // result in the message being stuck inside the queue. It won't + // ever notify until the Sender side is dropped. + // + // To counteract this, we must check if our senders 'want' channel + // has been closed after having tried to send. If so, error out... + if pooled.is_closed() { + return fut.await; + } - // If the Connector included 'extra' info, add to Response... - let extra_info = pooled.conn_info.extra.clone(); - let fut = fut.map_ok(move |mut res| { - if let Some(extra) = extra_info { - extra.set(res.extensions_mut()); - } - res + let mut res = fut.await?; + + // If pooled is HTTP/2, we can toss this reference immediately. + // + // when pooled is dropped, it will try to insert back into the + // pool. To delay that, spawn a future that completes once the + // sender is ready again. + // + // This *should* only be once the related `Connection` has polled + // for a new request to start. + // + // It won't be ready if there is a body to stream. + if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() { + drop(pooled); + } else if !res.body().is_end_stream() { + let (delayed_tx, delayed_rx) = oneshot::channel(); + res.body_mut().delayed_eof(delayed_rx); + let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| { + // At this point, `pooled` is dropped, and had a chance + // to insert into the pool (if conn was idle) + drop(delayed_tx); }); - // As of futures@0.1.21, there is a race condition in the mpsc - // channel, such that sending when the receiver is closing can - // result in the message being stuck inside the queue. It won't - // ever notify until the Sender side is dropped. - // - // To counteract this, we must check if our senders 'want' channel - // has been closed after having tried to send. If so, error out... - if pooled.is_closed() { - return Either::Right(Either::Left(fut)); - } + self.conn_builder.exec.execute(on_idle); + } else { + // There's no body to delay, but the connection isn't + // ready yet. Only re-insert when it's ready + let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ()); - Either::Right(Either::Right(fut.map_ok(move |mut res| { - // If pooled is HTTP/2, we can toss this reference immediately. - // - // when pooled is dropped, it will try to insert back into the - // pool. To delay that, spawn a future that completes once the - // sender is ready again. - // - // This *should* only be once the related `Connection` has polled - // for a new request to start. - // - // It won't be ready if there is a body to stream. - if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() { - drop(pooled); - } else if !res.body().is_end_stream() { - let (delayed_tx, delayed_rx) = oneshot::channel(); - res.body_mut().delayed_eof(delayed_rx); - let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| { - // At this point, `pooled` is dropped, and had a chance - // to insert into the pool (if conn was idle) - drop(delayed_tx); - }); - - executor.execute(on_idle); - } else { - // There's no body to delay, but the connection isn't - // ready yet. Only re-insert when it's ready - let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ()); + self.conn_builder.exec.execute(on_idle); + } - executor.execute(on_idle); - } - res - }))) - }) + Ok(res) } - fn connection_for( + async fn connection_for( &self, pool_key: PoolKey, - ) -> impl Future>, ClientError>> { + ) -> Result>, ClientError> { // This actually races 2 different futures to try to get a ready // connection the fastest, and to reduce connection churn. // @@ -340,9 +335,9 @@ where let checkout = self.pool.checkout(pool_key.clone()); let connect = self.connect_to(pool_key); - let executor = self.conn_builder.exec.clone(); // The order of the `select` is depended on below... - future::select(checkout, connect).then(move |either| match either { + + match future::select(checkout, connect).await { // Checkout won, connect future may have been started or not. // // If it has, let it finish and insert back into the pool, @@ -366,12 +361,12 @@ where }); // An execute error here isn't important, we're just trying // to prevent a waste of a socket... - executor.execute(bg); + self.conn_builder.exec.execute(bg); } - Either::Left(future::ok(checked_out)) + Ok(checked_out) } // Connect won, checkout can just be dropped. - Either::Right((Ok(connected), _checkout)) => Either::Left(future::ok(connected)), + Either::Right((Ok(connected), _checkout)) => Ok(connected), // Either checkout or connect could get canceled: // // 1. Connect is canceled if this is HTTP/2 and there is @@ -380,21 +375,21 @@ where // idle connection reliably. // // In both cases, we should just wait for the other future. - Either::Left((Err(err), connecting)) => Either::Right(Either::Left({ + Either::Left((Err(err), connecting)) => { if err.is_canceled() { - Either::Left(connecting.map_err(ClientError::Normal)) + connecting.await.map_err(ClientError::Normal) } else { - Either::Right(future::err(ClientError::Normal(err))) + Err(ClientError::Normal(err)) } - })), - Either::Right((Err(err), checkout)) => Either::Right(Either::Right({ + } + Either::Right((Err(err), checkout)) => { if err.is_canceled() { - Either::Left(checkout.map_err(ClientError::Normal)) + checkout.await.map_err(ClientError::Normal) } else { - Either::Right(future::err(ClientError::Normal(err))) + Err(ClientError::Normal(err)) } - })), - }) + } + } } fn connect_to( @@ -459,44 +454,40 @@ where conn_builder.http2_only(is_h2); } - Either::Left(Box::pin( - conn_builder - .handshake(io) - .and_then(move |(tx, conn)| { - trace!( - "handshake complete, spawning background dispatcher task" - ); - executor.execute( - conn.map_err(|e| debug!("client connection error: {}", e)) - .map(|_| ()), - ); - - // Wait for 'conn' to ready up before we - // declare this tx as usable - tx.when_ready() - }) - .map_ok(move |tx| { - let tx = { - #[cfg(feature = "http2")] - { - if is_h2 { - PoolTx::Http2(tx.into_http2()) - } else { - PoolTx::Http1(tx) - } - } - #[cfg(not(feature = "http2"))] + Either::Left(Box::pin(async move { + let (tx, conn) = conn_builder.handshake(io).await?; + + trace!("handshake complete, spawning background dispatcher task"); + executor.execute( + conn.map_err(|e| debug!("client connection error: {}", e)) + .map(|_| ()), + ); + + // Wait for 'conn' to ready up before we + // declare this tx as usable + let tx = tx.when_ready().await?; + + let tx = { + #[cfg(feature = "http2")] + { + if is_h2 { + PoolTx::Http2(tx.into_http2()) + } else { PoolTx::Http1(tx) - }; - pool.pooled( - connecting, - PoolClient { - conn_info: connected, - tx, - }, - ) - }), - )) + } + } + #[cfg(not(feature = "http2"))] + PoolTx::Http1(tx) + }; + + Ok(pool.pooled( + connecting, + PoolClient { + conn_info: connected, + tx, + }, + )) + })) }), ) }) @@ -563,13 +554,13 @@ impl fmt::Debug for Client { // ===== impl ResponseFuture ===== impl ResponseFuture { - fn new(fut: Box>> + Send>) -> Self { - Self { inner: fut.into() } + fn new(fut: Pin>> + Send>>) -> Self { + Self { inner: fut } } fn error_version(ver: Version) -> Self { warn!("Request has unsupported version \"{:?}\"", ver); - ResponseFuture::new(Box::new(future::err( + ResponseFuture::new(Box::pin(future::err( crate::Error::new_user_unsupported_version(), ))) } diff --git a/src/client/conn.rs b/src/client/conn.rs index 2799c61eff..2da083db16 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -192,12 +192,13 @@ impl SendRequest { self.dispatch.poll_ready(cx) } - pub(super) fn when_ready(self) -> impl Future> { + pub(super) async fn when_ready(self) -> crate::Result { let mut me = Some(self); future::poll_fn(move |cx| { ready!(me.as_mut().unwrap().poll_ready(cx))?; Poll::Ready(Ok(me.take().unwrap())) }) + .await } pub(super) fn is_ready(&self) -> bool { diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index b42a65352f..484cb04f4b 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -4,9 +4,9 @@ use std::future::Future; use futures_util::FutureExt; use tokio::sync::{mpsc, oneshot}; -use crate::common::{task, Poll}; #[cfg(feature = "http2")] use crate::common::Pin; +use crate::common::{task, Poll}; pub(crate) type RetryPromise = oneshot::Receiver)>>; pub(crate) type Promise = oneshot::Receiver>; @@ -230,10 +230,10 @@ impl Callback { } #[cfg(feature = "http2")] - pub(crate) fn send_when( + pub(crate) async fn send_when( self, mut when: impl Future)>> + Unpin, - ) -> impl Future { + ) { use futures_util::future; let mut cb = Some(self); @@ -257,6 +257,7 @@ impl Callback { } } }) + .await } } diff --git a/src/common/lazy.rs b/src/common/lazy.rs index 4d2e322c2c..6bf87c4355 100644 --- a/src/common/lazy.rs +++ b/src/common/lazy.rs @@ -1,4 +1,4 @@ -use std::mem; +use pin_project::pin_project; use super::{task, Future, Pin, Poll}; @@ -18,20 +18,23 @@ where // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] +#[pin_project] pub(crate) struct Lazy { + #[pin] inner: Inner, } +#[pin_project(project = InnerProj, project_replace = InnerProjReplace)] enum Inner { Init(F), - Fut(R), + Fut(#[pin] R), Empty, } impl Started for Lazy where F: FnOnce() -> R, - R: Future + Unpin, + R: Future, { fn started(&self) -> bool { match self.inner { @@ -44,26 +47,26 @@ where impl Future for Lazy where F: FnOnce() -> R, - R: Future + Unpin, + R: Future, { type Output = R::Output; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - if let Inner::Fut(ref mut f) = self.inner { - return Pin::new(f).poll(cx); + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let mut this = self.project(); + + if let InnerProj::Fut(f) = this.inner.as_mut().project() { + return f.poll(cx); } - match mem::replace(&mut self.inner, Inner::Empty) { - Inner::Init(func) => { - let mut fut = func(); - let ret = Pin::new(&mut fut).poll(cx); - self.inner = Inner::Fut(fut); - ret + match this.inner.as_mut().project_replace(Inner::Empty) { + InnerProjReplace::Init(func) => { + this.inner.set(Inner::Fut(func())); + if let InnerProj::Fut(f) = this.inner.project() { + return f.poll(cx); + } + unreachable!() } _ => unreachable!("lazy state wrong"), } } } - -// The closure `F` is never pinned -impl Unpin for Lazy {}