odbc_api/buffers/columnar.rs
1use std::{
2 collections::HashSet,
3 num::NonZeroUsize,
4 str::{Utf8Error, from_utf8},
5};
6
7use crate::{
8 Error, ResultSetMetadata, RowSetBuffer,
9 columnar_bulk_inserter::BoundInputSlice,
10 cursor::TruncationInfo,
11 fixed_sized::Pod,
12 handles::{CDataMut, Statement, StatementRef},
13 parameter::WithDataType,
14 result_set_metadata::utf8_display_sizes,
15};
16
17use super::{Indicator, TextColumn};
18
19impl<C: ColumnBuffer> ColumnarBuffer<C> {
20 /// Create a new instance from columns with unique indicies. Capacity of the buffer will be the
21 /// minimum capacity of the columns. The constructed buffer is always empty (i.e. the number of
22 /// valid rows is considered to be zero).
23 ///
24 /// You do not want to call this constructor directly unless you want to provide your own buffer
25 /// implentation. Most users of this crate may want to use the constructors like
26 /// [`crate::buffers::ColumnarAnyBuffer::from_descs`] or
27 /// [`crate::buffers::TextRowSet::from_max_str_lens`] instead.
28 pub fn new(columns: Vec<(u16, C)>) -> Self {
29 // Assert capacity
30 let capacity = columns
31 .iter()
32 .map(|(_, col)| col.capacity())
33 .min()
34 .unwrap_or(0);
35
36 // Assert uniqueness of indices
37 let mut indices = HashSet::new();
38 if columns
39 .iter()
40 .any(move |&(col_index, _)| !indices.insert(col_index))
41 {
42 panic!("Column indices must be unique.")
43 }
44
45 unsafe { Self::new_unchecked(capacity, columns) }
46 }
47
48 /// # Safety
49 ///
50 /// * Indices must be unique
51 /// * Columns all must have enough `capacity`.
52 pub unsafe fn new_unchecked(capacity: usize, columns: Vec<(u16, C)>) -> Self {
53 ColumnarBuffer {
54 num_rows: Box::new(0),
55 row_capacity: capacity,
56 columns,
57 }
58 }
59
60 /// Number of valid rows in the buffer.
61 pub fn num_rows(&self) -> usize {
62 *self.num_rows
63 }
64
65 /// Return the number of columns in the row set.
66 pub fn num_cols(&self) -> usize {
67 self.columns.len()
68 }
69
70 /// Use this method to gain read access to the actual column data.
71 ///
72 /// # Parameters
73 ///
74 /// * `buffer_index`: Please note that the buffer index is not identical to the ODBC column
75 /// index. For one it is zero based. It also indexes the buffer bound, and not the columns of
76 /// the output result set. This is important, because not every column needs to be bound. Some
77 /// columns may simply be ignored. That being said, if every column of the output is bound in
78 /// the buffer, in the same order in which they are enumerated in the result set, the
79 /// relationship between column index and buffer index is `buffer_index = column_index - 1`.
80 pub fn column(&self, buffer_index: usize) -> C::View<'_> {
81 self.columns[buffer_index].1.view(*self.num_rows)
82 }
83}
84
85unsafe impl<C> RowSetBuffer for ColumnarBuffer<C>
86where
87 C: ColumnBuffer,
88{
89 fn bind_type(&self) -> usize {
90 0 // Specify columnar binding
91 }
92
93 fn row_array_size(&self) -> usize {
94 self.row_capacity
95 }
96
97 fn mut_num_fetch_rows(&mut self) -> &mut usize {
98 self.num_rows.as_mut()
99 }
100
101 unsafe fn bind_colmuns_to_cursor(&mut self, mut cursor: StatementRef<'_>) -> Result<(), Error> {
102 unsafe {
103 for (col_number, column) in &mut self.columns {
104 cursor.bind_col(*col_number, column).into_result(&cursor)?;
105 }
106 }
107 Ok(())
108 }
109
110 fn find_truncation(&self) -> Option<TruncationInfo> {
111 self.columns
112 .iter()
113 .enumerate()
114 .find_map(|(buffer_index, (_col_index, col_buffer))| {
115 col_buffer
116 .has_truncated_values(*self.num_rows)
117 .map(|indicator| TruncationInfo {
118 indicator: indicator.length(),
119 buffer_index,
120 })
121 })
122 }
123}
124
125/// A columnar buffer intended to be bound with [crate::Cursor::bind_buffer] in order to obtain
126/// results from a cursor.
127///
128/// Binds to the result set column wise. This is usually helpful in dataengineering or data sciense
129/// tasks. This buffer type can be used in situations there the schema of the queried data is known
130/// at compile time, as well as for generic applications which do work with wide range of different
131/// data.
132///
133/// # Example: Fetching results column wise with `ColumnarBuffer`.
134///
135/// Consider querying a table with two columns `year` and `name`.
136///
137/// ```no_run
138/// use odbc_api::{
139/// Environment, Cursor, ConnectionOptions,
140/// buffers::{AnySlice, BufferDesc, Item, ColumnarAnyBuffer},
141/// };
142///
143/// let env = Environment::new()?;
144///
145/// let batch_size = 1000; // Maximum number of rows in each row set
146/// let buffer_description = [
147/// // We know year to be a Nullable SMALLINT
148/// BufferDesc::I16 { nullable: true },
149/// // and name to be a required VARCHAR
150/// BufferDesc::Text { max_str_len: 255 },
151/// ];
152///
153/// /// Creates a columnar buffer fitting the buffer description with the capacity of `batch_size`.
154/// let mut buffer = ColumnarAnyBuffer::from_descs(batch_size, buffer_description);
155///
156/// let mut conn = env.connect(
157/// "YourDatabase", "SA", "My@Test@Password1",
158/// ConnectionOptions::default(),
159/// )?;
160/// let query = "SELECT year, name FROM Birthdays;";
161/// let params = ();
162/// let timeout_sec = None;
163/// if let Some(cursor) = conn.execute(query, params, timeout_sec)? {
164/// // Bind buffer to cursor. We bind the buffer as a mutable reference here, which makes it
165/// // easier to reuse for other queries, but we could have taken ownership.
166/// let mut row_set_cursor = cursor.bind_buffer(&mut buffer)?;
167/// // Loop over row sets
168/// while let Some(row_set) = row_set_cursor.fetch()? {
169/// // Process years in row set
170/// let year_col = row_set.column(0);
171/// for year in i16::as_nullable_slice(year_col)
172/// .expect("Year column buffer expected to be nullable Int")
173/// {
174/// // Iterate over `Option<i16>` with it ..
175/// }
176/// // Process names in row set
177/// let name_col = row_set.column(1);
178/// for name in name_col
179/// .as_text_view()
180/// .expect("Name column buffer expected to be text")
181/// .iter()
182/// {
183/// // Iterate over `Option<&CStr> ..
184/// }
185/// }
186/// }
187/// # Ok::<(), odbc_api::Error>(())
188/// ```
189///
190/// This second examples changes two things, we do not know the schema in advance and use the
191/// SQL DataType to determine the best fit for the buffers. Also we want to do everything in a
192/// function and return a `Cursor` with an already bound buffer. This approach is best if you have
193/// few and very long query, so the overhead of allocating buffers is negligible and you want to
194/// have an easier time with the borrow checker.
195///
196/// ```no_run
197/// use odbc_api::{
198/// Connection, BlockCursor, Error, Cursor, Nullability, ResultSetMetadata,
199/// buffers::{ AnyBuffer, BufferDesc, ColumnarAnyBuffer, ColumnarBuffer }
200/// };
201///
202/// fn get_birthdays<'a>(conn: &'a mut Connection)
203/// -> Result<BlockCursor<impl Cursor + 'a, ColumnarAnyBuffer>, Error>
204/// {
205/// let query = "SELECT year, name FROM Birthdays;";
206/// let params = ();
207/// let timeout_sec = None;
208/// let mut cursor = conn.execute(query, params, timeout_sec)?.unwrap();
209/// let mut column_description = Default::default();
210/// let buffer_description : Vec<_> = (0..cursor.num_result_cols()?).map(|index| {
211/// cursor.describe_col(index as u16 + 1, &mut column_description)?;
212/// let nullable = matches!(
213/// column_description.nullability,
214/// Nullability::Unknown | Nullability::Nullable
215/// );
216/// let desc = BufferDesc::from_data_type(
217/// column_description.data_type,
218/// nullable
219/// ).unwrap_or(BufferDesc::Text{ max_str_len: 255 });
220/// Ok(desc)
221/// }).collect::<Result<_, Error>>()?;
222///
223/// // Row set size of 5000 rows.
224/// let buffer = ColumnarAnyBuffer::from_descs(5000, buffer_description);
225/// // Bind buffer and take ownership over it.
226/// cursor.bind_buffer(buffer)
227/// }
228/// ```
229pub struct ColumnarBuffer<C> {
230 /// A mutable pointer to num_rows_fetched is passed to the C-API. It is used to write back the
231 /// number of fetched rows. `num_rows` is heap allocated, so the pointer is not invalidated,
232 /// even if the `ColumnarBuffer` instance is moved in memory.
233 num_rows: Box<usize>,
234 /// aka: batch size, row array size
235 row_capacity: usize,
236 /// Column index and bound buffer
237 columns: Vec<(u16, C)>,
238}
239
240/// A buffer for a single column intended to be used together with [`ColumnarBuffer`].
241///
242/// # Safety
243///
244/// Views must not allow access to unintialized / invalid rows.
245pub unsafe trait ColumnBuffer: CDataMut {
246 /// Immutable view on the column data. Used in safe abstractions. User must not be able to
247 /// access uninitialized or invalid memory of the buffer through this interface.
248 type View<'a>
249 where
250 Self: 'a;
251
252 /// Num rows may not exceed the actual amount of valid num_rows filled by the ODBC API. The
253 /// column buffer does not know how many elements were in the last row group, and therefore can
254 /// not guarantee the accessed element to be valid and in a defined state. It also can not panic
255 /// on accessing an undefined element.
256 fn view(&self, valid_rows: usize) -> Self::View<'_>;
257
258 /// Fills the column with the default representation of values, between `from` and `to` index.
259 fn fill_default(&mut self, from: usize, to: usize);
260
261 /// Current capacity of the column
262 fn capacity(&self) -> usize;
263
264 /// `Some` if any value is truncated in the range [0, num_rows).
265 ///
266 /// After fetching data we may want to know if any value has been truncated due to the buffer
267 /// not being able to hold elements of that size. This method checks the indicator buffer
268 /// element wise.
269 fn has_truncated_values(&self, num_rows: usize) -> Option<Indicator>;
270}
271
272unsafe impl<T> ColumnBuffer for WithDataType<T>
273where
274 T: ColumnBuffer,
275{
276 type View<'a>
277 = T::View<'a>
278 where
279 T: 'a;
280
281 fn view(&self, valid_rows: usize) -> T::View<'_> {
282 self.value.view(valid_rows)
283 }
284
285 fn fill_default(&mut self, from: usize, to: usize) {
286 self.value.fill_default(from, to)
287 }
288
289 fn capacity(&self) -> usize {
290 self.value.capacity()
291 }
292
293 fn has_truncated_values(&self, num_rows: usize) -> Option<Indicator> {
294 self.value.has_truncated_values(num_rows)
295 }
296}
297
298unsafe impl<'a, T> BoundInputSlice<'a> for WithDataType<T>
299where
300 T: BoundInputSlice<'a>,
301{
302 type SliceMut = T::SliceMut;
303
304 unsafe fn as_view_mut(
305 &'a mut self,
306 parameter_index: u16,
307 stmt: StatementRef<'a>,
308 ) -> Self::SliceMut {
309 unsafe { self.value.as_view_mut(parameter_index, stmt) }
310 }
311}
312
313/// This row set binds a string buffer to each column, which is large enough to hold the maximum
314/// length string representation for each element in the row set at once.
315///
316/// # Example
317///
318/// ```no_run
319/// //! A program executing a query and printing the result as csv to standard out. Requires
320/// //! `anyhow` and `csv` crate.
321///
322/// use anyhow::Error;
323/// use odbc_api::{buffers::TextRowSet, Cursor, Environment, ConnectionOptions, ResultSetMetadata};
324/// use std::{
325/// ffi::CStr,
326/// io::{stdout, Write},
327/// path::PathBuf,
328/// };
329///
330/// /// Maximum number of rows fetched with one row set. Fetching batches of rows is usually much
331/// /// faster than fetching individual rows.
332/// const BATCH_SIZE: usize = 5000;
333///
334/// fn main() -> Result<(), Error> {
335/// // Write csv to standard out
336/// let out = stdout();
337/// let mut writer = csv::Writer::from_writer(out);
338///
339/// // We know this is going to be the only ODBC environment in the entire process, so this is
340/// // safe.
341/// let environment = unsafe { Environment::new() }?;
342///
343/// // Connect using a DSN. Alternatively we could have used a connection string
344/// let mut connection = environment.connect(
345/// "DataSourceName",
346/// "Username",
347/// "Password",
348/// ConnectionOptions::default(),
349/// )?;
350///
351/// // Execute a one-off query without any parameters.
352/// let query = "SELECT * FROM TableName";
353/// let params = ();
354/// let timeout_sec = None;
355/// match connection.execute(query, params, timeout_sec)? {
356/// Some(mut cursor) => {
357/// // Write the column names to stdout
358/// let mut headline : Vec<String> = cursor.column_names()?.collect::<Result<_,_>>()?;
359/// writer.write_record(headline)?;
360///
361/// // Use schema in cursor to initialize a text buffer large enough to hold the largest
362/// // possible strings for each column up to an upper limit of 4KiB
363/// let mut buffers = TextRowSet::for_cursor(BATCH_SIZE, &mut cursor, Some(4096))?;
364/// // Bind the buffer to the cursor. It is now being filled with every call to fetch.
365/// let mut row_set_cursor = cursor.bind_buffer(&mut buffers)?;
366///
367/// // Iterate over batches
368/// while let Some(batch) = row_set_cursor.fetch()? {
369/// // Within a batch, iterate over every row
370/// for row_index in 0..batch.num_rows() {
371/// // Within a row iterate over every column
372/// let record = (0..batch.num_cols()).map(|col_index| {
373/// batch
374/// .at(col_index, row_index)
375/// .unwrap_or(&[])
376/// });
377/// // Writes row as csv
378/// writer.write_record(record)?;
379/// }
380/// }
381/// }
382/// None => {
383/// eprintln!(
384/// "Query came back empty. No output has been created."
385/// );
386/// }
387/// }
388///
389/// Ok(())
390/// }
391/// ```
392pub type TextRowSet = ColumnarBuffer<TextColumn<u8>>;
393
394impl TextRowSet {
395 /// The resulting text buffer is not in any way tied to the cursor, other than that its buffer
396 /// sizes a tailor fitted to result set the cursor is iterating over.
397 ///
398 /// This method performs fallible buffer allocations, if no upper bound is set, so you may see
399 /// a speedup, by setting an upper bound using `max_str_limit`.
400 ///
401 ///
402 /// # Parameters
403 ///
404 /// * `batch_size`: The maximum number of rows the buffer is able to hold.
405 /// * `cursor`: Used to query the display size for each column of the row set. For character
406 /// data the length in characters is multiplied by 4 in order to have enough space for 4 byte
407 /// utf-8 characters. This is a pessimization for some data sources (e.g. SQLite 3) which do
408 /// interpret the size of a `VARCHAR(5)` column as 5 bytes rather than 5 characters.
409 /// * `max_str_limit`: Some queries make it hard to estimate a sensible upper bound and
410 /// sometimes drivers are just not that good at it. This argument allows you to specify an
411 /// upper bound for the length of character data. Any size reported by the driver is capped to
412 /// this value. In case the upper bound can not inferred by the metadata reported by the
413 /// driver the element size is set to this upper bound, too.
414 pub fn for_cursor(
415 batch_size: usize,
416 cursor: &mut impl ResultSetMetadata,
417 max_str_limit: Option<usize>,
418 ) -> Result<TextRowSet, Error> {
419 let buffers = utf8_display_sizes(cursor)?
420 .enumerate()
421 .map(|(buffer_index, reported_len)| {
422 let buffer_index = buffer_index as u16;
423 let col_index = buffer_index + 1;
424 let max_str_len = reported_len?;
425 let buffer = if let Some(upper_bound) = max_str_limit {
426 let max_str_len = max_str_len
427 .map(NonZeroUsize::get)
428 .unwrap_or(upper_bound)
429 .min(upper_bound);
430 TextColumn::new(batch_size, max_str_len)
431 } else {
432 let max_str_len = max_str_len.map(NonZeroUsize::get).ok_or(
433 Error::TooLargeColumnBufferSize {
434 buffer_index,
435 num_elements: batch_size,
436 element_size: usize::MAX,
437 },
438 )?;
439 TextColumn::try_new(batch_size, max_str_len).map_err(|source| {
440 Error::TooLargeColumnBufferSize {
441 buffer_index,
442 num_elements: source.num_elements,
443 element_size: source.element_size,
444 }
445 })?
446 };
447
448 Ok::<_, Error>((col_index, buffer))
449 })
450 .collect::<Result<_, _>>()?;
451 Ok(TextRowSet {
452 row_capacity: batch_size,
453 num_rows: Box::new(0),
454 columns: buffers,
455 })
456 }
457
458 /// Creates a text buffer large enough to hold `batch_size` rows with one column for each item
459 /// `max_str_lengths` of respective size.
460 pub fn from_max_str_lens(
461 row_capacity: usize,
462 max_str_lengths: impl IntoIterator<Item = usize>,
463 ) -> Result<Self, Error> {
464 let buffers = max_str_lengths
465 .into_iter()
466 .enumerate()
467 .map(|(index, max_str_len)| {
468 Ok::<_, Error>((
469 (index + 1).try_into().unwrap(),
470 TextColumn::try_new(row_capacity, max_str_len)
471 .map_err(|source| source.add_context(index.try_into().unwrap()))?,
472 ))
473 })
474 .collect::<Result<_, _>>()?;
475 Ok(TextRowSet {
476 row_capacity,
477 num_rows: Box::new(0),
478 columns: buffers,
479 })
480 }
481
482 /// Access the element at the specified position in the row set.
483 pub fn at(&self, buffer_index: usize, row_index: usize) -> Option<&[u8]> {
484 assert!(row_index < *self.num_rows);
485 self.columns[buffer_index].1.value_at(row_index)
486 }
487
488 /// Access the element at the specified position in the row set.
489 pub fn at_as_str(&self, col_index: usize, row_index: usize) -> Result<Option<&str>, Utf8Error> {
490 self.at(col_index, row_index).map(from_utf8).transpose()
491 }
492
493 /// Indicator value at the specified position. Useful to detect truncation of data.
494 ///
495 /// # Example
496 ///
497 /// ```
498 /// use odbc_api::buffers::{Indicator, TextRowSet};
499 ///
500 /// fn is_truncated(buffer: &TextRowSet, col_index: usize, row_index: usize) -> bool {
501 /// match buffer.indicator_at(col_index, row_index) {
502 /// // There is no value, therefore there is no value not fitting in the column buffer.
503 /// Indicator::Null => false,
504 /// // The value did not fit into the column buffer, we do not even know, by how much.
505 /// Indicator::NoTotal => true,
506 /// Indicator::Length(total_length) => {
507 /// // If the maximum string length is shorter than the values total length, the
508 /// // has been truncated to fit into the buffer.
509 /// buffer.max_len(col_index) < total_length
510 /// }
511 /// }
512 /// }
513 /// ```
514 pub fn indicator_at(&self, buf_index: usize, row_index: usize) -> Indicator {
515 assert!(row_index < *self.num_rows);
516 self.columns[buf_index].1.indicator_at(row_index)
517 }
518
519 /// Maximum length in bytes of elements in a column.
520 pub fn max_len(&self, buf_index: usize) -> usize {
521 self.columns[buf_index].1.max_len()
522 }
523}
524
525unsafe impl<T> ColumnBuffer for Vec<T>
526where
527 T: Pod,
528{
529 type View<'a> = &'a [T];
530
531 fn view(&self, valid_rows: usize) -> &[T] {
532 &self[..valid_rows]
533 }
534
535 fn fill_default(&mut self, from: usize, to: usize) {
536 for item in &mut self[from..to] {
537 *item = Default::default();
538 }
539 }
540
541 fn capacity(&self) -> usize {
542 self.len()
543 }
544
545 fn has_truncated_values(&self, _num_rows: usize) -> Option<Indicator> {
546 None
547 }
548}
549
550#[cfg(test)]
551mod tests {
552
553 use crate::buffers::{BufferDesc, ColumnarAnyBuffer};
554
555 #[test]
556 #[should_panic(expected = "Column indices must be unique.")]
557 fn assert_unique_column_indices() {
558 let bd = BufferDesc::I32 { nullable: false };
559 ColumnarAnyBuffer::from_descs_and_indices(1, [(1, bd), (2, bd), (1, bd)].iter().cloned());
560 }
561}