@@ -12,14 +12,19 @@ use lance::dataset::WriteParams;
1212use lance_encoding:: version:: LanceFileVersion ;
1313use parquet:: arrow:: arrow_reader:: ParquetRecordBatchReaderBuilder ;
1414use vortex_bench:: Format ;
15- use vortex_bench:: datasets:: taxi_data:: taxi_data_parquet;
15+ use vortex_bench:: datasets:: feature_vectors;
16+ use vortex_bench:: datasets:: nested_lists;
17+ use vortex_bench:: datasets:: nested_structs;
18+ use vortex_bench:: datasets:: taxi_data;
1619use vortex_bench:: idempotent_async;
1720use vortex_bench:: random_access:: RandomAccessor ;
21+ use vortex_bench:: random_access:: data_path;
1822
19- pub async fn taxi_data_lance ( ) -> anyhow:: Result < PathBuf > {
20- idempotent_async ( "taxi/taxi.lance" , |output_fname| async move {
21- let parquet_path = taxi_data_parquet ( ) . await ?;
22-
23+ /// Convert a parquet file to lance format.
24+ ///
25+ /// Uses `idempotent_async` to skip conversion if the output already exists.
26+ async fn parquet_to_lance_file ( parquet_path : PathBuf , lance_path : & str ) -> anyhow:: Result < PathBuf > {
27+ idempotent_async ( lance_path, |output_fname| async move {
2328 let file = File :: open ( & parquet_path) ?;
2429 let builder = ParquetRecordBatchReaderBuilder :: try_new ( file) ?;
2530 let reader = builder. build ( ) ?;
@@ -39,13 +44,58 @@ pub async fn taxi_data_lance() -> anyhow::Result<PathBuf> {
3944 . await
4045}
4146
47+ pub async fn taxi_data_lance ( ) -> anyhow:: Result < PathBuf > {
48+ let parquet_path = taxi_data:: taxi_data_parquet ( ) . await ?;
49+ parquet_to_lance_file ( parquet_path, & data_path ( taxi_data:: DATASET , Format :: Lance ) ) . await
50+ }
51+
52+ pub async fn feature_vectors_lance ( ) -> anyhow:: Result < PathBuf > {
53+ let parquet_path = feature_vectors:: feature_vectors_parquet ( ) . await ?;
54+ parquet_to_lance_file (
55+ parquet_path,
56+ & data_path ( feature_vectors:: DATASET , Format :: Lance ) ,
57+ )
58+ . await
59+ }
60+
61+ pub async fn nested_lists_lance ( ) -> anyhow:: Result < PathBuf > {
62+ let parquet_path = nested_lists:: nested_lists_parquet ( ) . await ?;
63+ parquet_to_lance_file (
64+ parquet_path,
65+ & data_path ( nested_lists:: DATASET , Format :: Lance ) ,
66+ )
67+ . await
68+ }
69+
70+ pub async fn nested_structs_lance ( ) -> anyhow:: Result < PathBuf > {
71+ let parquet_path = nested_structs:: nested_structs_parquet ( ) . await ?;
72+ parquet_to_lance_file (
73+ parquet_path,
74+ & data_path ( nested_structs:: DATASET , Format :: Lance ) ,
75+ )
76+ . await
77+ }
78+
79+ /// Random accessor for Lance format files.
80+ ///
81+ /// The dataset handle is opened at construction time and reused across `take()` calls.
4282pub struct LanceRandomAccessor {
43- path : PathBuf ,
83+ name : String ,
84+ dataset : Dataset ,
4485}
4586
4687impl LanceRandomAccessor {
47- pub fn new ( path : PathBuf ) -> Self {
48- Self { path }
88+ /// Open a Lance dataset and return a ready-to-use accessor.
89+ pub async fn open ( path : PathBuf , name : impl Into < String > ) -> anyhow:: Result < Self > {
90+ let dataset = Dataset :: open (
91+ path. to_str ( )
92+ . ok_or_else ( || anyhow ! ( "Invalid dataset path" ) ) ?,
93+ )
94+ . await ?;
95+ Ok ( Self {
96+ name : name. into ( ) ,
97+ dataset,
98+ } )
4999 }
50100}
51101
@@ -56,22 +106,12 @@ impl RandomAccessor for LanceRandomAccessor {
56106 }
57107
58108 fn name ( & self ) -> & str {
59- "random-access/lance-tokio-local-disk"
60- }
61-
62- fn path ( & self ) -> & PathBuf {
63- & self . path
109+ & self . name
64110 }
65111
66- async fn take ( & self , indices : Vec < u64 > ) -> anyhow:: Result < usize > {
67- let dataset = Dataset :: open (
68- self . path
69- . to_str ( )
70- . ok_or_else ( || anyhow ! ( "Invalid dataset path" ) ) ?,
71- )
72- . await ?;
73- let projection = ProjectionRequest :: from_schema ( dataset. schema ( ) . clone ( ) ) ; // All columns.
74- let result = dataset. take ( indices. as_slice ( ) , projection) . await ?;
112+ async fn take ( & self , indices : & [ u64 ] ) -> anyhow:: Result < usize > {
113+ let projection = ProjectionRequest :: from_schema ( self . dataset . schema ( ) . clone ( ) ) ;
114+ let result = self . dataset . take ( indices, projection) . await ?;
75115 Ok ( result. num_rows ( ) )
76116 }
77117}
0 commit comments