// solana_accounts_db/shared_buffer_reader.rs

//! SharedBuffer is given a Reader and SharedBufferReader implements the Reader trait.
//! SharedBuffer reads ahead in the underlying file and saves the data.
//! SharedBufferReaders can be created for the buffer and independently keep track of each reader's read location.
//! The background reader keeps track of the progress of each client. After data has been read by all readers,
//!  the buffer is recycled and reading ahead continues.
//! A primary use case is the underlying reader decompressing a file, which can be computationally expensive.
//! The clients of SharedBufferReaders could be parallel instances which need access to the decompressed data.
8use {
9    crate::waitable_condvar::WaitableCondvar,
10    log::*,
11    solana_measure::measure::Measure,
12    std::{
13        io::*,
14        sync::{
15            atomic::{AtomicBool, Ordering},
16            Arc, Mutex, RwLock,
17        },
18        thread::{Builder, JoinHandle},
19        time::Duration,
20    },
21};
22
// tunable parameters:
// total # bytes allocated and populated by reading ahead
const TOTAL_BUFFER_BUDGET_DEFAULT: usize = 2_000_000_000;
// data is read-ahead and saved in chunks of this many bytes
const CHUNK_SIZE_DEFAULT: usize = 100_000_000;

// a single read-ahead chunk, shared (refcounted) between the bg reader and all clients
type OneSharedBuffer = Arc<Vec<u8>>;
30
/// State shared between the background read-ahead thread and all `SharedBufferReader` clients.
struct SharedBufferInternal {
    bg_reader_data: Arc<SharedBufferBgReader>,

    // join handle of the bg read-ahead thread; taken and joined when this struct drops
    bg_reader_join_handle: Mutex<Option<JoinHandle<()>>>,

    // Keep track of the next read location per outstanding client.
    // index is client's my_client_index.
    // Value at index is index into buffers where that client is currently reading.
    // Any buffer at index < min(clients) can be recycled or destroyed.
    clients: RwLock<Vec<usize>>,

    // unpacking callers read from 'data'. newly_read_data is transferred to 'data' when 'data' is exhausted.
    // This minimizes lock contention since bg file reader has to have almost constant write access.
    data: RwLock<Vec<OneSharedBuffer>>,

    // it is convenient to have one of these around
    empty_buffer: OneSharedBuffer,
}
49
/// Owns the shared read-ahead state for one underlying `Read` source.
/// Create one `SharedBufferReader` per client via `SharedBufferReader::new`.
pub struct SharedBuffer {
    instance: Arc<SharedBufferInternal>,
}
53
54impl SharedBuffer {
55    pub fn new<T: 'static + Read + std::marker::Send>(reader: T) -> Self {
56        Self::new_with_sizes(TOTAL_BUFFER_BUDGET_DEFAULT, CHUNK_SIZE_DEFAULT, reader)
57    }
58    fn new_with_sizes<T: 'static + Read + std::marker::Send>(
59        total_buffer_budget: usize,
60        chunk_size: usize,
61        reader: T,
62    ) -> Self {
63        assert!(total_buffer_budget > 0);
64        assert!(chunk_size > 0);
65        let instance = SharedBufferInternal {
66            bg_reader_data: Arc::new(SharedBufferBgReader::new()),
67            data: RwLock::new(vec![OneSharedBuffer::default()]), // initialize with 1 vector of empty data at data[0]
68
69            // default values
70            bg_reader_join_handle: Mutex::default(),
71            clients: RwLock::default(),
72            empty_buffer: OneSharedBuffer::default(),
73        };
74        let instance = Arc::new(instance);
75        let bg_reader_data = instance.bg_reader_data.clone();
76
77        let handle = Builder::new()
78            .name("solCompFileRead".to_string())
79            .spawn(move || {
80                // importantly, this thread does NOT hold a refcount on the arc of 'instance'
81                bg_reader_data.read_entire_file_in_bg(reader, total_buffer_budget, chunk_size);
82            });
83        *instance.bg_reader_join_handle.lock().unwrap() = Some(handle.unwrap());
84        Self { instance }
85    }
86}
87
/// One client's view into a `SharedBuffer`; implements `Read`.
pub struct SharedBufferReader {
    instance: Arc<SharedBufferInternal>,
    // this reader's slot in instance.clients
    my_client_index: usize,
    // index in 'instance' of the current buffer this reader is reading from.
    // The current buffer is referenced from 'current_data'.
    // Until we exhaust this buffer, we don't need to get a lock to read from this.
    current_buffer_index: usize,
    // the index within current_data where we will next read
    index_in_current_data: usize,
    current_data: OneSharedBuffer,

    // convenient to have access to
    empty_buffer: OneSharedBuffer,
}
102
impl Drop for SharedBufferInternal {
    fn drop(&mut self) {
        // last reference went away: ask the bg thread to stop, then wait for it to exit
        if let Some(handle) = self.bg_reader_join_handle.lock().unwrap().take() {
            self.bg_reader_data.stop.store(true, Ordering::Relaxed);
            handle.join().unwrap();
        }
    }
}
111
impl Drop for SharedBufferReader {
    fn drop(&mut self) {
        // mark this client finished so its buffers can be recycled and
        // nobody ever again waits on this reader's progress
        self.client_done_reading();
    }
}
117
/// State owned by the background read-ahead logic, shared via `Arc` with the foreground.
#[derive(Debug)]
struct SharedBufferBgReader {
    // set by the fg (on drop) to ask the bg thread to exit
    stop: AtomicBool,
    // error encountered during read
    error: RwLock<std::io::Result<usize>>,
    // bg thread reads to 'newly_read_data' and signals
    newly_read_data: RwLock<Vec<OneSharedBuffer>>,
    // set when newly_read_data gets new data written to it and can be transferred
    newly_read_data_signal: WaitableCondvar,

    // currently available set of buffers for bg to read into
    // during operation, this is exhausted as the bg reads ahead
    // As all clients are done with an earlier buffer, it is recycled by being put back into this vec for the bg thread to pull out.
    buffers: RwLock<Vec<OneSharedBuffer>>,
    // signaled when a new buffer is added to buffers. This throttles the bg reading.
    new_buffer_signal: WaitableCondvar,

    // set once the underlying reader returns Ok(0) (end of file)
    bg_eof_reached: AtomicBool,
}
137
impl SharedBufferBgReader {
    fn new() -> Self {
        SharedBufferBgReader {
            buffers: RwLock::new(vec![]),
            error: RwLock::new(Ok(0)),

            // easy defaults
            stop: AtomicBool::new(false),
            newly_read_data: RwLock::default(),
            newly_read_data_signal: WaitableCondvar::default(),
            new_buffer_signal: WaitableCondvar::default(),
            bg_eof_reached: AtomicBool::default(),
        }
    }

    // how long waiters block on a condvar before re-checking shared state
    fn default_wait_timeout() -> Duration {
        Duration::from_millis(100) // short enough to be unnoticable in case of trouble, long enough for efficient waiting
    }
    // block (with timeout) until a client recycles a buffer
    fn wait_for_new_buffer(&self) -> bool {
        self.new_buffer_signal
            .wait_timeout(Self::default_wait_timeout())
    }
    // number of chunk-sized buffers the byte budget allows
    fn num_buffers(total_buffer_budget: usize, chunk_size: usize) -> usize {
        std::cmp::max(1, total_buffer_budget / chunk_size) // at least 1 buffer
    }
    // record 'error' for clients to pick up, and wake any client waiting for data
    fn set_error(&self, error: std::io::Error) {
        *self.error.write().unwrap() = Err(error);
        self.newly_read_data_signal.notify_all(); // any client waiting for new data needs to wake up and check for errors
    }

    // read ahead the entire file.
    // This is governed by the supply of buffers.
    // Buffers are likely limited to cap memory usage.
    // A buffer is recycled after the last client finishes reading from it.
    // When a buffer is available (initially or recycled), this code wakes up and reads into that buffer.
    fn read_entire_file_in_bg<T: 'static + Read + std::marker::Send>(
        &self,
        mut reader: T,
        total_buffer_budget: usize,
        chunk_size: usize,
    ) {
        let now = std::time::Instant::now();
        let mut read_us = 0;

        let mut max_bytes_read = 0;
        let mut wait_us = 0;
        let mut total_bytes = 0;
        let mut error = SharedBufferReader::default_error();
        let mut remaining_buffers_to_allocate = Self::num_buffers(total_buffer_budget, chunk_size);
        loop {
            if self.stop.load(Ordering::Relaxed) {
                // unsure what error is most appropriate here.
                // bg reader was told to stop. All clients need to see that as an error if they try to read.
                self.set_error(std::io::Error::from(std::io::ErrorKind::TimedOut));
                break;
            }
            // grab a free buffer to read into; hold the lock only for the pop
            let mut buffers = self.buffers.write().unwrap();
            let buffer = buffers.pop();
            drop(buffers);
            let mut dest_data = if let Some(dest_data) = buffer {
                // assert that this should not result in a vector copy
                // These are internal buffers and should not be held by anyone else.
                assert_eq!(Arc::strong_count(&dest_data), 1);
                dest_data
            } else if remaining_buffers_to_allocate > 0 {
                // we still haven't allocated all the buffers we are allowed to allocate
                remaining_buffers_to_allocate -= 1;
                Arc::new(vec![0; chunk_size])
            } else {
                // nowhere to write, so wait for a buffer to become available
                let mut wait_for_new_buffer = Measure::start("wait_for_new_buffer");
                self.wait_for_new_buffer();
                wait_for_new_buffer.stop();
                wait_us += wait_for_new_buffer.as_us();
                continue; // check stop, try to get a buffer again
            };
            // sole owner (asserted above), so make_mut does not copy
            let target = Arc::make_mut(&mut dest_data);
            let dest_size = target.len();

            let mut bytes_read = 0;
            let mut eof = false;
            let mut error_received = false;

            // fill this buffer as full as possible before publishing it
            while bytes_read < dest_size {
                let mut time_read = Measure::start("read");
                // Read from underlying reader into the remaining range in dest_data
                // Note that this read takes less time (up to 2x) if we read into the same static buffer location each call.
                // But, we have to copy the data out later, so we choose to pay the price at read time to put the data where it is useful.
                let result = reader.read(&mut target[bytes_read..]);
                time_read.stop();
                read_us += time_read.as_us();
                match result {
                    Ok(size) => {
                        if size == 0 {
                            // Ok(0) from a Read means end of file
                            eof = true;
                            break;
                        }
                        total_bytes += size;
                        max_bytes_read = std::cmp::max(max_bytes_read, size);
                        bytes_read += size;
                        // loop to read some more. Underlying reader does not usually read all we ask for.
                    }
                    Err(err) => {
                        error_received = true;
                        error = err;
                        break;
                    }
                }
            }

            if bytes_read > 0 {
                // store this buffer in the bg data list
                target.truncate(bytes_read);
                let mut data = self.newly_read_data.write().unwrap();
                data.push(dest_data);
                drop(data);
                self.newly_read_data_signal.notify_all();
            }

            if eof {
                self.bg_eof_reached.store(true, Ordering::Relaxed);
                self.newly_read_data_signal.notify_all(); // anyone waiting for new data needs to know that we reached eof
                break;
            }

            if error_received {
                // do not ask for more data from 'reader'. We got an error and saved all the data we got before the error.
                // but, wait to set error until we have added our buffer to newly_read_data
                self.set_error(error);
                break;
            }
        }

        info!(
            "reading entire decompressed file took: {} us, bytes: {}, read_us: {}, waiting_for_buffer_us: {}, largest fetch: {}, error: {:?}",
            now.elapsed().as_micros(),
            total_bytes,
            read_us,
            wait_us,
            max_bytes_read,
            self.error.read().unwrap()
        );
    }
}
282
impl SharedBufferInternal {
    // block (with timeout) until the bg thread signals new data, eof, or an error
    fn wait_for_newly_read_data(&self) -> bool {
        self.bg_reader_data
            .newly_read_data_signal
            .wait_timeout(SharedBufferBgReader::default_wait_timeout())
    }
    // bg reader uses write lock on 'newly_read_data' each time a buffer is read or recycled
    // client readers read from 'data' using read locks
    // when all of 'data' has been exhausted by clients, 1 client needs to transfer from 'newly_read_data' to 'data' one time.
    // returns true if any data was added to 'data'
    fn transfer_data_from_bg(&self) -> bool {
        let mut from_lock = self.bg_reader_data.newly_read_data.write().unwrap();
        if from_lock.is_empty() {
            // no data available from bg
            return false;
        }
        // grab all data from bg
        let mut newly_read_data: Vec<OneSharedBuffer> = std::mem::take(&mut *from_lock);
        // append all data to fg
        let mut to_lock = self.data.write().unwrap();
        // from_lock has to be held until we have the to_lock lock. Otherwise, we can race with another reader and append to to_lock out of order.
        drop(from_lock);
        to_lock.append(&mut newly_read_data);
        true // data was transferred
    }
    // true once the bg thread has observed end of file on the underlying reader
    fn has_reached_eof(&self) -> bool {
        self.bg_reader_data.bg_eof_reached.load(Ordering::Relaxed)
    }
}
312
313// only public methods are new and from trait Read
314impl SharedBufferReader {
315    pub fn new(original_instance: &SharedBuffer) -> Self {
316        let original_instance = &original_instance.instance;
317        let current_buffer_index = 0;
318        let mut list = original_instance.clients.write().unwrap();
319        let my_client_index = list.len();
320        if my_client_index > 0 {
321            let current_min = list.iter().min().unwrap();
322            if current_min > &0 {
323                drop(list);
324                panic!("SharedBufferReaders must all be created before the first one reads");
325            }
326        }
327        list.push(current_buffer_index);
328        drop(list);
329
330        Self {
331            instance: Arc::clone(original_instance),
332            my_client_index,
333            current_buffer_index,
334            index_in_current_data: 0,
335            // startup condition for our local reference to the buffer we want to read from.
336            // data[0] will always exist. It will be empty, But that is ok. Corresponds to current_buffer_index initial value of 0.
337            current_data: original_instance.data.read().unwrap()[0].clone(),
338            empty_buffer: original_instance.empty_buffer.clone(),
339        }
340    }
341    fn default_error() -> std::io::Error {
342        // AN error
343        std::io::Error::from(std::io::ErrorKind::TimedOut)
344    }
345    fn client_done_reading(&mut self) {
346        // has the effect of causing nobody to ever again wait on this reader's progress
347        self.update_client_index(usize::MAX);
348    }
349
350    // this client will now be reading from current_buffer_index
351    // We may be able to recycle the buffer(s) this client may have been previously potentially using.
352    fn update_client_index(&mut self, new_buffer_index: usize) {
353        let previous_buffer_index = self.current_buffer_index;
354        self.current_buffer_index = new_buffer_index;
355        let client_index = self.my_client_index;
356        let mut indexes = self.instance.clients.write().unwrap();
357        indexes[client_index] = new_buffer_index;
358        drop(indexes);
359        let mut new_min = *self.instance.clients.read().unwrap().iter().min().unwrap();
360        // if new_min == usize::MAX, then every caller is done reading. We could shut down the bg reader and effectively drop everything.
361        new_min = std::cmp::min(new_min, self.instance.data.read().unwrap().len());
362
363        // if any buffer indexes are now no longer used by any readers, then this reader was the last reader holding onto some indexes.
364        if new_min > previous_buffer_index {
365            // if bg reader reached eof, there is no need to recycle any buffers and they can all be dropped
366            let eof = self.instance.has_reached_eof();
367
368            for recycle in previous_buffer_index..new_min {
369                let remove = {
370                    let mut data = self.instance.data.write().unwrap();
371                    std::mem::replace(&mut data[recycle], self.empty_buffer.clone())
372                };
373                if remove.is_empty() {
374                    continue; // another thread beat us swapping out this buffer, so nothing to recycle here
375                }
376
377                if !eof {
378                    // if !eof, recycle this buffer and notify waiting reader(s)
379                    // if eof, just drop buffer this buffer since it isn't needed for reading anymore
380                    self.instance
381                        .bg_reader_data
382                        .buffers
383                        .write()
384                        .unwrap()
385                        .push(remove);
386                    self.instance.bg_reader_data.new_buffer_signal.notify_all();
387                    // new buffer available for bg reader
388                }
389            }
390        }
391    }
392}
393
impl Read for SharedBufferReader {
    // called many times by client to read small buffer lengths
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let dest_len = buf.len();
        let mut offset_in_dest = 0;

        let mut eof_seen = false;
        'outer: while offset_in_dest < dest_len {
            // this code is optimized for the common case where we can satisfy this entire read request from current_data without locks
            let source = &*self.current_data;

            let remaining_source_len = source.len() - self.index_in_current_data;
            let bytes_to_transfer = std::cmp::min(dest_len - offset_in_dest, remaining_source_len);
            // copy what we can
            buf[offset_in_dest..(offset_in_dest + bytes_to_transfer)].copy_from_slice(
                &source
                    [self.index_in_current_data..(self.index_in_current_data + bytes_to_transfer)],
            );
            self.index_in_current_data += bytes_to_transfer;
            offset_in_dest += bytes_to_transfer;

            if offset_in_dest >= dest_len {
                // request fully satisfied from the local buffer — the fast path
                break;
            }

            // we exhausted the current buffer
            // increment current_buffer_index to get the next buffer to continue reading
            self.current_data = self.empty_buffer.clone(); // unref it so it can be recycled without copy
            self.index_in_current_data = 0;
            self.update_client_index(self.current_buffer_index + 1);

            let instance = &*self.instance;
            let mut lock;
            // hang out in this loop until the buffer we need is available
            loop {
                lock = instance.data.read().unwrap();
                if self.current_buffer_index < lock.len() {
                    break;
                }
                drop(lock);

                if self.instance.transfer_data_from_bg() {
                    continue;
                }

                // another thread may have transferred data, so check again to see if we have data now
                lock = instance.data.read().unwrap();
                if self.current_buffer_index < lock.len() {
                    break;
                }
                drop(lock);

                if eof_seen {
                    // eof detected on previous iteration, we have had a chance to read all data that was buffered, and there is not enough for us
                    break 'outer;
                }

                // no data, we could not transfer, and still no data, so check for eof.
                // If we got an eof, then we have to check again for data to make sure there isn't data now that we may be able to transfer or read. Our reading can lag behind the bg read ahead.
                if instance.has_reached_eof() {
                    eof_seen = true;
                    continue;
                }

                {
                    // Since the bg reader could not satisfy our read, now is a good time to check to see if the bg reader encountered an error.
                    // Note this is a write lock because we want to get the actual error detected and return it here and avoid races with other readers if we tried a read and then subsequent write lock.
                    // This would be simpler if I could clone an io error.
                    let mut error = instance.bg_reader_data.error.write().unwrap();
                    if error.is_err() {
                        // replace the current error (with AN error instead of ok)
                        // return the original error
                        return std::mem::replace(&mut *error, Err(Self::default_error()));
                    }
                }

                // no data to transfer, and file not finished, but no error, so wait for bg reader to read some more data
                instance.wait_for_newly_read_data();
            }

            // refresh current_data inside the lock
            self.current_data = Arc::clone(&lock[self.current_buffer_index]);
        }
        // Ok(0) here (nothing transferred) is how Read reports end of stream
        Ok(offset_in_dest)
    }
}
480
481#[cfg(test)]
482pub mod tests {
483    use {
484        super::*,
485        crossbeam_channel::{unbounded, Receiver},
486        rayon::prelude::*,
487    };
488
    // (data, optional error) messages; an empty data Vec signals eof
    type SimpleReaderReceiverType = Receiver<(Vec<u8>, Option<std::io::Error>)>;
    // test Reader that replays whatever is sent over a channel
    struct SimpleReader {
        pub receiver: SimpleReaderReceiverType,
        // bytes received but not yet consumed by read()
        pub data: Vec<u8>,
        // set once the empty-Vec eof marker has been received
        pub done: bool,
        // pending error to return from the next read()
        pub err: Option<std::io::Error>,
    }
496    impl SimpleReader {
497        fn new(receiver: SimpleReaderReceiverType) -> Self {
498            Self {
499                receiver,
500                data: Vec::default(),
501                done: false,
502                err: None,
503            }
504        }
505    }
506
507    impl Read for SimpleReader {
508        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
509            if !self.done && self.data.is_empty() {
510                let (mut data, err) = self.receiver.recv().unwrap();
511                if err.is_some() {
512                    self.err = err;
513                }
514                if data.is_empty() {
515                    self.done = true;
516                } else {
517                    self.data.append(&mut data);
518                }
519            }
520            if self.err.is_some() {
521                return Err(self.err.take().unwrap());
522            }
523            let len_request = buf.len();
524            let len_data = self.data.len();
525            let to_read = std::cmp::min(len_request, len_data);
526            buf[0..to_read].copy_from_slice(&self.data[0..to_read]);
527            self.data.drain(0..to_read);
528            Ok(to_read)
529        }
530    }
531
532    #[test]
533    #[should_panic(expected = "total_buffer_budget > 0")]
534    fn test_shared_buffer_buffers_invalid() {
535        solana_logger::setup();
536        let (_sender, receiver) = unbounded();
537        let file = SimpleReader::new(receiver);
538        SharedBuffer::new_with_sizes(0, 1, file);
539    }
540
541    #[test]
542    #[should_panic(expected = "chunk_size > 0")]
543    fn test_shared_buffer_buffers_invalid2() {
544        solana_logger::setup();
545        let (_sender, receiver) = unbounded();
546        let file = SimpleReader::new(receiver);
547        SharedBuffer::new_with_sizes(1, 0, file);
548    }
549
550    #[test]
551    #[should_panic(expected = "SharedBufferReaders must all be created before the first one reads")]
552    fn test_shared_buffer_start_too_late() {
553        solana_logger::setup();
554        let (sender, receiver) = unbounded();
555        let file = SimpleReader::new(receiver);
556        let shared_buffer = SharedBuffer::new(file);
557        let mut reader = SharedBufferReader::new(&shared_buffer);
558        let mut data = Vec::new();
559        let done_signal = vec![];
560
561        let sent = vec![1, 2, 3];
562        let _ = sender.send((sent, None));
563        let _ = sender.send((done_signal, None));
564        assert!(reader.read_to_end(&mut data).is_ok());
565        SharedBufferReader::new(&shared_buffer); // created after reader already read
566    }
567
568    #[test]
569    fn test_shared_buffer_simple_read_to_end() {
570        solana_logger::setup();
571        let (sender, receiver) = unbounded();
572        let file = SimpleReader::new(receiver);
573        let shared_buffer = SharedBuffer::new(file);
574        let mut reader = SharedBufferReader::new(&shared_buffer);
575        let mut data = Vec::new();
576        let done_signal = vec![];
577
578        let sent = vec![1, 2, 3];
579        let _ = sender.send((sent.clone(), None));
580        let _ = sender.send((done_signal, None));
581        assert!(reader.read_to_end(&mut data).is_ok());
582        assert_eq!(sent, data);
583    }
584
585    fn get_error() -> std::io::Error {
586        std::io::Error::from(std::io::ErrorKind::WriteZero)
587    }
588
589    #[test]
590    fn test_shared_buffer_simple_read() {
591        solana_logger::setup();
592        let (sender, receiver) = unbounded();
593        let file = SimpleReader::new(receiver);
594        let shared_buffer = SharedBuffer::new(file);
595        let mut reader = SharedBufferReader::new(&shared_buffer);
596        let done_signal = vec![];
597
598        let sent = vec![1, 2, 3];
599        let mut data = vec![0; sent.len()];
600        let _ = sender.send((sent.clone(), None));
601        let _ = sender.send((done_signal, None));
602        assert_eq!(reader.read(&mut data[..]).unwrap(), sent.len());
603        assert_eq!(sent, data);
604    }
605
606    #[test]
607    fn test_shared_buffer_error() {
608        solana_logger::setup();
609        let (sender, receiver) = unbounded();
610        let file = SimpleReader::new(receiver);
611        let shared_buffer = SharedBuffer::new(file);
612        let mut reader = SharedBufferReader::new(&shared_buffer);
613        let mut data = Vec::new();
614        let done_signal = vec![];
615
616        let _ = sender.send((done_signal, Some(get_error())));
617        assert_eq!(
618            reader.read_to_end(&mut data).unwrap_err().kind(),
619            get_error().kind()
620        );
621    }
622
623    #[test]
624    fn test_shared_buffer_2_errors() {
625        solana_logger::setup();
626        let (sender, receiver) = unbounded();
627        let file = SimpleReader::new(receiver);
628        let shared_buffer = SharedBuffer::new(file);
629        let mut reader = SharedBufferReader::new(&shared_buffer);
630        let mut reader2 = SharedBufferReader::new(&shared_buffer);
631        let mut data = Vec::new();
632        let done_signal = vec![];
633
634        let _ = sender.send((done_signal, Some(get_error())));
635        assert_eq!(
636            reader.read_to_end(&mut data).unwrap_err().kind(),
637            get_error().kind()
638        );
639        // #2 will read 2nd, so should get default error, but still an error
640        assert_eq!(
641            reader2.read_to_end(&mut data).unwrap_err().kind(),
642            SharedBufferReader::default_error().kind()
643        );
644    }
645
    #[test]
    fn test_shared_buffer_2_errors_after_read() {
        solana_logger::setup();
        let (sender, receiver) = unbounded();
        let file = SimpleReader::new(receiver);
        let shared_buffer = SharedBuffer::new(file);
        let mut reader = SharedBufferReader::new(&shared_buffer);
        let mut reader2 = SharedBufferReader::new(&shared_buffer);
        let mut data = Vec::new();
        let done_signal = vec![];

        // send some data
        let sent = vec![1, 2, 3];
        let _ = sender.send((sent.clone(), None));
        // send an error
        let _ = sender.send((done_signal, Some(get_error())));
        // reader #1 drains the valid bytes into 'data', then receives the original error
        assert_eq!(
            reader.read_to_end(&mut data).unwrap_err().kind(),
            get_error().kind()
        );
        // #2 will read valid bytes first and succeed, then get error
        let mut data = vec![0; sent.len()];
        // this read should succeed because it was prior to error being received by bg reader
        assert_eq!(reader2.read(&mut data[..]).unwrap(), sent.len(),);
        assert_eq!(sent, data);
        // #2 gets the stand-in default error: the original was already handed to #1
        assert_eq!(
            reader2.read_to_end(&mut data).unwrap_err().kind(),
            SharedBufferReader::default_error().kind()
        );
    }
676
    #[test]
    fn test_shared_buffer_2_errors_after_read2() {
        solana_logger::setup();
        let (sender, receiver) = unbounded();
        let file = SimpleReader::new(receiver);
        let shared_buffer = SharedBuffer::new(file);
        let mut reader = SharedBufferReader::new(&shared_buffer);
        let mut reader2 = SharedBufferReader::new(&shared_buffer);
        let mut data = Vec::new();
        let done_signal = vec![];

        // send some data
        let sent = vec![1, 2, 3];
        let _ = sender.send((sent.clone(), None));
        // send an error
        let _ = sender.send((done_signal, Some(get_error())));
        // reader #1 drains the valid bytes, then receives the original error
        assert_eq!(
            reader.read_to_end(&mut data).unwrap_err().kind(),
            get_error().kind()
        );
        // #2 will read valid bytes first and succeed, then get error
        let mut data = vec![0; sent.len()];
        // this read should succeed because it is reading data prior to error being received by bg reader
        let expected_len = 1;
        // read one byte at a time to exercise the partial-read path
        for i in 0..sent.len() {
            let len = reader2.read(&mut data[i..=i]);
            assert!(len.is_ok(), "{len:?}, progress: {i}");
            assert_eq!(len.unwrap(), expected_len, "progress: {i}");
        }
        assert_eq!(sent, data);
        // after the buffered bytes are drained, the (already-consumed) error surfaces as default_error
        assert_eq!(
            reader2.read(&mut data[0..=0]).unwrap_err().kind(),
            SharedBufferReader::default_error().kind()
        );
    }
712
713    // read either all or in specified block sizes
714    fn test_read_all(
715        reader: &mut SharedBufferReader,
716        individual_read_size: Option<usize>,
717    ) -> Vec<u8> {
718        let mut data = Vec::new();
719        match individual_read_size {
720            Some(size) => {
721                loop {
722                    let mut buffer = vec![0; size];
723                    let result = reader.read(&mut buffer[..]);
724                    assert!(result.is_ok());
725                    let len = result.unwrap();
726                    if len == 0 {
727                        break; // done reading
728                    }
729                    buffer.truncate(len);
730                    data.append(&mut buffer);
731                }
732            }
733            None => {
734                let result = reader.read_to_end(&mut data);
735                assert!(result.is_ok());
736                assert_eq!(result.unwrap(), data.len());
737            }
738        }
739        data
740    }
741
742    #[test]
743    fn test_shared_buffer_drop_reader2() {
744        let done_signal = vec![];
745        let (sender, receiver) = unbounded();
746        let file = SimpleReader::new(receiver);
747        let budget_sz = 100;
748        let chunk_sz = 10;
749        let shared_buffer = SharedBuffer::new_with_sizes(budget_sz, chunk_sz, file);
750        let size = budget_sz * 2;
751        let mut reader = SharedBufferReader::new(&shared_buffer);
752        // with the Read trait, we don't know we are eof until we get Ok(0) from the underlying reader.
753        // This can't happen until we have enough space to store another chunk, thus we try to read another chunk and see the Ok(0) returned.
754        // Thus, we have to use size < budget_sz here instead of <=
755        let reader2 = SharedBufferReader::new(&shared_buffer);
756
757        let sent = (0..size)
758            .map(|i| ((i + size) % 256) as u8)
759            .collect::<Vec<_>>();
760
761        let _ = sender.send((sent.clone(), None));
762        let _ = sender.send((done_signal, None));
763
764        // can't read all data because it is 2x the buffer budget
765        let mut data = vec![0; budget_sz];
766        assert!(reader.read(&mut data[0..budget_sz]).is_ok());
767        drop(reader2);
768        let mut rest = test_read_all(&mut reader, None);
769        data.append(&mut rest);
770        assert_eq!(sent, data);
771    }
772
773    fn adjusted_buffer_size(total_buffer_budget: usize, chunk_size: usize) -> usize {
774        let num_buffers = SharedBufferBgReader::num_buffers(total_buffer_budget, chunk_size);
775        num_buffers * chunk_size
776    }
777
    #[test]
    fn test_shared_buffer_sweep() {
        solana_logger::setup();
        // try the inflection points with 1 to 3 readers, including a parallel reader
        // a few different chunk sizes
        for chunk_sz in [1, 2, 10] {
            // same # of buffers as default
            let equivalent_buffer_sz =
                chunk_sz * (TOTAL_BUFFER_BUDGET_DEFAULT / CHUNK_SIZE_DEFAULT);
            // 1 buffer, 2 buffers,
            for budget_sz in [
                1,
                chunk_sz,
                chunk_sz * 2,
                equivalent_buffer_sz - 1,
                equivalent_buffer_sz,
                equivalent_buffer_sz * 2,
            ] {
                // client read sizes straddling the chunk-size boundary
                for read_sz in [0, 1, chunk_sz - 1, chunk_sz, chunk_sz + 1] {
                    // 0 becomes None: a single read_to_end instead of fixed-size reads
                    let read_sz = if read_sz > 0 { Some(read_sz) } else { None };
                    for reader_ct in 1..=3 {
                        // payload sizes straddling the chunk and budget boundaries
                        for data_size in [
                            0,
                            1,
                            chunk_sz - 1,
                            chunk_sz,
                            chunk_sz + 1,
                            chunk_sz * 2 - 1,
                            chunk_sz * 2,
                            chunk_sz * 2 + 1,
                            budget_sz - 1,
                            budget_sz,
                            budget_sz + 1,
                            budget_sz * 2,
                            budget_sz * 2 - 1,
                            budget_sz * 2 + 1,
                        ] {
                            // budget rounded to a whole number of chunk-sized buffers
                            let adjusted_budget_sz = adjusted_buffer_size(budget_sz, chunk_sz);
                            // empty Vec — presumably SimpleReader's eof marker; see SimpleReader
                            let done_signal = vec![];
                            let (sender, receiver) = unbounded();
                            let file = SimpleReader::new(receiver);
                            let shared_buffer =
                                SharedBuffer::new_with_sizes(budget_sz, chunk_sz, file);
                            let mut reader = SharedBufferReader::new(&shared_buffer);
                            // with the Read trait, we don't know we are eof until we get Ok(0) from the underlying reader.
                            // This can't happen until we have enough space to store another chunk, thus we try to read another chunk and see the Ok(0) returned.
                            // Thus, we have to use data_size < adjusted_budget_sz here instead of <=
                            let second_reader = reader_ct > 1
                                && data_size < adjusted_budget_sz
                                && read_sz
                                    .as_ref()
                                    .map(|sz| sz < &adjusted_budget_sz)
                                    .unwrap_or(true);
                            let reader2 = if second_reader {
                                Some(SharedBufferReader::new(&shared_buffer))
                            } else {
                                None
                            };
                            // deterministic payload whose values depend on data_size
                            let sent = (0..data_size)
                                .map(|i| ((i + data_size) % 256) as u8)
                                .collect::<Vec<_>>();

                            let parallel_reader = reader_ct > 2;
                            let handle = if parallel_reader {
                                // Avoid to create more than the number of threads available in the
                                // current rayon threadpool. Deadlock could happen otherwise.
                                let threads = std::cmp::min(8, rayon::current_num_threads());
                                Some({
                                    let parallel = (0..threads)
                                        .map(|_| {
                                            // create before any reading starts
                                            let reader_ = SharedBufferReader::new(&shared_buffer);
                                            let sent_ = sent.clone();
                                            (reader_, sent_)
                                        })
                                        .collect::<Vec<_>>();

                                    Builder::new()
                                        .spawn(move || {
                                            parallel.into_par_iter().for_each(
                                                |(mut reader, sent)| {
                                                    let data = test_read_all(&mut reader, read_sz);
                                                    // on failure, print the sweep coordinates
                                                    assert_eq!(
                                                        sent,
                                                        data,
                                                        "{:?}",
                                                        (
                                                            chunk_sz,
                                                            budget_sz,
                                                            read_sz,
                                                            reader_ct,
                                                            data_size,
                                                            adjusted_budget_sz
                                                        )
                                                    );
                                                },
                                            )
                                        })
                                        .unwrap()
                                })
                            } else {
                                None
                            };
                            drop(shared_buffer); // readers should work fine even if shared buffer is dropped
                            let _ = sender.send((sent.clone(), None));
                            let _ = sender.send((done_signal, None));
                            let data = test_read_all(&mut reader, read_sz);
                            assert_eq!(
                                sent,
                                data,
                                "{:?}",
                                (
                                    chunk_sz,
                                    budget_sz,
                                    read_sz,
                                    reader_ct,
                                    data_size,
                                    adjusted_budget_sz
                                )
                            );
                            // a 2nd reader would stall us if we exceed the total buffer size
                            if second_reader {
                                // reader2 reads only after reader #1 is done; the
                                // second_reader guard above ensures all its data still fits
                                // in the buffer budget, so it must see the full payload
                                let data = test_read_all(&mut reader2.unwrap(), read_sz);
                                assert_eq!(sent, data);
                            }
                            if parallel_reader {
                                // join the spawned thread driving the rayon readers
                                assert!(handle.unwrap().join().is_ok());
                            }
                        }
                    }
                }
            }
        }
    }
913}