arrow_row/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A comparable row-oriented representation of a collection of [`Array`].
19//!
20//! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared],
21//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort].
22//! This makes the row format ideal for implementing efficient multi-column sorting,
23//! grouping, aggregation, windowing and more, as described in more detail
24//! [in this blog post](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/).
25//!
26//! For example, given three input [`Array`], [`RowConverter`] creates byte
27//! sequences that [compare] the same as when using [`lexsort`].
28//!
29//! ```text
30//!    ┌─────┐   ┌─────┐   ┌─────┐
31//!    │     │   │     │   │     │
32//!    ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐              ┏━━━━━━━━━━━━━┓
33//!    │     │   │     │   │     │  ─────────────▶┃             ┃
34//!    ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘              ┗━━━━━━━━━━━━━┛
35//!    │     │   │     │   │     │
36//!    └─────┘   └─────┘   └─────┘
37//!                ...
38//!    ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐              ┏━━━━━━━━┓
39//!    │     │   │     │   │     │  ─────────────▶┃        ┃
40//!    └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘              ┗━━━━━━━━┛
41//!     UInt64      Utf8     F64
42//!
43//!           Input Arrays                          Row Format
44//!     (Columns)
45//! ```
46//!
47//! _[`Rows`] must be generated by the same [`RowConverter`] for the comparison
48//! to be meaningful._
49//!
50//! # Basic Example
51//! ```
52//! # use std::sync::Arc;
53//! # use arrow_row::{RowConverter, SortField};
54//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
55//! # use arrow_array::cast::{AsArray, as_string_array};
56//! # use arrow_array::types::Int32Type;
57//! # use arrow_schema::DataType;
58//!
59//! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef;
60//! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef;
61//! let arrays = vec![a1, a2];
62//!
63//! // Convert arrays to rows
64//! let converter = RowConverter::new(vec![
65//!     SortField::new(DataType::Int32),
66//!     SortField::new(DataType::Utf8),
67//! ]).unwrap();
68//! let rows = converter.convert_columns(&arrays).unwrap();
69//!
70//! // Compare rows
71//! for i in 0..4 {
72//!     assert!(rows.row(i) <= rows.row(i + 1));
73//! }
74//! assert_eq!(rows.row(3), rows.row(4));
75//!
76//! // Convert rows back to arrays
77//! let converted = converter.convert_rows(&rows).unwrap();
78//! assert_eq!(arrays, converted);
79//!
80//! // Compare rows from different arrays
81//! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef;
82//! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef;
83//! let arrays = vec![a1, a2];
84//! let rows2 = converter.convert_columns(&arrays).unwrap();
85//!
86//! assert!(rows.row(4) < rows2.row(0));
87//! assert!(rows.row(4) < rows2.row(1));
88//!
89//! // Convert selection of rows back to arrays
90//! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)];
91//! let converted = converter.convert_rows(selection).unwrap();
92//! let c1 = converted[0].as_primitive::<Int32Type>();
93//! assert_eq!(c1.values(), &[-1, 4, 0, 3]);
94//!
95//! let c2 = converted[1].as_string::<i32>();
96//! let c2_values: Vec<_> = c2.iter().flatten().collect();
97//! assert_eq!(&c2_values, &["a", "f", "c", "e"]);
98//! ```
99//!
100//! # Lexsort
101//!
102//! The row format can also be used to implement a fast multi-column / lexicographic sort
103//!
104//! ```
105//! # use arrow_row::{RowConverter, SortField};
106//! # use arrow_array::{ArrayRef, UInt32Array};
107//! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array {
108//!     let fields = arrays
109//!         .iter()
110//!         .map(|a| SortField::new(a.data_type().clone()))
111//!         .collect();
112//!     let converter = RowConverter::new(fields).unwrap();
113//!     let rows = converter.convert_columns(arrays).unwrap();
114//!     let mut sort: Vec<_> = rows.iter().enumerate().collect();
115//!     sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
116//!     UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
117//! }
118//! ```
119//!
120//! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts
121//! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort
122//! [normalized for sorting]: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf
123//! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html
124//! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html
125//! [compared]: PartialOrd
126//! [compare]: PartialOrd
127
128#![warn(missing_docs)]
129use std::cmp::Ordering;
130use std::hash::{Hash, Hasher};
131use std::sync::Arc;
132
133use arrow_array::cast::*;
134use arrow_array::types::ArrowDictionaryKeyType;
135use arrow_array::*;
136use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
137use arrow_data::ArrayDataBuilder;
138use arrow_schema::*;
139use variable::{decode_binary_view, decode_string_view};
140
141use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
142use crate::variable::{decode_binary, decode_string};
143
144mod fixed;
145mod list;
146mod variable;
147
148/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
149///
150/// *Note: The encoding of the row format may change from release to release.*
151///
152/// ## Overview
153///
154/// The row format is a variable length byte sequence created by
155/// concatenating the encoded form of each column. The encoding for
156/// each column depends on its datatype (and sort options).
157///
158/// The encoding is carefully designed in such a way that escaping is
159/// unnecessary: it is never ambiguous as to whether a byte is part of
160/// a sentinel (e.g. null) or a value.
161///
162/// ## Unsigned Integer Encoding
163///
164/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding
165/// to the integer's length.
166///
167/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
168/// integer.
169///
170/// ```text
171///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
172///    3          │03│00│00│00│      │01│00│00│00│03│
173///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
174///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
175///   258         │02│01│00│00│      │01│00│00│01│02│
176///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
177///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
178///  23423        │7F│5B│00│00│      │01│00│00│5B│7F│
179///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
180///               ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
181///  NULL         │??│??│??│??│      │00│00│00│00│00│
182///               └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
183///
184///              32-bit (4 bytes)        Row Format
185///  Value        Little Endian
186/// ```
187///
188/// ## Signed Integer Encoding
189///
190/// Signed integers have their most significant sign bit flipped, and are then encoded in the
191/// same manner as an unsigned integer.
192///
193/// ```text
194///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
195///     5  │05│00│00│00│       │05│00│00│80│       │01│80│00│00│05│
196///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
197///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
198///    -5  │FB│FF│FF│FF│       │FB│FF│FF│7F│       │01│7F│FF│FF│FB│
199///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
200///
201///  Value  32-bit (4 bytes)    High bit flipped      Row Format
202///          Little Endian
203/// ```
204///
205/// ## Float Encoding
206///
207/// Floats are converted from IEEE 754 representation to a signed integer representation
208/// by flipping all bar the sign bit if they are negative.
209///
210/// They are then encoded in the same manner as a signed integer.
211///
212/// ## Fixed Length Bytes Encoding
213///
214/// Fixed length bytes are encoded in the same fashion as primitive types above.
215///
216/// For a fixed length array of length `n`:
217///
218/// A null is encoded as `0_u8` null sentinel followed by `n` `0_u8` bytes
219///
220/// A valid value is encoded as `1_u8` followed by the value bytes
221///
222/// ## Variable Length Bytes (including Strings) Encoding
223///
224/// A null is encoded as a `0_u8`.
225///
226/// An empty byte array is encoded as `1_u8`.
227///
228/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
229/// encoded using a block based scheme described below.
230///
231/// The byte array is broken up into fixed-width blocks, each block is written in turn
232/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
233/// with `0_u8` and written to the output, followed by the un-padded length in bytes
234/// of this final block as a `u8`. The first 4 blocks have a length of 8, with subsequent
235/// blocks using a length of 32, this is to reduce space amplification for small strings.
236///
237/// Note the following example encodings use a block size of 4 bytes for brevity:
238///
239/// ```text
240///                       ┌───┬───┬───┬───┬───┬───┐
241///  "MEEP"               │02 │'M'│'E'│'E'│'P'│04 │
242///                       └───┴───┴───┴───┴───┴───┘
243///
244///                       ┌───┐
245///  ""                   │01 |
246///                       └───┘
247///
248///  NULL                 ┌───┐
249///                       │00 │
250///                       └───┘
251///
252/// "Defenestration"      ┌───┬───┬───┬───┬───┬───┐
253///                       │02 │'D'│'e'│'f'│'e'│FF │
254///                       └───┼───┼───┼───┼───┼───┤
255///                           │'n'│'e'│'s'│'t'│FF │
256///                           ├───┼───┼───┼───┼───┤
257///                           │'r'│'a'│'t'│'r'│FF │
258///                           ├───┼───┼───┼───┼───┤
259///                           │'a'│'t'│'i'│'o'│FF │
260///                           ├───┼───┼───┼───┼───┤
261///                           │'n'│00 │00 │00 │01 │
262///                           └───┴───┴───┴───┴───┘
263/// ```
264///
265/// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional
266/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
267///
268/// ## Dictionary Encoding
269///
270/// Dictionaries are hydrated to their underlying values
271///
272/// ## Struct Encoding
273///
274/// A null is encoded as a `0_u8`.
275///
276/// A valid value is encoded as `1_u8` followed by the row encoding of each child.
277///
278/// This encoding effectively flattens the schema in a depth-first fashion.
279///
280/// For example
281///
282/// ```text
283/// ┌───────┬────────────────────────┬───────┐
284/// │ Int32 │ Struct[Int32, Float32] │ Int32 │
285/// └───────┴────────────────────────┴───────┘
286/// ```
287///
288/// Is encoded as
289///
290/// ```text
291/// ┌───────┬───────────────┬───────┬─────────┬───────┐
292/// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │
293/// └───────┴───────────────┴───────┴─────────┴───────┘
294/// ```
295///
296/// ## List Encoding
297///
298/// Lists are encoded by first encoding all child elements to the row format.
299///
300/// A list value is then encoded as the concatenation of each of the child elements,
301/// separately encoded using the variable length encoding described above, followed
302/// by the variable length encoding of an empty byte array.
303///
304/// For example given:
305///
306/// ```text
307/// [1_u8, 2_u8, 3_u8]
308/// [1_u8, null]
309/// []
310/// null
311/// ```
312///
313/// The elements would be converted to:
314///
315/// ```text
316///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
317///  1  │01│01│  2  │01│02│  3  │01│03│  1  │01│01│  null  │00│00│
318///     └──┴──┘     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
319///```
320///
321/// Which would be encoded as
322///
323/// ```text
324///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
325///  [1_u8, 2_u8, 3_u8]     │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
326///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
327///                          └──── 1_u8 ────┘   └──── 2_u8 ────┘  └──── 3_u8 ────┘
328///
329///                         ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
330///  [1_u8, null]           │02│01│01│00│00│02│02│00│00│00│00│02│01│
331///                         └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
332///                          └──── 1_u8 ────┘   └──── null ────┘
333///
334///```
335///
336/// With `[]` represented by an empty byte array, and `null` a null byte array.
337///
338/// # Ordering
339///
340/// ## Float Ordering
341///
342/// Floats are totally ordered in accordance to the `totalOrder` predicate as defined
343/// in the IEEE 754 (2008 revision) floating point standard.
344///
345/// The ordering established by this does not always agree with the
346/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example,
347/// they consider negative and positive zero equal, while this does not
348///
349/// ## Null Ordering
350///
351/// The encoding described above will order nulls first, this can be inverted by representing
352/// nulls as `0xFF_u8` instead of `0_u8`
353///
354/// ## Reverse Column Ordering
355///
356/// The order of a given column can be reversed by negating the encoded bytes of non-null values
357///
358/// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
359/// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing
360#[derive(Debug)]
361pub struct RowConverter {
362    fields: Arc<[SortField]>,
363    /// State for codecs
364    codecs: Vec<Codec>,
365}
366
367#[derive(Debug)]
368enum Codec {
369    /// No additional codec state is necessary
370    Stateless,
371    /// A row converter for the dictionary values
372    /// and the encoding of a row containing only nulls
373    Dictionary(RowConverter, OwnedRow),
374    /// A row converter for the child fields
375    /// and the encoding of a row containing only nulls
376    Struct(RowConverter, OwnedRow),
377    /// A row converter for the child field
378    List(RowConverter),
379}
380
381impl Codec {
382    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
383        match &sort_field.data_type {
384            DataType::Dictionary(_, values) => {
385                let sort_field =
386                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);
387
388                let converter = RowConverter::new(vec![sort_field])?;
389                let null_array = new_null_array(values.as_ref(), 1);
390                let nulls = converter.convert_columns(&[null_array])?;
391
392                let owned = OwnedRow {
393                    data: nulls.buffer.into(),
394                    config: nulls.config,
395                };
396                Ok(Self::Dictionary(converter, owned))
397            }
398            d if !d.is_nested() => Ok(Self::Stateless),
399            DataType::List(f) | DataType::LargeList(f) => {
400                // The encoded contents will be inverted if descending is set to true
401                // As such we set `descending` to false and negate nulls first if it
402                // it set to true
403                let options = SortOptions {
404                    descending: false,
405                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
406                };
407
408                let field = SortField::new_with_options(f.data_type().clone(), options);
409                let converter = RowConverter::new(vec![field])?;
410                Ok(Self::List(converter))
411            }
412            DataType::Struct(f) => {
413                let sort_fields = f
414                    .iter()
415                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
416                    .collect();
417
418                let converter = RowConverter::new(sort_fields)?;
419                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
420
421                let nulls = converter.convert_columns(&nulls)?;
422                let owned = OwnedRow {
423                    data: nulls.buffer.into(),
424                    config: nulls.config,
425                };
426
427                Ok(Self::Struct(converter, owned))
428            }
429            _ => Err(ArrowError::NotYetImplemented(format!(
430                "not yet implemented: {:?}",
431                sort_field.data_type
432            ))),
433        }
434    }
435
436    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
437        match self {
438            Codec::Stateless => Ok(Encoder::Stateless),
439            Codec::Dictionary(converter, nulls) => {
440                let values = array.as_any_dictionary().values().clone();
441                let rows = converter.convert_columns(&[values])?;
442                Ok(Encoder::Dictionary(rows, nulls.row()))
443            }
444            Codec::Struct(converter, null) => {
445                let v = as_struct_array(array);
446                let rows = converter.convert_columns(v.columns())?;
447                Ok(Encoder::Struct(rows, null.row()))
448            }
449            Codec::List(converter) => {
450                let values = match array.data_type() {
451                    DataType::List(_) => as_list_array(array).values(),
452                    DataType::LargeList(_) => as_large_list_array(array).values(),
453                    _ => unreachable!(),
454                };
455                let rows = converter.convert_columns(&[values.clone()])?;
456                Ok(Encoder::List(rows))
457            }
458        }
459    }
460
461    fn size(&self) -> usize {
462        match self {
463            Codec::Stateless => 0,
464            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
465            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
466            Codec::List(converter) => converter.size(),
467        }
468    }
469}
470
471#[derive(Debug)]
472enum Encoder<'a> {
473    /// No additional encoder state is necessary
474    Stateless,
475    /// The encoding of the child array and the encoding of a null row
476    Dictionary(Rows, Row<'a>),
477    /// The row encoding of the child arrays and the encoding of a null row
478    ///
479    /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
480    /// values that are masked by a null in the parent StructArray, otherwise
481    /// this would establish an ordering between semantically null values
482    Struct(Rows, Row<'a>),
483    /// The row encoding of the child array
484    List(Rows),
485}
486
487/// Configure the data type and sort order for a given column
488#[derive(Debug, Clone, PartialEq, Eq)]
489pub struct SortField {
490    /// Sort options
491    options: SortOptions,
492    /// Data type
493    data_type: DataType,
494}
495
496impl SortField {
497    /// Create a new column with the given data type
498    pub fn new(data_type: DataType) -> Self {
499        Self::new_with_options(data_type, Default::default())
500    }
501
502    /// Create a new column with the given data type and [`SortOptions`]
503    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
504        Self { options, data_type }
505    }
506
507    /// Return size of this instance in bytes.
508    ///
509    /// Includes the size of `Self`.
510    pub fn size(&self) -> usize {
511        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
512    }
513}
514
515impl RowConverter {
516    /// Create a new [`RowConverter`] with the provided schema
517    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
518        if !Self::supports_fields(&fields) {
519            return Err(ArrowError::NotYetImplemented(format!(
520                "Row format support not yet implemented for: {fields:?}"
521            )));
522        }
523
524        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
525        Ok(Self {
526            fields: fields.into(),
527            codecs,
528        })
529    }
530
531    /// Check if the given fields are supported by the row format.
532    pub fn supports_fields(fields: &[SortField]) -> bool {
533        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
534    }
535
536    fn supports_datatype(d: &DataType) -> bool {
537        match d {
538            _ if !d.is_nested() => true,
539            DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
540                Self::supports_datatype(f.data_type())
541            }
542            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
543            _ => false,
544        }
545    }
546
547    /// Convert [`ArrayRef`] columns into [`Rows`]
548    ///
549    /// See [`Row`] for information on when [`Row`] can be compared
550    ///
551    /// # Panics
552    ///
553    /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`]
554    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
555        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
556        let mut rows = self.empty_rows(num_rows, 0);
557        self.append(&mut rows, columns)?;
558        Ok(rows)
559    }
560
561    /// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
562    ///
563    /// See [`Row`] for information on when [`Row`] can be compared
564    ///
565    /// # Panics
566    ///
567    /// Panics if
568    /// * The schema of `columns` does not match that provided to [`RowConverter::new`]
569    /// * The provided [`Rows`] were not created by this [`RowConverter`]
570    ///
571    /// ```
572    /// # use std::sync::Arc;
573    /// # use std::collections::HashSet;
574    /// # use arrow_array::cast::AsArray;
575    /// # use arrow_array::StringArray;
576    /// # use arrow_row::{Row, RowConverter, SortField};
577    /// # use arrow_schema::DataType;
578    /// #
579    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
580    /// let a1 = StringArray::from(vec!["hello", "world"]);
581    /// let a2 = StringArray::from(vec!["a", "a", "hello"]);
582    ///
583    /// let mut rows = converter.empty_rows(5, 128);
584    /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
585    /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
586    ///
587    /// let back = converter.convert_rows(&rows).unwrap();
588    /// let values: Vec<_> = back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
589    /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
590    /// ```
591    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
592        assert!(
593            Arc::ptr_eq(&rows.config.fields, &self.fields),
594            "rows were not produced by this RowConverter"
595        );
596
597        if columns.len() != self.fields.len() {
598            return Err(ArrowError::InvalidArgumentError(format!(
599                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
600                self.fields.len(),
601                columns.len()
602            )));
603        }
604
605        let encoders = columns
606            .iter()
607            .zip(&self.codecs)
608            .zip(self.fields.iter())
609            .map(|((column, codec), field)| {
610                if !column.data_type().equals_datatype(&field.data_type) {
611                    return Err(ArrowError::InvalidArgumentError(format!(
612                        "RowConverter column schema mismatch, expected {} got {}",
613                        field.data_type,
614                        column.data_type()
615                    )));
616                }
617                codec.encoder(column.as_ref())
618            })
619            .collect::<Result<Vec<_>, _>>()?;
620
621        let write_offset = rows.num_rows();
622        let lengths = row_lengths(columns, &encoders);
623
624        // We initialize the offsets shifted down by one row index.
625        //
626        // As the rows are appended to the offsets will be incremented to match
627        //
628        // For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
629        // The offsets would be initialized to `0, 0, 3, 7`
630        //
631        // Writing the first row entirely would yield `0, 3, 3, 7`
632        // The second, `0, 3, 7, 7`
633        // The third, `0, 3, 7, 13`
634        //
635        // This would be the final offsets for reading
636        //
637        // In this way offsets tracks the position during writing whilst eventually serving
638        // as identifying the offsets of the written rows
639        rows.offsets.reserve(lengths.len());
640        let mut cur_offset = rows.offsets[write_offset];
641        for l in lengths {
642            rows.offsets.push(cur_offset);
643            cur_offset = cur_offset.checked_add(l).expect("overflow");
644        }
645
646        // Note this will not zero out any trailing data in `rows.buffer`,
647        // e.g. resulting from a call to `Rows::clear`, relying instead on the
648        // encoders not assuming a zero-initialized buffer
649        rows.buffer.resize(cur_offset, 0);
650
651        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
652            // We encode a column at a time to minimise dispatch overheads
653            encode_column(
654                &mut rows.buffer,
655                &mut rows.offsets[write_offset..],
656                column.as_ref(),
657                field.options,
658                &encoder,
659            )
660        }
661
662        if cfg!(debug_assertions) {
663            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
664            rows.offsets
665                .windows(2)
666                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
667        }
668
669        Ok(())
670    }
671
672    /// Convert [`Rows`] columns into [`ArrayRef`]
673    ///
674    /// # Panics
675    ///
676    /// Panics if the rows were not produced by this [`RowConverter`]
677    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
678    where
679        I: IntoIterator<Item = Row<'a>>,
680    {
681        let mut validate_utf8 = false;
682        let mut rows: Vec<_> = rows
683            .into_iter()
684            .map(|row| {
685                assert!(
686                    Arc::ptr_eq(&row.config.fields, &self.fields),
687                    "rows were not produced by this RowConverter"
688                );
689                validate_utf8 |= row.config.validate_utf8;
690                row.data
691            })
692            .collect();
693
694        // SAFETY
695        // We have validated that the rows came from this [`RowConverter`]
696        // and therefore must be valid
697        unsafe { self.convert_raw(&mut rows, validate_utf8) }
698    }
699
700    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
701    /// a total length of `data_capacity`
702    ///
703    /// This can be used to buffer a selection of [`Row`]
704    ///
705    /// ```
706    /// # use std::sync::Arc;
707    /// # use std::collections::HashSet;
708    /// # use arrow_array::cast::AsArray;
709    /// # use arrow_array::StringArray;
710    /// # use arrow_row::{Row, RowConverter, SortField};
711    /// # use arrow_schema::DataType;
712    /// #
713    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
714    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
715    ///
716    /// // Convert to row format and deduplicate
717    /// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
718    /// let mut distinct_rows = converter.empty_rows(3, 100);
719    /// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
720    /// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
721    ///
722    /// // Note: we could skip buffering and feed the filtered iterator directly
723    /// // into convert_rows, this is done for demonstration purposes only
724    /// let distinct = converter.convert_rows(&distinct_rows).unwrap();
725    /// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
726    /// assert_eq!(&values, &["hello", "world", "a"]);
727    /// ```
728    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
729        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
730        offsets.push(0);
731
732        Rows {
733            offsets,
734            buffer: Vec::with_capacity(data_capacity),
735            config: RowConfig {
736                fields: self.fields.clone(),
737                validate_utf8: false,
738            },
739        }
740    }
741
742    /// Create a new [Rows] instance from the given binary data.
743    ///
744    /// ```
745    /// # use std::sync::Arc;
746    /// # use std::collections::HashSet;
747    /// # use arrow_array::cast::AsArray;
748    /// # use arrow_array::StringArray;
749    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
750    /// # use arrow_schema::DataType;
751    /// #
752    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
753    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
754    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
755    ///
756    /// // We can convert rows into binary format and back in batch.
757    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
758    /// let binary = rows.try_into_binary().expect("known-small array");
759    /// let converted = converter.from_binary(binary.clone());
760    /// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
761    /// ```
762    ///
763    /// # Panics
764    ///
765    /// This function expects the passed [BinaryArray] to contain valid row data as produced by this
766    /// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may
767    /// panic if the data is malformed.
768    pub fn from_binary(&self, array: BinaryArray) -> Rows {
769        assert_eq!(
770            array.null_count(),
771            0,
772            "can't construct Rows instance from array with nulls"
773        );
774        Rows {
775            buffer: array.values().to_vec(),
776            offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
777            config: RowConfig {
778                fields: Arc::clone(&self.fields),
779                validate_utf8: true,
780            },
781        }
782    }
783
784    /// Convert raw bytes into [`ArrayRef`]
785    ///
786    /// # Safety
787    ///
788    /// `rows` must contain valid data for this [`RowConverter`]
789    unsafe fn convert_raw(
790        &self,
791        rows: &mut [&[u8]],
792        validate_utf8: bool,
793    ) -> Result<Vec<ArrayRef>, ArrowError> {
794        self.fields
795            .iter()
796            .zip(&self.codecs)
797            .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8))
798            .collect()
799    }
800
801    /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes
802    pub fn parser(&self) -> RowParser {
803        RowParser::new(Arc::clone(&self.fields))
804    }
805
806    /// Returns the size of this instance in bytes
807    ///
808    /// Includes the size of `Self`.
809    pub fn size(&self) -> usize {
810        std::mem::size_of::<Self>()
811            + self.fields.iter().map(|x| x.size()).sum::<usize>()
812            + self.codecs.capacity() * std::mem::size_of::<Codec>()
813            + self.codecs.iter().map(Codec::size).sum::<usize>()
814    }
815}
816
817/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
818#[derive(Debug)]
819pub struct RowParser {
820    config: RowConfig,
821}
822
823impl RowParser {
824    fn new(fields: Arc<[SortField]>) -> Self {
825        Self {
826            config: RowConfig {
827                fields,
828                validate_utf8: true,
829            },
830        }
831    }
832
833    /// Creates a [`Row`] from the provided `bytes`.
834    ///
835    /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with
836    /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic
837    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
838        Row {
839            data: bytes,
840            config: &self.config,
841        }
842    }
843}
844
845/// The config of a given set of [`Row`]
846#[derive(Debug, Clone)]
847struct RowConfig {
848    /// The schema for these rows
849    fields: Arc<[SortField]>,
850    /// Whether to run UTF-8 validation when converting to arrow arrays
851    validate_utf8: bool,
852}
853
854/// A row-oriented representation of arrow data, that is normalized for comparison.
855///
856/// See the [module level documentation](self) and [`RowConverter`] for more details.
857#[derive(Debug)]
858pub struct Rows {
859    /// Underlying row bytes
860    buffer: Vec<u8>,
861    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
862    offsets: Vec<usize>,
863    /// The config for these rows
864    config: RowConfig,
865}
866
867impl Rows {
868    /// Append a [`Row`] to this [`Rows`]
869    pub fn push(&mut self, row: Row<'_>) {
870        assert!(
871            Arc::ptr_eq(&row.config.fields, &self.config.fields),
872            "row was not produced by this RowConverter"
873        );
874        self.config.validate_utf8 |= row.config.validate_utf8;
875        self.buffer.extend_from_slice(row.data);
876        self.offsets.push(self.buffer.len())
877    }
878
879    /// Returns the row at index `row`
880    pub fn row(&self, row: usize) -> Row<'_> {
881        assert!(row + 1 < self.offsets.len());
882        unsafe { self.row_unchecked(row) }
883    }
884
885    /// Returns the row at `index` without bounds checking
886    ///
887    /// # Safety
888    /// Caller must ensure that `index` is less than the number of offsets (#rows + 1)
889    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
890        let end = unsafe { self.offsets.get_unchecked(index + 1) };
891        let start = unsafe { self.offsets.get_unchecked(index) };
892        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
893        Row {
894            data,
895            config: &self.config,
896        }
897    }
898
899    /// Sets the length of this [`Rows`] to 0
900    pub fn clear(&mut self) {
901        self.offsets.truncate(1);
902        self.buffer.clear();
903    }
904
905    /// Returns the number of [`Row`] in this [`Rows`]
906    pub fn num_rows(&self) -> usize {
907        self.offsets.len() - 1
908    }
909
910    /// Returns an iterator over the [`Row`] in this [`Rows`]
911    pub fn iter(&self) -> RowsIter<'_> {
912        self.into_iter()
913    }
914
915    /// Returns the size of this instance in bytes
916    ///
917    /// Includes the size of `Self`.
918    pub fn size(&self) -> usize {
919        // Size of fields is accounted for as part of RowConverter
920        std::mem::size_of::<Self>()
921            + self.buffer.len()
922            + self.offsets.len() * std::mem::size_of::<usize>()
923    }
924
925    /// Create a [BinaryArray] from the [Rows] data without reallocating the
926    /// underlying bytes.
927    ///
928    ///
929    /// ```
930    /// # use std::sync::Arc;
931    /// # use std::collections::HashSet;
932    /// # use arrow_array::cast::AsArray;
933    /// # use arrow_array::StringArray;
934    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
935    /// # use arrow_schema::DataType;
936    /// #
937    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
938    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
939    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
940    ///
941    /// // We can convert rows into binary format and back.
942    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
943    /// let binary = rows.try_into_binary().expect("known-small array");
944    /// let parser = converter.parser();
945    /// let parsed: Vec<OwnedRow> =
946    ///   binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
947    /// assert_eq!(values, parsed);
948    /// ```
949    ///
950    /// # Errors
951    ///
952    /// This function will return an error if there is more data than can be stored in
953    /// a [BinaryArray] -- i.e. if the total data size is more than 2GiB.
954    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
955        if self.buffer.len() > i32::MAX as usize {
956            return Err(ArrowError::InvalidArgumentError(format!(
957                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
958                self.buffer.len()
959            )));
960        }
961        // We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
962        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
963        // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
964        let array = unsafe {
965            BinaryArray::new_unchecked(
966                OffsetBuffer::new_unchecked(offsets_scalar),
967                Buffer::from_vec(self.buffer),
968                None,
969            )
970        };
971        Ok(array)
972    }
973}
974
975impl<'a> IntoIterator for &'a Rows {
976    type Item = Row<'a>;
977    type IntoIter = RowsIter<'a>;
978
979    fn into_iter(self) -> Self::IntoIter {
980        RowsIter {
981            rows: self,
982            start: 0,
983            end: self.num_rows(),
984        }
985    }
986}
987
988/// An iterator over [`Rows`]
989#[derive(Debug)]
990pub struct RowsIter<'a> {
991    rows: &'a Rows,
992    start: usize,
993    end: usize,
994}
995
996impl<'a> Iterator for RowsIter<'a> {
997    type Item = Row<'a>;
998
999    fn next(&mut self) -> Option<Self::Item> {
1000        if self.end == self.start {
1001            return None;
1002        }
1003
1004        // SAFETY: We have checked that `start` is less than `end`
1005        let row = unsafe { self.rows.row_unchecked(self.start) };
1006        self.start += 1;
1007        Some(row)
1008    }
1009
1010    fn size_hint(&self) -> (usize, Option<usize>) {
1011        let len = self.len();
1012        (len, Some(len))
1013    }
1014}
1015
1016impl ExactSizeIterator for RowsIter<'_> {
1017    fn len(&self) -> usize {
1018        self.end - self.start
1019    }
1020}
1021
1022impl DoubleEndedIterator for RowsIter<'_> {
1023    fn next_back(&mut self) -> Option<Self::Item> {
1024        if self.end == self.start {
1025            return None;
1026        }
1027        // Safety: We have checked that `start` is less than `end`
1028        let row = unsafe { self.rows.row_unchecked(self.end) };
1029        self.end -= 1;
1030        Some(row)
1031    }
1032}
1033
1034/// A comparable representation of a row.
1035///
1036/// See the [module level documentation](self) for more details.
1037///
1038/// Two [`Row`] can only be compared if they both belong to [`Rows`]
1039/// returned by calls to [`RowConverter::convert_columns`] on the same
1040/// [`RowConverter`]. If different [`RowConverter`]s are used, any
1041/// ordering established by comparing the [`Row`] is arbitrary.
1042#[derive(Debug, Copy, Clone)]
1043pub struct Row<'a> {
1044    data: &'a [u8],
1045    config: &'a RowConfig,
1046}
1047
1048impl<'a> Row<'a> {
1049    /// Create owned version of the row to detach it from the shared [`Rows`].
1050    pub fn owned(&self) -> OwnedRow {
1051        OwnedRow {
1052            data: self.data.into(),
1053            config: self.config.clone(),
1054        }
1055    }
1056
1057    /// The row's bytes, with the lifetime of the underlying data.
1058    pub fn data(&self) -> &'a [u8] {
1059        self.data
1060    }
1061}
1062
1063// Manually derive these as don't wish to include `fields`
1064
1065impl PartialEq for Row<'_> {
1066    #[inline]
1067    fn eq(&self, other: &Self) -> bool {
1068        self.data.eq(other.data)
1069    }
1070}
1071
1072impl Eq for Row<'_> {}
1073
1074impl PartialOrd for Row<'_> {
1075    #[inline]
1076    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1077        Some(self.cmp(other))
1078    }
1079}
1080
1081impl Ord for Row<'_> {
1082    #[inline]
1083    fn cmp(&self, other: &Self) -> Ordering {
1084        self.data.cmp(other.data)
1085    }
1086}
1087
1088impl Hash for Row<'_> {
1089    #[inline]
1090    fn hash<H: Hasher>(&self, state: &mut H) {
1091        self.data.hash(state)
1092    }
1093}
1094
1095impl AsRef<[u8]> for Row<'_> {
1096    #[inline]
1097    fn as_ref(&self) -> &[u8] {
1098        self.data
1099    }
1100}
1101
1102/// Owned version of a [`Row`] that can be moved/cloned freely.
1103///
1104/// This contains the data for the one specific row (not the entire buffer of all rows).
1105#[derive(Debug, Clone)]
1106pub struct OwnedRow {
1107    data: Box<[u8]>,
1108    config: RowConfig,
1109}
1110
1111impl OwnedRow {
1112    /// Get borrowed [`Row`] from owned version.
1113    ///
1114    /// This is helpful if you want to compare an [`OwnedRow`] with a [`Row`].
1115    pub fn row(&self) -> Row<'_> {
1116        Row {
1117            data: &self.data,
1118            config: &self.config,
1119        }
1120    }
1121}
1122
1123// Manually derive these as don't wish to include `fields`. Also we just want to use the same `Row` implementations here.
1124
1125impl PartialEq for OwnedRow {
1126    #[inline]
1127    fn eq(&self, other: &Self) -> bool {
1128        self.row().eq(&other.row())
1129    }
1130}
1131
1132impl Eq for OwnedRow {}
1133
1134impl PartialOrd for OwnedRow {
1135    #[inline]
1136    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1137        Some(self.cmp(other))
1138    }
1139}
1140
1141impl Ord for OwnedRow {
1142    #[inline]
1143    fn cmp(&self, other: &Self) -> Ordering {
1144        self.row().cmp(&other.row())
1145    }
1146}
1147
1148impl Hash for OwnedRow {
1149    #[inline]
1150    fn hash<H: Hasher>(&self, state: &mut H) {
1151        self.row().hash(state)
1152    }
1153}
1154
1155impl AsRef<[u8]> for OwnedRow {
1156    #[inline]
1157    fn as_ref(&self) -> &[u8] {
1158        &self.data
1159    }
1160}
1161
1162/// Returns the null sentinel, negated if `invert` is true
1163#[inline]
1164fn null_sentinel(options: SortOptions) -> u8 {
1165    match options.nulls_first {
1166        true => 0,
1167        false => 0xFF,
1168    }
1169}
1170
1171/// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`]
1172fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
1173    use fixed::FixedLengthEncoding;
1174
1175    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
1176    let mut lengths = vec![0; num_rows];
1177
1178    for (array, encoder) in cols.iter().zip(encoders) {
1179        match encoder {
1180            Encoder::Stateless => {
1181                downcast_primitive_array! {
1182                    array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)),
1183                    DataType::Null => {},
1184                    DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
1185                    DataType::Binary => as_generic_binary_array::<i32>(array)
1186                        .iter()
1187                        .zip(lengths.iter_mut())
1188                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
1189                    DataType::LargeBinary => as_generic_binary_array::<i64>(array)
1190                        .iter()
1191                        .zip(lengths.iter_mut())
1192                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
1193                    DataType::BinaryView => array.as_binary_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| {
1194                            *length += variable::encoded_len(slice)
1195                        }),
1196                    DataType::Utf8 => array.as_string::<i32>()
1197                        .iter()
1198                        .zip(lengths.iter_mut())
1199                        .for_each(|(slice, length)| {
1200                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
1201                        }),
1202                    DataType::LargeUtf8 => array.as_string::<i64>()
1203                        .iter()
1204                        .zip(lengths.iter_mut())
1205                        .for_each(|(slice, length)| {
1206                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
1207                        }),
1208                    DataType::Utf8View => array.as_string_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| {
1209                        *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
1210                    }),
1211                    DataType::FixedSizeBinary(len) => {
1212                        let len = len.to_usize().unwrap();
1213                        lengths.iter_mut().for_each(|x| *x += 1 + len)
1214                    }
1215                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
1216                }
1217            }
1218            Encoder::Dictionary(values, null) => {
1219                downcast_dictionary_array! {
1220                    array => {
1221                        for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
1222                            *length += match v {
1223                                Some(k) => values.row(k.as_usize()).data.len(),
1224                                None => null.data.len(),
1225                            }
1226                        }
1227                    }
1228                    _ => unreachable!(),
1229                }
1230            }
1231            Encoder::Struct(rows, null) => {
1232                let array = as_struct_array(array);
1233                lengths.iter_mut().enumerate().for_each(|(idx, length)| {
1234                    match array.is_valid(idx) {
1235                        true => *length += 1 + rows.row(idx).as_ref().len(),
1236                        false => *length += 1 + null.data.len(),
1237                    }
1238                });
1239            }
1240            Encoder::List(rows) => match array.data_type() {
1241                DataType::List(_) => {
1242                    list::compute_lengths(&mut lengths, rows, as_list_array(array))
1243                }
1244                DataType::LargeList(_) => {
1245                    list::compute_lengths(&mut lengths, rows, as_large_list_array(array))
1246                }
1247                _ => unreachable!(),
1248            },
1249        }
1250    }
1251
1252    lengths
1253}
1254
1255/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
1256fn encode_column(
1257    data: &mut [u8],
1258    offsets: &mut [usize],
1259    column: &dyn Array,
1260    opts: SortOptions,
1261    encoder: &Encoder<'_>,
1262) {
1263    match encoder {
1264        Encoder::Stateless => {
1265            downcast_primitive_array! {
1266                column => {
1267                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1268                        fixed::encode(data, offsets, column.values(), nulls, opts)
1269                    } else {
1270                        fixed::encode_not_null(data, offsets, column.values(), opts)
1271                    }
1272                }
1273                DataType::Null => {}
1274                DataType::Boolean => {
1275                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1276                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
1277                    } else {
1278                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
1279                    }
1280                }
1281                DataType::Binary => {
1282                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
1283                }
1284                DataType::BinaryView => {
1285                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
1286                }
1287                DataType::LargeBinary => {
1288                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
1289                }
1290                DataType::Utf8 => variable::encode(
1291                    data, offsets,
1292                    column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
1293                    opts,
1294                ),
1295                DataType::LargeUtf8 => variable::encode(
1296                    data, offsets,
1297                    column.as_string::<i64>()
1298                        .iter()
1299                        .map(|x| x.map(|x| x.as_bytes())),
1300                    opts,
1301                ),
1302                DataType::Utf8View => variable::encode(
1303                    data, offsets,
1304                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
1305                    opts,
1306                ),
1307                DataType::FixedSizeBinary(_) => {
1308                    let array = column.as_any().downcast_ref().unwrap();
1309                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
1310                }
1311                _ => unimplemented!("unsupported data type: {}", column.data_type()),
1312            }
1313        }
1314        Encoder::Dictionary(values, nulls) => {
1315            downcast_dictionary_array! {
1316                column => encode_dictionary_values(data, offsets, column, values, nulls),
1317                _ => unreachable!()
1318            }
1319        }
1320        Encoder::Struct(rows, null) => {
1321            let array = as_struct_array(column);
1322            let null_sentinel = null_sentinel(opts);
1323            offsets
1324                .iter_mut()
1325                .skip(1)
1326                .enumerate()
1327                .for_each(|(idx, offset)| {
1328                    let (row, sentinel) = match array.is_valid(idx) {
1329                        true => (rows.row(idx), 0x01),
1330                        false => (*null, null_sentinel),
1331                    };
1332                    let end_offset = *offset + 1 + row.as_ref().len();
1333                    data[*offset] = sentinel;
1334                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
1335                    *offset = end_offset;
1336                })
1337        }
1338        Encoder::List(rows) => match column.data_type() {
1339            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
1340            DataType::LargeList(_) => {
1341                list::encode(data, offsets, rows, opts, as_large_list_array(column))
1342            }
1343            _ => unreachable!(),
1344        },
1345    }
1346}
1347
1348/// Encode dictionary values not preserving the dictionary encoding
1349pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1350    data: &mut [u8],
1351    offsets: &mut [usize],
1352    column: &DictionaryArray<K>,
1353    values: &Rows,
1354    null: &Row<'_>,
1355) {
1356    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1357        let row = match k {
1358            Some(k) => values.row(k.as_usize()).data,
1359            None => null.data,
1360        };
1361        let end_offset = *offset + row.len();
1362        data[*offset..end_offset].copy_from_slice(row);
1363        *offset = end_offset;
1364    }
1365}
1366
1367macro_rules! decode_primitive_helper {
1368    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
1369        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
1370    };
1371}
1372
1373/// Decodes a the provided `field` from `rows`
1374///
1375/// # Safety
1376///
1377/// Rows must contain valid data for the provided field
1378unsafe fn decode_column(
1379    field: &SortField,
1380    rows: &mut [&[u8]],
1381    codec: &Codec,
1382    validate_utf8: bool,
1383) -> Result<ArrayRef, ArrowError> {
1384    let options = field.options;
1385
1386    let array: ArrayRef = match codec {
1387        Codec::Stateless => {
1388            let data_type = field.data_type.clone();
1389            downcast_primitive! {
1390                data_type => (decode_primitive_helper, rows, data_type, options),
1391                DataType::Null => Arc::new(NullArray::new(rows.len())),
1392                DataType::Boolean => Arc::new(decode_bool(rows, options)),
1393                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
1394                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
1395                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
1396                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
1397                DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
1398                DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
1399                DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)),
1400                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {}", data_type)))
1401            }
1402        }
1403        Codec::Dictionary(converter, _) => {
1404            let cols = converter.convert_raw(rows, validate_utf8)?;
1405            cols.into_iter().next().unwrap()
1406        }
1407        Codec::Struct(converter, _) => {
1408            let (null_count, nulls) = fixed::decode_nulls(rows);
1409            rows.iter_mut().for_each(|row| *row = &row[1..]);
1410            let children = converter.convert_raw(rows, validate_utf8)?;
1411
1412            let child_data = children.iter().map(|c| c.to_data()).collect();
1413            let builder = ArrayDataBuilder::new(field.data_type.clone())
1414                .len(rows.len())
1415                .null_count(null_count)
1416                .null_bit_buffer(Some(nulls))
1417                .child_data(child_data);
1418
1419            Arc::new(StructArray::from(builder.build_unchecked()))
1420        }
1421        Codec::List(converter) => match &field.data_type {
1422            DataType::List(_) => {
1423                Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
1424            }
1425            DataType::LargeList(_) => {
1426                Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
1427            }
1428            _ => unreachable!(),
1429        },
1430    };
1431    Ok(array)
1432}
1433
1434#[cfg(test)]
1435mod tests {
1436    use rand::distributions::uniform::SampleUniform;
1437    use rand::distributions::{Distribution, Standard};
1438    use rand::{thread_rng, Rng};
1439
1440    use arrow_array::builder::*;
1441    use arrow_array::types::*;
1442    use arrow_array::*;
1443    use arrow_buffer::{i256, NullBuffer};
1444    use arrow_buffer::{Buffer, OffsetBuffer};
1445    use arrow_cast::display::{ArrayFormatter, FormatOptions};
1446    use arrow_ord::sort::{LexicographicalComparator, SortColumn};
1447
1448    use super::*;
1449
1450    #[test]
1451    fn test_fixed_width() {
1452        let cols = [
1453            Arc::new(Int16Array::from_iter([
1454                Some(1),
1455                Some(2),
1456                None,
1457                Some(-5),
1458                Some(2),
1459                Some(2),
1460                Some(0),
1461            ])) as ArrayRef,
1462            Arc::new(Float32Array::from_iter([
1463                Some(1.3),
1464                Some(2.5),
1465                None,
1466                Some(4.),
1467                Some(0.1),
1468                Some(-4.),
1469                Some(-0.),
1470            ])) as ArrayRef,
1471        ];
1472
1473        let converter = RowConverter::new(vec![
1474            SortField::new(DataType::Int16),
1475            SortField::new(DataType::Float32),
1476        ])
1477        .unwrap();
1478        let rows = converter.convert_columns(&cols).unwrap();
1479
1480        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
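        // Each row is 8 bytes: a validity byte plus the 2 encoded bytes of the Int16,
        // followed by a validity byte plus the 4 encoded bytes of the Float32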
1481        assert_eq!(
1482            rows.buffer,
1483            &[
1484                1, 128, 1, //
1485                1, 191, 166, 102, 102, //
1486                1, 128, 2, //
1487                1, 192, 32, 0, 0, //
1488                0, 0, 0, //
1489                0, 0, 0, 0, 0, //
1490                1, 127, 251, //
1491                1, 192, 128, 0, 0, //
1492                1, 128, 2, //
1493                1, 189, 204, 204, 205, //
1494                1, 128, 2, //
1495                1, 63, 127, 255, 255, //
1496                1, 128, 0, //
1497                1, 127, 255, 255, 255 //
1498            ]
1499        );
1500
1501        assert!(rows.row(3) < rows.row(6));
1502        assert!(rows.row(0) < rows.row(1));
1503        assert!(rows.row(3) < rows.row(0));
1504        assert!(rows.row(4) < rows.row(1));
1505        assert!(rows.row(5) < rows.row(4));
1506
1507        let back = converter.convert_rows(&rows).unwrap();
1508        for (expected, actual) in cols.iter().zip(&back) {
1509            assert_eq!(expected, actual);
1510        }
1511    }
1512
1513    #[test]
1514    fn test_decimal128() {
1515        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
1516            DECIMAL128_MAX_PRECISION,
1517            7,
1518        ))])
1519        .unwrap();
1520        let col = Arc::new(
1521            Decimal128Array::from_iter([
1522                None,
1523                Some(i128::MIN),
1524                Some(-13),
1525                Some(46_i128),
1526                Some(5456_i128),
1527                Some(i128::MAX),
1528            ])
1529            .with_precision_and_scale(38, 7)
1530            .unwrap(),
1531        ) as ArrayRef;
1532
1533        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1534        for i in 0..rows.num_rows() - 1 {
1535            assert!(rows.row(i) < rows.row(i + 1));
1536        }
1537
1538        let back = converter.convert_rows(&rows).unwrap();
1539        assert_eq!(back.len(), 1);
1540        assert_eq!(col.as_ref(), back[0].as_ref())
1541    }
1542
1543    #[test]
1544    fn test_decimal256() {
1545        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
1546            DECIMAL256_MAX_PRECISION,
1547            7,
1548        ))])
1549        .unwrap();
1550        let col = Arc::new(
1551            Decimal256Array::from_iter([
1552                None,
1553                Some(i256::MIN),
1554                Some(i256::from_parts(0, -1)),
1555                Some(i256::from_parts(u128::MAX, -1)),
1556                Some(i256::from_parts(u128::MAX, 0)),
1557                Some(i256::from_parts(0, 46_i128)),
1558                Some(i256::from_parts(5, 46_i128)),
1559                Some(i256::MAX),
1560            ])
1561            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
1562            .unwrap(),
1563        ) as ArrayRef;
1564
1565        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1566        for i in 0..rows.num_rows() - 1 {
1567            assert!(rows.row(i) < rows.row(i + 1));
1568        }
1569
1570        let back = converter.convert_rows(&rows).unwrap();
1571        assert_eq!(back.len(), 1);
1572        assert_eq!(col.as_ref(), back[0].as_ref())
1573    }
1574
1575    #[test]
1576    fn test_bool() {
1577        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
1578
1579        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
1580
1581        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1582        assert!(rows.row(2) > rows.row(1));
1583        assert!(rows.row(2) > rows.row(0));
1584        assert!(rows.row(1) > rows.row(0));
1585
1586        let cols = converter.convert_rows(&rows).unwrap();
1587        assert_eq!(&cols[0], &col);
1588
1589        let converter = RowConverter::new(vec![SortField::new_with_options(
1590            DataType::Boolean,
1591            SortOptions::default().desc().with_nulls_first(false),
1592        )])
1593        .unwrap();
1594
1595        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1596        assert!(rows.row(2) < rows.row(1));
1597        assert!(rows.row(2) < rows.row(0));
1598        assert!(rows.row(1) < rows.row(0));
1599        let cols = converter.convert_rows(&rows).unwrap();
1600        assert_eq!(&cols[0], &col);
1601    }
1602
1603    #[test]
1604    fn test_timezone() {
1605        let a =
1606            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
1607        let d = a.data_type().clone();
1608
1609        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
1610        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
1611        let back = converter.convert_rows(&rows).unwrap();
1612        assert_eq!(back.len(), 1);
1613        assert_eq!(back[0].data_type(), &d);
1614
1615        // Test dictionary
1616        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
1617        a.append(34).unwrap();
1618        a.append_null();
1619        a.append(345).unwrap();
1620
1621        // Construct dictionary with a timezone
1622        let dict = a.finish();
1623        let values = TimestampNanosecondArray::from(dict.values().to_data());
1624        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
1625        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
1626        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));
1627
1628        assert_eq!(dict_with_tz.data_type(), &d);
1629        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
1630        let rows = converter
1631            .convert_columns(&[Arc::new(dict_with_tz) as _])
1632            .unwrap();
1633        let back = converter.convert_rows(&rows).unwrap();
1634        assert_eq!(back.len(), 1);
1635        assert_eq!(back[0].data_type(), &v);
1636    }
1637
1638    #[test]
1639    fn test_null_encoding() {
1640        let col = Arc::new(NullArray::new(10));
1641        let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
1642        let rows = converter.convert_columns(&[col]).unwrap();
1643        assert_eq!(rows.num_rows(), 10);
1644        assert_eq!(rows.row(1).data.len(), 0);
1645    }
1646
1647    #[test]
1648    fn test_variable_width() {
1649        let col = Arc::new(StringArray::from_iter([
1650            Some("hello"),
1651            Some("he"),
1652            None,
1653            Some("foo"),
1654            Some(""),
1655        ])) as ArrayRef;
1656
1657        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1658        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1659
1660        assert!(rows.row(1) < rows.row(0));
1661        assert!(rows.row(2) < rows.row(4));
1662        assert!(rows.row(3) < rows.row(0));
1663        assert!(rows.row(3) < rows.row(1));
1664
1665        let cols = converter.convert_rows(&rows).unwrap();
1666        assert_eq!(&cols[0], &col);
1667
1668        let col = Arc::new(BinaryArray::from_iter([
1669            None,
1670            Some(vec![0_u8; 0]),
1671            Some(vec![0_u8; 6]),
1672            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
1673            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
1674            Some(vec![0_u8; variable::BLOCK_SIZE]),
1675            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
1676            Some(vec![1_u8; 6]),
1677            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
1678            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
1679            Some(vec![1_u8; variable::BLOCK_SIZE]),
1680            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
1681            Some(vec![0xFF_u8; 6]),
1682            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
1683            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
1684            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
1685            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
1686        ])) as ArrayRef;
1687
1688        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
1689        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1690
1691        for i in 0..rows.num_rows() {
1692            for j in i + 1..rows.num_rows() {
1693                assert!(
1694                    rows.row(i) < rows.row(j),
1695                    "{} < {} - {:?} < {:?}",
1696                    i,
1697                    j,
1698                    rows.row(i),
1699                    rows.row(j)
1700                );
1701            }
1702        }
1703
1704        let cols = converter.convert_rows(&rows).unwrap();
1705        assert_eq!(&cols[0], &col);
1706
1707        let converter = RowConverter::new(vec![SortField::new_with_options(
1708            DataType::Binary,
1709            SortOptions::default().desc().with_nulls_first(false),
1710        )])
1711        .unwrap();
1712        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1713
1714        for i in 0..rows.num_rows() {
1715            for j in i + 1..rows.num_rows() {
1716                assert!(
1717                    rows.row(i) > rows.row(j),
1718                    "{} > {} - {:?} > {:?}",
1719                    i,
1720                    j,
1721                    rows.row(i),
1722                    rows.row(j)
1723                );
1724            }
1725        }
1726
1727        let cols = converter.convert_rows(&rows).unwrap();
1728        assert_eq!(&cols[0], &col);
1729    }
1730
1731    /// Asserts that `a` is logically equal to `b`, casting `b` to its value type when it is dictionary-encoded
1732    fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
1733        match b.data_type() {
1734            DataType::Dictionary(_, v) => {
1735                assert_eq!(a.data_type(), v.as_ref());
1736                let b = arrow_cast::cast(b, v).unwrap();
1737                assert_eq!(a, b.as_ref())
1738            }
1739            _ => assert_eq!(a, b),
1740        }
1741    }
1742
1743    #[test]
1744    fn test_string_dictionary() {
1745        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
1746            Some("foo"),
1747            Some("hello"),
1748            Some("he"),
1749            None,
1750            Some("hello"),
1751            Some(""),
1752            Some("hello"),
1753            Some("hello"),
1754        ])) as ArrayRef;
1755
1756        let field = SortField::new(a.data_type().clone());
1757        let converter = RowConverter::new(vec![field]).unwrap();
1758        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
1759
1760        assert!(rows_a.row(3) < rows_a.row(5));
1761        assert!(rows_a.row(2) < rows_a.row(1));
1762        assert!(rows_a.row(0) < rows_a.row(1));
1763        assert!(rows_a.row(3) < rows_a.row(0));
1764
1765        assert_eq!(rows_a.row(1), rows_a.row(4));
1766        assert_eq!(rows_a.row(1), rows_a.row(6));
1767        assert_eq!(rows_a.row(1), rows_a.row(7));
1768
1769        let cols = converter.convert_rows(&rows_a).unwrap();
1770        dictionary_eq(&cols[0], &a);
1771
1772        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
1773            Some("hello"),
1774            None,
1775            Some("cupcakes"),
1776        ])) as ArrayRef;
1777
1778        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
1779        assert_eq!(rows_a.row(1), rows_b.row(0));
1780        assert_eq!(rows_a.row(3), rows_b.row(1));
1781        assert!(rows_b.row(2) < rows_a.row(0));
1782
1783        let cols = converter.convert_rows(&rows_b).unwrap();
1784        dictionary_eq(&cols[0], &b);
1785
1786        let converter = RowConverter::new(vec![SortField::new_with_options(
1787            a.data_type().clone(),
1788            SortOptions::default().desc().with_nulls_first(false),
1789        )])
1790        .unwrap();
1791
1792        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
1793        assert!(rows_c.row(3) > rows_c.row(5));
1794        assert!(rows_c.row(2) > rows_c.row(1));
1795        assert!(rows_c.row(0) > rows_c.row(1));
1796        assert!(rows_c.row(3) > rows_c.row(0));
1797
1798        let cols = converter.convert_rows(&rows_c).unwrap();
1799        dictionary_eq(&cols[0], &a);
1800
1801        let converter = RowConverter::new(vec![SortField::new_with_options(
1802            a.data_type().clone(),
1803            SortOptions::default().desc().with_nulls_first(true),
1804        )])
1805        .unwrap();
1806
1807        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
1808        assert!(rows_c.row(3) < rows_c.row(5));
1809        assert!(rows_c.row(2) > rows_c.row(1));
1810        assert!(rows_c.row(0) > rows_c.row(1));
1811        assert!(rows_c.row(3) < rows_c.row(0));
1812
1813        let cols = converter.convert_rows(&rows_c).unwrap();
1814        dictionary_eq(&cols[0], &a);
1815    }
1816
1817    #[test]
1818    fn test_struct() {
1819        // Test basic
1820        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
1821        let a_f = Arc::new(Field::new("int", DataType::Int32, false));
1822        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
1823        let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
1824        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
1825
1826        let sort_fields = vec![SortField::new(s1.data_type().clone())];
1827        let converter = RowConverter::new(sort_fields).unwrap();
1828        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
1829
1830        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
1831            assert!(a < b);
1832        }
1833
1834        let back = converter.convert_rows(&r1).unwrap();
1835        assert_eq!(back.len(), 1);
1836        assert_eq!(&back[0], &s1);
1837
1838        // Test struct nullability
1839        let data = s1
1840            .to_data()
1841            .into_builder()
1842            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
1843            .null_count(2)
1844            .build()
1845            .unwrap();
1846
1847        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
1848        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
1849        assert_eq!(r2.row(0), r2.row(2)); // Nulls equal
1850        assert!(r2.row(0) < r2.row(1)); // Nulls first
1851        assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null
1852        assert_eq!(r1.row(1), r2.row(1)); // Values equal
1853
1854        let back = converter.convert_rows(&r2).unwrap();
1855        assert_eq!(back.len(), 1);
1856        assert_eq!(&back[0], &s2);
1857
1858        back[0].to_data().validate_full().unwrap();
1859    }
1860
1861    #[test]
1862    fn test_primitive_dictionary() {
1863        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
1864        builder.append(2).unwrap();
1865        builder.append(3).unwrap();
1866        builder.append(0).unwrap();
1867        builder.append_null();
1868        builder.append(5).unwrap();
1869        builder.append(3).unwrap();
1870        builder.append(-1).unwrap();
1871
1872        let a = builder.finish();
1873        let data_type = a.data_type().clone();
1874        let columns = [Arc::new(a) as ArrayRef];
1875
1876        let field = SortField::new(data_type.clone());
1877        let converter = RowConverter::new(vec![field]).unwrap();
1878        let rows = converter.convert_columns(&columns).unwrap();
1879        assert!(rows.row(0) < rows.row(1));
1880        assert!(rows.row(2) < rows.row(0));
1881        assert!(rows.row(3) < rows.row(2));
1882        assert!(rows.row(6) < rows.row(2));
1883        assert!(rows.row(3) < rows.row(6));
1884    }
1885
1886    #[test]
1887    fn test_dictionary_nulls() {
1888        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
1889        let keys =
1890            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
1891
1892        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
1893        let data = keys
1894            .into_builder()
1895            .data_type(data_type.clone())
1896            .child_data(vec![values])
1897            .build()
1898            .unwrap();
1899
1900        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
1901        let field = SortField::new(data_type.clone());
1902        let converter = RowConverter::new(vec![field]).unwrap();
1903        let rows = converter.convert_columns(&columns).unwrap();
1904
1905        assert_eq!(rows.row(0), rows.row(1));
1906        assert_eq!(rows.row(3), rows.row(4));
1907        assert_eq!(rows.row(4), rows.row(5));
1908        assert!(rows.row(3) < rows.row(0));
1909    }
1910
1911    #[test]
1912    #[should_panic(expected = "Encountered non UTF-8 data")]
1913    fn test_invalid_utf8() {
1914        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
1915        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
1916        let rows = converter.convert_columns(&[array]).unwrap();
1917        let binary_row = rows.row(0);
1918
1919        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1920        let parser = converter.parser();
1921        let utf8_row = parser.parse(binary_row.as_ref());
1922
1923        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
1924    }
1925
1926    #[test]
1927    #[should_panic(expected = "Encountered non UTF-8 data")]
1928    fn test_invalid_utf8_array() {
1929        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
1930        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
1931        let rows = converter.convert_columns(&[array]).unwrap();
1932        let binary_rows = rows.try_into_binary().expect("known-small rows");
1933
1934        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1935        let parsed = converter.from_binary(binary_rows);
1936
1937        converter.convert_rows(parsed.iter()).unwrap();
1938    }
1939
1940    #[test]
1941    #[should_panic(expected = "index out of bounds")]
1942    fn test_invalid_empty() {
1943        let binary_row: &[u8] = &[];
1944
1945        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1946        let parser = converter.parser();
1947        let utf8_row = parser.parse(binary_row.as_ref());
1948
1949        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
1950    }
1951
1952    #[test]
1953    #[should_panic(expected = "index out of bounds")]
1954    fn test_invalid_empty_array() {
1955        let row: &[u8] = &[];
1956        let binary_rows = BinaryArray::from(vec![row]);
1957
1958        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1959        let parsed = converter.from_binary(binary_rows);
1960
1961        converter.convert_rows(parsed.iter()).unwrap();
1962    }
1963
1964    #[test]
1965    #[should_panic(expected = "index out of bounds")]
1966    fn test_invalid_truncated() {
1967        let binary_row: &[u8] = &[0x02];
1968
1969        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1970        let parser = converter.parser();
1971        let utf8_row = parser.parse(binary_row.as_ref());
1972
1973        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
1974    }
1975
1976    #[test]
1977    #[should_panic(expected = "index out of bounds")]
1978    fn test_invalid_truncated_array() {
1979        let row: &[u8] = &[0x02];
1980        let binary_rows = BinaryArray::from(vec![row]);
1981
1982        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1983        let parsed = converter.from_binary(binary_rows);
1984
1985        converter.convert_rows(parsed.iter()).unwrap();
1986    }
1987
1988    #[test]
1989    #[should_panic(expected = "rows were not produced by this RowConverter")]
1990    fn test_different_converter() {
1991        let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
1992        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
1993        let rows = converter.convert_columns(&[values]).unwrap();
1994
1995        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
1996        let _ = converter.convert_rows(&rows);
1997    }
1998
1999    fn test_single_list<O: OffsetSizeTrait>() {
2000        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
2001        builder.values().append_value(32);
2002        builder.values().append_value(52);
2003        builder.values().append_value(32);
2004        builder.append(true);
2005        builder.values().append_value(32);
2006        builder.values().append_value(52);
2007        builder.values().append_value(12);
2008        builder.append(true);
2009        builder.values().append_value(32);
2010        builder.values().append_value(52);
2011        builder.append(true);
2012        builder.values().append_value(32); // MASKED
2013        builder.values().append_value(52); // MASKED
2014        builder.append(false);
2015        builder.values().append_value(32);
2016        builder.values().append_null();
2017        builder.append(true);
2018        builder.append(true);
2019
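        // [
        //   [32, 52, 32],
        //   [32, 52, 12],
        //   [32, 52],
        //   null,
        //   [32, null],
        //   []
        // ]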
2020        let list = Arc::new(builder.finish()) as ArrayRef;
2021        let d = list.data_type().clone();
2022
2023        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2024
2025        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2026        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
2027        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
2028        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
2029        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
2030        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
2031        assert!(rows.row(3) < rows.row(5)); // null < []
2032
2033        let back = converter.convert_rows(&rows).unwrap();
2034        assert_eq!(back.len(), 1);
2035        back[0].to_data().validate_full().unwrap();
2036        assert_eq!(&back[0], &list);
2037
2038        let options = SortOptions::default().asc().with_nulls_first(false);
2039        let field = SortField::new_with_options(d.clone(), options);
2040        let converter = RowConverter::new(vec![field]).unwrap();
2041        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2042
2043        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
2044        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
2045        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
2046        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
2047        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
2048        assert!(rows.row(3) > rows.row(5)); // null > []
2049
2050        let back = converter.convert_rows(&rows).unwrap();
2051        assert_eq!(back.len(), 1);
2052        back[0].to_data().validate_full().unwrap();
2053        assert_eq!(&back[0], &list);
2054
2055        let options = SortOptions::default().desc().with_nulls_first(false);
2056        let field = SortField::new_with_options(d.clone(), options);
2057        let converter = RowConverter::new(vec![field]).unwrap();
2058        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2059
2060        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
2061        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
2062        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
2063        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
2064        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
2065        assert!(rows.row(3) > rows.row(5)); // null > []
2066
2067        let back = converter.convert_rows(&rows).unwrap();
2068        assert_eq!(back.len(), 1);
2069        back[0].to_data().validate_full().unwrap();
2070        assert_eq!(&back[0], &list);
2071
2072        let options = SortOptions::default().desc().with_nulls_first(true);
2073        let field = SortField::new_with_options(d, options);
2074        let converter = RowConverter::new(vec![field]).unwrap();
2075        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2076
2077        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
2078        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
2079        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
2080        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
2081        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
2082        assert!(rows.row(3) < rows.row(5)); // null < []
2083
2084        let back = converter.convert_rows(&rows).unwrap();
2085        assert_eq!(back.len(), 1);
2086        back[0].to_data().validate_full().unwrap();
2087        assert_eq!(&back[0], &list);
2088    }
2089
2090    fn test_nested_list<O: OffsetSizeTrait>() {
2091        let mut builder =
2092            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));
2093
2094        builder.values().values().append_value(1);
2095        builder.values().values().append_value(2);
2096        builder.values().append(true);
2097        builder.values().values().append_value(1);
2098        builder.values().values().append_null();
2099        builder.values().append(true);
2100        builder.append(true);
2101
2102        builder.values().values().append_value(1);
2103        builder.values().values().append_null();
2104        builder.values().append(true);
2105        builder.values().values().append_value(1);
2106        builder.values().values().append_null();
2107        builder.values().append(true);
2108        builder.append(true);
2109
2110        builder.values().values().append_value(1);
2111        builder.values().values().append_null();
2112        builder.values().append(true);
2113        builder.values().append(false);
2114        builder.append(true);
2115        builder.append(false);
2116
2117        builder.values().values().append_value(1);
2118        builder.values().values().append_value(2);
2119        builder.values().append(true);
2120        builder.append(true);
2121
2122        let list = Arc::new(builder.finish()) as ArrayRef;
2123        let d = list.data_type().clone();
2124
2125        // [
2126        //   [[1, 2], [1, null]],
2127        //   [[1, null], [1, null]],
2128        //   [[1, null], null],
2129        //   null,
2130        //   [[1, 2]]
2131        // ]
2132        let options = SortOptions::default().asc().with_nulls_first(true);
2133        let field = SortField::new_with_options(d.clone(), options);
2134        let converter = RowConverter::new(vec![field]).unwrap();
2135        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2136
2137        assert!(rows.row(0) > rows.row(1));
2138        assert!(rows.row(1) > rows.row(2));
2139        assert!(rows.row(2) > rows.row(3));
2140        assert!(rows.row(4) < rows.row(0));
2141        assert!(rows.row(4) > rows.row(1));
2142
2143        let back = converter.convert_rows(&rows).unwrap();
2144        assert_eq!(back.len(), 1);
2145        back[0].to_data().validate_full().unwrap();
2146        assert_eq!(&back[0], &list);
2147
2148        let options = SortOptions::default().desc().with_nulls_first(true);
2149        let field = SortField::new_with_options(d.clone(), options);
2150        let converter = RowConverter::new(vec![field]).unwrap();
2151        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2152
2153        assert!(rows.row(0) > rows.row(1));
2154        assert!(rows.row(1) > rows.row(2));
2155        assert!(rows.row(2) > rows.row(3));
2156        assert!(rows.row(4) > rows.row(0));
2157        assert!(rows.row(4) > rows.row(1));
2158
2159        let back = converter.convert_rows(&rows).unwrap();
2160        assert_eq!(back.len(), 1);
2161        back[0].to_data().validate_full().unwrap();
2162        assert_eq!(&back[0], &list);
2163
2164        let options = SortOptions::default().desc().with_nulls_first(false);
2165        let field = SortField::new_with_options(d, options);
2166        let converter = RowConverter::new(vec![field]).unwrap();
2167        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2168
2169        assert!(rows.row(0) < rows.row(1));
2170        assert!(rows.row(1) < rows.row(2));
2171        assert!(rows.row(2) < rows.row(3));
2172        assert!(rows.row(4) > rows.row(0));
2173        assert!(rows.row(4) < rows.row(1));
2174
2175        let back = converter.convert_rows(&rows).unwrap();
2176        assert_eq!(back.len(), 1);
2177        back[0].to_data().validate_full().unwrap();
2178        assert_eq!(&back[0], &list);
2179    }
2180
2181    #[test]
2182    fn test_list() {
2183        test_single_list::<i32>();
2184        test_nested_list::<i32>();
2185    }
2186
2187    #[test]
2188    fn test_large_list() {
2189        test_single_list::<i64>();
2190        test_nested_list::<i64>();
2191    }
2192
2193    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
2194    where
2195        K: ArrowPrimitiveType,
2196        Standard: Distribution<K::Native>,
2197    {
2198        let mut rng = thread_rng();
2199        (0..len)
2200            .map(|_| rng.gen_bool(valid_percent).then(|| rng.gen()))
2201            .collect()
2202    }
2203
2204    fn generate_strings<O: OffsetSizeTrait>(
2205        len: usize,
2206        valid_percent: f64,
2207    ) -> GenericStringArray<O> {
2208        let mut rng = thread_rng();
2209        (0..len)
2210            .map(|_| {
2211                rng.gen_bool(valid_percent).then(|| {
2212                    let len = rng.gen_range(0..100);
2213                    let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect();
2214                    String::from_utf8(bytes).unwrap()
2215                })
2216            })
2217            .collect()
2218    }
2219
2220    fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
2221        let mut rng = thread_rng();
2222        (0..len)
2223            .map(|_| {
2224                rng.gen_bool(valid_percent).then(|| {
2225                    let len = rng.gen_range(0..100);
2226                    let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect();
2227                    String::from_utf8(bytes).unwrap()
2228                })
2229            })
2230            .collect()
2231    }
2232
2233    fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
2234        let mut rng = thread_rng();
2235        (0..len)
2236            .map(|_| {
2237                rng.gen_bool(valid_percent).then(|| {
2238                    let len = rng.gen_range(0..100);
2239                    let bytes: Vec<_> = (0..len).map(|_| rng.gen_range(0..128)).collect();
2240                    bytes
2241                })
2242            })
2243            .collect()
2244    }
2245
2246    fn generate_dictionary<K>(
2247        values: ArrayRef,
2248        len: usize,
2249        valid_percent: f64,
2250    ) -> DictionaryArray<K>
2251    where
2252        K: ArrowDictionaryKeyType,
2253        K::Native: SampleUniform,
2254    {
2255        let mut rng = thread_rng();
2256        let min_key = K::Native::from_usize(0).unwrap();
2257        let max_key = K::Native::from_usize(values.len()).unwrap();
2258        let keys: PrimitiveArray<K> = (0..len)
2259            .map(|_| {
2260                rng.gen_bool(valid_percent)
2261                    .then(|| rng.gen_range(min_key..max_key))
2262            })
2263            .collect();
2264
2265        let data_type =
2266            DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
2267
2268        let data = keys
2269            .into_data()
2270            .into_builder()
2271            .data_type(data_type)
2272            .add_child_data(values.to_data())
2273            .build()
2274            .unwrap();
2275
2276        DictionaryArray::from(data)
2277    }
2278
2279    fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
2280        let mut rng = thread_rng();
2281        let width = rng.gen_range(0..20);
2282        let mut builder = FixedSizeBinaryBuilder::new(width);
2283
2284        let mut b = vec![0; width as usize];
2285        for _ in 0..len {
2286            match rng.gen_bool(valid_percent) {
2287                true => {
2288                    b.iter_mut().for_each(|x| *x = rng.gen());
2289                    builder.append_value(&b).unwrap();
2290                }
2291                false => builder.append_null(),
2292            }
2293        }
2294
2295        builder.finish()
2296    }
2297
2298    fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
2299        let mut rng = thread_rng();
2300        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.gen_bool(valid_percent)));
2301        let a = generate_primitive_array::<Int32Type>(len, valid_percent);
2302        let b = generate_strings::<i32>(len, valid_percent);
2303        let fields = Fields::from(vec![
2304            Field::new("a", DataType::Int32, true),
2305            Field::new("b", DataType::Utf8, true),
2306        ]);
2307        let values = vec![Arc::new(a) as _, Arc::new(b) as _];
2308        StructArray::new(fields, values, Some(nulls))
2309    }
2310
2311    fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
2312    where
2313        F: FnOnce(usize) -> ArrayRef,
2314    {
2315        let mut rng = thread_rng();
2316        let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.gen_range(0..10)));
2317        let values_len = offsets.last().unwrap().to_usize().unwrap();
2318        let values = values(values_len);
2319        let nulls = NullBuffer::from_iter((0..len).map(|_| rng.gen_bool(valid_percent)));
2320        let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
2321        ListArray::new(field, offsets, values, Some(nulls))
2322    }
2323
2324    fn generate_column(len: usize) -> ArrayRef {
2325        let mut rng = thread_rng();
2326        match rng.gen_range(0..16) {
2327            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
2328            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
2329            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
2330            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
2331            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
2332            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
2333            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
2334            7 => Arc::new(generate_dictionary::<Int64Type>(
2335                // Cannot test dictionaries containing null values because of #2687
2336                Arc::new(generate_strings::<i32>(rng.gen_range(1..len), 1.0)),
2337                len,
2338                0.8,
2339            )),
2340            8 => Arc::new(generate_dictionary::<Int64Type>(
2341                // Cannot test dictionaries containing null values because of #2687
2342                Arc::new(generate_primitive_array::<Int64Type>(
2343                    rng.gen_range(1..len),
2344                    1.0,
2345                )),
2346                len,
2347                0.8,
2348            )),
2349            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
2350            10 => Arc::new(generate_struct(len, 0.8)),
2351            11 => Arc::new(generate_list(len, 0.8, |values_len| {
2352                Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
2353            })),
2354            12 => Arc::new(generate_list(len, 0.8, |values_len| {
2355                Arc::new(generate_strings::<i32>(values_len, 0.8))
2356            })),
2357            13 => Arc::new(generate_list(len, 0.8, |values_len| {
2358                Arc::new(generate_struct(values_len, 0.8))
2359            })),
2360            14 => Arc::new(generate_string_view(len, 0.8)),
2361            15 => Arc::new(generate_byte_view(len, 0.8)),
2362            _ => unreachable!(),
2363        }
2364    }
2365
2366    fn print_row(cols: &[SortColumn], row: usize) -> String {
2367        let t: Vec<_> = cols
2368            .iter()
2369            .map(|x| match x.values.is_valid(row) {
2370                true => {
2371                    let opts = FormatOptions::default().with_null("NULL");
2372                    let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
2373                    formatter.value(row).to_string()
2374                }
2375                false => "NULL".to_string(),
2376            })
2377            .collect();
2378        t.join(",")
2379    }
2380
2381    fn print_col_types(cols: &[SortColumn]) -> String {
2382        let t: Vec<_> = cols
2383            .iter()
2384            .map(|x| x.values.data_type().to_string())
2385            .collect();
2386        t.join(",")
2387    }
2388
2389    #[test]
2390    #[cfg_attr(miri, ignore)]
2391    fn fuzz_test() {
2392        for _ in 0..100 {
2393            let mut rng = thread_rng();
2394            let num_columns = rng.gen_range(1..5);
2395            let len = rng.gen_range(5..100);
2396            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();
2397
2398            let options: Vec<_> = (0..num_columns)
2399                .map(|_| SortOptions {
2400                    descending: rng.gen_bool(0.5),
2401                    nulls_first: rng.gen_bool(0.5),
2402                })
2403                .collect();
2404
2405            let sort_columns: Vec<_> = options
2406                .iter()
2407                .zip(&arrays)
2408                .map(|(o, c)| SortColumn {
2409                    values: Arc::clone(c),
2410                    options: Some(*o),
2411                })
2412                .collect();
2413
2414            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();
2415
2416            let columns: Vec<SortField> = options
2417                .into_iter()
2418                .zip(&arrays)
2419                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
2420                .collect();
2421
2422            let converter = RowConverter::new(columns).unwrap();
2423            let rows = converter.convert_columns(&arrays).unwrap();
2424
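            // The row encoding must order every pair of rows exactly as the
            // lexicographical comparator orders the original columns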
2425            for i in 0..len {
2426                for j in 0..len {
2427                    let row_i = rows.row(i);
2428                    let row_j = rows.row(j);
2429                    let row_cmp = row_i.cmp(&row_j);
2430                    let lex_cmp = comparator.compare(i, j);
2431                    assert_eq!(
2432                        row_cmp,
2433                        lex_cmp,
2434                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
2435                        print_row(&sort_columns, i),
2436                        print_row(&sort_columns, j),
2437                        row_i,
2438                        row_j,
2439                        print_col_types(&sort_columns)
2440                    );
2441                }
2442            }
2443
2444            let back = converter.convert_rows(&rows).unwrap();
2445            for (actual, expected) in back.iter().zip(&arrays) {
2446                actual.to_data().validate_full().unwrap();
2447                dictionary_eq(actual, expected)
2448            }
2449
2450            // Check that rows round-trip through the opaque binary representation
2451            let rows = rows.try_into_binary().expect("reasonable size");
2452            let parser = converter.parser();
2453            let back = converter
2454                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
2455                .unwrap();
2456            for (actual, expected) in back.iter().zip(&arrays) {
2457                actual.to_data().validate_full().unwrap();
2458                dictionary_eq(actual, expected)
2459            }
2460
2461            let rows = converter.from_binary(rows);
2462            let back = converter.convert_rows(&rows).unwrap();
2463            for (actual, expected) in back.iter().zip(&arrays) {
2464                actual.to_data().validate_full().unwrap();
2465                dictionary_eq(actual, expected)
2466            }
2467        }
2468    }
2469
2470    #[test]
2471    fn test_clear() {
2472        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2473        let mut rows = converter.empty_rows(3, 128);
2474
2475        let first = Int32Array::from(vec![None, Some(2), Some(4)]);
2476        let second = Int32Array::from(vec![Some(2), None, Some(4)]);
2477        let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
2478
2479        for array in arrays.iter() {
2480            rows.clear();
2481            converter.append(&mut rows, &[array.clone()]).unwrap();
2482            let back = converter.convert_rows(&rows).unwrap();
2483            assert_eq!(&back[0], array);
2484        }
2485
2486        let mut rows_expected = converter.empty_rows(3, 128);
2487        converter.append(&mut rows_expected, &arrays[1..]).unwrap();
2488
2489        for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
2490            assert_eq!(
2491                actual, expected,
2492                "For row {}: expected {:?}, actual: {:?}",
2493                i, expected, actual
2494            );
2495        }
2496    }
2497
2498    #[test]
2499    fn test_append_codec_dictionary_binary() {
2500        use DataType::*;
2501        // Dictionary RowConverter
2502        let converter = RowConverter::new(vec![SortField::new(Dictionary(
2503            Box::new(Int32),
2504            Box::new(Binary),
2505        ))])
2506        .unwrap();
2507        let mut rows = converter.empty_rows(4, 128);
2508
2509        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
2510        let values = BinaryArray::from(vec![
2511            Some("a".as_bytes()),
2512            Some(b"b"),
2513            Some(b"c"),
2514            Some(b"d"),
2515        ]);
2516        let dict_array = DictionaryArray::new(keys, Arc::new(values));
2517
2518        rows.clear();
2519        let array = Arc::new(dict_array) as ArrayRef;
2520        converter.append(&mut rows, &[array.clone()]).unwrap();
2521        let back = converter.convert_rows(&rows).unwrap();
2522
2523        dictionary_eq(&back[0], &array);
2524    }
2525
2526    #[test]
2527    fn test_list_prefix() {
2528        let mut a = ListBuilder::new(Int8Builder::new());
2529        a.append_value([None]);
2530        a.append_value([None, None]);
2531        let a = a.finish();
2532
2533        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2534        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
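        // [null] is a strict prefix of [null, null] and therefore sorts first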
2535        assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
2536    }
2537}