1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use futures::stream::BoxStream;
5use futures::{stream, StreamExt};
6use url::Url;
7
8use crate::acl::{AclEntry, AclStatus};
9use crate::common::config::{self, Configuration};
10use crate::ec::resolve_ec_policy;
11use crate::error::{HdfsError, Result};
12use crate::file::{FileReader, FileWriter};
13use crate::hdfs::protocol::NamenodeProtocol;
14use crate::hdfs::proxy::NameServiceProxy;
15use crate::proto::hdfs::hdfs_file_status_proto::FileType;
16
17use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
18
19#[derive(Clone)]
20pub struct WriteOptions {
21 pub block_size: Option<u64>,
23 pub replication: Option<u32>,
25 pub permission: u32,
28 pub overwrite: bool,
31 pub create_parent: bool,
34}
35
36impl Default for WriteOptions {
37 fn default() -> Self {
38 Self {
39 block_size: None,
40 replication: None,
41 permission: 0o644,
42 overwrite: false,
43 create_parent: true,
44 }
45 }
46}
47
48impl AsRef<WriteOptions> for WriteOptions {
49 fn as_ref(&self) -> &WriteOptions {
50 self
51 }
52}
53
54impl WriteOptions {
55 pub fn block_size(mut self, block_size: u64) -> Self {
57 self.block_size = Some(block_size);
58 self
59 }
60
61 pub fn replication(mut self, replication: u32) -> Self {
63 self.replication = Some(replication);
64 self
65 }
66
67 pub fn permission(mut self, permission: u32) -> Self {
69 self.permission = permission;
70 self
71 }
72
73 pub fn overwrite(mut self, overwrite: bool) -> Self {
75 self.overwrite = overwrite;
76 self
77 }
78
79 pub fn create_parent(mut self, create_parent: bool) -> Self {
81 self.create_parent = create_parent;
82 self
83 }
84}
85
86#[derive(Debug, Clone)]
87struct MountLink {
88 viewfs_path: String,
89 hdfs_path: String,
90 protocol: Arc<NamenodeProtocol>,
91}
92
93impl MountLink {
94 fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
95 Self {
97 viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
98 hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
99 protocol,
100 }
101 }
102 fn resolve(&self, path: &str) -> Option<String> {
104 if path == self.viewfs_path {
107 Some(self.hdfs_path.clone())
108 } else {
109 path.strip_prefix(&format!("{}/", self.viewfs_path))
110 .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
111 }
112 }
113}
114
115#[derive(Debug)]
116struct MountTable {
117 mounts: Vec<MountLink>,
118 fallback: MountLink,
119}
120
121impl MountTable {
122 fn resolve(&self, src: &str) -> (&MountLink, String) {
123 for link in self.mounts.iter() {
124 if let Some(resolved) = link.resolve(src) {
125 return (link, resolved);
126 }
127 }
128 (&self.fallback, self.fallback.resolve(src).unwrap())
129 }
130}
131
132#[derive(Debug)]
133pub struct Client {
134 mount_table: Arc<MountTable>,
135}
136
137impl Client {
138 pub fn new(url: &str) -> Result<Self> {
142 let parsed_url = Url::parse(url)?;
143 Self::with_config(&parsed_url, Configuration::new()?)
144 }
145
146 pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
147 let parsed_url = Url::parse(url)?;
148 Self::with_config(&parsed_url, Configuration::new_with_config(config)?)
149 }
150
151 pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
152 let config = Configuration::new_with_config(config)?;
153 Self::with_config(&Self::default_fs(&config)?, config)
154 }
155
156 fn default_fs(config: &Configuration) -> Result<Url> {
157 let url = config
158 .get(config::DEFAULT_FS)
159 .ok_or(HdfsError::InvalidArgument(format!(
160 "No {} setting found",
161 config::DEFAULT_FS
162 )))?;
163 Ok(Url::parse(&url)?)
164 }
165
166 fn with_config(url: &Url, config: Configuration) -> Result<Self> {
167 let resolved_url = if !url.has_host() {
168 let default_url = Self::default_fs(&config)?;
169 if url.scheme() != default_url.scheme() || !default_url.has_host() {
170 return Err(HdfsError::InvalidArgument(
171 "URL must contain a host".to_string(),
172 ));
173 }
174 default_url
175 } else {
176 url.clone()
177 };
178
179 let mount_table = match url.scheme() {
180 "hdfs" => {
181 let proxy = NameServiceProxy::new(&resolved_url, &config)?;
182 let protocol = Arc::new(NamenodeProtocol::new(proxy));
183
184 MountTable {
185 mounts: Vec::new(),
186 fallback: MountLink::new("/", "/", protocol),
187 }
188 }
189 "viewfs" => Self::build_mount_table(resolved_url.host_str().unwrap(), &config)?,
190 _ => {
191 return Err(HdfsError::InvalidArgument(
192 "Only `hdfs` and `viewfs` schemes are supported".to_string(),
193 ))
194 }
195 };
196
197 Ok(Self {
198 mount_table: Arc::new(mount_table),
199 })
200 }
201
202 fn build_mount_table(host: &str, config: &Configuration) -> Result<MountTable> {
203 let mut mounts: Vec<MountLink> = Vec::new();
204 let mut fallback: Option<MountLink> = None;
205
206 for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
207 let url = Url::parse(hdfs_url)?;
208 if !url.has_host() {
209 return Err(HdfsError::InvalidArgument(
210 "URL must contain a host".to_string(),
211 ));
212 }
213 if url.scheme() != "hdfs" {
214 return Err(HdfsError::InvalidArgument(
215 "Only hdfs mounts are supported for viewfs".to_string(),
216 ));
217 }
218 let proxy = NameServiceProxy::new(&url, config)?;
219 let protocol = Arc::new(NamenodeProtocol::new(proxy));
220
221 if let Some(prefix) = viewfs_path {
222 mounts.push(MountLink::new(prefix, url.path(), protocol));
223 } else {
224 if fallback.is_some() {
225 return Err(HdfsError::InvalidArgument(
226 "Multiple viewfs fallback links found".to_string(),
227 ));
228 }
229 fallback = Some(MountLink::new("/", url.path(), protocol));
230 }
231 }
232
233 if let Some(fallback) = fallback {
234 mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
236 mounts.reverse();
237
238 Ok(MountTable { mounts, fallback })
239 } else {
240 Err(HdfsError::InvalidArgument(
241 "No viewfs fallback mount found".to_string(),
242 ))
243 }
244 }
245
246 pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
248 let (link, resolved_path) = self.mount_table.resolve(path);
249 match link.protocol.get_file_info(&resolved_path).await?.fs {
250 Some(status) => Ok(FileStatus::from(status, path)),
251 None => Err(HdfsError::FileNotFound(path.to_string())),
252 }
253 }
254
255 pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
258 let iter = self.list_status_iter(path, recursive);
259 let statuses = iter
260 .into_stream()
261 .collect::<Vec<Result<FileStatus>>>()
262 .await;
263
264 let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
265 for status in statuses.into_iter() {
266 resolved_statues.push(status?);
267 }
268
269 Ok(resolved_statues)
270 }
271
272 pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
274 ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
275 }
276
277 pub async fn read(&self, path: &str) -> Result<FileReader> {
279 let (link, resolved_path) = self.mount_table.resolve(path);
280 let located_info = link.protocol.get_located_file_info(&resolved_path).await?;
281 match located_info.fs {
282 Some(mut status) => {
283 let ec_schema = if let Some(ec_policy) = status.ec_policy.as_ref() {
284 Some(resolve_ec_policy(ec_policy)?)
285 } else {
286 None
287 };
288
289 if status.file_encryption_info.is_some() {
290 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
291 }
292 if status.file_type() == FileType::IsDir {
293 return Err(HdfsError::IsADirectoryError(path.to_string()));
294 }
295
296 if let Some(locations) = status.locations.take() {
297 Ok(FileReader::new(
298 Arc::clone(&link.protocol),
299 status,
300 locations,
301 ec_schema,
302 ))
303 } else {
304 Err(HdfsError::BlocksNotFound(path.to_string()))
305 }
306 }
307 None => Err(HdfsError::FileNotFound(path.to_string())),
308 }
309 }
310
311 pub async fn create(
314 &self,
315 src: &str,
316 write_options: impl AsRef<WriteOptions>,
317 ) -> Result<FileWriter> {
318 let write_options = write_options.as_ref();
319
320 let (link, resolved_path) = self.mount_table.resolve(src);
321
322 let create_response = link
323 .protocol
324 .create(
325 &resolved_path,
326 write_options.permission,
327 write_options.overwrite,
328 write_options.create_parent,
329 write_options.replication,
330 write_options.block_size,
331 )
332 .await?;
333
334 match create_response.fs {
335 Some(status) => {
336 if status.file_encryption_info.is_some() {
337 let _ = self.delete(src, false).await;
338 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
339 }
340
341 Ok(FileWriter::new(
342 Arc::clone(&link.protocol),
343 resolved_path,
344 status,
345 None,
346 ))
347 }
348 None => Err(HdfsError::FileNotFound(src.to_string())),
349 }
350 }
351
352 fn needs_new_block(class: &str, msg: &str) -> bool {
353 class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
354 }
355
356 pub async fn append(&self, src: &str) -> Result<FileWriter> {
360 let (link, resolved_path) = self.mount_table.resolve(src);
361
362 let append_response = match link.protocol.append(&resolved_path, false).await {
365 Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
366 link.protocol.append(&resolved_path, true).await?
367 }
368 resp => resp?,
369 };
370
371 match append_response.stat {
372 Some(status) => {
373 if status.file_encryption_info.is_some() {
374 let _ = link
375 .protocol
376 .complete(src, append_response.block.map(|b| b.b), status.file_id)
377 .await;
378 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
379 }
380
381 Ok(FileWriter::new(
382 Arc::clone(&link.protocol),
383 resolved_path,
384 status,
385 append_response.block,
386 ))
387 }
388 None => Err(HdfsError::FileNotFound(src.to_string())),
389 }
390 }
391
392 pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
400 let (link, resolved_path) = self.mount_table.resolve(path);
401 link.protocol
402 .mkdirs(&resolved_path, permission, create_parent)
403 .await
404 .map(|_| ())
405 }
406
407 pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
409 let (src_link, src_resolved_path) = self.mount_table.resolve(src);
410 let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
411 if src_link.viewfs_path == dst_link.viewfs_path {
412 src_link
413 .protocol
414 .rename(&src_resolved_path, &dst_resolved_path, overwrite)
415 .await
416 .map(|_| ())
417 } else {
418 Err(HdfsError::InvalidArgument(
419 "Cannot rename across different name services".to_string(),
420 ))
421 }
422 }
423
424 pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
427 let (link, resolved_path) = self.mount_table.resolve(path);
428 link.protocol
429 .delete(&resolved_path, recursive)
430 .await
431 .map(|r| r.result)
432 }
433
434 pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
436 let (link, resolved_path) = self.mount_table.resolve(path);
437 link.protocol
438 .set_times(&resolved_path, mtime, atime)
439 .await?;
440 Ok(())
441 }
442
443 pub async fn set_owner(
445 &self,
446 path: &str,
447 owner: Option<&str>,
448 group: Option<&str>,
449 ) -> Result<()> {
450 let (link, resolved_path) = self.mount_table.resolve(path);
451 link.protocol
452 .set_owner(&resolved_path, owner, group)
453 .await?;
454 Ok(())
455 }
456
457 pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
468 let (link, resolved_path) = self.mount_table.resolve(path);
469 link.protocol
470 .set_permission(&resolved_path, permission)
471 .await?;
472 Ok(())
473 }
474
475 pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
477 let (link, resolved_path) = self.mount_table.resolve(path);
478 let result = link
479 .protocol
480 .set_replication(&resolved_path, replication)
481 .await?
482 .result;
483
484 Ok(result)
485 }
486
487 pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
489 let (link, resolved_path) = self.mount_table.resolve(path);
490 let result = link
491 .protocol
492 .get_content_summary(&resolved_path)
493 .await?
494 .summary;
495
496 Ok(result.into())
497 }
498
499 pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
501 let (link, resolved_path) = self.mount_table.resolve(path);
502 link.protocol
503 .modify_acl_entries(&resolved_path, acl_spec)
504 .await?;
505
506 Ok(())
507 }
508
509 pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
511 let (link, resolved_path) = self.mount_table.resolve(path);
512 link.protocol
513 .remove_acl_entries(&resolved_path, acl_spec)
514 .await?;
515
516 Ok(())
517 }
518
519 pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
521 let (link, resolved_path) = self.mount_table.resolve(path);
522 link.protocol.remove_default_acl(&resolved_path).await?;
523
524 Ok(())
525 }
526
527 pub async fn remove_acl(&self, path: &str) -> Result<()> {
529 let (link, resolved_path) = self.mount_table.resolve(path);
530 link.protocol.remove_acl(&resolved_path).await?;
531
532 Ok(())
533 }
534
535 pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
539 let (link, resolved_path) = self.mount_table.resolve(path);
540 link.protocol.set_acl(&resolved_path, acl_spec).await?;
541
542 Ok(())
543 }
544
545 pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
547 let (link, resolved_path) = self.mount_table.resolve(path);
548 Ok(link
549 .protocol
550 .get_acl_status(&resolved_path)
551 .await?
552 .result
553 .into())
554 }
555}
556
557impl Default for Client {
558 fn default() -> Self {
561 Self::default_with_config(Default::default()).expect("Failed to create default client")
562 }
563}
564
565pub(crate) struct DirListingIterator {
566 path: String,
567 resolved_path: String,
568 link: MountLink,
569 files_only: bool,
570 partial_listing: VecDeque<HdfsFileStatusProto>,
571 remaining: u32,
572 last_seen: Vec<u8>,
573}
574
575impl DirListingIterator {
576 fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
577 let (link, resolved_path) = mount_table.resolve(&path);
578
579 DirListingIterator {
580 path,
581 resolved_path,
582 link: link.clone(),
583 files_only,
584 partial_listing: VecDeque::new(),
585 remaining: 1,
586 last_seen: Vec::new(),
587 }
588 }
589
590 async fn get_next_batch(&mut self) -> Result<bool> {
591 let listing = self
592 .link
593 .protocol
594 .get_listing(&self.resolved_path, self.last_seen.clone(), false)
595 .await?;
596
597 if let Some(dir_list) = listing.dir_list {
598 self.last_seen = dir_list
599 .partial_listing
600 .last()
601 .map(|p| p.path.clone())
602 .unwrap_or(Vec::new());
603
604 self.remaining = dir_list.remaining_entries;
605
606 self.partial_listing = dir_list
607 .partial_listing
608 .into_iter()
609 .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
610 .collect();
611 Ok(!self.partial_listing.is_empty())
612 } else {
613 Err(HdfsError::FileNotFound(self.path.clone()))
614 }
615 }
616
617 pub async fn next(&mut self) -> Option<Result<FileStatus>> {
618 if self.partial_listing.is_empty() && self.remaining > 0 {
619 if let Err(error) = self.get_next_batch().await {
620 self.remaining = 0;
621 return Some(Err(error));
622 }
623 }
624 if let Some(next) = self.partial_listing.pop_front() {
625 Some(Ok(FileStatus::from(next, &self.path)))
626 } else {
627 None
628 }
629 }
630}
631
632pub struct ListStatusIterator {
633 mount_table: Arc<MountTable>,
634 recursive: bool,
635 iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
636}
637
638impl ListStatusIterator {
639 fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
640 let initial = DirListingIterator::new(path.clone(), &mount_table, false);
641
642 ListStatusIterator {
643 mount_table,
644 recursive,
645 iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
646 }
647 }
648
649 pub async fn next(&self) -> Option<Result<FileStatus>> {
650 let mut next_file: Option<Result<FileStatus>> = None;
651 let mut iters = self.iters.lock().await;
652 while next_file.is_none() {
653 if let Some(iter) = iters.last_mut() {
654 if let Some(file_result) = iter.next().await {
655 if let Ok(file) = file_result {
656 if file.isdir && self.recursive {
659 iters.push(DirListingIterator::new(
660 file.path.clone(),
661 &self.mount_table,
662 false,
663 ))
664 }
665 next_file = Some(Ok(file));
666 } else {
667 next_file = Some(file_result)
669 }
670 } else {
671 iters.pop();
673 }
674 } else {
675 break;
677 }
678 }
679
680 next_file
681 }
682
683 pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
684 let listing = stream::unfold(self, |state| async move {
685 let next = state.next().await;
686 next.map(|n| (n, state))
687 });
688 Box::pin(listing)
689 }
690}
691
692#[derive(Debug)]
693pub struct FileStatus {
694 pub path: String,
695 pub length: usize,
696 pub isdir: bool,
697 pub permission: u16,
698 pub owner: String,
699 pub group: String,
700 pub modification_time: u64,
701 pub access_time: u64,
702 pub replication: Option<u32>,
703 pub blocksize: Option<u64>,
704}
705
706impl FileStatus {
707 fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
708 let mut path = base_path.trim_end_matches("/").to_string();
709 let relative_path = std::str::from_utf8(&value.path).unwrap();
710 if !relative_path.is_empty() {
711 path.push('/');
712 path.push_str(relative_path);
713 }
714
715 FileStatus {
716 isdir: value.file_type() == FileType::IsDir,
717 path,
718 length: value.length as usize,
719 permission: value.permission.perm as u16,
720 owner: value.owner,
721 group: value.group,
722 modification_time: value.modification_time,
723 access_time: value.access_time,
724 replication: value.block_replication,
725 blocksize: value.blocksize,
726 }
727 }
728}
729
730#[derive(Debug)]
731pub struct ContentSummary {
732 pub length: u64,
733 pub file_count: u64,
734 pub directory_count: u64,
735 pub quota: u64,
736 pub space_consumed: u64,
737 pub space_quota: u64,
738}
739
740impl From<ContentSummaryProto> for ContentSummary {
741 fn from(value: ContentSummaryProto) -> Self {
742 ContentSummary {
743 length: value.length,
744 file_count: value.file_count,
745 directory_count: value.directory_count,
746 quota: value.quota,
747 space_consumed: value.space_consumed,
748 space_quota: value.space_quota,
749 }
750 }
751}
752
753#[cfg(test)]
754mod test {
755 use std::sync::Arc;
756
757 use url::Url;
758
759 use crate::{
760 common::config::Configuration,
761 hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
762 Client,
763 };
764
765 use super::{MountLink, MountTable};
766
767 fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
768 let proxy =
769 NameServiceProxy::new(&Url::parse(url).unwrap(), &Configuration::new().unwrap())
770 .unwrap();
771 Arc::new(NamenodeProtocol::new(proxy))
772 }
773
774 #[test]
775 fn test_default_fs() {
776 assert!(Client::default_with_config(
777 vec![("fs.defaultFS".to_string(), "hdfs://test:9000".to_string())]
778 .into_iter()
779 .collect(),
780 )
781 .is_ok());
782
783 assert!(Client::default_with_config(
784 vec![("fs.defaultFS".to_string(), "hdfs://".to_string())]
785 .into_iter()
786 .collect(),
787 )
788 .is_err());
789
790 assert!(Client::new_with_config(
791 "hdfs://",
792 vec![("fs.defaultFS".to_string(), "hdfs://test:9000".to_string())]
793 .into_iter()
794 .collect(),
795 )
796 .is_ok());
797
798 assert!(Client::new_with_config(
799 "hdfs://",
800 vec![("fs.defaultFS".to_string(), "hdfs://".to_string())]
801 .into_iter()
802 .collect(),
803 )
804 .is_err());
805
806 assert!(Client::new_with_config(
807 "hdfs://",
808 vec![("fs.defaultFS".to_string(), "viewfs://test".to_string())]
809 .into_iter()
810 .collect(),
811 )
812 .is_err());
813 }
814
815 #[test]
816 fn test_mount_link_resolve() {
817 let protocol = create_protocol("hdfs://127.0.0.1:9000");
818 let link = MountLink::new("/view", "/hdfs", protocol);
819
820 assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
821 assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
822 assert!(link.resolve("/hdfs/path").is_none());
823 }
824
825 #[test]
826 fn test_fallback_link() {
827 let protocol = create_protocol("hdfs://127.0.0.1:9000");
828 let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));
829
830 assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
831 assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
832 assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");
833
834 let link = MountLink::new("", "", protocol);
835 assert_eq!(link.resolve("/").unwrap(), "/");
836 }
837
838 #[test]
839 fn test_mount_table_resolve() {
840 let link1 = MountLink::new(
841 "/mount1",
842 "/path1/nested",
843 create_protocol("hdfs://127.0.0.1:9000"),
844 );
845 let link2 = MountLink::new(
846 "/mount2",
847 "/path2",
848 create_protocol("hdfs://127.0.0.1:9001"),
849 );
850 let link3 = MountLink::new(
851 "/mount3/nested",
852 "/path3",
853 create_protocol("hdfs://127.0.0.1:9002"),
854 );
855 let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));
856
857 let mount_table = MountTable {
858 mounts: vec![link1, link2, link3],
859 fallback,
860 };
861
862 let (link, resolved) = mount_table.resolve("/mount1");
864 assert_eq!(link.viewfs_path, "/mount1");
865 assert_eq!(resolved, "/path1/nested");
866
867 let (link, resolved) = mount_table.resolve("/mount1/");
869 assert_eq!(link.viewfs_path, "/mount1");
870 assert_eq!(resolved, "/path1/nested/");
871
872 let (link, resolved) = mount_table.resolve("/mount12");
874 assert_eq!(link.viewfs_path, "");
875 assert_eq!(resolved, "/path4/mount12");
876
877 let (link, resolved) = mount_table.resolve("/mount3/file");
878 assert_eq!(link.viewfs_path, "");
879 assert_eq!(resolved, "/path4/mount3/file");
880
881 let (link, resolved) = mount_table.resolve("/mount3/nested/file");
882 assert_eq!(link.viewfs_path, "/mount3/nested");
883 assert_eq!(resolved, "/path3/file");
884 }
885}