
Consider adding timeout_at and with_cancel methods to the tokio_util::FutureExt trait #7466

@bitcapybara

Is your feature request related to a problem? Please describe.
Hello! In most of the places where I use these APIs, I need to watch for a timeout and an exit signal at the same time, which means combining tokio::time::timeout/timeout_at with CancellationToken::run_until_cancelled. The current approach is cancellationtoken.run_until_cancelled(myfut.timeout(duration)), where the timeout method comes from tokio_util::time::FutureExt. If a method like with_cancel were added to FutureExt, it would enable method chaining and make the code much more concise.

use std::{future::ready, time::Duration};

use tokio_util::{sync::CancellationToken, time::FutureExt};

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();

    match cancel
        .run_until_cancelled(ready(1).timeout(Duration::from_millis(100)))
        .await
    {
        Some(Ok(res)) => {
            println!("{res}")
        }
        Some(Err(_)) => {
            println!("elapsed")
        }
        None => {
            println!("cancelled")
        }
    }
}

Describe the solution you'd like
After adding the timeout_at and with_cancel methods, the same logic could be written as a method chain:

use std::{
    future::{ready, Future},
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use pin_project_lite::pin_project;
use tokio::time::{Instant, Timeout};
use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};

trait FutureExt: Future {
    fn timeout(self, timeout: Duration) -> Timeout<Self>
    where
        Self: Sized,
    {
        tokio::time::timeout(timeout, self)
    }

    // ********** new method **********
    fn timeout_at(self, timeout_at: Instant) -> Timeout<Self>
    where
        Self: Sized,
    {
        tokio::time::timeout_at(timeout_at, self)
    }

    // ********** new method **********
    fn with_cancel(self, cancel: &CancellationToken) -> RunUntilCancelledFuture<'_, Self>
    where
        Self: Sized,
    {
        RunUntilCancelledFuture {
            cancellation: cancel.cancelled(),
            future: self,
        }
    }
}

impl<T: Future + ?Sized> FutureExt for T {}

// ******* This RunUntilCancelledFuture is copied from the run_until_cancelled implementation *********
pin_project! {
    #[must_use = "futures do nothing unless polled"]
    struct RunUntilCancelledFuture<'a, F: Future> {
        #[pin]
        cancellation: WaitForCancellationFuture<'a>,
        #[pin]
        future: F,
    }
}

impl<'a, F: Future> Future for RunUntilCancelledFuture<'a, F> {
    type Output = Option<F::Output>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        if let Poll::Ready(res) = this.future.poll(cx) {
            Poll::Ready(Some(res))
        } else if this.cancellation.poll(cx).is_ready() {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    match ready(1)
        .with_cancel(&cancel)
        .timeout(Duration::from_millis(100))
        .await        // <-------- here we use fluent call chains --------
    {
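        Ok(Some(res)) => {
            println!("{res}")
        }
        // with_cancel is the inner layer, so its None arrives wrapped in Ok
        Ok(None) => {
            println!("cancelled")
        }
        // timeout is the outer layer, so Err means the deadline elapsed
        Err(_) => {
            println!("elapsed")
        }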
    }
}

Moreover, the timeout_at method is very useful in loops that must finish before a single overall deadline, because the deadline is computed once and shared by every iteration.

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let timeout_at = Instant::now() + Duration::from_secs(10);

    let mut entries = Vec::new();
    loop {
        // ***** e.g. messages are read continuously from a file or the network; `reader` is a placeholder ******
        match reader
            .read_entries()
            .timeout_at(timeout_at)
            .with_cancel(&cancel)
            .await
        {
            Some(Ok(res)) => {
                entries.extend(res);
            }
            Some(Err(_)) | None => break,
        }
    }
}
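
One property of the proposed RunUntilCancelledFuture matters for loops like this one: its poll method checks the wrapped future before the cancellation future, so a result that is already available is returned even if the token is cancelled at the same moment. The following is a std-only sketch of that polling order, not the tokio-util implementation. WithCancel, NoopWake, and poll_once are hypothetical helpers, a plain future stands in for the token's cancelled() future, and the Unpin bounds are there only to avoid a pin-projection dependency.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the proposed combinator: resolves to `Some(output)`
/// when `future` finishes, or to `None` when `cancel` finishes first.
struct WithCancel<F, C> {
    future: F,
    cancel: C,
}

impl<F, C> Future for WithCancel<F, C>
where
    F: Future + Unpin,
    C: Future + Unpin,
{
    type Output = Option<F::Output>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        // The wrapped future is polled first, so a finished result is never discarded.
        if let Poll::Ready(out) = Pin::new(&mut this.future).poll(cx) {
            return Poll::Ready(Some(out));
        }
        // Only when the wrapped future is still pending is cancellation considered.
        if Pin::new(&mut this.cancel).poll(cx).is_ready() {
            return Poll::Ready(None);
        }
        Poll::Pending
    }
}

/// A waker that does nothing; enough for polling a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once and returns whatever it reports.
fn poll_once<Fut: Future + Unpin>(mut fut: Fut) -> Poll<Fut::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(&mut fut).poll(&mut cx)
}
```

For example, poll_once(WithCancel { future: std::future::ready(1), cancel: std::future::ready(()) }) reports the value rather than a cancellation, because both sides are ready and the wrapped future is checked first.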

Metadata

Labels

A-tokio-util (Area: The tokio-util crate), C-feature-request (Category: A feature request), M-time (Module: tokio/time)
