lance_io/scheduler.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use snafu::location;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;

// Don't log backpressure warnings until at least this many seconds have passed
const BACKPRESSURE_MIN: u64 = 5;
// Don't log backpressure warnings more than once / minute
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Global counter of how many IOPS we have issued
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
// Global counter of how many bytes were read by the scheduler
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);

pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}
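
// Illustrative only: these process-wide counters can be polled for coarse I/O
// diagnostics.  A minimal sketch (the surrounding scan code and log message are
// assumptions, not part of this module):
//
//     let iops_before = iops_counter();
//     let bytes_before = bytes_read_counter();
//     // ... run a scan ...
//     log::info!(
//         "scan issued {} IOPS and read {} bytes",
//         iops_counter() - iops_before,
//         bytes_read_counter() - bytes_before,
//     );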

// There are two structures that control the I/O scheduler concurrency.  First,
// we have a hard limit on the number of IOPS that can be issued concurrently.
// This limit is process-wide.
//
// Second, we try to limit how many I/O requests can be buffered in memory without
// being consumed by a decoder of some kind.  This limit is per-scheduler.  We cannot
// make this limit process-wide without introducing deadlock (the decoder for
// file 0 might be waiting on IOPS blocked by a queue filled with requests for file 1,
// and vice versa).
//
// There is also a per-scan limit on the number of IOPS that can be issued concurrently.
//
// The process-wide limit exists for users who need a hard cap on the number of parallel
// IOPS, e.g. due to port availability limits or to prevent multiple scans from saturating
// the network.  (Note: a process-wide limit of X will not necessarily limit the number of
// open TCP connections to exactly X.  The underlying object store may open more connections
// anyway.)
//
// However, the process-wide limit can be too restrictive in some cases, e.g. when some
// scans are reading from cloud storage and other scans are reading from local disk.  In
// these cases users don't need to set a process-wide limit and can rely on the per-scan
// limits.

// IopsQuota enforces the first of the above limits: it is the per-process hard cap
// on the number of IOPS that can be issued concurrently.
//
// The per-scan limits are enforced by IoQueue.
struct IopsQuota {
    // An Option is used here to avoid semaphore overhead when no limit is set
    iops_avail: Option<Semaphore>,
}

/// A reservation on the global IOPS quota
///
/// When the reservation is dropped, the IOPS quota is released unless
/// [`Self::forget`] is called.
struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    // Forget the reservation, so it won't be released on drop
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    // By default, there is no process-wide limit on IOPS
    //
    // However, the user can request one by setting the environment variable
    // LANCE_PROCESS_IO_THREADS_LIMIT to a positive integer.
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                let limit = s
                    .parse::<i32>()
                    .expect("LANCE_PROCESS_IO_THREADS_LIMIT must be a positive integer");
                if limit <= 0 {
                    panic!("LANCE_PROCESS_IO_THREADS_LIMIT must be a positive integer.  To disable the limit, unset the environment variable");
                }
                limit
            })
            // The default (-1) does not apply any limit
            .unwrap_or(-1);
        let iops_avail = if initial_capacity < 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    // Release a reservation back to the global IOPS quota
    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    // Acquire a reservation on the global IOPS quota
    async fn acquire(&self) -> IopsReservation {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}

lazy_static::lazy_static! {
    static ref IOPS_QUOTA: IopsQuota = IopsQuota::new();
}
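
// A hedged sketch of how the quota is used inside this module (illustrative
// numbers; see IoQueue::pop and IoTask::run below for the real call sites):
// with LANCE_PROCESS_IO_THREADS_LIMIT=16 set before startup, at most 16 reads
// can be in flight process-wide.
//
//     let mut res = IOPS_QUOTA.acquire().await; // waits if 16 IOPS are in flight
//     res.forget();                             // hand the permit to the task
//     // ... the read runs ...
//     IOPS_QUOTA.release();                     // give the permit back afterwards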

// We want to allow requests that have a lower priority than any
// currently in-flight request.  This helps avoid potential deadlocks
// related to backpressure.  Unfortunately, it is quite expensive to
// keep track of which priorities are in-flight.
//
// TODO: At some point it would be nice to optimize this away, but
// in_flight should remain relatively small (generally fewer than 256 items)
// and it has not shown itself to be a bottleneck yet.
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        } else {
            unreachable!();
        }
    }
}

struct IoQueueState {
    // Number of IOPS we can issue concurrently before pausing I/O
    iops_avail: u32,
    // Number of bytes we are allowed to buffer in memory before pausing I/O
    //
    // This can dip below 0 due to I/O prioritization
    bytes_avail: i64,
    // Pending I/O requests
    pending_requests: BinaryHeap<IoTask>,
    // Priorities of in-flight requests
    priorities_in_flight: PrioritiesInFlight,
    // Set when the scheduler is finished to notify the I/O loop to shut down
    // once all outstanding requests have been completed.
    done_scheduling: bool,
    // Time when the scheduler started
    start: Instant,
    // Last time we warned about backpressure
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn finished(&self) -> bool {
        self.done_scheduling && self.pending_requests.is_empty()
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::WARN, "Backpressure throttle exceeded");
            log::debug!("Backpressure throttle is full, I/O will pause until buffer is drained.  Max I/O bandwidth will not be achieved because CPU is falling behind");
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                // This can happen when we admit special priority requests
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}
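
// An illustrative walk-through of the admission rules above (numbers made up):
//
//     // Suppose iops_avail > 0, bytes_avail = 100, and the lowest in-flight
//     // priority is 50.
//     //   * a task with priority 10 and 500 bytes is admitted: its priority is
//     //     not greater than anything in flight, so it bypasses the byte budget
//     //     and bytes_avail temporarily goes negative
//     //   * a task with priority 60 and 500 bytes is held back: it exceeds the
//     //     byte budget and nothing in flight depends on it
//     //   * a task with priority 60 and 80 bytes is admitted: it fits the budget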

// This is modeled after the MPSC queue described here: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
//
// However, it only needs to be SPSC since there is only one "scheduler thread"
// and one I/O loop.
struct IoQueue {
    // Queue state
    state: Mutex<IoQueueState>,
    // Used to signal new I/O requests have arrived that might potentially be runnable
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                // First, grab a reservation on the global IOPS quota.
                // If we then get a task to run, transfer the reservation
                // to the task.  Otherwise, the reservation will be released
                // when iop_res is dropped.
                let mut iop_res = IOPS_QUOTA.acquire().await;
                // Next, try to grab a task from the queue
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    // Task successfully acquired; we will release the global
                    // reservation after the task has run.
                    iop_res.forget();
                    return Some(task);
                }

                if state.finished() {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        drop(state);

        self.notify.notify_one();
    }
}
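
// A rough sketch of a request's path through the queue (the call sites match
// ScanScheduler::do_submit_request and FileScheduler further below):
//
//     io_queue.push(task);                       // scheduler side: enqueue + wake the I/O loop
//     let task = io_queue.pop().await;           // I/O loop: admitted per IoQueueState rules
//     io_queue.on_iop_complete();                // read finished: free an IOPS slot
//     io_queue.on_bytes_consumed(n, prio, reqs); // caller consumed the bytes: free buffer space
//     io_queue.close();                          // ScanScheduler dropped: pop() returns None once drained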

// There is one instance of MutableBatch shared by all the I/O operations
// that make up a single request.  When all the I/O operations complete
// then the MutableBatch goes out of scope and the batch request is considered
// complete.
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

// Rather than keeping track of when all the I/O requests are finished so that we
// can deliver the batch of data, we let Rust do that for us.  When all I/Os are
// done the MutableBatch will go out of scope and we know we have all the data.
impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        // If we have an error, return that.  Otherwise return the data
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        // We don't really care if no one is around to receive it, just let
        // the result go out of scope and get cleaned up
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}
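
// A hedged sketch of how this Drop-based completion plays out (the sharing
// mirrors do_submit_request below; the names here are illustrative):
//
//     // One MutableBatch per submit_request call, shared by all of its IoTasks:
//     let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(cb, n, prio, n))));
//     // Each IoTask's when_done closure captures a clone of `dest` and calls
//     // deliver_data.  When the last closure is dropped, the final Arc goes away,
//     // MutableBatch::drop runs, and the Response is handed to the caller.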

struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    // Called by worker tasks to add data to the MutableBatch
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                // This keeps the original error, if present
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // This is intentionally inverted.  We want a min-heap
        other.priority.cmp(&self.priority)
    }
}
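
// Illustrative only: because Ord is inverted, the BinaryHeap of pending requests
// behaves as a min-heap on priority, so the smallest priority value pops first.
//
//     // Pushing tasks with priorities 5, 1, and 3 (all else equal),
//     // pending_requests.pop() yields them in the order 1, 3, 5.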

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    async fn run(self) {
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            BYTES_READ_COUNTER.fetch_add(self.num_bytes(), Ordering::Release);
            bytes_fut.await.map_err(Error::from)
        };
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

// Every time a scheduler starts up it launches a task to run the I/O loop.  This loop
// runs until the scheduler is dropped and all outstanding requests have completed.
async fn run_io_loop(tasks: Arc<IoQueue>) {
    // Pop the next runnable task off the queue and spawn it, until the queue is
    // closed and drained
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                // The scheduler has been dropped and all pending requests have
                // completed, we are done
                return;
            }
        }
    }
}

/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
/// parallel I/O that can be run.
///
/// TODO: This will also add coalescing
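///
/// A rough usage sketch (not a doctest; the path and sizes are illustrative):
///
/// ```ignore
/// let store = Arc::new(ObjectStore::local());
/// let config = SchedulerConfig::max_bandwidth(&store);
/// let scheduler = ScanScheduler::new(store, config);
/// let file = scheduler.open_file(&Path::parse("some/file.lance")?).await?;
/// // Read bytes 0..4096 with priority 0
/// let bytes = file.submit_single(0..4096, 0).await?;
/// ```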
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: Arc<IoQueue>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// The # of bytes that can be read into memory but not yet consumed by the caller.
    /// This controls back pressure.  If data is not processed quickly enough then this
    /// buffer will fill up and the I/O loop will pause until the buffer is drained.
    pub io_buffer_size_bytes: u64,
}

impl SchedulerConfig {
    /// Big enough for unit testing
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
        }
    }

    /// Configuration that should generally maximize bandwidth (not trying to save RAM
    /// at all).  We assume a max page size of 32MiB and then allow 32MiB per I/O thread.
    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self {
            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
        }
    }
}
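
// Illustrative arithmetic only: with store.io_parallelism() == 8, max_bandwidth
// yields an 8 * 32 MiB = 256 MiB read-ahead budget, i.e. one worst-case 32 MiB
// page per I/O thread before backpressure pauses the I/O loop.  A custom budget
// can also be set directly:
//
//     let config = SchedulerConfig { io_buffer_size_bytes: 2 * 1024 * 1024 * 1024 };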

impl ScanScheduler {
    /// Create a new scheduler with the given I/O capacity
    ///
    /// # Arguments
    ///
    /// * object_store - the store to wrap
    /// * config - configuration settings for the scheduler
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = Arc::new(IoQueue::new(
            io_capacity as u32,
            config.io_buffer_size_bytes,
        ));
        let scheduler = Self {
            object_store,
            io_queue: io_queue.clone(),
        };
        tokio::task::spawn(async move { run_io_loop(io_queue).await });
        Arc::new(scheduler)
    }

    /// Open a file for reading
    ///
    /// # Arguments
    ///
    /// * path - the path to the file to open
    /// * base_priority - the base priority for I/O requests submitted to this file scheduler
    ///                   this will determine the upper 64 bits of priority (the lower 64 bits
    ///                   come from `submit_request` and `submit_single`)
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
    ) -> Result<FileScheduler> {
        let reader = self.object_store.open(path).await?;
        let block_size = self.object_store.block_size() as u64;
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
        })
    }

    /// Open a file with a default priority of 0
    ///
    /// See [`Self::open_file_with_priority`] for more information on the priority
    pub async fn open_file(self: &Arc<Self>, path: &Path) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0).await
    }

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            // We don't care if the receiver has given up so discard the result
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue = self.io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            self.io_queue.push(task);
        }
    }

    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority);

        let io_queue = self.io_queue.clone();

        rx.map(move |wrapped_rsp| {
            // Right now, it isn't possible for I/O to be cancelled so a cancel error should
            // not occur
            let rsp = wrapped_rsp.unwrap();
            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        self.io_queue.close();
    }
}

/// A throttled file reader
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
}

fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    // Note that range1.end <= range2.start is possible (e.g. when decoding string arrays)
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}
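
// An illustrative example of the coalescing performed by FileScheduler::submit_request
// below, assuming a block_size of 4096: the ranges 0..10 and 100..200 are "close
// together" (100 <= 10 + 4096) and are merged into a single read of 0..200, while
// 0..10 and 10_000..10_010 stay separate requests.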

impl FileScheduler {
    /// Submit a batch of I/O requests to the reader
    ///
    /// The requests are queued and the returned future completes once all of
    /// them have been fulfilled; the results are returned in the same order as
    /// the input ranges.
    ///
    /// Each request has a given priority.  If the I/O loop is full then requests
    /// will be buffered and requests with the *lowest* priority will be released
    /// from the buffer first.
    ///
    /// Backpressure is applied per scheduler: all requests submitted through the
    /// same [`ScanScheduler`] share a single backpressure throttle.
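    ///
    /// A rough sketch of a call (not a doctest; the offsets are illustrative):
    ///
    /// ```ignore
    /// // The file's base_priority fills the upper 64 bits of the final priority,
    /// // the `priority` argument below fills the lower 64 bits.
    /// let bufs = file_scheduler
    ///     .submit_request(vec![0..1024, 4096..8192], 42)
    ///     .await?;
    /// assert_eq!(bufs.len(), 2); // one Bytes buffer per requested range
    /// ```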
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        // The final priority combines the file's base priority (upper 64 bits)
        // with the per-request priority (lower 64 bits)
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        let mut updated_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    updated_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            updated_requests.push(curr_interval);
        }

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    // The coalesced range covers more bytes than the original request, so
                    // rescale the offsets and slice out the subset we need from bytes_vec
                    let start = orig_range.start as usize - byte_offset;
                    let end = orig_range.end as usize - byte_offset;

                    let sliced_range = bytes_vec[updated_index].slice(start..end);
                    final_bytes.push(sliced_range);
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    /// Submit a single IOP to the reader
    ///
    /// If you have multiple IOPS to perform then [`Self::submit_request`] is going
    /// to be more efficient.
    ///
    /// See [`Self::submit_request`] for more information on the priority and backpressure.
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    /// Provides access to the underlying reader
    ///
    /// Do not use this for reading data as it will bypass any I/O scheduling!
    /// This is mainly exposed to allow metadata operations (e.g. size, block_size)
    /// which either aren't IOPS or which we don't throttle
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use rand::RngCore;
    use tempfile::tempdir;

    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{object_store::DEFAULT_DOWNLOAD_RETRY_COUNT, testing::MockObjectStore};

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 1MiB of data
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::thread_rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler.open_file(&tmp_file).await.unwrap();

        // Read it back 4KiB at a time
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        // Note: we should get parallel I/O even though we are consuming serially
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            None,
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap())
            .await
            .unwrap();

        // Issue a request, priority doesn't matter, it will be submitted
        // immediately (it will go pending)
        // Note: the timeout is to prevent a deadlock if the test fails.
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // Issue another low priority request (it will go in queue)
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // Issue a high priority request (it will go in queue and should bump
        // the other queued request down)
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        // Finish one read; it should be the first request, which is already in flight
        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        // Other requests should not be finished
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        // Next should be high priority request
        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        // Finally, the low priority request
        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        // Wrap the obj_store to keep track of how many bytes have been read
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes as u64, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            None,
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap())
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            // We need to move `target_bytes` but don't want to move `bytes_read`
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        // This read will begin immediately
        let first_fut = file_scheduler.submit_single(0..5, 0);
        // This read should also begin immediately
        let second_fut = file_scheduler.submit_single(0..5, 0);
        // This read will be throttled
        let third_fut = file_scheduler.submit_single(0..3, 0);
        // Two tasks (third_fut and unit test)
        wait_for_bytes_read_and_idle(10).await;

        assert_eq!(first_fut.await.unwrap().len(), 5);
        // One task (unit test)
        wait_for_bytes_read_and_idle(13).await;

        // 2 bytes are ready but 5 bytes requested, read will be blocked
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        // Out of order completion is ok, will unblock backpressure
        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        // At this point there are 5 bytes available in the backpressure queue
        // Now we issue a multi-range read that can only be partially fulfilled; it will read
        // some bytes but not all of them (using a large range gap to ensure the request is
        // not coalesced).
        //
        // I'm actually not sure this behavior is great.  It's possible that we should just
        // block until we can fulfill the entire request.
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        // Fifth future should eventually finish due to deadlock prevention
        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        // And now let's just make sure that we can read the rest of the data
        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        // Ensure deadlock prevention timeout can be disabled
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap())
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        // This test ensures that the backpressure mechanism works correctly with
        // regard to priority.  In other words, as long as all requests are consumed
        // in priority order then the backpressure mechanism should not deadlock.
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        // Only one request will be allowed in
        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler.open_file(&some_path).await.unwrap();

        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}