hdfs_native/
file.rs

use std::sync::Arc;
use std::time::Duration;

use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::BoxStream;
use futures::{stream, Stream, StreamExt};
use log::warn;

use crate::common::config::Configuration;
use crate::ec::{resolve_ec_policy, EcSchema};
use crate::hdfs::block_reader::get_block_stream;
use crate::hdfs::block_writer::BlockWriter;
use crate::hdfs::protocol::{LeaseTracker, NamenodeProtocol};
use crate::proto::hdfs;
use crate::{HdfsError, Result};

const COMPLETE_RETRY_DELAY_MS: u64 = 500;
const COMPLETE_RETRIES: u32 = 5;

pub struct FileReader {
    protocol: Arc<NamenodeProtocol>,
    status: hdfs::HdfsFileStatusProto,
    located_blocks: hdfs::LocatedBlocksProto,
    ec_schema: Option<EcSchema>,
    position: usize,
}

impl FileReader {
    pub(crate) fn new(
        protocol: Arc<NamenodeProtocol>,
        status: hdfs::HdfsFileStatusProto,
        located_blocks: hdfs::LocatedBlocksProto,
        ec_schema: Option<EcSchema>,
    ) -> Self {
        Self {
            protocol,
            status,
            located_blocks,
            ec_schema,
            position: 0,
        }
    }

    /// Returns the total size of the file
    pub fn file_length(&self) -> usize {
        self.status.length as usize
    }

    /// Returns the number of bytes remaining based on the current cursor position.
    pub fn remaining(&self) -> usize {
        if self.position > self.file_length() {
            0
        } else {
            self.file_length() - self.position
        }
    }

    /// Sets the cursor to the position. Panics if the position is beyond the end of the file
    pub fn seek(&mut self, pos: usize) {
        if pos > self.file_length() {
            panic!("Cannot seek beyond the end of a file");
        }
        self.position = pos;
    }

    /// Returns the current cursor position in the file
    pub fn tell(&self) -> usize {
        self.position
    }

    /// Read up to `len` bytes into a new [Bytes] object, advancing the internal position in the file.
    /// An empty [Bytes] object will be returned if the end of the file has been reached.
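    ///
    /// A minimal usage sketch, assuming an async context and a `Client` already
    /// connected to a running cluster:
    /// ```no_run
    /// # async fn example(client: &hdfs_native::Client) -> hdfs_native::Result<()> {
    /// let mut reader = client.read("/path/to/file").await?;
    /// // Read up to 1 KiB from the current position
    /// let data = reader.read(1024).await?;
    /// assert!(data.len() <= 1024);
    /// # Ok(())
    /// # }
    /// ```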
    pub async fn read(&mut self, len: usize) -> Result<Bytes> {
        if self.position >= self.file_length() {
            Ok(Bytes::new())
        } else {
            let offset = self.position;
            self.position = usize::min(self.position + len, self.file_length());
            self.read_range(offset, self.position - offset).await
        }
    }

    /// Read up to `buf.len()` bytes into the provided slice, advancing the internal position in the file.
    /// Returns the number of bytes that were read, or 0 if the end of the file has been reached.
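    ///
    /// A minimal sketch, assuming a connected `Client`:
    /// ```no_run
    /// # async fn example(client: &hdfs_native::Client) -> hdfs_native::Result<()> {
    /// let mut reader = client.read("/path/to/file").await?;
    /// let mut buf = [0u8; 4096];
    /// let n = reader.read_buf(&mut buf).await?;
    /// println!("read {} bytes", n);
    /// # Ok(())
    /// # }
    /// ```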
    pub async fn read_buf(&mut self, buf: &mut [u8]) -> Result<usize> {
        if self.position >= self.file_length() {
            Ok(0)
        } else {
            let offset = self.position;
            self.position = usize::min(self.position + buf.len(), self.file_length());
            let read_bytes = self.position - offset;
            self.read_range_buf(&mut buf[..read_bytes], offset).await?;
            Ok(read_bytes)
        }
    }

    /// Read `len` bytes starting at `offset` into a new [Bytes] object, without
    /// moving the internal cursor.
    ///
    /// Panics if the requested range extends beyond the end of the file
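    ///
    /// A minimal sketch, assuming a connected `Client` (positional reads only
    /// need `&self`, so the reader doesn't have to be mutable):
    /// ```no_run
    /// # async fn example(client: &hdfs_native::Client) -> hdfs_native::Result<()> {
    /// let reader = client.read("/path/to/file").await?;
    /// // Read 100 bytes starting at byte 50
    /// let bytes = reader.read_range(50, 100).await?;
    /// assert_eq!(bytes.len(), 100);
    /// # Ok(())
    /// # }
    /// ```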
    pub async fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
        let mut stream = self.read_range_stream(offset, len);
        let mut buf = BytesMut::with_capacity(len);
        while let Some(bytes) = stream.next().await.transpose()? {
            buf.put(bytes);
        }
        Ok(buf.freeze())
    }

    /// Read file data into an existing buffer
    ///
    /// Panics if the requested range is outside of the file
    pub async fn read_range_buf(&self, mut buf: &mut [u8], offset: usize) -> Result<()> {
        let mut stream = self.read_range_stream(offset, buf.len());
        while let Some(bytes) = stream.next().await.transpose()? {
            buf.put(bytes);
        }

        Ok(())
    }

    /// Return a stream of `Bytes` objects containing the content of the file
    ///
    /// Panics if the requested range is outside of the file
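    ///
    /// A minimal sketch of consuming the stream, assuming a connected `Client`
    /// (`futures::StreamExt` is needed for `next`):
    /// ```no_run
    /// # use futures::StreamExt;
    /// # async fn example(client: &hdfs_native::Client) -> hdfs_native::Result<()> {
    /// let reader = client.read("/path/to/file").await?;
    /// let mut stream = reader.read_range_stream(0, reader.file_length());
    /// while let Some(chunk) = stream.next().await.transpose()? {
    ///     println!("received {} bytes", chunk.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```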
    pub fn read_range_stream(
        &self,
        offset: usize,
        len: usize,
    ) -> impl Stream<Item = Result<Bytes>> {
        if offset + len > self.file_length() {
            panic!("Cannot read past end of the file");
        }

        let block_streams: Vec<BoxStream<Result<Bytes>>> = self
            .located_blocks
            .blocks
            .iter()
            .flat_map(move |block| {
                let block_file_start = block.offset as usize;
                let block_file_end = block_file_start + block.b.num_bytes() as usize;

                if block_file_start < (offset + len) && block_file_end > offset {
                    // We need to read this block
                    let block_start = offset - usize::min(offset, block_file_start);
                    let block_end = usize::min(offset + len, block_file_end) - block_file_start;
                    Some(get_block_stream(
                        Arc::clone(&self.protocol),
                        block.clone(),
                        block_start,
                        block_end - block_start,
                        self.ec_schema.clone(),
                    ))
                } else {
                    // No data is needed from this block
                    None
                }
            })
            .collect();

        stream::iter(block_streams).flatten()
    }
}

pub struct FileWriter {
    src: String,
    protocol: Arc<NamenodeProtocol>,
    status: hdfs::HdfsFileStatusProto,
    config: Arc<Configuration>,
    block_writer: Option<BlockWriter>,
    last_block: Option<hdfs::LocatedBlockProto>,
    closed: bool,
    bytes_written: usize,
}

impl FileWriter {
    pub(crate) fn new(
        protocol: Arc<NamenodeProtocol>,
        src: String,
        status: hdfs::HdfsFileStatusProto,
        config: Arc<Configuration>,
        // Some for append, None for create
        last_block: Option<hdfs::LocatedBlockProto>,
    ) -> Self {
        protocol.add_file_lease(status.file_id(), status.namespace.clone());
        Self {
            protocol,
            src,
            status,
            config,
            block_writer: None,
            last_block,
            closed: false,
            bytes_written: 0,
        }
    }

    async fn create_block_writer(&mut self) -> Result<()> {
        let new_block = if let Some(last_block) = self.last_block.take() {
            // Append on first write. Erasure-coded appends always create a new block.
            if last_block.b.num_bytes() < self.status.blocksize() && self.status.ec_policy.is_none()
            {
                // The last block isn't full, just write data to it
                last_block
            } else {
                // The last block is full, so create a new block to write to
                self.protocol
                    .add_block(&self.src, Some(last_block.b), self.status.file_id)
                    .await?
                    .block
            }
        } else {
            // Not appending to an existing block, just create a new one
            // If there's an existing block writer, close it first
            let extended_block = if let Some(block_writer) = self.block_writer.take() {
                Some(block_writer.close().await?)
            } else {
                None
            };

            self.protocol
                .add_block(&self.src, extended_block, self.status.file_id)
                .await?
                .block
        };

        let block_writer = BlockWriter::new(
            Arc::clone(&self.protocol),
            new_block,
            self.protocol.get_cached_server_defaults().await?,
            Arc::clone(&self.config),
            self.status
                .ec_policy
                .as_ref()
                .map(resolve_ec_policy)
                .transpose()?
                .as_ref(),
            &self.src,
            &self.status,
        )
        .await?;

        self.block_writer = Some(block_writer);
        Ok(())
    }

    async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
        // If the current writer is full, or hasn't been created, create one
        if self.block_writer.as_ref().is_some_and(|b| b.is_full()) || self.block_writer.is_none() {
            self.create_block_writer().await?;
        }

        Ok(self.block_writer.as_mut().unwrap())
    }

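    /// Writes the entire contents of `buf` to the file, allocating new blocks from the
    /// namenode as each one fills, and returns the number of bytes written.
    ///
    /// A minimal sketch, assuming a connected `Client` and that `WriteOptions` is
    /// re-exported at the crate root:
    /// ```no_run
    /// # use bytes::Bytes;
    /// # async fn example(client: &hdfs_native::Client) -> hdfs_native::Result<()> {
    /// let mut writer = client
    ///     .create("/path/to/new/file", hdfs_native::WriteOptions::default())
    ///     .await?;
    /// writer.write(Bytes::from("hello world")).await?;
    /// writer.close().await?;
    /// # Ok(())
    /// # }
    /// ```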
    pub async fn write(&mut self, mut buf: Bytes) -> Result<usize> {
        let bytes_to_write = buf.len();
        // `buf` is an owned, cheaply-sliceable handle; it's advanced in place to track
        // how much data the block writers have consumed
        while !buf.is_empty() {
            let block_writer = self.get_block_writer().await?;

            block_writer.write(&mut buf).await?;
        }

        self.bytes_written += bytes_to_write;

        Ok(bytes_to_write)
    }

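    /// Closes any in-progress block writer, then asks the namenode to complete the file,
    /// retrying up to `COMPLETE_RETRIES` times with exponentially increasing delays while
    /// the namenode reports the file as not yet complete (typically while the last block
    /// is still reaching minimum replication). Calling `close` on an already closed
    /// writer is a no-op.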
    pub async fn close(&mut self) -> Result<()> {
        if !self.closed {
            let extended_block = if let Some(block_writer) = self.block_writer.take() {
                Some(block_writer.close().await?)
            } else {
                None
            };

            let mut retry_delay = COMPLETE_RETRY_DELAY_MS;
            let mut retries = 0;
            while retries < COMPLETE_RETRIES {
                let successful = self
                    .protocol
                    .complete(&self.src, extended_block.clone(), self.status.file_id)
                    .await?
                    .result;

                if successful {
                    self.closed = true;
                    return Ok(());
                }

                tokio::time::sleep(Duration::from_millis(retry_delay)).await;

                retry_delay *= 2;
                retries += 1;
            }
            Err(HdfsError::OperationFailed(
                "Failed to complete file in time".to_string(),
            ))
        } else {
            Ok(())
        }
    }
}

impl Drop for FileWriter {
    fn drop(&mut self) {
        if !self.closed {
            warn!("FileWriter dropped without being closed. File contents may not have been saved or the file may be incomplete");
        }

        self.protocol
            .remove_file_lease(self.status.file_id(), self.status.namespace.clone());
    }
}