use std::collections::{HashMap, VecDeque};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use futures::stream::BoxStream;
use futures::{stream, StreamExt};
use url::Url;
use crate::common::config::{self, Configuration};
use crate::ec::resolve_ec_policy;
use crate::error::{HdfsError, Result};
use crate::file::{FileReader, FileWriter};
use crate::hdfs::protocol::NamenodeProtocol;
use crate::hdfs::proxy::NameServiceProxy;
use crate::proto::hdfs::hdfs_file_status_proto::FileType;
use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
#[derive(Clone)]
pub struct WriteOptions {
    /// Block size for the new file; `None` uses the server default.
    pub block_size: Option<u64>,
    /// Replication factor for the new file; `None` uses the server default.
    pub replication: Option<u32>,
    /// Unix permission bits to create the file with.
    pub permission: u32,
    /// Whether an existing file at the path should be overwritten.
    pub overwrite: bool,
    /// Whether missing parent directories should be created.
    pub create_parent: bool,
}

impl Default for WriteOptions {
    /// Defaults: server-chosen block size and replication, permission `0o644`,
    /// fail if the file exists, create missing parents.
    fn default() -> Self {
        WriteOptions {
            permission: 0o644,
            overwrite: false,
            create_parent: true,
            block_size: None,
            replication: None,
        }
    }
}

impl AsRef<WriteOptions> for WriteOptions {
    fn as_ref(&self) -> &WriteOptions {
        self
    }
}

impl WriteOptions {
    /// Sets the block size for the new file.
    pub fn block_size(self, block_size: u64) -> Self {
        Self {
            block_size: Some(block_size),
            ..self
        }
    }

    /// Sets the replication factor for the new file.
    pub fn replication(self, replication: u32) -> Self {
        Self {
            replication: Some(replication),
            ..self
        }
    }

    /// Sets the raw octal permission bits for the new file.
    pub fn permission(self, permission: u32) -> Self {
        Self { permission, ..self }
    }

    /// Sets whether to overwrite an existing file at the path.
    pub fn overwrite(self, overwrite: bool) -> Self {
        Self { overwrite, ..self }
    }

    /// Sets whether to create missing parent directories.
    pub fn create_parent(self, create_parent: bool) -> Self {
        Self {
            create_parent,
            ..self
        }
    }
}
/// A single viewfs mount entry: maps a prefix of the viewfs namespace onto a
/// path served by one namenode.
#[derive(Debug, Clone)]
struct MountLink {
    /// Prefix in the viewfs namespace covered by this link.
    viewfs_path: PathBuf,
    /// Path on the target cluster that `viewfs_path` maps onto.
    hdfs_path: PathBuf,
    /// RPC client for the namenode that serves `hdfs_path`.
    protocol: Arc<NamenodeProtocol>,
}
impl MountLink {
    /// Builds a link, normalizing empty path strings to the filesystem root
    /// so prefix matching in `resolve` always has something to strip.
    fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
        let normalize = |p: &str| PathBuf::from(if p.is_empty() { "/" } else { p });
        Self {
            viewfs_path: normalize(viewfs_path),
            hdfs_path: normalize(hdfs_path),
            protocol,
        }
    }

    /// Maps `path` from the viewfs namespace into this link's target
    /// namespace. Returns `None` when `path` is not under `viewfs_path`.
    fn resolve(&self, path: &Path) -> Option<PathBuf> {
        let remainder = path.strip_prefix(&self.viewfs_path).ok()?;
        if remainder.as_os_str().is_empty() {
            // The path is exactly the mount point itself.
            Some(self.hdfs_path.clone())
        } else {
            Some(self.hdfs_path.join(remainder))
        }
    }
}
/// The full set of mounts for a cluster. For a plain `hdfs://` URL this
/// contains only the fallback link.
#[derive(Debug)]
struct MountTable {
    /// Mount links ordered deepest-first (see `build_mount_table`) so the
    /// most specific prefix wins during resolution.
    mounts: Vec<MountLink>,
    /// Link used when no entry in `mounts` matches; rooted at "/".
    fallback: MountLink,
}
impl MountTable {
    /// Finds the first (deepest, given the sort order) mount covering `src`
    /// and returns that link together with the rewritten path on the target
    /// cluster, falling back to the root link when nothing matches.
    fn resolve(&self, src: &str) -> (&MountLink, String) {
        let path = Path::new(src);
        let matched = self
            .mounts
            .iter()
            .find_map(|link| link.resolve(path).map(|resolved| (link, resolved)));
        let (link, resolved) = matched.unwrap_or_else(|| {
            // The fallback is rooted at "/", so any absolute path resolves;
            // a relative path here would panic (same contract as before).
            (&self.fallback, self.fallback.resolve(path).unwrap())
        });
        (link, resolved.to_string_lossy().into())
    }
}
/// Client for interacting with an HDFS or viewfs cluster.
#[derive(Debug)]
pub struct Client {
    // Shared with spawned ListStatusIterator streams, hence the Arc.
    mount_table: Arc<MountTable>,
}
impl Client {
    /// Creates a new HDFS client connected to the cluster at `url`, using
    /// configuration discovered from the environment. Supported schemes are
    /// `hdfs` and `viewfs`.
    pub fn new(url: &str) -> Result<Self> {
        let parsed_url = Url::parse(url)?;
        Self::with_config(&parsed_url, Configuration::new()?)
    }

    /// Like [`Client::new`], but entries in `config` override any values
    /// discovered from the environment.
    pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
        let parsed_url = Url::parse(url)?;
        Self::with_config(&parsed_url, Configuration::new_with_config(config)?)
    }

    /// Creates a client for the cluster named by the `fs.defaultFS` setting,
    /// with `config` entries overriding the environment configuration.
    pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
        let config = Configuration::new_with_config(config)?;
        Self::with_config(&Self::default_fs(&config)?, config)
    }

    /// Parses the `fs.defaultFS` setting into a URL.
    ///
    /// # Errors
    /// [`HdfsError::InvalidArgument`] if the setting is absent.
    fn default_fs(config: &Configuration) -> Result<Url> {
        let url = config
            .get(config::DEFAULT_FS)
            // Lazily construct the error so the format! only runs on a miss.
            .ok_or_else(|| {
                HdfsError::InvalidArgument(format!("No {} setting found", config::DEFAULT_FS))
            })?;
        Ok(Url::parse(&url)?)
    }

    /// Resolves the target URL and builds the mount table for the cluster.
    fn with_config(url: &Url, config: Configuration) -> Result<Self> {
        // A host-less URL (e.g. just "hdfs://") falls back to fs.defaultFS,
        // which must share the scheme and actually name a host.
        let resolved_url = if !url.has_host() {
            let default_url = Self::default_fs(&config)?;
            if url.scheme() != default_url.scheme() || !default_url.has_host() {
                return Err(HdfsError::InvalidArgument(
                    "URL must contain a host".to_string(),
                ));
            }
            default_url
        } else {
            url.clone()
        };

        let mount_table = match url.scheme() {
            "hdfs" => {
                // Plain HDFS: a single namenode serving the entire namespace.
                let proxy = NameServiceProxy::new(&resolved_url, &config)?;
                let protocol = Arc::new(NamenodeProtocol::new(proxy));

                MountTable {
                    mounts: Vec::new(),
                    fallback: MountLink::new("/", "/", protocol),
                }
            }
            // resolved_url is guaranteed to have a host at this point.
            "viewfs" => Self::build_mount_table(resolved_url.host_str().unwrap(), &config)?,
            _ => {
                return Err(HdfsError::InvalidArgument(
                    "Only `hdfs` and `viewfs` schemes are supported".to_string(),
                ))
            }
        };

        Ok(Self {
            mount_table: Arc::new(mount_table),
        })
    }

    /// Builds the mount table for the viewfs cluster named `host` from the
    /// configuration's mount-table entries. Exactly one fallback link is
    /// required, and only `hdfs` mount targets are supported.
    fn build_mount_table(host: &str, config: &Configuration) -> Result<MountTable> {
        let mut mounts: Vec<MountLink> = Vec::new();
        let mut fallback: Option<MountLink> = None;

        for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
            let url = Url::parse(hdfs_url)?;
            if !url.has_host() {
                return Err(HdfsError::InvalidArgument(
                    "URL must contain a host".to_string(),
                ));
            }
            if url.scheme() != "hdfs" {
                return Err(HdfsError::InvalidArgument(
                    "Only hdfs mounts are supported for viewfs".to_string(),
                ));
            }
            let proxy = NameServiceProxy::new(&url, config)?;
            let protocol = Arc::new(NamenodeProtocol::new(proxy));

            if let Some(prefix) = viewfs_path {
                mounts.push(MountLink::new(prefix, url.path(), protocol));
            } else {
                if fallback.is_some() {
                    return Err(HdfsError::InvalidArgument(
                        "Multiple viewfs fallback links found".to_string(),
                    ));
                }
                fallback = Some(MountLink::new("/", url.path(), protocol));
            }
        }

        if let Some(fallback) = fallback {
            // Sort deepest mounts first so resolution picks the most specific
            // link before any shallower prefix.
            mounts.sort_by_key(|m| m.viewfs_path.components().count());
            mounts.reverse();

            Ok(MountTable { mounts, fallback })
        } else {
            Err(HdfsError::InvalidArgument(
                "No viewfs fallback mount found".to_string(),
            ))
        }
    }

    /// Retrieves the file status for `path`.
    ///
    /// # Errors
    /// [`HdfsError::FileNotFound`] if the path does not exist.
    pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        match link.protocol.get_file_info(&resolved_path).await?.fs {
            Some(status) => Ok(FileStatus::from(status, path)),
            None => Err(HdfsError::FileNotFound(path.to_string())),
        }
    }

    /// Lists the statuses of all entries under `path`, recursing into
    /// sub-directories when `recursive` is true. Returns the first error
    /// encountered, if any.
    pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
        self.list_status_iter(path, recursive)
            .into_stream()
            .collect::<Vec<Result<FileStatus>>>()
            .await
            .into_iter()
            // Short-circuits on the first Err; otherwise yields the full Vec.
            .collect()
    }

    /// Returns an iterator (convertible to a stream) over the statuses
    /// under `path`.
    pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
        ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
    }

    /// Opens a reader for the file at `path`.
    ///
    /// # Errors
    /// - [`HdfsError::FileNotFound`] if the path does not exist
    /// - [`HdfsError::IsADirectoryError`] if the path is a directory
    /// - [`HdfsError::UnsupportedFeature`] for encrypted files
    /// - [`HdfsError::BlocksNotFound`] if no block locations were returned
    pub async fn read(&self, path: &str) -> Result<FileReader> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        let located_info = link.protocol.get_located_file_info(&resolved_path).await?;
        match located_info.fs {
            Some(mut status) => {
                // Resolve the erasure-coding schema up front so an unknown
                // policy fails before we construct the reader.
                let ec_schema = if let Some(ec_policy) = status.ec_policy.as_ref() {
                    Some(resolve_ec_policy(ec_policy)?)
                } else {
                    None
                };

                if status.file_encryption_info.is_some() {
                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
                }

                if status.file_type() == FileType::IsDir {
                    return Err(HdfsError::IsADirectoryError(path.to_string()));
                }

                if let Some(locations) = status.locations.take() {
                    Ok(FileReader::new(
                        Arc::clone(&link.protocol),
                        status,
                        locations,
                        ec_schema,
                    ))
                } else {
                    Err(HdfsError::BlocksNotFound(path.to_string()))
                }
            }
            None => Err(HdfsError::FileNotFound(path.to_string())),
        }
    }

    /// Creates a new file at `src` and opens it for writing, honoring the
    /// supplied [`WriteOptions`].
    pub async fn create(
        &self,
        src: &str,
        write_options: impl AsRef<WriteOptions>,
    ) -> Result<FileWriter> {
        let write_options = write_options.as_ref();

        let (link, resolved_path) = self.mount_table.resolve(src);

        let create_response = link
            .protocol
            .create(
                &resolved_path,
                write_options.permission,
                write_options.overwrite,
                write_options.create_parent,
                write_options.replication,
                write_options.block_size,
            )
            .await?;

        match create_response.fs {
            Some(status) => {
                if status.file_encryption_info.is_some() {
                    // Encryption is unsupported: best-effort cleanup of the
                    // file we just created before reporting the failure.
                    let _ = self.delete(src, false).await;
                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
                }
                Ok(FileWriter::new(
                    Arc::clone(&link.protocol),
                    resolved_path,
                    status,
                    None,
                ))
            }
            None => Err(HdfsError::FileNotFound(src.to_string())),
        }
    }

    /// Detects the namenode error that means an append must start a new block.
    fn needs_new_block(class: &str, msg: &str) -> bool {
        class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
    }

    /// Opens the existing file at `src` for appending.
    pub async fn append(&self, src: &str) -> Result<FileWriter> {
        let (link, resolved_path) = self.mount_table.resolve(src);

        // Retry with new_block = true when the namenode refuses to append to
        // the file's existing last block.
        let append_response = match link.protocol.append(&resolved_path, false).await {
            Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
                link.protocol.append(&resolved_path, true).await?
            }
            resp => resp?,
        };

        match append_response.stat {
            Some(status) => {
                if status.file_encryption_info.is_some() {
                    // Best-effort close of the lease we just acquired. Fix:
                    // complete() must target the mount-resolved path that the
                    // namenode knows, not the original viewfs path `src`.
                    let _ = link
                        .protocol
                        .complete(
                            &resolved_path,
                            append_response.block.map(|b| b.b),
                            status.file_id,
                        )
                        .await;
                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
                }
                Ok(FileWriter::new(
                    Arc::clone(&link.protocol),
                    resolved_path,
                    status,
                    append_response.block,
                ))
            }
            None => Err(HdfsError::FileNotFound(src.to_string())),
        }
    }

    /// Creates the directory at `path` with the given permission, optionally
    /// creating missing parents.
    pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .mkdirs(&resolved_path, permission, create_parent)
            .await
            .map(|_| ())
    }

    /// Renames `src` to `dst`. Both paths must resolve to the same mount
    /// (the namenode cannot rename across name services).
    pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
        let (src_link, src_resolved_path) = self.mount_table.resolve(src);
        let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
        if src_link.viewfs_path == dst_link.viewfs_path {
            src_link
                .protocol
                .rename(&src_resolved_path, &dst_resolved_path, overwrite)
                .await
                .map(|_| ())
        } else {
            Err(HdfsError::InvalidArgument(
                "Cannot rename across different name services".to_string(),
            ))
        }
    }

    /// Deletes `path`, recursing into directories when `recursive` is true.
    /// Returns whether anything was actually deleted.
    pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .delete(&resolved_path, recursive)
            .await
            .map(|r| r.result)
    }

    /// Sets the modification and access times of `path`.
    pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .set_times(&resolved_path, mtime, atime)
            .await?;
        Ok(())
    }

    /// Sets the owner and/or group of `path`. `None` leaves the respective
    /// attribute unchanged.
    pub async fn set_owner(
        &self,
        path: &str,
        owner: Option<&str>,
        group: Option<&str>,
    ) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .set_owner(&resolved_path, owner, group)
            .await?;
        Ok(())
    }

    /// Sets the permission bits of `path`.
    pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .set_permission(&resolved_path, permission)
            .await?;
        Ok(())
    }

    /// Sets the replication factor of `path`, returning whether it succeeded.
    pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        let result = link
            .protocol
            .set_replication(&resolved_path, replication)
            .await?
            .result;

        Ok(result)
    }

    /// Retrieves the content summary (sizes, counts, quotas) for `path`.
    pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        let result = link
            .protocol
            .get_content_summary(&resolved_path)
            .await?
            .summary;

        Ok(result.into())
    }
}
impl Default for Client {
fn default() -> Self {
Self::default_with_config(Default::default()).expect("Failed to create default client")
}
}
/// Pages through the listing of a single directory via repeated getListing
/// RPCs to the owning namenode.
pub(crate) struct DirListingIterator {
    /// Original (viewfs) path; base for returned statuses and error messages.
    path: String,
    /// Path after mount-table resolution; what is sent to the namenode.
    resolved_path: String,
    link: MountLink,
    /// When true, directory entries are filtered out of the results.
    files_only: bool,
    /// Entries fetched from the server but not yet handed to the caller.
    partial_listing: VecDeque<HdfsFileStatusProto>,
    /// Server-reported count of entries still unfetched; starts at 1 so the
    /// first call to `next` triggers an initial fetch.
    remaining: u32,
    /// Raw path bytes of the last entry seen; the listing cursor for the
    /// next RPC.
    last_seen: Vec<u8>,
}
impl DirListingIterator {
    /// Creates an iterator over the listing of `path`, resolving it through
    /// the mount table once up front.
    fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
        let (link, resolved_path) = mount_table.resolve(&path);

        DirListingIterator {
            path,
            resolved_path,
            link: link.clone(),
            files_only,
            partial_listing: VecDeque::new(),
            // Non-zero so the first `next` call performs the initial fetch.
            remaining: 1,
            last_seen: Vec::new(),
        }
    }

    /// Fetches the next page of the listing from the namenode. Returns
    /// `Ok(true)` when the page contributed at least one entry after the
    /// `files_only` filter.
    ///
    /// # Errors
    /// [`HdfsError::FileNotFound`] when the server returns no listing.
    async fn get_next_batch(&mut self) -> Result<bool> {
        let listing = self
            .link
            .protocol
            .get_listing(&self.resolved_path, self.last_seen.clone(), false)
            .await?;

        if let Some(dir_list) = listing.dir_list {
            // Advance the cursor to the last raw entry so the next RPC
            // resumes after it.
            self.last_seen = dir_list
                .partial_listing
                .last()
                .map(|p| p.path.clone())
                .unwrap_or_default();
            self.remaining = dir_list.remaining_entries;
            self.partial_listing = dir_list
                .partial_listing
                .into_iter()
                .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
                .collect();
            Ok(!self.partial_listing.is_empty())
        } else {
            Err(HdfsError::FileNotFound(self.path.clone()))
        }
    }

    /// Returns the next entry, fetching further pages as needed, or `None`
    /// when the listing is exhausted.
    pub async fn next(&mut self) -> Option<Result<FileStatus>> {
        // Keep fetching until we have an entry or the server reports nothing
        // left. Looping (rather than a single fetch) matters when the
        // `files_only` filter removes an entire page: the previous
        // single-fetch logic stopped early even though `remaining > 0`.
        while self.partial_listing.is_empty() && self.remaining > 0 {
            match self.get_next_batch().await {
                Ok(true) => break,
                // Page was fully filtered out; try the next page.
                Ok(false) => continue,
                Err(error) => {
                    self.remaining = 0;
                    return Some(Err(error));
                }
            }
        }
        self.partial_listing
            .pop_front()
            .map(|status| Ok(FileStatus::from(status, &self.path)))
    }
}
/// Depth-first iterator over a directory tree, built from a stack of
/// per-directory [`DirListingIterator`]s.
pub struct ListStatusIterator {
    mount_table: Arc<MountTable>,
    /// When true, sub-directories are pushed onto the stack and descended into.
    recursive: bool,
    // Stack of directory listings; the top is the directory currently being
    // listed. Wrapped in an async Mutex so `next` can take `&self` (needed
    // for the `stream::unfold` adapter in `into_stream`).
    iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
}
impl ListStatusIterator {
    /// Seeds the iterator with a listing of the starting `path`.
    fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
        let root_listing = DirListingIterator::new(path, &mount_table, false);
        ListStatusIterator {
            mount_table,
            recursive,
            iters: Arc::new(tokio::sync::Mutex::new(vec![root_listing])),
        }
    }

    /// Returns the next status in depth-first order, or `None` once the
    /// whole tree has been listed.
    pub async fn next(&self) -> Option<Result<FileStatus>> {
        let mut stack = self.iters.lock().await;
        loop {
            // The top of the stack is the directory currently being listed;
            // an empty stack means the walk is complete.
            let current = stack.last_mut()?;
            match current.next().await {
                Some(Ok(entry)) => {
                    // Queue sub-directories for descent when recursing.
                    if entry.isdir && self.recursive {
                        stack.push(DirListingIterator::new(
                            entry.path.clone(),
                            &self.mount_table,
                            false,
                        ));
                    }
                    return Some(Ok(entry));
                }
                Some(err) => return Some(err),
                None => {
                    // Current directory is exhausted; resume its parent.
                    stack.pop();
                }
            }
        }
    }

    /// Adapts this iterator into an owned, boxed async stream.
    pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
        stream::unfold(self, |state| async move {
            state.next().await.map(|item| (item, state))
        })
        .boxed()
    }
}
/// Status of a file or directory, flattened from the protobuf representation.
#[derive(Debug)]
pub struct FileStatus {
    /// Full path of the file or directory.
    pub path: String,
    /// Length of the file in bytes (0 for directories).
    pub length: usize,
    /// Whether this entry is a directory.
    pub isdir: bool,
    /// Unix permission bits.
    pub permission: u16,
    pub owner: String,
    pub group: String,
    /// Modification time in milliseconds since the epoch.
    pub modification_time: u64,
    /// Access time in milliseconds since the epoch.
    pub access_time: u64,
    /// Replication factor, when reported by the server.
    pub replication: Option<u32>,
    /// Block size in bytes, when reported by the server.
    pub blocksize: Option<u64>,
}
impl FileStatus {
    /// Builds a [`FileStatus`] from the protobuf status, joining the entry's
    /// relative name (if any) onto `base_path`.
    fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
        let mut full_path = PathBuf::from(base_path);
        // The entry's name may be empty or non-UTF-8; only join it onto the
        // base path when it is a non-empty UTF-8 string.
        match std::str::from_utf8(&value.path) {
            Ok(suffix) if !suffix.is_empty() => full_path.push(suffix),
            _ => {}
        }

        FileStatus {
            isdir: value.file_type() == FileType::IsDir,
            path: full_path.to_str().unwrap_or_default().to_string(),
            length: value.length as usize,
            permission: value.permission.perm as u16,
            owner: value.owner,
            group: value.group,
            modification_time: value.modification_time,
            access_time: value.access_time,
            replication: value.block_replication,
            blocksize: value.blocksize,
        }
    }
}
/// Aggregate size, count, and quota information for a subtree.
#[derive(Debug)]
pub struct ContentSummary {
    /// Total length of all files, in bytes.
    pub length: u64,
    pub file_count: u64,
    pub directory_count: u64,
    /// Namespace quota (number of names).
    pub quota: u64,
    /// Disk space consumed, including replication.
    pub space_consumed: u64,
    /// Disk space quota.
    pub space_quota: u64,
}
impl From<ContentSummaryProto> for ContentSummary {
fn from(value: ContentSummaryProto) -> Self {
ContentSummary {
length: value.length,
file_count: value.file_count,
directory_count: value.directory_count,
quota: value.quota,
space_consumed: value.space_consumed,
space_quota: value.space_quota,
}
}
}
#[cfg(test)]
mod test {
    use std::{
        path::{Path, PathBuf},
        sync::Arc,
    };
    use url::Url;
    use crate::{
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
        Client,
    };
    use super::{MountLink, MountTable};
    // Builds a protocol handle for `url`; constructing the proxy does not
    // contact the cluster, so these tests need no running namenode.
    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let proxy =
            NameServiceProxy::new(&Url::parse(url).unwrap(), &Configuration::new().unwrap())
                .unwrap();
        Arc::new(NamenodeProtocol::new(proxy))
    }
    #[test]
    fn test_default_fs() {
        // Valid fs.defaultFS with no explicit URL succeeds.
        assert!(Client::default_with_config(
            vec![("fs.defaultFS".to_string(), "hdfs://test:9000".to_string())]
                .into_iter()
                .collect(),
        )
        .is_ok());
        // Host-less fs.defaultFS is rejected.
        assert!(Client::default_with_config(
            vec![("fs.defaultFS".to_string(), "hdfs://".to_string())]
                .into_iter()
                .collect(),
        )
        .is_err());
        // Host-less URL falls back to a valid fs.defaultFS.
        assert!(Client::new_with_config(
            "hdfs://",
            vec![("fs.defaultFS".to_string(), "hdfs://test:9000".to_string())]
                .into_iter()
                .collect(),
        )
        .is_ok());
        // Host-less URL with a host-less fs.defaultFS is rejected.
        assert!(Client::new_with_config(
            "hdfs://",
            vec![("fs.defaultFS".to_string(), "hdfs://".to_string())]
                .into_iter()
                .collect(),
        )
        .is_err());
        // Scheme mismatch between the URL and fs.defaultFS is rejected.
        assert!(Client::new_with_config(
            "hdfs://",
            vec![("fs.defaultFS".to_string(), "viewfs://test".to_string())]
                .into_iter()
                .collect(),
        )
        .is_err());
    }
    #[test]
    fn test_mount_link_resolve() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("/view", "/hdfs", protocol);
        // Paths under the mount point are rewritten onto the target prefix.
        assert_eq!(
            link.resolve(Path::new("/view/dir/file")).unwrap(),
            PathBuf::from("/hdfs/dir/file")
        );
        // The mount point itself maps to the target prefix exactly.
        assert_eq!(
            link.resolve(Path::new("/view")).unwrap(),
            PathBuf::from("/hdfs")
        );
        // Paths outside the mount point do not resolve.
        assert!(link.resolve(Path::new("/hdfs/path")).is_none());
    }
    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        // An empty viewfs path normalizes to "/", so this link matches everything.
        let link = MountLink::new("", "/hdfs", protocol);
        assert_eq!(
            link.resolve(Path::new("/path/to/file")).unwrap(),
            PathBuf::from("/hdfs/path/to/file")
        );
        assert_eq!(
            link.resolve(Path::new("/")).unwrap(),
            PathBuf::from("/hdfs")
        );
        assert_eq!(
            link.resolve(Path::new("/hdfs/path")).unwrap(),
            PathBuf::from("/hdfs/hdfs/path")
        );
    }
    #[test]
    fn test_mount_table_resolve() {
        let link1 = MountLink::new(
            "/mount1",
            "/path1/nested",
            create_protocol("hdfs://127.0.0.1:9000"),
        );
        let link2 = MountLink::new(
            "/mount2",
            "/path2",
            create_protocol("hdfs://127.0.0.1:9001"),
        );
        let link3 = MountLink::new(
            "/mount3/nested",
            "/path3",
            create_protocol("hdfs://127.0.0.1:9002"),
        );
        let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));
        let mount_table = MountTable {
            mounts: vec![link1, link2, link3],
            fallback,
        };
        // Exact mount point resolves to the mount's target path.
        let (link, resolved) = mount_table.resolve("/mount1");
        assert_eq!(link.viewfs_path, Path::new("/mount1"));
        assert_eq!(resolved, "/path1/nested");
        // A trailing slash resolves the same way.
        let (link, resolved) = mount_table.resolve("/mount1/");
        assert_eq!(link.viewfs_path, Path::new("/mount1"));
        assert_eq!(resolved, "/path1/nested");
        // "/mount12" is NOT under "/mount1" (component-wise match), so it
        // falls back to the root link.
        let (link, resolved) = mount_table.resolve("/mount12");
        assert_eq!(link.viewfs_path, Path::new("/"));
        assert_eq!(resolved, "/path4/mount12");
        // "/mount3" alone is not a mount point; only "/mount3/nested" is.
        let (link, resolved) = mount_table.resolve("/mount3/file");
        assert_eq!(link.viewfs_path, Path::new("/"));
        assert_eq!(resolved, "/path4/mount3/file");
        let (link, resolved) = mount_table.resolve("/mount3/nested/file");
        assert_eq!(link.viewfs_path, Path::new("/mount3/nested"));
        assert_eq!(resolved, "/path3/file");
    }
}