// hdfs_native/client.rs

use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use futures::stream::BoxStream;
use futures::{stream, StreamExt};
use url::Url;

use crate::acl::{AclEntry, AclStatus};
use crate::common::config::{self, Configuration};
use crate::ec::resolve_ec_policy;
use crate::error::{HdfsError, Result};
use crate::file::{FileReader, FileWriter};
use crate::hdfs::protocol::NamenodeProtocol;
use crate::hdfs::proxy::NameServiceProxy;
use crate::proto::hdfs::hdfs_file_status_proto::FileType;

use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};

#[derive(Clone)]
pub struct WriteOptions {
    /// Block size. Default is retrieved from the server.
    pub block_size: Option<u64>,
    /// Replication factor. Default is retrieved from the server.
    pub replication: Option<u32>,
    /// Unix file permission, defaults to 0o644, which is "rw-r--r--" as a Unix permission.
    /// This is the raw octal value represented in base 10.
    pub permission: u32,
    /// Whether to overwrite the file, defaults to false. If true and the
    /// file does not exist, it will result in an error.
    pub overwrite: bool,
    /// Whether to create any missing parent directories, defaults to true. If false
    /// and the parent directory does not exist, an error will be returned.
    pub create_parent: bool,
}

impl Default for WriteOptions {
    fn default() -> Self {
        Self {
            block_size: None,
            replication: None,
            permission: 0o644,
            overwrite: false,
            create_parent: true,
        }
    }
}

impl AsRef<WriteOptions> for WriteOptions {
    fn as_ref(&self) -> &WriteOptions {
        self
    }
}

impl WriteOptions {
    /// Set the block_size for the new file
    pub fn block_size(mut self, block_size: u64) -> Self {
        self.block_size = Some(block_size);
        self
    }

    /// Set the replication for the new file
    pub fn replication(mut self, replication: u32) -> Self {
        self.replication = Some(replication);
        self
    }

    /// Set the raw octal permission value for the new file
    pub fn permission(mut self, permission: u32) -> Self {
        self.permission = permission;
        self
    }

    /// Set whether to overwrite an existing file
    pub fn overwrite(mut self, overwrite: bool) -> Self {
        self.overwrite = overwrite;
        self
    }

    /// Set whether to create all missing parent directories
    pub fn create_parent(mut self, create_parent: bool) -> Self {
        self.create_parent = create_parent;
        self
    }
}

#[derive(Debug, Clone)]
struct MountLink {
    viewfs_path: String,
    hdfs_path: String,
    protocol: Arc<NamenodeProtocol>,
}

impl MountLink {
    fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
        // We should never have an empty path; we always want things mounted at root ("/") by default.
        Self {
            viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
            hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
            protocol,
        }
    }

    /// Convert a viewfs path into a name service path if it matches this link
    fn resolve(&self, path: &str) -> Option<String> {
        // Make sure we don't partially match the last component. It either needs to be an exact
        // match to a viewfs path, or needs to match with a trailing slash
        if path == self.viewfs_path {
            Some(self.hdfs_path.clone())
        } else {
            path.strip_prefix(&format!("{}/", self.viewfs_path))
                .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
        }
    }
}

#[derive(Debug)]
struct MountTable {
    mounts: Vec<MountLink>,
    fallback: MountLink,
}

impl MountTable {
    fn resolve(&self, src: &str) -> (&MountLink, String) {
        for link in self.mounts.iter() {
            if let Some(resolved) = link.resolve(src) {
                return (link, resolved);
            }
        }
        (&self.fallback, self.fallback.resolve(src).unwrap())
    }
}

#[derive(Debug)]
pub struct Client {
    mount_table: Arc<MountTable>,
    config: Arc<Configuration>,
}

impl Client {
    /// Creates a new HDFS Client. The URL must include the protocol and host, and optionally a port.
    /// If a port is included, the host is treated as a single NameNode. If no port is included, the
    /// host is treated as a name service that will be resolved using the HDFS config.
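    ///
    /// A minimal, illustrative example (the addresses are placeholders, not defaults):
    /// ```rust
    /// # fn example() {
    /// // Single NameNode at an explicit port
    /// let client = hdfs_native::Client::new("hdfs://localhost:9000").unwrap();
    /// // Name service resolved through the loaded HDFS configuration
    /// let client = hdfs_native::Client::new("hdfs://ns1").unwrap();
    /// # }
    /// ```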
    pub fn new(url: &str) -> Result<Self> {
        let parsed_url = Url::parse(url)?;
        Self::with_config(&parsed_url, Configuration::new()?)
    }

    pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
        let parsed_url = Url::parse(url)?;
        Self::with_config(&parsed_url, Configuration::new_with_config(config)?)
    }

    pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
        let config = Configuration::new_with_config(config)?;
        Self::with_config(&Self::default_fs(&config)?, config)
    }

    fn default_fs(config: &Configuration) -> Result<Url> {
        let url = config
            .get(config::DEFAULT_FS)
            .ok_or(HdfsError::InvalidArgument(format!(
                "No {} setting found",
                config::DEFAULT_FS
            )))?;
        Ok(Url::parse(&url)?)
    }

    fn with_config(url: &Url, config: Configuration) -> Result<Self> {
        let resolved_url = if !url.has_host() {
            let default_url = Self::default_fs(&config)?;
            if url.scheme() != default_url.scheme() || !default_url.has_host() {
                return Err(HdfsError::InvalidArgument(
                    "URL must contain a host".to_string(),
                ));
            }
            default_url
        } else {
            url.clone()
        };

        let mount_table = match url.scheme() {
            "hdfs" => {
                let proxy = NameServiceProxy::new(&resolved_url, &config)?;
                let protocol = Arc::new(NamenodeProtocol::new(proxy));

                MountTable {
                    mounts: Vec::new(),
                    fallback: MountLink::new("/", "/", protocol),
                }
            }
            "viewfs" => Self::build_mount_table(resolved_url.host_str().unwrap(), &config)?,
            _ => {
                return Err(HdfsError::InvalidArgument(
                    "Only `hdfs` and `viewfs` schemes are supported".to_string(),
                ))
            }
        };

        Ok(Self {
            mount_table: Arc::new(mount_table),
            config: Arc::new(config),
        })
    }

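    // The viewfs mount table is read from the standard Hadoop configuration keys, roughly of
    // the form shown below (keys and values are illustrative):
    //   fs.viewfs.mounttable.<host>.link./some/path = hdfs://ns1/real/path
    //   fs.viewfs.mounttable.<host>.linkFallback    = hdfs://ns2/
    // Each link entry becomes a MountLink; exactly one fallback is required and serves any
    // path that no other link matches.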
    fn build_mount_table(host: &str, config: &Configuration) -> Result<MountTable> {
        let mut mounts: Vec<MountLink> = Vec::new();
        let mut fallback: Option<MountLink> = None;

        for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
            let url = Url::parse(hdfs_url)?;
            if !url.has_host() {
                return Err(HdfsError::InvalidArgument(
                    "URL must contain a host".to_string(),
                ));
            }
            if url.scheme() != "hdfs" {
                return Err(HdfsError::InvalidArgument(
                    "Only hdfs mounts are supported for viewfs".to_string(),
                ));
            }
            let proxy = NameServiceProxy::new(&url, config)?;
            let protocol = Arc::new(NamenodeProtocol::new(proxy));

            if let Some(prefix) = viewfs_path {
                mounts.push(MountLink::new(prefix, url.path(), protocol));
            } else {
                if fallback.is_some() {
                    return Err(HdfsError::InvalidArgument(
                        "Multiple viewfs fallback links found".to_string(),
                    ));
                }
                fallback = Some(MountLink::new("/", url.path(), protocol));
            }
        }

        if let Some(fallback) = fallback {
            // Sort the mount table so that viewfs paths with more components come first. This
            // makes sure more specific paths are considered before shallower ones.
            mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
            mounts.reverse();

            Ok(MountTable { mounts, fallback })
        } else {
            Err(HdfsError::InvalidArgument(
                "No viewfs fallback mount found".to_string(),
            ))
        }
    }

    /// Retrieve the file status for the file at `path`.
    pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        match link.protocol.get_file_info(&resolved_path).await?.fs {
            Some(status) => Ok(FileStatus::from(status, path)),
            None => Err(HdfsError::FileNotFound(path.to_string())),
        }
    }

    /// Retrieves a list of all files in directories located at `path`. Wrapper around `list_status_iter` that
    /// returns Err if any part of the stream fails, or Ok if all file statuses were found successfully.
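    ///
    /// A small illustrative example (the path is a placeholder):
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::new("hdfs://localhost:9000").unwrap();
    /// // Recursively list everything under /logs
    /// for status in client.list_status("/logs", true).await.unwrap() {
    ///     println!("{} ({} bytes)", status.path, status.length);
    /// }
    /// # }
    /// ```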
    pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
        let iter = self.list_status_iter(path, recursive);
        let statuses = iter
            .into_stream()
            .collect::<Vec<Result<FileStatus>>>()
            .await;

        let mut resolved_statuses = Vec::<FileStatus>::with_capacity(statuses.len());
        for status in statuses.into_iter() {
            resolved_statuses.push(status?);
        }

        Ok(resolved_statuses)
    }

    /// Retrieves an iterator of all files in directories located at `path`.
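    ///
    /// A sketch of consuming the iterator as a stream (the path is a placeholder):
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::new("hdfs://localhost:9000").unwrap();
    /// use futures::StreamExt;
    ///
    /// let mut stream = client.list_status_iter("/logs", true).into_stream();
    /// while let Some(status) = stream.next().await {
    ///     println!("{}", status.unwrap().path);
    /// }
    /// # }
    /// ```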
    pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
        ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
    }

    /// Opens a file reader for the file at `path`. Path should not include a scheme.
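    ///
    /// A minimal sketch of reading a whole file (the path is a placeholder):
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::new("hdfs://localhost:9000").unwrap();
    /// let mut reader = client.read("/path/to/file").await.unwrap();
    /// // Read the entire file into memory
    /// let content = reader.read(reader.file_length()).await.unwrap();
    /// # }
    /// ```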
    pub async fn read(&self, path: &str) -> Result<FileReader> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        let located_info = link.protocol.get_located_file_info(&resolved_path).await?;
        match located_info.fs {
            Some(mut status) => {
                let ec_schema = if let Some(ec_policy) = status.ec_policy.as_ref() {
                    Some(resolve_ec_policy(ec_policy)?)
                } else {
                    None
                };

                if status.file_encryption_info.is_some() {
                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
                }
                if status.file_type() == FileType::IsDir {
                    return Err(HdfsError::IsADirectoryError(path.to_string()));
                }

                if let Some(locations) = status.locations.take() {
                    Ok(FileReader::new(
                        Arc::clone(&link.protocol),
                        status,
                        locations,
                        ec_schema,
                    ))
                } else {
                    Err(HdfsError::BlocksNotFound(path.to_string()))
                }
            }
            None => Err(HdfsError::FileNotFound(path.to_string())),
        }
    }

    /// Opens a new file for writing. See [WriteOptions] for options and behavior for different
    /// scenarios.
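    ///
    /// A short illustrative sketch (paths and contents are placeholders; the `use` path assumes
    /// `WriteOptions` is re-exported at the crate root alongside [Client]):
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::new("hdfs://localhost:9000").unwrap();
    /// use hdfs_native::WriteOptions;
    ///
    /// let options = WriteOptions::default().overwrite(true);
    /// let mut writer = client.create("/path/to/file", options).await.unwrap();
    /// writer.write(bytes::Bytes::from("some data")).await.unwrap();
    /// writer.close().await.unwrap();
    /// # }
    /// ```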
    pub async fn create(
        &self,
        src: &str,
        write_options: impl AsRef<WriteOptions>,
    ) -> Result<FileWriter> {
        let write_options = write_options.as_ref();

        let (link, resolved_path) = self.mount_table.resolve(src);

        let create_response = link
            .protocol
            .create(
                &resolved_path,
                write_options.permission,
                write_options.overwrite,
                write_options.create_parent,
                write_options.replication,
                write_options.block_size,
            )
            .await?;

        match create_response.fs {
            Some(status) => {
                if status.file_encryption_info.is_some() {
                    let _ = self.delete(src, false).await;
                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
                }

                Ok(FileWriter::new(
                    Arc::clone(&link.protocol),
                    resolved_path,
                    status,
                    Arc::clone(&self.config),
                    None,
                ))
            }
            None => Err(HdfsError::FileNotFound(src.to_string())),
        }
    }

    fn needs_new_block(class: &str, msg: &str) -> bool {
        class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
    }

    /// Opens an existing file for appending. An Err will be returned if the file does not exist. If the
    /// file is replicated, the current block will be appended to until it is full. If the file is erasure
    /// coded, a new block will be created.
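    ///
    /// A brief illustrative sketch (path and contents are placeholders):
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::new("hdfs://localhost:9000").unwrap();
    /// let mut writer = client.append("/path/to/existing/file").await.unwrap();
    /// writer.write(bytes::Bytes::from("more data")).await.unwrap();
    /// writer.close().await.unwrap();
    /// # }
    /// ```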
    pub async fn append(&self, src: &str) -> Result<FileWriter> {
        let (link, resolved_path) = self.mount_table.resolve(src);

        // Assume the file is replicated and try to append to the current block. If the file is
        // erasure coded, then try again by appending to a new block.
        let append_response = match link.protocol.append(&resolved_path, false).await {
            Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
                link.protocol.append(&resolved_path, true).await?
            }
            resp => resp?,
        };

        match append_response.stat {
            Some(status) => {
                if status.file_encryption_info.is_some() {
                    let _ = link
                        .protocol
                        .complete(src, append_response.block.map(|b| b.b), status.file_id)
                        .await;
                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
                }

                Ok(FileWriter::new(
                    Arc::clone(&link.protocol),
                    resolved_path,
                    status,
                    Arc::clone(&self.config),
                    append_response.block,
                ))
            }
            None => Err(HdfsError::FileNotFound(src.to_string())),
        }
    }

    /// Create a new directory at `path` with the given `permission`.
    ///
    /// `permission` is the raw octal value representing the Unix style permission. For example, to
    /// set 755 (`rwxr-xr-x`) permissions, use 0o755.
    ///
    /// If `create_parent` is true, any missing parent directories will be created as well,
    /// otherwise an error will be returned if the parent directory doesn't already exist.
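    ///
    /// For example (the path is a placeholder):
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::new("hdfs://localhost:9000").unwrap();
    /// // Create /data/2024 with rwxr-xr-x permissions, creating /data if needed
    /// client.mkdirs("/data/2024", 0o755, true).await.unwrap();
    /// # }
    /// ```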
    pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .mkdirs(&resolved_path, permission, create_parent)
            .await
            .map(|_| ())
    }

    /// Renames `src` to `dst`. Returns Ok(()) on success, and Err otherwise.
    pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
        let (src_link, src_resolved_path) = self.mount_table.resolve(src);
        let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
        if src_link.viewfs_path == dst_link.viewfs_path {
            src_link
                .protocol
                .rename(&src_resolved_path, &dst_resolved_path, overwrite)
                .await
                .map(|_| ())
        } else {
            Err(HdfsError::InvalidArgument(
                "Cannot rename across different name services".to_string(),
            ))
        }
    }

    /// Deletes the file or directory at `path`. If `recursive` is false and `path` is a non-empty
    /// directory, this will fail. Returns `Ok(true)` if it was successfully deleted.
    pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .delete(&resolved_path, recursive)
            .await
            .map(|r| r.result)
    }

    /// Sets the modified and access times for a file. Times should be in milliseconds from the epoch.
    pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .set_times(&resolved_path, mtime, atime)
            .await?;
        Ok(())
    }

    /// Sets the owner and/or group for a file. Pass `None` for either to leave it unchanged.
    pub async fn set_owner(
        &self,
        path: &str,
        owner: Option<&str>,
        group: Option<&str>,
    ) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .set_owner(&resolved_path, owner, group)
            .await?;
        Ok(())
    }

    /// Sets permissions for a file. Permission should be an octal number representing the Unix style
    /// permission.
    ///
    /// For example, to set permissions to rwxr-xr-x:
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::new("hdfs://localhost:9000").unwrap();
    /// client.set_permission("/path", 0o755).await.unwrap();
    /// # }
    /// ```
    pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .set_permission(&resolved_path, permission)
            .await?;
        Ok(())
    }

    /// Sets the replication for a file.
    pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        let result = link
            .protocol
            .set_replication(&resolved_path, replication)
            .await?
            .result;

        Ok(result)
    }

    /// Gets a content summary for a file or directory rooted at `path`.
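    ///
    /// For example (the path is a placeholder):
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::new("hdfs://localhost:9000").unwrap();
    /// let summary = client.get_content_summary("/data").await.unwrap();
    /// println!("{} files using {} bytes", summary.file_count, summary.length);
    /// # }
    /// ```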
    pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        let result = link
            .protocol
            .get_content_summary(&resolved_path)
            .await?
            .summary;

        Ok(result.into())
    }

    /// Update ACL entries for file or directory at `path`. Existing entries will remain.
    pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .modify_acl_entries(&resolved_path, acl_spec)
            .await?;

        Ok(())
    }

    /// Remove specific ACL entries for file or directory at `path`.
    pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .remove_acl_entries(&resolved_path, acl_spec)
            .await?;

        Ok(())
    }

    /// Remove all default ACLs for file or directory at `path`.
    pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol.remove_default_acl(&resolved_path).await?;

        Ok(())
    }

    /// Remove all ACL entries for file or directory at `path`.
    pub async fn remove_acl(&self, path: &str) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol.remove_acl(&resolved_path).await?;

        Ok(())
    }

    /// Override all ACL entries for file or directory at `path`. If only access ACLs are provided,
    /// default ACLs are maintained. Likewise if only default ACLs are provided, access ACLs are
    /// maintained.
    pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol.set_acl(&resolved_path, acl_spec).await?;

        Ok(())
    }

    /// Get the ACL status for the file or directory at `path`.
    pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        Ok(link
            .protocol
            .get_acl_status(&resolved_path)
            .await?
            .result
            .into())
    }
}

impl Default for Client {
    /// Creates a new HDFS Client based on the fs.defaultFS setting. Panics if the config files fail to load,
    /// no defaultFS is defined, or the defaultFS is invalid.
    fn default() -> Self {
        Self::default_with_config(Default::default()).expect("Failed to create default client")
    }
}

pub(crate) struct DirListingIterator {
    path: String,
    resolved_path: String,
    link: MountLink,
    files_only: bool,
    partial_listing: VecDeque<HdfsFileStatusProto>,
    remaining: u32,
    last_seen: Vec<u8>,
}

impl DirListingIterator {
    fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
        let (link, resolved_path) = mount_table.resolve(&path);

        DirListingIterator {
            path,
            resolved_path,
            link: link.clone(),
            files_only,
            partial_listing: VecDeque::new(),
            remaining: 1,
            last_seen: Vec::new(),
        }
    }

    async fn get_next_batch(&mut self) -> Result<bool> {
        let listing = self
            .link
            .protocol
            .get_listing(&self.resolved_path, self.last_seen.clone(), false)
            .await?;

        if let Some(dir_list) = listing.dir_list {
            self.last_seen = dir_list
                .partial_listing
                .last()
                .map(|p| p.path.clone())
                .unwrap_or(Vec::new());

            self.remaining = dir_list.remaining_entries;

            self.partial_listing = dir_list
                .partial_listing
                .into_iter()
                .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
                .collect();
            Ok(!self.partial_listing.is_empty())
        } else {
            Err(HdfsError::FileNotFound(self.path.clone()))
        }
    }

    pub async fn next(&mut self) -> Option<Result<FileStatus>> {
        if self.partial_listing.is_empty() && self.remaining > 0 {
            if let Err(error) = self.get_next_batch().await {
                self.remaining = 0;
                return Some(Err(error));
            }
        }
        if let Some(next) = self.partial_listing.pop_front() {
            Some(Ok(FileStatus::from(next, &self.path)))
        } else {
            None
        }
    }
}

pub struct ListStatusIterator {
    mount_table: Arc<MountTable>,
    recursive: bool,
    iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
}

impl ListStatusIterator {
    fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
        let initial = DirListingIterator::new(path.clone(), &mount_table, false);

        ListStatusIterator {
            mount_table,
            recursive,
            iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
        }
    }

    pub async fn next(&self) -> Option<Result<FileStatus>> {
        let mut next_file: Option<Result<FileStatus>> = None;
        let mut iters = self.iters.lock().await;
        while next_file.is_none() {
            if let Some(iter) = iters.last_mut() {
                if let Some(file_result) = iter.next().await {
                    if let Ok(file) = file_result {
                        // Return the directory as the next result, but start traversing into that directory
                        // next if we're doing a recursive listing
                        if file.isdir && self.recursive {
                            iters.push(DirListingIterator::new(
                                file.path.clone(),
                                &self.mount_table,
                                false,
                            ))
                        }
                        next_file = Some(Ok(file));
                    } else {
                        // Error, return that as the next element
                        next_file = Some(file_result)
                    }
                } else {
                    // We've exhausted this directory
                    iters.pop();
                }
            } else {
                // There's nothing left, just return None
                break;
            }
        }

        next_file
    }

    pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
        let listing = stream::unfold(self, |state| async move {
            let next = state.next().await;
            next.map(|n| (n, state))
        });
        Box::pin(listing)
    }
}

#[derive(Debug)]
pub struct FileStatus {
    pub path: String,
    pub length: usize,
    pub isdir: bool,
    pub permission: u16,
    pub owner: String,
    pub group: String,
    pub modification_time: u64,
    pub access_time: u64,
    pub replication: Option<u32>,
    pub blocksize: Option<u64>,
}

impl FileStatus {
    fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
        let mut path = base_path.trim_end_matches("/").to_string();
        let relative_path = std::str::from_utf8(&value.path).unwrap();
        if !relative_path.is_empty() {
            path.push('/');
            path.push_str(relative_path);
        }

        FileStatus {
            isdir: value.file_type() == FileType::IsDir,
            path,
            length: value.length as usize,
            permission: value.permission.perm as u16,
            owner: value.owner,
            group: value.group,
            modification_time: value.modification_time,
            access_time: value.access_time,
            replication: value.block_replication,
            blocksize: value.blocksize,
        }
    }
}

#[derive(Debug)]
pub struct ContentSummary {
    pub length: u64,
    pub file_count: u64,
    pub directory_count: u64,
    pub quota: u64,
    pub space_consumed: u64,
    pub space_quota: u64,
}

impl From<ContentSummaryProto> for ContentSummary {
    fn from(value: ContentSummaryProto) -> Self {
        ContentSummary {
            length: value.length,
            file_count: value.file_count,
            directory_count: value.directory_count,
            quota: value.quota,
            space_consumed: value.space_consumed,
            space_quota: value.space_quota,
        }
    }
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use url::Url;

    use crate::{
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
        Client,
    };

    use super::{MountLink, MountTable};

    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let proxy =
            NameServiceProxy::new(&Url::parse(url).unwrap(), &Configuration::new().unwrap())
                .unwrap();
        Arc::new(NamenodeProtocol::new(proxy))
    }

    #[test]
    fn test_default_fs() {
        assert!(Client::default_with_config(
            vec![("fs.defaultFS".to_string(), "hdfs://test:9000".to_string())]
                .into_iter()
                .collect(),
        )
        .is_ok());

        assert!(Client::default_with_config(
            vec![("fs.defaultFS".to_string(), "hdfs://".to_string())]
                .into_iter()
                .collect(),
        )
        .is_err());

        assert!(Client::new_with_config(
            "hdfs://",
            vec![("fs.defaultFS".to_string(), "hdfs://test:9000".to_string())]
                .into_iter()
                .collect(),
        )
        .is_ok());

        assert!(Client::new_with_config(
            "hdfs://",
            vec![("fs.defaultFS".to_string(), "hdfs://".to_string())]
                .into_iter()
                .collect(),
        )
        .is_err());

        assert!(Client::new_with_config(
            "hdfs://",
            vec![("fs.defaultFS".to_string(), "viewfs://test".to_string())]
                .into_iter()
                .collect(),
        )
        .is_err());
    }

    #[test]
    fn test_mount_link_resolve() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("/view", "/hdfs", protocol);

        assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
        assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
        assert!(link.resolve("/hdfs/path").is_none());
    }

    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));

        assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
        assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
        assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");

        let link = MountLink::new("", "", protocol);
        assert_eq!(link.resolve("/").unwrap(), "/");
    }

    #[test]
    fn test_mount_table_resolve() {
        let link1 = MountLink::new(
            "/mount1",
            "/path1/nested",
            create_protocol("hdfs://127.0.0.1:9000"),
        );
        let link2 = MountLink::new(
            "/mount2",
            "/path2",
            create_protocol("hdfs://127.0.0.1:9001"),
        );
        let link3 = MountLink::new(
            "/mount3/nested",
            "/path3",
            create_protocol("hdfs://127.0.0.1:9002"),
        );
        let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));

        let mount_table = MountTable {
            mounts: vec![link1, link2, link3],
            fallback,
        };

        // Exact mount path resolves to the exact HDFS path
        let (link, resolved) = mount_table.resolve("/mount1");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested");

        // Trailing slash is treated the same
        let (link, resolved) = mount_table.resolve("/mount1/");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested/");

        // Doesn't do partial matches on a directory name
        let (link, resolved) = mount_table.resolve("/mount12");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount12");

        let (link, resolved) = mount_table.resolve("/mount3/file");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount3/file");

        let (link, resolved) = mount_table.resolve("/mount3/nested/file");
        assert_eq!(link.viewfs_path, "/mount3/nested");
        assert_eq!(resolved, "/path3/file");
    }
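
    // A small check of the WriteOptions builder methods defined above; the values here are
    // illustrative and only exercise code in this module.
    #[test]
    fn test_write_options_builder() {
        let options = super::WriteOptions::default()
            .block_size(128 * 1024 * 1024)
            .replication(3)
            .permission(0o600)
            .overwrite(true)
            .create_parent(false);

        assert_eq!(options.block_size, Some(128 * 1024 * 1024));
        assert_eq!(options.replication, Some(3));
        assert_eq!(options.permission, 0o600);
        assert!(options.overwrite);
        assert!(!options.create_parent);
    }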
}