@@ -8,6 +8,8 @@ use std::path::PathBuf;
88use futures:: StreamExt ;
99use futures:: TryStreamExt ;
1010use parquet:: arrow:: ParquetRecordBatchStreamBuilder ;
11+ use parquet:: arrow:: async_reader:: ParquetRecordBatchStream ;
12+ use sysinfo:: System ;
1113use tokio:: fs:: File ;
1214use tokio:: fs:: OpenOptions ;
1315use tokio:: fs:: create_dir_all;
@@ -20,6 +22,12 @@ use vortex::array::VortexSessionExecute;
2022use vortex:: array:: arrays:: ChunkedArray ;
2123use vortex:: array:: arrow:: FromArrowArray ;
2224use vortex:: array:: builders:: builder_with_capacity;
25+ use vortex:: array:: stream:: ArrayStreamAdapter ;
26+ use vortex:: array:: stream:: ArrayStreamExt ;
27+ use vortex:: dtype:: DType ;
28+ use vortex:: dtype:: arrow:: FromArrowType ;
29+ use vortex:: error:: VortexError ;
30+ use vortex:: error:: VortexResult ;
2331use vortex:: file:: WriteOptionsSessionExt ;
2432use vortex:: session:: VortexSession ;
2533
@@ -28,37 +36,115 @@ use crate::Format;
2836use crate :: SESSION ;
2937use crate :: utils:: file:: idempotent_async;
3038
31- /// Read a Parquet file and return it as a Vortex ArrayStream.
32- pub async fn parquet_to_vortex ( parquet_path : PathBuf ) -> anyhow:: Result < ChunkedArray > {
/// Memory budget per concurrent conversion stream, in GB (`calculate_concurrency` counts one GB
/// as 1024^3 bytes). The value is somewhat arbitrary.
const MEMORY_PER_STREAM_GB: u64 = 4;

/// Minimum number of concurrent conversion streams.
const MIN_CONCURRENCY: u64 = 1;

/// Maximum number of concurrent conversion streams. The value is somewhat arbitrary.
const MAX_CONCURRENCY: u64 = 16;

// `u64::clamp` panics when `min > max`, so reject an inverted range at compile time instead of
// at the first conversion run.
const _: () = assert!(
    MIN_CONCURRENCY <= MAX_CONCURRENCY,
    "MIN_CONCURRENCY must not exceed MAX_CONCURRENCY"
);
47+
48+ /// Returns the available system memory in bytes.
49+ fn available_memory_bytes ( ) -> u64 {
50+ System :: new_all ( ) . available_memory ( )
51+ }
52+
53+ /// Calculate appropriate concurrency based on available memory.
54+ fn calculate_concurrency ( ) -> usize {
55+ let available_gb = available_memory_bytes ( ) / ( 1024 * 1024 * 1024 ) ;
56+ let concurrency = ( available_gb / MEMORY_PER_STREAM_GB ) . clamp ( MIN_CONCURRENCY , MAX_CONCURRENCY ) ;
57+
58+ info ! (
59+ "Available memory: {}GB, maximum concurrency is: {}" ,
60+ available_gb, concurrency
61+ ) ;
62+
63+ concurrency as usize
64+ }
65+
66+ /// Read a Parquet file and return it as a Vortex [`ChunkedArray`].
67+ ///
68+ /// Note: This loads the entire file into memory. For large files, use the streaming conversion like
69+ /// in [`parquet_to_vortex_stream`] instead.
70+ pub async fn parquet_to_vortex_chunks ( parquet_path : PathBuf ) -> anyhow:: Result < ChunkedArray > {
3371 let file = File :: open ( parquet_path) . await ?;
34- let mut reader = ParquetRecordBatchStreamBuilder :: new ( file) . await ?. build ( ) ?;
35- let mut chunks = vec ! [ ] ;
36-
37- while let Some ( rb) = reader. next ( ) . await {
38- let rb = rb?;
39- let chunk = ArrayRef :: from_arrow ( rb, false ) ?;
40-
41- // Make sure data is uncompressed and canonicalized
42- let mut builder = builder_with_capacity ( chunk. dtype ( ) , chunk. len ( ) ) ;
43- chunk. append_to_builder (
44- builder. as_mut ( ) ,
45- & mut VortexSession :: default ( ) . create_execution_ctx ( ) ,
46- ) ?;
47- let chunk = builder. finish ( ) ;
48- chunks. push ( chunk) ;
49- }
72+ let builder = ParquetRecordBatchStreamBuilder :: new ( file) . await ?;
73+ let reader = builder. build ( ) ?;
74+
75+ let chunks: Vec < ArrayRef > = parquet_to_vortex_stream ( reader)
76+ . map ( |r| r. map_err ( anyhow:: Error :: from) )
77+ . try_collect ( )
78+ . await ?;
5079
5180 Ok ( ChunkedArray :: from_iter ( chunks) )
5281}
5382
83+ /// Create a streaming Vortex array from a Parquet reader.
84+ ///
85+ /// Streams record batches and converts them to Vortex arrays on-the-fly, avoiding loading the
86+ /// entire file into memory.
87+ pub fn parquet_to_vortex_stream (
88+ reader : ParquetRecordBatchStream < File > ,
89+ ) -> impl futures:: Stream < Item = VortexResult < ArrayRef > > {
90+ reader. map ( move |result| {
91+ result
92+ . map_err ( |e| VortexError :: generic ( e. into ( ) ) )
93+ . and_then ( |rb| {
94+ let chunk = ArrayRef :: from_arrow ( rb, false ) ?;
95+ let mut builder = builder_with_capacity ( chunk. dtype ( ) , chunk. len ( ) ) ;
96+
97+ // Canonicalize the chunk.
98+ chunk. append_to_builder (
99+ builder. as_mut ( ) ,
100+ & mut VortexSession :: default ( ) . create_execution_ctx ( ) ,
101+ ) ?;
102+
103+ Ok ( builder. finish ( ) )
104+ } )
105+ } )
106+ }
107+
108+ /// Convert a single Parquet file to Vortex format using streaming.
109+ ///
110+ /// Streams data directly from Parquet to Vortex without loading the entire file into memory.
111+ pub async fn convert_parquet_file_to_vortex (
112+ parquet_path : & Path ,
113+ output_path : & Path ,
114+ compaction : CompactionStrategy ,
115+ ) -> anyhow:: Result < ( ) > {
116+ let file = File :: open ( parquet_path) . await ?;
117+ let builder = ParquetRecordBatchStreamBuilder :: new ( file) . await ?;
118+ let dtype = DType :: from_arrow ( builder. schema ( ) . as_ref ( ) ) ;
119+
120+ let stream = parquet_to_vortex_stream ( builder. build ( ) ?) ;
121+
122+ let mut output_file = OpenOptions :: new ( )
123+ . write ( true )
124+ . truncate ( true )
125+ . create ( true )
126+ . open ( output_path)
127+ . await ?;
128+
129+ compaction
130+ . apply_options ( SESSION . write_options ( ) )
131+ . write (
132+ & mut output_file,
133+ ArrayStreamExt :: boxed ( ArrayStreamAdapter :: new ( dtype, stream) ) ,
134+ )
135+ . await ?;
136+
137+ Ok ( ( ) )
138+ }
139+
54140/// Convert all Parquet files in a directory to Vortex format.
55141///
56- /// This function reads Parquet files from `{input_path}/parquet/` and writes
57- /// Vortex files to `{input_path}/vortex-file-compressed/` (for Default compaction)
58- /// or `{input_path}/vortex-compact/` (for Compact compaction).
142+ /// This function reads Parquet files from `{input_path}/parquet/` and writes Vortex files to
143+ /// `{input_path}/vortex-file-compressed/` (for Default compaction) or
144+ /// `{input_path}/vortex-compact/` (for Compact compaction).
59145///
60- /// The conversion is idempotent - existing Vortex files will not be regenerated.
61- pub async fn convert_parquet_to_vortex (
146+ /// The conversion is idempotent: existing Vortex files will not be regenerated.
147+ pub async fn convert_parquet_directory_to_vortex (
62148 input_path : & Path ,
63149 compaction : CompactionStrategy ,
64150) -> anyhow:: Result < ( ) > {
@@ -72,7 +158,6 @@ pub async fn convert_parquet_to_vortex(
72158 create_dir_all ( & vortex_dir) . await ?;
73159
74160 let parquet_inputs = fs:: read_dir ( & parquet_path) ?. collect :: < std:: io:: Result < Vec < _ > > > ( ) ?;
75-
76161 trace ! (
77162 "Found {} parquet files in {}" ,
78163 parquet_inputs. len( ) ,
@@ -83,6 +168,7 @@ pub async fn convert_parquet_to_vortex(
83168 . iter ( )
84169 . filter ( |entry| entry. path ( ) . extension ( ) . is_some_and ( |e| e == "parquet" ) ) ;
85170
171+ let concurrency = calculate_concurrency ( ) ;
86172 futures:: stream:: iter ( iter)
87173 . map ( |dir_entry| {
88174 let filename = {
@@ -100,30 +186,18 @@ pub async fn convert_parquet_to_vortex(
100186 "Processing file '{filename}' with {:?} strategy" ,
101187 compaction
102188 ) ;
103- let chunked_array = parquet_to_vortex ( parquet_file_path) . await ?;
104- let mut f = OpenOptions :: new ( )
105- . write ( true )
106- . truncate ( true )
107- . create ( true )
108- . open ( & vtx_file)
109- . await ?;
110-
111- let write_options = compaction. apply_options ( SESSION . write_options ( ) ) ;
112-
113- write_options
114- . write ( & mut f, chunked_array. to_array_stream ( ) )
115- . await ?;
116-
117- anyhow:: Ok ( ( ) )
189+ convert_parquet_file_to_vortex ( & parquet_file_path, & vtx_file, compaction)
190+ . await
118191 } )
119192 . await
120193 . expect ( "Failed to write Vortex file" )
121194 }
122195 . in_current_span ( ) ,
123196 )
124197 } )
125- . buffer_unordered ( 16 )
198+ . buffer_unordered ( concurrency )
126199 . try_collect :: < Vec < _ > > ( )
127200 . await ?;
201+
128202 Ok ( ( ) )
129203}
0 commit comments