1use std::collections::HashMap;
21use std::fmt;
22
23use crate::{
24 expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
25 Filter, Join, Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery,
26 Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
27 Unnest, Values, Window,
28};
29
30use crate::dml::CopyTo;
31use arrow::datatypes::Schema;
32use datafusion_common::display::GraphvizBuilder;
33use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
34use datafusion_common::{Column, DataFusionError};
35use serde_json::json;
36
/// Plan visitor that renders one node per line, padding every line according
/// to how deep the node sits in the plan tree.
pub struct IndentVisitor<'a, 'b> {
    /// Destination the rendered plan is written to.
    f: &'a mut fmt::Formatter<'b>,
    /// When `true`, every node line is followed by the node's schema.
    with_schema: bool,
    /// Current nesting depth, counted in levels rather than characters.
    indent: usize,
}

impl<'a, 'b> IndentVisitor<'a, 'b> {
    /// Creates a visitor that writes into `f`, starting at depth zero.
    ///
    /// `with_schema` selects whether each node line also carries the node's
    /// schema.
    pub fn new(f: &'a mut fmt::Formatter<'b>, with_schema: bool) -> Self {
        let indent = 0;
        Self {
            f,
            with_schema,
            indent,
        }
    }
}
61
62impl<'n> TreeNodeVisitor<'n> for IndentVisitor<'_, '_> {
63 type Node = LogicalPlan;
64
65 fn f_down(
66 &mut self,
67 plan: &'n LogicalPlan,
68 ) -> datafusion_common::Result<TreeNodeRecursion> {
69 if self.indent > 0 {
70 writeln!(self.f)?;
71 }
72 write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
73 write!(self.f, "{}", plan.display())?;
74 if self.with_schema {
75 write!(
76 self.f,
77 " {}",
78 display_schema(&plan.schema().as_ref().to_owned().into())
79 )?;
80 }
81
82 self.indent += 1;
83 Ok(TreeNodeRecursion::Continue)
84 }
85
86 fn f_up(
87 &mut self,
88 _plan: &'n LogicalPlan,
89 ) -> datafusion_common::Result<TreeNodeRecursion> {
90 self.indent -= 1;
91 Ok(TreeNodeRecursion::Continue)
92 }
93}
94
95pub fn display_schema(schema: &Schema) -> impl fmt::Display + '_ {
114 struct Wrapper<'a>(&'a Schema);
115
116 impl fmt::Display for Wrapper<'_> {
117 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118 write!(f, "[")?;
119 for (idx, field) in self.0.fields().iter().enumerate() {
120 if idx > 0 {
121 write!(f, ", ")?;
122 }
123 let nullable_str = if field.is_nullable() { ";N" } else { "" };
124 write!(
125 f,
126 "{}:{:?}{}",
127 field.name(),
128 field.data_type(),
129 nullable_str
130 )?;
131 }
132 write!(f, "]")
133 }
134 }
135 Wrapper(schema)
136}
137
138pub struct GraphvizVisitor<'a, 'b> {
142 f: &'a mut fmt::Formatter<'b>,
143 graphviz_builder: GraphvizBuilder,
144 with_schema: bool,
146
147 parent_ids: Vec<usize>,
150}
151
152impl<'a, 'b> GraphvizVisitor<'a, 'b> {
153 pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
154 Self {
155 f,
156 graphviz_builder: GraphvizBuilder::default(),
157 with_schema: false,
158 parent_ids: Vec::new(),
159 }
160 }
161
162 pub fn set_with_schema(&mut self, with_schema: bool) {
164 self.with_schema = with_schema;
165 }
166
167 pub fn pre_visit_plan(&mut self, label: &str) -> fmt::Result {
168 self.graphviz_builder.start_cluster(self.f, label)
169 }
170
171 pub fn post_visit_plan(&mut self) -> fmt::Result {
172 self.graphviz_builder.end_cluster(self.f)
173 }
174
175 pub fn start_graph(&mut self) -> fmt::Result {
176 self.graphviz_builder.start_graph(self.f)
177 }
178
179 pub fn end_graph(&mut self) -> fmt::Result {
180 self.graphviz_builder.end_graph(self.f)
181 }
182}
183
184impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
185 type Node = LogicalPlan;
186
187 fn f_down(
188 &mut self,
189 plan: &'n LogicalPlan,
190 ) -> datafusion_common::Result<TreeNodeRecursion> {
191 let id = self.graphviz_builder.next_id();
192
193 let label = if self.with_schema {
196 format!(
197 r"{}\nSchema: {}",
198 plan.display(),
199 display_schema(&plan.schema().as_ref().to_owned().into())
200 )
201 } else {
202 format!("{}", plan.display())
203 };
204
205 self.graphviz_builder
206 .add_node(self.f, id, &label, None)
207 .map_err(|_e| DataFusionError::Internal("Fail to format".to_string()))?;
208
209 if let Some(parent_id) = self.parent_ids.last() {
212 self.graphviz_builder
213 .add_edge(self.f, *parent_id, id)
214 .map_err(|_e| DataFusionError::Internal("Fail to format".to_string()))?;
215 }
216
217 self.parent_ids.push(id);
218 Ok(TreeNodeRecursion::Continue)
219 }
220
221 fn f_up(
222 &mut self,
223 _plan: &LogicalPlan,
224 ) -> datafusion_common::Result<TreeNodeRecursion> {
225 let res = self.parent_ids.pop();
228 res.ok_or(DataFusionError::Internal("Fail to format".to_string()))
229 .map(|_| TreeNodeRecursion::Continue)
230 }
231}
232
233pub struct PgJsonVisitor<'a, 'b> {
281 f: &'a mut fmt::Formatter<'b>,
282
283 objects: HashMap<u32, serde_json::Value>,
285
286 next_id: u32,
287
288 with_schema: bool,
290
291 parent_ids: Vec<u32>,
294}
295
296impl<'a, 'b> PgJsonVisitor<'a, 'b> {
297 pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
298 Self {
299 f,
300 objects: HashMap::new(),
301 next_id: 0,
302 with_schema: false,
303 parent_ids: Vec::new(),
304 }
305 }
306
307 pub fn with_schema(&mut self, with_schema: bool) {
309 self.with_schema = with_schema;
310 }
311
312 fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
314 match node {
315 LogicalPlan::EmptyRelation(_) => {
316 json!({
317 "Node Type": "EmptyRelation",
318 })
319 }
320 LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
321 json!({
322 "Node Type": "RecursiveQuery",
323 "Is Distinct": is_distinct,
324 })
325 }
326 LogicalPlan::Values(Values { ref values, .. }) => {
327 let str_values = values
328 .iter()
329 .take(5)
331 .map(|row| {
332 let item = row
333 .iter()
334 .map(|expr| expr.to_string())
335 .collect::<Vec<_>>()
336 .join(", ");
337 format!("({item})")
338 })
339 .collect::<Vec<_>>()
340 .join(", ");
341
342 let eclipse = if values.len() > 5 { "..." } else { "" };
343
344 let values_str = format!("{}{}", str_values, eclipse);
345 json!({
346 "Node Type": "Values",
347 "Values": values_str
348 })
349 }
350 LogicalPlan::TableScan(TableScan {
351 ref source,
352 ref table_name,
353 ref filters,
354 ref fetch,
355 ..
356 }) => {
357 let mut object = json!({
358 "Node Type": "TableScan",
359 "Relation Name": table_name.table(),
360 });
361
362 if let Some(s) = table_name.schema() {
363 object["Schema"] = serde_json::Value::String(s.to_string());
364 }
365
366 if let Some(c) = table_name.catalog() {
367 object["Catalog"] = serde_json::Value::String(c.to_string());
368 }
369
370 if !filters.is_empty() {
371 let mut full_filter = vec![];
372 let mut partial_filter = vec![];
373 let mut unsupported_filters = vec![];
374 let filters: Vec<&Expr> = filters.iter().collect();
375
376 if let Ok(results) = source.supports_filters_pushdown(&filters) {
377 filters.iter().zip(results.iter()).for_each(
378 |(x, res)| match res {
379 TableProviderFilterPushDown::Exact => full_filter.push(x),
380 TableProviderFilterPushDown::Inexact => {
381 partial_filter.push(x)
382 }
383 TableProviderFilterPushDown::Unsupported => {
384 unsupported_filters.push(x)
385 }
386 },
387 );
388 }
389
390 if !full_filter.is_empty() {
391 object["Full Filters"] =
392 serde_json::Value::String(expr_vec_fmt!(full_filter));
393 };
394 if !partial_filter.is_empty() {
395 object["Partial Filters"] =
396 serde_json::Value::String(expr_vec_fmt!(partial_filter));
397 }
398 if !unsupported_filters.is_empty() {
399 object["Unsupported Filters"] =
400 serde_json::Value::String(expr_vec_fmt!(unsupported_filters));
401 }
402 }
403
404 if let Some(f) = fetch {
405 object["Fetch"] = serde_json::Value::Number((*f).into());
406 }
407
408 object
409 }
410 LogicalPlan::Projection(Projection { ref expr, .. }) => {
411 json!({
412 "Node Type": "Projection",
413 "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
414 })
415 }
416 LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
417 json!({
418 "Node Type": "Projection",
419 "Operation": op.name(),
420 "Table Name": table_name.table()
421 })
422 }
423 LogicalPlan::Copy(CopyTo {
424 input: _,
425 output_url,
426 file_type,
427 partition_by: _,
428 options,
429 }) => {
430 let op_str = options
431 .iter()
432 .map(|(k, v)| format!("{}={}", k, v))
433 .collect::<Vec<_>>()
434 .join(", ");
435 json!({
436 "Node Type": "CopyTo",
437 "Output URL": output_url,
438 "File Type": format!("{}", file_type.get_ext()),
439 "Options": op_str
440 })
441 }
442 LogicalPlan::Ddl(ddl) => {
443 json!({
444 "Node Type": "Ddl",
445 "Operation": format!("{}", ddl.display())
446 })
447 }
448 LogicalPlan::Filter(Filter {
449 predicate: ref expr,
450 ..
451 }) => {
452 json!({
453 "Node Type": "Filter",
454 "Condition": format!("{}", expr)
455 })
456 }
457 LogicalPlan::Window(Window {
458 ref window_expr, ..
459 }) => {
460 json!({
461 "Node Type": "WindowAggr",
462 "Expressions": expr_vec_fmt!(window_expr)
463 })
464 }
465 LogicalPlan::Aggregate(Aggregate {
466 ref group_expr,
467 ref aggr_expr,
468 ..
469 }) => {
470 json!({
471 "Node Type": "Aggregate",
472 "Group By": expr_vec_fmt!(group_expr),
473 "Aggregates": expr_vec_fmt!(aggr_expr)
474 })
475 }
476 LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
477 let mut object = json!({
478 "Node Type": "Sort",
479 "Sort Key": expr_vec_fmt!(expr),
480 });
481
482 if let Some(fetch) = fetch {
483 object["Fetch"] = serde_json::Value::Number((*fetch).into());
484 }
485
486 object
487 }
488 LogicalPlan::Join(Join {
489 on: ref keys,
490 filter,
491 join_constraint,
492 join_type,
493 ..
494 }) => {
495 let join_expr: Vec<String> =
496 keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
497 let filter_expr = filter
498 .as_ref()
499 .map(|expr| format!(" Filter: {expr}"))
500 .unwrap_or_else(|| "".to_string());
501 json!({
502 "Node Type": format!("{} Join", join_type),
503 "Join Constraint": format!("{:?}", join_constraint),
504 "Join Keys": join_expr.join(", "),
505 "Filter": format!("{}", filter_expr)
506 })
507 }
508 LogicalPlan::Repartition(Repartition {
509 partitioning_scheme,
510 ..
511 }) => match partitioning_scheme {
512 Partitioning::RoundRobinBatch(n) => {
513 json!({
514 "Node Type": "Repartition",
515 "Partitioning Scheme": "RoundRobinBatch",
516 "Partition Count": n
517 })
518 }
519 Partitioning::Hash(expr, n) => {
520 let hash_expr: Vec<String> =
521 expr.iter().map(|e| format!("{e}")).collect();
522
523 json!({
524 "Node Type": "Repartition",
525 "Partitioning Scheme": "Hash",
526 "Partition Count": n,
527 "Partitioning Key": hash_expr
528 })
529 }
530 Partitioning::DistributeBy(expr) => {
531 let dist_by_expr: Vec<String> =
532 expr.iter().map(|e| format!("{e}")).collect();
533 json!({
534 "Node Type": "Repartition",
535 "Partitioning Scheme": "DistributeBy",
536 "Partitioning Key": dist_by_expr
537 })
538 }
539 },
540 LogicalPlan::Limit(Limit {
541 ref skip,
542 ref fetch,
543 ..
544 }) => {
545 let mut object = serde_json::json!(
546 {
547 "Node Type": "Limit",
548 }
549 );
550 if let Some(s) = skip {
551 object["Skip"] = s.to_string().into()
552 };
553 if let Some(f) = fetch {
554 object["Fetch"] = f.to_string().into()
555 };
556 object
557 }
558 LogicalPlan::Subquery(Subquery { .. }) => {
559 json!({
560 "Node Type": "Subquery"
561 })
562 }
563 LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
564 json!({
565 "Node Type": "Subquery",
566 "Alias": alias.table(),
567 })
568 }
569 LogicalPlan::Statement(statement) => {
570 json!({
571 "Node Type": "Statement",
572 "Statement": format!("{}", statement.display())
573 })
574 }
575 LogicalPlan::Distinct(distinct) => match distinct {
576 Distinct::All(_) => {
577 json!({
578 "Node Type": "DistinctAll"
579 })
580 }
581 Distinct::On(DistinctOn {
582 on_expr,
583 select_expr,
584 sort_expr,
585 ..
586 }) => {
587 let mut object = json!({
588 "Node Type": "DistinctOn",
589 "On": expr_vec_fmt!(on_expr),
590 "Select": expr_vec_fmt!(select_expr),
591 });
592 if let Some(sort_expr) = sort_expr {
593 object["Sort"] =
594 serde_json::Value::String(expr_vec_fmt!(sort_expr));
595 }
596
597 object
598 }
599 },
600 LogicalPlan::Explain { .. } => {
601 json!({
602 "Node Type": "Explain"
603 })
604 }
605 LogicalPlan::Analyze { .. } => {
606 json!({
607 "Node Type": "Analyze"
608 })
609 }
610 LogicalPlan::Union(_) => {
611 json!({
612 "Node Type": "Union"
613 })
614 }
615 LogicalPlan::Extension(e) => {
616 json!({
617 "Node Type": e.node.name(),
618 "Detail": format!("{:?}", e.node)
619 })
620 }
621 LogicalPlan::DescribeTable(DescribeTable { .. }) => {
622 json!({
623 "Node Type": "DescribeTable"
624 })
625 }
626 LogicalPlan::Unnest(Unnest {
627 input: plan,
628 list_type_columns: list_col_indices,
629 struct_type_columns: struct_col_indices,
630 ..
631 }) => {
632 let input_columns = plan.schema().columns();
633 let list_type_columns = list_col_indices
634 .iter()
635 .map(|(i, unnest_info)| {
636 format!(
637 "{}|depth={:?}",
638 &input_columns[*i].to_string(),
639 unnest_info.depth
640 )
641 })
642 .collect::<Vec<String>>();
643 let struct_type_columns = struct_col_indices
644 .iter()
645 .map(|i| &input_columns[*i])
646 .collect::<Vec<&Column>>();
647 json!({
648 "Node Type": "Unnest",
649 "ListColumn": expr_vec_fmt!(list_type_columns),
650 "StructColumn": expr_vec_fmt!(struct_type_columns),
651 })
652 }
653 }
654 }
655}
656
657impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
658 type Node = LogicalPlan;
659
660 fn f_down(
661 &mut self,
662 node: &'n LogicalPlan,
663 ) -> datafusion_common::Result<TreeNodeRecursion> {
664 let id = self.next_id;
665 self.next_id += 1;
666 let mut object = Self::to_json_value(node);
667
668 object["Plans"] = serde_json::Value::Array(vec![]);
669
670 if self.with_schema {
671 object["Output"] = serde_json::Value::Array(
672 node.schema()
673 .fields()
674 .iter()
675 .map(|f| f.name().to_string())
676 .map(serde_json::Value::String)
677 .collect(),
678 );
679 };
680
681 self.objects.insert(id, object);
682 self.parent_ids.push(id);
683 Ok(TreeNodeRecursion::Continue)
684 }
685
686 fn f_up(
687 &mut self,
688 _node: &Self::Node,
689 ) -> datafusion_common::Result<TreeNodeRecursion> {
690 let id = self.parent_ids.pop().unwrap();
691
692 let current_node = self.objects.remove(&id).ok_or_else(|| {
693 DataFusionError::Internal("Missing current node!".to_string())
694 })?;
695
696 if let Some(parent_id) = self.parent_ids.last() {
697 let parent_node = self
698 .objects
699 .get_mut(parent_id)
700 .expect("Missing parent node!");
701 let plans = parent_node
702 .get_mut("Plans")
703 .and_then(|p| p.as_array_mut())
704 .expect("Plans should be an array");
705
706 plans.push(current_node);
707 } else {
708 let plan = serde_json::json!([{"Plan": current_node}]);
710 write!(
711 self.f,
712 "{}",
713 serde_json::to_string_pretty(&plan)
714 .map_err(|e| DataFusionError::External(Box::new(e)))?
715 )?;
716 }
717
718 Ok(TreeNodeRecursion::Continue)
719 }
720}
721
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};

    use super::*;

    /// A schema without fields renders as an empty pair of brackets.
    #[test]
    fn test_display_empty_schema() {
        let schema = Schema::empty();
        let rendered = display_schema(&schema).to_string();
        assert_eq!("[]", rendered);
    }

    /// Fields are comma separated, and only nullable ones carry the `;N`
    /// marker.
    #[test]
    fn test_display_schema() {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, true),
        ];
        let schema = Schema::new(fields);

        let rendered = display_schema(&schema).to_string();
        assert_eq!("[id:Int32, first_name:Utf8;N]", rendered);
    }
}