datafusion_physical_plan/
display.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Implementation of physical plan display. See
19//! [`crate::displayable`] for examples of how to format
20
21use std::fmt;
22use std::fmt::Formatter;
23
24use arrow::datatypes::SchemaRef;
25
26use datafusion_common::display::{GraphvizBuilder, PlanType, StringifiedPlan};
27use datafusion_expr::display_schema;
28use datafusion_physical_expr::LexOrdering;
29
30use super::{accept, ExecutionPlan, ExecutionPlanVisitor};
31
32/// Options for controlling how each [`ExecutionPlan`] should format itself
33#[derive(Debug, Clone, Copy)]
34pub enum DisplayFormatType {
35    /// Default, compact format. Example: `FilterExec: c12 < 10.0`
36    Default,
37    /// Verbose, showing all available details
38    Verbose,
39}
40
41/// Wraps an `ExecutionPlan` with various methods for formatting
42///
43///
44/// # Example
45/// ```
46/// # use std::sync::Arc;
47/// # use arrow::datatypes::{Field, Schema, DataType};
48/// # use datafusion_expr::Operator;
49/// # use datafusion_physical_expr::expressions::{binary, col, lit};
50/// # use datafusion_physical_plan::{displayable, ExecutionPlan};
51/// # use datafusion_physical_plan::empty::EmptyExec;
52/// # use datafusion_physical_plan::filter::FilterExec;
53/// # let schema = Schema::new(vec![Field::new("i", DataType::Int32, false)]);
54/// # let plan = EmptyExec::new(Arc::new(schema));
55/// # let i = col("i", &plan.schema()).unwrap();
56/// # let predicate = binary(i, Operator::Eq, lit(1), &plan.schema()).unwrap();
57/// # let plan: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(predicate, Arc::new(plan)).unwrap());
58/// // Get a one line description (Displayable)
59/// let display_plan = displayable(plan.as_ref());
60///
61/// // you can use the returned objects to format plans
62/// // where you can use `Display` such as  format! or println!
63/// assert_eq!(
64///    &format!("The plan is: {}", display_plan.one_line()),
65///   "The plan is: FilterExec: i@0 = 1\n"
66/// );
67/// // You can also print out the plan and its children in indented mode
68/// assert_eq!(display_plan.indent(false).to_string(),
69///   "FilterExec: i@0 = 1\
70///   \n  EmptyExec\
71///   \n"
72/// );
73/// ```
74#[derive(Debug, Clone)]
75pub struct DisplayableExecutionPlan<'a> {
76    inner: &'a dyn ExecutionPlan,
77    /// How to show metrics
78    show_metrics: ShowMetrics,
79    /// If statistics should be displayed
80    show_statistics: bool,
81    /// If schema should be displayed. See [`Self::set_show_schema`]
82    show_schema: bool,
83}
84
85impl<'a> DisplayableExecutionPlan<'a> {
86    /// Create a wrapper around an [`ExecutionPlan`] which can be
87    /// pretty printed in a variety of ways
88    pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
89        Self {
90            inner,
91            show_metrics: ShowMetrics::None,
92            show_statistics: false,
93            show_schema: false,
94        }
95    }
96
97    /// Create a wrapper around an [`ExecutionPlan`] which can be
98    /// pretty printed in a variety of ways that also shows aggregated
99    /// metrics
100    pub fn with_metrics(inner: &'a dyn ExecutionPlan) -> Self {
101        Self {
102            inner,
103            show_metrics: ShowMetrics::Aggregated,
104            show_statistics: false,
105            show_schema: false,
106        }
107    }
108
109    /// Create a wrapper around an [`ExecutionPlan`] which can be
110    /// pretty printed in a variety of ways that also shows all low
111    /// level metrics
112    pub fn with_full_metrics(inner: &'a dyn ExecutionPlan) -> Self {
113        Self {
114            inner,
115            show_metrics: ShowMetrics::Full,
116            show_statistics: false,
117            show_schema: false,
118        }
119    }
120
121    /// Enable display of schema
122    ///
123    /// If true, plans will be displayed with schema information at the end
124    /// of each line. The format is `schema=[[a:Int32;N, b:Int32;N, c:Int32;N]]`
125    pub fn set_show_schema(mut self, show_schema: bool) -> Self {
126        self.show_schema = show_schema;
127        self
128    }
129
130    /// Enable display of statistics
131    pub fn set_show_statistics(mut self, show_statistics: bool) -> Self {
132        self.show_statistics = show_statistics;
133        self
134    }
135
136    /// Return a `format`able structure that produces a single line
137    /// per node.
138    ///
139    /// ```text
140    /// ProjectionExec: expr=[a]
141    ///   CoalesceBatchesExec: target_batch_size=8192
142    ///     FilterExec: a < 5
143    ///       RepartitionExec: partitioning=RoundRobinBatch(16)
144    ///         DataSourceExec: source=...",
145    /// ```
146    pub fn indent(&self, verbose: bool) -> impl fmt::Display + 'a {
147        let format_type = if verbose {
148            DisplayFormatType::Verbose
149        } else {
150            DisplayFormatType::Default
151        };
152        struct Wrapper<'a> {
153            format_type: DisplayFormatType,
154            plan: &'a dyn ExecutionPlan,
155            show_metrics: ShowMetrics,
156            show_statistics: bool,
157            show_schema: bool,
158        }
159        impl fmt::Display for Wrapper<'_> {
160            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
161                let mut visitor = IndentVisitor {
162                    t: self.format_type,
163                    f,
164                    indent: 0,
165                    show_metrics: self.show_metrics,
166                    show_statistics: self.show_statistics,
167                    show_schema: self.show_schema,
168                };
169                accept(self.plan, &mut visitor)
170            }
171        }
172        Wrapper {
173            format_type,
174            plan: self.inner,
175            show_metrics: self.show_metrics,
176            show_statistics: self.show_statistics,
177            show_schema: self.show_schema,
178        }
179    }
180
181    /// Returns a `format`able structure that produces graphviz format for execution plan, which can
182    /// be directly visualized [here](https://dreampuf.github.io/GraphvizOnline).
183    ///
184    /// An example is
185    /// ```dot
186    /// strict digraph dot_plan {
187    //     0[label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]",tooltip=""]
188    //     1[label="EmptyExec",tooltip=""]
189    //     0 -> 1
190    // }
191    /// ```
192    pub fn graphviz(&self) -> impl fmt::Display + 'a {
193        struct Wrapper<'a> {
194            plan: &'a dyn ExecutionPlan,
195            show_metrics: ShowMetrics,
196            show_statistics: bool,
197        }
198        impl fmt::Display for Wrapper<'_> {
199            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
200                let t = DisplayFormatType::Default;
201
202                let mut visitor = GraphvizVisitor {
203                    f,
204                    t,
205                    show_metrics: self.show_metrics,
206                    show_statistics: self.show_statistics,
207                    graphviz_builder: GraphvizBuilder::default(),
208                    parents: Vec::new(),
209                };
210
211                visitor.start_graph()?;
212
213                accept(self.plan, &mut visitor)?;
214
215                visitor.end_graph()?;
216                Ok(())
217            }
218        }
219
220        Wrapper {
221            plan: self.inner,
222            show_metrics: self.show_metrics,
223            show_statistics: self.show_statistics,
224        }
225    }
226
227    /// Return a single-line summary of the root of the plan
228    /// Example: `ProjectionExec: expr=[a@0 as a]`.
229    pub fn one_line(&self) -> impl fmt::Display + 'a {
230        struct Wrapper<'a> {
231            plan: &'a dyn ExecutionPlan,
232            show_metrics: ShowMetrics,
233            show_statistics: bool,
234            show_schema: bool,
235        }
236
237        impl fmt::Display for Wrapper<'_> {
238            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
239                let mut visitor = IndentVisitor {
240                    f,
241                    t: DisplayFormatType::Default,
242                    indent: 0,
243                    show_metrics: self.show_metrics,
244                    show_statistics: self.show_statistics,
245                    show_schema: self.show_schema,
246                };
247                visitor.pre_visit(self.plan)?;
248                Ok(())
249            }
250        }
251
252        Wrapper {
253            plan: self.inner,
254            show_metrics: self.show_metrics,
255            show_statistics: self.show_statistics,
256            show_schema: self.show_schema,
257        }
258    }
259
260    /// format as a `StringifiedPlan`
261    pub fn to_stringified(&self, verbose: bool, plan_type: PlanType) -> StringifiedPlan {
262        StringifiedPlan::new(plan_type, self.indent(verbose).to_string())
263    }
264}
265
266/// Enum representing the different levels of metrics to display
267#[derive(Debug, Clone, Copy)]
268enum ShowMetrics {
269    /// Do not show any metrics
270    None,
271
272    /// Show aggregated metrics across partition
273    Aggregated,
274
275    /// Show full per-partition metrics
276    Full,
277}
278
279/// Formats plans with a single line per node.
280///
281/// # Example
282///
283/// ```text
284/// ProjectionExec: expr=[column1@0 + 2 as column1 + Int64(2)]
285///   FilterExec: column1@0 = 5
286///     ValuesExec
287/// ```
288struct IndentVisitor<'a, 'b> {
289    /// How to format each node
290    t: DisplayFormatType,
291    /// Write to this formatter
292    f: &'a mut Formatter<'b>,
293    /// Indent size
294    indent: usize,
295    /// How to show metrics
296    show_metrics: ShowMetrics,
297    /// If statistics should be displayed
298    show_statistics: bool,
299    /// If schema should be displayed
300    show_schema: bool,
301}
302
303impl ExecutionPlanVisitor for IndentVisitor<'_, '_> {
304    type Error = fmt::Error;
305    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
306        write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
307        plan.fmt_as(self.t, self.f)?;
308        match self.show_metrics {
309            ShowMetrics::None => {}
310            ShowMetrics::Aggregated => {
311                if let Some(metrics) = plan.metrics() {
312                    let metrics = metrics
313                        .aggregate_by_name()
314                        .sorted_for_display()
315                        .timestamps_removed();
316
317                    write!(self.f, ", metrics=[{metrics}]")?;
318                } else {
319                    write!(self.f, ", metrics=[]")?;
320                }
321            }
322            ShowMetrics::Full => {
323                if let Some(metrics) = plan.metrics() {
324                    write!(self.f, ", metrics=[{metrics}]")?;
325                } else {
326                    write!(self.f, ", metrics=[]")?;
327                }
328            }
329        }
330        if self.show_statistics {
331            let stats = plan.statistics().map_err(|_e| fmt::Error)?;
332            write!(self.f, ", statistics=[{}]", stats)?;
333        }
334        if self.show_schema {
335            write!(
336                self.f,
337                ", schema={}",
338                display_schema(plan.schema().as_ref())
339            )?;
340        }
341        writeln!(self.f)?;
342        self.indent += 1;
343        Ok(true)
344    }
345
346    fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
347        self.indent -= 1;
348        Ok(true)
349    }
350}
351
352struct GraphvizVisitor<'a, 'b> {
353    f: &'a mut Formatter<'b>,
354    /// How to format each node
355    t: DisplayFormatType,
356    /// How to show metrics
357    show_metrics: ShowMetrics,
358    /// If statistics should be displayed
359    show_statistics: bool,
360
361    graphviz_builder: GraphvizBuilder,
362    /// Used to record parent node ids when visiting a plan.
363    parents: Vec<usize>,
364}
365
366impl GraphvizVisitor<'_, '_> {
367    fn start_graph(&mut self) -> fmt::Result {
368        self.graphviz_builder.start_graph(self.f)
369    }
370
371    fn end_graph(&mut self) -> fmt::Result {
372        self.graphviz_builder.end_graph(self.f)
373    }
374}
375
376impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> {
377    type Error = fmt::Error;
378
379    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
380        let id = self.graphviz_builder.next_id();
381
382        struct Wrapper<'a>(&'a dyn ExecutionPlan, DisplayFormatType);
383
384        impl fmt::Display for Wrapper<'_> {
385            fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
386                self.0.fmt_as(self.1, f)
387            }
388        }
389
390        let label = { format!("{}", Wrapper(plan, self.t)) };
391
392        let metrics = match self.show_metrics {
393            ShowMetrics::None => "".to_string(),
394            ShowMetrics::Aggregated => {
395                if let Some(metrics) = plan.metrics() {
396                    let metrics = metrics
397                        .aggregate_by_name()
398                        .sorted_for_display()
399                        .timestamps_removed();
400
401                    format!("metrics=[{metrics}]")
402                } else {
403                    "metrics=[]".to_string()
404                }
405            }
406            ShowMetrics::Full => {
407                if let Some(metrics) = plan.metrics() {
408                    format!("metrics=[{metrics}]")
409                } else {
410                    "metrics=[]".to_string()
411                }
412            }
413        };
414
415        let statistics = if self.show_statistics {
416            let stats = plan.statistics().map_err(|_e| fmt::Error)?;
417            format!("statistics=[{}]", stats)
418        } else {
419            "".to_string()
420        };
421
422        let delimiter = if !metrics.is_empty() && !statistics.is_empty() {
423            ", "
424        } else {
425            ""
426        };
427
428        self.graphviz_builder.add_node(
429            self.f,
430            id,
431            &label,
432            Some(&format!("{}{}{}", metrics, delimiter, statistics)),
433        )?;
434
435        if let Some(parent_node_id) = self.parents.last() {
436            self.graphviz_builder
437                .add_edge(self.f, *parent_node_id, id)?;
438        }
439
440        self.parents.push(id);
441
442        Ok(true)
443    }
444
445    fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
446        self.parents.pop();
447        Ok(true)
448    }
449}
450
451/// Trait for types which could have additional details when formatted in `Verbose` mode
452pub trait DisplayAs {
453    /// Format according to `DisplayFormatType`, used when verbose representation looks
454    /// different from the default one
455    ///
456    /// Should not include a newline
457    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
458}
459
460/// A new type wrapper to display `T` implementing`DisplayAs` using the `Default` mode
461pub struct DefaultDisplay<T>(pub T);
462
463impl<T: DisplayAs> fmt::Display for DefaultDisplay<T> {
464    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
465        self.0.fmt_as(DisplayFormatType::Default, f)
466    }
467}
468
469/// A new type wrapper to display `T` implementing `DisplayAs` using the `Verbose` mode
470pub struct VerboseDisplay<T>(pub T);
471
472impl<T: DisplayAs> fmt::Display for VerboseDisplay<T> {
473    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
474        self.0.fmt_as(DisplayFormatType::Verbose, f)
475    }
476}
477
478/// A wrapper to customize partitioned file display
479#[derive(Debug)]
480pub struct ProjectSchemaDisplay<'a>(pub &'a SchemaRef);
481
482impl fmt::Display for ProjectSchemaDisplay<'_> {
483    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
484        let parts: Vec<_> = self
485            .0
486            .fields()
487            .iter()
488            .map(|x| x.name().to_owned())
489            .collect::<Vec<String>>();
490        write!(f, "[{}]", parts.join(", "))
491    }
492}
493
494pub fn display_orderings(f: &mut Formatter, orderings: &[LexOrdering]) -> fmt::Result {
495    if let Some(ordering) = orderings.first() {
496        if !ordering.is_empty() {
497            let start = if orderings.len() == 1 {
498                ", output_ordering="
499            } else {
500                ", output_orderings=["
501            };
502            write!(f, "{}", start)?;
503            for (idx, ordering) in
504                orderings.iter().enumerate().filter(|(_, o)| !o.is_empty())
505            {
506                match idx {
507                    0 => write!(f, "[{}]", ordering)?,
508                    _ => write!(f, ", [{}]", ordering)?,
509                }
510            }
511            let end = if orderings.len() == 1 { "" } else { "]" };
512            write!(f, "{}", end)?;
513        }
514    }
515
516    Ok(())
517}
518
519#[cfg(test)]
520mod tests {
521    use std::fmt::Write;
522    use std::sync::Arc;
523
524    use datafusion_common::{DataFusionError, Result, Statistics};
525    use datafusion_execution::{SendableRecordBatchStream, TaskContext};
526
527    use crate::{DisplayAs, ExecutionPlan, PlanProperties};
528
529    use super::DisplayableExecutionPlan;
530
531    #[derive(Debug, Clone, Copy)]
532    enum TestStatsExecPlan {
533        Panic,
534        Error,
535        Ok,
536    }
537
538    impl DisplayAs for TestStatsExecPlan {
539        fn fmt_as(
540            &self,
541            _t: crate::DisplayFormatType,
542            f: &mut std::fmt::Formatter,
543        ) -> std::fmt::Result {
544            write!(f, "TestStatsExecPlan")
545        }
546    }
547
548    impl ExecutionPlan for TestStatsExecPlan {
549        fn name(&self) -> &'static str {
550            "TestStatsExecPlan"
551        }
552
553        fn as_any(&self) -> &dyn std::any::Any {
554            self
555        }
556
557        fn properties(&self) -> &PlanProperties {
558            unimplemented!()
559        }
560
561        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
562            vec![]
563        }
564
565        fn with_new_children(
566            self: Arc<Self>,
567            _: Vec<Arc<dyn ExecutionPlan>>,
568        ) -> Result<Arc<dyn ExecutionPlan>> {
569            unimplemented!()
570        }
571
572        fn execute(
573            &self,
574            _: usize,
575            _: Arc<TaskContext>,
576        ) -> Result<SendableRecordBatchStream> {
577            todo!()
578        }
579
580        fn statistics(&self) -> Result<Statistics> {
581            match self {
582                Self::Panic => panic!("expected panic"),
583                Self::Error => {
584                    Err(DataFusionError::Internal("expected error".to_string()))
585                }
586                Self::Ok => Ok(Statistics::new_unknown(self.schema().as_ref())),
587            }
588        }
589    }
590
591    fn test_stats_display(exec: TestStatsExecPlan, show_stats: bool) {
592        let display =
593            DisplayableExecutionPlan::new(&exec).set_show_statistics(show_stats);
594
595        let mut buf = String::new();
596        write!(&mut buf, "{}", display.one_line()).unwrap();
597        let buf = buf.trim();
598        assert_eq!(buf, "TestStatsExecPlan");
599    }
600
601    #[test]
602    fn test_display_when_stats_panic_with_no_show_stats() {
603        test_stats_display(TestStatsExecPlan::Panic, false);
604    }
605
606    #[test]
607    fn test_display_when_stats_error_with_no_show_stats() {
608        test_stats_display(TestStatsExecPlan::Error, false);
609    }
610
611    #[test]
612    fn test_display_when_stats_ok_with_no_show_stats() {
613        test_stats_display(TestStatsExecPlan::Ok, false);
614    }
615
616    #[test]
617    #[should_panic(expected = "expected panic")]
618    fn test_display_when_stats_panic_with_show_stats() {
619        test_stats_display(TestStatsExecPlan::Panic, true);
620    }
621
622    #[test]
623    #[should_panic(expected = "Error")] // fmt::Error
624    fn test_display_when_stats_error_with_show_stats() {
625        test_stats_display(TestStatsExecPlan::Error, true);
626    }
627
628    #[test]
629    fn test_display_when_stats_ok_with_show_stats() {
630        test_stats_display(TestStatsExecPlan::Ok, false);
631    }
632}