// datafusion_execution/cache/cache_unit.rs
use std::sync::Arc;
use crate::cache::CacheAccessor;
use datafusion_common::Statistics;
use dashmap::DashMap;
use object_store::path::Path;
use object_store::ObjectMeta;
/// Default in-memory [`CacheAccessor`] for per-file [`Statistics`].
///
/// Each entry maps an object [`Path`] to a pair of:
/// - the [`ObjectMeta`] recorded when the statistics were cached, and
/// - the cached statistics themselves.
///
/// The backing [`DashMap`] allows entries to be inserted and read through `&self`.
#[derive(Debug, Default)]
pub struct DefaultFileStatisticsCache {
    statistics: DashMap<Path, (ObjectMeta, Arc<Statistics>)>,
}
/// Statistics are keyed by object path. Alongside each value the cache keeps
/// the [`ObjectMeta`] observed at insertion time, so that
/// [`CacheAccessor::get_with_extra`] can reject entries whose file has changed.
impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
    type Extra = ObjectMeta;

    /// Returns the cached statistics for `k`, if any, WITHOUT checking them
    /// against the file's current metadata.
    fn get(&self, k: &Path) -> Option<Arc<Statistics>> {
        self.statistics
            .get(k)
            .map(|entry| Arc::clone(&entry.value().1))
    }

    /// Returns the cached statistics for `k` only when the stored metadata has
    /// the same `size` and `last_modified` as `e`; otherwise returns `None`.
    ///
    /// `e_tag` and `version` are not compared.
    fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Arc<Statistics>> {
        self.statistics.get(k).and_then(|entry| {
            let (saved_meta, statistics) = entry.value();
            // A size or modification-time mismatch means the cached entry is stale.
            let unchanged = saved_meta.size == e.size
                && saved_meta.last_modified == e.last_modified;
            unchanged.then(|| Arc::clone(statistics))
        })
    }

    /// Not supported.
    ///
    /// Use [`CacheAccessor::put_with_extra`] instead, so that the metadata
    /// checked by `get_with_extra` is recorded.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn put(&self, _key: &Path, _value: Arc<Statistics>) -> Option<Arc<Statistics>> {
        panic!("Put cache in DefaultFileStatisticsCache without Extra not supported.")
    }

    /// Caches `value` for `key` together with the file metadata `e`.
    ///
    /// Returns the statistics previously cached for `key`, if any.
    fn put_with_extra(
        &self,
        key: &Path,
        value: Arc<Statistics>,
        e: &Self::Extra,
    ) -> Option<Arc<Statistics>> {
        self.statistics
            .insert(key.clone(), (e.clone(), value))
            .map(|(_old_meta, old_statistics)| old_statistics)
    }

    /// Removes the entry for `k`, returning its statistics if one existed.
    fn remove(&mut self, k: &Path) -> Option<Arc<Statistics>> {
        self.statistics
            .remove(k)
            .map(|(_path, (_meta, statistics))| statistics)
    }

    /// Returns `true` if an entry exists for `k`, regardless of its metadata.
    fn contains_key(&self, k: &Path) -> bool {
        self.statistics.contains_key(k)
    }

    /// Returns the number of cached entries.
    fn len(&self) -> usize {
        self.statistics.len()
    }

    /// Removes every cached entry.
    fn clear(&self) {
        self.statistics.clear()
    }

    /// Returns the cache's display name.
    fn name(&self) -> String {
        "DefaultFileStatisticsCache".to_string()
    }
}
/// Default in-memory [`CacheAccessor`] mapping a [`Path`] to a shared list of
/// [`ObjectMeta`]. The name suggests these are cached listing results for that
/// path; confirm against the callers.
#[derive(Debug, Default)]
pub struct DefaultListFilesCache {
    // NOTE(review): despite its name, this field holds lists of `ObjectMeta`,
    // not statistics.
    statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
}
/// Lists of [`ObjectMeta`] are stored per path.
///
/// No validation data is kept for these entries, so the `*_with_extra`
/// accessors are unsupported and panic.
impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
    type Extra = ObjectMeta;

    /// Returns a shared handle to the list cached for `k`, if present.
    fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
        let entry = self.statistics.get(k)?;
        Some(Arc::clone(&*entry))
    }

    /// Not supported.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn get_with_extra(
        &self,
        _k: &Path,
        _e: &Self::Extra,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        panic!("Not supported DefaultListFilesCache get_with_extra")
    }

    /// Stores `value` under `key`, handing back whatever list was there before.
    fn put(
        &self,
        key: &Path,
        value: Arc<Vec<ObjectMeta>>,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        let owned_key = key.clone();
        self.statistics.insert(owned_key, value)
    }

    /// Not supported.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn put_with_extra(
        &self,
        _key: &Path,
        _value: Arc<Vec<ObjectMeta>>,
        _e: &Self::Extra,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        panic!("Not supported DefaultListFilesCache put_with_extra")
    }

    /// Drops the entry for `k` and returns its list, if one was cached.
    fn remove(&mut self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
        self.statistics.remove(k).map(|(_path, files)| files)
    }

    /// Reports whether a list is cached for `k`.
    fn contains_key(&self, k: &Path) -> bool {
        let map = &self.statistics;
        map.contains_key(k)
    }

    /// Number of paths currently cached.
    fn len(&self) -> usize {
        let map = &self.statistics;
        map.len()
    }

    /// Empties the cache.
    fn clear(&self) {
        let map = &self.statistics;
        map.clear()
    }

    /// Display name of this cache implementation.
    fn name(&self) -> String {
        String::from("DefaultListFilesCache")
    }
}
#[cfg(test)]
mod tests {
    use crate::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache};
    use crate::cache::CacheAccessor;
    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use chrono::DateTime;
    use datafusion_common::Statistics;
    use object_store::path::Path;
    use object_store::ObjectMeta;

    /// Builds the `ObjectMeta` shared by the tests below.
    fn test_meta() -> ObjectMeta {
        ObjectMeta {
            location: Path::from("test"),
            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
                .unwrap()
                .into(),
            size: 1024,
            e_tag: None,
            version: None,
        }
    }

    #[test]
    fn test_statistics_cache() {
        let meta = test_meta();
        let mut cache = DefaultFileStatisticsCache::default();
        assert!(cache.get_with_extra(&meta.location, &meta).is_none());

        let previous = cache.put_with_extra(
            &meta.location,
            Statistics::new_unknown(&Schema::new(vec![Field::new(
                "test_column",
                DataType::Timestamp(TimeUnit::Second, None),
                false,
            )]))
            .into(),
            &meta,
        );
        assert!(previous.is_none());
        assert!(cache.contains_key(&meta.location));
        assert_eq!(cache.len(), 1);
        assert!(cache.get_with_extra(&meta.location, &meta).is_some());
        // `get` does not validate against metadata.
        assert!(cache.get(&meta.location).is_some());

        // A size change invalidates the entry.
        let mut meta2 = meta.clone();
        meta2.size = 2048;
        assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());

        // A modification-time change invalidates the entry.
        let mut meta2 = meta.clone();
        meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
            .unwrap()
            .into();
        assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());

        // A different path is a plain miss.
        let mut meta2 = meta.clone();
        meta2.location = Path::from("test2");
        assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());

        // Removal hands back the cached value and empties the cache.
        assert!(cache.remove(&meta.location).is_some());
        assert!(!cache.contains_key(&meta.location));
        assert_eq!(cache.len(), 0);
        assert!(cache.get_with_extra(&meta.location, &meta).is_none());
    }

    #[test]
    fn test_list_file_cache() {
        let meta = test_meta();
        let mut cache = DefaultListFilesCache::default();
        assert!(cache.get(&meta.location).is_none());

        assert!(cache.put(&meta.location, vec![meta.clone()].into()).is_none());
        assert_eq!(cache.get(&meta.location).unwrap().first(), Some(&meta));
        assert!(cache.contains_key(&meta.location));
        assert_eq!(cache.len(), 1);

        let removed = cache.remove(&meta.location).unwrap();
        assert_eq!(removed.first(), Some(&meta));
        assert!(!cache.contains_key(&meta.location));

        cache.put(&meta.location, vec![meta.clone()].into());
        cache.clear();
        assert_eq!(cache.len(), 0);
        assert!(cache.get(&meta.location).is_none());
    }
}