hdfs_native/
file.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use bytes::{BufMut, Bytes, BytesMut};
5use futures::stream::BoxStream;
6use futures::{stream, Stream, StreamExt};
7use log::warn;
8
9use crate::ec::{resolve_ec_policy, EcSchema};
10use crate::hdfs::block_reader::get_block_stream;
11use crate::hdfs::block_writer::BlockWriter;
12use crate::hdfs::protocol::{LeaseTracker, NamenodeProtocol};
13use crate::proto::hdfs;
14use crate::{HdfsError, Result};
15
/// Delay, in milliseconds, before the first retry of a file `complete` call that the
/// NameNode reported as unsuccessful. The delay is doubled after every further attempt.
const COMPLETE_RETRY_DELAY_MS: u64 = 500;
/// Maximum number of `complete` attempts made when closing a file before giving up.
const COMPLETE_RETRIES: u32 = 5;
18
19pub struct FileReader {
20    protocol: Arc<NamenodeProtocol>,
21    status: hdfs::HdfsFileStatusProto,
22    located_blocks: hdfs::LocatedBlocksProto,
23    ec_schema: Option<EcSchema>,
24    position: usize,
25}
26
27impl FileReader {
28    pub(crate) fn new(
29        protocol: Arc<NamenodeProtocol>,
30        status: hdfs::HdfsFileStatusProto,
31        located_blocks: hdfs::LocatedBlocksProto,
32        ec_schema: Option<EcSchema>,
33    ) -> Self {
34        Self {
35            protocol,
36            status,
37            located_blocks,
38            ec_schema,
39            position: 0,
40        }
41    }
42
43    /// Returns the total size of the file
44    pub fn file_length(&self) -> usize {
45        self.status.length as usize
46    }
47
48    /// Returns the remaining bytes left based on the current cursor position.
49    pub fn remaining(&self) -> usize {
50        if self.position > self.file_length() {
51            0
52        } else {
53            self.file_length() - self.position
54        }
55    }
56
57    /// Sets the cursor to the position. Panics if the position is beyond the end of the file
58    pub fn seek(&mut self, pos: usize) {
59        if pos > self.file_length() {
60            panic!("Cannot seek beyond the end of a file");
61        }
62        self.position = pos;
63    }
64
65    /// Returns the current cursor position in the file
66    pub fn tell(&self) -> usize {
67        self.position
68    }
69
70    /// Read up to `len` bytes into a new [Bytes] object, advancing the internal position in the file.
71    /// An empty [Bytes] object will be returned if the end of the file has been reached.
72    pub async fn read(&mut self, len: usize) -> Result<Bytes> {
73        if self.position >= self.file_length() {
74            Ok(Bytes::new())
75        } else {
76            let offset = self.position;
77            self.position = usize::min(self.position + len, self.file_length());
78            self.read_range(offset, self.position - offset).await
79        }
80    }
81
82    /// Read up to `buf.len()` bytes into the provided slice, advancing the internal position in the file.
83    /// Returns the number of bytes that were read, or 0 if the end of the file has been reached.
84    pub async fn read_buf(&mut self, buf: &mut [u8]) -> Result<usize> {
85        if self.position >= self.file_length() {
86            Ok(0)
87        } else {
88            let offset = self.position;
89            self.position = usize::min(self.position + buf.len(), self.file_length());
90            let read_bytes = self.position - offset;
91            self.read_range_buf(buf, offset).await?;
92            Ok(read_bytes)
93        }
94    }
95
96    /// Read up to `len` bytes starting at `offset` into a new [Bytes] object. The returned buffer
97    /// could be smaller than `len` if `offset + len` extends beyond the end of the file.
98    ///
99    /// Panics if the requested range is outside of the file
100    pub async fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
101        let mut stream = self.read_range_stream(offset, len).boxed();
102        let mut buf = BytesMut::with_capacity(len);
103        while let Some(bytes) = stream.next().await.transpose()? {
104            buf.put(bytes);
105        }
106        Ok(buf.freeze())
107    }
108
109    /// Read file data into an existing buffer
110    ///
111    /// Panics if the requested range is outside of the file
112    pub async fn read_range_buf(&self, mut buf: &mut [u8], offset: usize) -> Result<()> {
113        let mut stream = self.read_range_stream(offset, buf.len()).boxed();
114        while let Some(bytes) = stream.next().await.transpose()? {
115            buf.put(bytes);
116        }
117
118        Ok(())
119    }
120
121    /// Return a stream of `Bytes` objects containing the content of the file
122    ///
123    /// Panics if the requested range is outside of the file
124    pub fn read_range_stream(
125        &self,
126        offset: usize,
127        len: usize,
128    ) -> impl Stream<Item = Result<Bytes>> {
129        if offset + len > self.file_length() {
130            panic!("Cannot read past end of the file");
131        }
132
133        let block_streams: Vec<BoxStream<Result<Bytes>>> = self
134            .located_blocks
135            .blocks
136            .iter()
137            .flat_map(move |block| {
138                let block_file_start = block.offset as usize;
139                let block_file_end = block_file_start + block.b.num_bytes() as usize;
140
141                if block_file_start < (offset + len) && block_file_end > offset {
142                    // We need to read this block
143                    let block_start = offset - usize::min(offset, block_file_start);
144                    let block_end = usize::min(offset + len, block_file_end) - block_file_start;
145                    Some(get_block_stream(
146                        Arc::clone(&self.protocol),
147                        block.clone(),
148                        block_start,
149                        block_end - block_start,
150                        self.ec_schema.clone(),
151                    ))
152                } else {
153                    // No data is needed from this block
154                    None
155                }
156            })
157            .collect();
158
159        stream::iter(block_streams).flatten()
160    }
161}
162
163pub struct FileWriter {
164    src: String,
165    protocol: Arc<NamenodeProtocol>,
166    status: hdfs::HdfsFileStatusProto,
167    block_writer: Option<BlockWriter>,
168    last_block: Option<hdfs::LocatedBlockProto>,
169    closed: bool,
170    bytes_written: usize,
171}
172
173impl FileWriter {
174    pub(crate) fn new(
175        protocol: Arc<NamenodeProtocol>,
176        src: String,
177        status: hdfs::HdfsFileStatusProto,
178        // Some for append, None for create
179        last_block: Option<hdfs::LocatedBlockProto>,
180    ) -> Self {
181        protocol.add_file_lease(status.file_id(), status.namespace.clone());
182        Self {
183            protocol,
184            src,
185            status,
186            block_writer: None,
187            last_block,
188            closed: false,
189            bytes_written: 0,
190        }
191    }
192
193    async fn create_block_writer(&mut self) -> Result<()> {
194        let new_block = if let Some(last_block) = self.last_block.take() {
195            // Append operation on first write. Erasure code appends always just create a new block.
196            if last_block.b.num_bytes() < self.status.blocksize() && self.status.ec_policy.is_none()
197            {
198                // The last block isn't full, just write data to it
199                last_block
200            } else {
201                // The last block is full, so create a new block to write to
202                self.protocol
203                    .add_block(&self.src, Some(last_block.b), self.status.file_id)
204                    .await?
205                    .block
206            }
207        } else {
208            // Not appending to an existing block, just create a new one
209            // If there's an existing block writer, close it first
210            let extended_block = if let Some(block_writer) = self.block_writer.take() {
211                Some(block_writer.close().await?)
212            } else {
213                None
214            };
215
216            self.protocol
217                .add_block(&self.src, extended_block, self.status.file_id)
218                .await?
219                .block
220        };
221
222        let block_writer = BlockWriter::new(
223            Arc::clone(&self.protocol),
224            new_block,
225            self.status.blocksize() as usize,
226            self.protocol.get_cached_server_defaults().await?,
227            self.status
228                .ec_policy
229                .as_ref()
230                .map(resolve_ec_policy)
231                .transpose()?
232                .as_ref(),
233        )
234        .await?;
235
236        self.block_writer = Some(block_writer);
237        Ok(())
238    }
239
240    async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
241        // If the current writer is full, or hasn't been created, create one
242        if self.block_writer.as_ref().is_some_and(|b| b.is_full()) || self.block_writer.is_none() {
243            self.create_block_writer().await?;
244        }
245
246        Ok(self.block_writer.as_mut().unwrap())
247    }
248
249    pub async fn write(&mut self, mut buf: Bytes) -> Result<usize> {
250        let bytes_to_write = buf.len();
251        // Create a shallow copy of the bytes instance to mutate and track what's been read
252        while !buf.is_empty() {
253            let block_writer = self.get_block_writer().await?;
254
255            block_writer.write(&mut buf).await?;
256        }
257
258        self.bytes_written += bytes_to_write;
259
260        Ok(bytes_to_write)
261    }
262
263    pub async fn close(&mut self) -> Result<()> {
264        if !self.closed {
265            let extended_block = if let Some(block_writer) = self.block_writer.take() {
266                Some(block_writer.close().await?)
267            } else {
268                None
269            };
270
271            let mut retry_delay = COMPLETE_RETRY_DELAY_MS;
272            let mut retries = 0;
273            while retries < COMPLETE_RETRIES {
274                let successful = self
275                    .protocol
276                    .complete(&self.src, extended_block.clone(), self.status.file_id)
277                    .await?
278                    .result;
279
280                if successful {
281                    self.closed = true;
282                    return Ok(());
283                }
284
285                tokio::time::sleep(Duration::from_millis(retry_delay)).await;
286
287                retry_delay *= 2;
288                retries += 1;
289            }
290            Err(HdfsError::OperationFailed(
291                "Failed to complete file in time".to_string(),
292            ))
293        } else {
294            Ok(())
295        }
296    }
297}
298
299impl Drop for FileWriter {
300    fn drop(&mut self) {
301        if !self.closed {
302            warn!("FileWriter dropped without being closed. File content may not have saved or may not be complete");
303        }
304
305        self.protocol
306            .remove_file_lease(self.status.file_id(), self.status.namespace.clone());
307    }
308}