-
Notifications
You must be signed in to change notification settings - Fork 111
Expand file tree
/
Copy pathtutorial9.rs
More file actions
127 lines (120 loc) · 2.98 KB
/
tutorial9.rs
File metadata and controls
127 lines (120 loc) · 2.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use anyhow::{Context, Result};
use chrono::{Datelike, NaiveDate};
use csv::Reader;
use dbsp::{
    utils::{Tup2, Tup3},
    OrdIndexedZSet, OutputHandle, RootCircuit, ZSetHandle, ZWeight,
};
use rkyv::{Archive, Serialize};
use size_of::SizeOf;
/// One row of `vaccinations.csv`, deserialized by `main` through serde.
///
/// `daily_vaccinations` is optional; `build_circuit` counts a missing value as 0.
/// Field order is significant: the derived `Ord` compares `location`, then
/// `date`, then `daily_vaccinations`.
// Basic value traits.
#[derive(Clone, Default, Debug)]
// Equality, total ordering and hashing (presumably required by dbsp for Z-set elements).
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash)]
// Size accounting, rkyv (de)serialization, and serde decoding of CSV rows.
#[derive(SizeOf, Archive, Serialize, rkyv::Deserialize, serde::Deserialize)]
// Derive the same comparison traits on the rkyv archived form.
#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]
struct Record {
    location: String,
    date: NaiveDate,
    daily_vaccinations: Option<u64>,
}
/// Total vaccinations recorded for one location in one calendar month.
///
/// `count` is deliberately the first field: the derived `Ord` compares fields
/// in declaration order, so ordering `VaxMonthly` values orders them by
/// vaccination count first (then by year and month). `build_circuit` relies on
/// this when it ranks months with `topk_desc`.
// Basic value traits.
#[derive(Clone, Default, Debug)]
// Equality, total ordering and hashing.
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash)]
// Size accounting, rkyv (de)serialization, and serde decoding.
#[derive(SizeOf, Archive, Serialize, rkyv::Deserialize, serde::Deserialize)]
// Derive the same comparison traits on the rkyv archived form.
#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]
// Ask rkyv to generate comparisons between the archived and in-memory forms.
#[archive(compare(PartialEq, PartialOrd))]
struct VaxMonthly {
    count: u64,
    year: i32,
    month: u8,
}
/// Builds the circuit: vaccination records in, top-3 months per UK nation out.
///
/// Records from England, Northern Ireland, Scotland and Wales are grouped by
/// `(location, year, month)` and their daily counts aggregated with
/// `aggregate_linear` (a missing daily count contributes 0). The monthly
/// totals are then re-keyed by location, and `topk_desc(3)` keeps the three
/// greatest `VaxMonthly` values per location.
///
/// Returns the input handle that accepts `Record`s and the output handle that
/// exposes the per-location results.
// The handle tuple is spelled out only here, so a type alias would not add clarity.
#[allow(clippy::type_complexity)]
fn build_circuit(
    circuit: &mut RootCircuit,
) -> Result<(
    ZSetHandle<Record>,
    OutputHandle<OrdIndexedZSet<String, VaxMonthly>>,
)> {
    let (records, records_handle) = circuit.add_input_zset::<Record>();

    // Keep only the four UK nations.
    let uk_only = records.filter(|rec| {
        matches!(
            rec.location.as_str(),
            "England" | "Northern Ireland" | "Scotland" | "Wales"
        )
    });

    // Index by (location, year, month) and aggregate the daily counts.
    let totals_by_month = uk_only
        .map_index(|rec| {
            let key = Tup3(rec.location.clone(), rec.date.year(), rec.date.month() as u8);
            let daily = rec.daily_vaccinations.unwrap_or(0);
            (key, daily)
        })
        .aggregate_linear(|daily| *daily as ZWeight);

    // Re-key each monthly total by location and keep the top three per location.
    let top_months = totals_by_month
        .map_index(|(Tup3(location, year, month), total)| {
            let monthly = VaxMonthly {
                year: *year,
                month: *month,
                count: *total as u64,
            };
            (location.clone(), monthly)
        })
        .topk_desc(3);

    Ok((records_handle, top_months.output()))
}
fn main() -> Result<()> {
let (circuit, (input_handle, output_handle)) = RootCircuit::build(build_circuit)?;
let path = format!(
"{}/examples/tutorial/vaccinations.csv",
env!("CARGO_MANIFEST_DIR")
);
let mut reader = Reader::from_path(path)?;
let mut input_records = reader.deserialize();
loop {
let mut batch = Vec::new();
while batch.len() < 500 {
let Some(record) = input_records.next() else {
break;
};
batch.push(Tup2(record?, 1));
}
if batch.is_empty() {
break;
}
println!("Input {} records:", batch.len());
input_handle.append(&mut batch);
circuit.transaction()?;
output_handle
.consolidate()
.iter()
.for_each(|(l, VaxMonthly { count, year, month }, w)| {
println!(" {l:16} {year}-{month:02} {count:10}: {w:+}")
});
println!();
}
Ok(())
}