
1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
4use futures::stream::BoxStream;
5use futures::{stream, StreamExt};
6use url::Url;
8use crate::acl::{AclEntry, AclStatus};
9use crate::common::config::{self, Configuration};
10use crate::ec::resolve_ec_policy;
11use crate::error::{HdfsError, Result};
12use crate::file::{FileReader, FileWriter};
13use crate::hdfs::protocol::NamenodeProtocol;
14use crate::hdfs::proxy::NameServiceProxy;
15use crate::proto::hdfs::hdfs_file_status_proto::FileType;
17use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
20pub struct WriteOptions {
21    /// Block size. Default is retrieved from the server.
22    pub block_size: Option<u64>,
23    /// Replication factor. Default is retrieved from the server.
24    pub replication: Option<u32>,
25    /// Unix file permission, defaults to 0o644, which is "rw-r--r--" as a Unix permission.
26    /// This is the raw octal value represented in base 10.
27    pub permission: u32,
28    /// Whether to overwrite the file, defaults to false. If true and the
29    /// file does not exist, it will result in an error.
30    pub overwrite: bool,
31    /// Whether to create any missing parent directories, defaults to true. If false
32    /// and the parent directory does not exist, an error will be returned.
33    pub create_parent: bool,
36impl Default for WriteOptions {
37    fn default() -> Self {
38        Self {
39            block_size: None,
40            replication: None,
41            permission: 0o644,
42            overwrite: false,
43            create_parent: true,
44        }
45    }
48impl AsRef<WriteOptions> for WriteOptions {
49    fn as_ref(&self) -> &WriteOptions {
50        self
51    }
54impl WriteOptions {
55    /// Set the block_size for the new file
56    pub fn block_size(mut self, block_size: u64) -> Self {
57        self.block_size = Some(block_size);
58        self
59    }
61    /// Set the replication for the new file
62    pub fn replication(mut self, replication: u32) -> Self {
63        self.replication = Some(replication);
64        self
65    }
67    /// Set the raw octal permission value for the new file
68    pub fn permission(mut self, permission: u32) -> Self {
69        self.permission = permission;
70        self
71    }
73    /// Set whether to overwrite an existing file
74    pub fn overwrite(mut self, overwrite: bool) -> Self {
75        self.overwrite = overwrite;
76        self
77    }
79    /// Set whether to create all missing parent directories
80    pub fn create_parent(mut self, create_parent: bool) -> Self {
81        self.create_parent = create_parent;
82        self
83    }
86#[derive(Debug, Clone)]
87struct MountLink {
88    viewfs_path: String,
89    hdfs_path: String,
90    protocol: Arc<NamenodeProtocol>,
93impl MountLink {
94    fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
95        // We should never have an empty path, we always want things mounted at root ("/") by default.
96        Self {
97            viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
98            hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
99            protocol,
100        }
101    }
102    /// Convert a viewfs path into a name service path if it matches this link
103    fn resolve(&self, path: &str) -> Option<String> {
104        // Make sure we don't partially match the last component. It either needs to be an exact
105        // match to a viewfs path, or needs to match with a trailing slash
106        if path == self.viewfs_path {
107            Some(self.hdfs_path.clone())
108        } else {
109            path.strip_prefix(&format!("{}/", self.viewfs_path))
110                .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
111        }
112    }
116struct MountTable {
117    mounts: Vec<MountLink>,
118    fallback: MountLink,
121impl MountTable {
122    fn resolve(&self, src: &str) -> (&MountLink, String) {
123        for link in self.mounts.iter() {
124            if let Some(resolved) = link.resolve(src) {
125                return (link, resolved);
126            }
127        }
128        (&self.fallback, self.fallback.resolve(src).unwrap())
129    }
133pub struct Client {
134    mount_table: Arc<MountTable>,
137impl Client {
138    /// Creates a new HDFS Client. The URL must include the protocol and host, and optionally a port.
139    /// If a port is included, the host is treated as a single NameNode. If no port is included, the
140    /// host is treated as a name service that will be resolved using the HDFS config.
141    pub fn new(url: &str) -> Result<Self> {
142        let parsed_url = Url::parse(url)?;
143        Self::with_config(&parsed_url, Configuration::new()?)
144    }
146    pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
147        let parsed_url = Url::parse(url)?;
148        Self::with_config(&parsed_url, Configuration::new_with_config(config)?)
149    }
151    pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
152        let config = Configuration::new_with_config(config)?;
153        Self::with_config(&Self::default_fs(&config)?, config)
154    }
156    fn default_fs(config: &Configuration) -> Result<Url> {
157        let url = config
158            .get(config::DEFAULT_FS)
159            .ok_or(HdfsError::InvalidArgument(format!(
160                "No {} setting found",
161                config::DEFAULT_FS
162            )))?;
163        Ok(Url::parse(&url)?)
164    }
166    fn with_config(url: &Url, config: Configuration) -> Result<Self> {
167        let resolved_url = if !url.has_host() {
168            let default_url = Self::default_fs(&config)?;
169            if url.scheme() != default_url.scheme() || !default_url.has_host() {
170                return Err(HdfsError::InvalidArgument(
171                    "URL must contain a host".to_string(),
172                ));
173            }
174            default_url
175        } else {
176            url.clone()
177        };
179        let mount_table = match url.scheme() {
180            "hdfs" => {
181                let proxy = NameServiceProxy::new(&resolved_url, &config)?;
182                let protocol = Arc::new(NamenodeProtocol::new(proxy));
184                MountTable {
185                    mounts: Vec::new(),
186                    fallback: MountLink::new("/", "/", protocol),
187                }
188            }
189            "viewfs" => Self::build_mount_table(resolved_url.host_str().unwrap(), &config)?,
190            _ => {
191                return Err(HdfsError::InvalidArgument(
192                    "Only `hdfs` and `viewfs` schemes are supported".to_string(),
193                ))
194            }
195        };
197        Ok(Self {
198            mount_table: Arc::new(mount_table),
199        })
200    }
202    fn build_mount_table(host: &str, config: &Configuration) -> Result<MountTable> {
203        let mut mounts: Vec<MountLink> = Vec::new();
204        let mut fallback: Option<MountLink> = None;
206        for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
207            let url = Url::parse(hdfs_url)?;
208            if !url.has_host() {
209                return Err(HdfsError::InvalidArgument(
210                    "URL must contain a host".to_string(),
211                ));
212            }
213            if url.scheme() != "hdfs" {
214                return Err(HdfsError::InvalidArgument(
215                    "Only hdfs mounts are supported for viewfs".to_string(),
216                ));
217            }
218            let proxy = NameServiceProxy::new(&url, config)?;
219            let protocol = Arc::new(NamenodeProtocol::new(proxy));
221            if let Some(prefix) = viewfs_path {
222                mounts.push(MountLink::new(prefix, url.path(), protocol));
223            } else {
224                if fallback.is_some() {
225                    return Err(HdfsError::InvalidArgument(
226                        "Multiple viewfs fallback links found".to_string(),
227                    ));
228                }
229                fallback = Some(MountLink::new("/", url.path(), protocol));
230            }
231        }
233        if let Some(fallback) = fallback {
234            // Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first.
235            mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
236            mounts.reverse();
238            Ok(MountTable { mounts, fallback })
239        } else {
240            Err(HdfsError::InvalidArgument(
241                "No viewfs fallback mount found".to_string(),
242            ))
243        }
244    }
246    /// Retrieve the file status for the file at `path`.
247    pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
248        let (link, resolved_path) = self.mount_table.resolve(path);
249        match link.protocol.get_file_info(&resolved_path).await?.fs {
250            Some(status) => Ok(FileStatus::from(status, path)),
251            None => Err(HdfsError::FileNotFound(path.to_string())),
252        }
253    }
255    /// Retrives a list of all files in directories located at `path`. Wrapper around `list_status_iter` that
256    /// returns Err if any part of the stream fails, or Ok if all file statuses were found successfully.
257    pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
258        let iter = self.list_status_iter(path, recursive);
259        let statuses = iter
260            .into_stream()
261            .collect::<Vec<Result<FileStatus>>>()
262            .await;
264        let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
265        for status in statuses.into_iter() {
266            resolved_statues.push(status?);
267        }
269        Ok(resolved_statues)
270    }
272    /// Retrives an iterator of all files in directories located at `path`.
273    pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
274        ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
275    }
277    /// Opens a file reader for the file at `path`. Path should not include a scheme.
278    pub async fn read(&self, path: &str) -> Result<FileReader> {
279        let (link, resolved_path) = self.mount_table.resolve(path);
280        let located_info = link.protocol.get_located_file_info(&resolved_path).await?;
281        match located_info.fs {
282            Some(mut status) => {
283                let ec_schema = if let Some(ec_policy) = status.ec_policy.as_ref() {
284                    Some(resolve_ec_policy(ec_policy)?)
285                } else {
286                    None
287                };
289                if status.file_encryption_info.is_some() {
290                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
291                }
292                if status.file_type() == FileType::IsDir {
293                    return Err(HdfsError::IsADirectoryError(path.to_string()));
294                }
296                if let Some(locations) = status.locations.take() {
297                    Ok(FileReader::new(
298                        Arc::clone(&link.protocol),
299                        status,
300                        locations,
301                        ec_schema,
302                    ))
303                } else {
304                    Err(HdfsError::BlocksNotFound(path.to_string()))
305                }
306            }
307            None => Err(HdfsError::FileNotFound(path.to_string())),
308        }
309    }
311    /// Opens a new file for writing. See [WriteOptions] for options and behavior for different
312    /// scenarios.
313    pub async fn create(
314        &self,
315        src: &str,
316        write_options: impl AsRef<WriteOptions>,
317    ) -> Result<FileWriter> {
318        let write_options = write_options.as_ref();
320        let (link, resolved_path) = self.mount_table.resolve(src);
322        let create_response = link
323            .protocol
324            .create(
325                &resolved_path,
326                write_options.permission,
327                write_options.overwrite,
328                write_options.create_parent,
329                write_options.replication,
330                write_options.block_size,
331            )
332            .await?;
334        match create_response.fs {
335            Some(status) => {
336                if status.file_encryption_info.is_some() {
337                    let _ = self.delete(src, false).await;
338                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
339                }
341                Ok(FileWriter::new(
342                    Arc::clone(&link.protocol),
343                    resolved_path,
344                    status,
345                    None,
346                ))
347            }
348            None => Err(HdfsError::FileNotFound(src.to_string())),
349        }
350    }
352    fn needs_new_block(class: &str, msg: &str) -> bool {
353        class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
354    }
356    /// Opens an existing file for appending. An Err will be returned if the file does not exist. If the
357    /// file is replicated, the current block will be appended to until it is full. If the file is erasure
358    /// coded, a new block will be created.
359    pub async fn append(&self, src: &str) -> Result<FileWriter> {
360        let (link, resolved_path) = self.mount_table.resolve(src);
362        // Assume the file is replicated and try to append to the current block. If the file is
363        // erasure coded, then try again by appending to a new block.
364        let append_response = match link.protocol.append(&resolved_path, false).await {
365            Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
366                link.protocol.append(&resolved_path, true).await?
367            }
368            resp => resp?,
369        };
371        match append_response.stat {
372            Some(status) => {
373                if status.file_encryption_info.is_some() {
374                    let _ = link
375                        .protocol
376                        .complete(src, append_response.block.map(|b| b.b), status.file_id)
377                        .await;
378                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
379                }
381                Ok(FileWriter::new(
382                    Arc::clone(&link.protocol),
383                    resolved_path,
384                    status,
385                    append_response.block,
386                ))
387            }
388            None => Err(HdfsError::FileNotFound(src.to_string())),
389        }
390    }
392    /// Create a new directory at `path` with the given `permission`.
393    ///
394    /// `permission` is the raw octal value representing the Unix style permission. For example, to
395    /// set 755 (`rwxr-x-rx`) permissions, use 0o755.
396    ///
397    /// If `create_parent` is true, any missing parent directories will be created as well,
398    /// otherwise an error will be returned if the parent directory doesn't already exist.
399    pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
400        let (link, resolved_path) = self.mount_table.resolve(path);
401        link.protocol
402            .mkdirs(&resolved_path, permission, create_parent)
403            .await
404            .map(|_| ())
405    }
407    /// Renames `src` to `dst`. Returns Ok(()) on success, and Err otherwise.
408    pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
409        let (src_link, src_resolved_path) = self.mount_table.resolve(src);
410        let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
411        if src_link.viewfs_path == dst_link.viewfs_path {
412            src_link
413                .protocol
414                .rename(&src_resolved_path, &dst_resolved_path, overwrite)
415                .await
416                .map(|_| ())
417        } else {
418            Err(HdfsError::InvalidArgument(
419                "Cannot rename across different name services".to_string(),
420            ))
421        }
422    }
424    /// Deletes the file or directory at `path`. If `recursive` is false and `path` is a non-empty
425    /// directory, this will fail. Returns `Ok(true)` if it was successfully deleted.
426    pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
427        let (link, resolved_path) = self.mount_table.resolve(path);
428        link.protocol
429            .delete(&resolved_path, recursive)
430            .await
431            .map(|r| r.result)
432    }
434    /// Sets the modified and access times for a file. Times should be in milliseconds from the epoch.
435    pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
436        let (link, resolved_path) = self.mount_table.resolve(path);
437        link.protocol
438            .set_times(&resolved_path, mtime, atime)
439            .await?;
440        Ok(())
441    }
443    /// Optionally sets the owner and group for a file.
444    pub async fn set_owner(
445        &self,
446        path: &str,
447        owner: Option<&str>,
448        group: Option<&str>,
449    ) -> Result<()> {
450        let (link, resolved_path) = self.mount_table.resolve(path);
451        link.protocol
452            .set_owner(&resolved_path, owner, group)
453            .await?;
454        Ok(())
455    }
457    /// Sets permissions for a file. Permission should be an octal number reprenting the Unix style
458    /// permission.
459    ///
460    /// For example, to set permissions to rwxr-xr-x:
461    /// ```rust
462    /// # async fn func() {
463    /// # let client = hdfs_native::Client::new("localhost:9000").unwrap();
464    /// client.set_permission("/path", 0o755).await.unwrap();
465    /// }
466    /// ```
467    pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
468        let (link, resolved_path) = self.mount_table.resolve(path);
469        link.protocol
470            .set_permission(&resolved_path, permission)
471            .await?;
472        Ok(())
473    }
475    /// Sets the replication for a file.
476    pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
477        let (link, resolved_path) = self.mount_table.resolve(path);
478        let result = link
479            .protocol
480            .set_replication(&resolved_path, replication)
481            .await?
482            .result;
484        Ok(result)
485    }
487    /// Gets a content summary for a file or directory rooted at `path`.
488    pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
489        let (link, resolved_path) = self.mount_table.resolve(path);
490        let result = link
491            .protocol
492            .get_content_summary(&resolved_path)
493            .await?
494            .summary;
496        Ok(result.into())
497    }
499    /// Update ACL entries for file or directory at `path`. Existing entries will remain.
500    pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
501        let (link, resolved_path) = self.mount_table.resolve(path);
502        link.protocol
503            .modify_acl_entries(&resolved_path, acl_spec)
504            .await?;
506        Ok(())
507    }
509    /// Remove specific ACL entries for file or directory at `path`.
510    pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
511        let (link, resolved_path) = self.mount_table.resolve(path);
512        link.protocol
513            .remove_acl_entries(&resolved_path, acl_spec)
514            .await?;
516        Ok(())
517    }
519    /// Remove all default ACLs for file or directory at `path`.
520    pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
521        let (link, resolved_path) = self.mount_table.resolve(path);
522        link.protocol.remove_default_acl(&resolved_path).await?;
524        Ok(())
525    }
527    /// Remove all ACL entries for file or directory at `path`.
528    pub async fn remove_acl(&self, path: &str) -> Result<()> {
529        let (link, resolved_path) = self.mount_table.resolve(path);
530        link.protocol.remove_acl(&resolved_path).await?;
532        Ok(())
533    }
535    /// Override all ACL entries for file or directory at `path`. If only access ACLs are provided,
536    /// default ACLs are maintained. Likewise if only default ACLs are provided, access ACLs are
537    /// maintained.
538    pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
539        let (link, resolved_path) = self.mount_table.resolve(path);
540        link.protocol.set_acl(&resolved_path, acl_spec).await?;
542        Ok(())
543    }
545    /// Get the ACL status for the file or directory at `path`.
546    pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
547        let (link, resolved_path) = self.mount_table.resolve(path);
548        Ok(link
549            .protocol
550            .get_acl_status(&resolved_path)
551            .await?
552            .result
553            .into())
554    }
557impl Default for Client {
558    /// Creates a new HDFS Client based on the fs.defaultFS setting. Panics if the config files fail to load,
559    /// no defaultFS is defined, or the defaultFS is invalid.
560    fn default() -> Self {
561        Self::default_with_config(Default::default()).expect("Failed to create default client")
562    }
565pub(crate) struct DirListingIterator {
566    path: String,
567    resolved_path: String,
568    link: MountLink,
569    files_only: bool,
570    partial_listing: VecDeque<HdfsFileStatusProto>,
571    remaining: u32,
572    last_seen: Vec<u8>,
575impl DirListingIterator {
576    fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
577        let (link, resolved_path) = mount_table.resolve(&path);
579        DirListingIterator {
580            path,
581            resolved_path,
582            link: link.clone(),
583            files_only,
584            partial_listing: VecDeque::new(),
585            remaining: 1,
586            last_seen: Vec::new(),
587        }
588    }
590    async fn get_next_batch(&mut self) -> Result<bool> {
591        let listing = self
592            .link
593            .protocol
594            .get_listing(&self.resolved_path, self.last_seen.clone(), false)
595            .await?;
597        if let Some(dir_list) = listing.dir_list {
598            self.last_seen = dir_list
599                .partial_listing
600                .last()
601                .map(|p| p.path.clone())
602                .unwrap_or(Vec::new());
604            self.remaining = dir_list.remaining_entries;
606            self.partial_listing = dir_list
607                .partial_listing
608                .into_iter()
609                .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
610                .collect();
611            Ok(!self.partial_listing.is_empty())
612        } else {
613            Err(HdfsError::FileNotFound(self.path.clone()))
614        }
615    }
617    pub async fn next(&mut self) -> Option<Result<FileStatus>> {
618        if self.partial_listing.is_empty() && self.remaining > 0 {
619            if let Err(error) = self.get_next_batch().await {
620                self.remaining = 0;
621                return Some(Err(error));
622            }
623        }
624        if let Some(next) = self.partial_listing.pop_front() {
625            Some(Ok(FileStatus::from(next, &self.path)))
626        } else {
627            None
628        }
629    }
632pub struct ListStatusIterator {
633    mount_table: Arc<MountTable>,
634    recursive: bool,
635    iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
638impl ListStatusIterator {
639    fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
640        let initial = DirListingIterator::new(path.clone(), &mount_table, false);
642        ListStatusIterator {
643            mount_table,
644            recursive,
645            iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
646        }
647    }
649    pub async fn next(&self) -> Option<Result<FileStatus>> {
650        let mut next_file: Option<Result<FileStatus>> = None;
651        let mut iters = self.iters.lock().await;
652        while next_file.is_none() {
653            if let Some(iter) = iters.last_mut() {
654                if let Some(file_result) = iter.next().await {
655                    if let Ok(file) = file_result {
656                        // Return the directory as the next result, but start traversing into that directory
657                        // next if we're doing a recursive listing
658                        if file.isdir && self.recursive {
659                            iters.push(DirListingIterator::new(
660                                file.path.clone(),
661                                &self.mount_table,
662                                false,
663                            ))
664                        }
665                        next_file = Some(Ok(file));
666                    } else {
667                        // Error, return that as the next element
668                        next_file = Some(file_result)
669                    }
670                } else {
671                    // We've exhausted this directory
672                    iters.pop();
673                }
674            } else {
675                // There's nothing left, just return None
676                break;
677            }
678        }
680        next_file
681    }
683    pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
684        let listing = stream::unfold(self, |state| async move {
685            let next = state.next().await;
686            next.map(|n| (n, state))
687        });
688        Box::pin(listing)
689    }
693pub struct FileStatus {
694    pub path: String,
695    pub length: usize,
696    pub isdir: bool,
697    pub permission: u16,
698    pub owner: String,
699    pub group: String,
700    pub modification_time: u64,
701    pub access_time: u64,
702    pub replication: Option<u32>,
703    pub blocksize: Option<u64>,
706impl FileStatus {
707    fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
708        let mut path = base_path.trim_end_matches("/").to_string();
709        let relative_path = std::str::from_utf8(&value.path).unwrap();
710        if !relative_path.is_empty() {
711            path.push('/');
712            path.push_str(relative_path);
713        }
715        FileStatus {
716            isdir: value.file_type() == FileType::IsDir,
717            path,
718            length: value.length as usize,
719            permission: value.permission.perm as u16,
720            owner: value.owner,
721            group: value.group,
722            modification_time: value.modification_time,
723            access_time: value.access_time,
724            replication: value.block_replication,
725            blocksize: value.blocksize,
726        }
727    }
731pub struct ContentSummary {
732    pub length: u64,
733    pub file_count: u64,
734    pub directory_count: u64,
735    pub quota: u64,
736    pub space_consumed: u64,
737    pub space_quota: u64,
740impl From<ContentSummaryProto> for ContentSummary {
741    fn from(value: ContentSummaryProto) -> Self {
742        ContentSummary {
743            length: value.length,
744            file_count: value.file_count,
745            directory_count: value.directory_count,
746            quota: value.quota,
747            space_consumed: value.space_consumed,
748            space_quota: value.space_quota,
749        }
750    }
754mod test {
755    use std::sync::Arc;
757    use url::Url;
759    use crate::{
760        common::config::Configuration,
761        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
762        Client,
763    };
765    use super::{MountLink, MountTable};
767    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
768        let proxy =
769            NameServiceProxy::new(&Url::parse(url).unwrap(), &Configuration::new().unwrap())
770                .unwrap();
771        Arc::new(NamenodeProtocol::new(proxy))
772    }
774    #[test]
775    fn test_default_fs() {
776        assert!(Client::default_with_config(
777            vec![("fs.defaultFS".to_string(), "hdfs://test:9000".to_string())]
778                .into_iter()
779                .collect(),
780        )
781        .is_ok());
783        assert!(Client::default_with_config(
784            vec![("fs.defaultFS".to_string(), "hdfs://".to_string())]
785                .into_iter()
786                .collect(),
787        )
788        .is_err());
790        assert!(Client::new_with_config(
791            "hdfs://",
792            vec![("fs.defaultFS".to_string(), "hdfs://test:9000".to_string())]
793                .into_iter()
794                .collect(),
795        )
796        .is_ok());
798        assert!(Client::new_with_config(
799            "hdfs://",
800            vec![("fs.defaultFS".to_string(), "hdfs://".to_string())]
801                .into_iter()
802                .collect(),
803        )
804        .is_err());
806        assert!(Client::new_with_config(
807            "hdfs://",
808            vec![("fs.defaultFS".to_string(), "viewfs://test".to_string())]
809                .into_iter()
810                .collect(),
811        )
812        .is_err());
813    }
815    #[test]
816    fn test_mount_link_resolve() {
817        let protocol = create_protocol("hdfs://");
818        let link = MountLink::new("/view", "/hdfs", protocol);
820        assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
821        assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
822        assert!(link.resolve("/hdfs/path").is_none());
823    }
825    #[test]
826    fn test_fallback_link() {
827        let protocol = create_protocol("hdfs://");
828        let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));
830        assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
831        assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
832        assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");
834        let link = MountLink::new("", "", protocol);
835        assert_eq!(link.resolve("/").unwrap(), "/");
836    }
838    #[test]
839    fn test_mount_table_resolve() {
840        let link1 = MountLink::new(
841            "/mount1",
842            "/path1/nested",
843            create_protocol("hdfs://"),
844        );
845        let link2 = MountLink::new(
846            "/mount2",
847            "/path2",
848            create_protocol("hdfs://"),
849        );
850        let link3 = MountLink::new(
851            "/mount3/nested",
852            "/path3",
853            create_protocol("hdfs://"),
854        );
855        let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://"));
857        let mount_table = MountTable {
858            mounts: vec![link1, link2, link3],
859            fallback,
860        };
862        // Exact mount path resolves to the exact HDFS path
863        let (link, resolved) = mount_table.resolve("/mount1");
864        assert_eq!(link.viewfs_path, "/mount1");
865        assert_eq!(resolved, "/path1/nested");
867        // Trailing slash is treated the same
868        let (link, resolved) = mount_table.resolve("/mount1/");
869        assert_eq!(link.viewfs_path, "/mount1");
870        assert_eq!(resolved, "/path1/nested/");
872        // Doesn't do partial matches on a directory name
873        let (link, resolved) = mount_table.resolve("/mount12");
874        assert_eq!(link.viewfs_path, "");
875        assert_eq!(resolved, "/path4/mount12");
877        let (link, resolved) = mount_table.resolve("/mount3/file");
878        assert_eq!(link.viewfs_path, "");
879        assert_eq!(resolved, "/path4/mount3/file");
881        let (link, resolved) = mount_table.resolve("/mount3/nested/file");
882        assert_eq!(link.viewfs_path, "/mount3/nested");
883        assert_eq!(resolved, "/path3/file");
884    }