1use std::sync::Arc;
2use std::time::Duration;
3
4use bytes::{BufMut, Bytes, BytesMut};
5use futures::stream::BoxStream;
6use futures::{stream, Stream, StreamExt};
7use log::warn;
8
9use crate::common::config::Configuration;
10use crate::ec::{resolve_ec_policy, EcSchema};
11use crate::hdfs::block_reader::get_block_stream;
12use crate::hdfs::block_writer::BlockWriter;
13use crate::hdfs::protocol::{LeaseTracker, NamenodeProtocol};
14use crate::proto::hdfs;
15use crate::{HdfsError, Result};
16
/// How many times `FileWriter::close` asks the NameNode to `complete` the file before giving up.
const COMPLETE_RETRIES: u32 = 5;
/// Delay in milliseconds before the second `complete` attempt; it doubles after every failure.
const COMPLETE_RETRY_DELAY_MS: u64 = 500;
19
20pub struct FileReader {
21 protocol: Arc<NamenodeProtocol>,
22 status: hdfs::HdfsFileStatusProto,
23 located_blocks: hdfs::LocatedBlocksProto,
24 ec_schema: Option<EcSchema>,
25 position: usize,
26}
27
28impl FileReader {
29 pub(crate) fn new(
30 protocol: Arc<NamenodeProtocol>,
31 status: hdfs::HdfsFileStatusProto,
32 located_blocks: hdfs::LocatedBlocksProto,
33 ec_schema: Option<EcSchema>,
34 ) -> Self {
35 Self {
36 protocol,
37 status,
38 located_blocks,
39 ec_schema,
40 position: 0,
41 }
42 }
43
44 pub fn file_length(&self) -> usize {
46 self.status.length as usize
47 }
48
49 pub fn remaining(&self) -> usize {
51 if self.position > self.file_length() {
52 0
53 } else {
54 self.file_length() - self.position
55 }
56 }
57
58 pub fn seek(&mut self, pos: usize) {
60 if pos > self.file_length() {
61 panic!("Cannot seek beyond the end of a file");
62 }
63 self.position = pos;
64 }
65
66 pub fn tell(&self) -> usize {
68 self.position
69 }
70
71 pub async fn read(&mut self, len: usize) -> Result<Bytes> {
74 if self.position >= self.file_length() {
75 Ok(Bytes::new())
76 } else {
77 let offset = self.position;
78 self.position = usize::min(self.position + len, self.file_length());
79 self.read_range(offset, self.position - offset).await
80 }
81 }
82
83 pub async fn read_buf(&mut self, buf: &mut [u8]) -> Result<usize> {
86 if self.position >= self.file_length() {
87 Ok(0)
88 } else {
89 let offset = self.position;
90 self.position = usize::min(self.position + buf.len(), self.file_length());
91 let read_bytes = self.position - offset;
92 self.read_range_buf(&mut buf[..read_bytes], offset).await?;
93 Ok(read_bytes)
94 }
95 }
96
97 pub async fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
102 let mut stream = self.read_range_stream(offset, len);
103 let mut buf = BytesMut::with_capacity(len);
104 while let Some(bytes) = stream.next().await.transpose()? {
105 buf.put(bytes);
106 }
107 Ok(buf.freeze())
108 }
109
110 pub async fn read_range_buf(&self, mut buf: &mut [u8], offset: usize) -> Result<()> {
114 let mut stream = self.read_range_stream(offset, buf.len());
115 while let Some(bytes) = stream.next().await.transpose()? {
116 buf.put(bytes);
117 }
118
119 Ok(())
120 }
121
122 pub fn read_range_stream(
126 &self,
127 offset: usize,
128 len: usize,
129 ) -> impl Stream<Item = Result<Bytes>> {
130 if offset + len > self.file_length() {
131 panic!("Cannot read past end of the file");
132 }
133
134 let block_streams: Vec<BoxStream<Result<Bytes>>> = self
135 .located_blocks
136 .blocks
137 .iter()
138 .flat_map(move |block| {
139 let block_file_start = block.offset as usize;
140 let block_file_end = block_file_start + block.b.num_bytes() as usize;
141
142 if block_file_start < (offset + len) && block_file_end > offset {
143 let block_start = offset - usize::min(offset, block_file_start);
145 let block_end = usize::min(offset + len, block_file_end) - block_file_start;
146 Some(get_block_stream(
147 Arc::clone(&self.protocol),
148 block.clone(),
149 block_start,
150 block_end - block_start,
151 self.ec_schema.clone(),
152 ))
153 } else {
154 None
156 }
157 })
158 .collect();
159
160 stream::iter(block_streams).flatten()
161 }
162}
163
/// Writer for a single HDFS file that allocates and fills blocks as data is written.
///
/// Must be finished with `close`; dropping an unclosed writer only logs a warning.
pub struct FileWriter {
    /// Path of the file, passed to the NameNode `add_block` and `complete` calls.
    src: String,
    /// Shared NameNode protocol handle; also tracks this file's lease.
    protocol: Arc<NamenodeProtocol>,
    /// File status; supplies the file id, block size, erasure-coding policy and namespace.
    status: hdfs::HdfsFileStatusProto,
    /// Client configuration handed to every `BlockWriter`.
    config: Arc<Configuration>,
    /// Writer for the block currently being filled; `None` before the first write and after
    /// `close`.
    block_writer: Option<BlockWriter>,
    /// Existing last block supplied at construction. It is consumed by the first block-writer
    /// creation, which either keeps filling it or allocates a new block after it.
    last_block: Option<hdfs::LocatedBlockProto>,
    /// Set once the NameNode has reported the file as complete.
    closed: bool,
    /// Total number of bytes accepted by `write`.
    bytes_written: usize,
}
174
175impl FileWriter {
176 pub(crate) fn new(
177 protocol: Arc<NamenodeProtocol>,
178 src: String,
179 status: hdfs::HdfsFileStatusProto,
180 config: Arc<Configuration>,
181 last_block: Option<hdfs::LocatedBlockProto>,
183 ) -> Self {
184 protocol.add_file_lease(status.file_id(), status.namespace.clone());
185 Self {
186 protocol,
187 src,
188 status,
189 config,
190 block_writer: None,
191 last_block,
192 closed: false,
193 bytes_written: 0,
194 }
195 }
196
197 async fn create_block_writer(&mut self) -> Result<()> {
198 let new_block = if let Some(last_block) = self.last_block.take() {
199 if last_block.b.num_bytes() < self.status.blocksize() && self.status.ec_policy.is_none()
201 {
202 last_block
204 } else {
205 self.protocol
207 .add_block(&self.src, Some(last_block.b), self.status.file_id)
208 .await?
209 .block
210 }
211 } else {
212 let extended_block = if let Some(block_writer) = self.block_writer.take() {
215 Some(block_writer.close().await?)
216 } else {
217 None
218 };
219
220 self.protocol
221 .add_block(&self.src, extended_block, self.status.file_id)
222 .await?
223 .block
224 };
225
226 let block_writer = BlockWriter::new(
227 Arc::clone(&self.protocol),
228 new_block,
229 self.protocol.get_cached_server_defaults().await?,
230 Arc::clone(&self.config),
231 self.status
232 .ec_policy
233 .as_ref()
234 .map(resolve_ec_policy)
235 .transpose()?
236 .as_ref(),
237 &self.src,
238 &self.status,
239 )
240 .await?;
241
242 self.block_writer = Some(block_writer);
243 Ok(())
244 }
245
246 async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
247 if self.block_writer.as_ref().is_some_and(|b| b.is_full()) || self.block_writer.is_none() {
249 self.create_block_writer().await?;
250 }
251
252 Ok(self.block_writer.as_mut().unwrap())
253 }
254
255 pub async fn write(&mut self, mut buf: Bytes) -> Result<usize> {
256 let bytes_to_write = buf.len();
257 while !buf.is_empty() {
259 let block_writer = self.get_block_writer().await?;
260
261 block_writer.write(&mut buf).await?;
262 }
263
264 self.bytes_written += bytes_to_write;
265
266 Ok(bytes_to_write)
267 }
268
269 pub async fn close(&mut self) -> Result<()> {
270 if !self.closed {
271 let extended_block = if let Some(block_writer) = self.block_writer.take() {
272 Some(block_writer.close().await?)
273 } else {
274 None
275 };
276
277 let mut retry_delay = COMPLETE_RETRY_DELAY_MS;
278 let mut retries = 0;
279 while retries < COMPLETE_RETRIES {
280 let successful = self
281 .protocol
282 .complete(&self.src, extended_block.clone(), self.status.file_id)
283 .await?
284 .result;
285
286 if successful {
287 self.closed = true;
288 return Ok(());
289 }
290
291 tokio::time::sleep(Duration::from_millis(retry_delay)).await;
292
293 retry_delay *= 2;
294 retries += 1;
295 }
296 Err(HdfsError::OperationFailed(
297 "Failed to complete file in time".to_string(),
298 ))
299 } else {
300 Ok(())
301 }
302 }
303}
304
305impl Drop for FileWriter {
306 fn drop(&mut self) {
307 if !self.closed {
308 warn!("FileWriter dropped without being closed. File content may not have saved or may not be complete");
309 }
310
311 self.protocol
312 .remove_file_lease(self.status.file_id(), self.status.namespace.clone());
313 }
314}