datafusion_physical_expr/partitioning.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`Partitioning`] and [`Distribution`] for `ExecutionPlans`

use crate::{
    equivalence::ProjectionMapping, expressions::UnKnownColumn, physical_exprs_equal,
    EquivalenceProperties, PhysicalExpr,
};
use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;

/// Output partitioning supported by [`ExecutionPlan`]s.
///
/// Calling [`ExecutionPlan::execute`] produces one or more independent streams of
/// [`RecordBatch`]es in parallel, referred to as partitions. The streams are Rust
/// `async` [`Stream`]s (a special kind of future). The number of output
/// partitions varies based on the input and the operation performed.
///
/// For example, an `ExecutionPlan` that has output partitioning of 3 will
/// produce 3 distinct output streams as the result of calling
/// `ExecutionPlan::execute(0)`, `ExecutionPlan::execute(1)`, and
/// `ExecutionPlan::execute(2)`, as shown below:
///
/// ```text
///                                                   ...         ...        ...
///               ...                                  ▲           ▲           ▲
///                                                    │           │           │
///                ▲                                   │           │           │
///                │                                   │           │           │
///                │                               ┌───┴────┐  ┌───┴────┐  ┌───┴────┐
///     ┌────────────────────┐                     │ Stream │  │ Stream │  │ Stream │
///     │   ExecutionPlan    │                     │  (0)   │  │  (1)   │  │  (2)   │
///     └────────────────────┘                     └────────┘  └────────┘  └────────┘
///                ▲                                   ▲           ▲           ▲
///                │                                   │           │           │
///     ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                          │           │           │
///             Input        │                         │           │           │
///     └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                          │           │           │
///                ▲                               ┌ ─ ─ ─ ─   ┌ ─ ─ ─ ─   ┌ ─ ─ ─ ─
///                │                                 Input  │    Input  │    Input  │
///                │                               │ Stream    │ Stream    │ Stream
///                                                   (0)   │     (1)   │     (2)   │
///               ...                              └ ─ ▲ ─ ─   └ ─ ▲ ─ ─   └ ─ ▲ ─ ─
///                                                    │           │           │
///                                                    │           │           │
///                                                    │           │           │
///
/// ExecutionPlan with 1 input                      3 (async) streams, one for each
/// that has 3 partitions, which itself             output partition
/// has 3 output partitions
/// ```
///
/// It is common (but not required) that an `ExecutionPlan` has the same number
/// of input partitions as output partitions. However, some plans have a
/// different number, such as `RepartitionExec`, which redistributes batches
/// from some number of input partitions to some number of output partitions:
///
/// ```text
///               ...                                     ...         ...        ...
///
///                                                        ▲           ▲           ▲
///                ▲                                       │           │           │
///                │                                       │           │           │
///       ┌────────┴───────────┐                           │           │           │
///       │  RepartitionExec   │                      ┌────┴───┐  ┌────┴───┐  ┌────┴───┐
///       └────────────────────┘                      │ Stream │  │ Stream │  │ Stream │
///                ▲                                  │  (0)   │  │  (1)   │  │  (2)   │
///                │                                  └────────┘  └────────┘  └────────┘
///                │                                       ▲           ▲           ▲
///                ...                                     │           │           │
///                                                        └──────────┐│┌──────────┘
///                                                                   │││
///                                                                   │││
/// RepartitionExec with 1 input
/// partition and 3 output partitions                 3 (async) streams, that internally
///                                                    pull from the same input stream
///                                                                  ...
/// ```
///
/// # Additional Examples
///
/// A simple `FileScanExec` might produce one output stream (partition) for each
/// file (note that the actual DataFusion file scanners can read individual files in
/// parallel, potentially producing multiple partitions per file).
///
/// Plans such as `SortPreservingMerge` produce a single output stream
/// (1 output partition) by combining some number of input streams (input partitions).
///
/// Plans such as `FilterExec` produce the same number of output streams
/// (partitions) as input streams (partitions).
///
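/// As a minimal sketch (assuming `Partitioning` is re-exported at the crate
/// root), a plan that distributes batches round-robin across 8 output
/// partitions would describe its output partitioning as:
///
/// ```
/// use datafusion_physical_expr::Partitioning;
///
/// // Batches are allocated round-robin across 8 output partitions
/// let partitioning = Partitioning::RoundRobinBatch(8);
/// assert_eq!(partitioning.partition_count(), 8);
/// ```
///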
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
/// [`ExecutionPlan::execute`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.execute
/// [`ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
#[derive(Debug, Clone)]
pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm and the specified number of partitions
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one or more expressions and the specified number of
    /// partitions
    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
    /// Unknown partitioning scheme with a known number of partitions
    UnknownPartitioning(usize),
}

impl Display for Partitioning {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Partitioning::RoundRobinBatch(size) => write!(f, "RoundRobinBatch({size})"),
            Partitioning::Hash(phy_exprs, size) => {
                let phy_exprs_str = phy_exprs
                    .iter()
                    .map(|e| format!("{e}"))
                    .collect::<Vec<String>>()
                    .join(", ");
                write!(f, "Hash([{phy_exprs_str}], {size})")
            }
            Partitioning::UnknownPartitioning(size) => {
                write!(f, "UnknownPartitioning({size})")
            }
        }
    }
}
impl Partitioning {
    /// Returns the number of partitions in this partitioning scheme
    pub fn partition_count(&self) -> usize {
        use Partitioning::*;
        match self {
            RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
        }
    }

    /// Returns true when the guarantees made by this [`Partitioning`] are sufficient to
    /// satisfy the partitioning scheme mandated by the `required` [`Distribution`].
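    ///
    /// A minimal sketch, assuming these types are re-exported at the crate root:
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow::datatypes::Schema;
    /// use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning};
    ///
    /// let eq_properties = EquivalenceProperties::new(Arc::new(Schema::empty()));
    /// // A single output partition satisfies `Distribution::SinglePartition`,
    /// // but ten round-robin partitions do not.
    /// let single = Partitioning::UnknownPartitioning(1);
    /// assert!(single.satisfy(&Distribution::SinglePartition, &eq_properties));
    /// let round_robin = Partitioning::RoundRobinBatch(10);
    /// assert!(!round_robin.satisfy(&Distribution::SinglePartition, &eq_properties));
    /// ```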
    pub fn satisfy(
        &self,
        required: &Distribution,
        eq_properties: &EquivalenceProperties,
    ) -> bool {
        match required {
            Distribution::UnspecifiedDistribution => true,
            Distribution::SinglePartition if self.partition_count() == 1 => true,
            // When the partition count is 1, any hash requirement is trivially satisfied.
            Distribution::HashPartitioned(_) if self.partition_count() == 1 => true,
            Distribution::HashPartitioned(required_exprs) => {
                match self {
                    // We do not check the partition count for hash partitioning here; we assume that
                    // the partition count and hash functions are the same across the system. If we
                    // ever support storage partition-wise joins, the partition count and hash
                    // functions will need to be validated as well.
                    Partitioning::Hash(partition_exprs, _) => {
                        let fast_match =
                            physical_exprs_equal(required_exprs, partition_exprs);
                        // If the required exprs do not match directly, leverage the eq_properties
                        // provided by the child and normalize both sides using its equivalence groups.
                        if !fast_match {
                            let eq_groups = eq_properties.eq_group();
                            if !eq_groups.is_empty() {
                                let normalized_required_exprs = required_exprs
                                    .iter()
                                    .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
                                    .collect::<Vec<_>>();
                                let normalized_partition_exprs = partition_exprs
                                    .iter()
                                    .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
                                    .collect::<Vec<_>>();
                                return physical_exprs_equal(
                                    &normalized_required_exprs,
                                    &normalized_partition_exprs,
                                );
                            }
                        }
                        fast_match
                    }
                    _ => false,
                }
            }
            _ => false,
        }
    }

    /// Calculate the output partitioning after applying the given projection.
    ///
    /// Hash expressions that cannot be expressed in terms of the projection's
    /// output are replaced with [`UnKnownColumn`] placeholders.
    pub fn project(
        &self,
        projection_mapping: &ProjectionMapping,
        input_eq_properties: &EquivalenceProperties,
    ) -> Self {
        if let Partitioning::Hash(exprs, part) = self {
            let normalized_exprs = exprs
                .iter()
                .map(|expr| {
                    input_eq_properties
                        .project_expr(expr, projection_mapping)
                        .unwrap_or_else(|| {
                            Arc::new(UnKnownColumn::new(&expr.to_string()))
                        })
                })
                .collect();
            Partitioning::Hash(normalized_exprs, *part)
        } else {
            self.clone()
        }
    }
}

impl PartialEq for Partitioning {
    fn eq(&self, other: &Partitioning) -> bool {
        match (self, other) {
            (
                Partitioning::RoundRobinBatch(count1),
                Partitioning::RoundRobinBatch(count2),
            ) if count1 == count2 => true,
            (Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2))
                if physical_exprs_equal(exprs1, exprs2) && (count1 == count2) =>
            {
                true
            }
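            // Note that `UnknownPartitioning` values fall through to this arm and
            // therefore never compare equal, even when their partition counts match.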
            _ => false,
        }
    }
}

/// How data is distributed amongst partitions. See [`Partitioning`] for more
/// details.
#[derive(Debug, Clone)]
pub enum Distribution {
    /// Unspecified distribution
    UnspecifiedDistribution,
    /// A single partition is required
    SinglePartition,
    /// Requires children to be distributed in such a way that the same
    /// values of the keys end up in the same partition
    HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}

impl Distribution {
    /// Creates a `Partitioning` that satisfies this `Distribution`
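    ///
    /// A minimal sketch, assuming `Distribution` and `Partitioning` are
    /// re-exported at the crate root:
    ///
    /// ```
    /// use datafusion_physical_expr::{Distribution, Partitioning};
    ///
    /// // `SinglePartition` always maps to exactly one output partition,
    /// // regardless of the requested partition count.
    /// let partitioning = Distribution::SinglePartition.create_partitioning(8);
    /// assert_eq!(partitioning.partition_count(), 1);
    /// ```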
    pub fn create_partitioning(self, partition_count: usize) -> Partitioning {
        match self {
            Distribution::UnspecifiedDistribution => {
                Partitioning::UnknownPartitioning(partition_count)
            }
            Distribution::SinglePartition => Partitioning::UnknownPartitioning(1),
            Distribution::HashPartitioned(expr) => {
                Partitioning::Hash(expr, partition_count)
            }
        }
    }
}

impl Display for Distribution {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Distribution::UnspecifiedDistribution => write!(f, "Unspecified"),
            Distribution::SinglePartition => write!(f, "SinglePartition"),
            Distribution::HashPartitioned(exprs) => {
                write!(f, "HashPartitioned[{}]", format_physical_expr_list(exprs))
            }
        }
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::expressions::Column;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;

    #[test]
    fn partitioning_satisfy_distribution() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("column_1", DataType::Int64, false),
            Field::new("column_2", DataType::Utf8, false),
        ]));

        let partition_exprs1: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
            Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
        ];

        let partition_exprs2: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
            Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
        ];

        let distribution_types = vec![
            Distribution::UnspecifiedDistribution,
            Distribution::SinglePartition,
            Distribution::HashPartitioned(partition_exprs1.clone()),
        ];

        let single_partition = Partitioning::UnknownPartitioning(1);
        let unspecified_partition = Partitioning::UnknownPartitioning(10);
        let round_robin_partition = Partitioning::RoundRobinBatch(10);
        let hash_partition1 = Partitioning::Hash(partition_exprs1, 10);
        let hash_partition2 = Partitioning::Hash(partition_exprs2, 10);
        let eq_properties = EquivalenceProperties::new(schema);

        for distribution in distribution_types {
            let result = (
                single_partition.satisfy(&distribution, &eq_properties),
                unspecified_partition.satisfy(&distribution, &eq_properties),
                round_robin_partition.satisfy(&distribution, &eq_properties),
                hash_partition1.satisfy(&distribution, &eq_properties),
                hash_partition2.satisfy(&distribution, &eq_properties),
            );

            match distribution {
                Distribution::UnspecifiedDistribution => {
                    assert_eq!(result, (true, true, true, true, true))
                }
                Distribution::SinglePartition => {
                    assert_eq!(result, (true, false, false, false, false))
                }
                Distribution::HashPartitioned(_) => {
                    assert_eq!(result, (true, false, false, true, false))
                }
            }
        }

        Ok(())
    }
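
    // A small companion check for `Distribution::create_partitioning`:
    // `SinglePartition` always collapses to a single partition, while
    // `UnspecifiedDistribution` keeps the requested partition count.
    #[test]
    fn create_partitioning_partition_counts() {
        assert_eq!(
            Distribution::UnspecifiedDistribution
                .create_partitioning(4)
                .partition_count(),
            4
        );
        assert_eq!(
            Distribution::SinglePartition
                .create_partitioning(4)
                .partition_count(),
            1
        );
    }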
}