1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use futures::stream::BoxStream;
5use futures::{stream, StreamExt};
6use url::Url;
7
8use crate::acl::{AclEntry, AclStatus};
9use crate::common::config::{self, Configuration};
10use crate::ec::resolve_ec_policy;
11use crate::error::{HdfsError, Result};
12use crate::file::{FileReader, FileWriter};
13use crate::hdfs::protocol::NamenodeProtocol;
14use crate::hdfs::proxy::NameServiceProxy;
15use crate::proto::hdfs::hdfs_file_status_proto::FileType;
16
17use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
18
/// Options applied when creating a file through `Client::create`.
///
/// Start from [`WriteOptions::default`] and adjust individual fields with the
/// chained setter methods of the same name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriteOptions {
    /// Requested block size. `None` is forwarded as-is to the create call
    /// (presumably "use the server default" - confirm against the protocol layer).
    pub block_size: Option<u64>,
    /// Requested replication factor. `None` is forwarded as-is to the create call.
    pub replication: Option<u32>,
    /// Permission bits for the new file (the default value is `0o644`).
    pub permission: u32,
    /// Overwrite flag forwarded to the create call.
    pub overwrite: bool,
    /// Create-parent flag forwarded to the create call.
    pub create_parent: bool,
}
35
36impl Default for WriteOptions {
37 fn default() -> Self {
38 Self {
39 block_size: None,
40 replication: None,
41 permission: 0o644,
42 overwrite: false,
43 create_parent: true,
44 }
45 }
46}
47
48impl AsRef<WriteOptions> for WriteOptions {
49 fn as_ref(&self) -> &WriteOptions {
50 self
51 }
52}
53
54impl WriteOptions {
55 pub fn block_size(mut self, block_size: u64) -> Self {
57 self.block_size = Some(block_size);
58 self
59 }
60
61 pub fn replication(mut self, replication: u32) -> Self {
63 self.replication = Some(replication);
64 self
65 }
66
67 pub fn permission(mut self, permission: u32) -> Self {
69 self.permission = permission;
70 self
71 }
72
73 pub fn overwrite(mut self, overwrite: bool) -> Self {
75 self.overwrite = overwrite;
76 self
77 }
78
79 pub fn create_parent(mut self, create_parent: bool) -> Self {
81 self.create_parent = create_parent;
82 self
83 }
84}
85
86#[derive(Debug, Clone)]
87struct MountLink {
88 viewfs_path: String,
89 hdfs_path: String,
90 protocol: Arc<NamenodeProtocol>,
91}
92
93impl MountLink {
94 fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
95 Self {
97 viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
98 hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
99 protocol,
100 }
101 }
102 fn resolve(&self, path: &str) -> Option<String> {
104 if path == self.viewfs_path {
107 Some(self.hdfs_path.clone())
108 } else {
109 path.strip_prefix(&format!("{}/", self.viewfs_path))
110 .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
111 }
112 }
113}
114
115#[derive(Debug)]
116struct MountTable {
117 mounts: Vec<MountLink>,
118 fallback: MountLink,
119}
120
121impl MountTable {
122 fn resolve(&self, src: &str) -> (&MountLink, String) {
123 for link in self.mounts.iter() {
124 if let Some(resolved) = link.resolve(src) {
125 return (link, resolved);
126 }
127 }
128 (&self.fallback, self.fallback.resolve(src).unwrap())
129 }
130}
131
132#[derive(Debug)]
133pub struct Client {
134 mount_table: Arc<MountTable>,
135 config: Arc<Configuration>,
136}
137
138impl Client {
139 pub fn new(url: &str) -> Result<Self> {
143 let parsed_url = Url::parse(url)?;
144 Self::with_config(&parsed_url, Configuration::new()?)
145 }
146
147 pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
148 let parsed_url = Url::parse(url)?;
149 Self::with_config(&parsed_url, Configuration::new_with_config(config)?)
150 }
151
152 pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
153 let config = Configuration::new_with_config(config)?;
154 Self::with_config(&Self::default_fs(&config)?, config)
155 }
156
157 fn default_fs(config: &Configuration) -> Result<Url> {
158 let url = config
159 .get(config::DEFAULT_FS)
160 .ok_or(HdfsError::InvalidArgument(format!(
161 "No {} setting found",
162 config::DEFAULT_FS
163 )))?;
164 Ok(Url::parse(&url)?)
165 }
166
167 fn with_config(url: &Url, config: Configuration) -> Result<Self> {
168 let resolved_url = if !url.has_host() {
169 let default_url = Self::default_fs(&config)?;
170 if url.scheme() != default_url.scheme() || !default_url.has_host() {
171 return Err(HdfsError::InvalidArgument(
172 "URL must contain a host".to_string(),
173 ));
174 }
175 default_url
176 } else {
177 url.clone()
178 };
179
180 let mount_table = match url.scheme() {
181 "hdfs" => {
182 let proxy = NameServiceProxy::new(&resolved_url, &config)?;
183 let protocol = Arc::new(NamenodeProtocol::new(proxy));
184
185 MountTable {
186 mounts: Vec::new(),
187 fallback: MountLink::new("/", "/", protocol),
188 }
189 }
190 "viewfs" => Self::build_mount_table(resolved_url.host_str().unwrap(), &config)?,
191 _ => {
192 return Err(HdfsError::InvalidArgument(
193 "Only `hdfs` and `viewfs` schemes are supported".to_string(),
194 ))
195 }
196 };
197
198 Ok(Self {
199 mount_table: Arc::new(mount_table),
200 config: Arc::new(config),
201 })
202 }
203
204 fn build_mount_table(host: &str, config: &Configuration) -> Result<MountTable> {
205 let mut mounts: Vec<MountLink> = Vec::new();
206 let mut fallback: Option<MountLink> = None;
207
208 for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
209 let url = Url::parse(hdfs_url)?;
210 if !url.has_host() {
211 return Err(HdfsError::InvalidArgument(
212 "URL must contain a host".to_string(),
213 ));
214 }
215 if url.scheme() != "hdfs" {
216 return Err(HdfsError::InvalidArgument(
217 "Only hdfs mounts are supported for viewfs".to_string(),
218 ));
219 }
220 let proxy = NameServiceProxy::new(&url, config)?;
221 let protocol = Arc::new(NamenodeProtocol::new(proxy));
222
223 if let Some(prefix) = viewfs_path {
224 mounts.push(MountLink::new(prefix, url.path(), protocol));
225 } else {
226 if fallback.is_some() {
227 return Err(HdfsError::InvalidArgument(
228 "Multiple viewfs fallback links found".to_string(),
229 ));
230 }
231 fallback = Some(MountLink::new("/", url.path(), protocol));
232 }
233 }
234
235 if let Some(fallback) = fallback {
236 mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
238 mounts.reverse();
239
240 Ok(MountTable { mounts, fallback })
241 } else {
242 Err(HdfsError::InvalidArgument(
243 "No viewfs fallback mount found".to_string(),
244 ))
245 }
246 }
247
248 pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
250 let (link, resolved_path) = self.mount_table.resolve(path);
251 match link.protocol.get_file_info(&resolved_path).await?.fs {
252 Some(status) => Ok(FileStatus::from(status, path)),
253 None => Err(HdfsError::FileNotFound(path.to_string())),
254 }
255 }
256
257 pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
260 let iter = self.list_status_iter(path, recursive);
261 let statuses = iter
262 .into_stream()
263 .collect::<Vec<Result<FileStatus>>>()
264 .await;
265
266 let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
267 for status in statuses.into_iter() {
268 resolved_statues.push(status?);
269 }
270
271 Ok(resolved_statues)
272 }
273
274 pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
276 ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
277 }
278
279 pub async fn read(&self, path: &str) -> Result<FileReader> {
281 let (link, resolved_path) = self.mount_table.resolve(path);
282 let located_info = link.protocol.get_located_file_info(&resolved_path).await?;
283 match located_info.fs {
284 Some(mut status) => {
285 let ec_schema = if let Some(ec_policy) = status.ec_policy.as_ref() {
286 Some(resolve_ec_policy(ec_policy)?)
287 } else {
288 None
289 };
290
291 if status.file_encryption_info.is_some() {
292 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
293 }
294 if status.file_type() == FileType::IsDir {
295 return Err(HdfsError::IsADirectoryError(path.to_string()));
296 }
297
298 if let Some(locations) = status.locations.take() {
299 Ok(FileReader::new(
300 Arc::clone(&link.protocol),
301 status,
302 locations,
303 ec_schema,
304 ))
305 } else {
306 Err(HdfsError::BlocksNotFound(path.to_string()))
307 }
308 }
309 None => Err(HdfsError::FileNotFound(path.to_string())),
310 }
311 }
312
313 pub async fn create(
316 &self,
317 src: &str,
318 write_options: impl AsRef<WriteOptions>,
319 ) -> Result<FileWriter> {
320 let write_options = write_options.as_ref();
321
322 let (link, resolved_path) = self.mount_table.resolve(src);
323
324 let create_response = link
325 .protocol
326 .create(
327 &resolved_path,
328 write_options.permission,
329 write_options.overwrite,
330 write_options.create_parent,
331 write_options.replication,
332 write_options.block_size,
333 )
334 .await?;
335
336 match create_response.fs {
337 Some(status) => {
338 if status.file_encryption_info.is_some() {
339 let _ = self.delete(src, false).await;
340 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
341 }
342
343 Ok(FileWriter::new(
344 Arc::clone(&link.protocol),
345 resolved_path,
346 status,
347 Arc::clone(&self.config),
348 None,
349 ))
350 }
351 None => Err(HdfsError::FileNotFound(src.to_string())),
352 }
353 }
354
355 fn needs_new_block(class: &str, msg: &str) -> bool {
356 class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
357 }
358
359 pub async fn append(&self, src: &str) -> Result<FileWriter> {
363 let (link, resolved_path) = self.mount_table.resolve(src);
364
365 let append_response = match link.protocol.append(&resolved_path, false).await {
368 Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
369 link.protocol.append(&resolved_path, true).await?
370 }
371 resp => resp?,
372 };
373
374 match append_response.stat {
375 Some(status) => {
376 if status.file_encryption_info.is_some() {
377 let _ = link
378 .protocol
379 .complete(src, append_response.block.map(|b| b.b), status.file_id)
380 .await;
381 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
382 }
383
384 Ok(FileWriter::new(
385 Arc::clone(&link.protocol),
386 resolved_path,
387 status,
388 Arc::clone(&self.config),
389 append_response.block,
390 ))
391 }
392 None => Err(HdfsError::FileNotFound(src.to_string())),
393 }
394 }
395
396 pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
404 let (link, resolved_path) = self.mount_table.resolve(path);
405 link.protocol
406 .mkdirs(&resolved_path, permission, create_parent)
407 .await
408 .map(|_| ())
409 }
410
411 pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
413 let (src_link, src_resolved_path) = self.mount_table.resolve(src);
414 let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
415 if src_link.viewfs_path == dst_link.viewfs_path {
416 src_link
417 .protocol
418 .rename(&src_resolved_path, &dst_resolved_path, overwrite)
419 .await
420 .map(|_| ())
421 } else {
422 Err(HdfsError::InvalidArgument(
423 "Cannot rename across different name services".to_string(),
424 ))
425 }
426 }
427
428 pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
431 let (link, resolved_path) = self.mount_table.resolve(path);
432 link.protocol
433 .delete(&resolved_path, recursive)
434 .await
435 .map(|r| r.result)
436 }
437
438 pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
440 let (link, resolved_path) = self.mount_table.resolve(path);
441 link.protocol
442 .set_times(&resolved_path, mtime, atime)
443 .await?;
444 Ok(())
445 }
446
447 pub async fn set_owner(
449 &self,
450 path: &str,
451 owner: Option<&str>,
452 group: Option<&str>,
453 ) -> Result<()> {
454 let (link, resolved_path) = self.mount_table.resolve(path);
455 link.protocol
456 .set_owner(&resolved_path, owner, group)
457 .await?;
458 Ok(())
459 }
460
461 pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
472 let (link, resolved_path) = self.mount_table.resolve(path);
473 link.protocol
474 .set_permission(&resolved_path, permission)
475 .await?;
476 Ok(())
477 }
478
479 pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
481 let (link, resolved_path) = self.mount_table.resolve(path);
482 let result = link
483 .protocol
484 .set_replication(&resolved_path, replication)
485 .await?
486 .result;
487
488 Ok(result)
489 }
490
491 pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
493 let (link, resolved_path) = self.mount_table.resolve(path);
494 let result = link
495 .protocol
496 .get_content_summary(&resolved_path)
497 .await?
498 .summary;
499
500 Ok(result.into())
501 }
502
503 pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
505 let (link, resolved_path) = self.mount_table.resolve(path);
506 link.protocol
507 .modify_acl_entries(&resolved_path, acl_spec)
508 .await?;
509
510 Ok(())
511 }
512
513 pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
515 let (link, resolved_path) = self.mount_table.resolve(path);
516 link.protocol
517 .remove_acl_entries(&resolved_path, acl_spec)
518 .await?;
519
520 Ok(())
521 }
522
523 pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
525 let (link, resolved_path) = self.mount_table.resolve(path);
526 link.protocol.remove_default_acl(&resolved_path).await?;
527
528 Ok(())
529 }
530
531 pub async fn remove_acl(&self, path: &str) -> Result<()> {
533 let (link, resolved_path) = self.mount_table.resolve(path);
534 link.protocol.remove_acl(&resolved_path).await?;
535
536 Ok(())
537 }
538
539 pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
543 let (link, resolved_path) = self.mount_table.resolve(path);
544 link.protocol.set_acl(&resolved_path, acl_spec).await?;
545
546 Ok(())
547 }
548
549 pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
551 let (link, resolved_path) = self.mount_table.resolve(path);
552 Ok(link
553 .protocol
554 .get_acl_status(&resolved_path)
555 .await?
556 .result
557 .into())
558 }
559}
560
561impl Default for Client {
562 fn default() -> Self {
565 Self::default_with_config(Default::default()).expect("Failed to create default client")
566 }
567}
568
569pub(crate) struct DirListingIterator {
570 path: String,
571 resolved_path: String,
572 link: MountLink,
573 files_only: bool,
574 partial_listing: VecDeque<HdfsFileStatusProto>,
575 remaining: u32,
576 last_seen: Vec<u8>,
577}
578
579impl DirListingIterator {
580 fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
581 let (link, resolved_path) = mount_table.resolve(&path);
582
583 DirListingIterator {
584 path,
585 resolved_path,
586 link: link.clone(),
587 files_only,
588 partial_listing: VecDeque::new(),
589 remaining: 1,
590 last_seen: Vec::new(),
591 }
592 }
593
594 async fn get_next_batch(&mut self) -> Result<bool> {
595 let listing = self
596 .link
597 .protocol
598 .get_listing(&self.resolved_path, self.last_seen.clone(), false)
599 .await?;
600
601 if let Some(dir_list) = listing.dir_list {
602 self.last_seen = dir_list
603 .partial_listing
604 .last()
605 .map(|p| p.path.clone())
606 .unwrap_or(Vec::new());
607
608 self.remaining = dir_list.remaining_entries;
609
610 self.partial_listing = dir_list
611 .partial_listing
612 .into_iter()
613 .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
614 .collect();
615 Ok(!self.partial_listing.is_empty())
616 } else {
617 Err(HdfsError::FileNotFound(self.path.clone()))
618 }
619 }
620
621 pub async fn next(&mut self) -> Option<Result<FileStatus>> {
622 if self.partial_listing.is_empty() && self.remaining > 0 {
623 if let Err(error) = self.get_next_batch().await {
624 self.remaining = 0;
625 return Some(Err(error));
626 }
627 }
628 if let Some(next) = self.partial_listing.pop_front() {
629 Some(Ok(FileStatus::from(next, &self.path)))
630 } else {
631 None
632 }
633 }
634}
635
636pub struct ListStatusIterator {
637 mount_table: Arc<MountTable>,
638 recursive: bool,
639 iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
640}
641
642impl ListStatusIterator {
643 fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
644 let initial = DirListingIterator::new(path.clone(), &mount_table, false);
645
646 ListStatusIterator {
647 mount_table,
648 recursive,
649 iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
650 }
651 }
652
653 pub async fn next(&self) -> Option<Result<FileStatus>> {
654 let mut next_file: Option<Result<FileStatus>> = None;
655 let mut iters = self.iters.lock().await;
656 while next_file.is_none() {
657 if let Some(iter) = iters.last_mut() {
658 if let Some(file_result) = iter.next().await {
659 if let Ok(file) = file_result {
660 if file.isdir && self.recursive {
663 iters.push(DirListingIterator::new(
664 file.path.clone(),
665 &self.mount_table,
666 false,
667 ))
668 }
669 next_file = Some(Ok(file));
670 } else {
671 next_file = Some(file_result)
673 }
674 } else {
675 iters.pop();
677 }
678 } else {
679 break;
681 }
682 }
683
684 next_file
685 }
686
687 pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
688 let listing = stream::unfold(self, |state| async move {
689 let next = state.next().await;
690 next.map(|n| (n, state))
691 });
692 Box::pin(listing)
693 }
694}
695
/// Status of a single file or directory as returned by the client.
#[derive(Debug)]
pub struct FileStatus {
    /// Path of the entry: the base path it was requested under, joined with the
    /// entry name from the response when that name is non-empty.
    pub path: String,
    /// Length reported for the entry.
    pub length: usize,
    /// `true` when the entry's file type is a directory.
    pub isdir: bool,
    /// Permission bits reported for the entry.
    pub permission: u16,
    /// Owner reported for the entry.
    pub owner: String,
    /// Group reported for the entry.
    pub group: String,
    /// Modification time as reported (presumably milliseconds since the Unix
    /// epoch - confirm against the protocol definition).
    pub modification_time: u64,
    /// Access time as reported (same unit caveat as `modification_time`).
    pub access_time: u64,
    /// Block replication, when the response carries one.
    pub replication: Option<u32>,
    /// Block size, when the response carries one.
    pub blocksize: Option<u64>,
}
709
710impl FileStatus {
711 fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
712 let mut path = base_path.trim_end_matches("/").to_string();
713 let relative_path = std::str::from_utf8(&value.path).unwrap();
714 if !relative_path.is_empty() {
715 path.push('/');
716 path.push_str(relative_path);
717 }
718
719 FileStatus {
720 isdir: value.file_type() == FileType::IsDir,
721 path,
722 length: value.length as usize,
723 permission: value.permission.perm as u16,
724 owner: value.owner,
725 group: value.group,
726 modification_time: value.modification_time,
727 access_time: value.access_time,
728 replication: value.block_replication,
729 blocksize: value.blocksize,
730 }
731 }
732}
733
/// Content summary of a path; each field is copied from the protocol response
/// field of the same name.
#[derive(Debug)]
pub struct ContentSummary {
    /// `length` from the response.
    pub length: u64,
    /// `file_count` from the response.
    pub file_count: u64,
    /// `directory_count` from the response.
    pub directory_count: u64,
    /// `quota` from the response.
    pub quota: u64,
    /// `space_consumed` from the response.
    pub space_consumed: u64,
    /// `space_quota` from the response.
    pub space_quota: u64,
}
743
744impl From<ContentSummaryProto> for ContentSummary {
745 fn from(value: ContentSummaryProto) -> Self {
746 ContentSummary {
747 length: value.length,
748 file_count: value.file_count,
749 directory_count: value.directory_count,
750 quota: value.quota,
751 space_consumed: value.space_consumed,
752 space_quota: value.space_quota,
753 }
754 }
755}
756
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use url::Url;

    use crate::{
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
        Client,
    };

    use super::{MountLink, MountTable};

    /// Builds a protocol handle for `url` with a default configuration.
    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let parsed = Url::parse(url).unwrap();
        let config = Configuration::new().unwrap();
        let proxy = NameServiceProxy::new(&parsed, &config).unwrap();
        Arc::new(NamenodeProtocol::new(proxy))
    }

    /// Builds a settings map containing only `fs.defaultFS` set to `value`.
    fn default_fs_config(value: &str) -> std::collections::HashMap<String, String> {
        std::iter::once(("fs.defaultFS".to_string(), value.to_string())).collect()
    }

    /// Client construction with and without a host in the URL / default filesystem.
    #[test]
    fn test_default_fs() {
        // Default filesystem with a host is accepted, without a host is rejected.
        assert!(Client::default_with_config(default_fs_config("hdfs://test:9000")).is_ok());
        assert!(Client::default_with_config(default_fs_config("hdfs://")).is_err());

        // A host-less URL borrows the host from a default with the same scheme.
        assert!(
            Client::new_with_config("hdfs://", default_fs_config("hdfs://test:9000")).is_ok()
        );

        // It is rejected when the default has no host or a different scheme.
        assert!(Client::new_with_config("hdfs://", default_fs_config("hdfs://")).is_err());
        assert!(Client::new_with_config("hdfs://", default_fs_config("viewfs://test")).is_err());
    }

    /// A regular link maps its own prefix and paths below it, nothing else.
    #[test]
    fn test_mount_link_resolve() {
        let link = MountLink::new("/view", "/hdfs", create_protocol("hdfs://127.0.0.1:9000"));

        for (input, expected) in [("/view/dir/file", "/hdfs/dir/file"), ("/view", "/hdfs")] {
            assert_eq!(link.resolve(input).unwrap(), expected);
        }
        assert!(link.resolve("/hdfs/path").is_none());
    }

    /// A link with an empty view prefix maps every absolute path.
    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");

        let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));
        for (input, expected) in [
            ("/path/to/file", "/hdfs/path/to/file"),
            ("/", "/hdfs/"),
            ("/hdfs/path", "/hdfs/hdfs/path"),
        ] {
            assert_eq!(link.resolve(input).unwrap(), expected);
        }

        let link = MountLink::new("", "", protocol);
        assert_eq!(link.resolve("/").unwrap(), "/");
    }

    /// The table picks the first matching link and otherwise the fallback.
    #[test]
    fn test_mount_table_resolve() {
        let link1 = MountLink::new(
            "/mount1",
            "/path1/nested",
            create_protocol("hdfs://127.0.0.1:9000"),
        );
        let link2 = MountLink::new(
            "/mount2",
            "/path2",
            create_protocol("hdfs://127.0.0.1:9001"),
        );
        let link3 = MountLink::new(
            "/mount3/nested",
            "/path3",
            create_protocol("hdfs://127.0.0.1:9002"),
        );
        let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));

        let mount_table = MountTable {
            mounts: vec![link1, link2, link3],
            fallback,
        };

        // (input path, view path of the chosen link, resolved path)
        let cases = [
            ("/mount1", "/mount1", "/path1/nested"),
            ("/mount1/", "/mount1", "/path1/nested/"),
            ("/mount12", "", "/path4/mount12"),
            ("/mount3/file", "", "/path4/mount3/file"),
            ("/mount3/nested/file", "/mount3/nested", "/path3/file"),
        ];

        for (input, expected_view_path, expected_resolved) in cases {
            let (link, resolved) = mount_table.resolve(input);
            assert_eq!(link.viewfs_path, expected_view_path);
            assert_eq!(resolved, expected_resolved);
        }
    }
}
889}