use anyhow::Result; use chrono::Datelike; use csv::Reader; use dbsp::typed_batch::IndexedZSetReader; use dbsp::utils::{Tup2, Tup3}; use dbsp::{OrdIndexedZSet, OutputHandle, RootCircuit, ZSetHandle, ZWeight}; use feldera_macros::IsNone; use rkyv::{Archive, Serialize}; use size_of::SizeOf; #[derive( Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, SizeOf, Archive, Serialize, rkyv::Deserialize, serde::Deserialize, IsNone, )] #[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))] struct Record { location: String, date: i32, daily_vaccinations: Option, } #[allow(clippy::type_complexity)] fn build_circuit( circuit: &mut RootCircuit, ) -> Result<( ZSetHandle, OutputHandle, ZWeight>>, )> { let (input_stream, input_handle) = circuit.add_input_zset::(); let subset = input_stream.filter(|r| { r.location == "England" || r.location == "Northern Ireland" || r.location == "Scotland" || r.location == "Wales" }); let monthly_totals = subset .map_index(|r| { let date = chrono::NaiveDate::from_epoch_days(r.date).unwrap(); ( Tup3(r.location.clone(), date.year(), date.month() as u8), r.daily_vaccinations.unwrap_or(0), ) }) .aggregate_linear(|v| *v as ZWeight); Ok((input_handle, monthly_totals.output())) } fn main() -> Result<()> { let (circuit, (input_handle, output_handle)) = RootCircuit::build(build_circuit)?; let path = format!( "{}/examples/tutorial/vaccinations.csv", env!("CARGO_MANIFEST_DIR") ); let mut input_records = Reader::from_path(path)? .deserialize() .map(|result| result.map(|record| Tup2(record, 1))) .collect::>, _>>()?; input_handle.append(&mut input_records); circuit.transaction()?; output_handle .consolidate() .iter() .for_each(|(Tup3(l, y, m), sum, w)| println!("{l:16} {y}-{m:02} {sum:10}: {w:+}")); Ok(()) }