polars_stream/
skeleton.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#![allow(unused)] // TODO: remove me
use std::cmp::Reverse;

use polars_core::prelude::*;
use polars_core::POOL;
use polars_expr::planner::{create_physical_expr, get_expr_depth_limit, ExpressionConversionState};
use polars_plan::plans::{Context, IRPlan, IsStreamableContext, IR};
use polars_plan::prelude::expr_ir::ExprIR;
use polars_plan::prelude::AExpr;
use polars_utils::arena::{Arena, Node};
use slotmap::{SecondaryMap, SlotMap};

fn is_streamable(node: Node, arena: &Arena<AExpr>) -> bool {
    polars_plan::plans::is_streamable(node, arena, IsStreamableContext::new(Context::Default))
}

pub fn run_query(
    node: Node,
    mut ir_arena: Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<Option<DataFrame>> {
    if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_IR") {
        let plan = IRPlan {
            lp_top: node,
            lp_arena: ir_arena.clone(),
            expr_arena: expr_arena.clone(),
        };
        let visualization = plan.display_dot().to_string();
        std::fs::write(visual_path, visualization).unwrap();
    }
    let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
    let root =
        crate::physical_plan::build_physical_plan(node, &mut ir_arena, expr_arena, &mut phys_sm)?;
    if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_PHYSICAL_PLAN") {
        let visualization = crate::physical_plan::visualize_plan(root, &phys_sm, expr_arena);
        std::fs::write(visual_path, visualization).unwrap();
    }
    let (mut graph, phys_to_graph) =
        crate::physical_plan::physical_plan_to_graph(root, &phys_sm, expr_arena)?;
    crate::async_executor::clear_task_wait_statistics();
    let mut results = crate::execute::execute_graph(&mut graph)?;
    if std::env::var("POLARS_TRACK_WAIT_STATS").as_deref() == Ok("1") {
        let mut stats = crate::async_executor::get_task_wait_statistics();
        stats.sort_by_key(|(_l, w)| Reverse(*w));
        eprintln!("Time spent waiting for async tasks:");
        for (loc, wait_time) in stats {
            eprintln!("{}:{} - {:?}", loc.file(), loc.line(), wait_time);
        }
    }

    Ok(results.remove(phys_to_graph[root]))
}