//! An extension trait for Futures that provides a variety of convenient adapters. mod with_cancellation_token; use with_cancellation_token::{WithCancellationTokenFuture, WithCancellationTokenFutureOwned}; use std::future::Future; use crate::sync::CancellationToken; /// A trait which contains a variety of convenient adapters and utilities for `Future`s. pub trait FutureExt: Future { cfg_time! { /// A wrapper around [`tokio::time::timeout`], with the advantage that it is easier to write /// fluent call chains. /// /// # Examples /// /// ```rust /// use tokio::{sync::oneshot, time::Duration}; /// use tokio_util::future::FutureExt; /// /// # async fn dox() { /// let (_tx, rx) = oneshot::channel::<()>(); /// /// let res = rx.timeout(Duration::from_millis(10)).await; /// assert!(res.is_err()); /// # } /// ``` #[track_caller] fn timeout(self, timeout: std::time::Duration) -> tokio::time::Timeout where Self: Sized, { tokio::time::timeout(timeout, self) } /// A wrapper around [`tokio::time::timeout_at`], with the advantage that it is easier to write /// fluent call chains. /// /// # Examples /// /// ```rust /// use tokio::{sync::oneshot, time::{Duration, Instant}}; /// use tokio_util::future::FutureExt; /// /// # async fn dox() { /// let (_tx, rx) = oneshot::channel::<()>(); /// let deadline = Instant::now() + Duration::from_millis(10); /// /// let res = rx.timeout_at(deadline).await; /// assert!(res.is_err()); /// # } /// ``` fn timeout_at(self, deadline: tokio::time::Instant) -> tokio::time::Timeout where Self: Sized, { tokio::time::timeout_at(deadline, self) } } /// Similar to [`CancellationToken::run_until_cancelled`], /// but with the advantage that it is easier to write fluent call chains. /// /// # Fairness /// /// Calling this on an already-cancelled token directly returns `None`. /// For all subsequent polls, in case of concurrent completion and /// cancellation, this is biased towards the `self` future completion. /// /// # Examples /// /// ```rust /// use tokio::sync::oneshot; /// use tokio_util::future::FutureExt; /// use tokio_util::sync::CancellationToken; /// /// # async fn dox() { /// let (_tx, rx) = oneshot::channel::<()>(); /// let token = CancellationToken::new(); /// let token_clone = token.clone(); /// tokio::spawn(async move { /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; /// token.cancel(); /// }); /// assert!(rx.with_cancellation_token(&token_clone).await.is_none()) /// # } /// ``` fn with_cancellation_token( self, cancellation_token: &CancellationToken, ) -> WithCancellationTokenFuture<'_, Self> where Self: Sized, { WithCancellationTokenFuture::new(cancellation_token, self) } /// Similar to [`CancellationToken::run_until_cancelled_owned`], /// but with the advantage that it is easier to write fluent call chains. /// /// # Fairness /// /// Calling this on an already-cancelled token directly returns `None`. /// For all subsequent polls, in case of concurrent completion and /// cancellation, this is biased towards the `self` future completion. /// /// # Examples /// /// ```rust /// use tokio::sync::oneshot; /// use tokio_util::future::FutureExt; /// use tokio_util::sync::CancellationToken; /// /// # async fn dox() { /// let (_tx, rx) = oneshot::channel::<()>(); /// let token = CancellationToken::new(); /// let token_clone = token.clone(); /// tokio::spawn(async move { /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; /// token.cancel(); /// }); /// assert!(rx.with_cancellation_token_owned(token_clone).await.is_none()) /// # } /// ``` fn with_cancellation_token_owned( self, cancellation_token: CancellationToken, ) -> WithCancellationTokenFutureOwned where Self: Sized, { WithCancellationTokenFutureOwned::new(cancellation_token, self) } } impl FutureExt for T {}