-
Notifications
You must be signed in to change notification settings - Fork 111
Expand file tree
/
Copy pathcatalog.rs
More file actions
945 lines (785 loc) · 31.3 KB
/
catalog.rs
File metadata and controls
945 lines (785 loc) · 31.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
use std::any::Any;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex};
use anyhow::Result as AnyResult;
#[cfg(feature = "with-avro")]
use apache_avro::{
Schema as AvroSchema,
schema::{Name as AvroName, NamesRef},
types::Value as AvroValue,
};
use arrow::record_batch::RecordBatch;
use dbsp::circuit::NodeId;
use dbsp::dynamic::{ClonableTrait, DynData, DynVec, Factory};
use dbsp::operator::StagedBuffers;
use dyn_clone::DynClone;
use feldera_sqllib::Variant;
use feldera_types::format::csv::CsvParserConfig;
use feldera_types::format::json::JsonFlavor;
use feldera_types::program_schema::{Relation, SqlIdentifier};
use feldera_types::serde_with_context::SqlSerdeConfig;
use serde_arrow::ArrayBuilder;
#[cfg(feature = "with-avro")]
use std::collections::HashMap;
use crate::errors::controller::ControllerError;
use crate::format::InputBuffer;
use crate::preprocess::PreprocessorRegistry;
/// Descriptor that specifies the format in which records are received
/// or into which they should be encoded before sending.
#[derive(Clone)]
pub enum RecordFormat {
    // TODO: Support different JSON encodings:
    // * Map - the default encoding
    // * Array - allow the subset and the order of columns to be configurable
    // * Raw - Only applicable to single-column tables. Input records contain
    //   raw encoding of this column only. This is particularly useful for
    //   tables that store raw JSON or binary data to be parsed using SQL.
    /// JSON records; the exact dialect is described by [`JsonFlavor`].
    Json(JsonFlavor),
    /// CSV records parsed according to [`CsvParserConfig`].
    Csv(CsvParserConfig),
    /// Parquet data; [`SqlSerdeConfig`] controls SQL type (de)serialization.
    Parquet(SqlSerdeConfig),
    /// Avro-encoded records (only available with the `with-avro` feature).
    #[cfg(feature = "with-avro")]
    Avro,
    /// Raw encoding of a single value.
    // NOTE(review): the meaning of the `String` payload is not evident from this
    // file — presumably a format/column identifier; confirm at the use site.
    Raw(String),
}
/// An input handle that deserializes and buffers records.
///
/// A trait for a type that wraps a [`ZSetHandle`](`dbsp::ZSetHandle`) or an
/// [`MapHandle`](`dbsp::MapHandle`) and collects serialized relational data for
/// the associated input stream. The client passes a byte array with a
/// serialized data record (e.g., in JSON or CSV format) to
/// [`insert`](`Self::insert`), [`delete`](`Self::delete`), and
/// [`update`](`Self::update`) methods. The record gets deserialized into the
/// strongly typed representation expected by the input stream.
///
/// Instances of this trait are created by calling
/// [`DeCollectionHandle::configure_deserializer`].
/// The data format accepted by the handle is determined
/// by the `record_format` argument passed to this method.
///
/// The input handle internally buffers the deserialized records. Use the
/// `InputBuffer` supertrait to push them to the circuit or extract them for
/// later use.
pub trait DeCollectionStream: Send + Sync + InputBuffer {
    /// Buffer a new insert update.
    ///
    /// `metadata` contains optional metadata attached by the transport adapter or parser,
    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    /// `DeserializeWithContext::deserialize_with_context_aux`.
    ///
    /// Returns an error if deserialization fails, i.e., the serialized
    /// representation is corrupted or does not match the value type of
    /// the underlying input stream.
    fn insert(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;

    /// Buffer a new delete update.
    ///
    /// The `data` argument contains a serialized record whose
    /// type depends on the underlying input stream: streams created by
    /// [`RootCircuit::add_input_zset`](`dbsp::RootCircuit::add_input_zset`)
    /// and [`RootCircuit::add_input_set`](`dbsp::RootCircuit::add_input_set`)
    /// methods support deletion by value, hence the serialized record must
    /// match the value type of the stream. Streams created with
    /// [`RootCircuit::add_input_map`](`dbsp::RootCircuit::add_input_map`)
    /// support deletion by key, so the serialized record must match the key
    /// type of the stream.
    ///
    /// The record gets deserialized and pushed to the underlying input stream
    /// handle as a delete update.
    ///
    /// `metadata` contains optional metadata attached by the transport adapter or parser,
    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    /// `DeserializeWithContext::deserialize_with_context_aux`.
    ///
    /// Returns an error if deserialization fails, i.e., the serialized
    /// representation is corrupted or does not match the value or key
    /// type of the underlying input stream.
    fn delete(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;

    /// Buffer a new update that will modify an existing record.
    ///
    /// `metadata` contains optional metadata attached by the transport adapter or parser,
    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    /// `DeserializeWithContext::deserialize_with_context_aux`.
    ///
    /// This method can only be called on streams created with
    /// [`RootCircuit::add_input_map`](`dbsp::RootCircuit::add_input_map`)
    /// and will fail on other streams. The serialized record must match
    /// the update type of this stream, specified as a type argument to
    /// `Catalog::register_input_map`.
    fn update(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;

    /// Reserve space for at least `reservation` more updates in the
    /// internal input buffer.
    ///
    /// Reservations are not required but can be used when the number
    /// of inputs is known ahead of time to reduce reallocations.
    fn reserve(&mut self, reservation: usize);

    /// Removes any updates beyond the first `len`, i.e., keeps the first
    /// `len` buffered updates and discards the rest.
    fn truncate(&mut self, len: usize);

    /// Stages all of the `buffers`, which must have been obtained from a
    /// [Parser] for this stream, into a [StagedBuffers] that may later be used
    /// to push the collected data into the circuit. See [StagedBuffers] for
    /// more information.
    ///
    /// [Parser]: crate::format::Parser
    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;

    /// Create a new deserializer with the same configuration connected to the
    /// same input stream. The new deserializer has an independent buffer that
    /// is initially empty.
    fn fork(&self) -> Box<dyn DeCollectionStream>;
}
/// Like `DeCollectionStream`, but deserializes Arrow-encoded records before pushing them to a
/// stream.
pub trait ArrowStream: InputBuffer + Send + Sync {
    /// Buffer a new batch of insert updates.
    ///
    /// `metadata` contains optional metadata attached by the transport adapter or parser,
    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    /// `DeserializeWithContext::deserialize_with_context_aux` for each deserialized record.
    fn insert(&mut self, data: &RecordBatch, metadata: &Option<Variant>) -> AnyResult<()>;

    /// Buffer a new batch of delete updates.
    ///
    /// `metadata` contains optional metadata attached by the transport adapter or parser,
    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    /// `DeserializeWithContext::deserialize_with_context_aux` for each deserialized record.
    fn delete(&mut self, data: &RecordBatch, metadata: &Option<Variant>) -> AnyResult<()>;

    /// Insert records in `data` with polarities from the `polarities` array.
    ///
    /// `polarities` must be the same length as `data`.
    // NOTE(review): the mapping of `true`/`false` to insert/delete is not shown
    // in this file — presumably `true` = insert, `false` = delete; confirm in
    // the implementation.
    fn insert_with_polarities(
        &mut self,
        data: &RecordBatch,
        polarities: &[bool],
        metadata: &Option<Variant>,
    ) -> AnyResult<()>;

    /// Create a new deserializer with the same configuration connected to
    /// the same input stream.
    fn fork(&self) -> Box<dyn ArrowStream>;

    /// Stages all of the `buffers`, which must have been obtained from a
    /// [Parser] for this stream, into a [StagedBuffers] that may later be used
    /// to push the collected data into the circuit. See [StagedBuffers] for
    /// more information.
    ///
    /// [Parser]: crate::format::Parser
    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
}
/// Map from Avro schema name to the corresponding schema, used to resolve
/// named references that occur inside a top-level Avro schema.
#[cfg(feature = "with-avro")]
pub type AvroSchemaRefs = HashMap<AvroName, AvroSchema>;
/// Like `DeCollectionStream`, but deserializes Avro-encoded records before pushing them to a
/// stream.
#[cfg(feature = "with-avro")]
pub trait AvroStream: InputBuffer + Send + Sync {
    /// Buffer a new insert update.
    ///
    /// # Arguments
    ///
    /// * `schema` - The Avro schema to use for deserialization.
    /// * `refs` - A map of named schema references that may be used to resolve references within `schema`.
    /// * `n_bytes` - Size in bytes of the serialized record `data` was decoded
    ///   from. NOTE(review): presumably used for ingress byte accounting —
    ///   confirm in the implementation.
    /// * `metadata` - Optional metadata attached by the transport adapter or parser,
    ///   such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    ///   `DeserializeWithContext::deserialize_with_context_aux`.
    fn insert(
        &mut self,
        data: &AvroValue,
        schema: &AvroSchema,
        refs: &AvroSchemaRefs,
        n_bytes: usize,
        metadata: &Option<Variant>,
    ) -> AnyResult<()>;

    /// Buffer a new delete update.
    ///
    /// Arguments have the same meaning as in [`insert`](Self::insert).
    fn delete(
        &mut self,
        data: &AvroValue,
        schema: &AvroSchema,
        refs: &AvroSchemaRefs,
        n_bytes: usize,
        metadata: &Option<Variant>,
    ) -> AnyResult<()>;

    /// Create a new deserializer with the same configuration connected to
    /// the same input stream.
    fn fork(&self) -> Box<dyn AvroStream>;

    /// Stages all of the `buffers`, which must have been obtained from a
    /// [Parser] for this stream, into a [StagedBuffers] that may later be used
    /// to push the collected data into the circuit. See [StagedBuffers] for
    /// more information.
    ///
    /// [Parser]: crate::format::Parser
    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
}
/// A handle to an input collection that can be used to feed serialized data
/// to the collection.
pub trait DeCollectionHandle: Send + Sync {
    /// Create a [`DeCollectionStream`] object to parse input data encoded
    /// using the format specified in `RecordFormat`.
    fn configure_deserializer(
        &self,
        record_format: RecordFormat,
    ) -> Result<Box<dyn DeCollectionStream>, ControllerError>;

    /// Create an `ArrowStream` object to parse Arrow-encoded input data.
    fn configure_arrow_deserializer(
        &self,
        config: SqlSerdeConfig,
    ) -> Result<Box<dyn ArrowStream>, ControllerError>;

    /// Create an `AvroStream` object to parse Avro-encoded input data.
    #[cfg(feature = "with-avro")]
    fn configure_avro_deserializer(&self) -> Result<Box<dyn AvroStream>, ControllerError>;

    /// Create a new handle referring to the same input collection.
    fn fork(&self) -> Box<dyn DeCollectionHandle>;
}
/// A type-erased batch whose contents can be serialized.
///
/// This is a wrapper around the DBSP `BatchReader` trait that returns a cursor that
/// yields `erased_serde::Serialize` trait objects that can be used to serialize
/// the contents of the batch without knowing its key and value types.
// The reason we need the `Sync` trait below is so that we can wrap batches
// in `Arc` and send the same batch to multiple output endpoint threads.
pub trait SerBatchReader: 'static + Send + Sync {
    /// Number of keys in the batch.
    fn key_count(&self) -> usize;

    /// Number of tuples in the batch.
    fn len(&self) -> usize;

    /// `true` iff the batch contains no tuples.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Create a cursor over the batch that yields records
    /// formatted using the specified format.
    fn cursor<'a>(
        &'a self,
        record_format: RecordFormat,
    ) -> Result<Box<dyn SerCursor + Send + 'a>, ControllerError>;

    /// Returns all batches in this reader.
    ///
    /// A reader can wrap a single batch or a spine or a spine snapshot. This method extracts
    /// all batches from the reader.
    fn batches(&self) -> Vec<Arc<dyn SerBatch>>;

    /// Returns a snapshot of the reader's current contents.
    fn snapshot(&self) -> Arc<dyn SerBatchReader>;

    /// Factory for vectors of type-erased keys, e.g., the `sample`/`bounds`
    /// arguments of [`sample_keys`](Self::sample_keys) and
    /// [`partition_keys`](Self::partition_keys).
    fn keys_factory(&self) -> &'static dyn Factory<DynVec<DynData>>;

    /// Factory for individual type-erased keys of this batch.
    fn key_factory(&self) -> &'static dyn Factory<DynData>;

    /// Fill `sample` with up to `sample_size` keys from the batch.
    // NOTE(review): the sampling strategy (uniform? distinct keys?) is
    // implementation-defined and not visible from this file.
    fn sample_keys(&self, sample_size: usize, sample: &mut DynVec<DynData>);

    /// Compute boundary keys for splitting the batch into `num_partitions`
    /// key ranges. Writes `num_partitions - 1` boundary keys into `bounds`
    /// (see [`SplitCursorBuilder::from_bounds`] for how they are interpreted).
    fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<DynData>);
}
impl Debug for dyn SerBatchReader {
    /// Debug-print the batch as `key=>{val=>weight, ...}, ...` using the
    /// default JSON encoding for keys and values.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Any cursor or serialization failure is collapsed into the only error
        // type `fmt` can return.
        let mut cursor = self
            .cursor(RecordFormat::Json(Default::default()))
            .map_err(|_| std::fmt::Error)?;
        // Scratch buffers reused across iterations; they are cleared after each
        // record, which implies serialize_key/serialize_val append to them.
        let mut key = Vec::new();
        let mut val = Vec::new();
        while cursor.key_valid() {
            cursor
                .serialize_key(&mut key)
                .map_err(|_| std::fmt::Error)?;
            write!(f, "{}=>{{", String::from_utf8_lossy(&key))?;
            // Print all values (with weights) associated with the current key.
            while cursor.val_valid() {
                cursor
                    .serialize_val(&mut val)
                    .map_err(|_| std::fmt::Error)?;
                write!(
                    f,
                    "{}=>{}, ",
                    String::from_utf8_lossy(&val),
                    cursor.weight()
                )?;
                val.clear();
                cursor.step_val();
            }
            write!(f, "}}, ")?;
            key.clear();
            cursor.step_key();
        }
        Ok(())
    }
}
impl Debug for dyn SerBatch {
    /// Render the batch by formatting it through its `SerBatchReader` view.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(self.as_batch_reader(), f)
    }
}
/// A type-erased `Batch`.
pub trait SerBatch: SerBatchReader {
    /// Convert to `Arc<Any>`, which can then be downcast to a reference
    /// to a concrete batch type.
    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Sync + Send>;

    /// Merge `self` with all batches in `other`.
    fn merge(self: Arc<Self>, other: Vec<Arc<dyn SerBatch>>) -> Arc<dyn SerBatch>;

    /// Upcast to the `SerBatchReader` supertrait.
    fn as_batch_reader(&self) -> &dyn SerBatchReader;

    /// Like [`as_batch_reader`](Self::as_batch_reader), but consumes the `Arc`
    /// and returns a shared `SerBatchReader` trait object.
    fn arc_as_batch_reader(self: Arc<Self>) -> Arc<dyn SerBatchReader>;

    /// Convert batch into a trace with identical contents.
    fn into_trace(self: Arc<Self>) -> Box<dyn SerTrace>;
}
/// A type-erased `Trace`.
pub trait SerTrace: SerBatchReader {
    /// Insert a batch into the trace.
    fn insert(&mut self, batch: Arc<dyn SerBatch>);

    /// Upcast to the `SerBatchReader` supertrait.
    fn as_batch_reader(&self) -> &dyn SerBatchReader;
}
/// Factory for [`SplitCursor`]s over one partition of a batch.
///
/// Captures the key range `[start_key, end_key)` of the partition so that
/// multiple cursors over the same range can be created via
/// [`build`](Self::build).
#[doc(hidden)]
pub struct SplitCursorBuilder {
    // The batch the partition belongs to.
    batch: Arc<dyn SerBatchReader>,
    // First key of the partition (an actual key present in the batch).
    start_key: Box<DynData>,
    // Exclusive upper bound; `None` for the last partition.
    end_key: Option<Box<DynData>>,
    // Record format used when creating cursors over `batch`.
    format: RecordFormat,
}
impl SplitCursorBuilder {
    /// Create a [`SplitCursorBuilder`] for partition `index` given a batch,
    /// pre-computed partition `bounds` (as returned by
    /// [`SerBatchReader::partition_keys`]), and a record `format`.
    ///
    /// `bounds` contains `N-1` boundary keys for `N` partitions.
    /// Partition 0 spans from the start of the batch to `bounds[0]`,
    /// partition `i` spans from `bounds[i-1]` to `bounds[i]`, and the last
    /// partition spans from `bounds[N-2]` to the end of the batch.
    ///
    /// Returns `None` if the partition is empty (the cursor has no key at the
    /// start position).
    pub fn from_bounds(
        batch: Arc<dyn SerBatchReader>,
        bounds: &DynVec<DynData>,
        index: usize,
        format: RecordFormat,
    ) -> Option<Self> {
        // Partition 0 has no lower bound; partition `i > 0` starts at
        // `bounds[i - 1]`. The final `else` arm is defensive: an `index`
        // beyond the number of partitions falls back to "no lower bound".
        let start_bound = if index == 0 {
            None
        } else if index <= bounds.len() {
            Some(bounds.index(index - 1).as_data())
        } else {
            None
        };
        // The last partition (`index == bounds.len()`) has no upper bound.
        let end_bound = if index < bounds.len() {
            Some(bounds.index(index).as_data())
        } else {
            None
        };
        // Position a throwaway cursor at the partition start and capture the
        // key it actually lands on; `None` here means the partition is empty
        // and the `?` below propagates that.
        let start_key = {
            let mut cursor = batch.cursor(format.clone()).unwrap();
            // Seek to start. If None, the cursor starts at the beginning.
            if let Some(start_bound) = start_bound {
                cursor.seek_key_exact(start_bound);
            }
            // Clone the actual key the cursor landed on.
            cursor.get_key().map(|s| {
                let mut key = batch.key_factory().default_box();
                s.clone_to(key.as_mut());
                key
            })
        }?;
        // Clone the upper bound (if any) into an owned key so the builder can
        // outlive `bounds`.
        let end_key = end_bound.map(|e| {
            let mut key = batch.key_factory().default_box();
            e.clone_to(key.as_mut());
            key
        });
        Some(SplitCursorBuilder {
            batch,
            start_key,
            end_key,
            format,
        })
    }

    /// Create a fresh [`SplitCursor`] positioned at the partition's first key.
    pub fn build<'a>(&'a self) -> SplitCursor<'a> {
        let mut cursor = self.batch.cursor(self.format.clone()).unwrap();
        // Cannot use `seek_key_exact` here, so we can single-step the cursor afterward.
        cursor.seek_key(self.start_key.as_data());
        SplitCursor {
            cursor,
            start_key: self.start_key.clone(),
            end_key: self.end_key.clone(),
        }
    }
}
/// A cursor restricted to one partition (`[start_key, end_key)`) of a batch.
///
/// Created via [`SplitCursorBuilder::build`].
#[doc(hidden)]
pub struct SplitCursor<'a> {
    // Cursor over the whole batch; range clipping is layered on top.
    cursor: Box<dyn SerCursor + 'a>,
    // First key of the partition; `rewind_keys` seeks back here.
    start_key: Box<DynData>,
    // Exclusive upper bound; `None` for the last partition.
    end_key: Option<Box<DynData>>,
}
impl SplitCursor<'_> {
fn finished(&self) -> bool {
if let Some(ref end_key) = self.end_key
&& let Some(current_key) = self.cursor.get_key()
{
return current_key >= end_key.as_data();
}
false
}
}
// `SerCursor` for `SplitCursor`: delegates everything to the inner cursor but
// clips the visible key range to `[start_key, end_key)` — `key_valid` and
// `get_key` report the end of the range as exhaustion, `rewind_keys` rewinds
// to `start_key` rather than the start of the batch, and `seek_key_exact`
// refuses keys at or beyond `end_key`.
impl SerCursor for SplitCursor<'_> {
    fn key_valid(&self) -> bool {
        // A key past the partition's upper bound counts as exhausted.
        self.cursor.key_valid() && !self.finished()
    }
    fn val_valid(&self) -> bool {
        self.cursor.val_valid()
    }
    fn key(&self) -> &DynData {
        self.cursor.key()
    }
    fn get_key(&self) -> Option<&DynData> {
        // Use the clipped `key_valid` so keys beyond `end_key` yield `None`.
        if !self.key_valid() {
            return None;
        }
        self.cursor.get_key()
    }
    fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
        self.cursor.serialize_key(dst)
    }
    fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
        self.cursor.key_to_json()
    }
    fn serialize_key_fields(
        &mut self,
        fields: &HashSet<String>,
        dst: &mut Vec<u8>,
    ) -> AnyResult<()> {
        self.cursor.serialize_key_fields(fields, dst)
    }
    fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
        self.cursor.serialize_key_to_arrow(dst)
    }
    fn serialize_key_to_arrow_with_metadata(
        &mut self,
        metadata: &dyn erased_serde::Serialize,
        dst: &mut ArrayBuilder,
    ) -> AnyResult<()> {
        self.cursor
            .serialize_key_to_arrow_with_metadata(metadata, dst)
    }
    fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
        self.cursor.serialize_val_to_arrow(dst)
    }
    fn serialize_val_to_arrow_with_metadata(
        &mut self,
        metadata: &dyn erased_serde::Serialize,
        dst: &mut ArrayBuilder,
    ) -> AnyResult<()> {
        self.cursor
            .serialize_val_to_arrow_with_metadata(metadata, dst)
    }
    #[cfg(feature = "with-avro")]
    fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
        self.cursor.key_to_avro(schema, refs)
    }
    fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
        self.cursor.serialize_key_weight(dst)
    }
    fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
        self.cursor.serialize_val(dst)
    }
    fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
        self.cursor.val_to_json()
    }
    #[cfg(feature = "with-avro")]
    fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
        self.cursor.val_to_avro(schema, refs)
    }
    fn weight(&mut self) -> i64 {
        self.cursor.weight()
    }
    fn step_key(&mut self) {
        self.cursor.step_key();
    }
    fn step_val(&mut self) {
        self.cursor.step_val();
    }
    fn rewind_keys(&mut self) {
        // Rewind to the start of the *partition*, not the start of the batch.
        self.cursor.rewind_keys();
        self.cursor.seek_key(self.start_key.as_data());
    }
    fn rewind_vals(&mut self) {
        self.cursor.rewind_vals();
    }
    fn seek_key_exact(&mut self, key: &DynData) -> bool {
        // Keys at or beyond the upper bound are outside this partition.
        if let Some(ref end_key) = self.end_key
            && key >= end_key.as_data()
        {
            return false;
        }
        self.cursor.seek_key_exact(key)
    }
    fn seek_key(&mut self, key: &DynData) {
        // No upper-bound check here: `key_valid`/`finished` enforce the bound
        // after the seek.
        self.cursor.seek_key(key);
    }
}
/// Cursor that allows serializing the contents of a type-erased batch.
///
/// This is a wrapper around the DBSP `Cursor` trait that yields keys and values
/// of the underlying batch as `erased_serde::Serialize` trait objects.
pub trait SerCursor: Send {
    /// Indicates if the current key is valid.
    ///
    /// A value of `false` indicates that the cursor has exhausted all keys.
    fn key_valid(&self) -> bool;

    /// Indicates if the current value is valid.
    ///
    /// A value of `false` indicates that the cursor has exhausted all values
    /// for this key.
    fn val_valid(&self) -> bool;

    /// Returns a reference to the current key. Only meaningful when
    /// [`key_valid`](Self::key_valid) returns `true`.
    fn key(&self) -> &DynData;

    /// Returns the current key, or `None` if the cursor has no valid key.
    fn get_key(&self) -> Option<&DynData>;

    /// Serialize current key. Panics if invalid.
    fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;

    /// Convert key to JSON. Used for error reporting to generate a human-readable
    /// representation of the key.
    fn key_to_json(&mut self) -> AnyResult<serde_json::Value>;

    /// Like `serialize_key`, but only serializes the specified fields of the key.
    fn serialize_key_fields(
        &mut self,
        fields: &HashSet<String>,
        dst: &mut Vec<u8>,
    ) -> AnyResult<()>;

    /// Serialize current key into arrow format. Panics if invalid.
    fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()>;

    /// Serialize current key into arrow format, adding additional metadata columns.
    /// `metadata` must be a struct or a map.
    fn serialize_key_to_arrow_with_metadata(
        &mut self,
        metadata: &dyn erased_serde::Serialize,
        dst: &mut ArrayBuilder,
    ) -> AnyResult<()>;

    /// Serialize current value into arrow format. Panics if invalid.
    fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()>;

    /// Serialize current value into arrow format, adding additional metadata columns.
    /// `metadata` must be a struct or a map.
    fn serialize_val_to_arrow_with_metadata(
        &mut self,
        metadata: &dyn erased_serde::Serialize,
        dst: &mut ArrayBuilder,
    ) -> AnyResult<()>;

    #[cfg(feature = "with-avro")]
    /// Convert current key to an Avro value.
    fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue>;

    /// Serialize the `(key, weight)` tuple.
    ///
    /// FIXME: This only exists to support the CSV serializer, which outputs
    /// key and weight in the same CSV record.
    fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;

    /// Serialize current value. Panics if invalid.
    fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;

    /// Convert value to JSON. Used for error reporting to generate a human-readable
    /// representation of the value.
    fn val_to_json(&mut self) -> AnyResult<serde_json::Value>;

    #[cfg(feature = "with-avro")]
    /// Convert current value to Avro.
    fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue>;

    /// Returns the weight associated with the current key/value pair.
    fn weight(&mut self) -> i64;

    /// Advances the cursor to the next key.
    fn step_key(&mut self);

    /// Advances the cursor to the next value.
    fn step_val(&mut self);

    /// Rewinds the cursor to the first key.
    fn rewind_keys(&mut self);

    /// Rewinds the cursor to the first value for current key.
    fn rewind_vals(&mut self);

    /// Count the keys remaining from the current position.
    ///
    /// Note: steps the cursor to the end, so it is left exhausted; call
    /// [`rewind_keys`](Self::rewind_keys) to reuse the cursor afterward.
    fn count_keys(&mut self) -> usize {
        let mut count = 0;
        while self.key_valid() {
            count += 1;
            self.step_key()
        }
        count
    }

    /// Seek to the exact key `key`; returns `true` if the key was found.
    // NOTE(review): cursor position when the key is absent is
    // implementation-defined here — confirm against the DBSP cursor semantics.
    fn seek_key_exact(&mut self, key: &DynData) -> bool;

    /// Seek forward to `key` (presumably the first key `>= key`, matching DBSP
    /// cursor semantics — confirm in the implementations).
    fn seek_key(&mut self, key: &DynData);
}
/// A handle to an output stream of a circuit that yields type-erased
/// read-only batches.
///
/// A trait for a type that wraps around an
/// [`OutputHandle`](`dbsp::OutputHandle`) and yields output batches produced by
/// the circuit as [`SerBatchReader`]s.
pub trait SerBatchReaderHandle: Send + Sync + DynClone {
    /// See [`OutputHandle::num_nonempty_mailboxes`](`dbsp::OutputHandle::num_nonempty_mailboxes`)
    fn num_nonempty_mailboxes(&self) -> usize;

    /// Like [`OutputHandle::take_from_worker`](`dbsp::OutputHandle::take_from_worker`),
    /// but returns output batch as a [`SerBatchReader`] trait object.
    fn take_from_worker(&self, worker: usize) -> Option<Box<dyn SerBatchReader>>;

    /// Like [`OutputHandle::take_from_all`](`dbsp::OutputHandle::take_from_all`),
    /// but returns output batches as [`SerBatchReader`] trait objects.
    fn take_from_all(&self) -> Vec<Arc<dyn SerBatchReader>>;

    /// Concatenate outputs from all workers into a single batch reader.
    fn concat(&self) -> Arc<dyn SerBatchReader>;
}

// Makes `Box<dyn SerBatchReaderHandle>` cloneable (required for the
// `#[derive(Clone)]` on `OutputCollectionHandles`).
dyn_clone::clone_trait_object!(SerBatchReaderHandle);
/// Cursor that iterates over deletions before insertions.
///
/// Most consumers don't understand Z-sets and expect a stream of upserts
/// instead, which means that the order of updates matters. For a table
/// with a primary key or unique constraint we must delete an existing record
/// before creating a new one with the same key. DBSP may not know about
/// these constraints, so the safe thing to do is to output deletions before
/// insertions. This cursor helps by iterating over all deletions in
/// the batch before insertions.
pub struct CursorWithPolarity<'a> {
    // The underlying cursor being adapted.
    cursor: Box<dyn SerCursor + 'a>,
    // `false` while iterating deletions (negative weights); `true` during the
    // second pass over insertions (positive weights).
    second_pass: bool,
}
impl<'a> CursorWithPolarity<'a> {
    /// Wrap `cursor`, starting the first (deletions) pass and skipping past
    /// any leading values whose weight does not match that pass.
    pub fn new(cursor: Box<dyn SerCursor + 'a>) -> Self {
        let mut wrapped = Self {
            cursor,
            second_pass: false,
        };
        if wrapped.key_valid() {
            wrapped.advance_val();
        }
        wrapped
    }

    /// Skip values whose weight has the wrong sign for the current pass:
    /// non-negative weights during the first (deletions) pass, non-positive
    /// weights during the second (insertions) pass.
    fn advance_val(&mut self) {
        loop {
            if !self.cursor.val_valid() {
                break;
            }
            let weight = self.cursor.weight();
            let wrong_polarity = if self.second_pass {
                weight <= 0
            } else {
                weight >= 0
            };
            if !wrong_polarity {
                break;
            }
            // Note: this calls the trait's `step_val`, which itself re-runs
            // `advance_val`; the recursion terminates when a value with the
            // right polarity (or no value) is reached.
            self.step_val();
        }
    }
}
// `SerCursor` for `CursorWithPolarity`: iterates the underlying batch twice —
// the first pass yields only values with negative weights (deletions); once
// the keys are exhausted, `step_key` transparently rewinds and starts a second
// pass yielding only positive weights (insertions). Value skipping is handled
// by `advance_val`.
impl SerCursor for CursorWithPolarity<'_> {
    fn key_valid(&self) -> bool {
        self.cursor.key_valid()
    }
    fn val_valid(&self) -> bool {
        self.cursor.val_valid()
    }
    fn key(&self) -> &DynData {
        self.cursor.key()
    }
    fn get_key(&self) -> Option<&DynData> {
        self.cursor.get_key()
    }
    fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
        self.cursor.serialize_key(dst)
    }
    fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
        self.cursor.key_to_json()
    }
    fn serialize_key_fields(
        &mut self,
        fields: &HashSet<String>,
        dst: &mut Vec<u8>,
    ) -> AnyResult<()> {
        self.cursor.serialize_key_fields(fields, dst)
    }
    #[cfg(feature = "with-avro")]
    fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
        self.cursor.key_to_avro(schema, refs)
    }
    fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
        self.cursor.serialize_key_weight(dst)
    }
    fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
        self.cursor.serialize_key_to_arrow(dst)
    }
    fn serialize_key_to_arrow_with_metadata(
        &mut self,
        metadata: &dyn erased_serde::Serialize,
        dst: &mut ArrayBuilder,
    ) -> AnyResult<()> {
        self.cursor
            .serialize_key_to_arrow_with_metadata(metadata, dst)
    }
    fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
        self.cursor.serialize_val_to_arrow(dst)
    }
    fn serialize_val_to_arrow_with_metadata(
        &mut self,
        metadata: &dyn erased_serde::Serialize,
        dst: &mut ArrayBuilder,
    ) -> AnyResult<()> {
        self.cursor
            .serialize_val_to_arrow_with_metadata(metadata, dst)
    }
    fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
        self.cursor.serialize_val(dst)
    }
    fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
        self.cursor.val_to_json()
    }
    #[cfg(feature = "with-avro")]
    fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
        self.cursor.val_to_avro(schema, refs)
    }
    fn weight(&mut self) -> i64 {
        self.cursor.weight()
    }
    fn step_key(&mut self) {
        self.cursor.step_key();
        // First pass exhausted: rewind and switch to the insertions pass.
        if !self.cursor.key_valid() && !self.second_pass {
            self.cursor.rewind_keys();
            self.second_pass = true;
        }
        // Skip values with the wrong polarity for the (possibly new) pass.
        if self.cursor.key_valid() {
            self.advance_val();
        }
    }
    fn step_val(&mut self) {
        self.cursor.step_val();
        self.advance_val();
    }
    fn rewind_keys(&mut self) {
        // Restart from the beginning of the first (deletions) pass.
        self.cursor.rewind_keys();
        self.second_pass = false;
        if self.cursor.key_valid() {
            self.advance_val();
        }
    }
    fn rewind_vals(&mut self) {
        self.cursor.rewind_vals();
        self.advance_val();
    }
    // NOTE(review): the two seeks below do not re-run `advance_val`, so the
    // value position after a seek may have the wrong polarity for the current
    // pass — confirm that callers advance values explicitly after seeking.
    fn seek_key_exact(&mut self, key: &DynData) -> bool {
        self.cursor.seek_key_exact(key)
    }
    fn seek_key(&mut self, key: &DynData) {
        self.cursor.seek_key(key);
    }
}
/// A catalog of input and output stream handles of a circuit.
pub trait CircuitCatalog: Send + Sync {
    /// Look up an input stream handle by name.
    fn input_collection_handle(&self, name: &SqlIdentifier) -> Option<&InputCollectionHandle>;

    /// Iterate over all output collections and their associated handles.
    fn output_iter(
        &self,
    ) -> Box<dyn Iterator<Item = (&SqlIdentifier, &OutputCollectionHandles)> + '_>;

    /// Look up output stream handles by name.
    fn output_handles(&self, name: &SqlIdentifier) -> Option<&OutputCollectionHandles>;

    /// Mutable variant of [`output_handles`](Self::output_handles).
    fn output_handles_mut(&mut self, name: &SqlIdentifier) -> Option<&mut OutputCollectionHandles>;

    /// The registry used to insert new user-defined preprocessors
    fn preprocessor_registry(&self) -> Arc<Mutex<PreprocessorRegistry>>;
}
/// An input collection's schema paired with its deserialization handle.
#[doc(hidden)]
pub struct InputCollectionHandle {
    /// SQL schema of the input relation.
    pub schema: Relation,
    /// Handle used to feed serialized data into the collection.
    pub handle: Box<dyn DeCollectionHandle>,
    /// Node id of the input stream in the circuit.
    ///
    /// Used to check whether the input stream needs to be backfilled during bootstrapping,
    /// i.e., whether attached input connectors should be reset to their initial offsets or
    /// continue from the checkpointed offsets.
    pub node_id: NodeId,
}
impl InputCollectionHandle {
    /// Build a handle from a relation schema, a concrete `DeCollectionHandle`
    /// implementation, and the circuit node id of the input stream.
    #[doc(hidden)]
    pub fn new<H>(schema: Relation, handle: H, node_id: NodeId) -> Self
    where
        H: DeCollectionHandle + 'static,
    {
        // Erase the concrete handle type behind a trait object.
        let handle: Box<dyn DeCollectionHandle> = Box::new(handle);
        Self {
            node_id,
            handle,
            schema,
        }
    }
}
/// A set of stream handles associated with each output collection.
#[derive(Clone)]
pub struct OutputCollectionHandles {
    /// Schema of the key component of the collection, if any.
    // NOTE(review): presumably `Some` only for indexed collections — confirm.
    pub key_schema: Option<Relation>,
    /// Schema of the collection's values.
    pub value_schema: Relation,
    /// Name of the relation this collection is an index of, if any.
    // NOTE(review): inferred from the field name — confirm at the use sites.
    pub index_of: Option<SqlIdentifier>,
    /// Whether the integrate handle is an indexed Z-set.
    pub integrate_handle_is_indexed: bool,
    /// A handle to a snapshot of a materialized table/view.
    pub integrate_handle: Option<Arc<dyn SerBatchReaderHandle>>,
    /// A stream of changes to the collection.
    pub delta_handle: Box<dyn SerBatchReaderHandle>,
    /// Reference to the enable count of the accumulator used to collect updates to this stream.
    /// Incremented every time an output connector is attached to this stream; decremented when
    /// the output connector is detached.
    pub enable_count: Arc<AtomicUsize>,
}