1pub mod checker;
2pub(crate) mod executor;
3pub(crate) mod iterators;
4pub(in crate::idx) mod knn;
5pub(crate) mod plan;
6pub(in crate::idx) mod rewriter;
7pub(in crate::idx) mod tree;
8
9use crate::ctx::Context;
10use crate::dbs::{Iterable, Iterator, Options, Statement};
11use crate::err::Error;
12use crate::idx::planner::executor::{InnerQueryExecutor, IteratorEntry, QueryExecutor};
13use crate::idx::planner::iterators::IteratorRef;
14use crate::idx::planner::knn::KnnBruteForceResults;
15use crate::idx::planner::plan::{Plan, PlanBuilder};
16use crate::idx::planner::tree::Tree;
17use crate::sql::with::With;
18use crate::sql::{order::Ordering, Cond, Fields, Groups, Table};
19use reblessive::tree::Stk;
20use std::collections::HashMap;
21use std::sync::atomic::{self, AtomicU8};
22
23pub(crate) struct StatementContext<'a> {
27 pub(crate) ctx: &'a Context,
28 pub(crate) opt: &'a Options,
29 pub(crate) ns: &'a str,
30 pub(crate) db: &'a str,
31 pub(crate) stm: &'a Statement<'a>,
32 pub(crate) fields: Option<&'a Fields>,
33 pub(crate) with: Option<&'a With>,
34 pub(crate) order: Option<&'a Ordering>,
35 pub(crate) cond: Option<&'a Cond>,
36 pub(crate) group: Option<&'a Groups>,
37 is_perm: bool,
38}
39
/// How much of each record an iteration has to produce.
///
/// The value is chosen by `StatementContext::check_record_strategy`.
/// `PartialEq`/`Eq` are derived so that strategies can be compared with `==`
/// in addition to pattern matching.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RecordStrategy {
    /// Selected when the statement only selects `count()` and groups everything
    /// together (empty `GROUP` clause).
    Count,
    /// Selected when none of the conditions requiring values or a count applies.
    KeysOnly,
    /// Selected whenever a condition without full index coverage, a non-empty
    /// grouping or ordering, a non-count field, or a specific permission is present.
    KeysAndValues,
}
46
/// Outcome of the table-level permission check performed for a statement.
///
/// `PartialEq`/`Eq` are derived so that outcomes can be compared with `==`
/// in addition to pattern matching.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum GrantedPermission {
    /// The table's permissions for the statement report `is_none()`.
    None,
    /// Permissions are not checked, the table does not exist, or the table's
    /// permissions are neither specific nor none.
    Full,
    /// The table's permissions for the statement report `is_specific()`.
    Specific,
}
53
54impl<'a> StatementContext<'a> {
55 pub(crate) fn new(
56 ctx: &'a Context,
57 opt: &'a Options,
58 stm: &'a Statement<'a>,
59 ) -> Result<Self, Error> {
60 let is_perm = opt.check_perms(stm.into())?;
61 let (ns, db) = opt.ns_db()?;
62 Ok(Self {
63 ctx,
64 opt,
65 stm,
66 ns,
67 db,
68 fields: stm.expr(),
69 with: stm.with(),
70 order: stm.order(),
71 cond: stm.cond(),
72 group: stm.group(),
73 is_perm,
74 })
75 }
76
77 pub(crate) async fn check_table_permission(
78 &self,
79 tb: &str,
80 ) -> Result<GrantedPermission, Error> {
81 if !self.is_perm {
82 return Ok(GrantedPermission::Full);
83 }
84 match self.ctx.tx().get_tb(self.ns, self.db, tb).await {
86 Ok(table) => {
87 let perms = self.stm.permissions(&table, false);
93 if perms.is_specific() {
96 return Ok(GrantedPermission::Specific);
97 }
98 if perms.is_none() {
101 return Ok(GrantedPermission::None);
102 }
103 }
104 Err(Error::TbNotFound {
105 ..
106 }) => {
107 }
111 Err(e) => return Err(e),
112 }
113 Ok(GrantedPermission::Full)
114 }
115
116 pub(crate) async fn check_record_strategy(
117 &self,
118 with_all_indexes: bool,
119 granted_permission: GrantedPermission,
120 ) -> Result<RecordStrategy, Error> {
121 if !with_all_indexes && self.cond.is_some() {
125 return Ok(RecordStrategy::KeysAndValues);
126 }
127
128 let is_group_all = if let Some(g) = self.group {
132 if !g.is_empty() {
133 return Ok(RecordStrategy::KeysAndValues);
134 }
135 true
136 } else {
137 false
138 };
139
140 if let Some(p) = self.order {
144 match p {
145 Ordering::Random => {}
146 Ordering::Order(x) => {
147 if !x.is_empty() {
148 return Ok(RecordStrategy::KeysAndValues);
149 }
150 }
151 }
152 }
153
154 let is_count_all = if let Some(fields) = self.fields {
158 if !fields.is_count_all_only() {
159 return Ok(RecordStrategy::KeysAndValues);
160 }
161 true
162 } else {
163 false
164 };
165
166 if matches!(granted_permission, GrantedPermission::Specific) {
170 return Ok(RecordStrategy::KeysAndValues);
171 }
172
173 if is_count_all && is_group_all {
175 return Ok(RecordStrategy::Count);
176 }
177 Ok(RecordStrategy::KeysOnly)
179 }
180}
181
182pub(crate) struct QueryPlanner {
183 executors: HashMap<String, QueryExecutor>,
185 requires_distinct: bool,
186 fallbacks: Vec<String>,
187 iteration_workflow: Vec<IterationStage>,
188 iteration_index: AtomicU8,
189 orders: Vec<IteratorRef>,
190 granted_permissions: HashMap<String, GrantedPermission>,
191 any_specific_permission: bool,
192}
193
194impl QueryPlanner {
195 pub(crate) fn new() -> Self {
196 Self {
197 executors: HashMap::default(),
198 requires_distinct: false,
199 fallbacks: vec![],
200 iteration_workflow: Vec::default(),
201 iteration_index: AtomicU8::new(0),
202 orders: vec![],
203 granted_permissions: HashMap::default(),
204 any_specific_permission: false,
205 }
206 }
207
208 pub(crate) async fn check_table_permission(
211 &mut self,
212 ctx: &StatementContext<'_>,
213 tb: &str,
214 ) -> Result<GrantedPermission, Error> {
215 if ctx.is_perm {
216 if let Some(p) = self.granted_permissions.get(tb) {
217 return Ok(*p);
218 }
219 let p = ctx.check_table_permission(tb).await?;
220 self.granted_permissions.insert(tb.to_string(), p);
221 if matches!(p, GrantedPermission::Specific) {
222 self.any_specific_permission = true;
223 }
224 return Ok(p);
225 }
226 Ok(GrantedPermission::Full)
227 }
228
229 pub(crate) async fn add_iterables(
230 &mut self,
231 stk: &mut Stk,
232 ctx: &StatementContext<'_>,
233 t: Table,
234 gp: GrantedPermission,
235 it: &mut Iterator,
236 ) -> Result<(), Error> {
237 let mut is_table_iterator = false;
238
239 let tree = Tree::build(stk, ctx, &t).await?;
240
241 let is_knn = !tree.knn_expressions.is_empty();
242 let mut exe = InnerQueryExecutor::new(
243 stk,
244 ctx.ctx,
245 ctx.opt,
246 &t,
247 tree.index_map.options,
248 tree.knn_expressions,
249 tree.knn_brute_force_expressions,
250 tree.knn_condition,
251 )
252 .await?;
253 match PlanBuilder::build(
254 gp,
255 tree.root,
256 ctx,
257 tree.with_indexes,
258 tree.index_map.compound_indexes,
259 tree.index_map.order_limit,
260 tree.all_and_groups,
261 tree.all_and,
262 tree.all_expressions_with_index,
263 )
264 .await?
265 {
266 Plan::SingleIndex(exp, io, rs) => {
267 if io.require_distinct() {
268 self.requires_distinct = true;
269 }
270 let is_order = exp.is_none();
271 let ir = exe.add_iterator(IteratorEntry::Single(exp, io));
272 self.add(t.clone(), Some(ir), exe, it, rs);
273 if is_order {
274 self.orders.push(ir);
275 }
276 }
277 Plan::MultiIndex(non_range_indexes, ranges_indexes, rs) => {
278 for (exp, io) in non_range_indexes {
279 let ie = IteratorEntry::Single(Some(exp), io);
280 let ir = exe.add_iterator(ie);
281 it.ingest(Iterable::Index(t.clone(), ir, rs));
282 }
283 for (ixr, rq) in ranges_indexes {
284 let ie = IteratorEntry::Range(rq.exps, ixr, rq.from, rq.to);
285 let ir = exe.add_iterator(ie);
286 it.ingest(Iterable::Index(t.clone(), ir, rs));
287 }
288 self.requires_distinct = true;
289 self.add(t.clone(), None, exe, it, rs);
290 }
291 Plan::SingleIndexRange(ixn, rq, keys_only) => {
292 let ir = exe.add_iterator(IteratorEntry::Range(rq.exps, ixn, rq.from, rq.to));
293 self.add(t.clone(), Some(ir), exe, it, keys_only);
294 }
295 Plan::TableIterator(reason, rs) => {
296 if let Some(reason) = reason {
297 self.fallbacks.push(reason);
298 }
299 self.add(t.clone(), None, exe, it, rs);
300 it.ingest(Iterable::Table(t, rs));
301 is_table_iterator = true;
302 }
303 }
304 if is_knn && is_table_iterator {
305 self.iteration_workflow = vec![IterationStage::CollectKnn, IterationStage::BuildKnn];
306 } else {
307 self.iteration_workflow = vec![IterationStage::Iterate(None)];
308 }
309 Ok(())
310 }
311
312 fn add(
313 &mut self,
314 tb: Table,
315 irf: Option<IteratorRef>,
316 exe: InnerQueryExecutor,
317 it: &mut Iterator,
318 rs: RecordStrategy,
319 ) {
320 self.executors.insert(tb.0.clone(), exe.into());
321 if let Some(irf) = irf {
322 it.ingest(Iterable::Index(tb, irf, rs));
323 }
324 }
325 pub(crate) fn has_executors(&self) -> bool {
326 !self.executors.is_empty()
327 }
328
329 pub(crate) fn get_query_executor(&self, tb: &str) -> Option<&QueryExecutor> {
330 self.executors.get(tb)
331 }
332
333 pub(crate) fn requires_distinct(&self) -> bool {
334 self.requires_distinct
335 }
336
337 pub(crate) fn fallbacks(&self) -> &Vec<String> {
338 &self.fallbacks
339 }
340
341 pub(crate) fn is_order(&self, irf: &IteratorRef) -> bool {
342 self.orders.contains(irf)
343 }
344
345 pub(crate) fn is_any_specific_permission(&self) -> bool {
346 self.any_specific_permission
347 }
348
349 pub(crate) async fn next_iteration_stage(&self) -> Option<IterationStage> {
350 let pos = self.iteration_index.fetch_add(1, atomic::Ordering::Relaxed);
351 match self.iteration_workflow.get(pos as usize) {
352 Some(IterationStage::BuildKnn) => {
353 Some(IterationStage::Iterate(Some(self.build_bruteforce_knn_results().await)))
354 }
355 is => is.cloned(),
356 }
357 }
358
359 async fn build_bruteforce_knn_results(&self) -> KnnBruteForceResults {
360 let mut results = HashMap::with_capacity(self.executors.len());
361 for (tb, exe) in &self.executors {
362 results.insert(tb.clone(), exe.build_bruteforce_knn_result().await);
363 }
364 results.into()
365 }
366}
367
368#[derive(Clone)]
369pub(crate) enum IterationStage {
370 Iterate(Option<KnnBruteForceResults>),
371 CollectKnn,
372 BuildKnn,
373}