odbc_api/columnar_bulk_inserter.rs
1use crate::{
2 buffers::{ColumnBuffer, TextColumn},
3 execute::execute,
4 handles::{AsStatementRef, HasDataType, Statement, StatementRef},
5 CursorImpl, Error,
6};
7
/// Can be used to execute a statement with bulk array parameters. Contrary to its name any
/// statement with parameters can be executed, not only `INSERT`; however, inserting large amounts
/// of data in batches is the primary intended use case.
///
/// Binding new buffers is quite expensive in ODBC, so the parameter buffers are reused for each
/// batch (so the pointers bound to the statement stay valid). Each batch of data is therefore
/// copied into the buffers already bound, rather than binding user defined buffers. Often the data
/// needs to be transformed anyway, so the copy is no actual overhead. Once the buffers are filled
/// with a batch, the data is sent.
pub struct ColumnarBulkInserter<S, C> {
    // Invariant: the parameter buffers are bound to the statement, and the parameter set size
    // reflects the number of valid rows in the current batch.
    statement: S,
    /// Number of valid rows in the current batch.
    parameter_set_size: usize,
    /// Maximum number of rows a single batch can hold.
    capacity: usize,
    /// We maintain the invariant that none of these buffers is truncated.
    parameters: Vec<C>,
}
26
27impl<S, C> ColumnarBulkInserter<S, C>
28where
29 S: AsStatementRef,
30{
31 /// Users are not encouraged to call this directly.
32 ///
33 /// # Safety
34 ///
35 /// * Statement is expected to be a perpared statement.
36 /// * Parameters must all be valid for insertion. An example for an invalid parameter would be
37 /// a text buffer with a cell those indiactor value exceeds the maximum element length. This
38 /// can happen after when truncation occurs then writing into a buffer.
39 pub unsafe fn new(mut statement: S, parameters: Vec<C>) -> Result<Self, Error>
40 where
41 C: ColumnBuffer + HasDataType,
42 {
43 let mut stmt = statement.as_stmt_ref();
44 stmt.reset_parameters();
45 let mut parameter_number = 1;
46 // Bind buffers to statement.
47 for column in ¶meters {
48 if let Err(error) = stmt
49 .bind_input_parameter(parameter_number, column)
50 .into_result(&stmt)
51 {
52 // This early return using `?` is risky. We actually did bind some parameters
53 // already. We cannot guarantee that the bound pointers stay valid in case of an
54 // error since `Self` is never constructed. We would away with this, if we took
55 // ownership of the statement and it is destroyed should the constructor not
56 // succeed. However columnar bulk inserter can also be instantiated with borrowed
57 // statements. This is why we reset the parameters on error.
58 stmt.reset_parameters();
59 return Err(error);
60 }
61 parameter_number += 1;
62 }
63 let capacity = parameters
64 .iter()
65 .map(|col| col.capacity())
66 .min()
67 .unwrap_or(0);
68 Ok(Self {
69 statement,
70 parameter_set_size: 0,
71 capacity,
72 parameters,
73 })
74 }
75
76 /// Execute the prepared statement, with the parameters bound
77 pub fn execute(&mut self) -> Result<Option<CursorImpl<StatementRef<'_>>>, Error> {
78 let mut stmt = self.statement.as_stmt_ref();
79 unsafe {
80 if self.parameter_set_size == 0 {
81 // A batch size of 0 will not execute anything, same as for execute on connection or
82 // prepared.
83 Ok(None)
84 } else {
85 // We reset the parameter set size, in order to adequatly handle batches of
86 // different size then inserting into the database.
87 stmt.set_paramset_size(self.parameter_set_size);
88 execute(stmt, None)
89 }
90 }
91 }
92
93 /// Sets the number of rows in the buffer to zero.
94 pub fn clear(&mut self) {
95 self.parameter_set_size = 0;
96 }
97
98 /// Number of valid rows in the buffer
99 pub fn num_rows(&self) -> usize {
100 self.parameter_set_size
101 }
102
103 /// Set number of valid rows in the buffer. Must not be larger than the batch size. If the
104 /// specified number than the number of valid rows currently held by the buffer additional they
105 /// will just hold the value previously assigned to them. Therfore if extending the number of
106 /// valid rows users should take care to assign values to these rows. However, even if not
107 /// assigend it is always guaranteed that every cell is valid for insertion and will not cause
108 /// out of bounds access down in the ODBC driver. Therefore this method is safe. You can set
109 /// the number of valid rows before or after filling values into the buffer, but you must do so
110 /// before executing the query.
111 pub fn set_num_rows(&mut self, num_rows: usize) {
112 if num_rows > self.capacity {
113 panic!(
114 "Columnar buffer may not be resized to a value higher than the maximum number of \
115 rows initially specified in the constructor."
116 );
117 }
118 self.parameter_set_size = num_rows;
119 }
120
121 /// Use this method to gain write access to the actual column data.
122 ///
123 /// # Parameters
124 ///
125 /// * `buffer_index`: Please note that the buffer index is not identical to the ODBC column
126 /// index. For one it is zero based. It also indexes the buffer bound, and not the columns of
127 /// the output result set. This is important, because not every column needs to be bound. Some
128 /// columns may simply be ignored. That being said, if every column of the output is bound in
129 /// the buffer, in the same order in which they are enumerated in the result set, the
130 /// relationship between column index and buffer index is `buffer_index = column_index - 1`.
131 ///
132 /// # Example
133 ///
134 /// This method is intended to be called if using [`ColumnarBulkInserter`] for column wise bulk
135 /// inserts.
136 ///
137 /// ```no_run
138 /// use odbc_api::{Connection, Error, buffers::BufferDesc};
139 ///
140 /// fn insert_birth_years(conn: &Connection, names: &[&str], years: &[i16])
141 /// -> Result<(), Error>
142 /// {
143 ///
144 /// // All columns must have equal length.
145 /// assert_eq!(names.len(), years.len());
146 /// // Prepare the insert statement
147 /// let prepared = conn.prepare("INSERT INTO Birthdays (name, year) VALUES (?, ?)")?;
148 /// // Create a columnar buffer which fits the input parameters.
149 /// let buffer_description = [
150 /// BufferDesc::Text { max_str_len: 255 },
151 /// BufferDesc::I16 { nullable: false },
152 /// ];
153 /// // Here we do everything in one batch. So the capacity is the number of input
154 /// // parameters.
155 /// let capacity = names.len();
156 /// let mut prebound = prepared.into_column_inserter(capacity, buffer_description)?;
157 /// // Set number of input rows in the current batch.
158 /// prebound.set_num_rows(names.len());
159 /// // Fill the buffer with values column by column
160 ///
161 /// // Fill names
162 /// let mut col = prebound
163 /// .column_mut(0)
164 /// .as_text_view()
165 /// .expect("We know the name column to hold text.");
166 /// for (index, name) in names.iter().map(|s| Some(s.as_bytes())).enumerate() {
167 /// col.set_cell(index, name);
168 /// }
169 ///
170 /// // Fill birth years
171 /// let mut col = prebound
172 /// .column_mut(1)
173 /// .as_slice::<i16>()
174 /// .expect("We know the year column to hold i16.");
175 /// col.copy_from_slice(years);
176 ///
177 /// // Execute the prepared statment with the bound array parameters. Sending the values to
178 /// // the database.
179 /// prebound.execute()?;
180 /// Ok(())
181 /// }
182 /// ```
183 pub fn column_mut<'a>(&'a mut self, buffer_index: usize) -> C::SliceMut
184 where
185 C: BoundInputSlice<'a>,
186 {
187 unsafe {
188 self.parameters[buffer_index]
189 .as_view_mut((buffer_index + 1) as u16, self.statement.as_stmt_ref())
190 }
191 }
192
193 /// Maximum number of rows the buffer can hold at once.
194 pub fn capacity(&self) -> usize {
195 self.capacity
196 }
197}
198
199/// You can obtain a mutable slice of a column buffer which allows you to change its contents.
200///
201/// # Safety
202///
203/// * If any operations have been performed which would invalidate the pointers bound to the
204/// statement, the slice must use the statement handle to rebind the column, at the end of its
205/// lifetime (at the latest).
206/// * All values must be complete. I.e. none of the values must be truncated.
207pub unsafe trait BoundInputSlice<'a> {
208 /// Intended to allow for modifying buffer contents, while leaving the bound parameter buffers
209 /// valid.
210 type SliceMut;
211
212 /// Obtain a mutable view on a parameter buffer in order to change the parameter value(s)
213 /// submitted when executing the statement.
214 ///
215 /// # Safety
216 ///
217 /// * The statement must be the statment the column buffer is bound to. The index must be the
218 /// parameter index it is bound at.
219 /// * All values must be complete. I.e. none of the values must be truncated.
220 unsafe fn as_view_mut(
221 &'a mut self,
222 parameter_index: u16,
223 stmt: StatementRef<'a>,
224 ) -> Self::SliceMut;
225}
226
227impl<S> ColumnarBulkInserter<S, TextColumn<u8>> {
228 /// Takes one element from the iterator for each internal column buffer and appends it to the
229 /// end of the buffer. Should a cell of the row be too large for the associated column buffer,
230 /// the column buffer will be reallocated with `1.2` times its size, and rebound to the
231 /// statement.
232 ///
233 /// This method panics if it is tried to insert elements beyond batch size. It will also panic
234 /// if row does not contain at least one item for each internal column buffer.
235 pub fn append<'b>(
236 &mut self,
237 mut row: impl Iterator<Item = Option<&'b [u8]>>,
238 ) -> Result<(), Error>
239 where
240 S: AsStatementRef,
241 {
242 if self.capacity == self.parameter_set_size {
243 panic!("Trying to insert elements into TextRowSet beyond batch size.")
244 }
245
246 let mut col_index = 1;
247 for column in &mut self.parameters {
248 let text = row.next().expect(
249 "Row passed to TextRowSet::append must contain one element for each column.",
250 );
251 if let Some(text) = text {
252 unsafe {
253 column
254 .as_view_mut(col_index, self.statement.as_stmt_ref())
255 .ensure_max_element_length(text.len(), self.parameter_set_size)?;
256 }
257 column.set_value(self.parameter_set_size, Some(text));
258 } else {
259 column.set_value(self.parameter_set_size, None);
260 }
261 col_index += 1;
262 }
263
264 self.parameter_set_size += 1;
265
266 Ok(())
267 }
268}