datafusion_physical_plan/
explain.rs1use std::any::Any;
21use std::sync::Arc;
22
23use super::{DisplayAs, PlanProperties, SendableRecordBatchStream};
24use crate::execution_plan::{Boundedness, EmissionType};
25use crate::stream::RecordBatchStreamAdapter;
26use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
27
28use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
29use datafusion_common::display::StringifiedPlan;
30use datafusion_common::{internal_err, Result};
31use datafusion_execution::TaskContext;
32use datafusion_physical_expr::EquivalenceProperties;
33
34use log::trace;
35
36#[derive(Debug, Clone)]
40pub struct ExplainExec {
41 schema: SchemaRef,
43 stringified_plans: Vec<StringifiedPlan>,
45 verbose: bool,
47 cache: PlanProperties,
48}
49
50impl ExplainExec {
51 pub fn new(
53 schema: SchemaRef,
54 stringified_plans: Vec<StringifiedPlan>,
55 verbose: bool,
56 ) -> Self {
57 let cache = Self::compute_properties(Arc::clone(&schema));
58 ExplainExec {
59 schema,
60 stringified_plans,
61 verbose,
62 cache,
63 }
64 }
65
66 pub fn stringified_plans(&self) -> &[StringifiedPlan] {
68 &self.stringified_plans
69 }
70
71 pub fn verbose(&self) -> bool {
73 self.verbose
74 }
75
76 fn compute_properties(schema: SchemaRef) -> PlanProperties {
78 PlanProperties::new(
79 EquivalenceProperties::new(schema),
80 Partitioning::UnknownPartitioning(1),
81 EmissionType::Final,
82 Boundedness::Bounded,
83 )
84 }
85}
86
87impl DisplayAs for ExplainExec {
88 fn fmt_as(
89 &self,
90 t: DisplayFormatType,
91 f: &mut std::fmt::Formatter,
92 ) -> std::fmt::Result {
93 match t {
94 DisplayFormatType::Default | DisplayFormatType::Verbose => {
95 write!(f, "ExplainExec")
96 }
97 }
98 }
99}
100
101impl ExecutionPlan for ExplainExec {
102 fn name(&self) -> &'static str {
103 "ExplainExec"
104 }
105
106 fn as_any(&self) -> &dyn Any {
108 self
109 }
110
111 fn properties(&self) -> &PlanProperties {
112 &self.cache
113 }
114
115 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
116 vec![]
118 }
119
120 fn with_new_children(
121 self: Arc<Self>,
122 _: Vec<Arc<dyn ExecutionPlan>>,
123 ) -> Result<Arc<dyn ExecutionPlan>> {
124 Ok(self)
125 }
126
127 fn execute(
128 &self,
129 partition: usize,
130 context: Arc<TaskContext>,
131 ) -> Result<SendableRecordBatchStream> {
132 trace!("Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
133 if 0 != partition {
134 return internal_err!("ExplainExec invalid partition {partition}");
135 }
136 let mut type_builder =
137 StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
138 let mut plan_builder =
139 StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
140
141 let plans_to_print = self
142 .stringified_plans
143 .iter()
144 .filter(|s| s.should_display(self.verbose));
145
146 let mut prev: Option<&StringifiedPlan> = None;
148
149 for p in plans_to_print {
150 type_builder.append_value(p.plan_type.to_string());
151 match prev {
152 Some(prev) if !should_show(prev, p) => {
153 plan_builder.append_value("SAME TEXT AS ABOVE");
154 }
155 Some(_) | None => {
156 plan_builder.append_value(&*p.plan);
157 }
158 }
159 prev = Some(p);
160 }
161
162 let record_batch = RecordBatch::try_new(
163 Arc::clone(&self.schema),
164 vec![
165 Arc::new(type_builder.finish()),
166 Arc::new(plan_builder.finish()),
167 ],
168 )?;
169
170 trace!(
171 "Before returning RecordBatchStream in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
172
173 Ok(Box::pin(RecordBatchStreamAdapter::new(
174 Arc::clone(&self.schema),
175 futures::stream::iter(vec![Ok(record_batch)]),
176 )))
177 }
178}
179
180fn should_show(previous_plan: &StringifiedPlan, this_plan: &StringifiedPlan) -> bool {
186 (previous_plan.plan != this_plan.plan) || this_plan.should_display(false)
189}