odbc_api/cursor/concurrent_block_cursor.rs
1use std::{
2 mem::swap,
3 sync::mpsc::{sync_channel, Receiver, SyncSender},
4 thread::{self, JoinHandle},
5};
6
7use crate::{BlockCursor, Cursor, Error};
8
9use super::RowSetBuffer;
10
11/// A wrapper around block cursors which fetches data in a dedicated system thread. Intended to
12/// fetch data batch by batch while the application processes the batch last fetched. Works best
13/// with a double buffer strategy using two fetch buffers.
14///
15/// # Example
16///
17/// ```no_run
18/// use odbc_api::{
19/// Environment, buffers::{ColumnarAnyBuffer, BufferDesc}, Cursor, ConcurrentBlockCursor
20/// };
21/// use std::sync::OnceLock;
22///
23/// // We want to use the ODBC environment from another system thread without scope => Therefore it
24/// // needs to be static.
25/// static ENV: OnceLock<Environment> = OnceLock::new();
26/// let env = Environment::new()?;
27///
28/// let conn = ENV.get_or_init(|| env).connect_with_connection_string(
29/// "Driver={ODBC Driver 18 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;",
30/// Default::default())?;
31///
32/// let query = "SELECT * FROM very_big_table";
33/// let params = ();
34/// let timeout_sec = None;
35/// // We must use into_cursor to create a statement handle with static lifetime, which also owns
36/// // the connection. This way we can send it to another thread safely.
37/// let cursor = conn.into_cursor(query, params, timeout_sec)?.unwrap();
38///
39/// // Batch size and buffer description. Here we assume there is only one integer column
40/// let buffer_a = ColumnarAnyBuffer::from_descs(1000, [BufferDesc::I32 { nullable: false }]);
41/// let mut buffer_b = ColumnarAnyBuffer::from_descs(1000, [BufferDesc::I32 { nullable: false }]);
42/// // And now we have a sendable block cursor with static lifetime
43/// let block_cursor = cursor.bind_buffer(buffer_a)?;
44///
45/// let mut cbc = ConcurrentBlockCursor::from_block_cursor(block_cursor);
46/// while cbc.fetch_into(&mut buffer_b)? {
47/// // Proccess batch in buffer b asynchronously to fetching it
48/// }
49///
50/// # Ok::<_, odbc_api::Error>(())
51/// ```
52pub struct ConcurrentBlockCursor<C, B> {
53 /// In order to avoid reallocating buffers over and over again, we use this channel to send the
54 /// buffers back to the fetch thread after we copied their contents into arrow arrays.
55 send_buffer: SyncSender<B>,
56 /// Receives filled batches from the fetch thread. Once the source is empty or if an error
57 /// occurs its associated sender is dropped, and receiving batches will return an error (which
58 /// we expect during normal operation and cleanup, and is not forwarded to the user).
59 receive_batch: Receiver<B>,
60 /// We join with the fetch thread if we stop receiving batches (i.e. receive_batch.recv()
61 /// returns an error) or `into_cursor` is called. `None` if the thread has already been joined.
62 /// In this case either an error has been reported to the user, or the cursor is stored in
63 /// `cursor`.
64 fetch_thread: Option<JoinHandle<Result<C, Error>>>,
65 /// Only `Some`, if the cursor has been consumed succesfully and `fetch_thread` has been joined.
66 /// Can only be `Some` if `fetch_thread` is `None`. If both `fetch_thread` and `cursor` are
67 /// `None`, it is implied that `fetch_thread` returned an error joining.
68 cursor: Option<C>,
69}
70
71impl<C, B> ConcurrentBlockCursor<C, B>
72where
73 C: Cursor + Send + 'static,
74 B: RowSetBuffer + Send + 'static,
75{
76 /// Construct a new concurrent block cursor.
77 ///
78 /// # Parameters
79 ///
80 /// * `block_cursor`: Taking a BlockCursor instead of a Cursor allows for better resource
81 /// stealing if constructing starting from a sequential Cursor, as we do not need to undbind
82 /// and bind the cursor.
83 pub fn from_block_cursor(block_cursor: BlockCursor<C, B>) -> Self {
84 let (send_buffer, receive_buffer) = sync_channel(1);
85 let (send_batch, receive_batch) = sync_channel(1);
86
87 let fetch_thread = thread::spawn(move || {
88 let mut block_cursor = block_cursor;
89 loop {
90 match block_cursor.fetch_with_truncation_check(true) {
91 Ok(Some(_batch)) => (),
92 Ok(None) => {
93 break block_cursor
94 .unbind()
95 .map(|(undbound_cursor, _buffer)| undbound_cursor);
96 }
97 Err(odbc_error) => {
98 drop(send_batch);
99 break Err(odbc_error);
100 }
101 }
102 // There has been another row group fetched by the cursor. We unbind the buffers so
103 // we can pass ownership of it to the application and bind a new buffer to the
104 // cursor in order to start fetching the next batch.
105 let (cursor, buffer) = block_cursor.unbind()?;
106 if send_batch.send(buffer).is_err() {
107 // Should the main thread stop receiving buffers, this thread should
108 // also stop fetching batches.
109 break Ok(cursor);
110 }
111 // Wait for the application thread to give us a buffer to fill.
112 match receive_buffer.recv() {
113 Err(_) => {
114 // Application thread dropped sender and does not want more buffers to be
115 // filled. Let's stop this thread and return the cursor
116 break Ok(cursor);
117 }
118 Ok(next_buffer) => {
119 block_cursor = cursor.bind_buffer(next_buffer).unwrap();
120 }
121 }
122 }
123 });
124
125 Self {
126 send_buffer,
127 receive_batch,
128 fetch_thread: Some(fetch_thread),
129 cursor: None,
130 }
131 }
132
133 /// Join fetch thread and yield the cursor back.
134 pub fn into_cursor(self) -> Result<C, Error> {
135 drop(self.receive_batch);
136 // Dropping the send buffer is necessary to avoid deadlocks, in case there would not be any
137 // buffer in the channel waiting for the fetch thread. Since we consume the cursor here, it
138 // is also impossible for the application to send another buffer.
139 drop(self.send_buffer);
140 if let Some(cursor) = self.cursor {
141 Ok(cursor)
142 } else {
143 self.fetch_thread.unwrap().join().unwrap()
144 }
145 }
146}
147
148impl<C, B> ConcurrentBlockCursor<C, B> {
149 /// Receive the current batch and take ownership of its buffer. `None` if the cursor is already
150 /// consumed, or had an error previously. This method blocks until a new batch available. In
151 /// order for new batches available new buffers must be send to the thread in order for it to
152 /// fill them. So calling fetch repeatedly without calling [`Self::fill`] in between may
153 /// deadlock.
154 pub fn fetch(&mut self) -> Result<Option<B>, Error> {
155 match self.receive_batch.recv() {
156 // We successfully fetched a batch from the database.
157 Ok(batch) => Ok(Some(batch)),
158 // Fetch thread stopped sending batches. Either because we consumed the result set
159 // completly or we hit an error.
160 Err(_receive_error) => {
161 if let Some(join_handle) = self.fetch_thread.take() {
162 // If there has been an error returning the batch, or unbinding the buffer `?`
163 // will raise it.
164 self.cursor = Some(join_handle.join().unwrap()?);
165 // We ran out of batches in the result set. End the stream.
166 Ok(None)
167 } else {
168 // This only happen if this method is called after it returned either `false` or
169 // `Err` once. Let us treat this scenario like a result set which is consumed
170 // completly.
171 Ok(None)
172 }
173 }
174 }
175 }
176
177 /// Send a buffer to the thread fetching in order for it to be filled and to be retrieved later
178 /// using either `fetch`, or `fetch_into`.
179 pub fn fill(&mut self, buffer: B) {
180 let _ = self.send_buffer.send(buffer);
181 }
182
183 /// Fetches values from the ODBC datasource into buffer. Values are streamed batch by batch in
184 /// order to avoid reallocation of the buffers used for tranistion. This call blocks until a new
185 /// batch is ready. This method combines both [`Self::fetch`] and [`Self::fill`].
186 ///
187 /// # Parameters
188 ///
189 /// * `buffer`: A columnar any buffer which can bind to the cursor wrapped by this instance.
190 /// After the method call the reference will not point to the same instance which had been
191 /// passed into the function call, but to the one which was bound to the cursor in order to
192 /// fetch the last batch. The buffer passed into this method, is then used to fetch the next
193 /// batch. As such this method is ideal to implement concurrent fetching using two buffers.
194 /// One which is written to, and one that is read, which flip their roles between batches.
195 /// Also called double buffering.
196 ///
197 /// # Return
198 ///
199 /// * `true`: Fetched a batch from the data source. The contents of that batch are now in
200 /// `buffer`.
201 /// * `false`: No batch could be fetched. The result set is consumed completly.
202 pub fn fetch_into(&mut self, buffer: &mut B) -> Result<bool, Error> {
203 if let Some(mut batch) = self.fetch()? {
204 swap(buffer, &mut batch);
205 self.fill(batch);
206 Ok(true)
207 } else {
208 Ok(false)
209 }
210 }
211}