Skip to content

Commit ad1f962

Browse files
committed
[adapters] Upgrade delta and iceberg dependencies
Upgrade deltalake and related dependencies: delta-rs 0.26.2 -> 0.30.2, iceberg 0.5.1 -> 0.8.0, arrow 55 -> 57, datafusion 47 -> 51. The new version of delta-rs is based on delta-kernel and has some features missing in 0.26, such as support for v2 checkpoints and deletion vectors. The latter will require additional work on the connector to support deletion vectors in follow and CDC modes, but they should work out of the box in snapshot mode. Overall, we are hoping this version will be more reliable and performant. As part of the upgrade, I disabled the `chrono::rkyv` feature. The latest version of datafusion resurfaced the bug apache/datafusion#14862, which is triggered by the chrono crate when it is compiled with the rkyv feature enabled. It turns out that we no longer need this feature, except in the DBSP tutorial. This commit removes the feature and modifies the tutorial to use an integer that represents the number of days since the epoch instead of chrono::NaiveDate. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 7e36561 commit ad1f962

File tree

25 files changed

+5179
-4808
lines changed

25 files changed

+5179
-4808
lines changed

Cargo.lock

Lines changed: 909 additions & 597 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ anyhow = "1.0.91"
5353
apache-avro = "0.18.0"
5454
arc-swap = "1.5.1"
5555
arcstr = "1.2.0"
56-
arrow = "55"
57-
arrow-json = "55"
58-
arrow-digest = "55"
59-
arrow-schema = "55"
56+
arrow = "57"
57+
arrow-json = "57"
58+
arrow-digest = "57"
59+
arrow-schema = "57"
6060
ascii_table = "=4.0.2"
6161
async-channel = "2.3.1"
6262
async-nats = "0.44.2"
@@ -84,7 +84,7 @@ bytes = "1.11.1"
8484
bytestring = "1.4.0"
8585
change-detection = "1.2"
8686
cached = { version = "0.43.0", features = ["async"] }
87-
chrono = { version = "0.4.38", default-features = false }
87+
chrono = { version = "0.4.43", default-features = false }
8888
circular-queue = "0.2.6"
8989
clap = { version = "4.5", features = ["derive", "env"] }
9090
clap_complete = "4.5"
@@ -99,13 +99,18 @@ crossbeam-utils = "0.8.6"
9999
csv = "1.2.2"
100100
csv-core = "0.1.10"
101101
dashmap = "6.1.0"
102-
datafusion = "47"
102+
datafusion = "51.0"
103103
dbsp = { path = "crates/dbsp", version = "0.252.0" }
104104
dbsp_nexmark = { path = "crates/nexmark" }
105105
deadpool-postgres = "0.14.1"
106-
#deltalake = "0.26.2"
107-
deltalake = { git = "https://github.com/ryzhyk/delta-rs.git", rev = "06f42db5344f4ad8560af341352572488a4e6e06" }
108-
deltalake-catalog-unity = { git = "https://github.com/ryzhyk/delta-rs.git", rev = "06f42db5344f4ad8560af341352572488a4e6e06" }
106+
#deltalake = "0.30.2"
107+
#deltalake-catalog-unity = "0.14.1"
108+
deltalake-catalog-unity = { git = "https://github.com/ryzhyk/delta-rs.git", rev = "b24e5aaec322db0f084449e5c57f97ff4526bbdc" }
109+
deltalake = { git = "https://github.com/ryzhyk/delta-rs.git", rev = "b24e5aaec322db0f084449e5c57f97ff4526bbdc" }
110+
111+
# deltalake = { git = "https://github.com/ryzhyk/delta-rs.git", rev = "06f42db5344f4ad8560af341352572488a4e6e06" }
112+
# deltalake-catalog-unity = { git = "https://github.com/ryzhyk/delta-rs.git", rev = "06f42db5344f4ad8560af341352572488a4e6e06" }
113+
delta_kernel = "0.19.0"
109114
derive_more = { version = "1.0.0" }
110115
directories = "6.0"
111116
dirs = "5.0"
@@ -141,10 +146,10 @@ hashbrown = "0.14.2"
141146
hdrhist = "0.5"
142147
hex = "0.4.3"
143148
home = "=0.5.9"
144-
iceberg = "0.5.1"
145-
iceberg-catalog-glue = "0.5.1"
146-
iceberg-catalog-rest = "0.5.1"
147-
iceberg-datafusion = "0.5.1"
149+
iceberg = "0.8.0"
150+
iceberg-catalog-glue = "0.8.0"
151+
iceberg-catalog-rest = "0.8.0"
152+
iceberg-datafusion = "0.8.0"
148153
impl-trait-for-tuples = "0.2"
149154
indexmap = "2.7.1"
150155
indicatif = "0.17.0-rc.11"
@@ -177,7 +182,7 @@ once_cell = "1.20.2"
177182
openssl = "0.10.72"
178183
ordered-float = { version = "4.2.0", features = ["serde"] }
179184
ouroboros = "0.18.4"
180-
parquet = "55"
185+
parquet = "57"
181186
paste = "1.0.12"
182187
petgraph = "0.6.0"
183188
pg-client-config = "0.1.2"
@@ -219,7 +224,7 @@ ryu = "1.0.20"
219224
schema_registry_converter = "4.2.0"
220225
semver = "1.0.27"
221226
serde = "1.0.213"
222-
serde_arrow = { version = "0.13.4", features = ["arrow-55"] }
227+
serde_arrow = { version = "0.13.7", features = ["arrow-57"] }
223228
serde_bytes = "0.11.15"
224229
serde_json = { version = "1.0.132", features = ["arbitrary_precision"] }
225230
serde_json_path_to_error = "0.1.5"

crates/adapterlib/src/utils/datafusion.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::errors::journal::ControllerError;
22
use anyhow::{Error as AnyError, anyhow};
3+
use arrow::array::Array;
34
use datafusion::common::arrow::array::{AsArray, RecordBatch};
45
use datafusion::logical_expr::sqlparser::parser::ParserError;
56
use datafusion::prelude::{SQLOptions, SessionContext};
@@ -53,7 +54,21 @@ pub async fn execute_singleton_query(
5354
));
5455
}
5556

56-
Ok(result[0].column(0).as_string::<i32>().value(0).to_string())
57+
let column0 = result[0].column(0);
58+
59+
array_to_string(column0).ok_or_else(|| {
60+
anyhow!("internal error: cannot retrieve the output of query '{query}' as a string")
61+
})
62+
}
63+
64+
pub fn array_to_string(array: &dyn Array) -> Option<String> {
65+
if let Some(string_view_array) = array.as_string_view_opt() {
66+
Some(string_view_array.value(0).to_string())
67+
} else {
68+
array
69+
.as_string_opt::<i32>()
70+
.map(|array| array.value(0).to_string())
71+
}
5772
}
5873

5974
/// Parse expression only to validate it.
@@ -141,7 +156,7 @@ pub async fn validate_timestamp_column(
141156
// which requires storing and sorting the entire collection locally.
142157
let is_zero = execute_singleton_query(
143158
datafusion,
144-
&format!("select cast(({lateness} + {lateness}) = {lateness} as string)"),
159+
&format!("select cast((({lateness} + {lateness}) = {lateness}) as string)"),
145160
)
146161
.await
147162
.map_err(|e| ControllerError::invalid_transport_configuration(endpoint_name, &e.to_string()))?;

crates/adapters/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ proptest-derive = { workspace = true, optional = true }
107107
clap = { workspace = true }
108108
tokio = { workspace = true, features = ["sync", "macros", "fs", "rt"] }
109109
utoipa = { workspace = true }
110-
chrono = { workspace = true, features = ["rkyv-64", "serde"] }
110+
chrono = { workspace = true, features = ["serde"] }
111111
colored = { workspace = true }
112112
uuid = { workspace = true, features = ["v4", "std"] }
113113
rkyv = { workspace = true, features = ["std", "size_64"] }
@@ -198,6 +198,7 @@ backoff = { workspace = true }
198198
sentry = { workspace = true }
199199
zip = { workspace = true }
200200
smallvec = { workspace = true }
201+
delta_kernel = { workspace = true }
201202

202203
[package.metadata.cargo-machete]
203204
ignored = ["num-traits"]

0 commit comments

Comments
 (0)