Open
Labels
A-tokio-util (Area: The tokio-util crate), C-feature-request (Category: A feature request), M-time (Module: tokio/time)
Description
Is your feature request related to a problem? Please describe.
Hello! In most places where I use this, I need tokio::time::timeout/timeout_at together with the CancellationToken::run_until_cancelled method, so that a future stops on either a timeout or an exit signal. The current approach is cancellationtoken.run_until_cancelled(myfut.timeout(duration)), where the timeout method comes from tokio_util::time::FutureExt. If a method like with_cancel were added to FutureExt, this could be written as a method chain, which would make the code much more concise.
use std::{future::ready, time::Duration};
use tokio_util::{sync::CancellationToken, time::FutureExt};

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    match cancel
        .run_until_cancelled(ready(1).timeout(Duration::from_millis(100)))
        .await
    {
        Some(Ok(res)) => {
            println!("{res}")
        }
        Some(Err(_)) => {
            println!("elapsed")
        }
        None => {
            println!("cancelled")
        }
    }
}
Describe the solution you'd like
After adding a with_cancel method:
use std::{
    future::{ready, Future},
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};
use pin_project_lite::pin_project;
use tokio::time::{Instant, Timeout};
use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};

trait FutureExt: Future {
    fn timeout(self, timeout: Duration) -> Timeout<Self>
    where
        Self: Sized,
    {
        tokio::time::timeout(timeout, self)
    }

    // ********** new method **********
    fn timeout_at(self, timeout_at: Instant) -> Timeout<Self>
    where
        Self: Sized,
    {
        tokio::time::timeout_at(timeout_at, self)
    }

    // ********** new method **********
    fn with_cancel(self, cancel: &CancellationToken) -> RunUntilCancelledFuture<'_, Self>
    where
        Self: Sized,
    {
        RunUntilCancelledFuture {
            cancellation: cancel.cancelled(),
            future: self,
        }
    }
}

impl<T: Future + ?Sized> FutureExt for T {}

// ******* This RunUntilCancelledFuture is copied from the run_until_cancelled method *********
pin_project! {
    #[must_use = "futures do nothing unless polled"]
    struct RunUntilCancelledFuture<'a, F: Future> {
        #[pin]
        cancellation: WaitForCancellationFuture<'a>,
        #[pin]
        future: F,
    }
}

impl<'a, F: Future> Future for RunUntilCancelledFuture<'a, F> {
    type Output = Option<F::Output>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        // The wrapped future is polled first, so a ready result wins over cancellation.
        if let Poll::Ready(res) = this.future.poll(cx) {
            Poll::Ready(Some(res))
        } else if this.cancellation.poll(cx).is_ready() {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}
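
#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    match ready(1)
        .with_cancel(&cancel)
        .timeout(Duration::from_millis(100))
        .await // <-------- here we use fluent call chains --------
    {
        // timeout is the outer wrapper here, so the output is Result<Option<T>, Elapsed>.
        Ok(Some(res)) => {
            println!("{res}")
        }
        // The inner with_cancel future returned None: the token was cancelled.
        Ok(None) => {
            println!("cancelled")
        }
        // The outer timeout fired first.
        Err(_) => {
            println!("elapsed")
        }
    }
}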
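One detail of the RunUntilCancelledFuture shown above is its poll order: the wrapped future is polled before the cancellation future, so if both are ready on the same poll, the result wins over cancellation. The following is a minimal, std-only sketch of that behaviour, not the tokio-util implementation. WithCancel, NoopWake and poll_once are hypothetical names introduced for illustration, and the cancellation signal is modelled as a plain future instead of a CancellationToken.

```rust
use std::future::{pending, ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `RunUntilCancelledFuture`, generic over any
/// cancellation future `C` so that it needs no external crates.
struct WithCancel<F, C> {
    future: F,
    cancellation: C,
}

impl<F: Future + Unpin, C: Future + Unpin> Future for WithCancel<F, C> {
    type Output = Option<F::Output>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Both fields are `Unpin`, so plain mutable access is enough here.
        let this = self.get_mut();
        // Same order as in the issue: the wrapped future first, then cancellation.
        if let Poll::Ready(res) = Pin::new(&mut this.future).poll(cx) {
            Poll::Ready(Some(res))
        } else if Pin::new(&mut this.cancellation).poll(cx).is_ready() {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}

/// A waker that does nothing; sufficient for polling a future exactly once.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` a single time and returns the raw `Poll` result.
fn poll_once<F: Future + Unpin>(mut fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(&mut fut).poll(&mut cx)
}

fn main() {
    // Result and cancellation are both ready: the result wins.
    let both = WithCancel { future: ready(1), cancellation: ready(()) };
    assert_eq!(poll_once(both), Poll::Ready(Some(1)));

    // Only the cancellation signal is ready: the output is None.
    let cancelled = WithCancel { future: pending::<i32>(), cancellation: ready(()) };
    assert_eq!(poll_once(cancelled), Poll::Ready(None));

    // Neither is ready: the combinator stays pending.
    let idle = WithCancel { future: pending::<i32>(), cancellation: pending::<()>() };
    assert_eq!(poll_once(idle), Poll::Pending);
}
```

A caller that wants cancellation to take priority would need the opposite poll order, so the order is part of the method's observable contract.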
Moreover, the timeout_at method is very useful in loops that must finish within an overall time limit, because every iteration shares the same deadline.
#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let timeout_at = Instant::now() + Duration::from_secs(10);
    let mut entries = Vec::new();
    loop {
        // ***** In scenarios where messages are continuously read from file/network ******
        // `reader` is a placeholder for such a source; it is not defined in this snippet.
        match reader
            .read_entries()
            .timeout_at(timeout_at)
            .with_cancel(&cancel)
            .await
        {
            Some(Ok(res)) => {
                entries.extend(res);
            }
            // Stop on either the deadline (Some(Err(_))) or cancellation (None).
            Some(Err(_)) | None => break,
        }
    }
}
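The reason an absolute deadline suits loops can be shown without an async runtime. The sketch below is a synchronous analogy built on std::sync::mpsc, not tokio: the deadline is computed once, and each iteration waits only for the time that remains, so the loop as a whole cannot run much past the limit no matter how many entries arrive. collect_until is a hypothetical helper name introduced for illustration.

```rust
use std::sync::mpsc::{self, Receiver};
use std::time::{Duration, Instant};

/// Hypothetical helper: drains `rx` until `deadline` passes or every sender is dropped.
fn collect_until<T>(rx: &Receiver<T>, deadline: Instant) -> Vec<T> {
    let mut entries = Vec::new();
    loop {
        // Time left until the shared deadline; zero once it has passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(entry) => entries.push(entry),
            // Either the deadline was reached or the channel was disconnected.
            Err(_) => break,
        }
    }
    entries
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 1..=3 {
        tx.send(i).unwrap();
    }
    // `tx` stays alive, so the loop ends through the deadline, not a disconnect.
    let entries = collect_until(&rx, Instant::now() + Duration::from_millis(50));
    assert_eq!(entries, vec![1, 2, 3]);
}
```

With a per-iteration Duration, every received entry would restart the clock, and a steady stream of entries could keep the loop running indefinitely.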