datafusion_physical_plan/sorts/sort.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Sort that handles input of arbitrary size.
//! It does in-memory sorting if it has enough memory budget
//! but spills to disk if needed.
//!
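//! A high-level usage sketch (illustrative only; `ordering` and `input` are
//! assumed to be an existing `LexOrdering` and child `ExecutionPlan`):
//!
//! ```ignore
//! let sort = Arc::new(SortExec::new(ordering, input));
//! let mut stream = sort.execute(0, task_ctx)?; // stream of sorted RecordBatches
//! ```
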
use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use crate::common::spawn_buffered;
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::limit::LimitStream;
use crate::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
use crate::projection::{make_with_child, update_expr, ProjectionExec};
use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::spill::{
    get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
};
use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
use crate::{
    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
    Statistics,
};

use arrow::array::{
    Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::row::{RowConverter, SortField};
use datafusion_common::{internal_err, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use futures::{StreamExt, TryStreamExt};
use log::{debug, trace};

struct ExternalSorterMetrics {
    /// metrics
    baseline: BaselineMetrics,

    /// count of spills during the execution of the operator
    spill_count: Count,

    /// total spilled bytes during the execution of the operator
    spilled_bytes: Count,

    /// total spilled rows during the execution of the operator
    spilled_rows: Count,
}

impl ExternalSorterMetrics {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline: BaselineMetrics::new(metrics, partition),
            spill_count: MetricBuilder::new(metrics).spill_count(partition),
            spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
            spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
        }
    }
}

/// Sorts an arbitrary sized, unsorted, stream of [`RecordBatch`]es to
/// a total order. Depending on the input size and memory manager
/// configuration, writes intermediate results to disk ("spills")
/// using Arrow IPC format.
///
/// # Algorithm
///
/// 1. get a non-empty new batch from input
///
/// 2. check with the memory manager that there is sufficient space to
///    buffer the batch in memory.
///
///    2.1 if memory is sufficient, buffer the batch in memory and go to 1.
///
///    2.2 if no more memory is available, sort all buffered batches and
///        spill to file, then buffer the next batch in memory and go to 1.
///
/// 3. when input is exhausted, merge all in memory batches and spills
///    to get a total order.
///
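/// A condensed sketch of this loop, as driven by `SortExec::execute`
/// (illustrative only; `insert_batch` may itself sort and spill, see below):
///
/// ```ignore
/// while let Some(batch) = input.next().await {
///     sorter.insert_batch(batch?).await?; // buffers; sorts and spills if needed
/// }
/// let output = sorter.sort()?; // merges in-memory batches with any spill files
/// ```
///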
/// # When data fits in available memory
///
/// If there is sufficient memory, data is sorted in memory to produce the output
///
/// ```text
///    ┌─────┐
///    │  2  │
///    │  3  │
///    │  1  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
///    │  4  │
///    │  2  │                  │
///    └─────┘                  ▼
///    ┌─────┐
///    │  1  │              In memory
///    │  4  │─ ─ ─ ─ ─ ─▶ sort/merge  ─ ─ ─ ─ ─▶  total sorted output
///    │  1  │
///    └─────┘                  ▲
///      ...                    │
///
///    ┌─────┐                  │
///    │  4  │
///    │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///    └─────┘
///
/// in_mem_batches
///
/// ```
///
/// # When data does not fit in available memory
///
///  When memory is exhausted, data is first sorted and written to one
///  or more spill files on disk:
///
/// ```text
///    ┌─────┐                               .─────────────────.
///    │  2  │                              (                   )
///    │  3  │                              │`─────────────────'│
///    │  1  │─ ─ ─ ─ ─ ─ ─                 │  ┌────┐           │
///    │  4  │             │                │  │ 1  │░          │
///    │  2  │                              │  │... │░          │
///    └─────┘             ▼                │  │ 4  │░  ┌ ─ ─   │
///    ┌─────┐                              │  └────┘░    1  │░ │
///    │  1  │         In memory            │   ░░░░░░  │    ░░ │
///    │  4  │─ ─ ▶   sort/merge    ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
///    │  1  │     and write to file        │           │    ░░ │
///    └─────┘                              │             4  │░ │
///      ...               ▲                │           └░─░─░░ │
///                        │                │            ░░░░░░ │
///    ┌─────┐                              │.─────────────────.│
///    │  4  │             │                (                   )
///    │  3  │─ ─ ─ ─ ─ ─ ─                  `─────────────────'
///    └─────┘
///
/// in_mem_batches                                  spills
///                                         (file on disk in Arrow
///                                               IPC format)
/// ```
///
/// Once the input is completely read, the spill files are read and
/// merged with any in memory batches to produce a single total sorted
/// output:
///
/// ```text
///   .─────────────────.
///  (                   )
///  │`─────────────────'│
///  │  ┌────┐           │
///  │  │ 1  │░          │
///  │  │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
///  │  │ 4  │░ ┌────┐   │           │
///  │  └────┘░ │ 1  │░  │           ▼
///  │   ░░░░░░ │    │░  │
///  │          │... │─ ─│─ ─ ─ ▶ merge  ─ ─ ─▶  total sorted output
///  │          │    │░  │
///  │          │ 4  │░  │           ▲
///  │          └────┘░  │           │
///  │           ░░░░░░  │
///  │.─────────────────.│           │
///  (                   )
///   `─────────────────'            │
///         spills
///                                  │
///
///                                  │
///
///     ┌─────┐                      │
///     │  1  │
///     │  4  │─ ─ ─ ─               │
///     └─────┘       │
///       ...                   In memory
///                   └ ─ ─ ─▶  sort/merge
///     ┌─────┐
///     │  4  │                      ▲
///     │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///     └─────┘
///
///  in_mem_batches
/// ```
struct ExternalSorter {
    // ========================================================================
    // PROPERTIES:
    // Fields that define the sorter's configuration and remain constant
    // ========================================================================
    /// Schema of the output (and the input)
    schema: SchemaRef,
    /// Sort expressions
    expr: Arc<[PhysicalSortExpr]>,
    /// If Some, the maximum number of output rows that will be produced
    fetch: Option<usize>,
    /// The target number of rows for output batches
    batch_size: usize,
    /// If the total size of buffered in-memory batches is below this
    /// threshold, the data will be concatenated and sorted in place rather
    /// than sort/merged.
    sort_in_place_threshold_bytes: usize,

    // ========================================================================
    // STATE BUFFERS:
    // Fields that hold intermediate data during sorting
    // ========================================================================
    /// Potentially unsorted in memory buffer
    in_mem_batches: Vec<RecordBatch>,
    /// if `Self::in_mem_batches` are sorted
    in_mem_batches_sorted: bool,

    /// If data has previously been spilled, the locations of the
    /// spill files (in Arrow IPC format)
    spills: Vec<RefCountedTempFile>,

    // ========================================================================
    // EXECUTION RESOURCES:
    // Fields related to managing execution resources and monitoring performance.
    // ========================================================================
    /// Runtime metrics
    metrics: ExternalSorterMetrics,
    /// A handle to the runtime to get spill files
    runtime: Arc<RuntimeEnv>,
    /// Reservation for in_mem_batches
    reservation: MemoryReservation,

    /// Reservation for the merging of in-memory batches. If the sort
    /// might spill, `sort_spill_reservation_bytes` will be
    /// pre-reserved to ensure there is some space for this sort/merge.
    merge_reservation: MemoryReservation,
    /// How much memory to reserve for performing in-memory sort/merges
    /// prior to spilling.
    sort_spill_reservation_bytes: usize,
}

impl ExternalSorter {
    // TODO: make a builder or some other nicer API to avoid the
    // clippy warning
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        partition_id: usize,
        schema: SchemaRef,
        expr: LexOrdering,
        batch_size: usize,
        fetch: Option<usize>,
        sort_spill_reservation_bytes: usize,
        sort_in_place_threshold_bytes: usize,
        metrics: &ExecutionPlanMetricsSet,
        runtime: Arc<RuntimeEnv>,
    ) -> Self {
        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
            .with_can_spill(true)
            .register(&runtime.memory_pool);

        let merge_reservation =
            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                .register(&runtime.memory_pool);

        Self {
            schema,
            in_mem_batches: vec![],
            in_mem_batches_sorted: false,
            spills: vec![],
            expr: expr.into(),
            metrics,
            fetch,
            reservation,
            merge_reservation,
            runtime,
            batch_size,
            sort_spill_reservation_bytes,
            sort_in_place_threshold_bytes,
        }
    }

    /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
    ///
    /// Updates memory usage metrics, and possibly triggers spilling to disk
    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
        if input.num_rows() == 0 {
            return Ok(());
        }

        self.reserve_memory_for_merge()?;

        let size = get_reserved_byte_for_record_batch(&input);
        if self.reservation.try_grow(size).is_err() {
            self.sort_or_spill_in_mem_batches().await?;
            // We've already freed more than half of reserved memory,
            // so we can grow the reservation again. There's nothing we can do
            // if this try_grow fails.
            self.reservation.try_grow(size)?;
        }

        self.in_mem_batches.push(input);
        self.in_mem_batches_sorted = false;
        Ok(())
    }

    fn spilled_before(&self) -> bool {
        !self.spills.is_empty()
    }

    /// Returns the final sorted output of all batches inserted via
    /// [`Self::insert_batch`] as a stream of [`RecordBatch`]es.
    ///
    /// This process could either be:
    ///
    /// 1. An in-memory sort/merge (if the input fit in memory)
    ///
    /// 2. A combined streaming merge incorporating both in-memory
    ///    batches and data from spill files on disk.
    fn sort(&mut self) -> Result<SendableRecordBatchStream> {
        // Release the memory reserved for merge back to the pool so
        // there is some left when `in_mem_sort_stream` requests an
        // allocation.
        self.merge_reservation.free();

        if self.spilled_before() {
            let mut streams = vec![];
            if !self.in_mem_batches.is_empty() {
                let in_mem_stream =
                    self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
                streams.push(in_mem_stream);
            }

            for spill in self.spills.drain(..) {
                if !spill.path().exists() {
                    return internal_err!("Spill file {:?} does not exist", spill.path());
                }
                let stream = read_spill_as_stream(spill, Arc::clone(&self.schema), 2)?;
                streams.push(stream);
            }

            let expressions: LexOrdering = self.expr.iter().cloned().collect();

            StreamingMergeBuilder::new()
                .with_streams(streams)
                .with_schema(Arc::clone(&self.schema))
                .with_expressions(expressions.as_ref())
                .with_metrics(self.metrics.baseline.clone())
                .with_batch_size(self.batch_size)
                .with_fetch(self.fetch)
                .with_reservation(self.merge_reservation.new_empty())
                .build()
        } else {
            self.in_mem_sort_stream(self.metrics.baseline.clone())
        }
    }

    /// How much memory is buffered in this `ExternalSorter`?
    fn used(&self) -> usize {
        self.reservation.size()
    }

    /// How many bytes have been spilled to disk?
    fn spilled_bytes(&self) -> usize {
        self.metrics.spilled_bytes.value()
    }

    /// How many rows have been spilled to disk?
    fn spilled_rows(&self) -> usize {
        self.metrics.spilled_rows.value()
    }

    /// How many spill files have been created?
    fn spill_count(&self) -> usize {
        self.metrics.spill_count.value()
    }

    /// Writes any `in_mem_batches` to a spill file and clears
    /// the batches. The contents of the spill file are sorted.
    ///
    /// Returns the amount of memory freed.
    async fn spill(&mut self) -> Result<usize> {
        // There is nothing to free (or spill) unless we are holding some batches
        if self.in_mem_batches.is_empty() {
            return Ok(0);
        }

        self.organize_stringview_arrays()?;

        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

        let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
        let batches = std::mem::take(&mut self.in_mem_batches);
        let (spilled_rows, spilled_bytes) = spill_record_batches(
            &batches,
            spill_file.path().into(),
            Arc::clone(&self.schema),
        )?;
        let used = self.reservation.free();
        self.metrics.spill_count.add(1);
        self.metrics.spilled_bytes.add(spilled_bytes);
        self.metrics.spilled_rows.add(spilled_rows);
        self.spills.push(spill_file);
        Ok(used)
    }

    /// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
    /// `StringViewArray` in sequential order by calling `gc()` on them.
    ///
    /// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
    /// available
    ///
    /// # Rationale
    /// After (merge-based) sorting, all batches will be sorted into a single run,
    /// but physically this sorted run is chunked into many small batches. For
    /// `StringViewArray`s inside each sorted run, their inner buffers are not
    /// re-constructed by default, leading to non-sequential payload locations
    /// (permuted by the `interleave()` Arrow kernel). A single payload buffer might
    /// be shared by multiple `RecordBatch`es.
    /// When writing each batch to disk, the writer has to write all referenced buffers,
    /// because they have to be read back one by one to reduce memory usage. This
    /// causes extra disk reads and writes, and potentially execution failure.
    ///
    /// # Example
    /// Before sorting:
    /// batch1 -> buffer1
    /// batch2 -> buffer2
    ///
    /// sorted_batch1 -> buffer1
    ///               -> buffer2
    /// sorted_batch2 -> buffer1
    ///               -> buffer2
    ///
    /// Then when spilling each batch, the writer has to write all referenced buffers
    /// repeatedly.
    fn organize_stringview_arrays(&mut self) -> Result<()> {
        let mut organized_batches = Vec::with_capacity(self.in_mem_batches.len());

        for batch in self.in_mem_batches.drain(..) {
            let mut new_columns: Vec<Arc<dyn Array>> =
                Vec::with_capacity(batch.num_columns());

            let mut arr_mutated = false;
            for array in batch.columns() {
                if let Some(string_view_array) =
                    array.as_any().downcast_ref::<StringViewArray>()
                {
                    let new_array = string_view_array.gc();
                    new_columns.push(Arc::new(new_array));
                    arr_mutated = true;
                } else {
                    new_columns.push(Arc::clone(array));
                }
            }

            let organized_batch = if arr_mutated {
                RecordBatch::try_new(batch.schema(), new_columns)?
            } else {
                batch
            };

            organized_batches.push(organized_batch);
        }

        self.in_mem_batches = organized_batches;

        Ok(())
    }

    /// Sorts the in_mem_batches in place
    ///
    /// Sorting may have freed memory, especially if fetch is `Some`. If
    /// the memory usage has dropped by a factor of 2, then we don't have
    /// to spill. Otherwise, we spill to free up memory for inserting
    /// more batches.
    ///
    /// The factor of 2 aims to avoid a degenerate case where the
    /// memory required for `fetch` is just under the memory available,
    /// causing repeated re-sorting of data.
    async fn sort_or_spill_in_mem_batches(&mut self) -> Result<()> {
        // Release the memory reserved for merge back to the pool so
        // there is some left when `in_mem_sort_stream` requests an
        // allocation. At the end of this function, memory will be
        // reserved again for the next spill.
        self.merge_reservation.free();

        let before = self.reservation.size();

        let mut sorted_stream =
            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;

        // `self.in_mem_batches` has already been taken by `in_mem_sort_stream`,
        // so it is now empty. We gradually collect the sorted stream back into
        // `self.in_mem_batches`, or write sorted batches directly to disk when
        // memory is insufficient.
        while let Some(batch) = sorted_stream.next().await {
            let batch = batch?;
            let sorted_size = get_reserved_byte_for_record_batch(&batch);
            if self.reservation.try_grow(sorted_size).is_err() {
                // Although the reservation is not enough, the batch is
                // already in memory, so it's okay to combine it with previously
                // sorted batches, and spill together.
                self.in_mem_batches.push(batch);
                self.spill().await?; // reservation is freed in spill()
            } else {
                self.in_mem_batches.push(batch);
                self.in_mem_batches_sorted = true;
            }
        }

        // Drop early to free up memory reserved by the sorted stream, otherwise the
        // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
        drop(sorted_stream);

        // Sorting may free up some memory especially when fetch is `Some`. If we have
        // not freed more than 50% of the memory, then we have to spill to free up more
        // memory for inserting more batches.
        if self.reservation.size() > before / 2 {
            // We have not freed more than 50% of the memory, so we have to spill to
            // free up more memory
            self.spill().await?;
        }

        // Reserve headroom for next sort/merge
        self.reserve_memory_for_merge()?;

        Ok(())
    }

    /// Consumes in_mem_batches returning a sorted stream of
    /// batches. This proceeds in one of two ways:
    ///
    /// # Small Datasets
    ///
    /// For "smaller" datasets, the data is first concatenated into a
    /// single batch and then sorted. This is often faster than
    /// sorting and then merging.
    ///
    /// ```text
    ///        ┌─────┐
    ///        │  2  │
    ///        │  3  │
    ///        │  1  │─ ─ ─ ─ ┐            ┌─────┐
    ///        │  4  │                     │  2  │
    ///        │  2  │        │            │  3  │
    ///        └─────┘                     │  1  │             sorted output
    ///        ┌─────┐        ▼            │  4  │                stream
    ///        │  1  │                     │  2  │
    ///        │  4  │─ ─▶ concat ─ ─ ─ ─ ▶│  1  │─ ─ ▶  sort  ─ ─ ─ ─ ─▶
    ///        │  1  │                     │  4  │
    ///        └─────┘        ▲            │  1  │
    ///          ...          │            │ ... │
    ///                                    │  4  │
    ///        ┌─────┐        │            │  3  │
    ///        │  4  │                     └─────┘
    ///        │  3  │─ ─ ─ ─ ┘
    ///        └─────┘
    ///     in_mem_batches
    /// ```
    ///
    /// # Larger datasets
    ///
    /// For larger datasets, the batches are first sorted individually
    /// and then merged together.
    ///
    /// ```text
    ///      ┌─────┐                ┌─────┐
    ///      │  2  │                │  1  │
    ///      │  3  │                │  2  │
    ///      │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ┐
    ///      │  4  │                │  3  │
    ///      │  2  │                │  4  │          │
    ///      └─────┘                └─────┘               sorted output
    ///      ┌─────┐                ┌─────┐          ▼       stream
    ///      │  1  │                │  1  │
    ///      │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ▶ merge  ─ ─ ─ ─▶
    ///      │  1  │                │  4  │
    ///      └─────┘                └─────┘          ▲
    ///        ...       ...         ...             │
    ///
    ///      ┌─────┐                ┌─────┐          │
    ///      │  4  │                │  3  │
    ///      │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ┘
    ///      └─────┘                └─────┘
    ///
    ///   in_mem_batches
    /// ```
    fn in_mem_sort_stream(
        &mut self,
        metrics: BaselineMetrics,
    ) -> Result<SendableRecordBatchStream> {
        if self.in_mem_batches.is_empty() {
            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
                &self.schema,
            ))));
        }

        // The elapsed compute timer is updated when the value is dropped.
        // There is no need for an explicit call to drop.
        let elapsed_compute = metrics.elapsed_compute().clone();
        let _timer = elapsed_compute.timer();

        // Note that no operation inside `in_mem_sort_stream` performs any memory
        // reservation. This avoids having to handle reservation failures and
        // spilling in the middle of the sort/merge. The memory for batches
        // produced by the resulting stream is reserved by the consumer of the
        // stream.

        if self.in_mem_batches.len() == 1 {
            let batch = self.in_mem_batches.swap_remove(0);
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, metrics, reservation);
        }

        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
        if self.reservation.size() < self.sort_in_place_threshold_bytes {
            // Concatenate memory batches together and sort
            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
            self.in_mem_batches.clear();
            self.reservation
                .try_resize(get_reserved_byte_for_record_batch(&batch))?;
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, metrics, reservation);
        }

        let streams = std::mem::take(&mut self.in_mem_batches)
            .into_iter()
            .map(|batch| {
                let metrics = self.metrics.baseline.intermediate();
                let reservation = self
                    .reservation
                    .split(get_reserved_byte_for_record_batch(&batch));
                let input = self.sort_batch_stream(batch, metrics, reservation)?;
                Ok(spawn_buffered(input, 1))
            })
            .collect::<Result<_>>()?;

        let expressions: LexOrdering = self.expr.iter().cloned().collect();

        StreamingMergeBuilder::new()
            .with_streams(streams)
            .with_schema(Arc::clone(&self.schema))
            .with_expressions(expressions.as_ref())
            .with_metrics(metrics)
            .with_batch_size(self.batch_size)
            .with_fetch(self.fetch)
            .with_reservation(self.merge_reservation.new_empty())
            .build()
    }

    /// Sorts a single `RecordBatch` into a single stream.
    ///
    /// `reservation` accounts for the memory used by this batch and
    /// is released when the sort is complete
    fn sort_batch_stream(
        &self,
        batch: RecordBatch,
        metrics: BaselineMetrics,
        reservation: MemoryReservation,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(
            get_reserved_byte_for_record_batch(&batch),
            reservation.size()
        );
        let schema = batch.schema();

        let fetch = self.fetch;
        let expressions: LexOrdering = self.expr.iter().cloned().collect();
        let stream = futures::stream::once(futures::future::lazy(move |_| {
            let timer = metrics.elapsed_compute().timer();
            let sorted = sort_batch(&batch, &expressions, fetch)?;
            timer.done();
            metrics.record_output(sorted.num_rows());
            drop(batch);
            drop(reservation);
            Ok(sorted)
        }));
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    /// If this sort may spill, pre-allocates
    /// `sort_spill_reservation_bytes` of memory to guarantee memory
    /// left for the in memory sort/merge.
    fn reserve_memory_for_merge(&mut self) -> Result<()> {
        // Reserve headroom for next merge sort
        if self.runtime.disk_manager.tmp_files_enabled() {
            let size = self.sort_spill_reservation_bytes;
            if self.merge_reservation.size() != size {
                self.merge_reservation.try_resize(size)?;
            }
        }

        Ok(())
    }
}

/// Estimate how much memory is needed to sort a `RecordBatch`.
///
/// This is used to pre-reserve memory for the sort/merge. The sort/merge process
/// involves creating copies of the sort columns in each record batch to speed up
/// comparisons during sorting and merging. These copies are in either row format
/// or array format; refer to cursor.rs and stream.rs for more details. In either
/// format, the copies use more memory than the original record batch.
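///
/// As a worked example (illustrative numbers only, under the 2x heuristic
/// below): a batch that [`get_record_batch_memory_size`] reports as 4 MB
/// causes an 8 MB reservation here, leaving headroom for those copies.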
fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
    // 2x may not be enough for some cases, but it's a good start.
    // If 2x is not enough, user can set a larger value for `sort_spill_reservation_bytes`
    // to compensate for the extra memory needed.
    get_record_batch_memory_size(batch) * 2
}

impl Debug for ExternalSorter {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.debug_struct("ExternalSorter")
            .field("memory_used", &self.used())
            .field("spilled_bytes", &self.spilled_bytes())
            .field("spilled_rows", &self.spilled_rows())
            .field("spill_count", &self.spill_count())
            .finish()
    }
}

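/// Sorts a [`RecordBatch`] by `expressions`, optionally keeping only the
/// first `fetch` rows.
///
/// A minimal usage sketch (illustrative only, not compiled as a doctest;
/// assumes a single `Int32` column named "a"):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::array::{Int32Array, RecordBatch};
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_physical_expr::expressions::col;
/// use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
/// let batch = RecordBatch::try_new(
///     Arc::clone(&schema),
///     vec![Arc::new(Int32Array::from(vec![3, 1, 2]))],
/// )?;
/// let ordering = LexOrdering::new(vec![PhysicalSortExpr::new_default(col("a", &schema)?)]);
/// let sorted = sort_batch(&batch, &ordering, None)?; // column "a" becomes [1, 2, 3]
/// ```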
pub fn sort_batch(
    batch: &RecordBatch,
    expressions: &LexOrdering,
    fetch: Option<usize>,
) -> Result<RecordBatch> {
    let sort_columns = expressions
        .iter()
        .map(|expr| expr.evaluate_to_sort_column(batch))
        .collect::<Result<Vec<_>>>()?;

    let indices = if is_multi_column_with_lists(&sort_columns) {
        // lexsort_to_indices doesn't support List with more than one column
        // https://github.com/apache/arrow-rs/issues/5454
        lexsort_to_indices_multi_columns(sort_columns, fetch)?
    } else {
        lexsort_to_indices(&sort_columns, fetch)?
    };

    let mut columns = take_arrays(batch.columns(), &indices, None)?;

    // The columns may be larger than the unsorted columns in `batch` especially for variable length
    // data types due to exponential growth when building the sort columns. We shrink the columns
    // to prevent memory reservation failures, as well as excessive memory allocation when running
    // merges in `SortPreservingMergeStream`.
    columns.iter_mut().for_each(|c| {
        c.shrink_to_fit();
    });

    let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
    Ok(RecordBatch::try_new_with_options(
        batch.schema(),
        columns,
        &options,
    )?)
}

#[inline]
fn is_multi_column_with_lists(sort_columns: &[SortColumn]) -> bool {
    sort_columns.iter().any(|c| {
        matches!(
            c.values.data_type(),
            DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _)
        )
    })
}

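/// Computes sort indices for multiple columns by encoding each row with a
/// [`RowConverter`] and sorting the resulting row encodings; unlike
/// [`lexsort_to_indices`], this path also supports list-typed columns in
/// multi-column sorts.
///
/// A minimal call sketch (illustrative only; `a` and `b` are assumed to be
/// existing `ArrayRef`s, where `a` may be a `ListArray`):
///
/// ```ignore
/// let indices = lexsort_to_indices_multi_columns(
///     vec![
///         SortColumn { values: a, options: None },
///         SortColumn { values: b, options: None },
///     ],
///     None, // no limit
/// )?;
/// ```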
pub(crate) fn lexsort_to_indices_multi_columns(
    sort_columns: Vec<SortColumn>,
    limit: Option<usize>,
) -> Result<UInt32Array> {
    let (fields, columns) = sort_columns.into_iter().fold(
        (vec![], vec![]),
        |(mut fields, mut columns), sort_column| {
            fields.push(SortField::new_with_options(
                sort_column.values.data_type().clone(),
                sort_column.options.unwrap_or_default(),
            ));
            columns.push(sort_column.values);
            (fields, columns)
        },
    );

    // TODO reuse converter and rows, refer to TopK.
    let converter = RowConverter::new(fields)?;
    let rows = converter.convert_columns(&columns)?;
    let mut sort: Vec<_> = rows.iter().enumerate().collect();
    sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));

    let mut len = rows.num_rows();
    if let Some(limit) = limit {
        len = limit.min(len);
    }
    let indices =
        UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as u32));

    Ok(indices)
}

/// Sort execution plan.
///
/// Supports sorting datasets that are larger than the memory allotted
/// by the memory manager, by spilling to disk.
#[derive(Debug, Clone)]
pub struct SortExec {
    /// Input schema
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Sort expressions
    expr: LexOrdering,
    /// Containing all metrics set created during sort
    metrics_set: ExecutionPlanMetricsSet,
    /// Preserve partitions of input plan. If false, the input partitions
    /// will be sorted and merged into a single output partition.
    preserve_partitioning: bool,
    /// Fetch highest/lowest n results
    fetch: Option<usize>,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
}

impl SortExec {
    /// Create a new sort execution plan that produces a single,
    /// sorted output partition.
    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
        let preserve_partitioning = false;
        let cache = Self::compute_properties(&input, expr.clone(), preserve_partitioning);
        Self {
            expr,
            input,
            metrics_set: ExecutionPlanMetricsSet::new(),
            preserve_partitioning,
            fetch: None,
            cache,
        }
    }

    /// Whether this `SortExec` preserves partitioning of the children
    pub fn preserve_partitioning(&self) -> bool {
        self.preserve_partitioning
    }

    /// Specify the partitioning behavior of this sort exec
    ///
    /// If `preserve_partitioning` is true, sorts each partition
    /// individually, producing one sorted stream for each input partition.
    ///
    /// If `preserve_partitioning` is false, sorts and merges all
    /// input partitions producing a single, sorted partition.
    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
        self.preserve_partitioning = preserve_partitioning;
        self.cache = self
            .cache
            .with_partitioning(Self::output_partitioning_helper(
                &self.input,
                self.preserve_partitioning,
            ));
        self
    }

    /// Modify how many rows to include in the result
    ///
    /// If None, then all rows will be returned, in sorted order.
    /// If Some, then only the top `fetch` rows will be returned.
    /// This can reduce the memory pressure required by the sort
    /// operation since rows that are not going to be included
    /// can be dropped.
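    ///
    /// A usage sketch (illustrative only; `ordering` and `input` are assumed
    /// to be an existing `LexOrdering` and input plan): keep only the 10
    /// smallest rows according to the sort order.
    ///
    /// ```ignore
    /// let top10 = SortExec::new(ordering, input).with_fetch(Some(10));
    /// ```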
    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
        let mut cache = self.cache.clone();
        // If the SortExec can emit incrementally (meaning the input's properties
        // already satisfy the sort requirements), it can generate its result
        // without scanning the entire input when a fetch value exists.
        let is_pipeline_friendly = matches!(
            self.cache.emission_type,
            EmissionType::Incremental | EmissionType::Both
        );
        if fetch.is_some() && is_pipeline_friendly {
            cache = cache.with_boundedness(Boundedness::Bounded);
        }
        SortExec {
            input: Arc::clone(&self.input),
            expr: self.expr.clone(),
            metrics_set: self.metrics_set.clone(),
            preserve_partitioning: self.preserve_partitioning,
            fetch,
            cache,
        }
    }

    /// Input schema
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Sort expressions
    pub fn expr(&self) -> &LexOrdering {
        &self.expr
    }

    /// If `Some(fetch)`, limits output to only the first "fetch" items
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn output_partitioning_helper(
        input: &Arc<dyn ExecutionPlan>,
        preserve_partitioning: bool,
    ) -> Partitioning {
        // Get output partitioning:
        if preserve_partitioning {
            input.output_partitioning().clone()
        } else {
            Partitioning::UnknownPartitioning(1)
        }
    }

    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        sort_exprs: LexOrdering,
        preserve_partitioning: bool,
    ) -> PlanProperties {
        // Determine execution mode:
        let requirement = LexRequirement::from(sort_exprs);
        let sort_satisfied = input
            .equivalence_properties()
            .ordering_satisfy_requirement(&requirement);

        // The emission type depends on whether the input is already sorted:
        // - If already sorted, we can emit results in the same way as the input
        // - If not sorted, we must wait until all data is processed to emit results (Final)
        let emission_type = if sort_satisfied {
            input.pipeline_behavior()
        } else {
            EmissionType::Final
        };

        // The boundedness depends on whether the input is already sorted:
        // - If already sorted, we have the same property as the input
        // - If not sorted and the input is unbounded, we require infinite memory and
        //   generate unbounded data (not practical).
        // - If not sorted and input is bounded, then the SortExec is bounded, too.
        let boundedness = if sort_satisfied {
            input.boundedness()
        } else {
            match input.boundedness() {
                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
                    requires_infinite_memory: true,
                },
                bounded => bounded,
            }
        };

        // Calculate equivalence properties; i.e. reset the ordering equivalence
        // class with the new ordering:
        let sort_exprs = LexOrdering::from(requirement);
        let eq_properties = input
            .equivalence_properties()
            .clone()
            .with_reorder(sort_exprs);

        // Get output partitioning:
        let output_partitioning =
            Self::output_partitioning_helper(input, preserve_partitioning);

        PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type,
            boundedness,
        )
    }
}

impl DisplayAs for SortExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let preserve_partitioning = self.preserve_partitioning;
                match self.fetch {
                    Some(fetch) => {
                        write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)
                    }
                    None => write!(f, "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr),
                }
            }
        }
    }
}

impl ExecutionPlan for SortExec {
    fn name(&self) -> &'static str {
        "SortExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.preserve_partitioning {
            vec![Distribution::UnspecifiedDistribution]
        } else {
            // global sort
            // TODO support RangePartition and OrderedDistribution
            vec![Distribution::SinglePartition]
        }
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0]))
            .with_fetch(self.fetch)
            .with_preserve_partitioning(self.preserve_partitioning);

        Ok(Arc::new(new_sort))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());

        let mut input = self.input.execute(partition, Arc::clone(&context))?;

        let execution_options = &context.session_config().options().execution;

        trace!("End SortExec's input.execute for partition: {}", partition);

        let sort_satisfied = self
            .input
            .equivalence_properties()
            .ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));

        match (sort_satisfied, self.fetch.as_ref()) {
            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
                input,
                0,
                Some(*fetch),
                BaselineMetrics::new(&self.metrics_set, partition),
            ))),
            (true, None) => Ok(input),
            (false, Some(fetch)) => {
                let mut topk = TopK::try_new(
                    partition,
                    input.schema(),
                    self.expr.clone(),
                    *fetch,
                    context.session_config().batch_size(),
                    context.runtime_env(),
                    &self.metrics_set,
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            topk.insert_batch(batch)?;
                        }
                        topk.emit()
                    })
                    .try_flatten(),
                )))
            }
            (false, None) => {
                let mut sorter = ExternalSorter::new(
                    partition,
                    input.schema(),
                    self.expr.clone(),
                    context.session_config().batch_size(),
                    self.fetch,
                    execution_options.sort_spill_reservation_bytes,
                    execution_options.sort_in_place_threshold_bytes,
                    &self.metrics_set,
                    context.runtime_env(),
                );
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            sorter.insert_batch(batch).await?;
                        }
                        sorter.sort()
                    })
                    .try_flatten(),
                )))
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics_set.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(SortExec::with_fetch(self, limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        if self.fetch.is_none() {
            CardinalityEffect::Equal
        } else {
            CardinalityEffect::LowerEqual
        }
    }

    /// Tries to swap the projection with its input [`SortExec`]. If it can be done,
    /// it returns the new swapped version having the [`SortExec`] as the top plan.
    /// Otherwise, it returns None.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // If the projection does not narrow the schema, we should not try to push it down.
        if projection.expr().len() >= projection.input().schema().fields().len() {
            return Ok(None);
        }

        let mut updated_exprs = LexOrdering::default();
        for sort in self.expr() {
            let Some(new_expr) = update_expr(&sort.expr, projection.expr(), false)?
            else {
                return Ok(None);
            };
            updated_exprs.push(PhysicalSortExpr {
                expr: new_expr,
                options: sort.options,
            });
        }

        Ok(Some(Arc::new(
            SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
                .with_fetch(self.fetch())
                .with_preserve_partitioning(self.preserve_partitioning()),
        )))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use super::*;
    use crate::coalesce_partitions::CoalescePartitionsExec;
    use crate::collect;
    use crate::execution_plan::Boundedness;
    use crate::expressions::col;
    use crate::test;
    use crate::test::assert_is_pending;
    use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
    use crate::test::TestMemoryExec;

    use arrow::array::*;
    use arrow::compute::SortOptions;
    use arrow::datatypes::*;
    use datafusion_common::cast::as_primitive_array;
    use datafusion_common::{assert_batches_eq, Result, ScalarValue};
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
    use datafusion_execution::RecordBatchStream;
    use datafusion_physical_expr::expressions::{Column, Literal};
    use datafusion_physical_expr::EquivalenceProperties;

    use futures::{FutureExt, Stream};

    #[derive(Debug, Clone)]
    pub struct SortedUnboundedExec {
        schema: Schema,
        batch_size: u64,
        cache: PlanProperties,
    }

    impl DisplayAs for SortedUnboundedExec {
        fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
            match t {
                DisplayFormatType::Default | DisplayFormatType::Verbose => {
                    write!(f, "UnboundableExec").unwrap()
                }
            }
            Ok(())
        }
    }

    impl SortedUnboundedExec {
        fn compute_properties(schema: SchemaRef) -> PlanProperties {
            let mut eq_properties = EquivalenceProperties::new(schema);
            eq_properties.add_new_orderings(vec![LexOrdering::new(vec![
                PhysicalSortExpr::new_default(Arc::new(Column::new("c1", 0))),
            ])]);
            PlanProperties::new(
                eq_properties,
                Partitioning::UnknownPartitioning(1),
                EmissionType::Final,
                Boundedness::Unbounded {
                    requires_infinite_memory: false,
                },
            )
        }
    }

    impl ExecutionPlan for SortedUnboundedExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.cache
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(self)
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            Ok(Box::pin(SortedUnboundedStream {
                schema: Arc::new(self.schema.clone()),
                batch_size: self.batch_size,
                offset: 0,
            }))
        }
    }

    #[derive(Debug)]
    pub struct SortedUnboundedStream {
        schema: SchemaRef,
        batch_size: u64,
        offset: u64,
    }

    impl Stream for SortedUnboundedStream {
        type Item = Result<RecordBatch>;

        fn poll_next(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            let batch = SortedUnboundedStream::create_record_batch(
                Arc::clone(&self.schema),
                self.offset,
                self.batch_size,
            );
            self.offset += self.batch_size;
            Poll::Ready(Some(Ok(batch)))
        }
    }

    impl RecordBatchStream for SortedUnboundedStream {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
    }

    impl SortedUnboundedStream {
        fn create_record_batch(
            schema: SchemaRef,
            offset: u64,
            batch_size: u64,
        ) -> RecordBatch {
            let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
            let array = UInt64Array::from(values);
            let array_ref: ArrayRef = Arc::new(array);
            RecordBatch::try_new(schema, vec![array_ref]).unwrap()
        }
    }

    #[tokio::test]
    async fn test_in_mem_sort() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let partitions = 4;
        let csv = test::scan_partitioned(partitions);
        let schema = csv.schema();

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]),
            Arc::new(CoalescePartitionsExec::new(csv)),
        ));

        let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].num_rows(), 400);

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_spill() -> Result<()> {
        // trigger spill w/ 100 batches
        let session_config = SessionConfig::new();
        let sort_spill_reservation_bytes = session_config
            .options()
            .execution
            .sort_spill_reservation_bytes;
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        // The input has 100 partitions, each partition has a batch containing 100 rows.
        // Each row has a single Int32 column with values 0..100. The total size of the
        // input is roughly 40000 bytes.
        let partitions = 100;
        let input = test::scan_partitioned(partitions);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(
            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
            Arc::clone(&task_ctx),
        )
        .await?;

        assert_eq!(result.len(), 2);

        // Now, validate metrics
        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 10000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();
        // Processing 40000 bytes of data using 12288 bytes of memory requires 3 spills
        // unless we do something really clever. It will spill roughly 9000+ rows and 36000
        // bytes. We leave a little wiggle room for the actual numbers.
        assert!((3..=10).contains(&spill_count));
        assert!((9000..=10000).contains(&spilled_rows));
        assert!((36000..=40000).contains(&spilled_bytes));

        let columns = result[0].columns();

        let i = as_primitive_array::<Int32Type>(&columns[0])?;
        assert_eq!(i.value(0), 0);
        assert_eq!(i.value(i.len() - 1), 81);

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_spill_utf8_strings() -> Result<()> {
        let session_config = SessionConfig::new()
            .with_batch_size(100)
            .with_sort_in_place_threshold_bytes(20 * 1024)
            .with_sort_spill_reservation_bytes(100 * 1024);
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(500 * 1024, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        // The input has 200 partitions, and each partition has a single batch
        // containing 100 rows. Each row has one Utf8 value of roughly 42 bytes,
        // so the total size of the input is roughly 840 KB.
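        // (200 batches * 100 rows * ~42 bytes per string ≈ 840,000 bytes,
        // consistent with the "840KB" figure in the spill comment below.)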
        let input = test::scan_partitioned_utf8(200);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(
            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
            Arc::clone(&task_ctx),
        )
        .await?;

        let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
        assert_eq!(num_rows, 20000);

        // Now, validate metrics
        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 20000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();

        // This test case is processing 840KB of data using 400KB of memory. Note
        // that buffered batches can't be dropped until all sorted batches are
        // generated, so we can only buffer `sort_spill_reservation_bytes` of sorted
        // batches.
        // The number of spills is roughly calculated as:
        //  `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
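        // A hedged back-of-the-envelope check (the in-memory batch size depends
        // on Arrow's buffer overhead, so treat these numbers as approximate):
        // with 200 batches of ~4.2 KB of raw string data each and a 100 KB
        // reservation, the formula gives 200 / (100 KB / ~4.2 KB) ≈ 8 spills;
        // per-batch memory overhead shrinks the effective divisor, which is why
        // the observed count lands in the 12..=18 range asserted below.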
        assert!((12..=18).contains(&spill_count));
        assert!((15000..=20000).contains(&spilled_rows));
        assert!((700000..=900000).contains(&spilled_bytes));

        // Verify that the result is sorted
        let concatenated_result = concat_batches(&schema, &result)?;
        let columns = concatenated_result.columns();
        let string_array = as_string_array(&columns[0]);
        for i in 0..string_array.len() - 1 {
            assert!(string_array.value(i) <= string_array.value(i + 1));
        }

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_fetch_memory_calculation() -> Result<()> {
        // This test mirrors the tests above, but with the sizes scaled down.
        let avg_batch_size = 400;
        let partitions = 4;

        // Tuples of (fetch, expect_spillage)
        let test_options = vec![
            // Since we don't have a limit (and the memory is less than the total
            // size of all the batches we are processing), we expect it to spill.
            (None, true),
            // When we have a limit, however, the buffered batches should fit in
            // memory, since their size is much lower than the total size of the
            // input batches.
            (Some(1), false),
        ];

        for (fetch, expect_spillage) in test_options {
            let session_config = SessionConfig::new();
            let sort_spill_reservation_bytes = session_config
                .options()
                .execution
                .sort_spill_reservation_bytes;

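            // Budget: the spill reservation plus room for all but one input
            // batch. Without a fetch limit the sort must buffer every batch and
            // is forced to spill; with `fetch = Some(1)` only the current top
            // row needs to be retained, which fits comfortably.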
            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(
                    sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
                    1.0,
                )
                .build_arc()?;
            let task_ctx = Arc::new(
                TaskContext::default()
                    .with_runtime(runtime)
                    .with_session_config(session_config),
            );

            let csv = test::scan_partitioned(partitions);
            let schema = csv.schema();

            let sort_exec = Arc::new(
                SortExec::new(
                    LexOrdering::new(vec![PhysicalSortExpr {
                        expr: col("i", &schema)?,
                        options: SortOptions::default(),
                    }]),
                    Arc::new(CoalescePartitionsExec::new(csv)),
                )
                .with_fetch(fetch),
            );

            let result = collect(
                Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
                Arc::clone(&task_ctx),
            )
            .await?;
            assert_eq!(result.len(), 1);

            let metrics = sort_exec.metrics().unwrap();
            let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
            assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_sort_metadata() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let field_metadata: HashMap<String, String> =
            vec![("foo".to_string(), "bar".to_string())]
                .into_iter()
                .collect();
        let schema_metadata: HashMap<String, String> =
            vec![("baz".to_string(), "barf".to_string())]
                .into_iter()
                .collect();

        let mut field = Field::new("field_name", DataType::UInt64, true);
        field.set_metadata(field_metadata.clone());
        let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
        let schema = Arc::new(schema);

        let data: ArrayRef =
            Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());

        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data]).unwrap();
        let input =
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
                .unwrap();

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("field_name", &schema)?,
                options: SortOptions::default(),
            }]),
            input,
        ));

        let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;

        let expected_data: ArrayRef =
            Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
        let expected_batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![expected_data]).unwrap();

        // Data is correct
        assert_eq!(&vec![expected_batch], &result);

        // explicitly ensure the metadata is present
        assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
        assert_eq!(result[0].schema().metadata(), &schema_metadata);

        Ok(())
    }

    #[tokio::test]
    async fn test_lex_sort_by_mixed_types() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
        ]));

        // define data.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(3)]),
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                ])),
            ],
        )?;

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: false,
                    },
                },
            ]),
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
        ));

        assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
        assert_eq!(
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
            *sort_exec.schema().field(1).data_type()
        );

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 4);
        assert_eq!(result.len(), 1);

        let expected = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                    Some(vec![Some(3)]),
                ])),
            ],
        )?;

        assert_eq!(expected, result[0]);

        Ok(())
    }

    #[tokio::test]
    async fn test_lex_sort_by_float() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Float32, true),
            Field::new("b", DataType::Float64, true),
        ]));

        // define data.
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![
                    Some(f32::NAN),
                    None,
                    None,
                    Some(f32::NAN),
                    Some(1.0_f32),
                    Some(1.0_f32),
                    Some(2.0_f32),
                    Some(3.0_f32),
                ])),
                Arc::new(Float64Array::from(vec![
                    Some(200.0_f64),
                    Some(20.0_f64),
                    Some(10.0_f64),
                    Some(100.0_f64),
                    Some(f64::NAN),
                    None,
                    None,
                    Some(f64::NAN),
                ])),
            ],
        )?;

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: false,
                    },
                },
            ]),
            TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
        ));

        assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
        assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 8);
        assert_eq!(result.len(), 1);

        let columns = result[0].columns();

        assert_eq!(DataType::Float32, *columns[0].data_type());
        assert_eq!(DataType::Float64, *columns[1].data_type());

        let a = as_primitive_array::<Float32Type>(&columns[0])?;
        let b = as_primitive_array::<Float64Type>(&columns[1])?;

        // convert result to strings to allow comparing to expected result containing NaN
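        // (Direct float comparison would fail here because `NaN != NaN` under
        // `==`; formatting both sides as strings makes "NaN" compare equal.)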
        let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
            .map(|i| {
                let aval = if a.is_valid(i) {
                    Some(a.value(i).to_string())
                } else {
                    None
                };
                let bval = if b.is_valid(i) {
                    Some(b.value(i).to_string())
                } else {
                    None
                };
                (aval, bval)
            })
            .collect();

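        // Expected order: column `a` is descending with nulls first, so nulls
        // lead, then NaN (which Arrow's sort treats as greater than any other
        // float), then 3, 2, 1. Ties on `a` break on `b` ascending, nulls last.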
        let expected: Vec<(Option<String>, Option<String>)> = vec![
            (None, Some("10".to_owned())),
            (None, Some("20".to_owned())),
            (Some("NaN".to_owned()), Some("100".to_owned())),
            (Some("NaN".to_owned()), Some("200".to_owned())),
            (Some("3".to_owned()), Some("NaN".to_owned())),
            (Some("2".to_owned()), None),
            (Some("1".to_owned()), Some("NaN".to_owned())),
            (Some("1".to_owned()), None),
        ];

        assert_eq!(expected, result);

        Ok(())
    }

    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
        let refs = blocking_exec.refs();
        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions::default(),
            }]),
            blocking_exec,
        ));

        let fut = collect(sort_exec, Arc::clone(&task_ctx));
        let mut fut = fut.boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[test]
    fn test_empty_sort_batch() {
        let schema = Arc::new(Schema::empty());
        let options = RecordBatchOptions::new().with_row_count(Some(1));
        let batch =
            RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options)
                .unwrap();
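        // The batch above has no columns, so its row count comes entirely from
        // `RecordBatchOptions::with_row_count`; `sort_batch` must carry that
        // count through even though there is nothing to reorder.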
1831
1832        let expressions = LexOrdering::new(vec![PhysicalSortExpr {
1833            expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
1834            options: SortOptions::default(),
1835        }]);
1836
1837        let result = sort_batch(&batch, expressions.as_ref(), None).unwrap();
1838        assert_eq!(result.num_rows(), 1);
1839    }
1840
1841    #[tokio::test]
1842    async fn topk_unbounded_source() -> Result<()> {
1843        let task_ctx = Arc::new(TaskContext::default());
1844        let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
1845        let source = SortedUnboundedExec {
1846            schema: schema.clone(),
1847            batch_size: 2,
1848            cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
1849        };
1850        let mut plan = SortExec::new(
1851            LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new(
1852                "c1", 0,
1853            )))]),
1854            Arc::new(source),
1855        );
1856        plan = plan.with_fetch(Some(9));
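        // With a fetch, the sort can execute as a TopK, which lets it emit its
        // 9 smallest values and terminate even though the source is unbounded.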

        let batches = collect(Arc::new(plan), task_ctx).await?;
        #[rustfmt::skip]
        let expected = [
            "+----+",
            "| c1 |",
            "+----+",
            "| 0  |",
            "| 1  |",
            "| 2  |",
            "| 3  |",
            "| 4  |",
            "| 5  |",
            "| 6  |",
            "| 7  |",
            "| 8  |",
            "+----+",];
        assert_batches_eq!(expected, &batches);
        Ok(())
    }
}