use std::sync::Arc;
use std::time::Duration;

use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::BoxStream;
use futures::{stream, Stream, StreamExt};
use log::warn;

use crate::ec::{resolve_ec_policy, EcSchema};
use crate::hdfs::block_reader::get_block_stream;
use crate::hdfs::block_writer::BlockWriter;
use crate::hdfs::protocol::{LeaseTracker, NamenodeProtocol};
use crate::proto::hdfs;
use crate::{HdfsError, Result};

const COMPLETE_RETRY_DELAY_MS: u64 = 500;
const COMPLETE_RETRIES: u32 = 5;
pub struct FileReader {
    protocol: Arc<NamenodeProtocol>,
    status: hdfs::HdfsFileStatusProto,
    located_blocks: hdfs::LocatedBlocksProto,
    ec_schema: Option<EcSchema>,
    position: usize,
}

impl FileReader {
    pub(crate) fn new(
        protocol: Arc<NamenodeProtocol>,
        status: hdfs::HdfsFileStatusProto,
        located_blocks: hdfs::LocatedBlocksProto,
        ec_schema: Option<EcSchema>,
    ) -> Self {
        Self {
            protocol,
            status,
            located_blocks,
            ec_schema,
            position: 0,
        }
    }

    pub fn file_length(&self) -> usize {
        self.status.length as usize
    }

    pub fn remaining(&self) -> usize {
        if self.position > self.file_length() {
            0
        } else {
            self.file_length() - self.position
        }
    }

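    /// Moves the read cursor to `pos`.
    ///
    /// # Panics
    ///
    /// Panics if `pos` is past the end of the file.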
    pub fn seek(&mut self, pos: usize) {
        if pos > self.file_length() {
            panic!("Cannot seek beyond the end of a file");
        }
        self.position = pos;
    }

    pub fn tell(&self) -> usize {
        self.position
    }

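    /// Reads up to `len` bytes starting at the current cursor and advances the
    /// cursor by the number of bytes returned. Returns an empty [`Bytes`] once
    /// the cursor is at or past the end of the file.
    ///
    /// Illustrative usage; `client` is a hypothetical handle that produces a
    /// `FileReader`, so this is not run as a doc test:
    ///
    /// ```ignore
    /// let mut reader = client.read("/path/to/file").await?;
    /// let first = reader.read(4096).await?; // at most 4096 bytes
    /// let n = reader.remaining();
    /// let rest = reader.read(n).await?; // everything left
    /// ```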
    pub async fn read(&mut self, len: usize) -> Result<Bytes> {
        if self.position >= self.file_length() {
            Ok(Bytes::new())
        } else {
            let offset = self.position;
            self.position = usize::min(self.position + len, self.file_length());
            self.read_range(offset, self.position - offset).await
        }
    }

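    /// Fills as much of `buf` as possible starting at the current cursor,
    /// advancing the cursor and returning the number of bytes read. A short
    /// count is returned when fewer than `buf.len()` bytes remain.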
    pub async fn read_buf(&mut self, buf: &mut [u8]) -> Result<usize> {
        if self.position >= self.file_length() {
            Ok(0)
        } else {
            let offset = self.position;
            self.position = usize::min(self.position + buf.len(), self.file_length());
            let read_bytes = self.position - offset;
            // Only read into the prefix that can actually be filled; passing
            // the full buffer would request a range past the end of the file.
            self.read_range_buf(&mut buf[..read_bytes], offset).await?;
            Ok(read_bytes)
        }
    }

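    /// Reads `len` bytes starting at `offset`, independent of the cursor,
    /// collecting the per-block streams into a single [`Bytes`].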
    pub async fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
        let mut stream = self.read_range_stream(offset, len).boxed();
        let mut buf = BytesMut::with_capacity(len);
        while let Some(bytes) = stream.next().await.transpose()? {
            buf.put(bytes);
        }
        Ok(buf.freeze())
    }

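    /// Reads `buf.len()` bytes starting at `offset` into `buf`, independent of
    /// the cursor.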
    pub async fn read_range_buf(&self, mut buf: &mut [u8], offset: usize) -> Result<()> {
        let mut stream = self.read_range_stream(offset, buf.len()).boxed();
        while let Some(bytes) = stream.next().await.transpose()? {
            buf.put(bytes);
        }

        Ok(())
    }

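    /// Returns a stream over `len` bytes of the file starting at `offset`,
    /// built by chaining one block stream per block that overlaps the range.
    /// The cursor is not consulted or advanced.
    ///
    /// # Panics
    ///
    /// Panics if the requested range extends past the end of the file.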
    pub fn read_range_stream(
        &self,
        offset: usize,
        len: usize,
    ) -> impl Stream<Item = Result<Bytes>> {
        if offset + len > self.file_length() {
            panic!("Cannot read past end of the file");
        }

        let block_streams: Vec<BoxStream<Result<Bytes>>> = self
            .located_blocks
            .blocks
            .iter()
            .flat_map(move |block| {
                let block_file_start = block.offset as usize;
                let block_file_end = block_file_start + block.b.num_bytes() as usize;

                // Keep only blocks that overlap the requested range, and clamp
                // the read to the overlapping portion of each block.
                if block_file_start < (offset + len) && block_file_end > offset {
                    let block_start = offset - usize::min(offset, block_file_start);
                    let block_end = usize::min(offset + len, block_file_end) - block_file_start;
                    Some(get_block_stream(
                        Arc::clone(&self.protocol),
                        block.clone(),
                        block_start,
                        block_end - block_start,
                        self.ec_schema.clone(),
                    ))
                } else {
                    None
                }
            })
            .collect();

        stream::iter(block_streams).flatten()
    }
}

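/// A sequential writer for a file in HDFS.
///
/// Holds a lease on the file for the lifetime of the writer and rolls to a new
/// block whenever the current one fills up. [`FileWriter::close`] must be
/// called to complete the file on the namenode; dropping an unclosed writer
/// only releases the lease and logs a warning.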
pub struct FileWriter {
    src: String,
    protocol: Arc<NamenodeProtocol>,
    status: hdfs::HdfsFileStatusProto,
    block_writer: Option<BlockWriter>,
    last_block: Option<hdfs::LocatedBlockProto>,
    closed: bool,
    bytes_written: usize,
}

impl FileWriter {
    pub(crate) fn new(
        protocol: Arc<NamenodeProtocol>,
        src: String,
        status: hdfs::HdfsFileStatusProto,
        last_block: Option<hdfs::LocatedBlockProto>,
    ) -> Self {
        // Register a lease with the namenode so it knows this client is
        // actively writing the file; released again in `Drop`.
        protocol.add_file_lease(status.file_id(), status.namespace.clone());
        Self {
            protocol,
            src,
            status,
            block_writer: None,
            last_block,
            closed: false,
            bytes_written: 0,
        }
    }

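    /// Sets up a writer for the next block: resumes the file's existing last
    /// block if one was provided and still has room (never for erasure-coded
    /// files), otherwise finalizes any in-progress block and asks the namenode
    /// to allocate a new one.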
    async fn create_block_writer(&mut self) -> Result<()> {
        let new_block = if let Some(last_block) = self.last_block.take() {
            if last_block.b.num_bytes() < self.status.blocksize() && self.status.ec_policy.is_none()
            {
                last_block
            } else {
                self.protocol
                    .add_block(&self.src, Some(last_block.b), self.status.file_id)
                    .await?
                    .block
            }
        } else {
            // Close the current block, if any, so the namenode knows its final
            // length before allocating the next one.
            let extended_block = if let Some(block_writer) = self.block_writer.take() {
                Some(block_writer.close().await?)
            } else {
                None
            };

            self.protocol
                .add_block(&self.src, extended_block, self.status.file_id)
                .await?
                .block
        };

        let block_writer = BlockWriter::new(
            Arc::clone(&self.protocol),
            new_block,
            self.status.blocksize() as usize,
            self.protocol.get_cached_server_defaults().await?,
            self.status
                .ec_policy
                .as_ref()
                .map(resolve_ec_policy)
                .transpose()?
                .as_ref(),
        )
        .await?;

        self.block_writer = Some(block_writer);
        Ok(())
    }

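    /// Returns the active block writer, creating one first if no block is open
    /// yet or the current block is full.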
    async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
        if self.block_writer.as_ref().is_some_and(|b| b.is_full()) || self.block_writer.is_none() {
            self.create_block_writer().await?;
        }

        Ok(self.block_writer.as_mut().unwrap())
    }

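    /// Writes the entire buffer to the file, rolling to new blocks as needed,
    /// and returns the number of bytes written (always `buf.len()` on success).
    ///
    /// Illustrative usage; `client` and its `create` signature are assumptions
    /// here, so this is not run as a doc test:
    ///
    /// ```ignore
    /// let mut writer = client.create("/path/to/file").await?;
    /// writer.write(Bytes::from_static(b"hello world")).await?;
    /// writer.close().await?;
    /// ```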
    pub async fn write(&mut self, mut buf: Bytes) -> Result<usize> {
        let bytes_to_write = buf.len();
        while !buf.is_empty() {
            let block_writer = self.get_block_writer().await?;

            block_writer.write(&mut buf).await?;
        }

        self.bytes_written += bytes_to_write;

        Ok(bytes_to_write)
    }

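    /// Finalizes the last block and asks the namenode to complete the file,
    /// retrying with exponential backoff (`COMPLETE_RETRY_DELAY_MS`, doubling,
    /// for up to `COMPLETE_RETRIES` attempts) while the namenode reports the
    /// file as not yet complete. Calling `close` again after a successful
    /// close is a no-op.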
    pub async fn close(&mut self) -> Result<()> {
        if !self.closed {
            let extended_block = if let Some(block_writer) = self.block_writer.take() {
                Some(block_writer.close().await?)
            } else {
                None
            };

            let mut retry_delay = COMPLETE_RETRY_DELAY_MS;
            let mut retries = 0;
            while retries < COMPLETE_RETRIES {
                let successful = self
                    .protocol
                    .complete(&self.src, extended_block.clone(), self.status.file_id)
                    .await?
                    .result;

                if successful {
                    self.closed = true;
                    return Ok(());
                }

                tokio::time::sleep(Duration::from_millis(retry_delay)).await;

                retry_delay *= 2;
                retries += 1;
            }
            Err(HdfsError::OperationFailed(
                "Failed to complete file in time".to_string(),
            ))
        } else {
            Ok(())
        }
    }
}

impl Drop for FileWriter {
    fn drop(&mut self) {
        if !self.closed {
            warn!("FileWriter dropped without being closed. File contents may not have been saved or may be incomplete");
        }

        self.protocol
            .remove_file_lease(self.status.file_id(), self.status.namespace.clone());
    }
}