datafusion_expr/logical_plan/
display.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides logic for displaying LogicalPlans in various styles
19
20use std::collections::HashMap;
21use std::fmt;
22
23use crate::{
24    expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
25    Filter, Join, Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery,
26    Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
27    Unnest, Values, Window,
28};
29
30use crate::dml::CopyTo;
31use arrow::datatypes::Schema;
32use datafusion_common::display::GraphvizBuilder;
33use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
34use datafusion_common::{Column, DataFusionError};
35use serde_json::json;
36
/// Formats plans with a single line per node. For example:
///
/// ```text
/// Projection: id
///   Filter: state Eq Utf8("CO")
///     CsvScan: employee.csv projection=Some([0, 3])
/// ```
pub struct IndentVisitor<'a, 'b> {
    /// Destination the formatted plan is written to
    f: &'a mut fmt::Formatter<'b>,
    /// If true, includes summarized schema information
    with_schema: bool,
    /// The current indent level; each level is rendered as two spaces
    indent: usize,
}
49
50impl<'a, 'b> IndentVisitor<'a, 'b> {
51    /// Create a visitor that will write a formatted LogicalPlan to f. If `with_schema` is
52    /// true, includes schema information on each line.
53    pub fn new(f: &'a mut fmt::Formatter<'b>, with_schema: bool) -> Self {
54        Self {
55            f,
56            with_schema,
57            indent: 0,
58        }
59    }
60}
61
62impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> {
63    type Node = LogicalPlan;
64
65    fn f_down(
66        &mut self,
67        plan: &'n LogicalPlan,
68    ) -> datafusion_common::Result<TreeNodeRecursion> {
69        if self.indent > 0 {
70            writeln!(self.f)?;
71        }
72        write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
73        write!(self.f, "{}", plan.display())?;
74        if self.with_schema {
75            write!(
76                self.f,
77                " {}",
78                display_schema(&plan.schema().as_ref().to_owned().into())
79            )?;
80        }
81
82        self.indent += 1;
83        Ok(TreeNodeRecursion::Continue)
84    }
85
86    fn f_up(
87        &mut self,
88        _plan: &'n LogicalPlan,
89    ) -> datafusion_common::Result<TreeNodeRecursion> {
90        self.indent -= 1;
91        Ok(TreeNodeRecursion::Continue)
92    }
93}
94
95/// Print the schema in a compact representation to `buf`
96///
97/// For example: `foo:Utf8` if `foo` can not be null, and
98/// `foo:Utf8;N` if `foo` is nullable.
99///
100/// ```
101/// use arrow::datatypes::{Field, Schema, DataType};
102/// # use datafusion_expr::logical_plan::display_schema;
103/// let schema = Schema::new(vec![
104///     Field::new("id", DataType::Int32, false),
105///     Field::new("first_name", DataType::Utf8, true),
106///  ]);
107///
108///  assert_eq!(
109///      "[id:Int32, first_name:Utf8;N]",
110///      format!("{}", display_schema(&schema))
111///  );
112/// ```
113pub fn display_schema(schema: &Schema) -> impl fmt::Display + '_ {
114    struct Wrapper<'a>(&'a Schema);
115
116    impl fmt::Display for Wrapper<'_> {
117        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118            write!(f, "[")?;
119            for (idx, field) in self.0.fields().iter().enumerate() {
120                if idx > 0 {
121                    write!(f, ", ")?;
122                }
123                let nullable_str = if field.is_nullable() { ";N" } else { "" };
124                write!(
125                    f,
126                    "{}:{:?}{}",
127                    field.name(),
128                    field.data_type(),
129                    nullable_str
130                )?;
131            }
132            write!(f, "]")
133        }
134    }
135    Wrapper(schema)
136}
137
/// Formats plans for graphical display using the `DOT` language. This
/// format can be visualized using software from
/// [`graphviz`](https://graphviz.org/)
pub struct GraphvizVisitor<'a, 'b> {
    /// Destination the graph description is written to
    f: &'a mut fmt::Formatter<'b>,
    /// Writes the graph elements to `f` and hands out node ids
    graphviz_builder: GraphvizBuilder,
    /// If true, includes summarized schema information
    with_schema: bool,

    /// Holds the ids (as generated from `graphviz_builder`) of all
    /// parent nodes
    parent_ids: Vec<usize>,
}
151
152impl<'a, 'b> GraphvizVisitor<'a, 'b> {
153    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
154        Self {
155            f,
156            graphviz_builder: GraphvizBuilder::default(),
157            with_schema: false,
158            parent_ids: Vec::new(),
159        }
160    }
161
162    /// Sets a flag which controls if the output schema is displayed
163    pub fn set_with_schema(&mut self, with_schema: bool) {
164        self.with_schema = with_schema;
165    }
166
167    pub fn pre_visit_plan(&mut self, label: &str) -> fmt::Result {
168        self.graphviz_builder.start_cluster(self.f, label)
169    }
170
171    pub fn post_visit_plan(&mut self) -> fmt::Result {
172        self.graphviz_builder.end_cluster(self.f)
173    }
174
175    pub fn start_graph(&mut self) -> fmt::Result {
176        self.graphviz_builder.start_graph(self.f)
177    }
178
179    pub fn end_graph(&mut self) -> fmt::Result {
180        self.graphviz_builder.end_graph(self.f)
181    }
182}
183
184impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
185    type Node = LogicalPlan;
186
187    fn f_down(
188        &mut self,
189        plan: &'n LogicalPlan,
190    ) -> datafusion_common::Result<TreeNodeRecursion> {
191        let id = self.graphviz_builder.next_id();
192
193        // Create a new graph node for `plan` such as
194        // id [label="foo"]
195        let label = if self.with_schema {
196            format!(
197                r"{}\nSchema: {}",
198                plan.display(),
199                display_schema(&plan.schema().as_ref().to_owned().into())
200            )
201        } else {
202            format!("{}", plan.display())
203        };
204
205        self.graphviz_builder
206            .add_node(self.f, id, &label, None)
207            .map_err(|_e| DataFusionError::Internal("Fail to format".to_string()))?;
208
209        // Create an edge to our parent node, if any
210        //  parent_id -> id
211        if let Some(parent_id) = self.parent_ids.last() {
212            self.graphviz_builder
213                .add_edge(self.f, *parent_id, id)
214                .map_err(|_e| DataFusionError::Internal("Fail to format".to_string()))?;
215        }
216
217        self.parent_ids.push(id);
218        Ok(TreeNodeRecursion::Continue)
219    }
220
221    fn f_up(
222        &mut self,
223        _plan: &LogicalPlan,
224    ) -> datafusion_common::Result<TreeNodeRecursion> {
225        // always be non-empty as pre_visit always pushes
226        // So it should always be Ok(true)
227        let res = self.parent_ids.pop();
228        res.ok_or(DataFusionError::Internal("Fail to format".to_string()))
229            .map(|_| TreeNodeRecursion::Continue)
230    }
231}
232
233/// Formats plans to display as postgresql plan json format.
234///
/// There are already many existing visualizers for this format, for example [dalibo](https://explain.dalibo.com/).
236/// Unfortunately, there is no formal spec for this format, but it is widely used in the PostgreSQL community.
237///
238/// Here is an example of the format:
239///
240/// ```json
241/// [
242///     {
243///         "Plan": {
244///             "Node Type": "Sort",
245///             "Output": [
246///                 "question_1.id",
247///                 "question_1.title",
248///                 "question_1.text",
249///                 "question_1.file",
250///                 "question_1.type",
251///                 "question_1.source",
252///                 "question_1.exam_id"
253///             ],
254///             "Sort Key": [
255///                 "question_1.id"
256///             ],
257///             "Plans": [
258///                 {
259///                     "Node Type": "Seq Scan",
260///                     "Parent Relationship": "Left",
261///                     "Relation Name": "question",
262///                     "Schema": "public",
263///                     "Alias": "question_1",
264///                     "Output": [
265///                        "question_1.id",
266///                         "question_1.title",
267///                        "question_1.text",
268///                         "question_1.file",
269///                         "question_1.type",
270///                         "question_1.source",
271///                         "question_1.exam_id"
272///                     ],
273///                     "Filter": "(question_1.exam_id = 1)"
274///                 }
275///             ]
276///         }
277///     }
278/// ]
279/// ```
pub struct PgJsonVisitor<'a, 'b> {
    /// Destination the final json document is written to
    f: &'a mut fmt::Formatter<'b>,

    /// A mapping from plan node id to the plan node json representation.
    objects: HashMap<u32, serde_json::Value>,

    /// The id that will be assigned to the next visited plan node
    next_id: u32,

    /// If true, each node gets an `Output` array listing the field names
    /// of its schema
    with_schema: bool,

    /// Holds the ids (as assigned from `next_id`) of all
    /// parent nodes
    parent_ids: Vec<u32>,
}
295
296impl<'a, 'b> PgJsonVisitor<'a, 'b> {
297    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
298        Self {
299            f,
300            objects: HashMap::new(),
301            next_id: 0,
302            with_schema: false,
303            parent_ids: Vec::new(),
304        }
305    }
306
307    /// Sets a flag which controls if the output schema is displayed
308    pub fn with_schema(&mut self, with_schema: bool) {
309        self.with_schema = with_schema;
310    }
311
312    /// Converts a logical plan node to a json object.
313    fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
314        match node {
315            LogicalPlan::EmptyRelation(_) => {
316                json!({
317                    "Node Type": "EmptyRelation",
318                })
319            }
320            LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
321                json!({
322                    "Node Type": "RecursiveQuery",
323                    "Is Distinct": is_distinct,
324                })
325            }
326            LogicalPlan::Values(Values { ref values, .. }) => {
327                let str_values = values
328                    .iter()
329                    // limit to only 5 values to avoid horrible display
330                    .take(5)
331                    .map(|row| {
332                        let item = row
333                            .iter()
334                            .map(|expr| expr.to_string())
335                            .collect::<Vec<_>>()
336                            .join(", ");
337                        format!("({item})")
338                    })
339                    .collect::<Vec<_>>()
340                    .join(", ");
341
342                let eclipse = if values.len() > 5 { "..." } else { "" };
343
344                let values_str = format!("{}{}", str_values, eclipse);
345                json!({
346                    "Node Type": "Values",
347                    "Values": values_str
348                })
349            }
350            LogicalPlan::TableScan(TableScan {
351                ref source,
352                ref table_name,
353                ref filters,
354                ref fetch,
355                ..
356            }) => {
357                let mut object = json!({
358                    "Node Type": "TableScan",
359                    "Relation Name": table_name.table(),
360                });
361
362                if let Some(s) = table_name.schema() {
363                    object["Schema"] = serde_json::Value::String(s.to_string());
364                }
365
366                if let Some(c) = table_name.catalog() {
367                    object["Catalog"] = serde_json::Value::String(c.to_string());
368                }
369
370                if !filters.is_empty() {
371                    let mut full_filter = vec![];
372                    let mut partial_filter = vec![];
373                    let mut unsupported_filters = vec![];
374                    let filters: Vec<&Expr> = filters.iter().collect();
375
376                    if let Ok(results) = source.supports_filters_pushdown(&filters) {
377                        filters.iter().zip(results.iter()).for_each(
378                            |(x, res)| match res {
379                                TableProviderFilterPushDown::Exact => full_filter.push(x),
380                                TableProviderFilterPushDown::Inexact => {
381                                    partial_filter.push(x)
382                                }
383                                TableProviderFilterPushDown::Unsupported => {
384                                    unsupported_filters.push(x)
385                                }
386                            },
387                        );
388                    }
389
390                    if !full_filter.is_empty() {
391                        object["Full Filters"] =
392                            serde_json::Value::String(expr_vec_fmt!(full_filter));
393                    };
394                    if !partial_filter.is_empty() {
395                        object["Partial Filters"] =
396                            serde_json::Value::String(expr_vec_fmt!(partial_filter));
397                    }
398                    if !unsupported_filters.is_empty() {
399                        object["Unsupported Filters"] =
400                            serde_json::Value::String(expr_vec_fmt!(unsupported_filters));
401                    }
402                }
403
404                if let Some(f) = fetch {
405                    object["Fetch"] = serde_json::Value::Number((*f).into());
406                }
407
408                object
409            }
410            LogicalPlan::Projection(Projection { ref expr, .. }) => {
411                json!({
412                    "Node Type": "Projection",
413                    "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
414                })
415            }
416            LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
417                json!({
418                    "Node Type": "Projection",
419                    "Operation": op.name(),
420                    "Table Name": table_name.table()
421                })
422            }
423            LogicalPlan::Copy(CopyTo {
424                input: _,
425                output_url,
426                file_type,
427                partition_by: _,
428                options,
429            }) => {
430                let op_str = options
431                    .iter()
432                    .map(|(k, v)| format!("{}={}", k, v))
433                    .collect::<Vec<_>>()
434                    .join(", ");
435                json!({
436                    "Node Type": "CopyTo",
437                    "Output URL": output_url,
438                    "File Type": format!("{}", file_type.get_ext()),
439                    "Options": op_str
440                })
441            }
442            LogicalPlan::Ddl(ddl) => {
443                json!({
444                    "Node Type": "Ddl",
445                    "Operation": format!("{}", ddl.display())
446                })
447            }
448            LogicalPlan::Filter(Filter {
449                predicate: ref expr,
450                ..
451            }) => {
452                json!({
453                    "Node Type": "Filter",
454                    "Condition": format!("{}", expr)
455                })
456            }
457            LogicalPlan::Window(Window {
458                ref window_expr, ..
459            }) => {
460                json!({
461                    "Node Type": "WindowAggr",
462                    "Expressions": expr_vec_fmt!(window_expr)
463                })
464            }
465            LogicalPlan::Aggregate(Aggregate {
466                ref group_expr,
467                ref aggr_expr,
468                ..
469            }) => {
470                json!({
471                    "Node Type": "Aggregate",
472                    "Group By": expr_vec_fmt!(group_expr),
473                    "Aggregates": expr_vec_fmt!(aggr_expr)
474                })
475            }
476            LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
477                let mut object = json!({
478                    "Node Type": "Sort",
479                    "Sort Key": expr_vec_fmt!(expr),
480                });
481
482                if let Some(fetch) = fetch {
483                    object["Fetch"] = serde_json::Value::Number((*fetch).into());
484                }
485
486                object
487            }
488            LogicalPlan::Join(Join {
489                on: ref keys,
490                filter,
491                join_constraint,
492                join_type,
493                ..
494            }) => {
495                let join_expr: Vec<String> =
496                    keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
497                let filter_expr = filter
498                    .as_ref()
499                    .map(|expr| format!(" Filter: {expr}"))
500                    .unwrap_or_else(|| "".to_string());
501                json!({
502                    "Node Type": format!("{} Join", join_type),
503                    "Join Constraint": format!("{:?}", join_constraint),
504                    "Join Keys": join_expr.join(", "),
505                    "Filter": format!("{}", filter_expr)
506                })
507            }
508            LogicalPlan::Repartition(Repartition {
509                partitioning_scheme,
510                ..
511            }) => match partitioning_scheme {
512                Partitioning::RoundRobinBatch(n) => {
513                    json!({
514                        "Node Type": "Repartition",
515                        "Partitioning Scheme": "RoundRobinBatch",
516                        "Partition Count": n
517                    })
518                }
519                Partitioning::Hash(expr, n) => {
520                    let hash_expr: Vec<String> =
521                        expr.iter().map(|e| format!("{e}")).collect();
522
523                    json!({
524                        "Node Type": "Repartition",
525                        "Partitioning Scheme": "Hash",
526                        "Partition Count": n,
527                        "Partitioning Key": hash_expr
528                    })
529                }
530                Partitioning::DistributeBy(expr) => {
531                    let dist_by_expr: Vec<String> =
532                        expr.iter().map(|e| format!("{e}")).collect();
533                    json!({
534                        "Node Type": "Repartition",
535                        "Partitioning Scheme": "DistributeBy",
536                        "Partitioning Key": dist_by_expr
537                    })
538                }
539            },
540            LogicalPlan::Limit(Limit {
541                ref skip,
542                ref fetch,
543                ..
544            }) => {
545                let mut object = serde_json::json!(
546                    {
547                        "Node Type": "Limit",
548                    }
549                );
550                if let Some(s) = skip {
551                    object["Skip"] = s.to_string().into()
552                };
553                if let Some(f) = fetch {
554                    object["Fetch"] = f.to_string().into()
555                };
556                object
557            }
558            LogicalPlan::Subquery(Subquery { .. }) => {
559                json!({
560                    "Node Type": "Subquery"
561                })
562            }
563            LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
564                json!({
565                    "Node Type": "Subquery",
566                    "Alias": alias.table(),
567                })
568            }
569            LogicalPlan::Statement(statement) => {
570                json!({
571                    "Node Type": "Statement",
572                    "Statement": format!("{}", statement.display())
573                })
574            }
575            LogicalPlan::Distinct(distinct) => match distinct {
576                Distinct::All(_) => {
577                    json!({
578                        "Node Type": "DistinctAll"
579                    })
580                }
581                Distinct::On(DistinctOn {
582                    on_expr,
583                    select_expr,
584                    sort_expr,
585                    ..
586                }) => {
587                    let mut object = json!({
588                        "Node Type": "DistinctOn",
589                        "On": expr_vec_fmt!(on_expr),
590                        "Select": expr_vec_fmt!(select_expr),
591                    });
592                    if let Some(sort_expr) = sort_expr {
593                        object["Sort"] =
594                            serde_json::Value::String(expr_vec_fmt!(sort_expr));
595                    }
596
597                    object
598                }
599            },
600            LogicalPlan::Explain { .. } => {
601                json!({
602                    "Node Type": "Explain"
603                })
604            }
605            LogicalPlan::Analyze { .. } => {
606                json!({
607                    "Node Type": "Analyze"
608                })
609            }
610            LogicalPlan::Union(_) => {
611                json!({
612                    "Node Type": "Union"
613                })
614            }
615            LogicalPlan::Extension(e) => {
616                json!({
617                    "Node Type": e.node.name(),
618                    "Detail": format!("{:?}", e.node)
619                })
620            }
621            LogicalPlan::DescribeTable(DescribeTable { .. }) => {
622                json!({
623                    "Node Type": "DescribeTable"
624                })
625            }
626            LogicalPlan::Unnest(Unnest {
627                input: plan,
628                list_type_columns: list_col_indices,
629                struct_type_columns: struct_col_indices,
630                ..
631            }) => {
632                let input_columns = plan.schema().columns();
633                let list_type_columns = list_col_indices
634                    .iter()
635                    .map(|(i, unnest_info)| {
636                        format!(
637                            "{}|depth={:?}",
638                            &input_columns[*i].to_string(),
639                            unnest_info.depth
640                        )
641                    })
642                    .collect::<Vec<String>>();
643                let struct_type_columns = struct_col_indices
644                    .iter()
645                    .map(|i| &input_columns[*i])
646                    .collect::<Vec<&Column>>();
647                json!({
648                    "Node Type": "Unnest",
649                    "ListColumn": expr_vec_fmt!(list_type_columns),
650                    "StructColumn": expr_vec_fmt!(struct_type_columns),
651                })
652            }
653        }
654    }
655}
656
657impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
658    type Node = LogicalPlan;
659
660    fn f_down(
661        &mut self,
662        node: &'n LogicalPlan,
663    ) -> datafusion_common::Result<TreeNodeRecursion> {
664        let id = self.next_id;
665        self.next_id += 1;
666        let mut object = Self::to_json_value(node);
667
668        object["Plans"] = serde_json::Value::Array(vec![]);
669
670        if self.with_schema {
671            object["Output"] = serde_json::Value::Array(
672                node.schema()
673                    .fields()
674                    .iter()
675                    .map(|f| f.name().to_string())
676                    .map(serde_json::Value::String)
677                    .collect(),
678            );
679        };
680
681        self.objects.insert(id, object);
682        self.parent_ids.push(id);
683        Ok(TreeNodeRecursion::Continue)
684    }
685
686    fn f_up(
687        &mut self,
688        _node: &Self::Node,
689    ) -> datafusion_common::Result<TreeNodeRecursion> {
690        let id = self.parent_ids.pop().unwrap();
691
692        let current_node = self.objects.remove(&id).ok_or_else(|| {
693            DataFusionError::Internal("Missing current node!".to_string())
694        })?;
695
696        if let Some(parent_id) = self.parent_ids.last() {
697            let parent_node = self
698                .objects
699                .get_mut(parent_id)
700                .expect("Missing parent node!");
701            let plans = parent_node
702                .get_mut("Plans")
703                .and_then(|p| p.as_array_mut())
704                .expect("Plans should be an array");
705
706            plans.push(current_node);
707        } else {
708            // This is the root node
709            let plan = serde_json::json!([{"Plan": current_node}]);
710            write!(
711                self.f,
712                "{}",
713                serde_json::to_string_pretty(&plan)
714                    .map_err(|e| DataFusionError::External(Box::new(e)))?
715            )?;
716        }
717
718        Ok(TreeNodeRecursion::Continue)
719    }
720}
721
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};

    use super::*;

    /// A schema without fields renders as an empty pair of brackets.
    #[test]
    fn test_display_empty_schema() {
        let rendered = display_schema(&Schema::empty()).to_string();
        assert_eq!("[]", rendered);
    }

    /// Fields are rendered as `name:type`, with `;N` marking nullable ones.
    #[test]
    fn test_display_schema() {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, true),
        ];
        let schema = Schema::new(fields);

        let rendered = display_schema(&schema).to_string();
        assert_eq!("[id:Int32, first_name:Utf8;N]", rendered);
    }
}