1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
mod dictionary_encoded;
mod serialize;

use std::fmt::{self, Debug};
use std::io::Write;
use std::ops::{Range, RangeInclusive};
use std::sync::Arc;

use common::BinarySerializable;
pub use dictionary_encoded::{BytesColumn, StrColumn};
pub use serialize::{
    open_column_bytes, open_column_str, open_column_u128, open_column_u128_as_compact_u64,
    open_column_u64, serialize_column_mappable_to_u128, serialize_column_mappable_to_u64,
};

use crate::column_index::{ColumnIndex, Set};
use crate::column_values::monotonic_mapping::StrictlyMonotonicMappingToInternal;
use crate::column_values::{monotonic_map_column, ColumnValues};
use crate::{Cardinality, DocId, EmptyColumnValues, MonotonicallyMappableToU64, RowId};

#[derive(Clone)]
pub struct Column<T = u64> {
    pub index: ColumnIndex,
    pub values: Arc<dyn ColumnValues<T>>,
}

impl<T: Debug + PartialOrd + Send + Sync + Copy + 'static> Debug for Column<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let num_docs = self.num_docs();
        let entries = (0..num_docs)
            .map(|i| (i, self.values_for_doc(i).collect::<Vec<_>>()))
            .filter(|(_, vals)| !vals.is_empty());
        f.debug_map().entries(entries).finish()
    }
}

impl<T: PartialOrd + Default> Column<T> {
    pub fn build_empty_column(num_docs: u32) -> Column<T> {
        Column {
            index: ColumnIndex::Empty { num_docs },
            values: Arc::new(EmptyColumnValues),
        }
    }
}

impl<T: MonotonicallyMappableToU64> Column<T> {
    pub fn to_u64_monotonic(self) -> Column<u64> {
        let values = Arc::new(monotonic_map_column(
            self.values,
            StrictlyMonotonicMappingToInternal::<T>::new(),
        ));
        Column {
            index: self.index,
            values,
        }
    }
}

impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
    #[inline]
    pub fn get_cardinality(&self) -> Cardinality {
        self.index.get_cardinality()
    }

    pub fn num_docs(&self) -> RowId {
        match &self.index {
            ColumnIndex::Empty { num_docs } => *num_docs,
            ColumnIndex::Full => self.values.num_vals(),
            ColumnIndex::Optional(optional_index) => optional_index.num_docs(),
            ColumnIndex::Multivalued(col_index) => {
                // The multivalued index contains all value start row_id,
                // and one extra value at the end with the overall number of rows.
                col_index.num_docs()
            }
        }
    }

    pub fn min_value(&self) -> T {
        self.values.min_value()
    }

    pub fn max_value(&self) -> T {
        self.values.max_value()
    }

    #[inline]
    pub fn first(&self, row_id: RowId) -> Option<T> {
        self.values_for_doc(row_id).next()
    }

    /// Load the first value for each docid in the provided slice.
    #[inline]
    pub fn first_vals(&self, docids: &[DocId], output: &mut [Option<T>]) {
        match &self.index {
            ColumnIndex::Empty { .. } => {}
            ColumnIndex::Full => self.values.get_vals_opt(docids, output),
            ColumnIndex::Optional(optional_index) => {
                for (i, docid) in docids.iter().enumerate() {
                    output[i] = optional_index
                        .rank_if_exists(*docid)
                        .map(|rowid| self.values.get_val(rowid));
                }
            }
            ColumnIndex::Multivalued(multivalued_index) => {
                for (i, docid) in docids.iter().enumerate() {
                    let range = multivalued_index.range(*docid);
                    let is_empty = range.start == range.end;
                    if !is_empty {
                        output[i] = Some(self.values.get_val(range.start));
                    }
                }
            }
        }
    }

    /// Translates a block of docis to row_ids.
    ///
    /// returns the row_ids and the matching docids on the same index
    /// e.g.
    /// DocId In:  [0, 5, 6]
    /// DocId Out: [0, 0, 6, 6]
    /// RowId Out: [0, 1, 2, 3]
    #[inline]
    pub fn row_ids_for_docs(
        &self,
        doc_ids: &[DocId],
        doc_ids_out: &mut Vec<DocId>,
        row_ids: &mut Vec<RowId>,
    ) {
        self.index.docids_to_rowids(doc_ids, doc_ids_out, row_ids)
    }

    pub fn values_for_doc(&self, doc_id: DocId) -> impl Iterator<Item = T> + '_ {
        self.index
            .value_row_ids(doc_id)
            .map(|value_row_id: RowId| self.values.get_val(value_row_id))
    }

    /// Get the docids of values which are in the provided value range.
    #[inline]
    pub fn get_docids_for_value_range(
        &self,
        value_range: RangeInclusive<T>,
        selected_docid_range: Range<u32>,
        doc_ids: &mut Vec<u32>,
    ) {
        // convert passed docid range to row id range
        let rowid_range = self
            .index
            .docid_range_to_rowids(selected_docid_range.clone());

        // Load rows
        self.values
            .get_row_ids_for_value_range(value_range, rowid_range, doc_ids);
        // Convert rows to docids
        self.index
            .select_batch_in_place(selected_docid_range.start, doc_ids);
    }

    /// Fills the output vector with the (possibly multiple values that are associated_with
    /// `row_id`.
    ///
    /// This method clears the `output` vector.
    pub fn fill_vals(&self, row_id: RowId, output: &mut Vec<T>) {
        output.clear();
        output.extend(self.values_for_doc(row_id));
    }

    pub fn first_or_default_col(self, default_value: T) -> Arc<dyn ColumnValues<T>> {
        Arc::new(FirstValueWithDefault {
            column: self,
            default_value,
        })
    }
}

impl BinarySerializable for Cardinality {
    fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> std::io::Result<()> {
        self.to_code().serialize(writer)
    }

    fn deserialize<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
        let cardinality_code = u8::deserialize(reader)?;
        let cardinality = Cardinality::try_from_code(cardinality_code)?;
        Ok(cardinality)
    }
}

// TODO simplify or optimize
struct FirstValueWithDefault<T: Copy> {
    column: Column<T>,
    default_value: T,
}

impl<T: PartialOrd + Debug + Send + Sync + Copy + 'static> ColumnValues<T>
    for FirstValueWithDefault<T>
{
    #[inline(always)]
    fn get_val(&self, idx: u32) -> T {
        self.column.first(idx).unwrap_or(self.default_value)
    }

    fn min_value(&self) -> T {
        self.column.values.min_value()
    }

    fn max_value(&self) -> T {
        self.column.values.max_value()
    }

    fn num_vals(&self) -> u32 {
        match &self.column.index {
            ColumnIndex::Empty { .. } => 0u32,
            ColumnIndex::Full => self.column.values.num_vals(),
            ColumnIndex::Optional(optional_idx) => optional_idx.num_docs(),
            ColumnIndex::Multivalued(multivalue_idx) => multivalue_idx.num_docs(),
        }
    }
}