Skip to content

Commit c695aa1

Browse files
fix[layout]: canonical validity in compressor (#6797)
Decompress validity in the compressor and pass around `zctl` in those functions. --------- Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent accb451 commit c695aa1

5 files changed

Lines changed: 67 additions & 4 deletions

File tree

vortex-array/src/canonical.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -528,6 +528,7 @@ impl Executable for CanonicalValidity {
528528
})))
529529
}
530530
Canonical::List(l) => {
531+
let zctl = l.is_zero_copy_to_list();
531532
let ListViewArrayParts {
532533
elements,
533534
offsets,
@@ -537,6 +538,7 @@ impl Executable for CanonicalValidity {
537538
} = l.into_parts();
538539
Ok(CanonicalValidity(Canonical::List(unsafe {
539540
ListViewArray::new_unchecked(elements, offsets, sizes, validity.execute(ctx)?)
541+
.with_zero_copy_to_list(zctl)
540542
})))
541543
}
542544
Canonical::FixedSizeList(fsl) => {
@@ -636,6 +638,7 @@ impl Executable for RecursiveCanonical {
636638
})))
637639
}
638640
Canonical::List(l) => {
641+
let zctl = l.is_zero_copy_to_list();
639642
let ListViewArrayParts {
640643
elements,
641644
offsets,
@@ -650,6 +653,7 @@ impl Executable for RecursiveCanonical {
650653
sizes.execute::<RecursiveCanonical>(ctx)?.0.into_array(),
651654
validity.execute(ctx)?,
652655
)
656+
.with_zero_copy_to_list(zctl)
653657
})))
654658
}
655659
Canonical::FixedSizeList(fsl) => {

vortex-btrblocks/src/canonical_compressor.rs

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,9 +5,12 @@
55
66
use vortex_array::ArrayRef;
77
use vortex_array::Canonical;
8+
use vortex_array::CanonicalValidity;
89
use vortex_array::DynArray;
910
use vortex_array::IntoArray;
11+
use vortex_array::LEGACY_SESSION;
1012
use vortex_array::ToCanonical;
13+
use vortex_array::VortexSessionExecute;
1114
use vortex_array::arrays::ConstantArray;
1215
use vortex_array::arrays::ExtensionArray;
1316
use vortex_array::arrays::FixedSizeListArray;
@@ -113,7 +116,11 @@ impl BtrBlocksCompressor {
113116
/// First canonicalizes and compacts the array, then applies optimal compression schemes.
114117
pub fn compress(&self, array: &ArrayRef) -> VortexResult<ArrayRef> {
115118
// Canonicalize the array
116-
let canonical = array.to_canonical()?;
119+
// TODO(joe): receive `ctx` and use it.
120+
let canonical = array
121+
.clone()
122+
.execute::<CanonicalValidity>(&mut LEGACY_SESSION.create_execution_ctx())?
123+
.0;
117124

118125
// Compact it, removing any wasted space before we attempt to compress it
119126
let compact = canonical.compact()?;

vortex-file/tests/test_write_table.rs

Lines changed: 52 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,9 @@ use futures::StreamExt;
1010
use futures::pin_mut;
1111
use vortex_array::IntoArray;
1212
use vortex_array::ToCanonical;
13+
use vortex_array::arrays::BoolArray;
14+
use vortex_array::arrays::DictArray;
15+
use vortex_array::arrays::ListViewArray;
1316
use vortex_array::arrays::PrimitiveArray;
1417
use vortex_array::arrays::StructArray;
1518
use vortex_array::dtype::FieldNames;
@@ -111,3 +114,52 @@ async fn test_file_roundtrip() {
111114
assert!(raw.nbytes() > compressed.nbytes());
112115
}
113116
}
117+
118+
/// Regression test: writing a Dict<ListView> where the list has
119+
/// Validity::Array(BoolArray) and the dict codes are nullable used to fail
120+
/// with "Array vortex.fill_null does not support serialization".
121+
#[tokio::test]
122+
async fn test_dict_listview_validity_roundtrip() {
123+
let elements = PrimitiveArray::from_iter(vec![1i32, 2, 3, 4, 5]).into_array();
124+
let offsets = PrimitiveArray::from_iter(vec![0u32, 2, 4]).into_array();
125+
let sizes = PrimitiveArray::from_iter(vec![2u32, 2, 1]).into_array();
126+
let list_validity = Validity::Array(BoolArray::from_iter([true, false, true]).into_array());
127+
let listview = ListViewArray::new(elements, offsets, sizes, list_validity).into_array();
128+
129+
let codes = PrimitiveArray::new(
130+
vortex_buffer::buffer![0u32, 0, 1, 0, 2],
131+
Validity::from_iter(vec![true, false, true, true, true]),
132+
)
133+
.into_array();
134+
135+
let dict = DictArray::new(codes, listview).into_array();
136+
137+
let data = StructArray::from_fields(&[("col", dict)])
138+
.expect("from_fields")
139+
.into_array();
140+
141+
let mut bytes = Vec::new();
142+
SESSION
143+
.write_options()
144+
.write(&mut bytes, data.to_array_stream())
145+
.await
146+
.expect("write should not fail with fill_null serialization error");
147+
148+
let bytes = ByteBuffer::from(bytes);
149+
let vxf = SESSION.open_options().open_buffer(bytes).expect("open");
150+
151+
let stream = vxf
152+
.scan()
153+
.expect("scan")
154+
.into_stream()
155+
.expect("into_stream");
156+
pin_mut!(stream);
157+
158+
let chunk = stream
159+
.next()
160+
.await
161+
.unwrap()
162+
.expect("read back should succeed");
163+
vortex_array::assert_arrays_eq!(data, chunk);
164+
assert!(stream.next().await.is_none(), "expected a single chunk");
165+
}

vortex-python/src/io.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -277,7 +277,7 @@ impl PyVortexWriteOptions {
277277
/// >>> vx.io.VortexWriteOptions.default().write(sprl, "chonky.vortex")
278278
/// >>> import os
279279
/// >>> os.path.getsize('chonky.vortex')
280-
/// 216020
280+
/// 215972
281281
/// ```
282282
///
283283
/// Wow, Vortex manages to use about two bytes per integer! So advanced. So tiny.

vortex-test/e2e/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ mod tests {
4141
.collect();
4242

4343
#[cfg(feature = "unstable_encodings")]
44-
const EXPECTED_SIZE: usize = 216052;
44+
const EXPECTED_SIZE: usize = 216004;
4545
#[cfg(not(feature = "unstable_encodings"))]
46-
const EXPECTED_SIZE: usize = 216020;
46+
const EXPECTED_SIZE: usize = 215972;
4747

4848
let sizes = futures::future::try_join_all(futures).await?;
4949
for (i, size) in sizes.iter().enumerate() {

0 commit comments

Comments (0)