// odbc_api/columnar_bulk_inserter.rs
use crate::{
buffers::{ColumnBuffer, TextColumn},
execute::execute,
handles::{AsStatementRef, HasDataType, Statement, StatementRef},
CursorImpl, Error,
};
/// Executes a prepared statement with bulk array parameters. Despite its name, any statement
/// taking parameters can be executed this way, not only `INSERT`. Inserting large amounts of data
/// in batches is merely the primary intended use case.
///
/// Binding new buffers is quite expensive in ODBC. Therefore the parameter buffers are bound once
/// and reused for every batch, so the pointers bound to the statement stay valid. Each batch of
/// data is first copied into the already bound buffers, rather than binding user defined buffers.
/// Often the data needs to be transformed anyway, in which case the copy is no actual overhead.
/// Once the buffers are filled with a batch, the data is sent.
pub struct ColumnarBulkInserter<S, C> {
    // Invariant: The parameters are bound to the statement, and the parameter set size reflects
    // the number of valid rows in the batch.
    //
    // NOTE(review): Fields are dropped in declaration order, so `statement` is dropped before
    // `parameters`. Presumably this order is intentional; confirm before reordering fields.
    statement: S,
    // Number of valid rows in the current batch.
    parameter_set_size: usize,
    // Maximum number of rows the buffers can hold at once.
    capacity: usize,
    /// Invariant: None of these buffers is truncated.
    parameters: Vec<C>,
}
impl<S, C> ColumnarBulkInserter<S, C>
where
    S: AsStatementRef,
{
    /// Binds `parameters` as input parameters to `statement` and wraps both in a bulk inserter.
    ///
    /// Users are not encouraged to call this directly.
    ///
    /// Any parameters previously bound to the statement are reset first. The buffers are then
    /// bound in order, the first one as parameter number `1`. The capacity of the inserter is the
    /// smallest capacity of all buffers, or `0` if `parameters` is empty. The new inserter starts
    /// out with zero valid rows.
    ///
    /// # Errors
    ///
    /// Returns the error of the first buffer which fails to bind. In that case all parameters of
    /// the statement are reset before returning.
    ///
    /// # Safety
    ///
    /// * Statement is expected to be a prepared statement.
    /// * Parameters must all be valid for insertion. An example for an invalid parameter would be
    ///   a text buffer with a cell whose indicator value exceeds the maximum element length. This
    ///   can happen if truncation occurs when writing into a buffer.
    pub unsafe fn new(mut statement: S, parameters: Vec<C>) -> Result<Self, Error>
    where
        C: ColumnBuffer + HasDataType,
    {
        let mut stmt = statement.as_stmt_ref();
        stmt.reset_parameters();
        let mut parameter_number = 1;
        // Bind buffers to statement.
        for column in &parameters {
            if let Err(error) = stmt
                .bind_input_parameter(parameter_number, column)
                .into_result(&stmt)
            {
                // Simply bailing out here would be risky. We actually did bind some parameters
                // already. We cannot guarantee that the bound pointers stay valid in case of an
                // error, since `Self` is never constructed. We would get away with this, if we
                // took ownership of the statement and it were destroyed should the constructor
                // not succeed. However columnar bulk inserter can also be instantiated with
                // borrowed statements. This is why we reset the parameters on error.
                stmt.reset_parameters();
                return Err(error);
            }
            parameter_number += 1;
        }
        // A batch can never hold more rows than the smallest buffer has room for.
        let capacity = parameters
            .iter()
            .map(|col| col.capacity())
            .min()
            .unwrap_or(0);
        Ok(Self {
            statement,
            parameter_set_size: 0,
            capacity,
            parameters,
        })
    }

    /// Execute the prepared statement, with the parameters bound.
    ///
    /// If the current batch holds zero rows nothing is executed and `Ok(None)` is returned.
    /// Otherwise the parameter set size of the statement is set to the number of valid rows and
    /// the statement is executed.
    ///
    /// # Errors
    ///
    /// Forwards any error reported by executing the statement.
    pub fn execute(&mut self) -> Result<Option<CursorImpl<StatementRef<'_>>>, Error> {
        let mut stmt = self.statement.as_stmt_ref();
        if self.parameter_set_size == 0 {
            // A batch size of 0 will not execute anything, same as for execute on connection or
            // prepared.
            return Ok(None);
        }
        // SAFETY: We rely on the invariants maintained by this type: The parameter buffers are
        // bound to this very statement, none of them is truncated, and `parameter_set_size` never
        // exceeds `capacity` (see `set_num_rows` and `append`).
        unsafe {
            // We reset the parameter set size, in order to adequately handle batches of
            // different size when inserting into the database.
            stmt.set_paramset_size(self.parameter_set_size);
            execute(stmt, None)
        }
    }

    /// Sets the number of rows in the buffer to zero.
    pub fn clear(&mut self) {
        self.parameter_set_size = 0;
    }

    /// Number of valid rows in the buffer
    pub fn num_rows(&self) -> usize {
        self.parameter_set_size
    }

    /// Set number of valid rows in the buffer. Must not be larger than the batch size. If the
    /// specified number is larger than the number of valid rows currently held by the buffer, the
    /// additional rows will just hold the values previously assigned to them. Therefore if
    /// extending the number of valid rows users should take care to assign values to these rows.
    /// However, even if not assigned it is always guaranteed that every cell is valid for
    /// insertion and will not cause out of bounds access down in the ODBC driver. Therefore this
    /// method is safe. You can set the number of valid rows before or after filling values into
    /// the buffer, but you must do so before executing the query.
    ///
    /// # Panics
    ///
    /// Panics if `num_rows` is larger than [`Self::capacity`].
    pub fn set_num_rows(&mut self, num_rows: usize) {
        assert!(
            num_rows <= self.capacity,
            "Columnar buffer may not be resized to a value higher than the maximum number of \
             rows initially specified in the constructor."
        );
        self.parameter_set_size = num_rows;
    }

    /// Use this method to gain write access to the actual column data.
    ///
    /// # Parameters
    ///
    /// * `buffer_index`: Zero based index of the parameter buffer, in the order in which the
    ///   buffers have been passed to the constructor. Please note that the buffer index is not
    ///   identical to the ODBC parameter number, which is one based. The relationship between
    ///   the two is `buffer_index = parameter_number - 1`.
    ///
    /// # Panics
    ///
    /// Panics if `buffer_index` is out of bounds.
    ///
    /// # Example
    ///
    /// This method is intended to be called if using [`ColumnarBulkInserter`] for column wise bulk
    /// inserts.
    ///
    /// ```no_run
    /// use odbc_api::{Connection, Error, buffers::BufferDesc};
    ///
    /// fn insert_birth_years(conn: &Connection, names: &[&str], years: &[i16])
    ///     -> Result<(), Error>
    /// {
    ///
    ///     // All columns must have equal length.
    ///     assert_eq!(names.len(), years.len());
    ///     // Prepare the insert statement
    ///     let prepared = conn.prepare("INSERT INTO Birthdays (name, year) VALUES (?, ?)")?;
    ///     // Create a columnar buffer which fits the input parameters.
    ///     let buffer_description = [
    ///         BufferDesc::Text { max_str_len: 255 },
    ///         BufferDesc::I16 { nullable: false },
    ///     ];
    ///     // Here we do everything in one batch. So the capacity is the number of input
    ///     // parameters.
    ///     let capacity = names.len();
    ///     let mut prebound = prepared.into_column_inserter(capacity, buffer_description)?;
    ///     // Set number of input rows in the current batch.
    ///     prebound.set_num_rows(names.len());
    ///     // Fill the buffer with values column by column
    ///
    ///     // Fill names
    ///     let mut col = prebound
    ///         .column_mut(0)
    ///         .as_text_view()
    ///         .expect("We know the name column to hold text.");
    ///     for (index, name) in names.iter().map(|s| Some(s.as_bytes())).enumerate() {
    ///         col.set_cell(index, name);
    ///     }
    ///
    ///     // Fill birth years
    ///     let mut col = prebound
    ///         .column_mut(1)
    ///         .as_slice::<i16>()
    ///         .expect("We know the year column to hold i16.");
    ///     col.copy_from_slice(years);
    ///
    ///     // Execute the prepared statement with the bound array parameters. Sending the values
    ///     // to the database.
    ///     prebound.execute()?;
    ///     Ok(())
    /// }
    /// ```
    pub fn column_mut<'a>(&'a mut self, buffer_index: usize) -> C::SliceMut
    where
        C: BoundInputSlice<'a>,
    {
        // SAFETY: `self.statement` is the statement the buffers have been bound to in `new`, and
        // the buffer at `buffer_index` has been bound as parameter number `buffer_index + 1`.
        // We rely on the invariant of this type that none of the buffers is truncated.
        unsafe {
            // The index expression is evaluated first and panics for an out of bounds
            // `buffer_index`, so the cast only sees indices of existing buffers.
            self.parameters[buffer_index]
                .as_view_mut((buffer_index + 1) as u16, self.statement.as_stmt_ref())
        }
    }

    /// Maximum number of rows the buffer can hold at once.
    pub fn capacity(&self) -> usize {
        self.capacity
    }
}
/// Implemented by column buffers which are able to hand out a mutable slice of themselves, so
/// their contents can be changed while they are bound to a statement.
///
/// # Safety
///
/// * Should any operation be performed which invalidates the pointers bound to the statement,
///   the slice must use the statement handle to rebind the column. This has to happen at the end
///   of the lifetime of the slice at the latest.
/// * All values must be complete. I.e. none of the values must be truncated.
pub unsafe trait BoundInputSlice<'a> {
    /// View type which allows for modifying the contents of the buffer, while leaving the bound
    /// parameter buffers valid.
    type SliceMut;

    /// Hands out a mutable view on a parameter buffer. Use it to change the parameter value(s)
    /// submitted when executing the statement.
    ///
    /// # Safety
    ///
    /// * `stmt` must be the statement the column buffer is bound to, and `parameter_index` must
    ///   be the parameter index it is bound at.
    /// * All values must be complete. I.e. none of the values must be truncated.
    unsafe fn as_view_mut(
        &'a mut self,
        parameter_index: u16,
        stmt: StatementRef<'a>,
    ) -> Self::SliceMut;
}
impl<S> ColumnarBulkInserter<S, TextColumn<u8>> {
    /// Appends one row to the batch. For each internal column buffer one element is taken from
    /// `row` and written behind the last valid row of that buffer. Should a cell be too large for
    /// its column buffer, the column buffer will be reallocated with `1.2` times its size, and
    /// rebound to the statement.
    ///
    /// # Errors
    ///
    /// Forwards the error, should making room for a large cell fail. The number of valid rows is
    /// not increased in that case.
    ///
    /// # Panics
    ///
    /// Panics on an attempt to insert elements beyond batch size. Also panics if `row` yields
    /// fewer items than there are internal column buffers.
    pub fn append<'b>(
        &mut self,
        mut row: impl Iterator<Item = Option<&'b [u8]>>,
    ) -> Result<(), Error>
    where
        S: AsStatementRef,
    {
        assert!(
            self.capacity != self.parameter_set_size,
            "Trying to insert elements into TextRowSet beyond batch size."
        );
        // The new row goes directly behind the rows which are already valid.
        let row_index = self.parameter_set_size;
        // Parameter numbers are one based, buffers have been bound in order.
        let mut parameter_number = 1;
        for buffer in self.parameters.iter_mut() {
            let cell = row.next().expect(
                "Row passed to TextRowSet::append must contain one element for each column.",
            );
            if let Some(bytes) = cell {
                // Make sure the buffer has room for the value, before writing it.
                //
                // SAFETY: The statement is the one the buffers of this inserter are bound to, and
                // `parameter_number` is the position of `buffer` within them, counted from one.
                unsafe {
                    buffer
                        .as_view_mut(parameter_number, self.statement.as_stmt_ref())
                        .ensure_max_element_length(bytes.len(), row_index)?;
                }
            }
            buffer.set_value(row_index, cell);
            parameter_number += 1;
        }
        self.parameter_set_size = row_index + 1;
        Ok(())
    }
}