datafusion_common/utils/proxy.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`VecAllocExt`] and [`RawTableAllocExt`] to help tracking of memory allocations
19
20use hashbrown::{
21 hash_table::HashTable,
22 raw::{Bucket, RawTable},
23};
24use std::mem::size_of;
25
26/// Extension trait for [`Vec`] to account for allocations.
27pub trait VecAllocExt {
28 /// Item type.
29 type T;
30
31 /// [Push](Vec::push) new element to vector and increase
32 /// `accounting` by any newly allocated bytes.
33 ///
34 /// Note that allocation counts capacity, not size
35 ///
36 /// # Example:
37 /// ```
38 /// # use datafusion_common::utils::proxy::VecAllocExt;
39 /// // use allocated to incrementally track how much memory is allocated in the vec
40 /// let mut allocated = 0;
41 /// let mut vec = Vec::new();
42 /// // Push data into the vec and the accounting will be updated to reflect
43 /// // memory allocation
44 /// vec.push_accounted(1, &mut allocated);
45 /// assert_eq!(allocated, 16); // space for 4 u32s
46 /// vec.push_accounted(1, &mut allocated);
47 /// assert_eq!(allocated, 16); // no new allocation needed
48 ///
49 /// // push more data into the vec
50 /// for _ in 0..10 { vec.push_accounted(1, &mut allocated); }
51 /// assert_eq!(allocated, 64); // underlying vec has space for 10 u32s
52 /// assert_eq!(vec.allocated_size(), 64);
53 /// ```
54 /// # Example with other allocations:
55 /// ```
56 /// # use datafusion_common::utils::proxy::VecAllocExt;
57 /// // You can use the same allocated size to track memory allocated by
58 /// // another source. For example
59 /// let mut allocated = 27;
60 /// let mut vec = Vec::new();
61 /// vec.push_accounted(1, &mut allocated); // allocates 16 bytes for vec
62 /// assert_eq!(allocated, 43); // 16 bytes for vec, 27 bytes for other
63 /// ```
64 fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
65
66 /// Return the amount of memory allocated by this Vec to store elements
67 /// (`size_of<T> * capacity`).
68 ///
69 /// Note this calculation is not recursive, and does not include any heap
70 /// allocations contained within the Vec's elements. Does not include the
71 /// size of `self`
72 ///
73 /// # Example:
74 /// ```
75 /// # use datafusion_common::utils::proxy::VecAllocExt;
76 /// let mut vec = Vec::new();
77 /// // Push data into the vec and the accounting will be updated to reflect
78 /// // memory allocation
79 /// vec.push(1);
80 /// assert_eq!(vec.allocated_size(), 16); // space for 4 u32s
81 /// vec.push(1);
82 /// assert_eq!(vec.allocated_size(), 16); // no new allocation needed
83 ///
84 /// // push more data into the vec
85 /// for _ in 0..10 { vec.push(1); }
86 /// assert_eq!(vec.allocated_size(), 64); // space for 64 now
87 /// ```
88 fn allocated_size(&self) -> usize;
89}
90
91impl<T> VecAllocExt for Vec<T> {
92 type T = T;
93
94 fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
95 let prev_capacity = self.capacity();
96 self.push(x);
97 let new_capacity = self.capacity();
98 if new_capacity > prev_capacity {
99 // capacity changed, so we allocated more
100 let bump_size = (new_capacity - prev_capacity) * size_of::<T>();
101 // Note multiplication should never overflow because `push` would
102 // have panic'd first, but the checked_add could potentially
103 // overflow since accounting could be tracking additional values, and
104 // could be greater than what is stored in the Vec
105 *accounting = (*accounting).checked_add(bump_size).expect("overflow");
106 }
107 }
108 fn allocated_size(&self) -> usize {
109 size_of::<T>() * self.capacity()
110 }
111}
112
113/// Extension trait for hash browns [`RawTable`] to account for allocations.
114pub trait RawTableAllocExt {
115 /// Item type.
116 type T;
117
118 /// [Insert](RawTable::insert) new element into table and increase
119 /// `accounting` by any newly allocated bytes.
120 ///
121 /// Returns the bucket where the element was inserted.
122 /// Note that allocation counts capacity, not size.
123 ///
124 /// # Example:
125 /// ```
126 /// # use datafusion_common::utils::proxy::RawTableAllocExt;
127 /// # use hashbrown::raw::RawTable;
128 /// let mut table = RawTable::new();
129 /// let mut allocated = 0;
130 /// let hash_fn = |x: &u32| (*x as u64) % 1000;
131 /// // pretend 0x3117 is the hash value for 1
132 /// table.insert_accounted(1, hash_fn, &mut allocated);
133 /// assert_eq!(allocated, 64);
134 ///
135 /// // insert more values
136 /// for i in 0..100 { table.insert_accounted(i, hash_fn, &mut allocated); }
137 /// assert_eq!(allocated, 400);
138 /// ```
139 fn insert_accounted(
140 &mut self,
141 x: Self::T,
142 hasher: impl Fn(&Self::T) -> u64,
143 accounting: &mut usize,
144 ) -> Bucket<Self::T>;
145}
146
147impl<T> RawTableAllocExt for RawTable<T> {
148 type T = T;
149
150 fn insert_accounted(
151 &mut self,
152 x: Self::T,
153 hasher: impl Fn(&Self::T) -> u64,
154 accounting: &mut usize,
155 ) -> Bucket<Self::T> {
156 let hash = hasher(&x);
157
158 match self.try_insert_no_grow(hash, x) {
159 Ok(bucket) => bucket,
160 Err(x) => {
161 // need to request more memory
162
163 let bump_elements = self.capacity().max(16);
164 let bump_size = bump_elements * size_of::<T>();
165 *accounting = (*accounting).checked_add(bump_size).expect("overflow");
166
167 self.reserve(bump_elements, hasher);
168
169 // still need to insert the element since first try failed
170 // Note: cannot use `.expect` here because `T` may not implement `Debug`
171 match self.try_insert_no_grow(hash, x) {
172 Ok(bucket) => bucket,
173 Err(_) => panic!("just grew the container"),
174 }
175 }
176 }
177 }
178}
179
180/// Extension trait for hash browns [`HashTable`] to account for allocations.
181pub trait HashTableAllocExt {
182 /// Item type.
183 type T;
184
185 /// Insert new element into table and increase
186 /// `accounting` by any newly allocated bytes.
187 ///
188 /// Returns the bucket where the element was inserted.
189 /// Note that allocation counts capacity, not size.
190 ///
191 /// # Example:
192 /// ```
193 /// # use datafusion_common::utils::proxy::HashTableAllocExt;
194 /// # use hashbrown::hash_table::HashTable;
195 /// let mut table = HashTable::new();
196 /// let mut allocated = 0;
197 /// let hash_fn = |x: &u32| (*x as u64) % 1000;
198 /// // pretend 0x3117 is the hash value for 1
199 /// table.insert_accounted(1, hash_fn, &mut allocated);
200 /// assert_eq!(allocated, 64);
201 ///
202 /// // insert more values
203 /// for i in 0..100 { table.insert_accounted(i, hash_fn, &mut allocated); }
204 /// assert_eq!(allocated, 400);
205 /// ```
206 fn insert_accounted(
207 &mut self,
208 x: Self::T,
209 hasher: impl Fn(&Self::T) -> u64,
210 accounting: &mut usize,
211 );
212}
213
214impl<T> HashTableAllocExt for HashTable<T>
215where
216 T: Eq,
217{
218 type T = T;
219
220 fn insert_accounted(
221 &mut self,
222 x: Self::T,
223 hasher: impl Fn(&Self::T) -> u64,
224 accounting: &mut usize,
225 ) {
226 let hash = hasher(&x);
227
228 // NOTE: `find_entry` does NOT grow!
229 match self.find_entry(hash, |y| y == &x) {
230 Ok(_occupied) => {}
231 Err(_absent) => {
232 if self.len() == self.capacity() {
233 // need to request more memory
234 let bump_elements = self.capacity().max(16);
235 let bump_size = bump_elements * size_of::<T>();
236 *accounting = (*accounting).checked_add(bump_size).expect("overflow");
237
238 self.reserve(bump_elements, &hasher);
239 }
240
241 // still need to insert the element since first try failed
242 self.entry(hash, |y| y == &x, hasher).insert(x);
243 }
244 }
245 }
246}