diff --git a/core/core/src/docs/upgrade.md b/core/core/src/docs/upgrade.md index 2a8312347c87..d0b72aec61d1 100644 --- a/core/core/src/docs/upgrade.md +++ b/core/core/src/docs/upgrade.md @@ -10,6 +10,10 @@ The long-deprecated `http_client` customization hooks on service builders have b `TimeoutLayer::with_speed` has been removed after a deprecation cycle. Use `with_io_timeout` to enforce per-IO deadlines instead. +### `RetryInterceptor::intercept` takes `RetryEvent` + +`RetryInterceptor::intercept` now receives a single `RetryEvent<'_>` argument instead of `(&Error, Duration)`. The event carries the operation being retried and a 1-based retry attempt counter, and is `#[non_exhaustive]` so future fields can be added without another break. + ## Raw API ### Test helpers moved to `opendal_testkit` diff --git a/core/layers/retry/src/lib.rs b/core/layers/retry/src/lib.rs index edca6fb294e4..ba049b58b3b8 100644 --- a/core/layers/retry/src/lib.rs +++ b/core/layers/retry/src/lib.rs @@ -96,14 +96,15 @@ use opendal_core::*; /// # use opendal_core::Error; /// # use opendal_core::Operator; /// # use opendal_core::Result; +/// # use opendal_layer_retry::RetryEvent; /// # use opendal_layer_retry::RetryInterceptor; /// # use opendal_layer_retry::RetryLayer; /// # /// struct MyRetryInterceptor; /// /// impl RetryInterceptor for MyRetryInterceptor { -/// fn intercept(&self, err: &Error, dur: Duration) { -/// // do something +/// fn intercept(&self, event: RetryEvent<'_>) { +/// // do something with event.op, event.err, event.dur, event.attempt /// } /// } /// @@ -152,7 +153,7 @@ impl RetryLayer { /// use opendal_core::Operator; /// use opendal_layer_retry::RetryLayer; /// - /// fn notify(_err: &opendal_core::Error, _dur: std::time::Duration) {} + /// fn notify(_event: opendal_layer_retry::RetryEvent<'_>) {} /// /// let _ = Operator::new(services::Memory::default()) /// .expect("must init") @@ -220,6 +221,32 @@ impl Layer for RetryLayer { } } +/// Context passed to [`RetryInterceptor`] before each retry sleep. +#[non_exhaustive] +#[derive(Debug)] +pub struct RetryEvent<'a> { + /// The operation being retried. + pub op: Operation, + /// The error that triggered the retry. + pub err: &'a Error, + /// The duration before the next attempt. + pub dur: Duration, + /// 1-based retry attempt number. + pub attempt: u32, +} + +impl<'a> RetryEvent<'a> { + /// Build a new `RetryEvent`. + pub fn new(op: Operation, err: &'a Error, dur: Duration, attempt: u32) -> Self { + Self { + op, + err, + dur, + attempt, + } + } +} + /// RetryInterceptor is used to intercept while retry happened. pub trait RetryInterceptor: Send + Sync + 'static { /// Everytime RetryLayer is retrying, this function will be called. @@ -228,24 +255,19 @@ pub trait RetryInterceptor: Send + Sync + 'static { /// /// just before the retry sleep. /// - /// # Inputs - /// - /// - err: The error that caused the current retry. - /// - dur: The duration that will sleep before next retry. - /// /// # Notes /// /// The intercept must be quick and non-blocking. No heavy IO is /// allowed. Otherwise, the retry will be blocked. - fn intercept(&self, err: &Error, dur: Duration); + fn intercept(&self, event: RetryEvent<'_>); } impl RetryInterceptor for F where - F: Fn(&Error, Duration) + Send + Sync + 'static, + F: for<'a> Fn(RetryEvent<'a>) + Send + Sync + 'static, { - fn intercept(&self, err: &Error, dur: Duration) { - self(err, dur); + fn intercept(&self, event: RetryEvent<'_>) { + self(event); } } @@ -253,11 +275,11 @@ where pub struct DefaultRetryInterceptor; impl RetryInterceptor for DefaultRetryInterceptor { - fn intercept(&self, err: &Error, dur: Duration) { + fn intercept(&self, event: RetryEvent<'_>) { log::warn!( target: "opendal::layers::retry", - "will retry after {}s because: {}", - dur.as_secs_f64(), err + "will retry {:?} (attempt {}) after {}s because: {}", + event.op, event.attempt, event.dur.as_secs_f64(), event.err ); } } @@ -289,19 +311,29 @@ impl LayeredAccess for RetryAccessor { } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { + let mut attempt: u32 = 0; { || self.inner.create_dir(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) - .notify(|err, dur: Duration| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::CreateDir, err, dur, attempt)) + }) .await .map_err(|e| e.set_persistent()) } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let mut attempt: u32 = 0; let (rp, reader) = { || self.inner.read(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::Read, err, dur, attempt)) + }) .await .map_err(|e| e.set_persistent())?; @@ -312,57 +344,87 @@ impl LayeredAccess for RetryAccessor { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let mut attempt: u32 = 0; { || self.inner.write(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::Write, err, dur, attempt)) + }) .await .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder))) .map_err(|e| e.set_persistent()) } async fn stat(&self, path: &str, args: OpStat) -> Result { + let mut attempt: u32 = 0; { || self.inner.stat(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::Stat, err, dur, attempt)) + }) .await .map_err(|e| e.set_persistent()) } async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + let mut attempt: u32 = 0; { || self.inner.delete() } .retry(self.builder) .when(|e| e.is_temporary()) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::Delete, err, dur, attempt)) + }) .await .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder))) .map_err(|e| e.set_persistent()) } async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { + let mut attempt: u32 = 0; { || self.inner.copy(from, to, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::Copy, err, dur, attempt)) + }) .await .map_err(|e| e.set_persistent()) } async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result { + let mut attempt: u32 = 0; { || self.inner.rename(from, to, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::Rename, err, dur, attempt)) + }) .await .map_err(|e| e.set_persistent()) } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + let mut attempt: u32 = 0; { || self.inner.list(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::List, err, dur, attempt)) + }) .await .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder))) .map_err(|e| e.set_persistent()) @@ -442,6 +504,7 @@ impl oio::Read for RetryWrapper { use backon::RetryableWithContext; let inner = self.take_inner()?; + let mut attempt: u32 = 0; let (inner, res) = { |mut r: R| async move { @@ -453,7 +516,11 @@ impl oio::Read for RetryWrapper { .retry(self.builder) .when(|e| e.is_temporary()) .context(inner) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::Read, err, dur, attempt)) + }) .await; self.inner = Some(inner); @@ -466,6 +533,7 @@ impl oio::Write for RetryWrapper { use backon::RetryableWithContext; let inner = self.take_inner()?; + let mut attempt: u32 = 0; let ((inner, _), res) = { |(mut r, bs): (R, Buffer)| async move { @@ -477,7 +545,11 @@ impl oio::Write for RetryWrapper { .retry(self.builder) .when(|e| e.is_temporary()) .context((inner, bs)) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::Write, err, dur, attempt)) + }) .await; self.inner = Some(inner); @@ -488,6 +560,7 @@ impl oio::Write for RetryWrapper { use backon::RetryableWithContext; let inner = self.take_inner()?; + let mut attempt: u32 = 0; let (inner, res) = { |mut r: R| async move { @@ -499,7 +572,11 @@ impl oio::Write for RetryWrapper { .retry(self.builder) .when(|e| e.is_temporary()) .context(inner) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::Write, err, dur, attempt)) + }) .await; self.inner = Some(inner); @@ -510,6 +587,7 @@ impl oio::Write for RetryWrapper { use backon::RetryableWithContext; let inner = self.take_inner()?; + let mut attempt: u32 = 0; let (inner, res) = { |mut r: R| async move { @@ -521,7 +599,11 @@ impl oio::Write for RetryWrapper { .retry(self.builder) .when(|e| e.is_temporary()) .context(inner) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::Write, err, dur, attempt)) + }) .await; self.inner = Some(inner); @@ -534,6 +616,7 @@ impl oio::List for RetryWrapper { use backon::RetryableWithContext; let inner = self.take_inner()?; + let mut attempt: u32 = 0; let (inner, res) = { |mut p: P| async move { @@ -545,7 +628,11 @@ impl oio::List for RetryWrapper { .retry(self.builder) .when(|e| e.is_temporary()) .context(inner) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::List, err, dur, attempt)) + }) .await; self.inner = Some(inner); @@ -560,6 +647,7 @@ impl oio::Delete for RetryWrapper { let inner = self.take_inner()?; let path = path.to_string(); let args_cloned = args.clone(); + let mut attempt: u32 = 0; let (inner, res) = { |mut p: P| { @@ -575,7 +663,9 @@ impl oio::Delete for RetryWrapper { .when(|e| e.is_temporary()) .context(inner) .notify(|err, dur| { - self.notify.intercept(err, dur); + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::Delete, err, dur, attempt)); }) .await; @@ -587,6 +677,7 @@ impl oio::Delete for RetryWrapper { use backon::RetryableWithContext; let inner = self.take_inner()?; + let mut attempt: u32 = 0; let (inner, res) = { |mut p: P| async move { @@ -598,7 +689,11 @@ impl oio::Delete for RetryWrapper { .retry(self.builder) .when(|e| e.is_temporary()) .context(inner) - .notify(|err, dur| self.notify.intercept(err, dur)) + .notify(|err, dur| { + attempt += 1; + self.notify + .intercept(RetryEvent::new(Operation::Delete, err, dur, attempt)) + }) .await; self.inner = Some(inner); @@ -923,6 +1018,48 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_retry_event_attempt_and_op() -> Result<()> { + setup(); + + #[derive(Default, Clone)] + struct Recorder { + events: Arc>>, + } + + impl RetryInterceptor for Recorder { + fn intercept(&self, event: RetryEvent<'_>) { + self.events.lock().unwrap().push((event.op, event.attempt)); + } + } + + let recorder = Recorder::default(); + let builder = MockBuilder::default(); + let op = Operator::new(builder.clone())? + .layer( + RetryLayer::default() + .with_min_delay(Duration::from_millis(1)) + .with_max_delay(Duration::from_millis(1)) + .with_notify(recorder.clone()), + ) + .finish(); + + let r = op.reader("retryable_error").await?; + let mut content = Vec::new(); + let _ = r.read_into(&mut content, ..).await?; + + let events = recorder.events.lock().unwrap().clone(); + assert_eq!( + events, + vec![ + (Operation::Read, 1), + (Operation::Read, 2), + (Operation::Read, 1), + ], + ); + Ok(()) + } + #[tokio::test] async fn test_retry_batch() -> Result<()> { setup();