use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use crate::common::spawn_buffered;
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::limit::LimitStream;
use crate::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
use crate::projection::{make_with_child, update_expr, ProjectionExec};
use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::spill::{
    get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
};
use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
use crate::{
    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
    Statistics,
};

use arrow::array::{
    Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::row::{RowConverter, SortField};
use datafusion_common::{internal_err, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use futures::{StreamExt, TryStreamExt};
use log::{debug, trace};

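/// Execution-time metrics tracked by [`ExternalSorter`], including spill statistics.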
struct ExternalSorterMetrics {
    /// Metrics common to all operators (output rows, elapsed compute time, ...)
    baseline: BaselineMetrics,

    /// Number of times the sorter spilled to disk
    spill_count: Count,

    /// Total bytes spilled to disk
    spilled_bytes: Count,

    /// Total rows spilled to disk
    spilled_rows: Count,
}

impl ExternalSorterMetrics {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline: BaselineMetrics::new(metrics, partition),
            spill_count: MetricBuilder::new(metrics).spill_count(partition),
            spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
            spilled_rows: MetricBuilder::new(metrics).spilled_rows(partition),
        }
    }
}

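/// Sorts an arbitrary amount of input data, spilling to disk when the data does
/// not fit into the configured memory budget. Incoming batches are buffered in
/// memory; when the memory reservation cannot grow, the buffered batches are
/// sorted and either kept in memory or written to temporary spill files. The
/// final output is produced by streaming-merging the in-memory data with any
/// spilled runs.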
struct ExternalSorter {
    /// Schema of the output (and the input)
    schema: SchemaRef,
    /// Sort expressions
    expr: Arc<[PhysicalSortExpr]>,
    /// If `Some`, only the first `fetch` sorted rows are produced
    fetch: Option<usize>,
    /// Target batch size for output batches
    batch_size: usize,
    /// If the total size of the buffered batches is below this threshold,
    /// sort them in place by first concatenating them into a single batch
    sort_in_place_threshold_bytes: usize,

    /// Input batches buffered in memory
    in_mem_batches: Vec<RecordBatch>,
    /// Whether the batches in `in_mem_batches` are already individually sorted
    in_mem_batches_sorted: bool,

    /// Temporary files containing sorted runs that were spilled to disk
    spills: Vec<RefCountedTempFile>,

    /// Runtime metrics
    metrics: ExternalSorterMetrics,
    /// Runtime environment (memory pool, disk manager, ...)
    runtime: Arc<RuntimeEnv>,
    /// Memory reservation for the buffered input batches
    reservation: MemoryReservation,

    /// Memory reservation held back for the merge phase
    merge_reservation: MemoryReservation,
    /// How much memory to reserve for the merge phase before spilling
    sort_spill_reservation_bytes: usize,
}

impl ExternalSorter {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        partition_id: usize,
        schema: SchemaRef,
        expr: LexOrdering,
        batch_size: usize,
        fetch: Option<usize>,
        sort_spill_reservation_bytes: usize,
        sort_in_place_threshold_bytes: usize,
        metrics: &ExecutionPlanMetricsSet,
        runtime: Arc<RuntimeEnv>,
    ) -> Self {
        let metrics = ExternalSorterMetrics::new(metrics, partition_id);
        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
            .with_can_spill(true)
            .register(&runtime.memory_pool);

        let merge_reservation =
            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                .register(&runtime.memory_pool);

        Self {
            schema,
            in_mem_batches: vec![],
            in_mem_batches_sorted: false,
            spills: vec![],
            expr: expr.into(),
            metrics,
            fetch,
            reservation,
            merge_reservation,
            runtime,
            batch_size,
            sort_spill_reservation_bytes,
            sort_in_place_threshold_bytes,
        }
    }

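    /// Appends an unsorted in-memory batch to this sorter, growing the memory
    /// reservation to account for it. If the reservation cannot grow, the
    /// currently buffered batches are first sorted (and spilled if necessary).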
    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
        if input.num_rows() == 0 {
            return Ok(());
        }

        self.reserve_memory_for_merge()?;

        let size = get_reserved_byte_for_record_batch(&input);
        if self.reservation.try_grow(size).is_err() {
            self.sort_or_spill_in_mem_batches().await?;
            self.reservation.try_grow(size)?;
        }

        self.in_mem_batches.push(input);
        self.in_mem_batches_sorted = false;
        Ok(())
    }

    fn spilled_before(&self) -> bool {
        !self.spills.is_empty()
    }

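    /// Returns the final sorted output of all batches inserted so far, as a
    /// stream. If data was spilled, the in-memory data and the spilled runs
    /// are combined with a streaming merge; otherwise the buffered batches
    /// are sorted entirely in memory.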
    fn sort(&mut self) -> Result<SendableRecordBatchStream> {
        self.merge_reservation.free();

        if self.spilled_before() {
            let mut streams = vec![];
            if !self.in_mem_batches.is_empty() {
                let in_mem_stream =
                    self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
                streams.push(in_mem_stream);
            }

            for spill in self.spills.drain(..) {
                if !spill.path().exists() {
                    return internal_err!("Spill file {:?} does not exist", spill.path());
                }
                let stream = read_spill_as_stream(spill, Arc::clone(&self.schema), 2)?;
                streams.push(stream);
            }

            let expressions: LexOrdering = self.expr.iter().cloned().collect();

            StreamingMergeBuilder::new()
                .with_streams(streams)
                .with_schema(Arc::clone(&self.schema))
                .with_expressions(expressions.as_ref())
                .with_metrics(self.metrics.baseline.clone())
                .with_batch_size(self.batch_size)
                .with_fetch(self.fetch)
                .with_reservation(self.merge_reservation.new_empty())
                .build()
        } else {
            self.in_mem_sort_stream(self.metrics.baseline.clone())
        }
    }

    fn used(&self) -> usize {
        self.reservation.size()
    }

    fn spilled_bytes(&self) -> usize {
        self.metrics.spilled_bytes.value()
    }

    fn spilled_rows(&self) -> usize {
        self.metrics.spilled_rows.value()
    }

    fn spill_count(&self) -> usize {
        self.metrics.spill_count.value()
    }

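    /// Writes the buffered in-memory batches to a temporary spill file and
    /// frees their memory reservation, returning the number of bytes freed.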
    async fn spill(&mut self) -> Result<usize> {
        if self.in_mem_batches.is_empty() {
            return Ok(0);
        }

        self.organize_stringview_arrays()?;

        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

        let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
        let batches = std::mem::take(&mut self.in_mem_batches);
        let (spilled_rows, spilled_bytes) = spill_record_batches(
            &batches,
            spill_file.path().into(),
            Arc::clone(&self.schema),
        )?;
        let used = self.reservation.free();
        self.metrics.spill_count.add(1);
        self.metrics.spilled_bytes.add(spilled_bytes);
        self.metrics.spilled_rows.add(spilled_rows);
        self.spills.push(spill_file);
        Ok(used)
    }

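    /// Compacts `StringViewArray` columns in the buffered batches via `gc()`
    /// before spilling, so that only the data actually referenced by the views
    /// is written to disk.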
    fn organize_stringview_arrays(&mut self) -> Result<()> {
        let mut organized_batches = Vec::with_capacity(self.in_mem_batches.len());

        for batch in self.in_mem_batches.drain(..) {
            let mut new_columns: Vec<Arc<dyn Array>> =
                Vec::with_capacity(batch.num_columns());

            let mut arr_mutated = false;
            for array in batch.columns() {
                if let Some(string_view_array) =
                    array.as_any().downcast_ref::<StringViewArray>()
                {
                    let new_array = string_view_array.gc();
                    new_columns.push(Arc::new(new_array));
                    arr_mutated = true;
                } else {
                    new_columns.push(Arc::clone(array));
                }
            }

            let organized_batch = if arr_mutated {
                RecordBatch::try_new(batch.schema(), new_columns)?
            } else {
                batch
            };

            organized_batches.push(organized_batch);
        }

        self.in_mem_batches = organized_batches;

        Ok(())
    }

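    /// Sorts the currently buffered batches, keeping the sorted result in
    /// memory when it fits into the reservation and spilling it to disk
    /// otherwise. Called when the reservation cannot grow for a new batch.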
    async fn sort_or_spill_in_mem_batches(&mut self) -> Result<()> {
        self.merge_reservation.free();

        let before = self.reservation.size();

        let mut sorted_stream =
            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;

        while let Some(batch) = sorted_stream.next().await {
            let batch = batch?;
            let sorted_size = get_reserved_byte_for_record_batch(&batch);
            if self.reservation.try_grow(sorted_size).is_err() {
                self.in_mem_batches.push(batch);
                self.spill().await?;
            } else {
                self.in_mem_batches.push(batch);
                self.in_mem_batches_sorted = true;
            }
        }

        drop(sorted_stream);

        if self.reservation.size() > before / 2 {
            self.spill().await?;
        }

        self.reserve_memory_for_merge()?;

        Ok(())
    }

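    /// Consumes the buffered in-memory batches and returns them as a single
    /// sorted stream. Depending on how much memory is reserved, the batches
    /// are either concatenated and sorted as one batch, or sorted individually
    /// and combined with a streaming merge.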
    fn in_mem_sort_stream(
        &mut self,
        metrics: BaselineMetrics,
    ) -> Result<SendableRecordBatchStream> {
        if self.in_mem_batches.is_empty() {
            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
                &self.schema,
            ))));
        }

        let elapsed_compute = metrics.elapsed_compute().clone();
        let _timer = elapsed_compute.timer();

        if self.in_mem_batches.len() == 1 {
            let batch = self.in_mem_batches.swap_remove(0);
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, metrics, reservation);
        }

        if self.reservation.size() < self.sort_in_place_threshold_bytes {
            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
            self.in_mem_batches.clear();
            self.reservation
                .try_resize(get_reserved_byte_for_record_batch(&batch))?;
            let reservation = self.reservation.take();
            return self.sort_batch_stream(batch, metrics, reservation);
        }

        let streams = std::mem::take(&mut self.in_mem_batches)
            .into_iter()
            .map(|batch| {
                let metrics = self.metrics.baseline.intermediate();
                let reservation = self
                    .reservation
                    .split(get_reserved_byte_for_record_batch(&batch));
                let input = self.sort_batch_stream(batch, metrics, reservation)?;
                Ok(spawn_buffered(input, 1))
            })
            .collect::<Result<_>>()?;

        let expressions: LexOrdering = self.expr.iter().cloned().collect();

        StreamingMergeBuilder::new()
            .with_streams(streams)
            .with_schema(Arc::clone(&self.schema))
            .with_expressions(expressions.as_ref())
            .with_metrics(metrics)
            .with_batch_size(self.batch_size)
            .with_fetch(self.fetch)
            .with_reservation(self.merge_reservation.new_empty())
            .build()
    }

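    /// Sorts a single `RecordBatch` into a single stream, releasing the batch
    /// and its associated memory `reservation` once the sorted output has been
    /// produced.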
    fn sort_batch_stream(
        &self,
        batch: RecordBatch,
        metrics: BaselineMetrics,
        reservation: MemoryReservation,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(
            get_reserved_byte_for_record_batch(&batch),
            reservation.size()
        );
        let schema = batch.schema();

        let fetch = self.fetch;
        let expressions: LexOrdering = self.expr.iter().cloned().collect();
        let stream = futures::stream::once(futures::future::lazy(move |_| {
            let timer = metrics.elapsed_compute().timer();
            let sorted = sort_batch(&batch, &expressions, fetch)?;
            timer.done();
            metrics.record_output(sorted.num_rows());
            drop(batch);
            drop(reservation);
            Ok(sorted)
        }));
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

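    /// If spilling is enabled, resizes the merge reservation to
    /// `sort_spill_reservation_bytes` so that memory is set aside for the
    /// merge phase ahead of time.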
    fn reserve_memory_for_merge(&mut self) -> Result<()> {
        if self.runtime.disk_manager.tmp_files_enabled() {
            let size = self.sort_spill_reservation_bytes;
            if self.merge_reservation.size() != size {
                self.merge_reservation.try_resize(size)?;
            }
        }

        Ok(())
    }
}

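/// Estimates how much memory to reserve for a record batch during sorting:
/// twice its in-memory size, leaving headroom for the additional copies
/// created while sorting and merging.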
fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
    get_record_batch_memory_size(batch) * 2
}

impl Debug for ExternalSorter {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.debug_struct("ExternalSorter")
            .field("memory_used", &self.used())
            .field("spilled_bytes", &self.spilled_bytes())
            .field("spilled_rows", &self.spilled_rows())
            .field("spill_count", &self.spill_count())
            .finish()
    }
}

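/// Sorts a single `RecordBatch` by the given sort `expressions`, optionally
/// keeping only the first `fetch` rows.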
pub fn sort_batch(
    batch: &RecordBatch,
    expressions: &LexOrdering,
    fetch: Option<usize>,
) -> Result<RecordBatch> {
    let sort_columns = expressions
        .iter()
        .map(|expr| expr.evaluate_to_sort_column(batch))
        .collect::<Result<Vec<_>>>()?;

    let indices = if is_multi_column_with_lists(&sort_columns) {
        lexsort_to_indices_multi_columns(sort_columns, fetch)?
    } else {
        lexsort_to_indices(&sort_columns, fetch)?
    };

    let mut columns = take_arrays(batch.columns(), &indices, None)?;

    columns.iter_mut().for_each(|c| {
        c.shrink_to_fit();
    });

    let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
    Ok(RecordBatch::try_new_with_options(
        batch.schema(),
        columns,
        &options,
    )?)
}

#[inline]
fn is_multi_column_with_lists(sort_columns: &[SortColumn]) -> bool {
    sort_columns.iter().any(|c| {
        matches!(
            c.values.data_type(),
            DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _)
        )
    })
}

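/// Computes lexicographic sort indices using the Arrow row format, which is
/// used when sorting by multiple columns that include list types.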
pub(crate) fn lexsort_to_indices_multi_columns(
    sort_columns: Vec<SortColumn>,
    limit: Option<usize>,
) -> Result<UInt32Array> {
    let (fields, columns) = sort_columns.into_iter().fold(
        (vec![], vec![]),
        |(mut fields, mut columns), sort_column| {
            fields.push(SortField::new_with_options(
                sort_column.values.data_type().clone(),
                sort_column.options.unwrap_or_default(),
            ));
            columns.push(sort_column.values);
            (fields, columns)
        },
    );

    let converter = RowConverter::new(fields)?;
    let rows = converter.convert_columns(&columns)?;
    let mut sort: Vec<_> = rows.iter().enumerate().collect();
    sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));

    let mut len = rows.num_rows();
    if let Some(limit) = limit {
        len = limit.min(len);
    }
    let indices =
        UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as u32));

    Ok(indices)
}

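/// Sort execution plan.
///
/// Sorts its input according to `expr`, optionally keeping only the first
/// `fetch` rows. Supports sorting inputs that exceed available memory by
/// spilling sorted runs to disk.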
#[derive(Debug, Clone)]
pub struct SortExec {
    /// Input plan
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Sort expressions
    expr: LexOrdering,
    /// Execution metrics
    metrics_set: ExecutionPlanMetricsSet,
    /// If `true`, sort each input partition independently instead of
    /// producing a single, globally sorted output partition
    preserve_partitioning: bool,
    /// Optional limit on the number of output rows
    fetch: Option<usize>,
    /// Cached plan properties
    cache: PlanProperties,
}

impl SortExec {
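    /// Creates a new sort execution plan that, by default, produces a single
    /// sorted output partition.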
    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
        let preserve_partitioning = false;
        let cache = Self::compute_properties(&input, expr.clone(), preserve_partitioning);
        Self {
            expr,
            input,
            metrics_set: ExecutionPlanMetricsSet::new(),
            preserve_partitioning,
            fetch: None,
            cache,
        }
    }

    pub fn preserve_partitioning(&self) -> bool {
        self.preserve_partitioning
    }

    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
        self.preserve_partitioning = preserve_partitioning;
        self.cache = self
            .cache
            .with_partitioning(Self::output_partitioning_helper(
                &self.input,
                self.preserve_partitioning,
            ));
        self
    }

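    /// Returns a copy of this plan with the given `fetch` limit. When a fetch
    /// limit is set and the plan can emit results incrementally, the plan is
    /// marked as bounded since only a bounded number of rows is produced.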
    pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
        let mut cache = self.cache.clone();
        let is_pipeline_friendly = matches!(
            self.cache.emission_type,
            EmissionType::Incremental | EmissionType::Both
        );
        if fetch.is_some() && is_pipeline_friendly {
            cache = cache.with_boundedness(Boundedness::Bounded);
        }
        SortExec {
            input: Arc::clone(&self.input),
            expr: self.expr.clone(),
            metrics_set: self.metrics_set.clone(),
            preserve_partitioning: self.preserve_partitioning,
            fetch,
            cache,
        }
    }

    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    pub fn expr(&self) -> &LexOrdering {
        &self.expr
    }

    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn output_partitioning_helper(
        input: &Arc<dyn ExecutionPlan>,
        preserve_partitioning: bool,
    ) -> Partitioning {
        if preserve_partitioning {
            input.output_partitioning().clone()
        } else {
            Partitioning::UnknownPartitioning(1)
        }
    }

    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        sort_exprs: LexOrdering,
        preserve_partitioning: bool,
    ) -> PlanProperties {
        let requirement = LexRequirement::from(sort_exprs);
        let sort_satisfied = input
            .equivalence_properties()
            .ordering_satisfy_requirement(&requirement);

        let emission_type = if sort_satisfied {
            input.pipeline_behavior()
        } else {
            EmissionType::Final
        };

        let boundedness = if sort_satisfied {
            input.boundedness()
        } else {
            match input.boundedness() {
                Boundedness::Unbounded { .. } => Boundedness::Unbounded {
                    requires_infinite_memory: true,
                },
                bounded => bounded,
            }
        };

        let sort_exprs = LexOrdering::from(requirement);
        let eq_properties = input
            .equivalence_properties()
            .clone()
            .with_reorder(sort_exprs);

        let output_partitioning =
            Self::output_partitioning_helper(input, preserve_partitioning);

        PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type,
            boundedness,
        )
    }
}

impl DisplayAs for SortExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let preserve_partitioning = self.preserve_partitioning;
                match self.fetch {
                    Some(fetch) => {
                        write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)
                    }
                    None => write!(f, "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr),
                }
            }
        }
    }
}

impl ExecutionPlan for SortExec {
    fn name(&self) -> &'static str {
        "SortExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        if self.preserve_partitioning {
            vec![Distribution::UnspecifiedDistribution]
        } else {
            vec![Distribution::SinglePartition]
        }
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0]))
            .with_fetch(self.fetch)
            .with_preserve_partitioning(self.preserve_partitioning);

        Ok(Arc::new(new_sort))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());

        let mut input = self.input.execute(partition, Arc::clone(&context))?;

        let execution_options = &context.session_config().options().execution;

        trace!("End SortExec's input.execute for partition: {}", partition);

        let sort_satisfied = self
            .input
            .equivalence_properties()
            .ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));

        match (sort_satisfied, self.fetch.as_ref()) {
            // Input is already sorted and a limit is set: only a LimitStream is needed
            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
                input,
                0,
                Some(*fetch),
                BaselineMetrics::new(&self.metrics_set, partition),
            ))),
            // Input is already sorted and there is no limit: pass it through unchanged
            (true, None) => Ok(input),
            // Unsorted input with a limit: use the TopK operator
            (false, Some(fetch)) => {
                let mut topk = TopK::try_new(
                    partition,
                    input.schema(),
                    self.expr.clone(),
                    *fetch,
                    context.session_config().batch_size(),
                    context.runtime_env(),
                    &self.metrics_set,
                )?;
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            topk.insert_batch(batch)?;
                        }
                        topk.emit()
                    })
                    .try_flatten(),
                )))
            }
            // Unsorted input without a limit: perform a full (possibly external) sort
            (false, None) => {
                let mut sorter = ExternalSorter::new(
                    partition,
                    input.schema(),
                    self.expr.clone(),
                    context.session_config().batch_size(),
                    self.fetch,
                    execution_options.sort_spill_reservation_bytes,
                    execution_options.sort_in_place_threshold_bytes,
                    &self.metrics_set,
                    context.runtime_env(),
                );
                Ok(Box::pin(RecordBatchStreamAdapter::new(
                    self.schema(),
                    futures::stream::once(async move {
                        while let Some(batch) = input.next().await {
                            let batch = batch?;
                            sorter.insert_batch(batch).await?;
                        }
                        sorter.sort()
                    })
                    .try_flatten(),
                )))
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics_set.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(SortExec::with_fetch(self, limit)))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        if self.fetch.is_none() {
            CardinalityEffect::Equal
        } else {
            CardinalityEffect::LowerEqual
        }
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        if projection.expr().len() >= projection.input().schema().fields().len() {
            return Ok(None);
        }

        let mut updated_exprs = LexOrdering::default();
        for sort in self.expr() {
            let Some(new_expr) = update_expr(&sort.expr, projection.expr(), false)?
            else {
                return Ok(None);
            };
            updated_exprs.push(PhysicalSortExpr {
                expr: new_expr,
                options: sort.options,
            });
        }

        Ok(Some(Arc::new(
            SortExec::new(updated_exprs, make_with_child(projection, self.input())?)
                .with_fetch(self.fetch())
                .with_preserve_partitioning(self.preserve_partitioning()),
        )))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use super::*;
    use crate::coalesce_partitions::CoalescePartitionsExec;
    use crate::collect;
    use crate::execution_plan::Boundedness;
    use crate::expressions::col;
    use crate::test;
    use crate::test::assert_is_pending;
    use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
    use crate::test::TestMemoryExec;

    use arrow::array::*;
    use arrow::compute::SortOptions;
    use arrow::datatypes::*;
    use datafusion_common::cast::as_primitive_array;
    use datafusion_common::{assert_batches_eq, Result, ScalarValue};
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
    use datafusion_execution::RecordBatchStream;
    use datafusion_physical_expr::expressions::{Column, Literal};
    use datafusion_physical_expr::EquivalenceProperties;

    use futures::{FutureExt, Stream};

    #[derive(Debug, Clone)]
    pub struct SortedUnboundedExec {
        schema: Schema,
        batch_size: u64,
        cache: PlanProperties,
    }

    impl DisplayAs for SortedUnboundedExec {
        fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
            match t {
                DisplayFormatType::Default | DisplayFormatType::Verbose => {
                    write!(f, "UnboundableExec",).unwrap()
                }
            }
            Ok(())
        }
    }

    impl SortedUnboundedExec {
        fn compute_properties(schema: SchemaRef) -> PlanProperties {
            let mut eq_properties = EquivalenceProperties::new(schema);
            eq_properties.add_new_orderings(vec![LexOrdering::new(vec![
                PhysicalSortExpr::new_default(Arc::new(Column::new("c1", 0))),
            ])]);
            PlanProperties::new(
                eq_properties,
                Partitioning::UnknownPartitioning(1),
                EmissionType::Final,
                Boundedness::Unbounded {
                    requires_infinite_memory: false,
                },
            )
        }
    }

    impl ExecutionPlan for SortedUnboundedExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.cache
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(self)
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            Ok(Box::pin(SortedUnboundedStream {
                schema: Arc::new(self.schema.clone()),
                batch_size: self.batch_size,
                offset: 0,
            }))
        }
    }

    #[derive(Debug)]
    pub struct SortedUnboundedStream {
        schema: SchemaRef,
        batch_size: u64,
        offset: u64,
    }

    impl Stream for SortedUnboundedStream {
        type Item = Result<RecordBatch>;

        fn poll_next(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            let batch = SortedUnboundedStream::create_record_batch(
                Arc::clone(&self.schema),
                self.offset,
                self.batch_size,
            );
            self.offset += self.batch_size;
            Poll::Ready(Some(Ok(batch)))
        }
    }

    impl RecordBatchStream for SortedUnboundedStream {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
    }

    impl SortedUnboundedStream {
        fn create_record_batch(
            schema: SchemaRef,
            offset: u64,
            batch_size: u64,
        ) -> RecordBatch {
            let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
            let array = UInt64Array::from(values);
            let array_ref: ArrayRef = Arc::new(array);
            RecordBatch::try_new(schema, vec![array_ref]).unwrap()
        }
    }

    #[tokio::test]
    async fn test_in_mem_sort() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let partitions = 4;
        let csv = test::scan_partitioned(partitions);
        let schema = csv.schema();

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]),
            Arc::new(CoalescePartitionsExec::new(csv)),
        ));

        let result = collect(sort_exec, Arc::clone(&task_ctx)).await?;

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].num_rows(), 400);

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_spill() -> Result<()> {
        let session_config = SessionConfig::new();
        let sort_spill_reservation_bytes = session_config
            .options()
            .execution
            .sort_spill_reservation_bytes;
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        let partitions = 100;
        let input = test::scan_partitioned(partitions);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(
            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
            Arc::clone(&task_ctx),
        )
        .await?;

        assert_eq!(result.len(), 2);

        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 10000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();
        assert!((3..=10).contains(&spill_count));
        assert!((9000..=10000).contains(&spilled_rows));
        assert!((36000..=40000).contains(&spilled_bytes));

        let columns = result[0].columns();

        let i = as_primitive_array::<Int32Type>(&columns[0])?;
        assert_eq!(i.value(0), 0);
        assert_eq!(i.value(i.len() - 1), 81);

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_spill_utf8_strings() -> Result<()> {
        let session_config = SessionConfig::new()
            .with_batch_size(100)
            .with_sort_in_place_threshold_bytes(20 * 1024)
            .with_sort_spill_reservation_bytes(100 * 1024);
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(500 * 1024, 1.0)
            .build_arc()?;
        let task_ctx = Arc::new(
            TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime),
        );

        let input = test::scan_partitioned_utf8(200);
        let schema = input.schema();

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("i", &schema)?,
                options: SortOptions::default(),
            }]),
            Arc::new(CoalescePartitionsExec::new(input)),
        ));

        let result = collect(
            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
            Arc::clone(&task_ctx),
        )
        .await?;

        let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
        assert_eq!(num_rows, 20000);

        let metrics = sort_exec.metrics().unwrap();

        assert_eq!(metrics.output_rows().unwrap(), 20000);
        assert!(metrics.elapsed_compute().unwrap() > 0);

        let spill_count = metrics.spill_count().unwrap();
        let spilled_rows = metrics.spilled_rows().unwrap();
        let spilled_bytes = metrics.spilled_bytes().unwrap();

        assert!((12..=18).contains(&spill_count));
        assert!((15000..=20000).contains(&spilled_rows));
        assert!((700000..=900000).contains(&spilled_bytes));

        let concated_result = concat_batches(&schema, &result)?;
        let columns = concated_result.columns();
        let string_array = as_string_array(&columns[0]);
        for i in 0..string_array.len() - 1 {
            assert!(string_array.value(i) <= string_array.value(i + 1));
        }

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_sort_fetch_memory_calculation() -> Result<()> {
        let avg_batch_size = 400;
        let partitions = 4;

        let test_options = vec![
            (None, true),
            (Some(1), false),
        ];

        for (fetch, expect_spillage) in test_options {
            let session_config = SessionConfig::new();
            let sort_spill_reservation_bytes = session_config
                .options()
                .execution
                .sort_spill_reservation_bytes;

            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(
                    sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
                    1.0,
                )
                .build_arc()?;
            let task_ctx = Arc::new(
                TaskContext::default()
                    .with_runtime(runtime)
                    .with_session_config(session_config),
            );

            let csv = test::scan_partitioned(partitions);
            let schema = csv.schema();

            let sort_exec = Arc::new(
                SortExec::new(
                    LexOrdering::new(vec![PhysicalSortExpr {
                        expr: col("i", &schema)?,
                        options: SortOptions::default(),
                    }]),
                    Arc::new(CoalescePartitionsExec::new(csv)),
                )
                .with_fetch(fetch),
            );

            let result = collect(
                Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
                Arc::clone(&task_ctx),
            )
            .await?;
            assert_eq!(result.len(), 1);

            let metrics = sort_exec.metrics().unwrap();
            let did_it_spill = metrics.spill_count().unwrap_or(0) > 0;
            assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}");
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_sort_metadata() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let field_metadata: HashMap<String, String> =
            vec![("foo".to_string(), "bar".to_string())]
                .into_iter()
                .collect();
        let schema_metadata: HashMap<String, String> =
            vec![("baz".to_string(), "barf".to_string())]
                .into_iter()
                .collect();

        let mut field = Field::new("field_name", DataType::UInt64, true);
        field.set_metadata(field_metadata.clone());
        let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
        let schema = Arc::new(schema);

        let data: ArrayRef =
            Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());

        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data]).unwrap();
        let input =
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
                .unwrap();

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("field_name", &schema)?,
                options: SortOptions::default(),
            }]),
            input,
        ));

        let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;

        let expected_data: ArrayRef =
            Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
        let expected_batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![expected_data]).unwrap();

        assert_eq!(&vec![expected_batch], &result);

        assert_eq!(result[0].schema().fields()[0].metadata(), &field_metadata);
        assert_eq!(result[0].schema().metadata(), &schema_metadata);

        Ok(())
    }

    #[tokio::test]
    async fn test_lex_sort_by_mixed_types() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(2), None, Some(1), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(3)]),
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                ])),
            ],
        )?;

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: false,
                    },
                },
            ]),
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?,
        ));

        assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
        assert_eq!(
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
            *sort_exec.schema().field(1).data_type()
        );

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 4);
        assert_eq!(result.len(), 1);

        let expected = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(vec![None, Some(1), Some(2), Some(2)])),
                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    Some(vec![Some(6), None]),
                    Some(vec![Some(5)]),
                    Some(vec![Some(3)]),
                ])),
            ],
        )?;

        assert_eq!(expected, result[0]);

        Ok(())
    }

    #[tokio::test]
    async fn test_lex_sort_by_float() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Float32, true),
            Field::new("b", DataType::Float64, true),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![
                    Some(f32::NAN),
                    None,
                    None,
                    Some(f32::NAN),
                    Some(1.0_f32),
                    Some(1.0_f32),
                    Some(2.0_f32),
                    Some(3.0_f32),
                ])),
                Arc::new(Float64Array::from(vec![
                    Some(200.0_f64),
                    Some(20.0_f64),
                    Some(10.0_f64),
                    Some(100.0_f64),
                    Some(f64::NAN),
                    None,
                    None,
                    Some(f64::NAN),
                ])),
            ],
        )?;

        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![
                PhysicalSortExpr {
                    expr: col("a", &schema)?,
                    options: SortOptions {
                        descending: true,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: col("b", &schema)?,
                    options: SortOptions {
                        descending: false,
                        nulls_first: false,
                    },
                },
            ]),
            TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?,
        ));

        assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
        assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());

        let result: Vec<RecordBatch> =
            collect(Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>, task_ctx).await?;
        let metrics = sort_exec.metrics().unwrap();
        assert!(metrics.elapsed_compute().unwrap() > 0);
        assert_eq!(metrics.output_rows().unwrap(), 8);
        assert_eq!(result.len(), 1);

        let columns = result[0].columns();

        assert_eq!(DataType::Float32, *columns[0].data_type());
        assert_eq!(DataType::Float64, *columns[1].data_type());

        let a = as_primitive_array::<Float32Type>(&columns[0])?;
        let b = as_primitive_array::<Float64Type>(&columns[1])?;

        let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
            .map(|i| {
                let aval = if a.is_valid(i) {
                    Some(a.value(i).to_string())
                } else {
                    None
                };
                let bval = if b.is_valid(i) {
                    Some(b.value(i).to_string())
                } else {
                    None
                };
                (aval, bval)
            })
            .collect();

        let expected: Vec<(Option<String>, Option<String>)> = vec![
            (None, Some("10".to_owned())),
            (None, Some("20".to_owned())),
            (Some("NaN".to_owned()), Some("100".to_owned())),
            (Some("NaN".to_owned()), Some("200".to_owned())),
            (Some("3".to_owned()), Some("NaN".to_owned())),
            (Some("2".to_owned()), None),
            (Some("1".to_owned()), Some("NaN".to_owned())),
            (Some("1".to_owned()), None),
        ];

        assert_eq!(expected, result);

        Ok(())
    }

    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
        let refs = blocking_exec.refs();
        let sort_exec = Arc::new(SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: col("a", &schema)?,
                options: SortOptions::default(),
            }]),
            blocking_exec,
        ));

        let fut = collect(sort_exec, Arc::clone(&task_ctx));
        let mut fut = fut.boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        assert_eq!(
            task_ctx.runtime_env().memory_pool.reserved(),
            0,
            "The sort should have returned all memory used back to the memory manager"
        );

        Ok(())
    }

    #[test]
    fn test_empty_sort_batch() {
        let schema = Arc::new(Schema::empty());
        let options = RecordBatchOptions::new().with_row_count(Some(1));
        let batch =
            RecordBatch::try_new_with_options(Arc::clone(&schema), vec![], &options)
                .unwrap();

        let expressions = LexOrdering::new(vec![PhysicalSortExpr {
            expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
            options: SortOptions::default(),
        }]);

        let result = sort_batch(&batch, expressions.as_ref(), None).unwrap();
        assert_eq!(result.num_rows(), 1);
    }

    #[tokio::test]
    async fn topk_unbounded_source() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
        let source = SortedUnboundedExec {
            schema: schema.clone(),
            batch_size: 2,
            cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
        };
        let mut plan = SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new(
                "c1", 0,
            )))]),
            Arc::new(source),
        );
        plan = plan.with_fetch(Some(9));

        let batches = collect(Arc::new(plan), task_ctx).await?;
        #[rustfmt::skip]
        let expected = [
            "+----+",
            "| c1 |",
            "+----+",
            "| 0  |",
            "| 1  |",
            "| 2  |",
            "| 3  |",
            "| 4  |",
            "| 5  |",
            "| 6  |",
            "| 7  |",
            "| 8  |",
            "+----+",];
        assert_batches_eq!(expected, &batches);
        Ok(())
    }
}