1use std::any::Any;
29use std::fmt::{self, Debug};
30use std::mem::{size_of, size_of_val};
31use std::sync::Arc;
32use std::task::{Context, Poll};
33use std::vec;
34
35use crate::common::SharedMemoryReservation;
36use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
37use crate::joins::hash_join::{equal_rows_arr, update_hash};
38use crate::joins::stream_join_utils::{
39 calculate_filter_expr_intervals, combine_two_batches,
40 convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
41 get_pruning_semi_indices, prepare_sorted_exprs, record_visited_indices,
42 PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics,
43};
44use crate::joins::utils::{
45 apply_join_filter_to_indices, build_batch_from_indices, build_join_schema,
46 check_join_is_valid, symmetric_join_output_partitioning, BatchSplitter,
47 BatchTransformer, ColumnIndex, JoinFilter, JoinHashMapType, JoinOn, JoinOnRef,
48 NoopBatchTransformer, StatefulStreamResult,
49};
50use crate::projection::{
51 join_allows_pushdown, join_table_borders, new_join_children,
52 physical_to_column_exprs, update_join_filter, update_join_on, ProjectionExec,
53};
54use crate::{
55 joins::StreamJoinPartitionMode,
56 metrics::{ExecutionPlanMetricsSet, MetricsSet},
57 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
58 PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
59};
60
61use arrow::array::{
62 ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder, UInt32Array,
63 UInt64Array,
64};
65use arrow::compute::concat_batches;
66use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
67use arrow::record_batch::RecordBatch;
68use datafusion_common::hash_utils::create_hashes;
69use datafusion_common::utils::bisect;
70use datafusion_common::{internal_err, plan_err, HashSet, JoinSide, JoinType, Result};
71use datafusion_execution::memory_pool::MemoryConsumer;
72use datafusion_execution::TaskContext;
73use datafusion_expr::interval_arithmetic::Interval;
74use datafusion_physical_expr::equivalence::join_equivalence_properties;
75use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
76use datafusion_physical_expr::PhysicalExprRef;
77use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
78
79use ahash::RandomState;
80use futures::{ready, Stream, StreamExt};
81use parking_lot::Mutex;
82
/// Scale factor handed to `PruningJoinHashMap::prune_hash_values` each time a
/// `OneSideHashJoiner` prunes its internal state.
// NOTE(review): presumably bounds how far the hash map may shrink after
// pruning -- confirm against `PruningJoinHashMap`.
const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
84
/// Physical plan node that joins two child plans with a symmetric hash join.
///
/// Each child is consumed as a stream; the node keeps the join keys, an
/// optional join filter and the (optional) sort expressions it requires from
/// each input.
#[derive(Debug, Clone)]
pub struct SymmetricHashJoinExec {
    /// Left child plan.
    pub(crate) left: Arc<dyn ExecutionPlan>,
    /// Right child plan.
    pub(crate) right: Arc<dyn ExecutionPlan>,
    /// Join key pairs, each a `(left expression, right expression)` tuple.
    pub(crate) on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    /// Optional filter applied to the candidate row pairs found by the key lookup.
    pub(crate) filter: Option<JoinFilter>,
    /// The kind of join to perform.
    pub(crate) join_type: JoinType,
    /// Hasher state used when hashing join key values.
    random_state: RandomState,
    /// Execution metrics for this node.
    metrics: ExecutionPlanMetricsSet,
    /// Column indices produced together with the join schema; they are passed
    /// on when output batches are assembled.
    column_indices: Vec<ColumnIndex>,
    /// Forwarded to the key-equality check.
    // NOTE(review): presumably `true` means NULL keys compare equal -- confirm.
    pub(crate) null_equals_null: bool,
    /// Ordering required from the left child, if any.
    pub(crate) left_sort_exprs: Option<LexOrdering>,
    /// Ordering required from the right child, if any.
    pub(crate) right_sort_exprs: Option<LexOrdering>,
    /// Partition mode; it determines the required input distribution.
    mode: StreamJoinPartitionMode,
    /// Plan properties computed once in `try_new` and returned by `properties()`.
    cache: PlanProperties,
}
199
200impl SymmetricHashJoinExec {
201 #[allow(clippy::too_many_arguments)]
208 pub fn try_new(
209 left: Arc<dyn ExecutionPlan>,
210 right: Arc<dyn ExecutionPlan>,
211 on: JoinOn,
212 filter: Option<JoinFilter>,
213 join_type: &JoinType,
214 null_equals_null: bool,
215 left_sort_exprs: Option<LexOrdering>,
216 right_sort_exprs: Option<LexOrdering>,
217 mode: StreamJoinPartitionMode,
218 ) -> Result<Self> {
219 let left_schema = left.schema();
220 let right_schema = right.schema();
221
222 if on.is_empty() {
224 return plan_err!(
225 "On constraints in SymmetricHashJoinExec should be non-empty"
226 );
227 }
228
229 check_join_is_valid(&left_schema, &right_schema, &on)?;
231
232 let (schema, column_indices) =
234 build_join_schema(&left_schema, &right_schema, join_type);
235
236 let random_state = RandomState::with_seeds(0, 0, 0, 0);
238 let schema = Arc::new(schema);
239 let cache =
240 Self::compute_properties(&left, &right, Arc::clone(&schema), *join_type, &on);
241 Ok(SymmetricHashJoinExec {
242 left,
243 right,
244 on,
245 filter,
246 join_type: *join_type,
247 random_state,
248 metrics: ExecutionPlanMetricsSet::new(),
249 column_indices,
250 null_equals_null,
251 left_sort_exprs,
252 right_sort_exprs,
253 mode,
254 cache,
255 })
256 }
257
258 fn compute_properties(
260 left: &Arc<dyn ExecutionPlan>,
261 right: &Arc<dyn ExecutionPlan>,
262 schema: SchemaRef,
263 join_type: JoinType,
264 join_on: JoinOnRef,
265 ) -> PlanProperties {
266 let eq_properties = join_equivalence_properties(
268 left.equivalence_properties().clone(),
269 right.equivalence_properties().clone(),
270 &join_type,
271 schema,
272 &[false, false],
273 None,
275 join_on,
276 );
277
278 let output_partitioning =
279 symmetric_join_output_partitioning(left, right, &join_type);
280
281 PlanProperties::new(
282 eq_properties,
283 output_partitioning,
284 emission_type_from_children([left, right]),
285 boundedness_from_children([left, right]),
286 )
287 }
288
289 pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
291 &self.left
292 }
293
294 pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
296 &self.right
297 }
298
299 pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
301 &self.on
302 }
303
304 pub fn filter(&self) -> Option<&JoinFilter> {
306 self.filter.as_ref()
307 }
308
309 pub fn join_type(&self) -> &JoinType {
311 &self.join_type
312 }
313
314 pub fn null_equals_null(&self) -> bool {
316 self.null_equals_null
317 }
318
319 pub fn partition_mode(&self) -> StreamJoinPartitionMode {
321 self.mode
322 }
323
324 pub fn left_sort_exprs(&self) -> Option<&LexOrdering> {
326 self.left_sort_exprs.as_ref()
327 }
328
329 pub fn right_sort_exprs(&self) -> Option<&LexOrdering> {
331 self.right_sort_exprs.as_ref()
332 }
333
334 pub fn check_if_order_information_available(&self) -> Result<bool> {
336 if let Some(filter) = self.filter() {
337 let left = self.left();
338 if let Some(left_ordering) = left.output_ordering() {
339 let right = self.right();
340 if let Some(right_ordering) = right.output_ordering() {
341 let left_convertible = convert_sort_expr_with_filter_schema(
342 &JoinSide::Left,
343 filter,
344 &left.schema(),
345 &left_ordering[0],
346 )?
347 .is_some();
348 let right_convertible = convert_sort_expr_with_filter_schema(
349 &JoinSide::Right,
350 filter,
351 &right.schema(),
352 &right_ordering[0],
353 )?
354 .is_some();
355 return Ok(left_convertible && right_convertible);
356 }
357 }
358 }
359 Ok(false)
360 }
361}
362
363impl DisplayAs for SymmetricHashJoinExec {
364 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
365 match t {
366 DisplayFormatType::Default | DisplayFormatType::Verbose => {
367 let display_filter = self.filter.as_ref().map_or_else(
368 || "".to_string(),
369 |f| format!(", filter={}", f.expression()),
370 );
371 let on = self
372 .on
373 .iter()
374 .map(|(c1, c2)| format!("({}, {})", c1, c2))
375 .collect::<Vec<String>>()
376 .join(", ");
377 write!(
378 f,
379 "SymmetricHashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}",
380 self.mode, self.join_type, on, display_filter
381 )
382 }
383 }
384 }
385}
386
387impl ExecutionPlan for SymmetricHashJoinExec {
388 fn name(&self) -> &'static str {
389 "SymmetricHashJoinExec"
390 }
391
392 fn as_any(&self) -> &dyn Any {
393 self
394 }
395
396 fn properties(&self) -> &PlanProperties {
397 &self.cache
398 }
399
400 fn required_input_distribution(&self) -> Vec<Distribution> {
401 match self.mode {
402 StreamJoinPartitionMode::Partitioned => {
403 let (left_expr, right_expr) = self
404 .on
405 .iter()
406 .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
407 .unzip();
408 vec![
409 Distribution::HashPartitioned(left_expr),
410 Distribution::HashPartitioned(right_expr),
411 ]
412 }
413 StreamJoinPartitionMode::SinglePartition => {
414 vec![Distribution::SinglePartition, Distribution::SinglePartition]
415 }
416 }
417 }
418
419 fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
420 vec![
421 self.left_sort_exprs
422 .as_ref()
423 .cloned()
424 .map(LexRequirement::from),
425 self.right_sort_exprs
426 .as_ref()
427 .cloned()
428 .map(LexRequirement::from),
429 ]
430 }
431
432 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
433 vec![&self.left, &self.right]
434 }
435
436 fn with_new_children(
437 self: Arc<Self>,
438 children: Vec<Arc<dyn ExecutionPlan>>,
439 ) -> Result<Arc<dyn ExecutionPlan>> {
440 Ok(Arc::new(SymmetricHashJoinExec::try_new(
441 Arc::clone(&children[0]),
442 Arc::clone(&children[1]),
443 self.on.clone(),
444 self.filter.clone(),
445 &self.join_type,
446 self.null_equals_null,
447 self.left_sort_exprs.clone(),
448 self.right_sort_exprs.clone(),
449 self.mode,
450 )?))
451 }
452
453 fn metrics(&self) -> Option<MetricsSet> {
454 Some(self.metrics.clone_inner())
455 }
456
457 fn statistics(&self) -> Result<Statistics> {
458 Ok(Statistics::new_unknown(&self.schema()))
460 }
461
462 fn execute(
463 &self,
464 partition: usize,
465 context: Arc<TaskContext>,
466 ) -> Result<SendableRecordBatchStream> {
467 let left_partitions = self.left.output_partitioning().partition_count();
468 let right_partitions = self.right.output_partitioning().partition_count();
469 if left_partitions != right_partitions {
470 return internal_err!(
471 "Invalid SymmetricHashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
472 consider using RepartitionExec"
473 );
474 }
475 let (left_sorted_filter_expr, right_sorted_filter_expr, graph) = match (
478 self.left_sort_exprs(),
479 self.right_sort_exprs(),
480 &self.filter,
481 ) {
482 (Some(left_sort_exprs), Some(right_sort_exprs), Some(filter)) => {
483 let (left, right, graph) = prepare_sorted_exprs(
484 filter,
485 &self.left,
486 &self.right,
487 left_sort_exprs,
488 right_sort_exprs,
489 )?;
490 (Some(left), Some(right), Some(graph))
491 }
492 _ => (None, None, None),
495 };
496
497 let (on_left, on_right) = self.on.iter().cloned().unzip();
498
499 let left_side_joiner =
500 OneSideHashJoiner::new(JoinSide::Left, on_left, self.left.schema());
501 let right_side_joiner =
502 OneSideHashJoiner::new(JoinSide::Right, on_right, self.right.schema());
503
504 let left_stream = self.left.execute(partition, Arc::clone(&context))?;
505
506 let right_stream = self.right.execute(partition, Arc::clone(&context))?;
507
508 let batch_size = context.session_config().batch_size();
509 let enforce_batch_size_in_joins =
510 context.session_config().enforce_batch_size_in_joins();
511
512 let reservation = Arc::new(Mutex::new(
513 MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]"))
514 .register(context.memory_pool()),
515 ));
516 if let Some(g) = graph.as_ref() {
517 reservation.lock().try_grow(g.size())?;
518 }
519
520 if enforce_batch_size_in_joins {
521 Ok(Box::pin(SymmetricHashJoinStream {
522 left_stream,
523 right_stream,
524 schema: self.schema(),
525 filter: self.filter.clone(),
526 join_type: self.join_type,
527 random_state: self.random_state.clone(),
528 left: left_side_joiner,
529 right: right_side_joiner,
530 column_indices: self.column_indices.clone(),
531 metrics: StreamJoinMetrics::new(partition, &self.metrics),
532 graph,
533 left_sorted_filter_expr,
534 right_sorted_filter_expr,
535 null_equals_null: self.null_equals_null,
536 state: SHJStreamState::PullRight,
537 reservation,
538 batch_transformer: BatchSplitter::new(batch_size),
539 }))
540 } else {
541 Ok(Box::pin(SymmetricHashJoinStream {
542 left_stream,
543 right_stream,
544 schema: self.schema(),
545 filter: self.filter.clone(),
546 join_type: self.join_type,
547 random_state: self.random_state.clone(),
548 left: left_side_joiner,
549 right: right_side_joiner,
550 column_indices: self.column_indices.clone(),
551 metrics: StreamJoinMetrics::new(partition, &self.metrics),
552 graph,
553 left_sorted_filter_expr,
554 right_sorted_filter_expr,
555 null_equals_null: self.null_equals_null,
556 state: SHJStreamState::PullRight,
557 reservation,
558 batch_transformer: NoopBatchTransformer::new(),
559 }))
560 }
561 }
562
563 fn try_swapping_with_projection(
567 &self,
568 projection: &ProjectionExec,
569 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
570 let Some(projection_as_columns) = physical_to_column_exprs(projection.expr())
572 else {
573 return Ok(None);
574 };
575
576 let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
577 self.left().schema().fields().len(),
578 &projection_as_columns,
579 );
580
581 if !join_allows_pushdown(
582 &projection_as_columns,
583 &self.schema(),
584 far_right_left_col_ind,
585 far_left_right_col_ind,
586 ) {
587 return Ok(None);
588 }
589
590 let Some(new_on) = update_join_on(
591 &projection_as_columns[0..=far_right_left_col_ind as _],
592 &projection_as_columns[far_left_right_col_ind as _..],
593 self.on(),
594 self.left().schema().fields().len(),
595 ) else {
596 return Ok(None);
597 };
598
599 let new_filter = if let Some(filter) = self.filter() {
600 match update_join_filter(
601 &projection_as_columns[0..=far_right_left_col_ind as _],
602 &projection_as_columns[far_left_right_col_ind as _..],
603 filter,
604 self.left().schema().fields().len(),
605 ) {
606 Some(updated_filter) => Some(updated_filter),
607 None => return Ok(None),
608 }
609 } else {
610 None
611 };
612
613 let (new_left, new_right) = new_join_children(
614 &projection_as_columns,
615 far_right_left_col_ind,
616 far_left_right_col_ind,
617 self.left(),
618 self.right(),
619 )?;
620
621 Ok(Some(Arc::new(SymmetricHashJoinExec::try_new(
622 Arc::new(new_left),
623 Arc::new(new_right),
624 new_on,
625 new_filter,
626 self.join_type(),
627 self.null_equals_null(),
628 self.right()
629 .output_ordering()
630 .map(|p| LexOrdering::new(p.to_vec())),
631 self.left()
632 .output_ordering()
633 .map(|p| LexOrdering::new(p.to_vec())),
634 self.partition_mode(),
635 )?)))
636 }
637}
638
/// Stream that produces the output batches of a symmetric hash join for one
/// partition. `T` is the batch transformer applied to produced batches before
/// they are emitted.
struct SymmetricHashJoinStream<T> {
    /// Input stream of the left child.
    left_stream: SendableRecordBatchStream,
    /// Input stream of the right child.
    right_stream: SendableRecordBatchStream,
    /// Schema of the output batches.
    schema: Arc<Schema>,
    /// Optional join filter.
    filter: Option<JoinFilter>,
    /// Join type.
    join_type: JoinType,
    /// Joiner holding the buffered state of the left side.
    left: OneSideHashJoiner,
    /// Joiner holding the buffered state of the right side.
    right: OneSideHashJoiner,
    /// Column indices used when assembling output batches.
    column_indices: Vec<ColumnIndex>,
    /// Interval graph built from the filter; present only when sorted filter
    /// expressions are available.
    graph: Option<ExprIntervalGraph>,
    /// Sorted filter expression of the left side, if any.
    left_sorted_filter_expr: Option<SortedFilterExpr>,
    /// Sorted filter expression of the right side, if any.
    right_sorted_filter_expr: Option<SortedFilterExpr>,
    /// Hasher state used for join key hashing.
    random_state: RandomState,
    /// Forwarded to the key-equality check.
    null_equals_null: bool,
    /// Metrics of this stream.
    metrics: StreamJoinMetrics,
    /// Shared memory reservation of this stream.
    reservation: SharedMemoryReservation,
    /// Current state of the polling state machine.
    state: SHJStreamState,
    /// Transformer that receives each produced batch and yields the batches
    /// actually emitted.
    batch_transformer: T,
}
675
impl<T: BatchTransformer + Unpin + Send> RecordBatchStream
    for SymmetricHashJoinStream<T>
{
    /// Returns a new handle to the output schema of the join.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
683
impl<T: BatchTransformer + Unpin + Send> Stream for SymmetricHashJoinStream<T> {
    type Item = Result<RecordBatch>;

    /// Polls the next output batch by delegating to `poll_next_impl`.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.poll_next_impl(cx)
    }
}
694
695fn determine_prune_length(
714 buffer: &RecordBatch,
715 build_side_filter_expr: &SortedFilterExpr,
716) -> Result<usize> {
717 let origin_sorted_expr = build_side_filter_expr.origin_sorted_expr();
718 let interval = build_side_filter_expr.interval();
719 let batch_arr = origin_sorted_expr
721 .expr
722 .evaluate(buffer)?
723 .into_array(buffer.num_rows())?;
724
725 let target = if origin_sorted_expr.options.descending {
727 interval.upper().clone()
728 } else {
729 interval.lower().clone()
730 };
731
732 bisect::<true>(&[batch_arr], &[target], &[origin_sorted_expr.options])
734}
735
736fn need_to_produce_result_in_final(build_side: JoinSide, join_type: JoinType) -> bool {
751 if build_side == JoinSide::Left {
752 matches!(
753 join_type,
754 JoinType::Left
755 | JoinType::LeftAnti
756 | JoinType::Full
757 | JoinType::LeftSemi
758 | JoinType::LeftMark
759 )
760 } else {
761 matches!(
762 join_type,
763 JoinType::Right | JoinType::RightAnti | JoinType::Full | JoinType::RightSemi
764 )
765 }
766}
767
/// Computes the `(build, probe)` index arrays for the first `prune_length`
/// buffered rows of `build_side`, according to `join_type`.
///
/// `visited_rows` holds absolute row numbers, so `deleted_offset` is added to
/// a buffer-relative index before it is looked up.
///
/// # Panics
///
/// Hits `unreachable!()` for any `(build_side, join_type)` combination not
/// listed in the `match` below.
fn calculate_indices_by_join_type<L: ArrowPrimitiveType, R: ArrowPrimitiveType>(
    build_side: JoinSide,
    prune_length: usize,
    visited_rows: &HashSet<usize>,
    deleted_offset: usize,
    join_type: JoinType,
) -> Result<(PrimitiveArray<L>, PrimitiveArray<R>)>
where
    NativeAdapter<L>: From<<L as ArrowPrimitiveType>::Native>,
{
    let result = match (build_side, join_type) {
        // Mark join: every row in the prune range is emitted on the build side.
        (JoinSide::Left, JoinType::LeftMark) => {
            let build_indices = (0..prune_length)
                .map(L::Native::from_usize)
                .collect::<PrimitiveArray<L>>();
            let probe_indices = (0..prune_length)
                .map(|idx| {
                    // A visited row gets the dummy probe index 0; an unvisited
                    // row gets a null probe index.
                    visited_rows
                        .contains(&(idx + deleted_offset))
                        .then_some(R::Native::from_usize(0).unwrap())
                })
                .collect();
            (build_indices, probe_indices)
        }
        // Outer/anti variants (and `Full` on either side): take the indices
        // returned by `get_pruning_anti_indices` and pair each with a null
        // probe index.
        (JoinSide::Left, JoinType::Left | JoinType::LeftAnti)
        | (JoinSide::Right, JoinType::Right | JoinType::RightAnti)
        | (_, JoinType::Full) => {
            let build_unmatched_indices =
                get_pruning_anti_indices(prune_length, deleted_offset, visited_rows);
            let mut builder =
                PrimitiveBuilder::<R>::with_capacity(build_unmatched_indices.len());
            builder.append_nulls(build_unmatched_indices.len());
            let probe_indices = builder.finish();
            (build_unmatched_indices, probe_indices)
        }
        // Semi variants: take the indices returned by
        // `get_pruning_semi_indices`, again paired with null probe indices.
        (JoinSide::Left, JoinType::LeftSemi) | (JoinSide::Right, JoinType::RightSemi) => {
            let build_unmatched_indices =
                get_pruning_semi_indices(prune_length, deleted_offset, visited_rows);
            let mut builder =
                PrimitiveBuilder::<R>::with_capacity(build_unmatched_indices.len());
            builder.append_nulls(build_unmatched_indices.len());
            let probe_indices = builder.finish();
            (build_unmatched_indices, probe_indices)
        }
        // No other combination is handled here.
        _ => unreachable!(),
    };
    Ok(result)
}
838
839pub(crate) fn build_side_determined_results(
857 build_hash_joiner: &OneSideHashJoiner,
858 output_schema: &SchemaRef,
859 prune_length: usize,
860 probe_schema: SchemaRef,
861 join_type: JoinType,
862 column_indices: &[ColumnIndex],
863) -> Result<Option<RecordBatch>> {
864 if prune_length > 0
866 && need_to_produce_result_in_final(build_hash_joiner.build_side, join_type)
867 {
868 let (build_indices, probe_indices) = calculate_indices_by_join_type(
870 build_hash_joiner.build_side,
871 prune_length,
872 &build_hash_joiner.visited_rows,
873 build_hash_joiner.deleted_offset,
874 join_type,
875 )?;
876
877 let empty_probe_batch = RecordBatch::new_empty(probe_schema);
879 build_batch_from_indices(
881 output_schema.as_ref(),
882 &build_hash_joiner.input_buffer,
883 &empty_probe_batch,
884 &build_indices,
885 &probe_indices,
886 column_indices,
887 build_hash_joiner.build_side,
888 )
889 .map(|batch| (batch.num_rows() > 0).then_some(batch))
890 } else {
891 Ok(None)
893 }
894}
895
/// Joins `probe_batch` against the rows buffered in `build_hash_joiner`.
///
/// Matching index pairs are looked up through the build side's hash map,
/// narrowed by `filter` when one is given, and recorded as visited on
/// whichever side needs that for `join_type`.
///
/// Returns `Ok(None)` when either input is empty, when `join_type` is an
/// anti, semi or mark join, or when the assembled batch has no rows.
///
/// # Errors
///
/// Propagates errors from the hash map lookup, the filter evaluation and the
/// batch assembly.
#[allow(clippy::too_many_arguments)]
pub(crate) fn join_with_probe_batch(
    build_hash_joiner: &mut OneSideHashJoiner,
    probe_hash_joiner: &mut OneSideHashJoiner,
    schema: &SchemaRef,
    join_type: JoinType,
    filter: Option<&JoinFilter>,
    probe_batch: &RecordBatch,
    column_indices: &[ColumnIndex],
    random_state: &RandomState,
    null_equals_null: bool,
) -> Result<Option<RecordBatch>> {
    // With nothing buffered or nothing to probe there can be no match.
    if build_hash_joiner.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
        return Ok(None);
    }
    let (build_indices, probe_indices) = lookup_join_hashmap(
        &build_hash_joiner.hashmap,
        &build_hash_joiner.input_buffer,
        probe_batch,
        &build_hash_joiner.on,
        &probe_hash_joiner.on,
        random_state,
        null_equals_null,
        &mut build_hash_joiner.hashes_buffer,
        Some(build_hash_joiner.deleted_offset),
    )?;

    // Narrow the candidate pairs with the join filter, if there is one.
    let (build_indices, probe_indices) = if let Some(filter) = filter {
        apply_join_filter_to_indices(
            &build_hash_joiner.input_buffer,
            probe_batch,
            build_indices,
            probe_indices,
            filter,
            build_hash_joiner.build_side,
        )?
    } else {
        (build_indices, probe_indices)
    };

    // Record matched build rows, offset by the build side's `deleted_offset`.
    if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) {
        record_visited_indices(
            &mut build_hash_joiner.visited_rows,
            build_hash_joiner.deleted_offset,
            &build_indices,
        );
    }
    // Record matched probe rows, offset by the probe side's `offset`.
    if need_to_produce_result_in_final(build_hash_joiner.build_side.negate(), join_type) {
        record_visited_indices(
            &mut probe_hash_joiner.visited_rows,
            probe_hash_joiner.offset,
            &probe_indices,
        );
    }
    // Anti, semi and mark joins emit nothing from this function; only the
    // visited-row bookkeeping above is performed for them.
    if matches!(
        join_type,
        JoinType::LeftAnti
            | JoinType::RightAnti
            | JoinType::LeftSemi
            | JoinType::LeftMark
            | JoinType::RightSemi
    ) {
        Ok(None)
    } else {
        build_batch_from_indices(
            schema,
            &build_hash_joiner.input_buffer,
            probe_batch,
            &build_indices,
            &probe_indices,
            column_indices,
            build_hash_joiner.build_side,
        )
        .map(|batch| (batch.num_rows() > 0).then_some(batch))
    }
}
991
/// Looks up the rows of `probe_batch` in `build_hashmap` and returns the
/// `(build, probe)` index pairs whose join key values are actually equal.
///
/// `hashes_buffer` is scratch space: it is cleared and resized to the number
/// of probe rows on every call.
///
/// # Errors
///
/// Propagates errors from evaluating the key expressions, hashing, and the
/// equality check.
#[allow(clippy::too_many_arguments)]
fn lookup_join_hashmap(
    build_hashmap: &PruningJoinHashMap,
    build_batch: &RecordBatch,
    probe_batch: &RecordBatch,
    build_on: &[PhysicalExprRef],
    probe_on: &[PhysicalExprRef],
    random_state: &RandomState,
    null_equals_null: bool,
    hashes_buffer: &mut Vec<u64>,
    deleted_offset: Option<usize>,
) -> Result<(UInt64Array, UInt32Array)> {
    // Evaluate the join key expressions on both sides.
    let keys_values = probe_on
        .iter()
        .map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))
        .collect::<Result<Vec<_>>>()?;
    let build_join_values = build_on
        .iter()
        .map(|c| c.evaluate(build_batch)?.into_array(build_batch.num_rows()))
        .collect::<Result<Vec<_>>>()?;

    // Hash the probe-side keys into the reusable buffer.
    hashes_buffer.clear();
    hashes_buffer.resize(probe_batch.num_rows(), 0);
    let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;

    // The probe hashes are visited in reverse and both result vectors are
    // reversed afterwards.
    // NOTE(review): presumably this restores input order because the hash map
    // yields the most recently inserted rows first -- confirm against
    // `PruningJoinHashMap::get_matched_indices`.
    let (mut matched_probe, mut matched_build) = build_hashmap
        .get_matched_indices(hash_values.iter().enumerate().rev(), deleted_offset);

    matched_probe.reverse();
    matched_build.reverse();

    let build_indices: UInt64Array = matched_build.into();
    let probe_indices: UInt32Array = matched_probe.into();

    // Equal hashes do not imply equal keys, so compare the key values.
    let (build_indices, probe_indices) = equal_rows_arr(
        &build_indices,
        &probe_indices,
        &build_join_values,
        &keys_values,
        null_equals_null,
    )?;

    Ok((build_indices, probe_indices))
}
1085
/// Buffered state for one side of the symmetric hash join.
pub struct OneSideHashJoiner {
    /// Which side of the join this joiner represents.
    build_side: JoinSide,
    /// Rows received so far on this side that have not been pruned.
    pub input_buffer: RecordBatch,
    /// Join key expressions evaluated on this side.
    pub(crate) on: Vec<PhysicalExprRef>,
    /// Hash map over the join keys of the buffered rows.
    pub(crate) hashmap: PruningJoinHashMap,
    /// Reusable scratch buffer for hash values.
    pub(crate) hashes_buffer: Vec<u64>,
    /// Absolute row numbers of rows recorded as matched.
    pub(crate) visited_rows: HashSet<usize>,
    /// Row offset passed to the hash map when new rows are inserted.
    pub(crate) offset: usize,
    /// Total number of rows pruned from the front of `input_buffer` so far.
    pub(crate) deleted_offset: usize,
}
1104
1105impl OneSideHashJoiner {
1106 pub fn size(&self) -> usize {
1107 let mut size = 0;
1108 size += size_of_val(self);
1109 size += size_of_val(&self.build_side);
1110 size += self.input_buffer.get_array_memory_size();
1111 size += size_of_val(&self.on);
1112 size += self.hashmap.size();
1113 size += self.hashes_buffer.capacity() * size_of::<u64>();
1114 size += self.visited_rows.capacity() * size_of::<usize>();
1115 size += size_of_val(&self.offset);
1116 size += size_of_val(&self.deleted_offset);
1117 size
1118 }
1119 pub fn new(
1120 build_side: JoinSide,
1121 on: Vec<PhysicalExprRef>,
1122 schema: SchemaRef,
1123 ) -> Self {
1124 Self {
1125 build_side,
1126 input_buffer: RecordBatch::new_empty(schema),
1127 on,
1128 hashmap: PruningJoinHashMap::with_capacity(0),
1129 hashes_buffer: vec![],
1130 visited_rows: HashSet::new(),
1131 offset: 0,
1132 deleted_offset: 0,
1133 }
1134 }
1135
1136 pub(crate) fn update_internal_state(
1147 &mut self,
1148 batch: &RecordBatch,
1149 random_state: &RandomState,
1150 ) -> Result<()> {
1151 self.input_buffer = concat_batches(&batch.schema(), [&self.input_buffer, batch])?;
1153 self.hashes_buffer.resize(batch.num_rows(), 0);
1155 update_hash(
1158 &self.on,
1159 batch,
1160 &mut self.hashmap,
1161 self.offset,
1162 random_state,
1163 &mut self.hashes_buffer,
1164 self.deleted_offset,
1165 false,
1166 )?;
1167 Ok(())
1168 }
1169
1170 pub(crate) fn calculate_prune_length_with_probe_batch(
1182 &mut self,
1183 build_side_sorted_filter_expr: &mut SortedFilterExpr,
1184 probe_side_sorted_filter_expr: &mut SortedFilterExpr,
1185 graph: &mut ExprIntervalGraph,
1186 ) -> Result<usize> {
1187 if self.input_buffer.num_rows() == 0 {
1189 return Ok(0);
1190 }
1191 let mut filter_intervals = vec![];
1194 for expr in [
1195 &build_side_sorted_filter_expr,
1196 &probe_side_sorted_filter_expr,
1197 ] {
1198 filter_intervals.push((expr.node_index(), expr.interval().clone()))
1199 }
1200 graph.update_ranges(&mut filter_intervals, Interval::CERTAINLY_TRUE)?;
1202 let calculated_build_side_interval = filter_intervals.remove(0).1;
1204 if calculated_build_side_interval.eq(build_side_sorted_filter_expr.interval()) {
1206 return Ok(0);
1207 }
1208 build_side_sorted_filter_expr.set_interval(calculated_build_side_interval);
1210
1211 determine_prune_length(&self.input_buffer, build_side_sorted_filter_expr)
1212 }
1213
1214 pub(crate) fn prune_internal_state(&mut self, prune_length: usize) -> Result<()> {
1215 self.hashmap.prune_hash_values(
1217 prune_length,
1218 self.deleted_offset as u64,
1219 HASHMAP_SHRINK_SCALE_FACTOR,
1220 );
1221 for row in self.deleted_offset..(self.deleted_offset + prune_length) {
1223 self.visited_rows.remove(&row);
1224 }
1225 self.input_buffer = self
1227 .input_buffer
1228 .slice(prune_length, self.input_buffer.num_rows() - prune_length);
1229 self.deleted_offset += prune_length;
1231 Ok(())
1232 }
1233}
1234
1235impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
/// Drives the stream's state machine until an output batch is available,
/// the stream ends, an error occurs, or an input stream is pending.
///
/// Batches queued in the batch transformer are emitted first. When the
/// transformer has nothing to yield, the handler for the current state is
/// run and any batch it produces is handed to the transformer.
fn poll_next_impl(
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
    loop {
        match self.batch_transformer.next() {
            // Nothing queued: advance the state machine by one step.
            None => {
                let result = match self.state() {
                    SHJStreamState::PullRight => {
                        ready!(self.fetch_next_from_right_stream(cx))
                    }
                    SHJStreamState::PullLeft => {
                        ready!(self.fetch_next_from_left_stream(cx))
                    }
                    SHJStreamState::RightExhausted => {
                        ready!(self.handle_right_stream_end(cx))
                    }
                    SHJStreamState::LeftExhausted => {
                        ready!(self.handle_left_stream_end(cx))
                    }
                    SHJStreamState::BothExhausted {
                        final_result: false,
                    } => self.prepare_for_final_results_after_exhaustion(),
                    // Final results were already produced: the stream is done.
                    SHJStreamState::BothExhausted { final_result: true } => {
                        return Poll::Ready(None);
                    }
                };

                // An error from the step is returned to the caller here.
                match result? {
                    StatefulStreamResult::Ready(None) => {
                        return Poll::Ready(None);
                    }
                    StatefulStreamResult::Ready(Some(batch)) => {
                        self.batch_transformer.set_batch(batch);
                    }
                    // Any other result: loop again without emitting.
                    _ => {}
                }
            }
            // A batch is ready: update the output metrics and emit it.
            Some((batch, _)) => {
                self.metrics.output_batches.add(1);
                self.metrics.output_rows.add(batch.num_rows());
                return Poll::Ready(Some(Ok(batch)));
            }
        }
    }
}
1339 fn fetch_next_from_right_stream(
1349 &mut self,
1350 cx: &mut Context<'_>,
1351 ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1352 match ready!(self.right_stream().poll_next_unpin(cx)) {
1353 Some(Ok(batch)) => {
1354 if batch.num_rows() == 0 {
1355 return Poll::Ready(Ok(StatefulStreamResult::Continue));
1356 }
1357 self.set_state(SHJStreamState::PullLeft);
1358 Poll::Ready(self.process_batch_from_right(batch))
1359 }
1360 Some(Err(e)) => Poll::Ready(Err(e)),
1361 None => {
1362 self.set_state(SHJStreamState::RightExhausted);
1363 Poll::Ready(Ok(StatefulStreamResult::Continue))
1364 }
1365 }
1366 }
1367
1368 fn fetch_next_from_left_stream(
1378 &mut self,
1379 cx: &mut Context<'_>,
1380 ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1381 match ready!(self.left_stream().poll_next_unpin(cx)) {
1382 Some(Ok(batch)) => {
1383 if batch.num_rows() == 0 {
1384 return Poll::Ready(Ok(StatefulStreamResult::Continue));
1385 }
1386 self.set_state(SHJStreamState::PullRight);
1387 Poll::Ready(self.process_batch_from_left(batch))
1388 }
1389 Some(Err(e)) => Poll::Ready(Err(e)),
1390 None => {
1391 self.set_state(SHJStreamState::LeftExhausted);
1392 Poll::Ready(Ok(StatefulStreamResult::Continue))
1393 }
1394 }
1395 }
1396
1397 fn handle_right_stream_end(
1408 &mut self,
1409 cx: &mut Context<'_>,
1410 ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1411 match ready!(self.left_stream().poll_next_unpin(cx)) {
1412 Some(Ok(batch)) => {
1413 if batch.num_rows() == 0 {
1414 return Poll::Ready(Ok(StatefulStreamResult::Continue));
1415 }
1416 Poll::Ready(self.process_batch_after_right_end(batch))
1417 }
1418 Some(Err(e)) => Poll::Ready(Err(e)),
1419 None => {
1420 self.set_state(SHJStreamState::BothExhausted {
1421 final_result: false,
1422 });
1423 Poll::Ready(Ok(StatefulStreamResult::Continue))
1424 }
1425 }
1426 }
1427
1428 fn handle_left_stream_end(
1439 &mut self,
1440 cx: &mut Context<'_>,
1441 ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
1442 match ready!(self.right_stream().poll_next_unpin(cx)) {
1443 Some(Ok(batch)) => {
1444 if batch.num_rows() == 0 {
1445 return Poll::Ready(Ok(StatefulStreamResult::Continue));
1446 }
1447 Poll::Ready(self.process_batch_after_left_end(batch))
1448 }
1449 Some(Err(e)) => Poll::Ready(Err(e)),
1450 None => {
1451 self.set_state(SHJStreamState::BothExhausted {
1452 final_result: false,
1453 });
1454 Poll::Ready(Ok(StatefulStreamResult::Continue))
1455 }
1456 }
1457 }
1458
/// Marks the final results as produced and then computes them from the rows
/// still buffered on both sides.
///
/// The state is switched to `BothExhausted { final_result: true }` BEFORE the
/// batches are computed, so the next poll with an empty transformer ends the
/// stream.
fn prepare_for_final_results_after_exhaustion(
    &mut self,
) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
    self.set_state(SHJStreamState::BothExhausted { final_result: true });
    self.process_batches_before_finalization()
}
1474
1475 fn process_batch_from_right(
1476 &mut self,
1477 batch: RecordBatch,
1478 ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1479 self.perform_join_for_given_side(batch, JoinSide::Right)
1480 .map(|maybe_batch| {
1481 if maybe_batch.is_some() {
1482 StatefulStreamResult::Ready(maybe_batch)
1483 } else {
1484 StatefulStreamResult::Continue
1485 }
1486 })
1487 }
1488
1489 fn process_batch_from_left(
1490 &mut self,
1491 batch: RecordBatch,
1492 ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1493 self.perform_join_for_given_side(batch, JoinSide::Left)
1494 .map(|maybe_batch| {
1495 if maybe_batch.is_some() {
1496 StatefulStreamResult::Ready(maybe_batch)
1497 } else {
1498 StatefulStreamResult::Continue
1499 }
1500 })
1501 }
1502
/// Processes a right-side batch that arrives after the left input has ended.
/// It is handled exactly like any other right-side batch.
fn process_batch_after_left_end(
    &mut self,
    right_batch: RecordBatch,
) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
    self.process_batch_from_right(right_batch)
}
1509
/// Processes a left-side batch that arrives after the right input has ended.
/// It is handled exactly like any other left-side batch.
fn process_batch_after_right_end(
    &mut self,
    left_batch: RecordBatch,
) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
    self.process_batch_from_left(left_batch)
}
1516
1517 fn process_batches_before_finalization(
1518 &mut self,
1519 ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
1520 let left_result = build_side_determined_results(
1522 &self.left,
1523 &self.schema,
1524 self.left.input_buffer.num_rows(),
1525 self.right.input_buffer.schema(),
1526 self.join_type,
1527 &self.column_indices,
1528 )?;
1529 let right_result = build_side_determined_results(
1531 &self.right,
1532 &self.schema,
1533 self.right.input_buffer.num_rows(),
1534 self.left.input_buffer.schema(),
1535 self.join_type,
1536 &self.column_indices,
1537 )?;
1538
1539 let result = combine_two_batches(&self.schema, left_result, right_result)?;
1541
1542 if result.is_some() {
1544 return Ok(StatefulStreamResult::Ready(result));
1545 }
1546 Ok(StatefulStreamResult::Continue)
1547 }
1548
/// Mutable access to the right input stream.
fn right_stream(&mut self) -> &mut SendableRecordBatchStream {
    &mut self.right_stream
}
1552
/// Mutable access to the left input stream.
fn left_stream(&mut self) -> &mut SendableRecordBatchStream {
    &mut self.left_stream
}
1556
/// Replaces the current state of the polling state machine.
fn set_state(&mut self, state: SHJStreamState) {
    self.state = state;
}
1560
/// Returns a clone of the stream's current state.
fn state(&mut self) -> SHJStreamState {
    self.state.clone()
}
1564
1565 fn size(&self) -> usize {
1566 let mut size = 0;
1567 size += size_of_val(&self.schema);
1568 size += size_of_val(&self.filter);
1569 size += size_of_val(&self.join_type);
1570 size += self.left.size();
1571 size += self.right.size();
1572 size += size_of_val(&self.column_indices);
1573 size += self.graph.as_ref().map(|g| g.size()).unwrap_or(0);
1574 size += size_of_val(&self.left_sorted_filter_expr);
1575 size += size_of_val(&self.right_sorted_filter_expr);
1576 size += size_of_val(&self.random_state);
1577 size += size_of_val(&self.null_equals_null);
1578 size += size_of_val(&self.metrics);
1579 size
1580 }
1581
/// Joins `probe_batch`, which arrived from `probe_side`, against the opposite
/// (build) side and returns the combined output, if any.
///
/// Steps, in order:
/// 1. Select the probe/build joiners, their sorted filter expressions and the
///    probe-side metrics according to `probe_side`.
/// 2. Record the batch in the probe-side input metrics and feed it into the
///    probe joiner's internal state.
/// 3. Run `join_with_probe_batch` to obtain `equal_result`.
/// 4. When both sorted filter expressions and the expression graph are
///    available, compute a prune length for the build side, produce
///    `build_side_determined_results` for that many rows and prune them from
///    the build joiner; otherwise this part yields `None`.
/// 5. Combine both partial results, publish `self.size()` to the
///    memory-usage metric and resize the memory reservation accordingly.
///
/// # Errors
/// Propagates any error from the helpers above, including a failing
/// `try_resize` on the memory reservation.
fn perform_join_for_given_side(
    &mut self,
    probe_batch: RecordBatch,
    probe_side: JoinSide,
) -> Result<Option<RecordBatch>> {
    // Pick which side probes and which side is probed; everything below is
    // written in terms of "probe" and "build" so both directions share it.
    let (
        probe_hash_joiner,
        build_hash_joiner,
        probe_side_sorted_filter_expr,
        build_side_sorted_filter_expr,
        probe_side_metrics,
    ) = if probe_side.eq(&JoinSide::Left) {
        (
            &mut self.left,
            &mut self.right,
            &mut self.left_sorted_filter_expr,
            &mut self.right_sorted_filter_expr,
            &mut self.metrics.left,
        )
    } else {
        (
            &mut self.right,
            &mut self.left,
            &mut self.right_sorted_filter_expr,
            &mut self.left_sorted_filter_expr,
            &mut self.metrics.right,
        )
    };
    // Account for the incoming batch in the probe side's input metrics.
    probe_side_metrics.input_batches.add(1);
    probe_side_metrics.input_rows.add(probe_batch.num_rows());
    // Add the batch to the probe joiner's own state before joining.
    probe_hash_joiner.update_internal_state(&probe_batch, &self.random_state)?;
    // Join the probe batch against the build side.
    let equal_result = join_with_probe_batch(
        build_hash_joiner,
        probe_hash_joiner,
        &self.schema,
        self.join_type,
        self.filter.as_ref(),
        &probe_batch,
        &self.column_indices,
        &self.random_state,
        self.null_equals_null,
    )?;
    // Advance the probe joiner's offset by the rows just consumed.
    probe_hash_joiner.offset += probe_batch.num_rows();

    // Pruning is only attempted when both sides have a sorted filter
    // expression and the expression graph exists.
    let anti_result = if let (
        Some(build_side_sorted_filter_expr),
        Some(probe_side_sorted_filter_expr),
        Some(graph),
    ) = (
        build_side_sorted_filter_expr.as_mut(),
        probe_side_sorted_filter_expr.as_mut(),
        self.graph.as_mut(),
    ) {
        // Refresh the filter expression intervals from the build buffer and
        // the current probe batch.
        calculate_filter_expr_intervals(
            &build_hash_joiner.input_buffer,
            build_side_sorted_filter_expr,
            &probe_batch,
            probe_side_sorted_filter_expr,
        )?;
        // Number of build-side rows to prune for this probe batch.
        let prune_length = build_hash_joiner
            .calculate_prune_length_with_probe_batch(
                build_side_sorted_filter_expr,
                probe_side_sorted_filter_expr,
                graph,
            )?;
        // Produce the build-side-determined results for `prune_length` rows;
        // this runs before `prune_internal_state` is called below.
        let result = build_side_determined_results(
            build_hash_joiner,
            &self.schema,
            prune_length,
            probe_batch.schema(),
            self.join_type,
            &self.column_indices,
        )?;
        build_hash_joiner.prune_internal_state(prune_length)?;
        result
    } else {
        None
    };

    // Merge the two partial outputs into a single optional batch.
    let result = combine_two_batches(&self.schema, equal_result, anti_result)?;
    // Report the current size and resize the memory reservation to match;
    // a failed resize is returned as an error.
    let capacity = self.size();
    self.metrics.stream_memory_usage.set(capacity);
    self.reservation.lock().try_resize(capacity)?;
    Ok(result)
}
1680}
1681
/// State value stored by the join stream and exchanged through its
/// `set_state` / `state` methods.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names only; confirm them against the code that drives the state
/// transitions.
#[derive(Clone, Debug)]
pub enum SHJStreamState {
    /// Presumably: the next batch is to be pulled from the right input.
    PullRight,

    /// Presumably: the next batch is to be pulled from the left input.
    PullLeft,

    /// Presumably: the right input has ended while the left one has not.
    RightExhausted,

    /// Presumably: the left input has ended while the right one has not.
    LeftExhausted,

    /// Presumably: both inputs have ended. `final_result` looks like a flag
    /// tracking whether the final results were already produced — TODO confirm.
    BothExhausted { final_result: bool },
}
1709
1710#[cfg(test)]
1711mod tests {
1712 use std::collections::HashMap;
1713 use std::sync::{LazyLock, Mutex};
1714
1715 use super::*;
1716 use crate::joins::test_utils::{
1717 build_sides_record_batches, compare_batches, complicated_filter,
1718 create_memory_table, join_expr_tests_fixture_f64, join_expr_tests_fixture_i32,
1719 join_expr_tests_fixture_temporal, partitioned_hash_join_with_filter,
1720 partitioned_sym_join_with_filter, split_record_batches,
1721 };
1722
1723 use arrow::compute::SortOptions;
1724 use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
1725 use datafusion_common::ScalarValue;
1726 use datafusion_execution::config::SessionConfig;
1727 use datafusion_expr::Operator;
1728 use datafusion_physical_expr::expressions::{binary, col, lit, Column};
1729 use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
1730
1731 use rstest::*;
1732
// First argument handed to `build_sides_record_batches` whenever a table is
// generated by `get_or_create_table`.
const TABLE_SIZE: i32 = 30;

// Cache key: the two cardinality components followed by the batch size.
type TableKey = (i32, i32, usize);
// Cached value: the split left batches and the split right batches.
type TableValue = (Vec<RecordBatch>, Vec<RecordBatch>);
// Tables already generated, shared by all tests through `get_or_create_table`.
static TABLE_CACHE: LazyLock<Mutex<HashMap<TableKey, TableValue>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));
1741
1742 fn get_or_create_table(
1743 cardinality: (i32, i32),
1744 batch_size: usize,
1745 ) -> Result<TableValue> {
1746 {
1747 let cache = TABLE_CACHE.lock().unwrap();
1748 if let Some(table) = cache.get(&(cardinality.0, cardinality.1, batch_size)) {
1749 return Ok(table.clone());
1750 }
1751 }
1752
1753 let (left_batch, right_batch) =
1755 build_sides_record_batches(TABLE_SIZE, cardinality)?;
1756
1757 let (left_partition, right_partition) = (
1758 split_record_batches(&left_batch, batch_size)?,
1759 split_record_batches(&right_batch, batch_size)?,
1760 );
1761
1762 let mut cache = TABLE_CACHE.lock().unwrap();
1764
1765 cache.insert(
1767 (cardinality.0, cardinality.1, batch_size),
1768 (left_partition.clone(), right_partition.clone()),
1769 );
1770
1771 Ok((left_partition, right_partition))
1772 }
1773
1774 pub async fn experiment(
1775 left: Arc<dyn ExecutionPlan>,
1776 right: Arc<dyn ExecutionPlan>,
1777 filter: Option<JoinFilter>,
1778 join_type: JoinType,
1779 on: JoinOn,
1780 task_ctx: Arc<TaskContext>,
1781 ) -> Result<()> {
1782 let first_batches = partitioned_sym_join_with_filter(
1783 Arc::clone(&left),
1784 Arc::clone(&right),
1785 on.clone(),
1786 filter.clone(),
1787 &join_type,
1788 false,
1789 Arc::clone(&task_ctx),
1790 )
1791 .await?;
1792 let second_batches = partitioned_hash_join_with_filter(
1793 left, right, on, filter, &join_type, false, task_ctx,
1794 )
1795 .await?;
1796 compare_batches(&first_batches, &second_batches);
1797 Ok(())
1798 }
1799
1800 #[rstest]
1801 #[tokio::test(flavor = "multi_thread")]
1802 async fn complex_join_all_one_ascending_numeric(
1803 #[values(
1804 JoinType::Inner,
1805 JoinType::Left,
1806 JoinType::Right,
1807 JoinType::RightSemi,
1808 JoinType::LeftSemi,
1809 JoinType::LeftAnti,
1810 JoinType::LeftMark,
1811 JoinType::RightAnti,
1812 JoinType::Full
1813 )]
1814 join_type: JoinType,
1815 #[values(
1816 (4, 5),
1817 (12, 17),
1818 )]
1819 cardinality: (i32, i32),
1820 ) -> Result<()> {
1821 let task_ctx = Arc::new(TaskContext::default());
1823
1824 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
1825
1826 let left_schema = &left_partition[0].schema();
1827 let right_schema = &right_partition[0].schema();
1828
1829 let left_sorted = LexOrdering::new(vec![PhysicalSortExpr {
1830 expr: binary(
1831 col("la1", left_schema)?,
1832 Operator::Plus,
1833 col("la2", left_schema)?,
1834 left_schema,
1835 )?,
1836 options: SortOptions::default(),
1837 }]);
1838 let right_sorted = LexOrdering::new(vec![PhysicalSortExpr {
1839 expr: col("ra1", right_schema)?,
1840 options: SortOptions::default(),
1841 }]);
1842 let (left, right) = create_memory_table(
1843 left_partition,
1844 right_partition,
1845 vec![left_sorted],
1846 vec![right_sorted],
1847 )?;
1848
1849 let on = vec![(
1850 binary(
1851 col("lc1", left_schema)?,
1852 Operator::Plus,
1853 lit(ScalarValue::Int32(Some(1))),
1854 left_schema,
1855 )?,
1856 Arc::new(Column::new_with_schema("rc1", right_schema)?) as _,
1857 )];
1858
1859 let intermediate_schema = Schema::new(vec![
1860 Field::new("0", DataType::Int32, true),
1861 Field::new("1", DataType::Int32, true),
1862 Field::new("2", DataType::Int32, true),
1863 ]);
1864 let filter_expr = complicated_filter(&intermediate_schema)?;
1865 let column_indices = vec![
1866 ColumnIndex {
1867 index: left_schema.index_of("la1")?,
1868 side: JoinSide::Left,
1869 },
1870 ColumnIndex {
1871 index: left_schema.index_of("la2")?,
1872 side: JoinSide::Left,
1873 },
1874 ColumnIndex {
1875 index: right_schema.index_of("ra1")?,
1876 side: JoinSide::Right,
1877 },
1878 ];
1879 let filter =
1880 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
1881
1882 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
1883 Ok(())
1884 }
1885
1886 #[rstest]
1887 #[tokio::test(flavor = "multi_thread")]
1888 async fn join_all_one_ascending_numeric(
1889 #[values(
1890 JoinType::Inner,
1891 JoinType::Left,
1892 JoinType::Right,
1893 JoinType::RightSemi,
1894 JoinType::LeftSemi,
1895 JoinType::LeftAnti,
1896 JoinType::LeftMark,
1897 JoinType::RightAnti,
1898 JoinType::Full
1899 )]
1900 join_type: JoinType,
1901 #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
1902 ) -> Result<()> {
1903 let task_ctx = Arc::new(TaskContext::default());
1904 let (left_partition, right_partition) = get_or_create_table((4, 5), 8)?;
1905
1906 let left_schema = &left_partition[0].schema();
1907 let right_schema = &right_partition[0].schema();
1908
1909 let left_sorted = LexOrdering::new(vec![PhysicalSortExpr {
1910 expr: col("la1", left_schema)?,
1911 options: SortOptions::default(),
1912 }]);
1913 let right_sorted = LexOrdering::new(vec![PhysicalSortExpr {
1914 expr: col("ra1", right_schema)?,
1915 options: SortOptions::default(),
1916 }]);
1917 let (left, right) = create_memory_table(
1918 left_partition,
1919 right_partition,
1920 vec![left_sorted],
1921 vec![right_sorted],
1922 )?;
1923
1924 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
1925
1926 let intermediate_schema = Schema::new(vec![
1927 Field::new("left", DataType::Int32, true),
1928 Field::new("right", DataType::Int32, true),
1929 ]);
1930 let filter_expr = join_expr_tests_fixture_i32(
1931 case_expr,
1932 col("left", &intermediate_schema)?,
1933 col("right", &intermediate_schema)?,
1934 );
1935 let column_indices = vec![
1936 ColumnIndex {
1937 index: 0,
1938 side: JoinSide::Left,
1939 },
1940 ColumnIndex {
1941 index: 0,
1942 side: JoinSide::Right,
1943 },
1944 ];
1945 let filter =
1946 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
1947
1948 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
1949 Ok(())
1950 }
1951
1952 #[rstest]
1953 #[tokio::test(flavor = "multi_thread")]
1954 async fn join_without_sort_information(
1955 #[values(
1956 JoinType::Inner,
1957 JoinType::Left,
1958 JoinType::Right,
1959 JoinType::RightSemi,
1960 JoinType::LeftSemi,
1961 JoinType::LeftAnti,
1962 JoinType::LeftMark,
1963 JoinType::RightAnti,
1964 JoinType::Full
1965 )]
1966 join_type: JoinType,
1967 #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
1968 ) -> Result<()> {
1969 let task_ctx = Arc::new(TaskContext::default());
1970 let (left_partition, right_partition) = get_or_create_table((4, 5), 8)?;
1971
1972 let left_schema = &left_partition[0].schema();
1973 let right_schema = &right_partition[0].schema();
1974 let (left, right) =
1975 create_memory_table(left_partition, right_partition, vec![], vec![])?;
1976
1977 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
1978
1979 let intermediate_schema = Schema::new(vec![
1980 Field::new("left", DataType::Int32, true),
1981 Field::new("right", DataType::Int32, true),
1982 ]);
1983 let filter_expr = join_expr_tests_fixture_i32(
1984 case_expr,
1985 col("left", &intermediate_schema)?,
1986 col("right", &intermediate_schema)?,
1987 );
1988 let column_indices = vec![
1989 ColumnIndex {
1990 index: 5,
1991 side: JoinSide::Left,
1992 },
1993 ColumnIndex {
1994 index: 5,
1995 side: JoinSide::Right,
1996 },
1997 ];
1998 let filter =
1999 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2000
2001 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2002 Ok(())
2003 }
2004
2005 #[rstest]
2006 #[tokio::test(flavor = "multi_thread")]
2007 async fn join_without_filter(
2008 #[values(
2009 JoinType::Inner,
2010 JoinType::Left,
2011 JoinType::Right,
2012 JoinType::RightSemi,
2013 JoinType::LeftSemi,
2014 JoinType::LeftAnti,
2015 JoinType::LeftMark,
2016 JoinType::RightAnti,
2017 JoinType::Full
2018 )]
2019 join_type: JoinType,
2020 ) -> Result<()> {
2021 let task_ctx = Arc::new(TaskContext::default());
2022 let (left_partition, right_partition) = get_or_create_table((11, 21), 8)?;
2023 let left_schema = &left_partition[0].schema();
2024 let right_schema = &right_partition[0].schema();
2025 let (left, right) =
2026 create_memory_table(left_partition, right_partition, vec![], vec![])?;
2027
2028 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2029 experiment(left, right, None, join_type, on, task_ctx).await?;
2030 Ok(())
2031 }
2032
2033 #[rstest]
2034 #[tokio::test(flavor = "multi_thread")]
2035 async fn join_all_one_descending_numeric_particular(
2036 #[values(
2037 JoinType::Inner,
2038 JoinType::Left,
2039 JoinType::Right,
2040 JoinType::RightSemi,
2041 JoinType::LeftSemi,
2042 JoinType::LeftAnti,
2043 JoinType::LeftMark,
2044 JoinType::RightAnti,
2045 JoinType::Full
2046 )]
2047 join_type: JoinType,
2048 #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
2049 ) -> Result<()> {
2050 let task_ctx = Arc::new(TaskContext::default());
2051 let (left_partition, right_partition) = get_or_create_table((11, 21), 8)?;
2052 let left_schema = &left_partition[0].schema();
2053 let right_schema = &right_partition[0].schema();
2054 let left_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2055 expr: col("la1_des", left_schema)?,
2056 options: SortOptions {
2057 descending: true,
2058 nulls_first: true,
2059 },
2060 }]);
2061 let right_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2062 expr: col("ra1_des", right_schema)?,
2063 options: SortOptions {
2064 descending: true,
2065 nulls_first: true,
2066 },
2067 }]);
2068 let (left, right) = create_memory_table(
2069 left_partition,
2070 right_partition,
2071 vec![left_sorted],
2072 vec![right_sorted],
2073 )?;
2074
2075 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2076
2077 let intermediate_schema = Schema::new(vec![
2078 Field::new("left", DataType::Int32, true),
2079 Field::new("right", DataType::Int32, true),
2080 ]);
2081 let filter_expr = join_expr_tests_fixture_i32(
2082 case_expr,
2083 col("left", &intermediate_schema)?,
2084 col("right", &intermediate_schema)?,
2085 );
2086 let column_indices = vec![
2087 ColumnIndex {
2088 index: 5,
2089 side: JoinSide::Left,
2090 },
2091 ColumnIndex {
2092 index: 5,
2093 side: JoinSide::Right,
2094 },
2095 ];
2096 let filter =
2097 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2098
2099 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2100 Ok(())
2101 }
2102
2103 #[tokio::test(flavor = "multi_thread")]
2104 async fn build_null_columns_first() -> Result<()> {
2105 let join_type = JoinType::Full;
2106 let case_expr = 1;
2107 let session_config = SessionConfig::new().with_repartition_joins(false);
2108 let task_ctx = TaskContext::default().with_session_config(session_config);
2109 let task_ctx = Arc::new(task_ctx);
2110 let (left_partition, right_partition) = get_or_create_table((10, 11), 8)?;
2111 let left_schema = &left_partition[0].schema();
2112 let right_schema = &right_partition[0].schema();
2113 let left_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2114 expr: col("l_asc_null_first", left_schema)?,
2115 options: SortOptions {
2116 descending: false,
2117 nulls_first: true,
2118 },
2119 }]);
2120 let right_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2121 expr: col("r_asc_null_first", right_schema)?,
2122 options: SortOptions {
2123 descending: false,
2124 nulls_first: true,
2125 },
2126 }]);
2127 let (left, right) = create_memory_table(
2128 left_partition,
2129 right_partition,
2130 vec![left_sorted],
2131 vec![right_sorted],
2132 )?;
2133
2134 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2135
2136 let intermediate_schema = Schema::new(vec![
2137 Field::new("left", DataType::Int32, true),
2138 Field::new("right", DataType::Int32, true),
2139 ]);
2140 let filter_expr = join_expr_tests_fixture_i32(
2141 case_expr,
2142 col("left", &intermediate_schema)?,
2143 col("right", &intermediate_schema)?,
2144 );
2145 let column_indices = vec![
2146 ColumnIndex {
2147 index: 6,
2148 side: JoinSide::Left,
2149 },
2150 ColumnIndex {
2151 index: 6,
2152 side: JoinSide::Right,
2153 },
2154 ];
2155 let filter =
2156 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2157 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2158 Ok(())
2159 }
2160
2161 #[tokio::test(flavor = "multi_thread")]
2162 async fn build_null_columns_last() -> Result<()> {
2163 let join_type = JoinType::Full;
2164 let case_expr = 1;
2165 let session_config = SessionConfig::new().with_repartition_joins(false);
2166 let task_ctx = TaskContext::default().with_session_config(session_config);
2167 let task_ctx = Arc::new(task_ctx);
2168 let (left_partition, right_partition) = get_or_create_table((10, 11), 8)?;
2169
2170 let left_schema = &left_partition[0].schema();
2171 let right_schema = &right_partition[0].schema();
2172 let left_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2173 expr: col("l_asc_null_last", left_schema)?,
2174 options: SortOptions {
2175 descending: false,
2176 nulls_first: false,
2177 },
2178 }]);
2179 let right_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2180 expr: col("r_asc_null_last", right_schema)?,
2181 options: SortOptions {
2182 descending: false,
2183 nulls_first: false,
2184 },
2185 }]);
2186 let (left, right) = create_memory_table(
2187 left_partition,
2188 right_partition,
2189 vec![left_sorted],
2190 vec![right_sorted],
2191 )?;
2192
2193 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2194
2195 let intermediate_schema = Schema::new(vec![
2196 Field::new("left", DataType::Int32, true),
2197 Field::new("right", DataType::Int32, true),
2198 ]);
2199 let filter_expr = join_expr_tests_fixture_i32(
2200 case_expr,
2201 col("left", &intermediate_schema)?,
2202 col("right", &intermediate_schema)?,
2203 );
2204 let column_indices = vec![
2205 ColumnIndex {
2206 index: 7,
2207 side: JoinSide::Left,
2208 },
2209 ColumnIndex {
2210 index: 7,
2211 side: JoinSide::Right,
2212 },
2213 ];
2214 let filter =
2215 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2216
2217 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2218 Ok(())
2219 }
2220
2221 #[tokio::test(flavor = "multi_thread")]
2222 async fn build_null_columns_first_descending() -> Result<()> {
2223 let join_type = JoinType::Full;
2224 let cardinality = (10, 11);
2225 let case_expr = 1;
2226 let session_config = SessionConfig::new().with_repartition_joins(false);
2227 let task_ctx = TaskContext::default().with_session_config(session_config);
2228 let task_ctx = Arc::new(task_ctx);
2229 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2230
2231 let left_schema = &left_partition[0].schema();
2232 let right_schema = &right_partition[0].schema();
2233 let left_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2234 expr: col("l_desc_null_first", left_schema)?,
2235 options: SortOptions {
2236 descending: true,
2237 nulls_first: true,
2238 },
2239 }]);
2240 let right_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2241 expr: col("r_desc_null_first", right_schema)?,
2242 options: SortOptions {
2243 descending: true,
2244 nulls_first: true,
2245 },
2246 }]);
2247 let (left, right) = create_memory_table(
2248 left_partition,
2249 right_partition,
2250 vec![left_sorted],
2251 vec![right_sorted],
2252 )?;
2253
2254 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2255
2256 let intermediate_schema = Schema::new(vec![
2257 Field::new("left", DataType::Int32, true),
2258 Field::new("right", DataType::Int32, true),
2259 ]);
2260 let filter_expr = join_expr_tests_fixture_i32(
2261 case_expr,
2262 col("left", &intermediate_schema)?,
2263 col("right", &intermediate_schema)?,
2264 );
2265 let column_indices = vec![
2266 ColumnIndex {
2267 index: 8,
2268 side: JoinSide::Left,
2269 },
2270 ColumnIndex {
2271 index: 8,
2272 side: JoinSide::Right,
2273 },
2274 ];
2275 let filter =
2276 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2277
2278 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2279 Ok(())
2280 }
2281
2282 #[tokio::test(flavor = "multi_thread")]
2283 async fn complex_join_all_one_ascending_numeric_missing_stat() -> Result<()> {
2284 let cardinality = (3, 4);
2285 let join_type = JoinType::Full;
2286
2287 let session_config = SessionConfig::new().with_repartition_joins(false);
2289 let task_ctx = TaskContext::default().with_session_config(session_config);
2290 let task_ctx = Arc::new(task_ctx);
2291 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2292
2293 let left_schema = &left_partition[0].schema();
2294 let right_schema = &right_partition[0].schema();
2295 let left_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2296 expr: col("la1", left_schema)?,
2297 options: SortOptions::default(),
2298 }]);
2299
2300 let right_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2301 expr: col("ra1", right_schema)?,
2302 options: SortOptions::default(),
2303 }]);
2304 let (left, right) = create_memory_table(
2305 left_partition,
2306 right_partition,
2307 vec![left_sorted],
2308 vec![right_sorted],
2309 )?;
2310
2311 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2312
2313 let intermediate_schema = Schema::new(vec![
2314 Field::new("0", DataType::Int32, true),
2315 Field::new("1", DataType::Int32, true),
2316 Field::new("2", DataType::Int32, true),
2317 ]);
2318 let filter_expr = complicated_filter(&intermediate_schema)?;
2319 let column_indices = vec![
2320 ColumnIndex {
2321 index: 0,
2322 side: JoinSide::Left,
2323 },
2324 ColumnIndex {
2325 index: 4,
2326 side: JoinSide::Left,
2327 },
2328 ColumnIndex {
2329 index: 0,
2330 side: JoinSide::Right,
2331 },
2332 ];
2333 let filter =
2334 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2335
2336 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2337 Ok(())
2338 }
2339
2340 #[tokio::test(flavor = "multi_thread")]
2341 async fn complex_join_all_one_ascending_equivalence() -> Result<()> {
2342 let cardinality = (3, 4);
2343 let join_type = JoinType::Full;
2344
2345 let config = SessionConfig::new().with_repartition_joins(false);
2347 let task_ctx = Arc::new(TaskContext::default().with_session_config(config));
2350 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2351 let left_schema = &left_partition[0].schema();
2352 let right_schema = &right_partition[0].schema();
2353 let left_sorted = vec![
2354 LexOrdering::new(vec![PhysicalSortExpr {
2355 expr: col("la1", left_schema)?,
2356 options: SortOptions::default(),
2357 }]),
2358 LexOrdering::new(vec![PhysicalSortExpr {
2359 expr: col("la2", left_schema)?,
2360 options: SortOptions::default(),
2361 }]),
2362 ];
2363
2364 let right_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2365 expr: col("ra1", right_schema)?,
2366 options: SortOptions::default(),
2367 }]);
2368
2369 let (left, right) = create_memory_table(
2370 left_partition,
2371 right_partition,
2372 left_sorted,
2373 vec![right_sorted],
2374 )?;
2375
2376 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2377
2378 let intermediate_schema = Schema::new(vec![
2379 Field::new("0", DataType::Int32, true),
2380 Field::new("1", DataType::Int32, true),
2381 Field::new("2", DataType::Int32, true),
2382 ]);
2383 let filter_expr = complicated_filter(&intermediate_schema)?;
2384 let column_indices = vec![
2385 ColumnIndex {
2386 index: 0,
2387 side: JoinSide::Left,
2388 },
2389 ColumnIndex {
2390 index: 4,
2391 side: JoinSide::Left,
2392 },
2393 ColumnIndex {
2394 index: 0,
2395 side: JoinSide::Right,
2396 },
2397 ];
2398 let filter =
2399 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2400
2401 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2402 Ok(())
2403 }
2404
2405 #[rstest]
2406 #[tokio::test(flavor = "multi_thread")]
2407 async fn testing_with_temporal_columns(
2408 #[values(
2409 JoinType::Inner,
2410 JoinType::Left,
2411 JoinType::Right,
2412 JoinType::RightSemi,
2413 JoinType::LeftSemi,
2414 JoinType::LeftAnti,
2415 JoinType::LeftMark,
2416 JoinType::RightAnti,
2417 JoinType::Full
2418 )]
2419 join_type: JoinType,
2420 #[values(
2421 (4, 5),
2422 (12, 17),
2423 )]
2424 cardinality: (i32, i32),
2425 #[values(0, 1, 2)] case_expr: usize,
2426 ) -> Result<()> {
2427 let session_config = SessionConfig::new().with_repartition_joins(false);
2428 let task_ctx = TaskContext::default().with_session_config(session_config);
2429 let task_ctx = Arc::new(task_ctx);
2430 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2431
2432 let left_schema = &left_partition[0].schema();
2433 let right_schema = &right_partition[0].schema();
2434 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2435 let left_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2436 expr: col("lt1", left_schema)?,
2437 options: SortOptions {
2438 descending: false,
2439 nulls_first: true,
2440 },
2441 }]);
2442 let right_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2443 expr: col("rt1", right_schema)?,
2444 options: SortOptions {
2445 descending: false,
2446 nulls_first: true,
2447 },
2448 }]);
2449 let (left, right) = create_memory_table(
2450 left_partition,
2451 right_partition,
2452 vec![left_sorted],
2453 vec![right_sorted],
2454 )?;
2455 let intermediate_schema = Schema::new(vec![
2456 Field::new(
2457 "left",
2458 DataType::Timestamp(TimeUnit::Millisecond, None),
2459 false,
2460 ),
2461 Field::new(
2462 "right",
2463 DataType::Timestamp(TimeUnit::Millisecond, None),
2464 false,
2465 ),
2466 ]);
2467 let filter_expr = join_expr_tests_fixture_temporal(
2468 case_expr,
2469 col("left", &intermediate_schema)?,
2470 col("right", &intermediate_schema)?,
2471 &intermediate_schema,
2472 )?;
2473 let column_indices = vec![
2474 ColumnIndex {
2475 index: 3,
2476 side: JoinSide::Left,
2477 },
2478 ColumnIndex {
2479 index: 3,
2480 side: JoinSide::Right,
2481 },
2482 ];
2483 let filter =
2484 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2485 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2486 Ok(())
2487 }
2488
2489 #[rstest]
2490 #[tokio::test(flavor = "multi_thread")]
2491 async fn test_with_interval_columns(
2492 #[values(
2493 JoinType::Inner,
2494 JoinType::Left,
2495 JoinType::Right,
2496 JoinType::RightSemi,
2497 JoinType::LeftSemi,
2498 JoinType::LeftAnti,
2499 JoinType::LeftMark,
2500 JoinType::RightAnti,
2501 JoinType::Full
2502 )]
2503 join_type: JoinType,
2504 #[values(
2505 (4, 5),
2506 (12, 17),
2507 )]
2508 cardinality: (i32, i32),
2509 ) -> Result<()> {
2510 let session_config = SessionConfig::new().with_repartition_joins(false);
2511 let task_ctx = TaskContext::default().with_session_config(session_config);
2512 let task_ctx = Arc::new(task_ctx);
2513 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2514
2515 let left_schema = &left_partition[0].schema();
2516 let right_schema = &right_partition[0].schema();
2517 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2518 let left_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2519 expr: col("li1", left_schema)?,
2520 options: SortOptions {
2521 descending: false,
2522 nulls_first: true,
2523 },
2524 }]);
2525 let right_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2526 expr: col("ri1", right_schema)?,
2527 options: SortOptions {
2528 descending: false,
2529 nulls_first: true,
2530 },
2531 }]);
2532 let (left, right) = create_memory_table(
2533 left_partition,
2534 right_partition,
2535 vec![left_sorted],
2536 vec![right_sorted],
2537 )?;
2538 let intermediate_schema = Schema::new(vec![
2539 Field::new("left", DataType::Interval(IntervalUnit::DayTime), false),
2540 Field::new("right", DataType::Interval(IntervalUnit::DayTime), false),
2541 ]);
2542 let filter_expr = join_expr_tests_fixture_temporal(
2543 0,
2544 col("left", &intermediate_schema)?,
2545 col("right", &intermediate_schema)?,
2546 &intermediate_schema,
2547 )?;
2548 let column_indices = vec![
2549 ColumnIndex {
2550 index: 9,
2551 side: JoinSide::Left,
2552 },
2553 ColumnIndex {
2554 index: 9,
2555 side: JoinSide::Right,
2556 },
2557 ];
2558 let filter =
2559 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2560 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2561
2562 Ok(())
2563 }
2564
2565 #[rstest]
2566 #[tokio::test(flavor = "multi_thread")]
2567 async fn testing_ascending_float_pruning(
2568 #[values(
2569 JoinType::Inner,
2570 JoinType::Left,
2571 JoinType::Right,
2572 JoinType::RightSemi,
2573 JoinType::LeftSemi,
2574 JoinType::LeftAnti,
2575 JoinType::LeftMark,
2576 JoinType::RightAnti,
2577 JoinType::Full
2578 )]
2579 join_type: JoinType,
2580 #[values(
2581 (4, 5),
2582 (12, 17),
2583 )]
2584 cardinality: (i32, i32),
2585 #[values(0, 1, 2, 3, 4, 5)] case_expr: usize,
2586 ) -> Result<()> {
2587 let session_config = SessionConfig::new().with_repartition_joins(false);
2588 let task_ctx = TaskContext::default().with_session_config(session_config);
2589 let task_ctx = Arc::new(task_ctx);
2590 let (left_partition, right_partition) = get_or_create_table(cardinality, 8)?;
2591
2592 let left_schema = &left_partition[0].schema();
2593 let right_schema = &right_partition[0].schema();
2594 let left_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2595 expr: col("l_float", left_schema)?,
2596 options: SortOptions::default(),
2597 }]);
2598 let right_sorted = LexOrdering::new(vec![PhysicalSortExpr {
2599 expr: col("r_float", right_schema)?,
2600 options: SortOptions::default(),
2601 }]);
2602 let (left, right) = create_memory_table(
2603 left_partition,
2604 right_partition,
2605 vec![left_sorted],
2606 vec![right_sorted],
2607 )?;
2608
2609 let on = vec![(col("lc1", left_schema)?, col("rc1", right_schema)?)];
2610
2611 let intermediate_schema = Schema::new(vec![
2612 Field::new("left", DataType::Float64, true),
2613 Field::new("right", DataType::Float64, true),
2614 ]);
2615 let filter_expr = join_expr_tests_fixture_f64(
2616 case_expr,
2617 col("left", &intermediate_schema)?,
2618 col("right", &intermediate_schema)?,
2619 );
2620 let column_indices = vec![
2621 ColumnIndex {
2622 index: 10, side: JoinSide::Left,
2624 },
2625 ColumnIndex {
2626 index: 10, side: JoinSide::Right,
2628 },
2629 ];
2630 let filter =
2631 JoinFilter::new(filter_expr, column_indices, Arc::new(intermediate_schema));
2632
2633 experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
2634 Ok(())
2635 }
2636}