1use {
9 crate::waitable_condvar::WaitableCondvar,
10 log::*,
11 solana_measure::measure::Measure,
12 std::{
13 io::*,
14 sync::{
15 atomic::{AtomicBool, Ordering},
16 Arc, Mutex, RwLock,
17 },
18 thread::{Builder, JoinHandle},
19 time::Duration,
20 },
21};
22
23const TOTAL_BUFFER_BUDGET_DEFAULT: usize = 2_000_000_000;
26const CHUNK_SIZE_DEFAULT: usize = 100_000_000;
28
29type OneSharedBuffer = Arc<Vec<u8>>;
30
31struct SharedBufferInternal {
32 bg_reader_data: Arc<SharedBufferBgReader>,
33
34 bg_reader_join_handle: Mutex<Option<JoinHandle<()>>>,
35
36 clients: RwLock<Vec<usize>>,
41
42 data: RwLock<Vec<OneSharedBuffer>>,
45
46 empty_buffer: OneSharedBuffer,
48}
49
50pub struct SharedBuffer {
51 instance: Arc<SharedBufferInternal>,
52}
53
54impl SharedBuffer {
55 pub fn new<T: 'static + Read + std::marker::Send>(reader: T) -> Self {
56 Self::new_with_sizes(TOTAL_BUFFER_BUDGET_DEFAULT, CHUNK_SIZE_DEFAULT, reader)
57 }
58 fn new_with_sizes<T: 'static + Read + std::marker::Send>(
59 total_buffer_budget: usize,
60 chunk_size: usize,
61 reader: T,
62 ) -> Self {
63 assert!(total_buffer_budget > 0);
64 assert!(chunk_size > 0);
65 let instance = SharedBufferInternal {
66 bg_reader_data: Arc::new(SharedBufferBgReader::new()),
67 data: RwLock::new(vec![OneSharedBuffer::default()]), bg_reader_join_handle: Mutex::default(),
71 clients: RwLock::default(),
72 empty_buffer: OneSharedBuffer::default(),
73 };
74 let instance = Arc::new(instance);
75 let bg_reader_data = instance.bg_reader_data.clone();
76
77 let handle = Builder::new()
78 .name("solCompFileRead".to_string())
79 .spawn(move || {
80 bg_reader_data.read_entire_file_in_bg(reader, total_buffer_budget, chunk_size);
82 });
83 *instance.bg_reader_join_handle.lock().unwrap() = Some(handle.unwrap());
84 Self { instance }
85 }
86}
87
88pub struct SharedBufferReader {
89 instance: Arc<SharedBufferInternal>,
90 my_client_index: usize,
91 current_buffer_index: usize,
95 index_in_current_data: usize,
97 current_data: OneSharedBuffer,
98
99 empty_buffer: OneSharedBuffer,
101}
102
103impl Drop for SharedBufferInternal {
104 fn drop(&mut self) {
105 if let Some(handle) = self.bg_reader_join_handle.lock().unwrap().take() {
106 self.bg_reader_data.stop.store(true, Ordering::Relaxed);
107 handle.join().unwrap();
108 }
109 }
110}
111
112impl Drop for SharedBufferReader {
113 fn drop(&mut self) {
114 self.client_done_reading();
115 }
116}
117
118#[derive(Debug)]
119struct SharedBufferBgReader {
120 stop: AtomicBool,
121 error: RwLock<std::io::Result<usize>>,
123 newly_read_data: RwLock<Vec<OneSharedBuffer>>,
125 newly_read_data_signal: WaitableCondvar,
127
128 buffers: RwLock<Vec<OneSharedBuffer>>,
132 new_buffer_signal: WaitableCondvar,
134
135 bg_eof_reached: AtomicBool,
136}
137
138impl SharedBufferBgReader {
139 fn new() -> Self {
140 SharedBufferBgReader {
141 buffers: RwLock::new(vec![]),
142 error: RwLock::new(Ok(0)),
143
144 stop: AtomicBool::new(false),
146 newly_read_data: RwLock::default(),
147 newly_read_data_signal: WaitableCondvar::default(),
148 new_buffer_signal: WaitableCondvar::default(),
149 bg_eof_reached: AtomicBool::default(),
150 }
151 }
152
153 fn default_wait_timeout() -> Duration {
154 Duration::from_millis(100) }
156 fn wait_for_new_buffer(&self) -> bool {
157 self.new_buffer_signal
158 .wait_timeout(Self::default_wait_timeout())
159 }
160 fn num_buffers(total_buffer_budget: usize, chunk_size: usize) -> usize {
161 std::cmp::max(1, total_buffer_budget / chunk_size) }
163 fn set_error(&self, error: std::io::Error) {
164 *self.error.write().unwrap() = Err(error);
165 self.newly_read_data_signal.notify_all(); }
167
168 fn read_entire_file_in_bg<T: 'static + Read + std::marker::Send>(
174 &self,
175 mut reader: T,
176 total_buffer_budget: usize,
177 chunk_size: usize,
178 ) {
179 let now = std::time::Instant::now();
180 let mut read_us = 0;
181
182 let mut max_bytes_read = 0;
183 let mut wait_us = 0;
184 let mut total_bytes = 0;
185 let mut error = SharedBufferReader::default_error();
186 let mut remaining_buffers_to_allocate = Self::num_buffers(total_buffer_budget, chunk_size);
187 loop {
188 if self.stop.load(Ordering::Relaxed) {
189 self.set_error(std::io::Error::from(std::io::ErrorKind::TimedOut));
192 break;
193 }
194 let mut buffers = self.buffers.write().unwrap();
195 let buffer = buffers.pop();
196 drop(buffers);
197 let mut dest_data = if let Some(dest_data) = buffer {
198 assert_eq!(Arc::strong_count(&dest_data), 1);
201 dest_data
202 } else if remaining_buffers_to_allocate > 0 {
203 remaining_buffers_to_allocate -= 1;
205 Arc::new(vec![0; chunk_size])
206 } else {
207 let mut wait_for_new_buffer = Measure::start("wait_for_new_buffer");
209 self.wait_for_new_buffer();
210 wait_for_new_buffer.stop();
211 wait_us += wait_for_new_buffer.as_us();
212 continue; };
214 let target = Arc::make_mut(&mut dest_data);
215 let dest_size = target.len();
216
217 let mut bytes_read = 0;
218 let mut eof = false;
219 let mut error_received = false;
220
221 while bytes_read < dest_size {
222 let mut time_read = Measure::start("read");
223 let result = reader.read(&mut target[bytes_read..]);
227 time_read.stop();
228 read_us += time_read.as_us();
229 match result {
230 Ok(size) => {
231 if size == 0 {
232 eof = true;
233 break;
234 }
235 total_bytes += size;
236 max_bytes_read = std::cmp::max(max_bytes_read, size);
237 bytes_read += size;
238 }
240 Err(err) => {
241 error_received = true;
242 error = err;
243 break;
244 }
245 }
246 }
247
248 if bytes_read > 0 {
249 target.truncate(bytes_read);
251 let mut data = self.newly_read_data.write().unwrap();
252 data.push(dest_data);
253 drop(data);
254 self.newly_read_data_signal.notify_all();
255 }
256
257 if eof {
258 self.bg_eof_reached.store(true, Ordering::Relaxed);
259 self.newly_read_data_signal.notify_all(); break;
261 }
262
263 if error_received {
264 self.set_error(error);
267 break;
268 }
269 }
270
271 info!(
272 "reading entire decompressed file took: {} us, bytes: {}, read_us: {}, waiting_for_buffer_us: {}, largest fetch: {}, error: {:?}",
273 now.elapsed().as_micros(),
274 total_bytes,
275 read_us,
276 wait_us,
277 max_bytes_read,
278 self.error.read().unwrap()
279 );
280 }
281}
282
283impl SharedBufferInternal {
284 fn wait_for_newly_read_data(&self) -> bool {
285 self.bg_reader_data
286 .newly_read_data_signal
287 .wait_timeout(SharedBufferBgReader::default_wait_timeout())
288 }
289 fn transfer_data_from_bg(&self) -> bool {
294 let mut from_lock = self.bg_reader_data.newly_read_data.write().unwrap();
295 if from_lock.is_empty() {
296 return false;
298 }
299 let mut newly_read_data: Vec<OneSharedBuffer> = std::mem::take(&mut *from_lock);
301 let mut to_lock = self.data.write().unwrap();
303 drop(from_lock);
305 to_lock.append(&mut newly_read_data);
306 true }
308 fn has_reached_eof(&self) -> bool {
309 self.bg_reader_data.bg_eof_reached.load(Ordering::Relaxed)
310 }
311}
312
313impl SharedBufferReader {
315 pub fn new(original_instance: &SharedBuffer) -> Self {
316 let original_instance = &original_instance.instance;
317 let current_buffer_index = 0;
318 let mut list = original_instance.clients.write().unwrap();
319 let my_client_index = list.len();
320 if my_client_index > 0 {
321 let current_min = list.iter().min().unwrap();
322 if current_min > &0 {
323 drop(list);
324 panic!("SharedBufferReaders must all be created before the first one reads");
325 }
326 }
327 list.push(current_buffer_index);
328 drop(list);
329
330 Self {
331 instance: Arc::clone(original_instance),
332 my_client_index,
333 current_buffer_index,
334 index_in_current_data: 0,
335 current_data: original_instance.data.read().unwrap()[0].clone(),
338 empty_buffer: original_instance.empty_buffer.clone(),
339 }
340 }
341 fn default_error() -> std::io::Error {
342 std::io::Error::from(std::io::ErrorKind::TimedOut)
344 }
345 fn client_done_reading(&mut self) {
346 self.update_client_index(usize::MAX);
348 }
349
350 fn update_client_index(&mut self, new_buffer_index: usize) {
353 let previous_buffer_index = self.current_buffer_index;
354 self.current_buffer_index = new_buffer_index;
355 let client_index = self.my_client_index;
356 let mut indexes = self.instance.clients.write().unwrap();
357 indexes[client_index] = new_buffer_index;
358 drop(indexes);
359 let mut new_min = *self.instance.clients.read().unwrap().iter().min().unwrap();
360 new_min = std::cmp::min(new_min, self.instance.data.read().unwrap().len());
362
363 if new_min > previous_buffer_index {
365 let eof = self.instance.has_reached_eof();
367
368 for recycle in previous_buffer_index..new_min {
369 let remove = {
370 let mut data = self.instance.data.write().unwrap();
371 std::mem::replace(&mut data[recycle], self.empty_buffer.clone())
372 };
373 if remove.is_empty() {
374 continue; }
376
377 if !eof {
378 self.instance
381 .bg_reader_data
382 .buffers
383 .write()
384 .unwrap()
385 .push(remove);
386 self.instance.bg_reader_data.new_buffer_signal.notify_all();
387 }
389 }
390 }
391 }
392}
393
394impl Read for SharedBufferReader {
395 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
397 let dest_len = buf.len();
398 let mut offset_in_dest = 0;
399
400 let mut eof_seen = false;
401 'outer: while offset_in_dest < dest_len {
402 let source = &*self.current_data;
404
405 let remaining_source_len = source.len() - self.index_in_current_data;
406 let bytes_to_transfer = std::cmp::min(dest_len - offset_in_dest, remaining_source_len);
407 buf[offset_in_dest..(offset_in_dest + bytes_to_transfer)].copy_from_slice(
409 &source
410 [self.index_in_current_data..(self.index_in_current_data + bytes_to_transfer)],
411 );
412 self.index_in_current_data += bytes_to_transfer;
413 offset_in_dest += bytes_to_transfer;
414
415 if offset_in_dest >= dest_len {
416 break;
417 }
418
419 self.current_data = self.empty_buffer.clone(); self.index_in_current_data = 0;
423 self.update_client_index(self.current_buffer_index + 1);
424
425 let instance = &*self.instance;
426 let mut lock;
427 loop {
429 lock = instance.data.read().unwrap();
430 if self.current_buffer_index < lock.len() {
431 break;
432 }
433 drop(lock);
434
435 if self.instance.transfer_data_from_bg() {
436 continue;
437 }
438
439 lock = instance.data.read().unwrap();
441 if self.current_buffer_index < lock.len() {
442 break;
443 }
444 drop(lock);
445
446 if eof_seen {
447 break 'outer;
449 }
450
451 if instance.has_reached_eof() {
454 eof_seen = true;
455 continue;
456 }
457
458 {
459 let mut error = instance.bg_reader_data.error.write().unwrap();
463 if error.is_err() {
464 return std::mem::replace(&mut *error, Err(Self::default_error()));
467 }
468 }
469
470 instance.wait_for_newly_read_data();
472 }
473
474 self.current_data = Arc::clone(&lock[self.current_buffer_index]);
476 }
477 Ok(offset_in_dest)
478 }
479}
480
481#[cfg(test)]
482pub mod tests {
483 use {
484 super::*,
485 crossbeam_channel::{unbounded, Receiver},
486 rayon::prelude::*,
487 };
488
489 type SimpleReaderReceiverType = Receiver<(Vec<u8>, Option<std::io::Error>)>;
490 struct SimpleReader {
491 pub receiver: SimpleReaderReceiverType,
492 pub data: Vec<u8>,
493 pub done: bool,
494 pub err: Option<std::io::Error>,
495 }
496 impl SimpleReader {
497 fn new(receiver: SimpleReaderReceiverType) -> Self {
498 Self {
499 receiver,
500 data: Vec::default(),
501 done: false,
502 err: None,
503 }
504 }
505 }
506
507 impl Read for SimpleReader {
508 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
509 if !self.done && self.data.is_empty() {
510 let (mut data, err) = self.receiver.recv().unwrap();
511 if err.is_some() {
512 self.err = err;
513 }
514 if data.is_empty() {
515 self.done = true;
516 } else {
517 self.data.append(&mut data);
518 }
519 }
520 if self.err.is_some() {
521 return Err(self.err.take().unwrap());
522 }
523 let len_request = buf.len();
524 let len_data = self.data.len();
525 let to_read = std::cmp::min(len_request, len_data);
526 buf[0..to_read].copy_from_slice(&self.data[0..to_read]);
527 self.data.drain(0..to_read);
528 Ok(to_read)
529 }
530 }
531
532 #[test]
533 #[should_panic(expected = "total_buffer_budget > 0")]
534 fn test_shared_buffer_buffers_invalid() {
535 solana_logger::setup();
536 let (_sender, receiver) = unbounded();
537 let file = SimpleReader::new(receiver);
538 SharedBuffer::new_with_sizes(0, 1, file);
539 }
540
541 #[test]
542 #[should_panic(expected = "chunk_size > 0")]
543 fn test_shared_buffer_buffers_invalid2() {
544 solana_logger::setup();
545 let (_sender, receiver) = unbounded();
546 let file = SimpleReader::new(receiver);
547 SharedBuffer::new_with_sizes(1, 0, file);
548 }
549
550 #[test]
551 #[should_panic(expected = "SharedBufferReaders must all be created before the first one reads")]
552 fn test_shared_buffer_start_too_late() {
553 solana_logger::setup();
554 let (sender, receiver) = unbounded();
555 let file = SimpleReader::new(receiver);
556 let shared_buffer = SharedBuffer::new(file);
557 let mut reader = SharedBufferReader::new(&shared_buffer);
558 let mut data = Vec::new();
559 let done_signal = vec![];
560
561 let sent = vec![1, 2, 3];
562 let _ = sender.send((sent, None));
563 let _ = sender.send((done_signal, None));
564 assert!(reader.read_to_end(&mut data).is_ok());
565 SharedBufferReader::new(&shared_buffer); }
567
568 #[test]
569 fn test_shared_buffer_simple_read_to_end() {
570 solana_logger::setup();
571 let (sender, receiver) = unbounded();
572 let file = SimpleReader::new(receiver);
573 let shared_buffer = SharedBuffer::new(file);
574 let mut reader = SharedBufferReader::new(&shared_buffer);
575 let mut data = Vec::new();
576 let done_signal = vec![];
577
578 let sent = vec![1, 2, 3];
579 let _ = sender.send((sent.clone(), None));
580 let _ = sender.send((done_signal, None));
581 assert!(reader.read_to_end(&mut data).is_ok());
582 assert_eq!(sent, data);
583 }
584
585 fn get_error() -> std::io::Error {
586 std::io::Error::from(std::io::ErrorKind::WriteZero)
587 }
588
589 #[test]
590 fn test_shared_buffer_simple_read() {
591 solana_logger::setup();
592 let (sender, receiver) = unbounded();
593 let file = SimpleReader::new(receiver);
594 let shared_buffer = SharedBuffer::new(file);
595 let mut reader = SharedBufferReader::new(&shared_buffer);
596 let done_signal = vec![];
597
598 let sent = vec![1, 2, 3];
599 let mut data = vec![0; sent.len()];
600 let _ = sender.send((sent.clone(), None));
601 let _ = sender.send((done_signal, None));
602 assert_eq!(reader.read(&mut data[..]).unwrap(), sent.len());
603 assert_eq!(sent, data);
604 }
605
606 #[test]
607 fn test_shared_buffer_error() {
608 solana_logger::setup();
609 let (sender, receiver) = unbounded();
610 let file = SimpleReader::new(receiver);
611 let shared_buffer = SharedBuffer::new(file);
612 let mut reader = SharedBufferReader::new(&shared_buffer);
613 let mut data = Vec::new();
614 let done_signal = vec![];
615
616 let _ = sender.send((done_signal, Some(get_error())));
617 assert_eq!(
618 reader.read_to_end(&mut data).unwrap_err().kind(),
619 get_error().kind()
620 );
621 }
622
623 #[test]
624 fn test_shared_buffer_2_errors() {
625 solana_logger::setup();
626 let (sender, receiver) = unbounded();
627 let file = SimpleReader::new(receiver);
628 let shared_buffer = SharedBuffer::new(file);
629 let mut reader = SharedBufferReader::new(&shared_buffer);
630 let mut reader2 = SharedBufferReader::new(&shared_buffer);
631 let mut data = Vec::new();
632 let done_signal = vec![];
633
634 let _ = sender.send((done_signal, Some(get_error())));
635 assert_eq!(
636 reader.read_to_end(&mut data).unwrap_err().kind(),
637 get_error().kind()
638 );
639 assert_eq!(
641 reader2.read_to_end(&mut data).unwrap_err().kind(),
642 SharedBufferReader::default_error().kind()
643 );
644 }
645
646 #[test]
647 fn test_shared_buffer_2_errors_after_read() {
648 solana_logger::setup();
649 let (sender, receiver) = unbounded();
650 let file = SimpleReader::new(receiver);
651 let shared_buffer = SharedBuffer::new(file);
652 let mut reader = SharedBufferReader::new(&shared_buffer);
653 let mut reader2 = SharedBufferReader::new(&shared_buffer);
654 let mut data = Vec::new();
655 let done_signal = vec![];
656
657 let sent = vec![1, 2, 3];
659 let _ = sender.send((sent.clone(), None));
660 let _ = sender.send((done_signal, Some(get_error())));
662 assert_eq!(
663 reader.read_to_end(&mut data).unwrap_err().kind(),
664 get_error().kind()
665 );
666 let mut data = vec![0; sent.len()];
668 assert_eq!(reader2.read(&mut data[..]).unwrap(), sent.len(),);
670 assert_eq!(sent, data);
671 assert_eq!(
672 reader2.read_to_end(&mut data).unwrap_err().kind(),
673 SharedBufferReader::default_error().kind()
674 );
675 }
676
677 #[test]
678 fn test_shared_buffer_2_errors_after_read2() {
679 solana_logger::setup();
680 let (sender, receiver) = unbounded();
681 let file = SimpleReader::new(receiver);
682 let shared_buffer = SharedBuffer::new(file);
683 let mut reader = SharedBufferReader::new(&shared_buffer);
684 let mut reader2 = SharedBufferReader::new(&shared_buffer);
685 let mut data = Vec::new();
686 let done_signal = vec![];
687
688 let sent = vec![1, 2, 3];
690 let _ = sender.send((sent.clone(), None));
691 let _ = sender.send((done_signal, Some(get_error())));
693 assert_eq!(
694 reader.read_to_end(&mut data).unwrap_err().kind(),
695 get_error().kind()
696 );
697 let mut data = vec![0; sent.len()];
699 let expected_len = 1;
701 for i in 0..sent.len() {
702 let len = reader2.read(&mut data[i..=i]);
703 assert!(len.is_ok(), "{len:?}, progress: {i}");
704 assert_eq!(len.unwrap(), expected_len, "progress: {i}");
705 }
706 assert_eq!(sent, data);
707 assert_eq!(
708 reader2.read(&mut data[0..=0]).unwrap_err().kind(),
709 SharedBufferReader::default_error().kind()
710 );
711 }
712
713 fn test_read_all(
715 reader: &mut SharedBufferReader,
716 individual_read_size: Option<usize>,
717 ) -> Vec<u8> {
718 let mut data = Vec::new();
719 match individual_read_size {
720 Some(size) => {
721 loop {
722 let mut buffer = vec![0; size];
723 let result = reader.read(&mut buffer[..]);
724 assert!(result.is_ok());
725 let len = result.unwrap();
726 if len == 0 {
727 break; }
729 buffer.truncate(len);
730 data.append(&mut buffer);
731 }
732 }
733 None => {
734 let result = reader.read_to_end(&mut data);
735 assert!(result.is_ok());
736 assert_eq!(result.unwrap(), data.len());
737 }
738 }
739 data
740 }
741
742 #[test]
743 fn test_shared_buffer_drop_reader2() {
744 let done_signal = vec![];
745 let (sender, receiver) = unbounded();
746 let file = SimpleReader::new(receiver);
747 let budget_sz = 100;
748 let chunk_sz = 10;
749 let shared_buffer = SharedBuffer::new_with_sizes(budget_sz, chunk_sz, file);
750 let size = budget_sz * 2;
751 let mut reader = SharedBufferReader::new(&shared_buffer);
752 let reader2 = SharedBufferReader::new(&shared_buffer);
756
757 let sent = (0..size)
758 .map(|i| ((i + size) % 256) as u8)
759 .collect::<Vec<_>>();
760
761 let _ = sender.send((sent.clone(), None));
762 let _ = sender.send((done_signal, None));
763
764 let mut data = vec![0; budget_sz];
766 assert!(reader.read(&mut data[0..budget_sz]).is_ok());
767 drop(reader2);
768 let mut rest = test_read_all(&mut reader, None);
769 data.append(&mut rest);
770 assert_eq!(sent, data);
771 }
772
773 fn adjusted_buffer_size(total_buffer_budget: usize, chunk_size: usize) -> usize {
774 let num_buffers = SharedBufferBgReader::num_buffers(total_buffer_budget, chunk_size);
775 num_buffers * chunk_size
776 }
777
778 #[test]
779 fn test_shared_buffer_sweep() {
780 solana_logger::setup();
781 for chunk_sz in [1, 2, 10] {
784 let equivalent_buffer_sz =
786 chunk_sz * (TOTAL_BUFFER_BUDGET_DEFAULT / CHUNK_SIZE_DEFAULT);
787 for budget_sz in [
789 1,
790 chunk_sz,
791 chunk_sz * 2,
792 equivalent_buffer_sz - 1,
793 equivalent_buffer_sz,
794 equivalent_buffer_sz * 2,
795 ] {
796 for read_sz in [0, 1, chunk_sz - 1, chunk_sz, chunk_sz + 1] {
797 let read_sz = if read_sz > 0 { Some(read_sz) } else { None };
798 for reader_ct in 1..=3 {
799 for data_size in [
800 0,
801 1,
802 chunk_sz - 1,
803 chunk_sz,
804 chunk_sz + 1,
805 chunk_sz * 2 - 1,
806 chunk_sz * 2,
807 chunk_sz * 2 + 1,
808 budget_sz - 1,
809 budget_sz,
810 budget_sz + 1,
811 budget_sz * 2,
812 budget_sz * 2 - 1,
813 budget_sz * 2 + 1,
814 ] {
815 let adjusted_budget_sz = adjusted_buffer_size(budget_sz, chunk_sz);
816 let done_signal = vec![];
817 let (sender, receiver) = unbounded();
818 let file = SimpleReader::new(receiver);
819 let shared_buffer =
820 SharedBuffer::new_with_sizes(budget_sz, chunk_sz, file);
821 let mut reader = SharedBufferReader::new(&shared_buffer);
822 let second_reader = reader_ct > 1
826 && data_size < adjusted_budget_sz
827 && read_sz
828 .as_ref()
829 .map(|sz| sz < &adjusted_budget_sz)
830 .unwrap_or(true);
831 let reader2 = if second_reader {
832 Some(SharedBufferReader::new(&shared_buffer))
833 } else {
834 None
835 };
836 let sent = (0..data_size)
837 .map(|i| ((i + data_size) % 256) as u8)
838 .collect::<Vec<_>>();
839
840 let parallel_reader = reader_ct > 2;
841 let handle = if parallel_reader {
842 let threads = std::cmp::min(8, rayon::current_num_threads());
845 Some({
846 let parallel = (0..threads)
847 .map(|_| {
848 let reader_ = SharedBufferReader::new(&shared_buffer);
850 let sent_ = sent.clone();
851 (reader_, sent_)
852 })
853 .collect::<Vec<_>>();
854
855 Builder::new()
856 .spawn(move || {
857 parallel.into_par_iter().for_each(
858 |(mut reader, sent)| {
859 let data = test_read_all(&mut reader, read_sz);
860 assert_eq!(
861 sent,
862 data,
863 "{:?}",
864 (
865 chunk_sz,
866 budget_sz,
867 read_sz,
868 reader_ct,
869 data_size,
870 adjusted_budget_sz
871 )
872 );
873 },
874 )
875 })
876 .unwrap()
877 })
878 } else {
879 None
880 };
881 drop(shared_buffer); let _ = sender.send((sent.clone(), None));
883 let _ = sender.send((done_signal, None));
884 let data = test_read_all(&mut reader, read_sz);
885 assert_eq!(
886 sent,
887 data,
888 "{:?}",
889 (
890 chunk_sz,
891 budget_sz,
892 read_sz,
893 reader_ct,
894 data_size,
895 adjusted_budget_sz
896 )
897 );
898 if second_reader {
900 let data = test_read_all(&mut reader2.unwrap(), read_sz);
902 assert_eq!(sent, data);
903 }
904 if parallel_reader {
905 assert!(handle.unwrap().join().is_ok());
906 }
907 }
908 }
909 }
910 }
911 }
912 }
913}