From 7390f026d79df930cafb6d38845a38d3d6a09c1d Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 5 Feb 2021 15:37:21 -0800 Subject: [PATCH] Revert "refactor(lib): Switch from pin-project to pin-project-lite" This reverts commit 43412a950f2052e7865eb596c1d39067b2002a94. --- Cargo.toml | 2 +- src/client/conn.rs | 49 ++++---- src/client/connect/http.rs | 27 +++-- src/client/pool.rs | 25 ++--- src/common/drain.rs | 19 ++-- src/proto/h1/dispatch.rs | 11 +- src/proto/h2/mod.rs | 21 ++-- src/proto/h2/server.rs | 72 +++++------- src/server/accept.rs | 14 +-- src/server/conn.rs | 222 ++++++++++++++++--------------------- src/server/server.rs | 23 ++-- src/server/shutdown.rs | 43 ++++--- src/server/tcp.rs | 15 ++- src/service/oneshot.rs | 48 ++++---- 14 files changed, 255 insertions(+), 336 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 11357ca3c8..9617ff7c23 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ httparse = "1.0" h2 = { version = "0.3", optional = true } itoa = "0.4.1" tracing = { version = "0.1", default-features = false, features = ["std"] } -pin-project-lite = "0.2.4" +pin-project = "1.0" tower-service = "0.3" tokio = { version = "1", features = ["sync"] } want = "0.3" diff --git a/src/client/conn.rs b/src/client/conn.rs index 62cde0c068..2799c61eff 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -56,7 +56,7 @@ use std::time::Duration; use bytes::Bytes; use futures_util::future::{self, Either, FutureExt as _}; -use pin_project_lite::pin_project; +use pin_project::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use tower_service::Service; @@ -75,23 +75,15 @@ use crate::{Body, Request, Response}; #[cfg(feature = "http1")] type Http1Dispatcher = proto::dispatch::Dispatcher, B, T, R>; -pin_project! { - #[project = ProtoClientProj] - enum ProtoClient - where - B: HttpBody, - { - #[cfg(feature = "http1")] - H1 { - #[pin] - h1: Http1Dispatcher, - }, - #[cfg(feature = "http2")] - H2 { - #[pin] - h2: proto::h2::ClientTask, _phantom: PhantomData, - }, - } +#[pin_project(project = ProtoClientProj)] +enum ProtoClient +where + B: HttpBody, +{ + #[cfg(feature = "http1")] + H1(#[pin] Http1Dispatcher), + #[cfg(feature = "http2")] + H2(#[pin] proto::h2::ClientTask, PhantomData), } /// Returns a handshake future over some IO. @@ -408,7 +400,7 @@ where pub fn into_parts(self) -> Parts { match self.inner.expect("already upgraded") { #[cfg(feature = "http1")] - ProtoClient::H1 { h1 } => { + ProtoClient::H1(h1) => { let (io, read_buf, _) = h1.into_inner(); Parts { io, @@ -417,7 +409,7 @@ where } } #[cfg(feature = "http2")] - ProtoClient::H2 { .. } => { + ProtoClient::H2(..) => { panic!("http2 cannot into_inner"); } } @@ -437,9 +429,9 @@ where pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { match *self.inner.as_mut().expect("already upgraded") { #[cfg(feature = "http1")] - ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx), + ProtoClient::H1(ref mut h1) => h1.poll_without_shutdown(cx), #[cfg(feature = "http2")] - ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()), + ProtoClient::H2(ref mut h2, _) => Pin::new(h2).poll(cx).map_ok(|_| ()), } } @@ -468,7 +460,7 @@ where proto::Dispatched::Shutdown => Poll::Ready(Ok(())), #[cfg(feature = "http1")] proto::Dispatched::Upgrade(pending) => match self.inner.take() { - Some(ProtoClient::H1 { h1 }) => { + Some(ProtoClient::H1(h1)) => { let (io, buf, _) = h1.into_inner(); pending.fulfill(Upgraded::new(io, buf)); Poll::Ready(Ok(())) @@ -715,17 +707,14 @@ impl Builder { } let cd = proto::h1::dispatch::Client::new(rx); let dispatch = proto::h1::Dispatcher::new(cd, conn); - ProtoClient::H1 { h1: dispatch } + ProtoClient::H1(dispatch) } #[cfg(feature = "http2")] Proto::Http2 => { let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone()) .await?; - ProtoClient::H2 { - h2, - _phantom: PhantomData, - } + ProtoClient::H2(h2, PhantomData) } }; @@ -779,9 +768,9 @@ where fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { match self.project() { #[cfg(feature = "http1")] - ProtoClientProj::H1 { h1 } => h1.poll(cx), + ProtoClientProj::H1(c) => c.poll(cx), #[cfg(feature = "http2")] - ProtoClientProj::H2 { h2, .. } => h2.poll(cx), + ProtoClientProj::H2(c, _) => c.poll(cx), } } } diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index 17339f4179..734aea188a 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -11,7 +11,7 @@ use std::time::Duration; use futures_util::future::Either; use http::uri::{Scheme, Uri}; -use pin_project_lite::pin_project; +use pin_project::pin_project; use tokio::net::{TcpSocket, TcpStream}; use tokio::time::Sleep; @@ -373,19 +373,18 @@ impl HttpInfo { } } -pin_project! { - // Not publicly exported (so missing_docs doesn't trigger). - // - // We return this `Future` instead of the `Pin>` directly - // so that users don't rely on it fitting in a `Pin>` slot - // (and thus we can change the type in the future). - #[must_use = "futures do nothing unless polled"] - #[allow(missing_debug_implementations)] - pub struct HttpConnecting { - #[pin] - fut: BoxConnecting, - _marker: PhantomData, - } +// Not publicly exported (so missing_docs doesn't trigger). +// +// We return this `Future` instead of the `Pin>` directly +// so that users don't rely on it fitting in a `Pin>` slot +// (and thus we can change the type in the future). +#[must_use = "futures do nothing unless polled"] +#[pin_project] +#[allow(missing_debug_implementations)] +pub struct HttpConnecting { + #[pin] + fut: BoxConnecting, + _marker: PhantomData, } type ConnectResult = Result; diff --git a/src/client/pool.rs b/src/client/pool.rs index 94f73f6afd..0f22657bd4 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -11,7 +11,7 @@ use futures_channel::oneshot; use tokio::time::{Duration, Instant, Interval}; use super::client::Ver; -use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin}; +use crate::common::{task, exec::Exec, Future, Pin, Poll, Unpin}; // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] @@ -714,17 +714,16 @@ impl Expiration { } #[cfg(feature = "runtime")] -pin_project_lite::pin_project! { - struct IdleTask { - #[pin] - interval: Interval, - pool: WeakOpt>>, - // This allows the IdleTask to be notified as soon as the entire - // Pool is fully dropped, and shutdown. This channel is never sent on, - // but Err(Canceled) will be received when the Pool is dropped. - #[pin] - pool_drop_notifier: oneshot::Receiver, - } +#[pin_project::pin_project] +struct IdleTask { + #[pin] + interval: Interval, + pool: WeakOpt>>, + // This allows the IdleTask to be notified as soon as the entire + // Pool is fully dropped, and shutdown. This channel is never sent on, + // but Err(Canceled) will be received when the Pool is dropped. + #[pin] + pool_drop_notifier: oneshot::Receiver, } #[cfg(feature = "runtime")] @@ -777,7 +776,7 @@ mod tests { use std::time::Duration; use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; - use crate::common::{exec::Exec, task, Future, Pin}; + use crate::common::{task, exec::Exec, Future, Pin}; /// Test unique reservations. #[derive(Debug, PartialEq, Eq)] diff --git a/src/common/drain.rs b/src/common/drain.rs index 174da876df..4bb2ecc118 100644 --- a/src/common/drain.rs +++ b/src/common/drain.rs @@ -1,6 +1,6 @@ use std::mem; -use pin_project_lite::pin_project; +use pin_project::pin_project; use tokio::sync::watch; use super::{task, Future, Pin, Poll}; @@ -21,15 +21,14 @@ pub(crate) struct Watch { rx: watch::Receiver<()>, } -pin_project! { - #[allow(missing_debug_implementations)] - pub struct Watching { - #[pin] - future: F, - state: State, - watch: Pin + Send + Sync>>, - _rx: watch::Receiver<()>, - } +#[allow(missing_debug_implementations)] +#[pin_project] +pub struct Watching { + #[pin] + future: F, + state: State, + watch: Pin + Send + Sync>>, + _rx: watch::Receiver<()>, } enum State { diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 1a72450b15..88e641e9a4 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -44,13 +44,10 @@ cfg_server! { } cfg_client! { - pin_project_lite::pin_project! { - pub(crate) struct Client { - callback: Option, http::Response>>, - #[pin] - rx: ClientRx, - rx_closed: bool, - } + pub(crate) struct Client { + callback: Option, http::Response>>, + rx: ClientRx, + rx_closed: bool, } type ClientRx = crate::client::dispatch::Receiver, http::Response>; diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index cf78e3f18c..cf06592903 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -5,7 +5,7 @@ use http::header::{ TRANSFER_ENCODING, UPGRADE, }; use http::HeaderMap; -use pin_project_lite::pin_project; +use pin_project::pin_project; use std::error::Error as StdError; use std::io::IoSlice; @@ -94,16 +94,15 @@ fn decode_content_length(headers: &HeaderMap) -> DecodedLength { // body adapters used by both Client and Server -pin_project! { - struct PipeToSendStream - where - S: HttpBody, - { - body_tx: SendStream>, - data_done: bool, - #[pin] - stream: S, - } +#[pin_project] +struct PipeToSendStream +where + S: HttpBody, +{ + body_tx: SendStream>, + data_done: bool, + #[pin] + stream: S, } impl PipeToSendStream diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 167dd90dbb..eea52e3e4b 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -5,7 +5,7 @@ use std::time::Duration; use h2::server::{Connection, Handshake, SendResponse}; use h2::Reason; -use pin_project_lite::pin_project; +use pin_project::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use super::{decode_content_length, ping, PipeToSendStream, SendBuf}; @@ -57,16 +57,15 @@ impl Default for Config { } } -pin_project! { - pub(crate) struct Server - where - S: HttpService, - B: HttpBody, - { - exec: E, - service: S, - state: State, - } +#[pin_project] +pub(crate) struct Server +where + S: HttpService, + B: HttpBody, +{ + exec: E, + service: S, + state: State, } enum State @@ -316,33 +315,24 @@ where } } -pin_project! { - #[allow(missing_debug_implementations)] - pub struct H2Stream - where - B: HttpBody, - { - reply: SendResponse>, - #[pin] - state: H2StreamState, - } +#[allow(missing_debug_implementations)] +#[pin_project] +pub struct H2Stream +where + B: HttpBody, +{ + reply: SendResponse>, + #[pin] + state: H2StreamState, } -pin_project! { - #[project = H2StreamStateProj] - enum H2StreamState - where - B: HttpBody, - { - Service { - #[pin] - fut: F, - }, - Body { - #[pin] - pipe: PipeToSendStream, - }, - } +#[pin_project(project = H2StreamStateProj)] +enum H2StreamState +where + B: HttpBody, +{ + Service(#[pin] F), + Body(#[pin] PipeToSendStream), } impl H2Stream @@ -352,7 +342,7 @@ where fn new(fut: F, respond: SendResponse>) -> H2Stream { H2Stream { reply: respond, - state: H2StreamState::Service { fut }, + state: H2StreamState::Service(fut), } } } @@ -381,7 +371,7 @@ where let mut me = self.project(); loop { let next = match me.state.as_mut().project() { - H2StreamStateProj::Service { fut: h } => { + H2StreamStateProj::Service(h) => { let res = match h.poll(cx) { Poll::Ready(Ok(r)) => r, Poll::Pending => { @@ -419,15 +409,13 @@ where if !body.is_end_stream() { let body_tx = reply!(me, res, false); - H2StreamState::Body { - pipe: PipeToSendStream::new(body, body_tx), - } + H2StreamState::Body(PipeToSendStream::new(body, body_tx)) } else { reply!(me, res, true); return Poll::Ready(Ok(())); } } - H2StreamStateProj::Body { pipe } => { + H2StreamStateProj::Body(pipe) => { return pipe.poll(cx); } }; diff --git a/src/server/accept.rs b/src/server/accept.rs index 4b7a1487dd..4ec287129d 100644 --- a/src/server/accept.rs +++ b/src/server/accept.rs @@ -9,7 +9,7 @@ #[cfg(feature = "stream")] use futures_core::Stream; #[cfg(feature = "stream")] -use pin_project_lite::pin_project; +use pin_project::pin_project; use crate::common::{ task::{self, Poll}, @@ -86,12 +86,8 @@ pub fn from_stream(stream: S) -> impl Accept where S: Stream>, { - pin_project! { - struct FromStream { - #[pin] - stream: S, - } - } + #[pin_project] + struct FromStream(#[pin] S); impl Accept for FromStream where @@ -103,9 +99,9 @@ where self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll>> { - self.project().stream.poll_next(cx) + self.project().0.poll_next(cx) } } - FromStream { stream } + FromStream(stream) } diff --git a/src/server/conn.rs b/src/server/conn.rs index 112bbe535d..5137708fcb 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -45,6 +45,7 @@ use std::error::Error as StdError; use std::fmt; +#[cfg(feature = "http1")] use std::marker::PhantomData; #[cfg(feature = "tcp")] use std::net::SocketAddr; @@ -52,7 +53,7 @@ use std::net::SocketAddr; use std::time::Duration; use bytes::Bytes; -use pin_project_lite::pin_project; +use pin_project::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use super::accept::Accept; @@ -108,85 +109,77 @@ enum ConnectionMode { Fallback, } -pin_project! { - /// A stream mapping incoming IOs to new services. - /// - /// Yields `Connecting`s that are futures that should be put on a reactor. - #[must_use = "streams do nothing unless polled"] - #[derive(Debug)] - pub(super) struct Serve { - #[pin] - incoming: I, - make_service: S, - protocol: Http, - } +/// A stream mapping incoming IOs to new services. +/// +/// Yields `Connecting`s that are futures that should be put on a reactor. +#[must_use = "streams do nothing unless polled"] +#[pin_project] +#[derive(Debug)] +pub(super) struct Serve { + #[pin] + incoming: I, + make_service: S, + protocol: Http, } -pin_project! { - /// A future building a new `Service` to a `Connection`. - /// - /// Wraps the future returned from `MakeService` into one that returns - /// a `Connection`. - #[must_use = "futures do nothing unless polled"] - #[derive(Debug)] - pub struct Connecting { - #[pin] - future: F, - io: Option, - protocol: Http, - } +/// A future building a new `Service` to a `Connection`. +/// +/// Wraps the future returned from `MakeService` into one that returns +/// a `Connection`. +#[must_use = "futures do nothing unless polled"] +#[pin_project] +#[derive(Debug)] +pub struct Connecting { + #[pin] + future: F, + io: Option, + protocol: Http, } -pin_project! { - #[must_use = "futures do nothing unless polled"] - #[derive(Debug)] - pub(super) struct SpawnAll { - // TODO: re-add `pub(super)` once rustdoc can handle this. - // - // See https://github.com/rust-lang/rust/issues/64705 - #[pin] - pub(super) serve: Serve, - } +#[must_use = "futures do nothing unless polled"] +#[pin_project] +#[derive(Debug)] +pub(super) struct SpawnAll { + // TODO: re-add `pub(super)` once rustdoc can handle this. + // + // See https://github.com/rust-lang/rust/issues/64705 + #[pin] + pub(super) serve: Serve, } -pin_project! { - /// A future binding a connection with a Service. - /// - /// Polling this future will drive HTTP forward. - #[must_use = "futures do nothing unless polled"] - pub struct Connection - where - S: HttpService, - { - pub(super) conn: Option>, - fallback: Fallback, - } +/// A future binding a connection with a Service. +/// +/// Polling this future will drive HTTP forward. +#[must_use = "futures do nothing unless polled"] +#[pin_project] +pub struct Connection +where + S: HttpService, +{ + pub(super) conn: Option>, + #[cfg(all(feature = "http1", feature = "http2"))] + fallback: Fallback, } -pin_project! { - #[project = ProtoServerProj] - pub(super) enum ProtoServer - where - S: HttpService, - B: HttpBody, - { - #[cfg(feature = "http1")] - H1 { - #[pin] - h1: proto::h1::Dispatcher< - proto::h1::dispatch::Server, - B, - T, - proto::ServerTransaction, - >, - _phantom: PhantomData, - }, - #[cfg(feature = "http2")] - H2 { - #[pin] - h2: proto::h2::Server, S, B, E>, - }, - } +#[pin_project(project = ProtoServerProj)] +pub(super) enum ProtoServer +where + S: HttpService, + B: HttpBody, +{ + #[cfg(feature = "http1")] + H1( + #[pin] + proto::h1::Dispatcher< + proto::h1::dispatch::Server, + B, + T, + proto::ServerTransaction, + >, + PhantomData, + ), + #[cfg(feature = "http2")] + H2(#[pin] proto::h2::Server, S, B, E>), } #[cfg(all(feature = "http1", feature = "http2"))] @@ -196,10 +189,6 @@ enum Fallback { Http1Only, } -#[cfg(not(all(feature = "http1", feature = "http2")))] -#[derive(Clone, Debug)] -struct Fallback(PhantomData); - #[cfg(all(feature = "http1", feature = "http2"))] impl Fallback { fn to_h2(&self) -> bool { @@ -530,10 +519,7 @@ impl Http { conn.set_max_buf_size(max); } let sd = proto::h1::dispatch::Server::new(service); - ProtoServer::H1 { - h1: proto::h1::Dispatcher::new(sd, conn), - _phantom: PhantomData, - } + ProtoServer::H1(proto::h1::Dispatcher::new(sd, conn), PhantomData) }}; } @@ -549,7 +535,7 @@ impl Http { let rewind_io = Rewind::new(io); let h2 = proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone()); - ProtoServer::H2 { h2 } + ProtoServer::H2(h2) } }; @@ -604,14 +590,14 @@ where /// This should only be called while the `Connection` future is still /// pending. If called after `Connection::poll` has resolved, this does /// nothing. - pub fn graceful_shutdown(mut self: Pin<&mut Self>) { - match self.conn { + pub fn graceful_shutdown(self: Pin<&mut Self>) { + match self.project().conn { #[cfg(feature = "http1")] - Some(ProtoServer::H1 { ref mut h1, .. }) => { + Some(ProtoServer::H1(ref mut h1, _)) => { h1.disable_keep_alive(); } #[cfg(feature = "http2")] - Some(ProtoServer::H2 { ref mut h2 }) => { + Some(ProtoServer::H2(ref mut h2)) => { h2.graceful_shutdown(); } None => (), @@ -638,7 +624,7 @@ where pub fn try_into_parts(self) -> Option> { match self.conn.unwrap() { #[cfg(feature = "http1")] - ProtoServer::H1 { h1, .. } => { + ProtoServer::H1(h1, _) => { let (io, read_buf, dispatch) = h1.into_inner(); Some(Parts { io, @@ -648,7 +634,7 @@ where }) } #[cfg(feature = "http2")] - ProtoServer::H2 { .. } => None, + ProtoServer::H2(_h2) => None, } } @@ -672,7 +658,7 @@ where loop { match *self.conn.as_mut().unwrap() { #[cfg(feature = "http1")] - ProtoServer::H1 { ref mut h1, .. } => match ready!(h1.poll_without_shutdown(cx)) { + ProtoServer::H1(ref mut h1, _) => match ready!(h1.poll_without_shutdown(cx)) { Ok(()) => return Poll::Ready(Ok(())), Err(e) => { #[cfg(feature = "http2")] @@ -688,7 +674,7 @@ where } }, #[cfg(feature = "http2")] - ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()), + ProtoServer::H2(ref mut h2) => return Pin::new(h2).poll(cx).map_ok(|_| ()), }; } } @@ -714,8 +700,8 @@ where let conn = self.conn.take(); let (io, read_buf, dispatch) = match conn.unwrap() { - ProtoServer::H1 { h1, .. } => h1.into_inner(), - ProtoServer::H2 { .. } => { + ProtoServer::H1(h1, _) => h1.into_inner(), + ProtoServer::H2(_h2) => { panic!("h2 cannot into_inner"); } }; @@ -728,7 +714,7 @@ where let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone()); debug_assert!(self.conn.is_none()); - self.conn = Some(ProtoServer::H2 { h2 }); + self.conn = Some(ProtoServer::H2(h2)); } /// Enable this connection to support higher-level HTTP upgrades. @@ -962,9 +948,9 @@ where fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { match self.project() { #[cfg(feature = "http1")] - ProtoServerProj::H1 { h1, .. } => h1.poll(cx), + ProtoServerProj::H1(s, _) => s.poll(cx), #[cfg(feature = "http2")] - ProtoServerProj::H2 { h2 } => h2.poll(cx), + ProtoServerProj::H2(s) => s.poll(cx), } } } @@ -978,7 +964,7 @@ pub(crate) mod spawn_all { use crate::common::exec::ConnStreamExec; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::service::HttpService; - use pin_project_lite::pin_project; + use pin_project::pin_project; // Used by `SpawnAll` to optionally watch a `Connection` future. // @@ -1023,36 +1009,23 @@ pub(crate) mod spawn_all { // Users cannot import this type, nor the associated `NewSvcExec`. Instead, // a blanket implementation for `Executor` is sufficient. - pin_project! { - #[allow(missing_debug_implementations)] - pub struct NewSvcTask, E, W: Watcher> { - #[pin] - state: State, - } + #[pin_project] + #[allow(missing_debug_implementations)] + pub struct NewSvcTask, E, W: Watcher> { + #[pin] + state: State, } - pin_project! { - #[project = StateProj] - pub(super) enum State, E, W: Watcher> { - Connecting { - #[pin] - connecting: Connecting, - watcher: W, - }, - Connected { - #[pin] - future: W::Future, - }, - } + #[pin_project(project = StateProj)] + pub(super) enum State, E, W: Watcher> { + Connecting(#[pin] Connecting, W), + Connected(#[pin] W::Future), } impl, E, W: Watcher> NewSvcTask { pub(super) fn new(connecting: Connecting, watcher: W) -> Self { NewSvcTask { - state: State::Connecting { - connecting, - watcher, - }, + state: State::Connecting(connecting, watcher), } } } @@ -1079,10 +1052,7 @@ pub(crate) mod spawn_all { loop { let next = { match me.state.as_mut().project() { - StateProj::Connecting { - connecting, - watcher, - } => { + StateProj::Connecting(connecting, watcher) => { let res = ready!(connecting.poll(cx)); let conn = match res { Ok(conn) => conn, @@ -1092,10 +1062,10 @@ pub(crate) mod spawn_all { return Poll::Ready(()); } }; - let future = watcher.watch(conn.with_upgrades()); - State::Connected { future } + let connected = watcher.watch(conn.with_upgrades()); + State::Connected(connected) } - StateProj::Connected { future } => { + StateProj::Connected(future) => { return future.poll(cx).map(|res| { if let Err(err) = res { debug!("connection error: {}", err); @@ -1163,7 +1133,7 @@ mod upgrades { #[cfg(feature = "http1")] Ok(proto::Dispatched::Upgrade(pending)) => { match self.inner.conn.take() { - Some(ProtoServer::H1 { h1, .. }) => { + Some(ProtoServer::H1(h1, _)) => { let (io, buf, _) = h1.into_inner(); pending.fulfill(Upgraded::new(io, buf)); return Poll::Ready(Ok(())); diff --git a/src/server/server.rs b/src/server/server.rs index 07d9e5fbb0..48cc6e2803 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -6,7 +6,7 @@ use std::net::{SocketAddr, TcpListener as StdTcpListener}; #[cfg(feature = "tcp")] use std::time::Duration; -use pin_project_lite::pin_project; +use pin_project::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use super::accept::Accept; @@ -21,17 +21,16 @@ use super::shutdown::{Graceful, GracefulWatcher}; #[cfg(feature = "tcp")] use super::tcp::AddrIncoming; -pin_project! { - /// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. - /// - /// `Server` is a `Future` mapping a bound listener with a set of service - /// handlers. It is built using the [`Builder`](Builder), and the future - /// completes when the server has been shutdown. It should be run by an - /// `Executor`. - pub struct Server { - #[pin] - spawn_all: SpawnAll, - } +/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. +/// +/// `Server` is a `Future` mapping a bound listener with a set of service +/// handlers. It is built using the [`Builder`](Builder), and the future +/// completes when the server has been shutdown. It should be run by an +/// `Executor`. +#[pin_project] +pub struct Server { + #[pin] + spawn_all: SpawnAll, } /// A builder for a [`Server`](Server). diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index 122853ac17..e54ba42104 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -1,36 +1,33 @@ use std::error::Error as StdError; -use pin_project_lite::pin_project; +use pin_project::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; -use super::accept::Accept; use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; +use super::accept::Accept; use crate::body::{Body, HttpBody}; use crate::common::drain::{self, Draining, Signal, Watch, Watching}; use crate::common::exec::{ConnStreamExec, NewSvcExec}; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::service::{HttpService, MakeServiceRef}; -pin_project! { - #[allow(missing_debug_implementations)] - pub struct Graceful { - #[pin] - state: State, - } +#[allow(missing_debug_implementations)] +#[pin_project] +pub struct Graceful { + #[pin] + state: State, } -pin_project! { - #[project = StateProj] - pub(super) enum State { - Running { - drain: Option<(Signal, Watch)>, - #[pin] - spawn_all: SpawnAll, - #[pin] - signal: F, - }, - Draining { draining: Draining }, - } +#[pin_project(project = StateProj)] +pub(super) enum State { + Running { + drain: Option<(Signal, Watch)>, + #[pin] + spawn_all: SpawnAll, + #[pin] + signal: F, + }, + Draining(Draining), } impl Graceful { @@ -74,16 +71,14 @@ where Poll::Ready(()) => { debug!("signal received, starting graceful shutdown"); let sig = drain.take().expect("drain channel").0; - State::Draining { - draining: sig.drain(), - } + State::Draining(sig.drain()) } Poll::Pending => { let watch = drain.as_ref().expect("drain channel").1.clone(); return spawn_all.poll_watch(cx, &GracefulWatcher(watch)); } }, - StateProj::Draining { ref mut draining } => { + StateProj::Draining(ref mut draining) => { return Pin::new(draining).poll(cx).map(Ok); } } diff --git a/src/server/tcp.rs b/src/server/tcp.rs index 46c570decd..91afc40120 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -229,14 +229,13 @@ mod addr_stream { use crate::common::{task, Pin, Poll}; - pin_project_lite::pin_project! { - /// A transport returned yieled by `AddrIncoming`. - #[derive(Debug)] - pub struct AddrStream { - #[pin] - inner: TcpStream, - pub(super) remote_addr: SocketAddr, - } + /// A transport returned yieled by `AddrIncoming`. + #[pin_project::pin_project] + #[derive(Debug)] + pub struct AddrStream { + #[pin] + inner: TcpStream, + pub(super) remote_addr: SocketAddr, } impl AddrStream { diff --git a/src/service/oneshot.rs b/src/service/oneshot.rs index 2697af8f4c..766d0c4689 100644 --- a/src/service/oneshot.rs +++ b/src/service/oneshot.rs @@ -1,6 +1,6 @@ // TODO: Eventually to be replaced with tower_util::Oneshot. -use pin_project_lite::pin_project; +use pin_project::pin_project; use tower_service::Service; use crate::common::{task, Future, Pin, Poll}; @@ -10,35 +10,25 @@ where S: Service, { Oneshot { - state: State::NotReady { svc, req }, + state: State::NotReady(svc, req), } } -pin_project! { - // A `Future` consuming a `Service` and request, waiting until the `Service` - // is ready, and then calling `Service::call` with the request, and - // waiting for that `Future`. - #[allow(missing_debug_implementations)] - pub struct Oneshot, Req> { - #[pin] - state: State, - } +// A `Future` consuming a `Service` and request, waiting until the `Service` +// is ready, and then calling `Service::call` with the request, and +// waiting for that `Future`. +#[allow(missing_debug_implementations)] +#[pin_project] +pub struct Oneshot, Req> { + #[pin] + state: State, } -pin_project! { - #[project = StateProj] - #[project_replace = StateProjOwn] - enum State, Req> { - NotReady { - svc: S, - req: Req, - }, - Called { - #[pin] - fut: S::Future, - }, - Tmp, - } +#[pin_project(project = StateProj, project_replace = StateProjOwn)] +enum State, Req> { + NotReady(S, Req), + Called(#[pin] S::Future), + Tmp, } impl Future for Oneshot @@ -52,19 +42,19 @@ where loop { match me.state.as_mut().project() { - StateProj::NotReady { ref mut svc, .. } => { + StateProj::NotReady(ref mut svc, _) => { ready!(svc.poll_ready(cx))?; // fallthrough out of the match's borrow } - StateProj::Called { fut } => { + StateProj::Called(fut) => { return fut.poll(cx); } StateProj::Tmp => unreachable!(), } match me.state.as_mut().project_replace(State::Tmp) { - StateProjOwn::NotReady { mut svc, req } => { - me.state.set(State::Called { fut: svc.call(req) }); + StateProjOwn::NotReady(mut svc, req) => { + me.state.set(State::Called(svc.call(req))); } _ => unreachable!(), }