Skip to content

Commit 8938d2c

Browse files
committed
Small step for humanity, small step for storage.
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 60f325e commit 8938d2c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

110 files changed

+2919
-2327
lines changed

Cargo.lock

Lines changed: 624 additions & 557 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,13 @@ members = ["crates/*", "sql-to-dbsp-compiler/lib/*"]
33
exclude = ["sql-to-dbsp-compiler/temp"]
44
resolver = "2"
55

6-
[workspace.metadata.release]
7-
release = false
6+
[workspace.metadata.release]
7+
release = false
88

99
[profile.bench]
1010
debug = true
11+
12+
[patch.crates-io]
13+
rkyv = { git = "https://github.com/gz/rkyv.git", rev = "3d3fd86" }
14+
rust_decimal = { git = "https://github.com/gz/rust-decimal.git", rev = "77df097" }
15+
size-of = { git = "https://github.com/gz/size-of.git", rev = "3013071" }

crates/adapters/Cargo.toml

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ test-utils = ["size-of", "proptest", "proptest-derive"]
1717

1818
[dependencies]
1919
pipeline_types = { path = "../pipeline-types/" }
20-
awc = { version = "3.1.1", default-features=false, features = ["compress-gzip", "compress-brotli", "cookies", "rustls"] }
20+
awc = { version = "3.1.1", default-features = false, features = ["compress-gzip", "compress-brotli", "cookies", "rustls"] }
2121
async-stream = "0.3.5"
2222
num-traits = "0.2.15"
2323
num-derive = "0.3.3"
@@ -39,7 +39,8 @@ actix = "0.13.1"
3939
actix-web = { version = "4.4.0", default-features = false, features = ["cookies", "macros", "compress-gzip", "compress-brotli"] }
4040
mime = "0.3.16"
4141
log = "0.4.20"
42-
size-of = { version = "0.1.2", features = ["time-std"], optional = true }
42+
# Once chrono is released with `849932` chrono version needs to be updated in size-of crate:
43+
size-of = { git = "https://github.com/gz/size-of.git", rev = "3013071", features = ["time-std", "ordered-float"], optional = true }
4344
futures = { version = "0.3.28" }
4445
futures-util = { version = "0.3.28" }
4546
proptest = { version = "1.0.0", optional = true }
@@ -49,13 +50,14 @@ clap = { version = "4.0.32", features = ["derive"] }
4950
tokio = { version = "1.25.0", features = ["sync", "macros", "fs", "rt"] }
5051
prometheus = "0.13.3"
5152
utoipa = "4.1"
52-
chrono = { version = "0.4.31", features = ["clock"], default-features = false }
53+
# Go back to a version once commit containing 849932 is released:
54+
chrono = { git = "https://github.com/chronotope/chrono", rev = "849932", features = ["rkyv-64", "serde"] }
5355
colored = "2.0.0"
5456
uuid = { version = "1.3.3", features = ["v4", "std"] }
5557
webpki-roots = "0.25.1"
5658
rustls = "0.20.8"
5759
lazy_static = "1.4.0"
58-
rkyv = "0.7.42"
60+
rkyv = { git = "https://github.com/gz/rkyv.git", rev = "3d3fd86", default-features = false, features = ["std", "size_64", "extra_traits"] }
5961
csv-core = "0.1.10"
6062
rust_decimal = "1.32.0"
6163
rand = "0.8.5"
@@ -68,7 +70,7 @@ psutil = "3.2.2"
6870
actix-test = "0.1.1"
6971
bstr = { version = "0.2.1", features = ["serde1"] }
7072
serde_json = "1.0.89"
71-
size-of = { version = "0.1.2", features = ["time-std"] }
73+
size-of = { git = "https://github.com/gz/size-of.git", rev = "3013071", features = ["time-std", "ordered-float"] }
7274
tempfile = "3.3.0"
7375
proptest = "1.0.0"
7476
proptest-derive = "0.3.0"

crates/adapters/src/catalog.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{collections::BTreeMap, sync::Arc};
22

33
use crate::{serialize_struct, static_compile::DeScalarHandle, ControllerError};
44
use anyhow::Result as AnyResult;
5-
use dbsp::InputHandle;
5+
use dbsp::{utils::Tup2, InputHandle};
66
use pipeline_types::format::json::JsonFlavor;
77
use pipeline_types::query::OutputQuery;
88
use serde::{Deserialize, Serialize};
@@ -23,7 +23,7 @@ pub enum RecordFormat {
2323

2424
// Helper type only used to serialize neighborhoods as a map vs tuple.
2525
pub struct NeighborhoodEntry<KD> {
26-
index: isize,
26+
index: i64,
2727
key: KD,
2828
}
2929

@@ -32,11 +32,11 @@ serialize_struct!(NeighborhoodEntry(KD)[2]{
3232
key["key"]: KD
3333
});
3434

35-
impl<K, KD> From<(isize, (K, ()))> for NeighborhoodEntry<KD>
35+
impl<K, KD> From<Tup2<i64, Tup2<K, ()>>> for NeighborhoodEntry<KD>
3636
where
3737
KD: From<K>,
3838
{
39-
fn from((index, (key, ())): (isize, (K, ()))) -> Self {
39+
fn from(Tup2(index, Tup2(key, ())): Tup2<i64, Tup2<K, ()>>) -> Self {
4040
Self {
4141
index,
4242
key: KD::from(key),

crates/adapters/src/static_compile/catalog.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::{
66
use dbsp::{
77
algebra::ZRingValue,
88
operator::{DelayedFeedback, FilterMap, NeighborhoodDescr},
9+
utils::Tup2,
910
CollectionHandle, DBData, DBWeight, OrdIndexedZSet, RootCircuit, Stream, UpsertHandle, ZSet,
1011
};
1112

@@ -286,14 +287,14 @@ impl Catalog {
286287

287288
// Neighborhood delta stream.
288289
let neighborhood_handle = neighborhood_stream
289-
.map(|(idx, (_k, v))| (*idx, (v.clone(), ())))
290+
.map(|Tup2(idx, Tup2(_k, v))| Tup2(*idx, Tup2(v.clone(), ())))
290291
.output();
291292

292293
// Neighborhood snapshot stream. The integral computation
293294
// is essentially free thanks to stream caching.
294295
let neighborhood_snapshot_stream = neighborhood_stream.integrate();
295296
let neighborhood_snapshot_handle = neighborhood_snapshot_stream
296-
.map(|(idx, (_k, v))| (*idx, (v.clone(), ())))
297+
.map(|Tup2(idx, Tup2(_k, v))| Tup2(*idx, Tup2(v.clone(), ())))
297298
.output_guarded(&neighborhood_descr_stream.apply(|(reset, _descr)| *reset));
298299

299300
// Handle for the quantiles query.
@@ -304,7 +305,7 @@ impl Catalog {
304305
.integrate_trace()
305306
.stream_unique_key_val_quantiles(&num_quantiles_stream);
306307
let quantiles_handle = quantiles_stream
307-
.map(|(_k, v)| v.clone())
308+
.map(|Tup2(_k, v)| v.clone())
308309
.output_guarded(&num_quantiles_stream.apply(|num_quantiles| *num_quantiles > 0));
309310

310311
let handles = OutputCollectionHandles {

crates/adapters/src/static_compile/deinput.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,8 @@ mod test {
681681
rkyv::Serialize,
682682
rkyv::Deserialize,
683683
)]
684+
#[archive_attr(derive(Clone, Ord, Eq, PartialEq, PartialOrd))]
685+
#[archive(compare(PartialEq, PartialOrd))]
684686
struct TestStruct {
685687
id: i64,
686688
s: String,
@@ -696,9 +698,9 @@ mod test {
696698
Box<dyn DeCollectionHandle>,
697699
);
698700
type OutputHandles = (
699-
OutputHandle<OrdZSet<TestStruct, isize>>,
700-
OutputHandle<OrdZSet<TestStruct, isize>>,
701-
OutputHandle<OrdIndexedZSet<i64, TestStruct, isize>>,
701+
OutputHandle<OrdZSet<TestStruct, i64>>,
702+
OutputHandle<OrdZSet<TestStruct, i64>>,
703+
OutputHandle<OrdIndexedZSet<i64, TestStruct, i64>>,
702704
);
703705

704706
// Test circuit for DeScalarHandle.
@@ -781,9 +783,9 @@ mod test {
781783
fn decollection_test_circuit(workers: usize) -> (DBSPHandle, InputHandles, OutputHandles) {
782784
let (dbsp, ((zset_input, zset_output), (set_input, set_output), (map_input, map_output))) =
783785
Runtime::init_circuit(workers, |circuit| {
784-
let (zset, zset_handle) = circuit.add_input_zset::<TestStruct, isize>();
785-
let (set, set_handle) = circuit.add_input_set::<TestStruct, isize>();
786-
let (map, map_handle) = circuit.add_input_map::<i64, TestStruct, isize>();
786+
let (zset, zset_handle) = circuit.add_input_zset::<TestStruct, i64>();
787+
let (set, set_handle) = circuit.add_input_set::<TestStruct, i64>();
788+
let (map, map_handle) = circuit.add_input_map::<i64, TestStruct, i64>();
787789

788790
let zset_output = zset.output();
789791
let set_output = set.output();
@@ -837,11 +839,11 @@ mod test {
837839
(),
838840
inputs.iter().map(|v| (v.clone(), 1)).collect::<Vec<_>>(),
839841
);
840-
let map = <OrdIndexedZSet<i64, TestStruct, isize, usize>>::from_tuples(
842+
let map = <OrdIndexedZSet<i64, TestStruct, i64>>::from_tuples(
841843
(),
842844
inputs
843845
.iter()
844-
.map(|v| ((v.id, v.clone()), 1isize))
846+
.map(|v| ((v.id, v.clone()), 1i64))
845847
.collect::<Vec<_>>(),
846848
);
847849

@@ -925,11 +927,11 @@ mod test {
925927
(),
926928
inputs.iter().map(|v| (v.clone(), -1)).collect::<Vec<_>>(),
927929
);
928-
let map = <OrdIndexedZSet<i64, TestStruct, isize, usize>>::from_tuples(
930+
let map = <OrdIndexedZSet<i64, TestStruct, i64>>::from_tuples(
929931
(),
930932
inputs
931933
.iter()
932-
.map(|v| ((v.id, v.clone()), -1isize))
934+
.map(|v| ((v.id, v.clone()), -1i64))
933935
.collect::<Vec<_>>(),
934936
);
935937

crates/adapters/src/test/data.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use crate::{deserialize_without_context, serialize_struct};
2121
rkyv::Serialize,
2222
rkyv::Deserialize,
2323
)]
24+
#[archive_attr(derive(Clone, Ord, Eq, PartialEq, PartialOrd))]
25+
#[archive(compare(PartialEq, PartialOrd))]
2426
pub struct TestStruct {
2527
pub id: u32,
2628
pub b: bool,

crates/dbsp/Cargo.toml

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ keywords = ["DBSP", "streaming", "analytics", "database"]
1010
categories = ["algorithms", "data-structures"]
1111
publish = true
1212

13-
[package.metadata.release]
14-
release = true
13+
[package.metadata.release]
14+
release = true
1515

16-
[package.metadata.docs.rs]
17-
all-features = true
18-
rustdoc-args = ["--cfg", "docsrs"]
16+
[package.metadata.docs.rs]
17+
all-features = true
18+
rustdoc-args = ["--cfg", "docsrs"]
1919

2020
[features]
2121
# Note: If you add a feature, adjust the ALMOST_ALL_FEATURES environment variable in
@@ -24,7 +24,7 @@ default = ["with-serde"]
2424
persistence = ["rocksdb", "uuid"]
2525
with-serde = ["serde"]
2626
with-csv = ["csv"]
27-
with-arbitrary = ["proptest", "proptest-derive"]
27+
persistence2 = []
2828

2929
[dependencies]
3030
num = "0.4.0"
@@ -40,7 +40,7 @@ serde = { version = "1.0", features = ["derive"], optional = true }
4040
impl-trait-for-tuples = "0.2"
4141
itertools = "0.10.5"
4242
textwrap = "0.15.0"
43-
ordered-float = { version = "3.9.1", features = ["serde", "rkyv"] }
43+
ordered-float = { version = "3.9.1", features = ["serde", "rkyv_64"] }
4444
xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
4545
crossbeam = "0.8.2"
4646
rocksdb = { version = "0.21", default-features = false, features = [
@@ -51,15 +51,15 @@ arc-swap = "1.5.1"
5151
mimalloc-rust-sys = "1.7.2"
5252
rand = "0.8.5"
5353
rust_decimal = "1.29"
54-
rkyv = "0.7.42"
55-
arcstr = { version = "1.1.4" }
56-
size-of = { version = "0.1.5", features = ["hashbrown", "time-std", "xxhash-xxh3", "arcstr", "chrono", "ordered-float"] }
54+
# Go back to rkyv repo once https://github.com/rkyv/rkyv/pull/462 is merged:
55+
rkyv = { git = "https://github.com/gz/rkyv.git", rev = "3d3fd86", default-features = false, features = ["std", "size_64", "extra_traits"] }
56+
# Once chrono is released with `849932` chrono version needs to be updated in size-of crate:
57+
size-of = { git = "https://github.com/gz/size-of.git", rev = "3013071", features = ["hashbrown", "time-std", "xxhash-xxh3", "arcstr", "chrono", "ordered-float"] }
5758
tarpc = { version = "0.33.0", features = ["full"] }
5859
futures = "0.3"
5960
tokio = { version = "1.25.0", features = ["macros", "rt", "rt-multi-thread"] }
6061
log = "0.4.20"
61-
proptest-derive = { version = "0.3.0", optional = true }
62-
proptest = { version = "1.0.0", optional = true }
62+
paste = "1.0.14"
6363

6464
[dev-dependencies]
6565
csv = "1.2.2"
@@ -76,7 +76,8 @@ indicatif = "0.17.0-rc.11"
7676
clap = { version = "3.2.8", features = ["derive", "env"] }
7777
reqwest = { version = "0.11.11", features = ["blocking"] }
7878
serde_json = "1.0.87"
79-
chrono = { version = "0.4.31", features = ["rkyv", "serde"] }
79+
# Go back to a version once commit containing 849932 is released:
80+
chrono = { git = "https://github.com/chronotope/chrono", rev = "849932", features = ["rkyv-64", "serde"] }
8081

8182
[dependencies.time]
8283
version = "0.3.20"

crates/dbsp/benches/column_layer.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ macro_rules! leaf_benches {
7575
let mut group = c.benchmark_group("ordered-builder-push-merge");
7676
$(
7777
group.bench_function($name, |b| {
78-
let (left, right) = data_leaves::<usize, isize, $layer<_,_>>($size);
78+
let (left, right) = data_leaves::<u64, i64, $layer<_,_>>($size);
7979

8080
b.iter_batched(
8181
|| (left.cursor(), right.cursor()),
@@ -95,7 +95,7 @@ macro_rules! leaf_benches {
9595
group.sample_size(10);
9696
$(
9797
group.bench_function($name, |b| {
98-
let (left, right) = data_leaves::<usize, isize, $layer<_,_>>($size);
98+
let (left, right) = data_leaves::<u64, i64, $layer<_,_>>($size);
9999

100100
b.iter_batched(
101101
|| (left.clone(), right.clone()),
@@ -110,7 +110,7 @@ macro_rules! leaf_benches {
110110
group.sample_size(10);
111111
$(
112112
group.bench_function($name, |b| {
113-
let (left, right) = data_leaves::<usize, isize, $layer<_,_>>($size);
113+
let (left, right) = data_leaves::<u64, i64, $layer<_,_>>($size);
114114

115115
b.iter_batched(
116116
|| (&left, &right),
@@ -125,7 +125,7 @@ macro_rules! leaf_benches {
125125
group.sample_size(10);
126126
$(
127127
group.bench_function($name, |b| {
128-
let (left, right) = data_leaves::<usize, isize, $layer<_,_>>($size);
128+
let (left, right) = data_leaves::<u64, i64, $layer<_,_>>($size);
129129

130130
b.iter_batched(
131131
|| (left.clone(), right.clone()),
@@ -140,7 +140,7 @@ macro_rules! leaf_benches {
140140
group.sample_size(10);
141141
$(
142142
group.bench_function($name, |b| {
143-
let (left, right) = data_leaves::<usize, isize, $layer<_,_>>($size);
143+
let (left, right) = data_leaves::<u64, i64, $layer<_,_>>($size);
144144

145145
b.iter_batched(
146146
|| (left.clone(), &right),
@@ -155,7 +155,7 @@ macro_rules! leaf_benches {
155155
group.sample_size(10);
156156
$(
157157
group.bench_function($name, |b| {
158-
let leaf = data_leaf::<usize, isize, $layer<usize, isize>>($size);
158+
let leaf = data_leaf::<u64, i64, $layer<u64, i64>>($size);
159159

160160
b.iter_batched(
161161
|| leaf.clone(),
@@ -170,7 +170,7 @@ macro_rules! leaf_benches {
170170
group.sample_size(10);
171171
$(
172172
group.bench_function($name, |b| {
173-
let leaf = data_leaf::<usize, isize, $layer<usize, isize>>($size);
173+
let leaf = data_leaf::<u64, i64, $layer<u64, i64>>($size);
174174

175175
b.iter_batched(
176176
|| &leaf,

0 commit comments

Comments
 (0)