//! A comparable row-oriented representation of Arrow arrays: columns are encoded
//! into variable-length byte sequences whose byte-wise comparison matches the
//! lexicographic sort order of the original columns.

#![warn(missing_docs)]
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow_array::cast::*;
use arrow_array::types::ArrowDictionaryKeyType;
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::*;
use variable::{decode_binary_view, decode_string_view};

use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
use crate::variable::{decode_binary, decode_string};

mod fixed;
mod list;
mod variable;
147
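/// Converts [`ArrayRef`] columns into a row-oriented format where each row is a
/// variable-length byte slice, and byte-wise comparison of rows matches the
/// lexicographic sort order of the original columns.
///
/// A minimal usage sketch (assuming this crate is compiled as `arrow_row`, with
/// `arrow_array` and `arrow_schema` available as dependencies):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, StringArray};
/// # use arrow_row::{RowConverter, SortField};
/// # use arrow_schema::DataType;
/// let converter = RowConverter::new(vec![
///     SortField::new(DataType::Int32),
///     SortField::new(DataType::Utf8),
/// ])
/// .unwrap();
///
/// let cols = [
///     Arc::new(Int32Array::from(vec![1, 1, 2])) as ArrayRef,
///     Arc::new(StringArray::from(vec!["b", "a", "a"])) as ArrayRef,
/// ];
///
/// // Byte-wise comparison of the encoded rows matches the column sort order
/// let rows = converter.convert_columns(&cols).unwrap();
/// assert!(rows.row(1) < rows.row(0)); // (1, "a") < (1, "b")
/// assert!(rows.row(0) < rows.row(2)); // (1, "b") < (2, "a")
///
/// // Decode the rows back into columns
/// let back = converter.convert_rows(&rows).unwrap();
/// for (expected, actual) in cols.iter().zip(&back) {
///     assert_eq!(expected, actual);
/// }
/// ```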
#[derive(Debug)]
pub struct RowConverter {
    /// The sort fields, in column order
    fields: Arc<[SortField]>,
    /// Encoding state for each column
    codecs: Vec<Codec>,
}

/// Any encoding state required by a column, beyond its [`SortField`]
#[derive(Debug)]
enum Codec {
    /// No additional state is necessary to encode the column
    Stateless,
    /// A row converter for the dictionary values and the encoding of a null row
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields and the encoding of a null row
    Struct(RowConverter, OwnedRow),
    /// A row converter for the list element type
    List(RowConverter),
}
380
381impl Codec {
382 fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
383 match &sort_field.data_type {
384 DataType::Dictionary(_, values) => {
385 let sort_field =
386 SortField::new_with_options(values.as_ref().clone(), sort_field.options);
387
388 let converter = RowConverter::new(vec![sort_field])?;
389 let null_array = new_null_array(values.as_ref(), 1);
390 let nulls = converter.convert_columns(&[null_array])?;
391
392 let owned = OwnedRow {
393 data: nulls.buffer.into(),
394 config: nulls.config,
395 };
396 Ok(Self::Dictionary(converter, owned))
397 }
398 d if !d.is_nested() => Ok(Self::Stateless),
399 DataType::List(f) | DataType::LargeList(f) => {
400 let options = SortOptions {
404 descending: false,
405 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
406 };
407
408 let field = SortField::new_with_options(f.data_type().clone(), options);
409 let converter = RowConverter::new(vec![field])?;
410 Ok(Self::List(converter))
411 }
412 DataType::Struct(f) => {
413 let sort_fields = f
414 .iter()
415 .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
416 .collect();
417
418 let converter = RowConverter::new(sort_fields)?;
419 let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
420
421 let nulls = converter.convert_columns(&nulls)?;
422 let owned = OwnedRow {
423 data: nulls.buffer.into(),
424 config: nulls.config,
425 };
426
427 Ok(Self::Struct(converter, owned))
428 }
429 _ => Err(ArrowError::NotYetImplemented(format!(
430 "not yet implemented: {:?}",
431 sort_field.data_type
432 ))),
433 }
434 }
435
436 fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
437 match self {
438 Codec::Stateless => Ok(Encoder::Stateless),
439 Codec::Dictionary(converter, nulls) => {
440 let values = array.as_any_dictionary().values().clone();
441 let rows = converter.convert_columns(&[values])?;
442 Ok(Encoder::Dictionary(rows, nulls.row()))
443 }
444 Codec::Struct(converter, null) => {
445 let v = as_struct_array(array);
446 let rows = converter.convert_columns(v.columns())?;
447 Ok(Encoder::Struct(rows, null.row()))
448 }
449 Codec::List(converter) => {
450 let values = match array.data_type() {
451 DataType::List(_) => as_list_array(array).values(),
452 DataType::LargeList(_) => as_large_list_array(array).values(),
453 _ => unreachable!(),
454 };
455 let rows = converter.convert_columns(&[values.clone()])?;
456 Ok(Encoder::List(rows))
457 }
458 }
459 }
460
461 fn size(&self) -> usize {
462 match self {
463 Codec::Stateless => 0,
464 Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
465 Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
466 Codec::List(converter) => converter.size(),
467 }
468 }
469}
470
471#[derive(Debug)]
472enum Encoder<'a> {
473 Stateless,
475 Dictionary(Rows, Row<'a>),
477 Struct(Rows, Row<'a>),
483 List(Rows),
485}
486
/// Configures the data type and sort order for a column of a [`RowConverter`]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options
    options: SortOptions,
    /// Data type
    data_type: DataType,
}

impl SortField {
    /// Create a new column with the given data type and default [`SortOptions`]
    pub fn new(data_type: DataType) -> Self {
        Self::new_with_options(data_type, Default::default())
    }

    /// Create a new column with the given data type and [`SortOptions`]
    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
        Self { options, data_type }
    }

    /// Return the size of this instance in bytes
    ///
    /// Includes the size of `Self`
    pub fn size(&self) -> usize {
        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
    }
}
514
impl RowConverter {
    /// Create a new [`RowConverter`] for the provided sort fields
    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
        if !Self::supports_fields(&fields) {
            return Err(ArrowError::NotYetImplemented(format!(
                "Row format support not yet implemented for: {fields:?}"
            )));
        }

        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
        Ok(Self {
            fields: fields.into(),
            codecs,
        })
    }
530
    /// Check whether the given fields are supported by the row format
    pub fn supports_fields(fields: &[SortField]) -> bool {
        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
    }

    fn supports_datatype(d: &DataType) -> bool {
        match d {
            _ if !d.is_nested() => true,
            DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
                Self::supports_datatype(f.data_type())
            }
            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
            _ => false,
        }
    }
546
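    /// Convert [`ArrayRef`] columns into [`Rows`]
    ///
    /// Columns are converted in the order given at construction, and all columns
    /// must have the same length.
    ///
    /// A sketch of lexicographic sorting via the row format (assuming this crate
    /// is compiled as `arrow_row`):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_row::{RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let col = Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef;
    /// let rows = converter.convert_columns(&[col]).unwrap();
    ///
    /// // Sorting row indices by their encoded bytes yields the sort order of the column
    /// let mut indices: Vec<usize> = (0..rows.num_rows()).collect();
    /// indices.sort_unstable_by(|&a, &b| rows.row(a).cmp(&rows.row(b)));
    /// assert_eq!(indices, vec![1, 2, 0]);
    /// ```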
    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
        let mut rows = self.empty_rows(num_rows, 0);
        self.append(&mut rows, columns)?;
        Ok(rows)
    }
560
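    /// Convert [`ArrayRef`] columns, appending to an existing [`Rows`]
    ///
    /// The [`Rows`] must have been created by this [`RowConverter`], e.g. via
    /// [`RowConverter::empty_rows`], and the columns must match the converter's schema.
    ///
    /// A minimal sketch of incremental conversion (assuming this crate is compiled
    /// as `arrow_row`):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_row::{RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    ///
    /// // Pre-allocate space for 4 rows and append two batches of columns
    /// let mut rows = converter.empty_rows(4, 128);
    /// let batch1 = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
    /// let batch2 = Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef;
    /// converter.append(&mut rows, &[batch1]).unwrap();
    /// converter.append(&mut rows, &[batch2]).unwrap();
    /// assert_eq!(rows.num_rows(), 4);
    /// ```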
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
        assert!(
            Arc::ptr_eq(&rows.config.fields, &self.fields),
            "rows were not produced by this RowConverter"
        );

        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }

        let encoders = columns
            .iter()
            .zip(&self.codecs)
            .zip(self.fields.iter())
            .map(|((column, codec), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }
                codec.encoder(column.as_ref())
            })
            .collect::<Result<Vec<_>, _>>()?;

        let write_offset = rows.num_rows();
        let lengths = row_lengths(columns, &encoders);

        // The offsets are initialized shifted down by one row: each pushed value is the
        // *start* of that row. `encode_column` then advances each offset as it writes,
        // so once all columns are encoded the offsets hold the row end positions.
        rows.offsets.reserve(lengths.len());
        let mut cur_offset = rows.offsets[write_offset];
        for l in lengths {
            rows.offsets.push(cur_offset);
            cur_offset = cur_offset.checked_add(l).expect("overflow");
        }

        // Ensure the buffer is large enough to hold all encoded rows
        rows.buffer.resize(cur_offset, 0);

        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
            encode_column(
                &mut rows.buffer,
                &mut rows.offsets[write_offset..],
                column.as_ref(),
                field.options,
                &encoder,
            )
        }

        if cfg!(debug_assertions) {
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
        }

        Ok(())
    }
671
    /// Convert [`Rows`] produced by this [`RowConverter`] back into [`ArrayRef`] columns
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
        let mut validate_utf8 = false;
        let mut rows: Vec<_> = rows
            .into_iter()
            .map(|row| {
                assert!(
                    Arc::ptr_eq(&row.config.fields, &self.fields),
                    "rows were not produced by this RowConverter"
                );
                validate_utf8 |= row.config.validate_utf8;
                row.data
            })
            .collect();

        // SAFETY: the rows were produced by this converter (asserted above), so they are
        // valid encodings, and `validate_utf8` reflects whether validation is still required
        unsafe { self.convert_raw(&mut rows, validate_utf8) }
    }
699
    /// Return an empty [`Rows`] with capacity for `row_capacity` rows and
    /// `data_capacity` bytes of encoded row data
    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
        offsets.push(0);

        Rows {
            offsets,
            buffer: Vec::with_capacity(data_capacity),
            config: RowConfig {
                fields: self.fields.clone(),
                validate_utf8: false,
            },
        }
    }
741
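    /// Create a [`Rows`] from a [`BinaryArray`] previously produced by
    /// [`Rows::try_into_binary`] with a converter of the same schema
    ///
    /// A round-trip sketch (assuming this crate is compiled as `arrow_row`):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_row::{RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    /// let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
    ///
    /// // Serialize the rows into a BinaryArray and back again
    /// let binary = rows.try_into_binary().unwrap();
    /// let rows = converter.from_binary(binary);
    /// let back = converter.convert_rows(&rows).unwrap();
    /// assert_eq!(&back[0], &col);
    /// ```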
    pub fn from_binary(&self, array: BinaryArray) -> Rows {
        assert_eq!(
            array.null_count(),
            0,
            "can't construct Rows instance from array with nulls"
        );
        Rows {
            buffer: array.values().to_vec(),
            offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
            config: RowConfig {
                fields: Arc::clone(&self.fields),
                validate_utf8: true,
            },
        }
    }
783
    /// Convert raw row bytes back into [`ArrayRef`] columns
    ///
    /// # Safety
    ///
    /// `rows` must contain valid row encodings for this [`RowConverter`]
    unsafe fn convert_raw(
        &self,
        rows: &mut [&[u8]],
        validate_utf8: bool,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        self.fields
            .iter()
            .zip(&self.codecs)
            .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8))
            .collect()
    }
800
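    /// Return a [`RowParser`] that can reinterpret raw bytes (e.g. obtained from
    /// [`Row::data`] or [`Row::as_ref`]) as [`Row`]s for this converter's schema
    ///
    /// A minimal sketch (assuming this crate is compiled as `arrow_row`):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_row::{RowConverter, SortField};
    /// # use arrow_schema::DataType;
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let col = Arc::new(Int32Array::from(vec![5, 7])) as ArrayRef;
    /// let rows = converter.convert_columns(&[col]).unwrap();
    ///
    /// // Copy the raw bytes of the first row, then parse them back into a Row
    /// let bytes = rows.row(0).data().to_vec();
    /// let parser = converter.parser();
    /// let parsed = parser.parse(&bytes);
    /// let back = converter.convert_rows([parsed]).unwrap();
    /// assert_eq!(back[0].len(), 1);
    /// ```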
    pub fn parser(&self) -> RowParser {
        RowParser::new(Arc::clone(&self.fields))
    }

    /// Return the size of this instance in bytes
    ///
    /// Includes the size of `Self`, the sort fields, and any state retained by the codecs
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.fields.iter().map(|x| x.size()).sum::<usize>()
            + self.codecs.capacity() * std::mem::size_of::<Codec>()
            + self.codecs.iter().map(Codec::size).sum::<usize>()
    }
}
816
/// Reinterprets raw bytes as [`Row`]s for the schema of the [`RowConverter`] that created it
#[derive(Debug)]
pub struct RowParser {
    config: RowConfig,
}
822
823impl RowParser {
824 fn new(fields: Arc<[SortField]>) -> Self {
825 Self {
826 config: RowConfig {
827 fields,
828 validate_utf8: true,
829 },
830 }
831 }
832
    /// Create a [`Row`] from the provided `bytes`
    ///
    /// `bytes` must have been produced by the [`RowConverter`] associated with this
    /// [`RowParser`], otherwise subsequent operations on the returned [`Row`] may panic
    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
        Row {
            data: bytes,
            config: &self.config,
        }
    }
843}
844
/// The configuration shared by a set of [`Row`]s
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema for these rows
    fields: Arc<[SortField]>,
    /// Whether to run UTF-8 validation when converting back to arrow arrays
    validate_utf8: bool,
}
853
/// A row-oriented representation of arrow data, produced by a [`RowConverter`]
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}
866
impl Rows {
    /// Append a [`Row`] produced by the same [`RowConverter`] to this [`Rows`]
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Return the [`Row`] at index `row`, panicking if out of bounds
    pub fn row(&self, row: usize) -> Row<'_> {
        assert!(row + 1 < self.offsets.len());
        unsafe { self.row_unchecked(row) }
    }

    /// Return the [`Row`] at `index` without bounds checking
    ///
    /// # Safety
    ///
    /// Caller must ensure that `index` is less than the number of rows
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Set the length of this [`Rows`] to 0
    pub fn clear(&mut self) {
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Return the number of [`Row`]s in this [`Rows`]
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Return an iterator over the [`Row`]s in this [`Rows`]
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Return the size of this instance in bytes
    ///
    /// Includes the size of `Self`
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.buffer.len()
            + self.offsets.len() * std::mem::size_of::<usize>()
    }

    /// Convert this [`Rows`] into a [`BinaryArray`] with one value per row
    ///
    /// Returns an error if the encoded rows exceed `i32::MAX` bytes in total
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        // SAFETY: the offsets are monotonically increasing and in-bounds for the buffer
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}
974
975impl<'a> IntoIterator for &'a Rows {
976 type Item = Row<'a>;
977 type IntoIter = RowsIter<'a>;
978
979 fn into_iter(self) -> Self::IntoIter {
980 RowsIter {
981 rows: self,
982 start: 0,
983 end: self.num_rows(),
984 }
985 }
986}
987
/// An iterator over the [`Row`]s in a [`Rows`]
#[derive(Debug)]
pub struct RowsIter<'a> {
    rows: &'a Rows,
    start: usize,
    end: usize,
}
995
996impl<'a> Iterator for RowsIter<'a> {
997 type Item = Row<'a>;
998
999 fn next(&mut self) -> Option<Self::Item> {
1000 if self.end == self.start {
1001 return None;
1002 }
1003
1004 let row = unsafe { self.rows.row_unchecked(self.start) };
1006 self.start += 1;
1007 Some(row)
1008 }
1009
1010 fn size_hint(&self) -> (usize, Option<usize>) {
1011 let len = self.len();
1012 (len, Some(len))
1013 }
1014}
1015
1016impl ExactSizeIterator for RowsIter<'_> {
1017 fn len(&self) -> usize {
1018 self.end - self.start
1019 }
1020}
1021
impl DoubleEndedIterator for RowsIter<'_> {
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.end == self.start {
            return None;
        }
        // `end` is exclusive, so the last unread row is at `end - 1`
        let row = unsafe { self.rows.row_unchecked(self.end - 1) };
        self.end -= 1;
        Some(row)
    }
}
1033
/// A comparable representation of a single row
///
/// Two [`Row`]s produced by the same [`RowConverter`] compare (via `Eq`/`Ord`) the same
/// way as the lexicographic order of the corresponding column values
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    data: &'a [u8],
    config: &'a RowConfig,
}
1047
impl<'a> Row<'a> {
    /// Create an owned copy of this [`Row`]
    pub fn owned(&self) -> OwnedRow {
        OwnedRow {
            data: self.data.into(),
            config: self.config.clone(),
        }
    }

    /// The underlying row bytes
    pub fn data(&self) -> &'a [u8] {
        self.data
    }
}
1062
1063impl PartialEq for Row<'_> {
1066 #[inline]
1067 fn eq(&self, other: &Self) -> bool {
1068 self.data.eq(other.data)
1069 }
1070}
1071
1072impl Eq for Row<'_> {}
1073
1074impl PartialOrd for Row<'_> {
1075 #[inline]
1076 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1077 Some(self.cmp(other))
1078 }
1079}
1080
1081impl Ord for Row<'_> {
1082 #[inline]
1083 fn cmp(&self, other: &Self) -> Ordering {
1084 self.data.cmp(other.data)
1085 }
1086}
1087
1088impl Hash for Row<'_> {
1089 #[inline]
1090 fn hash<H: Hasher>(&self, state: &mut H) {
1091 self.data.hash(state)
1092 }
1093}
1094
1095impl AsRef<[u8]> for Row<'_> {
1096 #[inline]
1097 fn as_ref(&self) -> &[u8] {
1098 self.data
1099 }
1100}
1101
/// Owned version of a [`Row`] that can be moved and cloned freely
///
/// This stores the row data and its [`RowConfig`], and is therefore less memory-efficient
/// than keeping the rows in the originating [`Rows`]
#[derive(Debug, Clone)]
pub struct OwnedRow {
    data: Box<[u8]>,
    config: RowConfig,
}

impl OwnedRow {
    /// Get a [`Row`] referencing the data of this [`OwnedRow`]
    pub fn row(&self) -> Row<'_> {
        Row {
            data: &self.data,
            config: &self.config,
        }
    }
}
1122
1123impl PartialEq for OwnedRow {
1126 #[inline]
1127 fn eq(&self, other: &Self) -> bool {
1128 self.row().eq(&other.row())
1129 }
1130}
1131
1132impl Eq for OwnedRow {}
1133
1134impl PartialOrd for OwnedRow {
1135 #[inline]
1136 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1137 Some(self.cmp(other))
1138 }
1139}
1140
1141impl Ord for OwnedRow {
1142 #[inline]
1143 fn cmp(&self, other: &Self) -> Ordering {
1144 self.row().cmp(&other.row())
1145 }
1146}
1147
1148impl Hash for OwnedRow {
1149 #[inline]
1150 fn hash<H: Hasher>(&self, state: &mut H) {
1151 self.row().hash(state)
1152 }
1153}
1154
1155impl AsRef<[u8]> for OwnedRow {
1156 #[inline]
1157 fn as_ref(&self) -> &[u8] {
1158 &self.data
1159 }
1160}
1161
/// Returns the byte written for a null value, respecting `nulls_first`
#[inline]
fn null_sentinel(options: SortOptions) -> u8 {
    match options.nulls_first {
        true => 0,
        false => 0xFF,
    }
}
1170
1171fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
1173 use fixed::FixedLengthEncoding;
1174
1175 let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
1176 let mut lengths = vec![0; num_rows];
1177
1178 for (array, encoder) in cols.iter().zip(encoders) {
1179 match encoder {
1180 Encoder::Stateless => {
1181 downcast_primitive_array! {
1182 array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)),
1183 DataType::Null => {},
1184 DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
1185 DataType::Binary => as_generic_binary_array::<i32>(array)
1186 .iter()
1187 .zip(lengths.iter_mut())
1188 .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
1189 DataType::LargeBinary => as_generic_binary_array::<i64>(array)
1190 .iter()
1191 .zip(lengths.iter_mut())
1192 .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
1193 DataType::BinaryView => array.as_binary_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| {
1194 *length += variable::encoded_len(slice)
1195 }),
1196 DataType::Utf8 => array.as_string::<i32>()
1197 .iter()
1198 .zip(lengths.iter_mut())
1199 .for_each(|(slice, length)| {
1200 *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
1201 }),
1202 DataType::LargeUtf8 => array.as_string::<i64>()
1203 .iter()
1204 .zip(lengths.iter_mut())
1205 .for_each(|(slice, length)| {
1206 *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
1207 }),
1208 DataType::Utf8View => array.as_string_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| {
1209 *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
1210 }),
1211 DataType::FixedSizeBinary(len) => {
1212 let len = len.to_usize().unwrap();
1213 lengths.iter_mut().for_each(|x| *x += 1 + len)
1214 }
1215 _ => unimplemented!("unsupported data type: {}", array.data_type()),
1216 }
1217 }
1218 Encoder::Dictionary(values, null) => {
1219 downcast_dictionary_array! {
1220 array => {
1221 for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
1222 *length += match v {
1223 Some(k) => values.row(k.as_usize()).data.len(),
1224 None => null.data.len(),
1225 }
1226 }
1227 }
1228 _ => unreachable!(),
1229 }
1230 }
1231 Encoder::Struct(rows, null) => {
1232 let array = as_struct_array(array);
1233 lengths.iter_mut().enumerate().for_each(|(idx, length)| {
1234 match array.is_valid(idx) {
1235 true => *length += 1 + rows.row(idx).as_ref().len(),
1236 false => *length += 1 + null.data.len(),
1237 }
1238 });
1239 }
1240 Encoder::List(rows) => match array.data_type() {
1241 DataType::List(_) => {
1242 list::compute_lengths(&mut lengths, rows, as_list_array(array))
1243 }
1244 DataType::LargeList(_) => {
1245 list::compute_lengths(&mut lengths, rows, as_large_list_array(array))
1246 }
1247 _ => unreachable!(),
1248 },
1249 }
1250 }
1251
1252 lengths
1253}
1254
1255fn encode_column(
1257 data: &mut [u8],
1258 offsets: &mut [usize],
1259 column: &dyn Array,
1260 opts: SortOptions,
1261 encoder: &Encoder<'_>,
1262) {
1263 match encoder {
1264 Encoder::Stateless => {
1265 downcast_primitive_array! {
1266 column => {
1267 if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1268 fixed::encode(data, offsets, column.values(), nulls, opts)
1269 } else {
1270 fixed::encode_not_null(data, offsets, column.values(), opts)
1271 }
1272 }
1273 DataType::Null => {}
1274 DataType::Boolean => {
1275 if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1276 fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
1277 } else {
1278 fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
1279 }
1280 }
1281 DataType::Binary => {
1282 variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
1283 }
1284 DataType::BinaryView => {
1285 variable::encode(data, offsets, column.as_binary_view().iter(), opts)
1286 }
1287 DataType::LargeBinary => {
1288 variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
1289 }
1290 DataType::Utf8 => variable::encode(
1291 data, offsets,
1292 column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
1293 opts,
1294 ),
1295 DataType::LargeUtf8 => variable::encode(
1296 data, offsets,
1297 column.as_string::<i64>()
1298 .iter()
1299 .map(|x| x.map(|x| x.as_bytes())),
1300 opts,
1301 ),
1302 DataType::Utf8View => variable::encode(
1303 data, offsets,
1304 column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
1305 opts,
1306 ),
1307 DataType::FixedSizeBinary(_) => {
1308 let array = column.as_any().downcast_ref().unwrap();
1309 fixed::encode_fixed_size_binary(data, offsets, array, opts)
1310 }
1311 _ => unimplemented!("unsupported data type: {}", column.data_type()),
1312 }
1313 }
1314 Encoder::Dictionary(values, nulls) => {
1315 downcast_dictionary_array! {
1316 column => encode_dictionary_values(data, offsets, column, values, nulls),
1317 _ => unreachable!()
1318 }
1319 }
1320 Encoder::Struct(rows, null) => {
1321 let array = as_struct_array(column);
1322 let null_sentinel = null_sentinel(opts);
1323 offsets
1324 .iter_mut()
1325 .skip(1)
1326 .enumerate()
1327 .for_each(|(idx, offset)| {
1328 let (row, sentinel) = match array.is_valid(idx) {
1329 true => (rows.row(idx), 0x01),
1330 false => (*null, null_sentinel),
1331 };
1332 let end_offset = *offset + 1 + row.as_ref().len();
1333 data[*offset] = sentinel;
1334 data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
1335 *offset = end_offset;
1336 })
1337 }
1338 Encoder::List(rows) => match column.data_type() {
1339 DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
1340 DataType::LargeList(_) => {
1341 list::encode(data, offsets, rows, opts, as_large_list_array(column))
1342 }
1343 _ => unreachable!(),
1344 },
1345 }
1346}
1347
/// Encode dictionary values not preserving the dictionary encoding: each key is replaced
/// by the row encoding of the value it references (or by the null row)
pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &DictionaryArray<K>,
    values: &Rows,
    null: &Row<'_>,
) {
1356 for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1357 let row = match k {
1358 Some(k) => values.row(k.as_usize()).data,
1359 None => null.data,
1360 };
1361 let end_offset = *offset + row.len();
1362 data[*offset..end_offset].copy_from_slice(row);
1363 *offset = end_offset;
1364 }
1365}
1366
1367macro_rules! decode_primitive_helper {
1368 ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
1369 Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
1370 };
1371}
1372
1373unsafe fn decode_column(
1379 field: &SortField,
1380 rows: &mut [&[u8]],
1381 codec: &Codec,
1382 validate_utf8: bool,
1383) -> Result<ArrayRef, ArrowError> {
1384 let options = field.options;
1385
1386 let array: ArrayRef = match codec {
1387 Codec::Stateless => {
1388 let data_type = field.data_type.clone();
1389 downcast_primitive! {
1390 data_type => (decode_primitive_helper, rows, data_type, options),
1391 DataType::Null => Arc::new(NullArray::new(rows.len())),
1392 DataType::Boolean => Arc::new(decode_bool(rows, options)),
1393 DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
1394 DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
1395 DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
1396 DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
1397 DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
1398 DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
1399 DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)),
1400 _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {}", data_type)))
1401 }
1402 }
1403 Codec::Dictionary(converter, _) => {
1404 let cols = converter.convert_raw(rows, validate_utf8)?;
1405 cols.into_iter().next().unwrap()
1406 }
1407 Codec::Struct(converter, _) => {
1408 let (null_count, nulls) = fixed::decode_nulls(rows);
1409 rows.iter_mut().for_each(|row| *row = &row[1..]);
1410 let children = converter.convert_raw(rows, validate_utf8)?;
1411
1412 let child_data = children.iter().map(|c| c.to_data()).collect();
1413 let builder = ArrayDataBuilder::new(field.data_type.clone())
1414 .len(rows.len())
1415 .null_count(null_count)
1416 .null_bit_buffer(Some(nulls))
1417 .child_data(child_data);
1418
1419 Arc::new(StructArray::from(builder.build_unchecked()))
1420 }
1421 Codec::List(converter) => match &field.data_type {
1422 DataType::List(_) => {
1423 Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
1424 }
1425 DataType::LargeList(_) => {
1426 Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
1427 }
1428 _ => unreachable!(),
1429 },
1430 };
1431 Ok(array)
1432}
1433
1434#[cfg(test)]
1435mod tests {
1436 use rand::distributions::uniform::SampleUniform;
1437 use rand::distributions::{Distribution, Standard};
1438 use rand::{thread_rng, Rng};
1439
1440 use arrow_array::builder::*;
1441 use arrow_array::types::*;
1442 use arrow_array::*;
1443 use arrow_buffer::{i256, NullBuffer};
1444 use arrow_buffer::{Buffer, OffsetBuffer};
1445 use arrow_cast::display::{ArrayFormatter, FormatOptions};
1446 use arrow_ord::sort::{LexicographicalComparator, SortColumn};
1447
1448 use super::*;
1449
1450 #[test]
1451 fn test_fixed_width() {
1452 let cols = [
1453 Arc::new(Int16Array::from_iter([
1454 Some(1),
1455 Some(2),
1456 None,
1457 Some(-5),
1458 Some(2),
1459 Some(2),
1460 Some(0),
1461 ])) as ArrayRef,
1462 Arc::new(Float32Array::from_iter([
1463 Some(1.3),
1464 Some(2.5),
1465 None,
1466 Some(4.),
1467 Some(0.1),
1468 Some(-4.),
1469 Some(-0.),
1470 ])) as ArrayRef,
1471 ];
1472
1473 let converter = RowConverter::new(vec![
1474 SortField::new(DataType::Int16),
1475 SortField::new(DataType::Float32),
1476 ])
1477 .unwrap();
1478 let rows = converter.convert_columns(&cols).unwrap();
1479
1480 assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
1481 assert_eq!(
1482 rows.buffer,
            &[
                // Each row encodes to 8 bytes: 3 for the Int16 (sentinel + value)
                // and 5 for the Float32 (sentinel + value)
                1, 128, 1, 1, 191, 166, 102, 102, //
                1, 128, 2, 1, 192, 32, 0, 0, //
                0, 0, 0, 0, 0, 0, 0, 0, //
                1, 127, 251, 1, 192, 128, 0, 0, //
                1, 128, 2, 1, 189, 204, 204, 205, //
                1, 128, 2, 1, 63, 127, 255, 255, //
                1, 128, 0, 1, 127, 255, 255, 255 //
            ]
        );
1500
1501 assert!(rows.row(3) < rows.row(6));
1502 assert!(rows.row(0) < rows.row(1));
1503 assert!(rows.row(3) < rows.row(0));
1504 assert!(rows.row(4) < rows.row(1));
1505 assert!(rows.row(5) < rows.row(4));
1506
1507 let back = converter.convert_rows(&rows).unwrap();
1508 for (expected, actual) in cols.iter().zip(&back) {
1509 assert_eq!(expected, actual);
1510 }
1511 }
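
    // A small illustrative test (not from the original suite): because `OwnedRow`
    // implements `Ord` and `Hash`, rows can be used directly as keys in ordered or
    // hashed collections, e.g. for grouping or deduplication.
    #[test]
    fn test_owned_row_as_map_key() {
        use std::collections::BTreeMap;

        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
        let col = Arc::new(Int32Array::from(vec![3, 1, 3, 2])) as ArrayRef;
        let rows = converter.convert_columns(&[col]).unwrap();

        // Count occurrences keyed by the owned row encoding
        let mut counts: BTreeMap<OwnedRow, usize> = BTreeMap::new();
        for row in rows.iter() {
            *counts.entry(row.owned()).or_insert(0) += 1;
        }

        assert_eq!(counts.len(), 3);
        // Iteration order of the BTreeMap follows the value order of the column
        let keys: Vec<_> = counts.keys().cloned().collect();
        assert!(keys[0] < keys[1] && keys[1] < keys[2]);
        assert_eq!(counts.values().copied().collect::<Vec<_>>(), vec![1, 1, 2]);
    }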
1512
1513 #[test]
1514 fn test_decimal128() {
1515 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
1516 DECIMAL128_MAX_PRECISION,
1517 7,
1518 ))])
1519 .unwrap();
1520 let col = Arc::new(
1521 Decimal128Array::from_iter([
1522 None,
1523 Some(i128::MIN),
1524 Some(-13),
1525 Some(46_i128),
1526 Some(5456_i128),
1527 Some(i128::MAX),
1528 ])
1529 .with_precision_and_scale(38, 7)
1530 .unwrap(),
1531 ) as ArrayRef;
1532
1533 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1534 for i in 0..rows.num_rows() - 1 {
1535 assert!(rows.row(i) < rows.row(i + 1));
1536 }
1537
1538 let back = converter.convert_rows(&rows).unwrap();
1539 assert_eq!(back.len(), 1);
1540 assert_eq!(col.as_ref(), back[0].as_ref())
1541 }
1542
1543 #[test]
1544 fn test_decimal256() {
1545 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
1546 DECIMAL256_MAX_PRECISION,
1547 7,
1548 ))])
1549 .unwrap();
1550 let col = Arc::new(
1551 Decimal256Array::from_iter([
1552 None,
1553 Some(i256::MIN),
1554 Some(i256::from_parts(0, -1)),
1555 Some(i256::from_parts(u128::MAX, -1)),
1556 Some(i256::from_parts(u128::MAX, 0)),
1557 Some(i256::from_parts(0, 46_i128)),
1558 Some(i256::from_parts(5, 46_i128)),
1559 Some(i256::MAX),
1560 ])
1561 .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
1562 .unwrap(),
1563 ) as ArrayRef;
1564
1565 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1566 for i in 0..rows.num_rows() - 1 {
1567 assert!(rows.row(i) < rows.row(i + 1));
1568 }
1569
1570 let back = converter.convert_rows(&rows).unwrap();
1571 assert_eq!(back.len(), 1);
1572 assert_eq!(col.as_ref(), back[0].as_ref())
1573 }
1574
1575 #[test]
1576 fn test_bool() {
1577 let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
1578
1579 let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
1580
1581 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1582 assert!(rows.row(2) > rows.row(1));
1583 assert!(rows.row(2) > rows.row(0));
1584 assert!(rows.row(1) > rows.row(0));
1585
1586 let cols = converter.convert_rows(&rows).unwrap();
1587 assert_eq!(&cols[0], &col);
1588
1589 let converter = RowConverter::new(vec![SortField::new_with_options(
1590 DataType::Boolean,
1591 SortOptions::default().desc().with_nulls_first(false),
1592 )])
1593 .unwrap();
1594
1595 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1596 assert!(rows.row(2) < rows.row(1));
1597 assert!(rows.row(2) < rows.row(0));
1598 assert!(rows.row(1) < rows.row(0));
1599 let cols = converter.convert_rows(&rows).unwrap();
1600 assert_eq!(&cols[0], &col);
1601 }
1602
1603 #[test]
1604 fn test_timezone() {
1605 let a =
1606 TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
1607 let d = a.data_type().clone();
1608
1609 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
1610 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
1611 let back = converter.convert_rows(&rows).unwrap();
1612 assert_eq!(back.len(), 1);
1613 assert_eq!(back[0].data_type(), &d);
1614
1615 let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
1617 a.append(34).unwrap();
1618 a.append_null();
1619 a.append(345).unwrap();
1620
1621 let dict = a.finish();
1623 let values = TimestampNanosecondArray::from(dict.values().to_data());
1624 let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
1625 let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
1626 let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));
1627
1628 assert_eq!(dict_with_tz.data_type(), &d);
1629 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
1630 let rows = converter
1631 .convert_columns(&[Arc::new(dict_with_tz) as _])
1632 .unwrap();
1633 let back = converter.convert_rows(&rows).unwrap();
1634 assert_eq!(back.len(), 1);
1635 assert_eq!(back[0].data_type(), &v);
1636 }
1637
1638 #[test]
1639 fn test_null_encoding() {
1640 let col = Arc::new(NullArray::new(10));
1641 let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
1642 let rows = converter.convert_columns(&[col]).unwrap();
1643 assert_eq!(rows.num_rows(), 10);
1644 assert_eq!(rows.row(1).data.len(), 0);
1645 }
1646
1647 #[test]
1648 fn test_variable_width() {
1649 let col = Arc::new(StringArray::from_iter([
1650 Some("hello"),
1651 Some("he"),
1652 None,
1653 Some("foo"),
1654 Some(""),
1655 ])) as ArrayRef;
1656
1657 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1658 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1659
1660 assert!(rows.row(1) < rows.row(0));
1661 assert!(rows.row(2) < rows.row(4));
1662 assert!(rows.row(3) < rows.row(0));
1663 assert!(rows.row(3) < rows.row(1));
1664
1665 let cols = converter.convert_rows(&rows).unwrap();
1666 assert_eq!(&cols[0], &col);
1667
1668 let col = Arc::new(BinaryArray::from_iter([
1669 None,
1670 Some(vec![0_u8; 0]),
1671 Some(vec![0_u8; 6]),
1672 Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
1673 Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
1674 Some(vec![0_u8; variable::BLOCK_SIZE]),
1675 Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
1676 Some(vec![1_u8; 6]),
1677 Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
1678 Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
1679 Some(vec![1_u8; variable::BLOCK_SIZE]),
1680 Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
1681 Some(vec![0xFF_u8; 6]),
1682 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
1683 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
1684 Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
1685 Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
1686 ])) as ArrayRef;
1687
1688 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
1689 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1690
1691 for i in 0..rows.num_rows() {
1692 for j in i + 1..rows.num_rows() {
1693 assert!(
1694 rows.row(i) < rows.row(j),
1695 "{} < {} - {:?} < {:?}",
1696 i,
1697 j,
1698 rows.row(i),
1699 rows.row(j)
1700 );
1701 }
1702 }
1703
1704 let cols = converter.convert_rows(&rows).unwrap();
1705 assert_eq!(&cols[0], &col);
1706
1707 let converter = RowConverter::new(vec![SortField::new_with_options(
1708 DataType::Binary,
1709 SortOptions::default().desc().with_nulls_first(false),
1710 )])
1711 .unwrap();
1712 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1713
1714 for i in 0..rows.num_rows() {
1715 for j in i + 1..rows.num_rows() {
1716 assert!(
1717 rows.row(i) > rows.row(j),
1718 "{} > {} - {:?} > {:?}",
1719 i,
1720 j,
1721 rows.row(i),
1722 rows.row(j)
1723 );
1724 }
1725 }
1726
1727 let cols = converter.convert_rows(&rows).unwrap();
1728 assert_eq!(&cols[0], &col);
1729 }
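
    // An illustrative test (not from the original suite): `Rows::push` can copy rows
    // produced by the same converter into a pre-allocated `Rows`, e.g. to gather a
    // subset of rows before decoding them back into arrays.
    #[test]
    fn test_push_selected_rows() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let col = Arc::new(StringArray::from(vec![Some("c"), None, Some("a")])) as ArrayRef;
        let rows = converter.convert_columns(&[col]).unwrap();

        // Gather rows 2 and 0 into a new Rows, in that order
        let mut gathered = converter.empty_rows(2, 64);
        gathered.push(rows.row(2));
        gathered.push(rows.row(0));
        assert_eq!(gathered.num_rows(), 2);

        let back = converter.convert_rows(&gathered).unwrap();
        let expected = Arc::new(StringArray::from(vec![Some("a"), Some("c")])) as ArrayRef;
        assert_eq!(&back[0], &expected);
    }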
1730
1731 fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
1733 match b.data_type() {
1734 DataType::Dictionary(_, v) => {
1735 assert_eq!(a.data_type(), v.as_ref());
1736 let b = arrow_cast::cast(b, v).unwrap();
1737 assert_eq!(a, b.as_ref())
1738 }
1739 _ => assert_eq!(a, b),
1740 }
1741 }
1742
1743 #[test]
1744 fn test_string_dictionary() {
1745 let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
1746 Some("foo"),
1747 Some("hello"),
1748 Some("he"),
1749 None,
1750 Some("hello"),
1751 Some(""),
1752 Some("hello"),
1753 Some("hello"),
1754 ])) as ArrayRef;
1755
1756 let field = SortField::new(a.data_type().clone());
1757 let converter = RowConverter::new(vec![field]).unwrap();
1758 let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
1759
1760 assert!(rows_a.row(3) < rows_a.row(5));
1761 assert!(rows_a.row(2) < rows_a.row(1));
1762 assert!(rows_a.row(0) < rows_a.row(1));
1763 assert!(rows_a.row(3) < rows_a.row(0));
1764
1765 assert_eq!(rows_a.row(1), rows_a.row(4));
1766 assert_eq!(rows_a.row(1), rows_a.row(6));
1767 assert_eq!(rows_a.row(1), rows_a.row(7));
1768
1769 let cols = converter.convert_rows(&rows_a).unwrap();
1770 dictionary_eq(&cols[0], &a);
1771
1772 let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
1773 Some("hello"),
1774 None,
1775 Some("cupcakes"),
1776 ])) as ArrayRef;
1777
1778 let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
1779 assert_eq!(rows_a.row(1), rows_b.row(0));
1780 assert_eq!(rows_a.row(3), rows_b.row(1));
1781 assert!(rows_b.row(2) < rows_a.row(0));
1782
1783 let cols = converter.convert_rows(&rows_b).unwrap();
1784 dictionary_eq(&cols[0], &b);
1785
1786 let converter = RowConverter::new(vec![SortField::new_with_options(
1787 a.data_type().clone(),
1788 SortOptions::default().desc().with_nulls_first(false),
1789 )])
1790 .unwrap();
1791
1792 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
1793 assert!(rows_c.row(3) > rows_c.row(5));
1794 assert!(rows_c.row(2) > rows_c.row(1));
1795 assert!(rows_c.row(0) > rows_c.row(1));
1796 assert!(rows_c.row(3) > rows_c.row(0));
1797
1798 let cols = converter.convert_rows(&rows_c).unwrap();
1799 dictionary_eq(&cols[0], &a);
1800
1801 let converter = RowConverter::new(vec![SortField::new_with_options(
1802 a.data_type().clone(),
1803 SortOptions::default().desc().with_nulls_first(true),
1804 )])
1805 .unwrap();
1806
1807 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
1808 assert!(rows_c.row(3) < rows_c.row(5));
1809 assert!(rows_c.row(2) > rows_c.row(1));
1810 assert!(rows_c.row(0) > rows_c.row(1));
1811 assert!(rows_c.row(3) < rows_c.row(0));
1812
1813 let cols = converter.convert_rows(&rows_c).unwrap();
1814 dictionary_eq(&cols[0], &a);
1815 }
1816
1817 #[test]
1818 fn test_struct() {
1819 let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
1821 let a_f = Arc::new(Field::new("int", DataType::Int32, false));
1822 let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
1823 let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
1824 let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
1825
1826 let sort_fields = vec![SortField::new(s1.data_type().clone())];
1827 let converter = RowConverter::new(sort_fields).unwrap();
1828 let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
1829
1830 for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
1831 assert!(a < b);
1832 }
1833
1834 let back = converter.convert_rows(&r1).unwrap();
1835 assert_eq!(back.len(), 1);
1836 assert_eq!(&back[0], &s1);
1837
1838 let data = s1
1840 .to_data()
1841 .into_builder()
1842 .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
1843 .null_count(2)
1844 .build()
1845 .unwrap();
1846
1847 let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
1848 let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // both rows are null structs
        assert!(r2.row(0) < r2.row(1)); // nulls sort first by default
        assert_ne!(r1.row(0), r2.row(0)); // a value never equals null
        assert_eq!(r1.row(1), r2.row(1)); // the valid rows are unchanged

        let back = converter.convert_rows(&r2).unwrap();
1855 assert_eq!(back.len(), 1);
1856 assert_eq!(&back[0], &s2);
1857
1858 back[0].to_data().validate_full().unwrap();
1859 }
1860
1861 #[test]
1862 fn test_primitive_dictionary() {
1863 let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
1864 builder.append(2).unwrap();
1865 builder.append(3).unwrap();
1866 builder.append(0).unwrap();
1867 builder.append_null();
1868 builder.append(5).unwrap();
1869 builder.append(3).unwrap();
1870 builder.append(-1).unwrap();
1871
1872 let a = builder.finish();
1873 let data_type = a.data_type().clone();
1874 let columns = [Arc::new(a) as ArrayRef];
1875
1876 let field = SortField::new(data_type.clone());
1877 let converter = RowConverter::new(vec![field]).unwrap();
1878 let rows = converter.convert_columns(&columns).unwrap();
1879 assert!(rows.row(0) < rows.row(1));
1880 assert!(rows.row(2) < rows.row(0));
1881 assert!(rows.row(3) < rows.row(2));
1882 assert!(rows.row(6) < rows.row(2));
1883 assert!(rows.row(3) < rows.row(6));
1884 }
1885
1886 #[test]
1887 fn test_dictionary_nulls() {
1888 let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
1889 let keys =
1890 Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
1891
1892 let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
1893 let data = keys
1894 .into_builder()
1895 .data_type(data_type.clone())
1896 .child_data(vec![values])
1897 .build()
1898 .unwrap();
1899
1900 let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
1901 let field = SortField::new(data_type.clone());
1902 let converter = RowConverter::new(vec![field]).unwrap();
1903 let rows = converter.convert_columns(&columns).unwrap();
1904
1905 assert_eq!(rows.row(0), rows.row(1));
1906 assert_eq!(rows.row(3), rows.row(4));
1907 assert_eq!(rows.row(4), rows.row(5));
1908 assert!(rows.row(3) < rows.row(0));
1909 }
1910
1911 #[test]
1912 #[should_panic(expected = "Encountered non UTF-8 data")]
1913 fn test_invalid_utf8() {
1914 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
1915 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
1916 let rows = converter.convert_columns(&[array]).unwrap();
1917 let binary_row = rows.row(0);
1918
1919 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1920 let parser = converter.parser();
1921 let utf8_row = parser.parse(binary_row.as_ref());
1922
1923 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
1924 }
1925
1926 #[test]
1927 #[should_panic(expected = "Encountered non UTF-8 data")]
1928 fn test_invalid_utf8_array() {
1929 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
1930 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
1931 let rows = converter.convert_columns(&[array]).unwrap();
1932 let binary_rows = rows.try_into_binary().expect("known-small rows");
1933
1934 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1935 let parsed = converter.from_binary(binary_rows);
1936
1937 converter.convert_rows(parsed.iter()).unwrap();
1938 }
1939
1940 #[test]
1941 #[should_panic(expected = "index out of bounds")]
1942 fn test_invalid_empty() {
1943 let binary_row: &[u8] = &[];
1944
1945 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1946 let parser = converter.parser();
1947 let utf8_row = parser.parse(binary_row.as_ref());
1948
1949 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
1950 }
1951
1952 #[test]
1953 #[should_panic(expected = "index out of bounds")]
1954 fn test_invalid_empty_array() {
1955 let row: &[u8] = &[];
1956 let binary_rows = BinaryArray::from(vec![row]);
1957
1958 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1959 let parsed = converter.from_binary(binary_rows);
1960
1961 converter.convert_rows(parsed.iter()).unwrap();
1962 }
1963
1964 #[test]
1965 #[should_panic(expected = "index out of bounds")]
1966 fn test_invalid_truncated() {
1967 let binary_row: &[u8] = &[0x02];
1968
1969 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1970 let parser = converter.parser();
1971 let utf8_row = parser.parse(binary_row.as_ref());
1972
1973 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
1974 }
1975
1976 #[test]
1977 #[should_panic(expected = "index out of bounds")]
1978 fn test_invalid_truncated_array() {
1979 let row: &[u8] = &[0x02];
1980 let binary_rows = BinaryArray::from(vec![row]);
1981
1982 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1983 let parsed = converter.from_binary(binary_rows);
1984
1985 converter.convert_rows(parsed.iter()).unwrap();
1986 }
1987
1988 #[test]
1989 #[should_panic(expected = "rows were not produced by this RowConverter")]
1990 fn test_different_converter() {
1991 let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
1992 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
1993 let rows = converter.convert_columns(&[values]).unwrap();
1994
1995 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
1996 let _ = converter.convert_rows(&rows);
1997 }
1998
1999 fn test_single_list<O: OffsetSizeTrait>() {
2000 let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
2001 builder.values().append_value(32);
2002 builder.values().append_value(52);
2003 builder.values().append_value(32);
2004 builder.append(true);
2005 builder.values().append_value(32);
2006 builder.values().append_value(52);
2007 builder.values().append_value(12);
2008 builder.append(true);
2009 builder.values().append_value(32);
2010 builder.values().append_value(52);
2011 builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(false);
2015 builder.values().append_value(32);
2016 builder.values().append_null();
2017 builder.append(true);
2018 builder.append(true);
2019
2020 let list = Arc::new(builder.finish()) as ArrayRef;
2021 let d = list.data_type().clone();
2022
2023 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2024
2025 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5));

        let back = converter.convert_rows(&rows).unwrap();
2034 assert_eq!(back.len(), 1);
2035 back[0].to_data().validate_full().unwrap();
2036 assert_eq!(&back[0], &list);
2037
2038 let options = SortOptions::default().asc().with_nulls_first(false);
2039 let field = SortField::new_with_options(d.clone(), options);
2040 let converter = RowConverter::new(vec![field]).unwrap();
2041 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2042
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) > rows.row(5));

        let back = converter.convert_rows(&rows).unwrap();
2051 assert_eq!(back.len(), 1);
2052 back[0].to_data().validate_full().unwrap();
2053 assert_eq!(&back[0], &list);
2054
2055 let options = SortOptions::default().desc().with_nulls_first(false);
2056 let field = SortField::new_with_options(d.clone(), options);
2057 let converter = RowConverter::new(vec![field]).unwrap();
2058 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2059
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));

        let back = converter.convert_rows(&rows).unwrap();
2068 assert_eq!(back.len(), 1);
2069 back[0].to_data().validate_full().unwrap();
2070 assert_eq!(&back[0], &list);
2071
2072 let options = SortOptions::default().desc().with_nulls_first(true);
2073 let field = SortField::new_with_options(d, options);
2074 let converter = RowConverter::new(vec![field]).unwrap();
2075 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2076
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) < rows.row(5));

        let back = converter.convert_rows(&rows).unwrap();
2085 assert_eq!(back.len(), 1);
2086 back[0].to_data().validate_full().unwrap();
2087 assert_eq!(&back[0], &list);
2088 }
2089
2090 fn test_nested_list<O: OffsetSizeTrait>() {
2091 let mut builder =
2092 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));
2093
2094 builder.values().values().append_value(1);
2095 builder.values().values().append_value(2);
2096 builder.values().append(true);
2097 builder.values().values().append_value(1);
2098 builder.values().values().append_null();
2099 builder.values().append(true);
2100 builder.append(true);
2101
2102 builder.values().values().append_value(1);
2103 builder.values().values().append_null();
2104 builder.values().append(true);
2105 builder.values().values().append_value(1);
2106 builder.values().values().append_null();
2107 builder.values().append(true);
2108 builder.append(true);
2109
2110 builder.values().values().append_value(1);
2111 builder.values().values().append_null();
2112 builder.values().append(true);
2113 builder.values().append(false);
2114 builder.append(true);
2115 builder.append(false);
2116
2117 builder.values().values().append_value(1);
2118 builder.values().values().append_value(2);
2119 builder.values().append(true);
2120 builder.append(true);
2121
2122 let list = Arc::new(builder.finish()) as ArrayRef;
2123 let d = list.data_type().clone();
2124
2125 let options = SortOptions::default().asc().with_nulls_first(true);
2133 let field = SortField::new_with_options(d.clone(), options);
2134 let converter = RowConverter::new(vec![field]).unwrap();
2135 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2136
2137 assert!(rows.row(0) > rows.row(1));
2138 assert!(rows.row(1) > rows.row(2));
2139 assert!(rows.row(2) > rows.row(3));
2140 assert!(rows.row(4) < rows.row(0));
2141 assert!(rows.row(4) > rows.row(1));
2142
2143 let back = converter.convert_rows(&rows).unwrap();
2144 assert_eq!(back.len(), 1);
2145 back[0].to_data().validate_full().unwrap();
2146 assert_eq!(&back[0], &list);
2147
2148 let options = SortOptions::default().desc().with_nulls_first(true);
2149 let field = SortField::new_with_options(d.clone(), options);
2150 let converter = RowConverter::new(vec![field]).unwrap();
2151 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2152
2153 assert!(rows.row(0) > rows.row(1));
2154 assert!(rows.row(1) > rows.row(2));
2155 assert!(rows.row(2) > rows.row(3));
2156 assert!(rows.row(4) > rows.row(0));
2157 assert!(rows.row(4) > rows.row(1));
2158
2159 let back = converter.convert_rows(&rows).unwrap();
2160 assert_eq!(back.len(), 1);
2161 back[0].to_data().validate_full().unwrap();
2162 assert_eq!(&back[0], &list);
2163
2164 let options = SortOptions::default().desc().with_nulls_first(false);
2165 let field = SortField::new_with_options(d, options);
2166 let converter = RowConverter::new(vec![field]).unwrap();
2167 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2168
2169 assert!(rows.row(0) < rows.row(1));
2170 assert!(rows.row(1) < rows.row(2));
2171 assert!(rows.row(2) < rows.row(3));
2172 assert!(rows.row(4) > rows.row(0));
2173 assert!(rows.row(4) < rows.row(1));
2174
2175 let back = converter.convert_rows(&rows).unwrap();
2176 assert_eq!(back.len(), 1);
2177 back[0].to_data().validate_full().unwrap();
2178 assert_eq!(&back[0], &list);
2179 }
2180
2181 #[test]
2182 fn test_list() {
2183 test_single_list::<i32>();
2184 test_nested_list::<i32>();
2185 }
2186
2187 #[test]
2188 fn test_large_list() {
2189 test_single_list::<i64>();
2190 test_nested_list::<i64>();
2191 }
2192
2193 fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
2194 where
2195 K: ArrowPrimitiveType,
2196 Standard: Distribution<K::Native>,
2197 {
2198 let mut rng = thread_rng();
2199 (0..len)
2200 .map(|_| rng.gen_bool(valid_percent).then(|| rng.gen()))
2201 .collect()
2202 }
2203
2204 fn generate_strings<O: OffsetSizeTrait>(
2205 len: usize,
2206 valid_percent: f64,
2207 ) -> GenericStringArray<O> {
2208 let mut rng = thread_rng();
2209 (0..len)
2210 .map(|_| {
2211 rng.gen_bool(valid_percent).then(|| {
2212 let len = rng.gen_range(0..100);
2213 let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect();
2214 String::from_utf8(bytes).unwrap()
2215 })
2216 })
2217 .collect()
2218 }
2219
2220 fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
2221 let mut rng = thread_rng();
2222 (0..len)
2223 .map(|_| {
2224 rng.gen_bool(valid_percent).then(|| {
2225 let len = rng.gen_range(0..100);
2226 let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect();
2227 String::from_utf8(bytes).unwrap()
2228 })
2229 })
2230 .collect()
2231 }
2232
2233 fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
2234 let mut rng = thread_rng();
2235 (0..len)
2236 .map(|_| {
2237 rng.gen_bool(valid_percent).then(|| {
2238 let len = rng.gen_range(0..100);
2239 let bytes: Vec<_> = (0..len).map(|_| rng.gen_range(0..128)).collect();
2240 bytes
2241 })
2242 })
2243 .collect()
2244 }
2245
2246 fn generate_dictionary<K>(
2247 values: ArrayRef,
2248 len: usize,
2249 valid_percent: f64,
2250 ) -> DictionaryArray<K>
2251 where
2252 K: ArrowDictionaryKeyType,
2253 K::Native: SampleUniform,
2254 {
2255 let mut rng = thread_rng();
2256 let min_key = K::Native::from_usize(0).unwrap();
2257 let max_key = K::Native::from_usize(values.len()).unwrap();
2258 let keys: PrimitiveArray<K> = (0..len)
2259 .map(|_| {
2260 rng.gen_bool(valid_percent)
2261 .then(|| rng.gen_range(min_key..max_key))
2262 })
2263 .collect();
2264
2265 let data_type =
2266 DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
2267
2268 let data = keys
2269 .into_data()
2270 .into_builder()
2271 .data_type(data_type)
2272 .add_child_data(values.to_data())
2273 .build()
2274 .unwrap();
2275
2276 DictionaryArray::from(data)
2277 }
2278
2279 fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
2280 let mut rng = thread_rng();
2281 let width = rng.gen_range(0..20);
2282 let mut builder = FixedSizeBinaryBuilder::new(width);
2283
2284 let mut b = vec![0; width as usize];
2285 for _ in 0..len {
2286 match rng.gen_bool(valid_percent) {
2287 true => {
2288 b.iter_mut().for_each(|x| *x = rng.gen());
2289 builder.append_value(&b).unwrap();
2290 }
2291 false => builder.append_null(),
2292 }
2293 }
2294
2295 builder.finish()
2296 }
2297
2298 fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
2299 let mut rng = thread_rng();
2300 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.gen_bool(valid_percent)));
2301 let a = generate_primitive_array::<Int32Type>(len, valid_percent);
2302 let b = generate_strings::<i32>(len, valid_percent);
2303 let fields = Fields::from(vec![
2304 Field::new("a", DataType::Int32, true),
2305 Field::new("b", DataType::Utf8, true),
2306 ]);
2307 let values = vec![Arc::new(a) as _, Arc::new(b) as _];
2308 StructArray::new(fields, values, Some(nulls))
2309 }
2310
2311 fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
2312 where
2313 F: FnOnce(usize) -> ArrayRef,
2314 {
2315 let mut rng = thread_rng();
2316 let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.gen_range(0..10)));
2317 let values_len = offsets.last().unwrap().to_usize().unwrap();
2318 let values = values(values_len);
2319 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.gen_bool(valid_percent)));
2320 let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
2321 ListArray::new(field, offsets, values, Some(nulls))
2322 }
2323
2324 fn generate_column(len: usize) -> ArrayRef {
2325 let mut rng = thread_rng();
2326 match rng.gen_range(0..16) {
2327 0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
2328 1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
2329 2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
2330 3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
2331 4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
2332 5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
2333 6 => Arc::new(generate_strings::<i32>(len, 0.8)),
2334 7 => Arc::new(generate_dictionary::<Int64Type>(
2335 Arc::new(generate_strings::<i32>(rng.gen_range(1..len), 1.0)),
2337 len,
2338 0.8,
2339 )),
2340 8 => Arc::new(generate_dictionary::<Int64Type>(
2341 Arc::new(generate_primitive_array::<Int64Type>(
2343 rng.gen_range(1..len),
2344 1.0,
2345 )),
2346 len,
2347 0.8,
2348 )),
2349 9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
2350 10 => Arc::new(generate_struct(len, 0.8)),
2351 11 => Arc::new(generate_list(len, 0.8, |values_len| {
2352 Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
2353 })),
2354 12 => Arc::new(generate_list(len, 0.8, |values_len| {
2355 Arc::new(generate_strings::<i32>(values_len, 0.8))
2356 })),
2357 13 => Arc::new(generate_list(len, 0.8, |values_len| {
2358 Arc::new(generate_struct(values_len, 0.8))
2359 })),
2360 14 => Arc::new(generate_string_view(len, 0.8)),
2361 15 => Arc::new(generate_byte_view(len, 0.8)),
2362 _ => unreachable!(),
2363 }
2364 }
2365
2366 fn print_row(cols: &[SortColumn], row: usize) -> String {
2367 let t: Vec<_> = cols
2368 .iter()
2369 .map(|x| match x.values.is_valid(row) {
2370 true => {
2371 let opts = FormatOptions::default().with_null("NULL");
2372 let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
2373 formatter.value(row).to_string()
2374 }
2375 false => "NULL".to_string(),
2376 })
2377 .collect();
2378 t.join(",")
2379 }
2380
2381 fn print_col_types(cols: &[SortColumn]) -> String {
2382 let t: Vec<_> = cols
2383 .iter()
2384 .map(|x| x.values.data_type().to_string())
2385 .collect();
2386 t.join(",")
2387 }
2388
2389 #[test]
2390 #[cfg_attr(miri, ignore)]
2391 fn fuzz_test() {
2392 for _ in 0..100 {
2393 let mut rng = thread_rng();
2394 let num_columns = rng.gen_range(1..5);
2395 let len = rng.gen_range(5..100);
2396 let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();
2397
2398 let options: Vec<_> = (0..num_columns)
2399 .map(|_| SortOptions {
2400 descending: rng.gen_bool(0.5),
2401 nulls_first: rng.gen_bool(0.5),
2402 })
2403 .collect();
2404
2405 let sort_columns: Vec<_> = options
2406 .iter()
2407 .zip(&arrays)
2408 .map(|(o, c)| SortColumn {
2409 values: Arc::clone(c),
2410 options: Some(*o),
2411 })
2412 .collect();
2413
2414 let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();
2415
2416 let columns: Vec<SortField> = options
2417 .into_iter()
2418 .zip(&arrays)
2419 .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
2420 .collect();
2421
2422 let converter = RowConverter::new(columns).unwrap();
2423 let rows = converter.convert_columns(&arrays).unwrap();
2424
2425 for i in 0..len {
2426 for j in 0..len {
2427 let row_i = rows.row(i);
2428 let row_j = rows.row(j);
2429 let row_cmp = row_i.cmp(&row_j);
2430 let lex_cmp = comparator.compare(i, j);
2431 assert_eq!(
2432 row_cmp,
2433 lex_cmp,
2434 "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
2435 print_row(&sort_columns, i),
2436 print_row(&sort_columns, j),
2437 row_i,
2438 row_j,
2439 print_col_types(&sort_columns)
2440 );
2441 }
2442 }
2443
2444 let back = converter.convert_rows(&rows).unwrap();
2445 for (actual, expected) in back.iter().zip(&arrays) {
2446 actual.to_data().validate_full().unwrap();
2447 dictionary_eq(actual, expected)
2448 }
2449
2450 let rows = rows.try_into_binary().expect("reasonable size");
2452 let parser = converter.parser();
2453 let back = converter
2454 .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
2455 .unwrap();
2456 for (actual, expected) in back.iter().zip(&arrays) {
2457 actual.to_data().validate_full().unwrap();
2458 dictionary_eq(actual, expected)
2459 }
2460
2461 let rows = converter.from_binary(rows);
2462 let back = converter.convert_rows(&rows).unwrap();
2463 for (actual, expected) in back.iter().zip(&arrays) {
2464 actual.to_data().validate_full().unwrap();
2465 dictionary_eq(actual, expected)
2466 }
2467 }
2468 }
2469
2470 #[test]
2471 fn test_clear() {
2472 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2473 let mut rows = converter.empty_rows(3, 128);
2474
2475 let first = Int32Array::from(vec![None, Some(2), Some(4)]);
2476 let second = Int32Array::from(vec![Some(2), None, Some(4)]);
2477 let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
2478
2479 for array in arrays.iter() {
2480 rows.clear();
2481 converter.append(&mut rows, &[array.clone()]).unwrap();
2482 let back = converter.convert_rows(&rows).unwrap();
2483 assert_eq!(&back[0], array);
2484 }
2485
2486 let mut rows_expected = converter.empty_rows(3, 128);
2487 converter.append(&mut rows_expected, &arrays[1..]).unwrap();
2488
2489 for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
2490 assert_eq!(
2491 actual, expected,
2492 "For row {}: expected {:?}, actual: {:?}",
2493 i, expected, actual
2494 );
2495 }
2496 }
2497
2498 #[test]
2499 fn test_append_codec_dictionary_binary() {
2500 use DataType::*;
2501 let converter = RowConverter::new(vec![SortField::new(Dictionary(
2503 Box::new(Int32),
2504 Box::new(Binary),
2505 ))])
2506 .unwrap();
2507 let mut rows = converter.empty_rows(4, 128);
2508
2509 let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
2510 let values = BinaryArray::from(vec![
2511 Some("a".as_bytes()),
2512 Some(b"b"),
2513 Some(b"c"),
2514 Some(b"d"),
2515 ]);
2516 let dict_array = DictionaryArray::new(keys, Arc::new(values));
2517
2518 rows.clear();
2519 let array = Arc::new(dict_array) as ArrayRef;
2520 converter.append(&mut rows, &[array.clone()]).unwrap();
2521 let back = converter.convert_rows(&rows).unwrap();
2522
2523 dictionary_eq(&back[0], &array);
2524 }
2525
2526 #[test]
2527 fn test_list_prefix() {
2528 let mut a = ListBuilder::new(Int8Builder::new());
2529 a.append_value([None]);
2530 a.append_value([None, None]);
2531 let a = a.finish();
2532
2533 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2534 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
2535 assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
2536 }
2537}