Skip to content

Commit c369135

Browse files
authored
gpu compatible write strategy, move compact strategy to use btrblocks with zstd and pco (#6322)
Signed-off-by: Onur Satici <onur@spiraldb.com>
1 parent c98aab3 commit c369135

22 files changed

Lines changed: 220 additions & 365 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

fuzz/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ cargo-fuzz = true
2121
default = ["native"]
2222
native = ["libfuzzer-sys", "zstd", "vortex-file", "vortex/files"]
2323
wasmfuzz = []
24-
zstd = ["vortex/zstd"]
24+
zstd = ["vortex/zstd", "vortex-btrblocks/zstd", "vortex-btrblocks/pco"]
2525
cuda = ["vortex-cuda", "vortex/cuda", "tokio"]
2626

2727
[dependencies]

fuzz/fuzz_targets/file_io.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use vortex_fuzz::CompressorStrategy;
2828
use vortex_fuzz::FuzzFileAction;
2929
use vortex_fuzz::RUNTIME;
3030
use vortex_fuzz::SESSION;
31-
use vortex_layout::layouts::compact::CompactCompressor;
3231
use vortex_utils::aliases::DefaultHashBuilder;
3332
use vortex_utils::aliases::hash_set::HashSet;
3433

@@ -62,7 +61,7 @@ fuzz_target!(|fuzz: FuzzFileAction| -> Corpus {
6261
CompressorStrategy::Default => SESSION.write_options(),
6362
CompressorStrategy::Compact => {
6463
let strategy = WriteStrategyBuilder::default()
65-
.with_compressor(CompactCompressor::default())
64+
.with_compact_encodings()
6665
.build();
6766
SESSION.write_options().with_strategy(strategy)
6867
}

fuzz/src/array/mod.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -515,15 +515,22 @@ fn random_action_from_list(
515515
/// Compress an array using the given strategy.
516516
#[cfg(feature = "zstd")]
517517
pub fn compress_array(array: &dyn Array, strategy: CompressorStrategy) -> ArrayRef {
518-
use vortex_layout::layouts::compact::CompactCompressor;
518+
use vortex_btrblocks::BtrBlocksCompressorBuilder;
519+
use vortex_btrblocks::FloatCode;
520+
use vortex_btrblocks::IntCode;
521+
use vortex_btrblocks::StringCode;
519522

520523
match strategy {
521524
CompressorStrategy::Default => BtrBlocksCompressor::default()
522525
.compress(array)
523526
.vortex_expect("BtrBlocksCompressor compress should succeed in fuzz test"),
524-
CompressorStrategy::Compact => CompactCompressor::default()
527+
CompressorStrategy::Compact => BtrBlocksCompressorBuilder::default()
528+
.include_string([StringCode::Zstd])
529+
.include_int([IntCode::Pco])
530+
.include_float([FloatCode::Pco])
531+
.build()
525532
.compress(array)
526-
.vortex_expect("CompactCompressor compress should succeed in fuzz test"),
533+
.vortex_expect("Compact compress should succeed in fuzz test"),
527534
}
528535
}
529536

vortex-bench/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use vortex::error::VortexExpect;
3030
use vortex::error::vortex_err;
3131
use vortex::file::VortexWriteOptions;
3232
use vortex::file::WriteStrategyBuilder;
33-
use vortex::layout::layouts::compact::CompactCompressor;
3433
use vortex::utils::aliases::hash_map::HashMap;
3534

3635
pub mod benchmark;
@@ -215,7 +214,7 @@ impl CompactionStrategy {
215214
match self {
216215
CompactionStrategy::Compact => options.with_strategy(
217216
WriteStrategyBuilder::default()
218-
.with_compressor(CompactCompressor::default())
217+
.with_compact_encodings()
219218
.build(),
220219
),
221220
CompactionStrategy::Default => options,

vortex-btrblocks/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ enum-iterator = { workspace = true }
1818
getrandom_v03 = { workspace = true }
1919
itertools = { workspace = true }
2020
num-traits = { workspace = true }
21+
pco = { workspace = true, optional = true }
2122
rand = { workspace = true }
2223
rustc-hash = { workspace = true }
2324
tracing = { workspace = true }
@@ -31,12 +32,14 @@ vortex-error = { workspace = true }
3132
vortex-fastlanes = { workspace = true }
3233
vortex-fsst = { workspace = true }
3334
vortex-mask = { workspace = true }
35+
vortex-pco = { workspace = true, optional = true }
3436
vortex-runend = { workspace = true }
3537
vortex-scalar = { workspace = true }
3638
vortex-sequence = { workspace = true }
3739
vortex-sparse = { workspace = true }
3840
vortex-utils = { workspace = true }
3941
vortex-zigzag = { workspace = true }
42+
vortex-zstd = { workspace = true, optional = true }
4043

4144
[dev-dependencies]
4245
divan = { workspace = true }
@@ -47,6 +50,8 @@ vortex-array = { workspace = true, features = ["_test-harness"] }
4750
[features]
4851
# This feature enables unstable encodings for which we don't guarantee stability.
4952
unstable_encodings = []
53+
pco = ["dep:pco", "dep:vortex-pco"]
54+
zstd = ["dep:vortex-zstd"]
5055

5156
[lints]
5257
workspace = true

vortex-btrblocks/src/builder.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,21 @@ pub struct BtrBlocksCompressorBuilder {
5151
impl Default for BtrBlocksCompressorBuilder {
5252
fn default() -> Self {
5353
Self {
54-
int_schemes: ALL_INT_SCHEMES.iter().copied().collect(),
55-
float_schemes: ALL_FLOAT_SCHEMES.iter().copied().collect(),
56-
string_schemes: ALL_STRING_SCHEMES.iter().copied().collect(),
54+
int_schemes: ALL_INT_SCHEMES
55+
.iter()
56+
.copied()
57+
.filter(|s| s.code() != IntCode::Pco)
58+
.collect(),
59+
float_schemes: ALL_FLOAT_SCHEMES
60+
.iter()
61+
.copied()
62+
.filter(|s| s.code() != FloatCode::Pco)
63+
.collect(),
64+
string_schemes: ALL_STRING_SCHEMES
65+
.iter()
66+
.copied()
67+
.filter(|s| s.code() != StringCode::Zstd)
68+
.collect(),
5769
}
5870
}
5971
}

vortex-btrblocks/src/compressor/float/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ pub const ALL_FLOAT_SCHEMES: &[&dyn FloatScheme] = &[
7777
&DictScheme,
7878
&NullDominated,
7979
&RLE_FLOAT_SCHEME,
80+
#[cfg(feature = "pco")]
81+
&PcoScheme,
8082
];
8183

8284
/// [`Compressor`] for floating-point numbers.
@@ -142,6 +144,8 @@ pub enum FloatCode {
142144
Rle,
143145
/// Sparse encoding for null-dominated arrays.
144146
Sparse,
147+
/// Pco (pcodec) compression for floats.
148+
Pco,
145149
}
146150

147151
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -162,6 +166,11 @@ struct DictScheme;
162166
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
163167
pub struct NullDominated;
164168

169+
/// Pco (pcodec) compression for floats.
170+
#[cfg(feature = "pco")]
171+
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
172+
pub struct PcoScheme;
173+
165174
/// Configuration for float RLE compression.
166175
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
167176
pub struct FloatRLEConfig;
@@ -520,6 +529,31 @@ impl Scheme for NullDominated {
520529
}
521530
}
522531

532+
#[cfg(feature = "pco")]
533+
impl Scheme for PcoScheme {
534+
type StatsType = FloatStats;
535+
type CodeType = FloatCode;
536+
537+
fn code(&self) -> FloatCode {
538+
FloatCode::Pco
539+
}
540+
541+
fn compress(
542+
&self,
543+
_compressor: &BtrBlocksCompressor,
544+
stats: &Self::StatsType,
545+
_ctx: CompressorContext,
546+
_excludes: &[FloatCode],
547+
) -> VortexResult<ArrayRef> {
548+
Ok(vortex_pco::PcoArray::from_primitive(
549+
stats.source(),
550+
pco::DEFAULT_COMPRESSION_LEVEL,
551+
8192,
552+
)?
553+
.into_array())
554+
}
555+
}
556+
523557
#[cfg(test)]
524558
mod tests {
525559

vortex-btrblocks/src/compressor/integer/mod.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ pub const ALL_INT_SCHEMES: &[&dyn IntegerScheme] = &[
6262
&RunEndScheme,
6363
&SequenceScheme,
6464
&RLE_INTEGER_SCHEME,
65+
#[cfg(feature = "pco")]
66+
&PcoScheme,
6567
];
6668

6769
/// [`Compressor`] for signed and unsigned integers.
@@ -156,6 +158,8 @@ pub enum IntCode {
156158
Sequence,
157159
/// RLE encoding - generic run-length encoding.
158160
Rle,
161+
/// Pco (pcodec) compression for integers.
162+
Pco,
159163
}
160164

161165
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -188,6 +192,11 @@ pub struct RunEndScheme;
188192
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
189193
pub struct SequenceScheme;
190194

195+
/// Pco (pcodec) compression for integers.
196+
#[cfg(feature = "pco")]
197+
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
198+
pub struct PcoScheme;
199+
191200
/// Threshold for the average run length in an array before we consider run-end encoding.
192201
const RUN_END_THRESHOLD: u32 = 4;
193202

@@ -818,6 +827,49 @@ impl Scheme for SequenceScheme {
818827
}
819828
}
820829

830+
#[cfg(feature = "pco")]
831+
impl Scheme for PcoScheme {
832+
type StatsType = IntegerStats;
833+
type CodeType = IntCode;
834+
835+
fn code(&self) -> IntCode {
836+
IntCode::Pco
837+
}
838+
839+
fn expected_compression_ratio(
840+
&self,
841+
compressor: &BtrBlocksCompressor,
842+
stats: &Self::StatsType,
843+
ctx: CompressorContext,
844+
excludes: &[IntCode],
845+
) -> VortexResult<f64> {
846+
// Pco does not support I8 or U8.
847+
if matches!(
848+
stats.src.ptype(),
849+
vortex_dtype::PType::I8 | vortex_dtype::PType::U8
850+
) {
851+
return Ok(0.0);
852+
}
853+
854+
self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes)
855+
}
856+
857+
fn compress(
858+
&self,
859+
_compressor: &BtrBlocksCompressor,
860+
stats: &Self::StatsType,
861+
_ctx: CompressorContext,
862+
_excludes: &[IntCode],
863+
) -> VortexResult<ArrayRef> {
864+
Ok(vortex_pco::PcoArray::from_primitive(
865+
stats.source(),
866+
pco::DEFAULT_COMPRESSION_LEVEL,
867+
8192,
868+
)?
869+
.into_array())
870+
}
871+
}
872+
821873
#[cfg(test)]
822874
mod tests {
823875
use std::iter;

vortex-btrblocks/src/compressor/string.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ pub const ALL_STRING_SCHEMES: &[&dyn StringScheme] = &[
124124
&FSSTScheme,
125125
&ConstantScheme,
126126
&NullDominated,
127+
#[cfg(feature = "zstd")]
128+
&ZstdScheme,
127129
];
128130

129131
/// [`Compressor`] for strings.
@@ -209,6 +211,11 @@ pub struct ConstantScheme;
209211
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
210212
pub struct NullDominated;
211213

214+
/// Zstd compression without dictionaries (nvCOMP compatible).
215+
#[cfg(feature = "zstd")]
216+
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
217+
pub struct ZstdScheme;
218+
212219
/// Unique identifier for string compression schemes.
213220
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence, Ord, PartialOrd)]
214221
pub enum StringCode {
@@ -222,6 +229,8 @@ pub enum StringCode {
222229
Constant,
223230
/// Sparse encoding for null-dominated arrays.
224231
Sparse,
232+
/// Zstd compression without dictionaries.
233+
Zstd,
225234
}
226235

227236
impl Scheme for UncompressedScheme {
@@ -502,6 +511,30 @@ impl Scheme for NullDominated {
502511
}
503512
}
504513

514+
#[cfg(feature = "zstd")]
515+
impl Scheme for ZstdScheme {
516+
type StatsType = StringStats;
517+
type CodeType = StringCode;
518+
519+
fn code(&self) -> StringCode {
520+
StringCode::Zstd
521+
}
522+
523+
fn compress(
524+
&self,
525+
_compressor: &BtrBlocksCompressor,
526+
stats: &Self::StatsType,
527+
_ctx: CompressorContext,
528+
_excludes: &[StringCode],
529+
) -> VortexResult<ArrayRef> {
530+
let compacted = stats.source().compact_buffers()?;
531+
Ok(
532+
vortex_zstd::ZstdArray::from_var_bin_view_without_dict(&compacted, 3, 8192)?
533+
.into_array(),
534+
)
535+
}
536+
}
537+
505538
#[cfg(test)]
506539
mod tests {
507540
use vortex_array::arrays::VarBinViewArray;

0 commit comments

Comments
 (0)