deltalake_core/storage/
retry_ext.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
//! Retry extension for [`ObjectStore`]

use object_store::{path::Path, Error, ObjectStore, PutPayload, PutResult, Result};
use tracing::log::*;

/// Retry extension for [`ObjectStore`]
///
/// Read-only operations are retried by [`ObjectStore`] internally. However, PUT/DELETE operations
/// are not retried even though they are technically idempotent. [`ObjectStore`] does not retry
/// those operations because having preconditions may produce different results for the same
/// request. PUT/DELETE operations without preconditions are idempotent and can be retried.
/// Unfortunately, [`ObjectStore`]'s retry mechanism only works on HTTP request level, thus there
/// is no way to distinguish whether a request has preconditions or not.
///
/// This trait provides additional methods for working with [`ObjectStore`] that automatically retry
/// unconditional operations when they fail.
///
/// See also:
/// - https://github.com/apache/arrow-rs/pull/5278
#[async_trait::async_trait]
pub trait ObjectStoreRetryExt: ObjectStore {
    /// Save the provided bytes to the specified location
    ///
    /// The operation is guaranteed to be atomic, it will either successfully write the entirety of
    /// bytes to location, or fail. No clients should be able to observe a partially written object
    ///
    /// Note that `put_with_opts` may have precondition semantics, and thus may not be retriable.
    async fn put_with_retries(
        &self,
        location: &Path,
        bytes: PutPayload,
        max_retries: usize,
    ) -> Result<PutResult> {
        let mut attempt_number = 1;
        while attempt_number <= max_retries {
            match self.put(location, bytes.clone()).await {
                Ok(result) => return Ok(result),
                Err(err) if attempt_number == max_retries => {
                    return Err(err);
                }
                Err(Error::Generic { store, source }) => {
                    debug!(
                        "put_with_retries attempt {} failed: {} {}",
                        attempt_number, store, source
                    );
                    attempt_number += 1;
                }
                Err(err) => {
                    return Err(err);
                }
            }
        }
        unreachable!("loop yields Ok or Err in body when attempt_number = max_retries")
    }

    /// Delete the object at the specified location
    async fn delete_with_retries(&self, location: &Path, max_retries: usize) -> Result<()> {
        let mut attempt_number = 1;
        while attempt_number <= max_retries {
            match self.delete(location).await {
                Ok(()) | Err(Error::NotFound { .. }) => return Ok(()),
                Err(err) if attempt_number == max_retries => {
                    return Err(err);
                }
                Err(Error::Generic { store, source }) => {
                    debug!(
                        "delete_with_retries attempt {} failed: {} {}",
                        attempt_number, store, source
                    );
                    attempt_number += 1;
                }
                Err(err) => {
                    return Err(err);
                }
            }
        }
        unreachable!("loop yields Ok or Err in body when attempt_number = max_retries")
    }
}

// Blanket implementation: every `ObjectStore` (including unsized ones such as
// `dyn ObjectStore`) gets the retry helpers through the trait's default methods.
impl<T> ObjectStoreRetryExt for T where T: ObjectStore + ?Sized {}