lance_core/
cache.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Cache implementation

use std::any::{Any, TypeId};
use std::sync::Arc;

use deepsize::{Context, DeepSizeOf};
use futures::Future;
use moka::sync::Cache;
use object_store::path::Path;

use crate::utils::path::LancePathExt;
use crate::Result;

/// A reference-counted, type-erased value that can be shared across threads.
type ArcAny = Arc<dyn Any + Sync + Send>;

/// A type-erased cache entry paired with a function that reports its deep size.
///
/// The concrete type of the value is erased when it is stored, so `size_accessor`
/// carries the knowledge needed to measure it again later.
#[derive(Clone)]
struct SizedRecord {
    /// The cached value with its concrete type erased.
    record: ArcAny,
    /// Computes the size of a record; callers in this file apply it to `record`.
    size_accessor: Arc<dyn Fn(&ArcAny) -> usize + Sync + Send>,
}

impl std::fmt::Debug for SizedRecord {
    /// Formats only the record; the size accessor closure has no useful debug output.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = formatter.debug_struct("SizedRecord");
        out.field("record", &self.record);
        out.finish()
    }
}

impl SizedRecord {
    /// Type-erases `record` while capturing how to measure its deep size.
    ///
    /// The captured accessor downcasts back to `T` and panics if handed a record of
    /// any other type, so it must only be applied to this entry's own `record`.
    fn new<T>(record: Arc<T>) -> Self
    where
        T: DeepSizeOf + Send + Sync + 'static,
    {
        let erased: ArcAny = record;
        let measure = |value: &ArcAny| -> usize {
            let concrete: &T = value.downcast_ref::<T>().unwrap();
            concrete.deep_size_of()
        };
        Self {
            size_accessor: Arc::new(measure),
            record: erased,
        }
    }
}

/// Cache for metadata about files.
///
/// Entries are keyed by the (possibly prefixed) file path together with the
/// [`TypeId`] of the stored metadata, so several kinds of metadata can be cached
/// for the same file.
#[derive(Debug, Clone)]
pub struct FileMetadataCache {
    /// Shared backing store; `None` means caching is disabled.
    cache: Option<Arc<Cache<(Path, TypeId), SizedRecord>>>,
    /// Optional prefix joined in front of every key path before lookup or insertion.
    base_path: Option<Path>,
}

impl DeepSizeOf for FileMetadataCache {
    /// Sums the deep sizes reported by every cached value.
    ///
    /// Keys are not counted, and a disabled cache reports 0. Caches derived with
    /// `with_base_path` share the same storage, so each of them reports the whole
    /// shared total.
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        let Some(cache) = self.cache.as_ref() else {
            return 0;
        };
        let mut total = 0;
        for (_key, entry) in cache.iter() {
            total += (entry.size_accessor)(&entry.record);
        }
        total
    }
}

/// How the capacity passed to [`FileMetadataCache::with_capacity`] is interpreted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CapacityMode {
    /// Capacity counts entries, regardless of their size.
    Items,
    /// Entries are weighed by the deep size of the cached value, in bytes.
    Bytes,
}

impl FileMetadataCache {
    /// Instantiates a new cache bounded to `capacity` entries.
    ///
    /// For legacy reasons this uses [`CapacityMode::Items`]; use
    /// [`Self::with_capacity`] to bound the cache by bytes instead.
    pub fn new(capacity: usize) -> Self {
        Self {
            cache: Some(Arc::new(Cache::new(capacity as u64))),
            base_path: None,
        }
    }

    /// Instantiates a dummy cache that will never cache anything.
    ///
    /// `get` always returns `None`, `insert` is a no-op and `size` is always 0.
    pub fn no_cache() -> Self {
        Self {
            cache: None,
            base_path: None,
        }
    }

    /// Instantiates a new cache with the given capacity, interpreted according to `mode`.
    ///
    /// * [`CapacityMode::Items`]: `capacity` is the maximum number of entries.
    /// * [`CapacityMode::Bytes`]: `capacity` is the maximum total weight, where each
    ///   entry weighs the deep size of its value in bytes (saturating at `u32::MAX`).
    pub fn with_capacity(capacity: usize, mode: CapacityMode) -> Self {
        match mode {
            CapacityMode::Items => Self::new(capacity),
            CapacityMode::Bytes => Self {
                cache: Some(Arc::new(
                    Cache::builder()
                        // Without an explicit max capacity the moka cache is unbounded,
                        // so the weigher would never drive eviction and `capacity`
                        // would be silently ignored.
                        .max_capacity(capacity as u64)
                        .weigher(|_, v: &SizedRecord| {
                            (v.size_accessor)(&v.record).try_into().unwrap_or(u32::MAX)
                        })
                        .build(),
                )),
                base_path: None,
            },
        }
    }

    /// Creates a new cache which shares the same underlying cache but prepends
    /// `base_path` to all keys.
    ///
    /// Any base path already set on `self` is replaced, not nested.
    pub fn with_base_path(&self, base_path: Path) -> Self {
        Self {
            cache: self.cache.clone(),
            base_path: Some(base_path),
        }
    }

    /// Returns the number of entries currently stored.
    ///
    /// Pending cache maintenance is run first so the count is current. Caches derived
    /// via [`Self::with_base_path`] share storage, so the count covers every prefix.
    /// A [`Self::no_cache`] instance always reports 0.
    pub fn size(&self) -> usize {
        if let Some(cache) = self.cache.as_ref() {
            cache.run_pending_tasks();
            cache.entry_count() as usize
        } else {
            0
        }
    }

    /// Looks up the metadata of type `T` cached for `path`.
    ///
    /// `path` is resolved against the base path, if one is set. Returns `None` when no
    /// entry of type `T` exists or caching is disabled.
    pub fn get<T: Send + Sync + 'static>(&self, path: &Path) -> Option<Arc<T>> {
        let cache = self.cache.as_ref()?;
        // Build the owned key path exactly once; the prefixed path is already owned,
        // so it needs no further copy.
        let key_path = match &self.base_path {
            Some(base_path) => base_path.child_path(path),
            None => path.clone(),
        };
        cache.get(&(key_path, TypeId::of::<T>())).map(|entry| {
            entry
                .record
                .downcast::<T>()
                .expect("cache key includes TypeId::of::<T>(), so the record must be a T")
        })
    }

    /// Stores `metadata` for `path`, replacing any existing entry of the same type.
    ///
    /// `path` is resolved against the base path, if one is set. This is a no-op when
    /// caching is disabled.
    pub fn insert<T: DeepSizeOf + Send + Sync + 'static>(&self, path: Path, metadata: Arc<T>) {
        let Some(cache) = self.cache.as_ref() else {
            return;
        };
        let path = match &self.base_path {
            Some(base_path) => base_path.child_path(&path),
            None => path,
        };
        cache.insert((path, TypeId::of::<T>()), SizedRecord::new(metadata));
    }

    /// Gets an item, loading and caching it on a miss.
    ///
    /// If an entry of type `T` exists for `path`, it is returned. Otherwise `loader` is
    /// awaited with the original (un-prefixed) `path`, and its result is inserted and
    /// returned. When caching is disabled, `loader` runs on every call.
    ///
    /// Concurrent callers that miss the same key may each run `loader`; the last insert
    /// wins.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `loader`; nothing is cached in that case.
    pub async fn get_or_insert<T: DeepSizeOf + Send + Sync + 'static, F, Fut>(
        &self,
        path: &Path,
        loader: F,
    ) -> Result<Arc<T>>
    where
        F: Fn(&Path) -> Fut,
        Fut: Future<Output = Result<T>>,
    {
        if let Some(metadata) = self.get::<T>(path) {
            return Ok(metadata);
        }

        let metadata = Arc::new(loader(path).await?);
        self.insert(path.to_owned(), metadata.clone());
        Ok(metadata)
    }
}