odbc_api/
columnar_bulk_inserter.rs

1use crate::{
2    buffers::{ColumnBuffer, TextColumn},
3    execute::execute,
4    handles::{AsStatementRef, HasDataType, Statement, StatementRef},
5    CursorImpl, Error,
6};
7
/// Can be used to execute a statement with bulk array parameters. Contrary to its name any
/// statement with parameters can be executed, not only `INSERT`. However, inserting large amounts
/// of data in batches is the primary intended use case.
///
/// Binding new buffers is quite expensive in ODBC, so the parameter buffers are reused for each
/// batch (so the pointers bound to the statement stay valid). So we copy each batch of data into
/// the buffers already bound first, rather than binding user defined buffers. Often the data might
/// need to be transformed anyway, so the copy is no actual overhead. Once the buffers are filled
/// with a batch, we send the data.
pub struct ColumnarBulkInserter<S, C> {
    // We maintain the invariant that the parameters are bound to the statement and that the
    // parameter set size reflects the number of valid rows in the batch.
    //
    // Keep `statement` declared before `parameters`: fields are dropped in declaration order, so
    // the statement goes away before the buffers it points to.
    /// Statement the parameter buffers are bound to.
    statement: S,
    /// Number of valid rows in the current batch.
    parameter_set_size: usize,
    /// Maximum number of rows a batch can hold. Smallest capacity of all parameter buffers.
    capacity: usize,
    /// We maintain the invariant that none of these buffers is truncated.
    parameters: Vec<C>,
}
26
27impl<S, C> ColumnarBulkInserter<S, C>
28where
29    S: AsStatementRef,
30{
31    /// Users are not encouraged to call this directly.
32    ///
33    /// # Safety
34    ///
35    /// * Statement is expected to be a perpared statement.
36    /// * Parameters must all be valid for insertion. An example for an invalid parameter would be
37    ///   a text buffer with a cell those indiactor value exceeds the maximum element length. This
38    ///   can happen after when truncation occurs then writing into a buffer.
39    pub unsafe fn new(mut statement: S, parameters: Vec<C>) -> Result<Self, Error>
40    where
41        C: ColumnBuffer + HasDataType,
42    {
43        let mut stmt = statement.as_stmt_ref();
44        stmt.reset_parameters();
45        let mut parameter_number = 1;
46        // Bind buffers to statement.
47        for column in &parameters {
48            if let Err(error) = stmt
49                .bind_input_parameter(parameter_number, column)
50                .into_result(&stmt)
51            {
52                // This early return using `?` is risky. We actually did bind some parameters
53                // already. We cannot guarantee that the bound pointers stay valid in case of an
54                // error since `Self` is never constructed. We would away with this, if we took
55                // ownership of the statement and it is destroyed should the constructor not
56                // succeed. However columnar bulk inserter can also be instantiated with borrowed
57                // statements. This is why we reset the parameters on error.
58                stmt.reset_parameters();
59                return Err(error);
60            }
61            parameter_number += 1;
62        }
63        let capacity = parameters
64            .iter()
65            .map(|col| col.capacity())
66            .min()
67            .unwrap_or(0);
68        Ok(Self {
69            statement,
70            parameter_set_size: 0,
71            capacity,
72            parameters,
73        })
74    }
75
76    /// Execute the prepared statement, with the parameters bound
77    pub fn execute(&mut self) -> Result<Option<CursorImpl<StatementRef<'_>>>, Error> {
78        let mut stmt = self.statement.as_stmt_ref();
79        unsafe {
80            if self.parameter_set_size == 0 {
81                // A batch size of 0 will not execute anything, same as for execute on connection or
82                // prepared.
83                Ok(None)
84            } else {
85                // We reset the parameter set size, in order to adequatly handle batches of
86                // different size then inserting into the database.
87                stmt.set_paramset_size(self.parameter_set_size);
88                execute(stmt, None)
89            }
90        }
91    }
92
93    /// Sets the number of rows in the buffer to zero.
94    pub fn clear(&mut self) {
95        self.parameter_set_size = 0;
96    }
97
98    /// Number of valid rows in the buffer
99    pub fn num_rows(&self) -> usize {
100        self.parameter_set_size
101    }
102
103    /// Set number of valid rows in the buffer. Must not be larger than the batch size. If the
104    /// specified number than the number of valid rows currently held by the buffer additional they
105    /// will just hold the value previously assigned to them. Therfore if extending the number of
106    /// valid rows users should take care to assign values to these rows. However, even if not
107    /// assigend it is always guaranteed that every cell is valid for insertion and will not cause
108    /// out of bounds access down in the ODBC driver. Therefore this method is safe. You can set
109    /// the number of valid rows before or after filling values into the buffer, but you must do so
110    /// before executing the query.
111    pub fn set_num_rows(&mut self, num_rows: usize) {
112        if num_rows > self.capacity {
113            panic!(
114                "Columnar buffer may not be resized to a value higher than the maximum number of \
115                rows initially specified in the constructor."
116            );
117        }
118        self.parameter_set_size = num_rows;
119    }
120
121    /// Use this method to gain write access to the actual column data.
122    ///
123    /// # Parameters
124    ///
125    /// * `buffer_index`: Please note that the buffer index is not identical to the ODBC column
126    ///   index. For one it is zero based. It also indexes the buffer bound, and not the columns of
127    ///   the output result set. This is important, because not every column needs to be bound. Some
128    ///   columns may simply be ignored. That being said, if every column of the output is bound in
129    ///   the buffer, in the same order in which they are enumerated in the result set, the
130    ///   relationship between column index and buffer index is `buffer_index = column_index - 1`.
131    ///
132    /// # Example
133    ///
134    /// This method is intended to be called if using [`ColumnarBulkInserter`] for column wise bulk
135    /// inserts.
136    ///
137    /// ```no_run
138    /// use odbc_api::{Connection, Error, buffers::BufferDesc};
139    ///
140    /// fn insert_birth_years(conn: &Connection, names: &[&str], years: &[i16])
141    ///     -> Result<(), Error>
142    /// {
143    ///
144    ///     // All columns must have equal length.
145    ///     assert_eq!(names.len(), years.len());
146    ///     // Prepare the insert statement
147    ///     let prepared = conn.prepare("INSERT INTO Birthdays (name, year) VALUES (?, ?)")?;
148    ///     // Create a columnar buffer which fits the input parameters.
149    ///     let buffer_description = [
150    ///         BufferDesc::Text { max_str_len: 255 },
151    ///         BufferDesc::I16 { nullable: false },
152    ///     ];
153    ///     // Here we do everything in one batch. So the capacity is the number of input
154    ///     // parameters.
155    ///     let capacity = names.len();
156    ///     let mut prebound = prepared.into_column_inserter(capacity, buffer_description)?;
157    ///     // Set number of input rows in the current batch.
158    ///     prebound.set_num_rows(names.len());
159    ///     // Fill the buffer with values column by column
160    ///
161    ///     // Fill names
162    ///     let mut col = prebound
163    ///         .column_mut(0)
164    ///         .as_text_view()
165    ///         .expect("We know the name column to hold text.");
166    ///     for (index, name) in names.iter().map(|s| Some(s.as_bytes())).enumerate() {
167    ///         col.set_cell(index, name);
168    ///     }
169    ///
170    ///     // Fill birth years
171    ///     let mut col = prebound
172    ///         .column_mut(1)
173    ///         .as_slice::<i16>()
174    ///         .expect("We know the year column to hold i16.");
175    ///     col.copy_from_slice(years);
176    ///
177    ///     // Execute the prepared statment with the bound array parameters. Sending the values to
178    ///     // the database.
179    ///     prebound.execute()?;
180    ///     Ok(())
181    /// }
182    /// ```
183    pub fn column_mut<'a>(&'a mut self, buffer_index: usize) -> C::SliceMut
184    where
185        C: BoundInputSlice<'a>,
186    {
187        unsafe {
188            self.parameters[buffer_index]
189                .as_view_mut((buffer_index + 1) as u16, self.statement.as_stmt_ref())
190        }
191    }
192
193    /// Maximum number of rows the buffer can hold at once.
194    pub fn capacity(&self) -> usize {
195        self.capacity
196    }
197}
198
199/// You can obtain a mutable slice of a column buffer which allows you to change its contents.
200///
201/// # Safety
202///
203/// * If any operations have been performed which would invalidate the pointers bound to the
204///   statement, the slice must use the statement handle to rebind the column, at the end of its
205///   lifetime (at the latest).
206/// * All values must be complete. I.e. none of the values must be truncated.
207pub unsafe trait BoundInputSlice<'a> {
208    /// Intended to allow for modifying buffer contents, while leaving the bound parameter buffers
209    /// valid.
210    type SliceMut;
211
212    /// Obtain a mutable view on a parameter buffer in order to change the parameter value(s)
213    /// submitted when executing the statement.
214    ///
215    /// # Safety
216    ///
217    /// * The statement must be the statment the column buffer is bound to. The index must be the
218    ///   parameter index it is bound at.
219    /// * All values must be complete. I.e. none of the values must be truncated.
220    unsafe fn as_view_mut(
221        &'a mut self,
222        parameter_index: u16,
223        stmt: StatementRef<'a>,
224    ) -> Self::SliceMut;
225}
226
227impl<S> ColumnarBulkInserter<S, TextColumn<u8>> {
228    /// Takes one element from the iterator for each internal column buffer and appends it to the
229    /// end of the buffer. Should a cell of the row be too large for the associated column buffer,
230    /// the column buffer will be reallocated with `1.2` times its size, and rebound to the
231    /// statement.
232    ///
233    /// This method panics if it is tried to insert elements beyond batch size. It will also panic
234    /// if row does not contain at least one item for each internal column buffer.
235    pub fn append<'b>(
236        &mut self,
237        mut row: impl Iterator<Item = Option<&'b [u8]>>,
238    ) -> Result<(), Error>
239    where
240        S: AsStatementRef,
241    {
242        if self.capacity == self.parameter_set_size {
243            panic!("Trying to insert elements into TextRowSet beyond batch size.")
244        }
245
246        let mut col_index = 1;
247        for column in &mut self.parameters {
248            let text = row.next().expect(
249                "Row passed to TextRowSet::append must contain one element for each column.",
250            );
251            if let Some(text) = text {
252                unsafe {
253                    column
254                        .as_view_mut(col_index, self.statement.as_stmt_ref())
255                        .ensure_max_element_length(text.len(), self.parameter_set_size)?;
256                }
257                column.set_value(self.parameter_set_size, Some(text));
258            } else {
259                column.set_value(self.parameter_set_size, None);
260            }
261            col_index += 1;
262        }
263
264        self.parameter_set_size += 1;
265
266        Ok(())
267    }
268}