
Commit 9b1f954

Upgrade to DataFusion 53 (#6740)
Just a routine upgrade to DataFusion 53.

- Tracking issue for the release: apache/datafusion#19692

This PR includes a perf regression that is fixed by #7176, keeping them as separate PRs so it doesn't get lost in all the noise here.

~Things left here~ All done!

- [x] Either wait for clflushopt/tpchgen-rs#246 to merge and release, or change how we generate data in benchmarks (either by removing the dependency on `tpchgen-arrow` or using the CLI).
- [x] Go through the 53 release changelog and see if there's anything new we should take advantage of or adjust.
- [x] Finish the `object_store` upgrade

---------

Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent c256ccd commit 9b1f954

11 files changed

Lines changed: 1018 additions & 692 deletions


Cargo.lock

Lines changed: 939 additions & 632 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 33 additions & 32 deletions
@@ -88,16 +88,16 @@ arbitrary = "1.3.2"
 arc-swap = "1.8"
 arcref = "0.2.0"
 arrayref = "0.3.7"
-arrow-arith = "57.1"
-arrow-array = "57.1"
-arrow-buffer = "57.1"
-arrow-cast = "57.1"
-arrow-data = "57.1"
-arrow-ipc = "57.1"
-arrow-ord = "57.1"
-arrow-schema = "57.1"
-arrow-select = "57.1"
-arrow-string = "57.1"
+arrow-arith = "58"
+arrow-array = "58"
+arrow-buffer = "58"
+arrow-cast = "58"
+arrow-data = "58"
+arrow-ipc = "58"
+arrow-ord = "58"
+arrow-schema = "58"
+arrow-select = "58"
+arrow-string = "58"
 async-fs = "2.2.0"
 async-lock = "3.4"
 async-stream = "0.3.6"
@@ -125,20 +125,20 @@ cudarc = { version = "0.18.2", features = [
 custom-labels = "0.4.4"
 daachorse = "1.0.0"
 dashmap = "6.1.0"
-datafusion = { version = "52", default-features = false, features = ["sql"] }
-datafusion-catalog = { version = "52" }
-datafusion-common = { version = "52" }
-datafusion-common-runtime = { version = "52" }
-datafusion-datasource = { version = "52", default-features = false }
-datafusion-execution = { version = "52" }
-datafusion-expr = { version = "52" }
-datafusion-functions = { version = "52" }
-datafusion-physical-expr = { version = "52" }
-datafusion-physical-expr-adapter = { version = "52" }
-datafusion-physical-expr-common = { version = "52" }
-datafusion-physical-plan = { version = "52" }
-datafusion-pruning = { version = "52" }
-datafusion-sqllogictest = { version = "52" }
+datafusion = { version = "53", default-features = false, features = ["sql"] }
+datafusion-catalog = { version = "53" }
+datafusion-common = { version = "53" }
+datafusion-common-runtime = { version = "53" }
+datafusion-datasource = { version = "53", default-features = false }
+datafusion-execution = { version = "53" }
+datafusion-expr = { version = "53" }
+datafusion-functions = { version = "53" }
+datafusion-physical-expr = { version = "53" }
+datafusion-physical-expr-adapter = { version = "53" }
+datafusion-physical-expr-common = { version = "53" }
+datafusion-physical-plan = { version = "53" }
+datafusion-pruning = { version = "53" }
+datafusion-sqllogictest = { version = "53" }
 dirs = "6.0.0"
 divan = { package = "codspeed-divan-compat", version = "4.0.4" }
 enum-iterator = "2.0.0"
@@ -176,14 +176,14 @@ noodles-bgzf = "0.46.0"
 noodles-vcf = { version = "0.86.0", features = ["async"] }
 num-traits = "0.2.19"
 num_enum = { version = "0.7.3", default-features = false }
-object_store = { version = "0.12.4", default-features = false }
+object_store = { version = "0.13.1", default-features = false }
 once_cell = "1.21"
 oneshot = { version = "0.2.0", features = ["async"] }
 opentelemetry = "0.31.0"
 opentelemetry-otlp = "0.31.0"
 opentelemetry_sdk = "0.31.0"
 parking_lot = { version = "0.12.3", features = ["nightly"] }
-parquet = "57.1"
+parquet = "58"
 paste = "1.0.15"
 pco = "1.0.1"
 pin-project-lite = "0.2.15"
@@ -196,8 +196,8 @@ public-api = "0.51"
 pyo3 = { version = "0.28.0" }
 pyo3-bytes = "0.6"
 pyo3-log = "0.13.0"
-pyo3-object_store = "0.10.0"
-quote = "1.0.41"
+pyo3-object_store = "0.9.0"
+quote = "1.0.44"
 rand = "0.10.0"
 rand_distr = "0.6"
 ratatui = { version = "0.30", default-features = false }
@@ -224,7 +224,7 @@ sketches-ddsketch = "0.4.0"
 smol = "2.0.2"
 static_assertions = "1.1"
 strum = "0.28"
-syn = { version = "2.0.113", features = ["full"] }
+syn = { version = "2.0.117", features = ["full"] }
 sysinfo = "0.38.0"
 tabled = { version = "0.20.0", default-features = false }
 taffy = "0.9.0"
@@ -238,13 +238,14 @@ thiserror = "2.0.3"
 tokio = { version = "1.48" }
 tokio-stream = "0.1.17"
 tokio-util = "0.7.17"
-tpchgen = { version = "2.0.2" }
-tpchgen-arrow = { version = "2.0.2" }
+# Pull these into non-public crates to support DF 58
+tpchgen = { version = "2.0.2", git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "438e9c2dbc25b2fff82c0efc08b3f13b5707874f" }
+tpchgen-arrow = { version = "2.0.2", git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "438e9c2dbc25b2fff82c0efc08b3f13b5707874f" }
 tracing = { version = "0.1.41", default-features = false }
 tracing-perfetto = "0.1.5"
 tracing-subscriber = "0.3"
 url = "2.5.7"
-uuid = { version = "1.19", features = ["js"] }
+uuid = { version = "1.21", features = ["js"] }
 wasm-bindgen-futures = "0.4.54"
 xshell = "0.2.6"
 zigzag = "0.1.0"

vortex-bench/src/random_access/take.rs

Lines changed: 2 additions & 1 deletion
@@ -14,6 +14,7 @@ use itertools::Itertools;
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::arrow::arrow_reader::ArrowReaderMetadata;
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::file::metadata::PageIndexPolicy;
 use stream::StreamExt;
 use tokio::fs::File;
 use vortex::array::Canonical;
@@ -100,7 +101,7 @@ impl ParquetRandomAccessor {
     /// Open a Parquet file, parse the footer, and return a ready-to-use accessor.
     pub async fn open(path: PathBuf, name: impl Into<String>) -> anyhow::Result<Self> {
         let mut file = File::open(&path).await?;
-        let options = ArrowReaderOptions::new().with_page_index(true);
+        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
         let arrow_metadata = ArrowReaderMetadata::load_async(&mut file, options).await?;

         let row_group_offsets = once(0)
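
The one-line change above tracks an API change in parquet 58: the benchmark switches from the boolean `with_page_index(true)` toggle to `with_page_index_policy` with the new `PageIndexPolicy` enum. A minimal sketch of the call shape, assuming a local Tokio file; the `read_footer` helper is illustrative and not part of this commit:

```rust
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::file::metadata::PageIndexPolicy;
use tokio::fs::File;

// Hypothetical helper mirroring the change above: require page indexes when
// loading Parquet footer metadata instead of the old `.with_page_index(true)`.
async fn read_footer(path: &std::path::Path) -> anyhow::Result<ArrowReaderMetadata> {
    let mut file = File::open(path).await?;
    let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
    Ok(ArrowReaderMetadata::load_async(&mut file, options).await?)
}
```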

vortex-cuda/src/pooled_read_at.rs

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@ use object_store::GetOptions;
 use object_store::GetRange;
 use object_store::GetResultPayload;
 use object_store::ObjectStore;
+use object_store::ObjectStoreExt;
 use object_store::path::Path as ObjectPath;
 use vortex::array::buffer::BufferHandle;
 use vortex::buffer::Alignment;
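
This file, `vortex-io/src/object_store/read_at.rs`, `vortex-io/src/object_store/write.rs`, and `vortex-jni/src/file.rs` all gain a `use object_store::ObjectStoreExt;` import, suggesting that in object_store 0.13 some of the request helpers these readers rely on are surfaced through an extension trait rather than on `ObjectStore` alone. A rough sketch of the call pattern under that assumption; the `read_prefix` helper is illustrative and not part of this commit:

```rust
use bytes::Bytes;
use object_store::path::Path;
use object_store::{GetOptions, GetRange, ObjectStore, ObjectStoreExt};

// Hypothetical helper: fetch the first `len` bytes of an object. Assumes the
// ranged request goes through `get_opts`, with `ObjectStoreExt` in scope as in
// the imports above.
async fn read_prefix(
    store: &impl ObjectStore,
    location: &Path,
    len: u64,
) -> object_store::Result<Bytes> {
    let options = GetOptions {
        // Ask only for the leading byte range of the object.
        range: Some(GetRange::Bounded(0..len)),
        ..Default::default()
    };
    store.get_opts(location, options).await?.bytes().await
}
```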

vortex-datafusion/src/convert/exprs.rs

Lines changed: 2 additions & 1 deletion
@@ -395,7 +395,8 @@ fn try_operator_from_df(value: &DFOperator) -> DFResult<Operator> {
         | DFOperator::AtQuestion
         | DFOperator::Question
         | DFOperator::QuestionAnd
-        | DFOperator::QuestionPipe => {
+        | DFOperator::QuestionPipe
+        | DFOperator::Colon => {
             tracing::debug!(operator = %value, "Can't pushdown binary_operator operator");
             Err(exec_datafusion_err!(
                 "Unsupported datafusion operator {value}"

vortex-datafusion/src/persistent/format.rs

Lines changed: 32 additions & 22 deletions
@@ -32,6 +32,7 @@ use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::file_sink_config::FileSinkConfig;
 use datafusion_datasource::sink::DataSinkExec;
 use datafusion_datasource::source::DataSourceExec;
+use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
 use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr::LexRequirement;
 use datafusion_physical_plan::ExecutionPlan;
@@ -251,16 +252,19 @@ impl FileFormat for VortexFormat {
         let cache = file_metadata_cache.clone();

         SpawnedTask::spawn(async move {
-            // Check if we have cached metadata for this file
-            if let Some(cached) = cache.get(&object)
-                && let Some(cached_vortex) =
-                    cached.as_any().downcast_ref::<CachedVortexMetadata>()
+            // Check if we have entry metadata for this file
+            if let Some(entry) = cache.get(&object.location)
+                && entry.is_valid_for(&object)
+                && let Some(cached_vortex) = entry
+                    .file_metadata
+                    .as_any()
+                    .downcast_ref::<CachedVortexMetadata>()
             {
                 let inferred_schema = cached_vortex.footer().dtype().to_arrow_schema()?;
                 return VortexResult::Ok((object.location, inferred_schema));
             }

-            // Not cached or invalid - open the file
+            // Not entry or invalid - open the file
             let reader = Arc::new(ObjectStoreReadAt::new(
                 store,
                 object.location.clone(),
@@ -276,7 +280,8 @@ impl FileFormat for VortexFormat {

             // Cache the metadata
             let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
-            cache.put(&object, cached_metadata);
+            let entry = CachedFileMetadataEntry::new(object.clone(), cached_metadata);
+            cache.put(&object.location, entry);

             let inferred_schema = vxf.dtype().to_arrow_schema()?;
             VortexResult::Ok((object.location, inferred_schema))
@@ -310,24 +315,28 @@ impl FileFormat for VortexFormat {
         let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

         SpawnedTask::spawn(async move {
-            // Try to get cached metadata first
-            let cached_metadata = file_metadata_cache.get(&object).and_then(|cached| {
-                cached
-                    .as_any()
-                    .downcast_ref::<CachedVortexMetadata>()
-                    .map(|m| {
-                        (
-                            m.footer().dtype().clone(),
-                            m.footer().statistics().cloned(),
-                            m.footer().row_count(),
-                        )
-                    })
-            });
+            // Try to get entry metadata first
+            let cached_metadata = file_metadata_cache
+                .get(&object.location)
+                .filter(|entry| entry.is_valid_for(&object))
+                .and_then(|entry| {
+                    entry
+                        .file_metadata
+                        .as_any()
+                        .downcast_ref::<CachedVortexMetadata>()
+                        .map(|m| {
+                            (
+                                m.footer().dtype().clone(),
+                                m.footer().statistics().cloned(),
+                                m.footer().row_count(),
+                            )
+                        })
+                });

             let (dtype, file_stats, row_count) = match cached_metadata {
                 Some(metadata) => metadata,
                 None => {
-                    // Not cached - open the file
+                    // Not entry - open the file
                     let reader = Arc::new(ObjectStoreReadAt::new(
                         store,
                         object.location.clone(),
@@ -348,8 +357,9 @@ impl FileFormat for VortexFormat {
             })?;

             // Cache the metadata
-            let cached = Arc::new(CachedVortexMetadata::new(&vxf));
-            file_metadata_cache.put(&object, cached);
+            let file_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
+            let entry = CachedFileMetadataEntry::new(object.clone(), file_metadata);
+            file_metadata_cache.put(&object.location, entry);

             (
                 vxf.dtype().clone(),
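
The changes above reflect the DataFusion 53 shape of the file-metadata cache: lookups are keyed by `object.location` (a `Path`) rather than the whole `ObjectMeta`, values are wrapped in `CachedFileMetadataEntry`, and stale entries are rejected with `is_valid_for`. A condensed sketch of the lookup/insert pattern, assuming the handle returned by `cache_manager.get_file_metadata_cache()` is a `FileMetadataCache` trait object and reusing this crate's `CachedVortexMetadata` wrapper; the helper names are illustrative, not part of this commit:

```rust
use std::sync::Arc;

use datafusion_execution::cache::cache_manager::{CachedFileMetadataEntry, FileMetadataCache};
use object_store::ObjectMeta;

// Hypothetical lookup: a cache hit only counts if the entry still matches the
// current ObjectMeta (size, etag, last-modified) for that path.
fn has_valid_vortex_metadata(cache: &Arc<dyn FileMetadataCache>, object: &ObjectMeta) -> bool {
    cache
        .get(&object.location)
        .filter(|entry| entry.is_valid_for(object))
        .map(|entry| {
            entry
                .file_metadata
                .as_any()
                .downcast_ref::<CachedVortexMetadata>()
                .is_some()
        })
        .unwrap_or(false)
}

// Hypothetical insert: pair the freshly parsed metadata with the ObjectMeta it
// describes before handing it back to the path-keyed cache.
fn store_entry(
    cache: &Arc<dyn FileMetadataCache>,
    object: &ObjectMeta,
    metadata: Arc<CachedVortexMetadata>,
) {
    let entry = CachedFileMetadataEntry::new(object.clone(), metadata);
    cache.put(&object.location, entry);
}
```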

vortex-datafusion/src/persistent/opener.rs

Lines changed: 5 additions & 3 deletions
@@ -185,8 +185,10 @@ impl FileOpener for VortexOpener {
             .with_labels(labels);

         if let Some(file_metadata_cache) = file_metadata_cache
-            && let Some(file_metadata) = file_metadata_cache.get(&file.object_meta)
-            && let Some(vortex_metadata) = file_metadata
+            && let Some(entry) = file_metadata_cache.get(file.path())
+            && entry.is_valid_for(&file.object_meta)
+            && let Some(vortex_metadata) = entry
+                .file_metadata
                 .as_any()
                 .downcast_ref::<CachedVortexMetadata>()
         {
@@ -216,7 +218,7 @@
             let expr_adapter = expr_adapter_factory.create(
                 Arc::clone(&unified_file_schema),
                 Arc::clone(&this_file_schema),
-            );
+            )?;

             let simplifier = PhysicalExprSimplifier::new(&this_file_schema);


vortex-io/src/object_store/read_at.rs

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ use object_store::GetOptions;
 use object_store::GetRange;
 use object_store::GetResultPayload;
 use object_store::ObjectStore;
+use object_store::ObjectStoreExt;
 use object_store::path::Path as ObjectPath;
 use vortex_array::buffer::BufferHandle;
 use vortex_buffer::Alignment;

vortex-io/src/object_store/write.rs

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@ use futures::TryStreamExt;
 use futures::stream::FuturesUnordered;
 use object_store::MultipartUpload;
 use object_store::ObjectStore;
+use object_store::ObjectStoreExt;
 use object_store::PutPayload;
 use object_store::PutResult;
 use object_store::path::Path;

vortex-jni/src/file.rs

Lines changed: 1 addition & 0 deletions
@@ -15,6 +15,7 @@ use jni::objects::ReleaseMode;
 use jni::sys::jlong;
 use jni::sys::jobject;
 use object_store::ObjectStore;
+use object_store::ObjectStoreExt;
 use object_store::path::Path;
 use prost::Message;
 use url::Url;

0 commit comments