Skip to content

Commit 4b907e5

Browse files
authored
Perf: Stream conversion from Parquet to Vortex to prevent OOM (#6228)
Updates the Parquet to Vortex conversion in `vortex-bench/src/conversions.rs` to stream the writer instead of collecting everything in memory. Without this, the Clickbench benchmark on single files (where the input Parquet compressed file is 14GB) OOM on an EC2 instance with only 32 (effective 29) GB of memory. I've confirmed that this change fixes that problem and also makes it a lot faster to convert. Also renames some functions to be more descriptive. I think there might be an argument to making these functions available in the `vortex` top-level crate too, this seems to be a generally useful thing to have if you have a bunch of Parquet files. Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent 19a5b01 commit 4b907e5

9 files changed

Lines changed: 144 additions & 60 deletions

File tree

benchmarks/compress-bench/src/vortex.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use vortex::file::WriteOptionsSessionExt;
1717
use vortex_bench::Format;
1818
use vortex_bench::SESSION;
1919
use vortex_bench::compress::Compressor;
20-
use vortex_bench::conversions::parquet_to_vortex;
20+
use vortex_bench::conversions::parquet_to_vortex_chunks;
2121

2222
/// Compressor implementation for Vortex format.
2323
pub struct VortexCompressor;
@@ -30,7 +30,7 @@ impl Compressor for VortexCompressor {
3030

3131
async fn compress(&self, parquet_path: &Path) -> Result<(u64, Duration)> {
3232
// Read the parquet file as an array stream
33-
let uncompressed = parquet_to_vortex(parquet_path.to_path_buf()).await?;
33+
let uncompressed = parquet_to_vortex_chunks(parquet_path.to_path_buf()).await?;
3434

3535
let mut buf = Vec::new();
3636
let start = Instant::now();
@@ -46,7 +46,7 @@ impl Compressor for VortexCompressor {
4646

4747
async fn decompress(&self, parquet_path: &Path) -> Result<Duration> {
4848
// First compress to get the bytes we'll decompress
49-
let uncompressed = parquet_to_vortex(parquet_path.to_path_buf()).await?;
49+
let uncompressed = parquet_to_vortex_chunks(parquet_path.to_path_buf()).await?;
5050
let mut buf = Vec::new();
5151
let mut cursor = Cursor::new(&mut buf);
5252
SESSION

benchmarks/datafusion-bench/src/main.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use vortex_bench::Engine;
2828
use vortex_bench::Format;
2929
use vortex_bench::Opt;
3030
use vortex_bench::Opts;
31-
use vortex_bench::conversions::convert_parquet_to_vortex;
31+
use vortex_bench::conversions::convert_parquet_directory_to_vortex;
3232
use vortex_bench::create_benchmark;
3333
use vortex_bench::create_output_writer;
3434
use vortex_bench::display::DisplayFormat;
@@ -125,10 +125,12 @@ async fn main() -> anyhow::Result<()> {
125125
for format in args.formats.iter() {
126126
match format {
127127
Format::OnDiskVortex => {
128-
convert_parquet_to_vortex(&base_path, CompactionStrategy::Default).await?;
128+
convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Default)
129+
.await?;
129130
}
130131
Format::VortexCompact => {
131-
convert_parquet_to_vortex(&base_path, CompactionStrategy::Compact).await?;
132+
convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact)
133+
.await?;
132134
}
133135
_ => {}
134136
}

benchmarks/duckdb-bench/src/main.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use vortex_bench::Engine;
1515
use vortex_bench::Format;
1616
use vortex_bench::Opt;
1717
use vortex_bench::Opts;
18-
use vortex_bench::conversions::convert_parquet_to_vortex;
18+
use vortex_bench::conversions::convert_parquet_directory_to_vortex;
1919
use vortex_bench::create_benchmark;
2020
use vortex_bench::create_output_writer;
2121
use vortex_bench::display::DisplayFormat;
@@ -99,10 +99,18 @@ fn main() -> anyhow::Result<()> {
9999
for format in args.formats.iter().copied() {
100100
match format {
101101
Format::OnDiskVortex => {
102-
convert_parquet_to_vortex(&base_path, CompactionStrategy::Default).await?;
102+
convert_parquet_directory_to_vortex(
103+
&base_path,
104+
CompactionStrategy::Default,
105+
)
106+
.await?;
103107
}
104108
Format::VortexCompact => {
105-
convert_parquet_to_vortex(&base_path, CompactionStrategy::Compact).await?;
109+
convert_parquet_directory_to_vortex(
110+
&base_path,
111+
CompactionStrategy::Compact,
112+
)
113+
.await?;
106114
}
107115
// OnDiskDuckDB tables are created during register_tables by loading from Parquet
108116
_ => {}

vortex-bench/src/bin/data-gen.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use vortex_bench::CompactionStrategy;
1919
use vortex_bench::Format;
2020
use vortex_bench::Opt;
2121
use vortex_bench::Opts;
22-
use vortex_bench::conversions::convert_parquet_to_vortex;
22+
use vortex_bench::conversions::convert_parquet_directory_to_vortex;
2323
use vortex_bench::create_benchmark;
2424
use vortex_bench::generate_duckdb_registration_sql;
2525
use vortex_bench::setup_logging_and_tracing;
@@ -68,15 +68,15 @@ async fn main() -> anyhow::Result<()> {
6868
.iter()
6969
.any(|f| matches!(f, Format::OnDiskVortex))
7070
{
71-
convert_parquet_to_vortex(&base_path, CompactionStrategy::Default).await?;
71+
convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Default).await?;
7272
}
7373

7474
if args
7575
.formats
7676
.iter()
7777
.any(|f| matches!(f, Format::VortexCompact))
7878
{
79-
convert_parquet_to_vortex(&base_path, CompactionStrategy::Compact).await?;
79+
convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact).await?;
8080
}
8181

8282
if args

vortex-bench/src/clickbench/data.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use vortex::error::VortexExpect;
2929

3030
use crate::Format;
3131
// Re-export for use by clickbench_benchmark
32-
pub use crate::conversions::convert_parquet_to_vortex;
32+
pub use crate::conversions::convert_parquet_directory_to_vortex;
3333
use crate::idempotent_async;
3434

3535
pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {

vortex-bench/src/conversions.rs

Lines changed: 114 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use std::path::PathBuf;
88
use futures::StreamExt;
99
use futures::TryStreamExt;
1010
use parquet::arrow::ParquetRecordBatchStreamBuilder;
11+
use parquet::arrow::async_reader::ParquetRecordBatchStream;
12+
use sysinfo::System;
1113
use tokio::fs::File;
1214
use tokio::fs::OpenOptions;
1315
use tokio::fs::create_dir_all;
@@ -20,6 +22,12 @@ use vortex::array::VortexSessionExecute;
2022
use vortex::array::arrays::ChunkedArray;
2123
use vortex::array::arrow::FromArrowArray;
2224
use vortex::array::builders::builder_with_capacity;
25+
use vortex::array::stream::ArrayStreamAdapter;
26+
use vortex::array::stream::ArrayStreamExt;
27+
use vortex::dtype::DType;
28+
use vortex::dtype::arrow::FromArrowType;
29+
use vortex::error::VortexError;
30+
use vortex::error::VortexResult;
2331
use vortex::file::WriteOptionsSessionExt;
2432
use vortex::session::VortexSession;
2533

@@ -28,37 +36,115 @@ use crate::Format;
2836
use crate::SESSION;
2937
use crate::utils::file::idempotent_async;
3038

31-
/// Read a Parquet file and return it as a Vortex ArrayStream.
32-
pub async fn parquet_to_vortex(parquet_path: PathBuf) -> anyhow::Result<ChunkedArray> {
39+
/// Memory budget per concurrent conversion stream in GB. This is somewhat arbitary.
40+
const MEMORY_PER_STREAM_GB: u64 = 4;
41+
42+
/// Minimum number of concurrent conversion streams.
43+
const MIN_CONCURRENCY: u64 = 1;
44+
45+
/// Maximum number of concurrent conversion streams. This is somewhat arbitary.
46+
const MAX_CONCURRENCY: u64 = 16;
47+
48+
/// Returns the available system memory in bytes.
49+
fn available_memory_bytes() -> u64 {
50+
System::new_all().available_memory()
51+
}
52+
53+
/// Calculate appropriate concurrency based on available memory.
54+
fn calculate_concurrency() -> usize {
55+
let available_gb = available_memory_bytes() / (1024 * 1024 * 1024);
56+
let concurrency = (available_gb / MEMORY_PER_STREAM_GB).clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
57+
58+
info!(
59+
"Available memory: {}GB, maximum concurrency is: {}",
60+
available_gb, concurrency
61+
);
62+
63+
concurrency as usize
64+
}
65+
66+
/// Read a Parquet file and return it as a Vortex [`ChunkedArray`].
67+
///
68+
/// Note: This loads the entire file into memory. For large files, use the streaming conversion like
69+
/// in [`parquet_to_vortex_stream`] instead.
70+
pub async fn parquet_to_vortex_chunks(parquet_path: PathBuf) -> anyhow::Result<ChunkedArray> {
3371
let file = File::open(parquet_path).await?;
34-
let mut reader = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
35-
let mut chunks = vec![];
36-
37-
while let Some(rb) = reader.next().await {
38-
let rb = rb?;
39-
let chunk = ArrayRef::from_arrow(rb, false)?;
40-
41-
// Make sure data is uncompressed and canonicalized
42-
let mut builder = builder_with_capacity(chunk.dtype(), chunk.len());
43-
chunk.append_to_builder(
44-
builder.as_mut(),
45-
&mut VortexSession::default().create_execution_ctx(),
46-
)?;
47-
let chunk = builder.finish();
48-
chunks.push(chunk);
49-
}
72+
let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
73+
let reader = builder.build()?;
74+
75+
let chunks: Vec<ArrayRef> = parquet_to_vortex_stream(reader)
76+
.map(|r| r.map_err(anyhow::Error::from))
77+
.try_collect()
78+
.await?;
5079

5180
Ok(ChunkedArray::from_iter(chunks))
5281
}
5382

83+
/// Create a streaming Vortex array from a Parquet reader.
84+
///
85+
/// Streams record batches and converts them to Vortex arrays on-the-fly, avoiding loading the
86+
/// entire file into memory.
87+
pub fn parquet_to_vortex_stream(
88+
reader: ParquetRecordBatchStream<File>,
89+
) -> impl futures::Stream<Item = VortexResult<ArrayRef>> {
90+
reader.map(move |result| {
91+
result
92+
.map_err(|e| VortexError::generic(e.into()))
93+
.and_then(|rb| {
94+
let chunk = ArrayRef::from_arrow(rb, false)?;
95+
let mut builder = builder_with_capacity(chunk.dtype(), chunk.len());
96+
97+
// Canonicalize the chunk.
98+
chunk.append_to_builder(
99+
builder.as_mut(),
100+
&mut VortexSession::default().create_execution_ctx(),
101+
)?;
102+
103+
Ok(builder.finish())
104+
})
105+
})
106+
}
107+
108+
/// Convert a single Parquet file to Vortex format using streaming.
109+
///
110+
/// Streams data directly from Parquet to Vortex without loading the entire file into memory.
111+
pub async fn convert_parquet_file_to_vortex(
112+
parquet_path: &Path,
113+
output_path: &Path,
114+
compaction: CompactionStrategy,
115+
) -> anyhow::Result<()> {
116+
let file = File::open(parquet_path).await?;
117+
let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
118+
let dtype = DType::from_arrow(builder.schema().as_ref());
119+
120+
let stream = parquet_to_vortex_stream(builder.build()?);
121+
122+
let mut output_file = OpenOptions::new()
123+
.write(true)
124+
.truncate(true)
125+
.create(true)
126+
.open(output_path)
127+
.await?;
128+
129+
compaction
130+
.apply_options(SESSION.write_options())
131+
.write(
132+
&mut output_file,
133+
ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, stream)),
134+
)
135+
.await?;
136+
137+
Ok(())
138+
}
139+
54140
/// Convert all Parquet files in a directory to Vortex format.
55141
///
56-
/// This function reads Parquet files from `{input_path}/parquet/` and writes
57-
/// Vortex files to `{input_path}/vortex-file-compressed/` (for Default compaction)
58-
/// or `{input_path}/vortex-compact/` (for Compact compaction).
142+
/// This function reads Parquet files from `{input_path}/parquet/` and writes Vortex files to
143+
/// `{input_path}/vortex-file-compressed/` (for Default compaction) or
144+
/// `{input_path}/vortex-compact/` (for Compact compaction).
59145
///
60-
/// The conversion is idempotent - existing Vortex files will not be regenerated.
61-
pub async fn convert_parquet_to_vortex(
146+
/// The conversion is idempotent: existing Vortex files will not be regenerated.
147+
pub async fn convert_parquet_directory_to_vortex(
62148
input_path: &Path,
63149
compaction: CompactionStrategy,
64150
) -> anyhow::Result<()> {
@@ -72,7 +158,6 @@ pub async fn convert_parquet_to_vortex(
72158
create_dir_all(&vortex_dir).await?;
73159

74160
let parquet_inputs = fs::read_dir(&parquet_path)?.collect::<std::io::Result<Vec<_>>>()?;
75-
76161
trace!(
77162
"Found {} parquet files in {}",
78163
parquet_inputs.len(),
@@ -83,6 +168,7 @@ pub async fn convert_parquet_to_vortex(
83168
.iter()
84169
.filter(|entry| entry.path().extension().is_some_and(|e| e == "parquet"));
85170

171+
let concurrency = calculate_concurrency();
86172
futures::stream::iter(iter)
87173
.map(|dir_entry| {
88174
let filename = {
@@ -100,30 +186,18 @@ pub async fn convert_parquet_to_vortex(
100186
"Processing file '{filename}' with {:?} strategy",
101187
compaction
102188
);
103-
let chunked_array = parquet_to_vortex(parquet_file_path).await?;
104-
let mut f = OpenOptions::new()
105-
.write(true)
106-
.truncate(true)
107-
.create(true)
108-
.open(&vtx_file)
109-
.await?;
110-
111-
let write_options = compaction.apply_options(SESSION.write_options());
112-
113-
write_options
114-
.write(&mut f, chunked_array.to_array_stream())
115-
.await?;
116-
117-
anyhow::Ok(())
189+
convert_parquet_file_to_vortex(&parquet_file_path, &vtx_file, compaction)
190+
.await
118191
})
119192
.await
120193
.expect("Failed to write Vortex file")
121194
}
122195
.in_current_span(),
123196
)
124197
})
125-
.buffer_unordered(16)
198+
.buffer_unordered(concurrency)
126199
.try_collect::<Vec<_>>()
127200
.await?;
201+
128202
Ok(())
129203
}

vortex-bench/src/datasets/taxi_data.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use vortex::file::WriteOptionsSessionExt;
1515
use crate::CompactionStrategy;
1616
use crate::IdempotentPath;
1717
use crate::SESSION;
18-
use crate::conversions::parquet_to_vortex;
18+
use crate::conversions::parquet_to_vortex_chunks;
1919
use crate::datasets::Dataset;
2020
use crate::datasets::data_downloads::download_data;
2121
use crate::idempotent_async;
@@ -61,7 +61,7 @@ pub async fn taxi_data_vortex() -> Result<PathBuf> {
6161
let buf = output_fname.to_path_buf();
6262
let mut output_file = TokioFile::create(output_fname).await?;
6363

64-
let data = parquet_to_vortex(taxi_data_parquet().await?).await?;
64+
let data = parquet_to_vortex_chunks(taxi_data_parquet().await?).await?;
6565

6666
SESSION
6767
.write_options()
@@ -81,7 +81,7 @@ pub async fn taxi_data_vortex_compact() -> Result<PathBuf> {
8181
// This is the only difference to `taxi_data_vortex`.
8282
let write_options = CompactionStrategy::Compact.apply_options(SESSION.write_options());
8383

84-
let data = parquet_to_vortex(taxi_data_parquet().await?).await?;
84+
let data = parquet_to_vortex_chunks(taxi_data_parquet().await?).await?;
8585

8686
write_options
8787
.write(&mut output_file, data.to_array_stream())

vortex-bench/src/downloadable_dataset.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use vortex::file::WriteOptionsSessionExt;
1010

1111
use crate::IdempotentPath;
1212
use crate::SESSION;
13-
use crate::conversions::parquet_to_vortex;
13+
use crate::conversions::parquet_to_vortex_chunks;
1414
use crate::datasets::Dataset;
1515
use crate::datasets::data_downloads::download_data;
1616
use crate::idempotent_async;
@@ -61,7 +61,7 @@ impl Dataset for DownloadableDataset {
6161
let dir = format!("{}/", self.name()).to_data_path();
6262
let vortex = dir.join(format!("{}.vortex", self.name()));
6363

64-
let data = parquet_to_vortex(parquet).await?;
64+
let data = parquet_to_vortex_chunks(parquet).await?;
6565
idempotent_async(&vortex, async |path| -> anyhow::Result<()> {
6666
SESSION
6767
.write_options()

vortex-bench/src/public_bi.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::Format;
3939
use crate::IdempotentPath;
4040
use crate::SESSION;
4141
use crate::TableSpec;
42-
use crate::conversions::parquet_to_vortex;
42+
use crate::conversions::parquet_to_vortex_chunks;
4343
use crate::datasets::Dataset;
4444
use crate::datasets::data_downloads::decompress_bz2;
4545
use crate::datasets::data_downloads::download_data;
@@ -362,7 +362,7 @@ impl PBIData {
362362
let vortex = self.get_file_path(&table.name, FileType::Vortex);
363363

364364
async move {
365-
let data = parquet_to_vortex(parquet).await?;
365+
let data = parquet_to_vortex_chunks(parquet).await?;
366366
let vortex_file =
367367
idempotent_async(&vortex, async |output_path| -> anyhow::Result<()> {
368368
SESSION

0 commit comments

Comments
 (0)