lance_io/
object_reader.rs1use std::ops::Range;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use bytes::Bytes;
9use deepsize::DeepSizeOf;
10use futures::future::BoxFuture;
11use lance_core::Result;
12use object_store::{path::Path, GetOptions, GetResult, ObjectStore, Result as OSResult};
13use tokio::sync::OnceCell;
14use tracing::instrument;
15
16use crate::{object_store::DEFAULT_CLOUD_IO_PARALLELISM, traits::Reader};
17
18#[derive(Debug)]
22pub struct CloudObjectReader {
23 pub object_store: Arc<dyn ObjectStore>,
25 pub path: Path,
27 size: OnceCell<usize>,
29
30 block_size: usize,
31 download_retry_count: usize,
32}
33
34impl DeepSizeOf for CloudObjectReader {
35 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
36 self.path.as_ref().deep_size_of_children(context)
38 }
39}
40
41impl CloudObjectReader {
42 pub fn new(
44 object_store: Arc<dyn ObjectStore>,
45 path: Path,
46 block_size: usize,
47 known_size: Option<usize>,
48 download_retry_count: usize,
49 ) -> Result<Self> {
50 Ok(Self {
51 object_store,
52 path,
53 size: OnceCell::new_with(known_size),
54 block_size,
55 download_retry_count,
56 })
57 }
58
59 async fn do_with_retry<'a, O>(
63 &self,
64 f: impl Fn() -> BoxFuture<'a, OSResult<O>>,
65 ) -> OSResult<O> {
66 let mut retries = 3;
67 loop {
68 match f().await {
69 Ok(val) => return Ok(val),
70 Err(err) => {
71 if retries == 0 {
72 return Err(err);
73 }
74 retries -= 1;
75 }
76 }
77 }
78 }
79
80 async fn do_get_with_outer_retry<'a>(
87 &self,
88 f: impl Fn() -> BoxFuture<'a, OSResult<GetResult>> + Copy,
89 desc: impl Fn() -> String,
90 ) -> OSResult<Bytes> {
91 let mut retries = self.download_retry_count;
92 loop {
93 let get_result = self.do_with_retry(f).await?;
94 match get_result.bytes().await {
95 Ok(bytes) => return Ok(bytes),
96 Err(err) => {
97 if retries == 0 {
98 log::warn!("Failed to download {} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}", desc(), self.path, self.download_retry_count, err);
99 return Err(err);
100 }
101 log::debug!(
102 "Retrying {} from {} (remaining retries: {}). Error details: {:?}",
103 desc(),
104 self.path,
105 retries,
106 err
107 );
108 retries -= 1;
109 }
110 }
111 }
112 }
113}
114
115#[async_trait]
116impl Reader for CloudObjectReader {
117 fn path(&self) -> &Path {
118 &self.path
119 }
120
121 fn block_size(&self) -> usize {
122 self.block_size
123 }
124
125 fn io_parallelism(&self) -> usize {
126 DEFAULT_CLOUD_IO_PARALLELISM
127 }
128
129 async fn size(&self) -> object_store::Result<usize> {
131 self.size
132 .get_or_try_init(|| async move {
133 let meta = self
134 .do_with_retry(|| self.object_store.head(&self.path))
135 .await?;
136 Ok(meta.size)
137 })
138 .await
139 .cloned()
140 }
141
142 #[instrument(level = "debug", skip(self))]
143 async fn get_range(&self, range: Range<usize>) -> OSResult<Bytes> {
144 self.do_get_with_outer_retry(
145 || {
146 let options = GetOptions {
147 range: Some(range.clone().into()),
148 ..Default::default()
149 };
150 self.object_store.get_opts(&self.path, options)
151 },
152 || format!("range {:?}", range),
153 )
154 .await
155 }
156
157 #[instrument(level = "debug", skip_all)]
158 async fn get_all(&self) -> OSResult<Bytes> {
159 self.do_get_with_outer_retry(
160 || {
161 self.object_store
162 .get_opts(&self.path, GetOptions::default())
163 },
164 || "read_all".to_string(),
165 )
166 .await
167 }
168}