datafusion_common/utils/
proxy.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`VecAllocExt`] and [`RawTableAllocExt`] to help tracking of memory allocations
19
20use hashbrown::{
21    hash_table::HashTable,
22    raw::{Bucket, RawTable},
23};
24use std::mem::size_of;
25
26/// Extension trait for [`Vec`] to account for allocations.
27pub trait VecAllocExt {
28    /// Item type.
29    type T;
30
31    /// [Push](Vec::push) new element to vector and increase
32    /// `accounting` by any newly allocated bytes.
33    ///
34    /// Note that allocation counts  capacity, not size
35    ///
36    /// # Example:
37    /// ```
38    /// # use datafusion_common::utils::proxy::VecAllocExt;
39    /// // use allocated to incrementally track how much memory is allocated in the vec
40    /// let mut allocated = 0;
41    /// let mut vec = Vec::new();
42    /// // Push data into the vec and the accounting will be updated to reflect
43    /// // memory allocation
44    /// vec.push_accounted(1, &mut allocated);
45    /// assert_eq!(allocated, 16); // space for 4 u32s
46    /// vec.push_accounted(1, &mut allocated);
47    /// assert_eq!(allocated, 16); // no new allocation needed
48    ///
49    /// // push more data into the vec
50    /// for _ in 0..10 { vec.push_accounted(1, &mut allocated); }
51    /// assert_eq!(allocated, 64); // underlying vec has space for 10 u32s
52    /// assert_eq!(vec.allocated_size(), 64);
53    /// ```
54    /// # Example with other allocations:
55    /// ```
56    /// # use datafusion_common::utils::proxy::VecAllocExt;
57    /// // You can use the same allocated size to track memory allocated by
58    /// // another source. For example
59    /// let mut allocated = 27;
60    /// let mut vec = Vec::new();
61    /// vec.push_accounted(1, &mut allocated); // allocates 16 bytes for vec
62    /// assert_eq!(allocated, 43); // 16 bytes for vec, 27 bytes for other
63    /// ```
64    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
65
66    /// Return the amount of memory allocated by this Vec to store elements
67    /// (`size_of<T> * capacity`).
68    ///
69    /// Note this calculation is not recursive, and does not include any heap
70    /// allocations contained within the Vec's elements. Does not include the
71    /// size of `self`
72    ///
73    /// # Example:
74    /// ```
75    /// # use datafusion_common::utils::proxy::VecAllocExt;
76    /// let mut vec = Vec::new();
77    /// // Push data into the vec and the accounting will be updated to reflect
78    /// // memory allocation
79    /// vec.push(1);
80    /// assert_eq!(vec.allocated_size(), 16); // space for 4 u32s
81    /// vec.push(1);
82    /// assert_eq!(vec.allocated_size(), 16); // no new allocation needed
83    ///
84    /// // push more data into the vec
85    /// for _ in 0..10 { vec.push(1); }
86    /// assert_eq!(vec.allocated_size(), 64); // space for 64 now
87    /// ```
88    fn allocated_size(&self) -> usize;
89}
90
91impl<T> VecAllocExt for Vec<T> {
92    type T = T;
93
94    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
95        let prev_capacity = self.capacity();
96        self.push(x);
97        let new_capacity = self.capacity();
98        if new_capacity > prev_capacity {
99            // capacity changed, so we allocated more
100            let bump_size = (new_capacity - prev_capacity) * size_of::<T>();
101            // Note multiplication should never overflow because `push` would
102            // have panic'd first, but the checked_add could potentially
103            // overflow since accounting could be tracking additional values, and
104            // could be greater than what is stored in the Vec
105            *accounting = (*accounting).checked_add(bump_size).expect("overflow");
106        }
107    }
108    fn allocated_size(&self) -> usize {
109        size_of::<T>() * self.capacity()
110    }
111}
112
113/// Extension trait for hash browns [`RawTable`] to account for allocations.
114pub trait RawTableAllocExt {
115    /// Item type.
116    type T;
117
118    /// [Insert](RawTable::insert) new element into table and increase
119    /// `accounting` by any newly allocated bytes.
120    ///
121    /// Returns the bucket where the element was inserted.
122    /// Note that allocation counts capacity, not size.
123    ///
124    /// # Example:
125    /// ```
126    /// # use datafusion_common::utils::proxy::RawTableAllocExt;
127    /// # use hashbrown::raw::RawTable;
128    /// let mut table = RawTable::new();
129    /// let mut allocated = 0;
130    /// let hash_fn = |x: &u32| (*x as u64) % 1000;
131    /// // pretend 0x3117 is the hash value for 1
132    /// table.insert_accounted(1, hash_fn, &mut allocated);
133    /// assert_eq!(allocated, 64);
134    ///
135    /// // insert more values
136    /// for i in 0..100 { table.insert_accounted(i, hash_fn, &mut allocated); }
137    /// assert_eq!(allocated, 400);
138    /// ```
139    fn insert_accounted(
140        &mut self,
141        x: Self::T,
142        hasher: impl Fn(&Self::T) -> u64,
143        accounting: &mut usize,
144    ) -> Bucket<Self::T>;
145}
146
147impl<T> RawTableAllocExt for RawTable<T> {
148    type T = T;
149
150    fn insert_accounted(
151        &mut self,
152        x: Self::T,
153        hasher: impl Fn(&Self::T) -> u64,
154        accounting: &mut usize,
155    ) -> Bucket<Self::T> {
156        let hash = hasher(&x);
157
158        match self.try_insert_no_grow(hash, x) {
159            Ok(bucket) => bucket,
160            Err(x) => {
161                // need to request more memory
162
163                let bump_elements = self.capacity().max(16);
164                let bump_size = bump_elements * size_of::<T>();
165                *accounting = (*accounting).checked_add(bump_size).expect("overflow");
166
167                self.reserve(bump_elements, hasher);
168
169                // still need to insert the element since first try failed
170                // Note: cannot use `.expect` here because `T` may not implement `Debug`
171                match self.try_insert_no_grow(hash, x) {
172                    Ok(bucket) => bucket,
173                    Err(_) => panic!("just grew the container"),
174                }
175            }
176        }
177    }
178}
179
180/// Extension trait for hash browns [`HashTable`] to account for allocations.
181pub trait HashTableAllocExt {
182    /// Item type.
183    type T;
184
185    /// Insert new element into table and increase
186    /// `accounting` by any newly allocated bytes.
187    ///
188    /// Returns the bucket where the element was inserted.
189    /// Note that allocation counts capacity, not size.
190    ///
191    /// # Example:
192    /// ```
193    /// # use datafusion_common::utils::proxy::HashTableAllocExt;
194    /// # use hashbrown::hash_table::HashTable;
195    /// let mut table = HashTable::new();
196    /// let mut allocated = 0;
197    /// let hash_fn = |x: &u32| (*x as u64) % 1000;
198    /// // pretend 0x3117 is the hash value for 1
199    /// table.insert_accounted(1, hash_fn, &mut allocated);
200    /// assert_eq!(allocated, 64);
201    ///
202    /// // insert more values
203    /// for i in 0..100 { table.insert_accounted(i, hash_fn, &mut allocated); }
204    /// assert_eq!(allocated, 400);
205    /// ```
206    fn insert_accounted(
207        &mut self,
208        x: Self::T,
209        hasher: impl Fn(&Self::T) -> u64,
210        accounting: &mut usize,
211    );
212}
213
214impl<T> HashTableAllocExt for HashTable<T>
215where
216    T: Eq,
217{
218    type T = T;
219
220    fn insert_accounted(
221        &mut self,
222        x: Self::T,
223        hasher: impl Fn(&Self::T) -> u64,
224        accounting: &mut usize,
225    ) {
226        let hash = hasher(&x);
227
228        // NOTE: `find_entry` does NOT grow!
229        match self.find_entry(hash, |y| y == &x) {
230            Ok(_occupied) => {}
231            Err(_absent) => {
232                if self.len() == self.capacity() {
233                    // need to request more memory
234                    let bump_elements = self.capacity().max(16);
235                    let bump_size = bump_elements * size_of::<T>();
236                    *accounting = (*accounting).checked_add(bump_size).expect("overflow");
237
238                    self.reserve(bump_elements, &hasher);
239                }
240
241                // still need to insert the element since first try failed
242                self.entry(hash, |y| y == &x, hasher).insert(x);
243            }
244        }
245    }
246}