lance_io/
object_reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::Range;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use bytes::Bytes;
9use deepsize::DeepSizeOf;
10use futures::future::BoxFuture;
11use lance_core::Result;
12use object_store::{path::Path, GetOptions, GetResult, ObjectStore, Result as OSResult};
13use tokio::sync::OnceCell;
14use tracing::instrument;
15
16use crate::{object_store::DEFAULT_CLOUD_IO_PARALLELISM, traits::Reader};
17
18/// Object Reader
19///
20/// Object Store + Base Path
21#[derive(Debug)]
22pub struct CloudObjectReader {
23    // Object Store.
24    pub object_store: Arc<dyn ObjectStore>,
25    // File path
26    pub path: Path,
27    // File size, if known.
28    size: OnceCell<usize>,
29
30    block_size: usize,
31    download_retry_count: usize,
32}
33
34impl DeepSizeOf for CloudObjectReader {
35    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
36        // Skipping object_store because there is no easy way to do that and it shouldn't be too big
37        self.path.as_ref().deep_size_of_children(context)
38    }
39}
40
41impl CloudObjectReader {
42    /// Create an ObjectReader from URI
43    pub fn new(
44        object_store: Arc<dyn ObjectStore>,
45        path: Path,
46        block_size: usize,
47        known_size: Option<usize>,
48        download_retry_count: usize,
49    ) -> Result<Self> {
50        Ok(Self {
51            object_store,
52            path,
53            size: OnceCell::new_with(known_size),
54            block_size,
55            download_retry_count,
56        })
57    }
58
59    // Retries for the initial request are handled by object store, but
60    // there are no retries for failures that occur during the streaming
61    // of the response body. Thus we add an outer retry loop here.
62    async fn do_with_retry<'a, O>(
63        &self,
64        f: impl Fn() -> BoxFuture<'a, OSResult<O>>,
65    ) -> OSResult<O> {
66        let mut retries = 3;
67        loop {
68            match f().await {
69                Ok(val) => return Ok(val),
70                Err(err) => {
71                    if retries == 0 {
72                        return Err(err);
73                    }
74                    retries -= 1;
75                }
76            }
77        }
78    }
79
80    // We have a separate retry loop here.  This is because object_store does not
81    // attempt retries on downloads that fail during streaming of the response body.
82    //
83    // However, this failure is pretty common (e.g. timeout) and we want to retry in these
84    // situations.  In addition, we provide additional logging information in these
85    // failures cases.
86    async fn do_get_with_outer_retry<'a>(
87        &self,
88        f: impl Fn() -> BoxFuture<'a, OSResult<GetResult>> + Copy,
89        desc: impl Fn() -> String,
90    ) -> OSResult<Bytes> {
91        let mut retries = self.download_retry_count;
92        loop {
93            let get_result = self.do_with_retry(f).await?;
94            match get_result.bytes().await {
95                Ok(bytes) => return Ok(bytes),
96                Err(err) => {
97                    if retries == 0 {
98                        log::warn!("Failed to download {} from {} after {} attempts.  This may indicate that cloud storage is overloaded or your timeout settings are too restrictive.  Error details: {:?}", desc(), self.path, self.download_retry_count, err);
99                        return Err(err);
100                    }
101                    log::debug!(
102                        "Retrying {} from {} (remaining retries: {}).  Error details: {:?}",
103                        desc(),
104                        self.path,
105                        retries,
106                        err
107                    );
108                    retries -= 1;
109                }
110            }
111        }
112    }
113}
114
115#[async_trait]
116impl Reader for CloudObjectReader {
117    fn path(&self) -> &Path {
118        &self.path
119    }
120
121    fn block_size(&self) -> usize {
122        self.block_size
123    }
124
125    fn io_parallelism(&self) -> usize {
126        DEFAULT_CLOUD_IO_PARALLELISM
127    }
128
129    /// Object/File Size.
130    async fn size(&self) -> object_store::Result<usize> {
131        self.size
132            .get_or_try_init(|| async move {
133                let meta = self
134                    .do_with_retry(|| self.object_store.head(&self.path))
135                    .await?;
136                Ok(meta.size)
137            })
138            .await
139            .cloned()
140    }
141
142    #[instrument(level = "debug", skip(self))]
143    async fn get_range(&self, range: Range<usize>) -> OSResult<Bytes> {
144        self.do_get_with_outer_retry(
145            || {
146                let options = GetOptions {
147                    range: Some(range.clone().into()),
148                    ..Default::default()
149                };
150                self.object_store.get_opts(&self.path, options)
151            },
152            || format!("range {:?}", range),
153        )
154        .await
155    }
156
157    #[instrument(level = "debug", skip_all)]
158    async fn get_all(&self) -> OSResult<Bytes> {
159        self.do_get_with_outer_retry(
160            || {
161                self.object_store
162                    .get_opts(&self.path, GetOptions::default())
163            },
164            || "read_all".to_string(),
165        )
166        .await
167    }
168}