Skip to content

Commit 806b7a5

Browse files
committed
[adapters] Don't materialize tables with PK+lateness.
Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent b39a0a0 commit 806b7a5

File tree

2 files changed

+81
-17
lines changed

2 files changed

+81
-17
lines changed

crates/adapters/src/static_compile/catalog.rs

Lines changed: 77 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ impl Catalog {
259259
.unwrap();
260260

261261
// Inputs are also outputs.
262-
self.register_materialized_output_map_persistent(
262+
self.register_output_map_persistent(
263263
Self::output_persistent_id(&stream).as_deref(),
264264
stream,
265265
schema,
@@ -506,6 +506,33 @@ impl Catalog {
506506
self.register_materialized_output_map_persistent(None, stream, schema)
507507
}
508508

509+
/// Registers `stream` as an output map of the catalog without materializing it.
///
/// Thin wrapper: forwards `persistent_id`, `stream` and `schema` unchanged to
/// `register_output_map_persistent_inner` with `materialized = false`. With that
/// flag the inner function does not build the `integrate_trace` output and stores
/// `integrate_handle: None` in the registered `OutputCollectionHandles`; only the
/// delta handle is exposed.
///
/// Type parameters: `K`/`V` are the key and value types of the indexed Z-set;
/// `KD`/`VD` are their (de)serializable counterparts, convertible in both
/// directions (`KD: From<K>`, `K: From<KD>`, and likewise for `V`/`VD`).
///
/// # Panics
///
/// The inner function calls `unwrap()` on the result of parsing `schema` and on
/// the result of registering the output handles, so a failure in either step
/// panics.
pub fn register_output_map_persistent<K, KD, V, VD>(
510+
&mut self,
511+
persistent_id: Option<&str>,
512+
stream: Stream<RootCircuit, OrdIndexedZSet<K, V>>,
513+
schema: &str,
514+
) where
515+
KD: for<'de> DeserializeWithContext<'de, SqlSerdeConfig>
516+
+ SerializeWithContext<SqlSerdeConfig>
517+
+ From<K>
518+
+ Send
519+
+ Sync
520+
+ Debug
521+
+ 'static,
522+
VD: for<'de> DeserializeWithContext<'de, SqlSerdeConfig>
523+
+ SerializeWithContext<SqlSerdeConfig>
524+
+ From<V>
525+
+ Default
526+
+ Debug
527+
+ Clone
528+
+ Send
529+
+ 'static,
530+
K: DBData + Send + Sync + From<KD> + Default,
531+
V: DBData + Send + Sync + From<VD> + Default,
532+
{
533+
// `false` = not materialized: the inner function skips the integral output.
self.register_output_map_persistent_inner(persistent_id, stream, schema, false)
534+
}
535+
509536
pub fn register_materialized_output_map_persistent<K, KD, V, VD>(
510537
&mut self,
511538
persistent_id: Option<&str>,
@@ -529,6 +556,34 @@ impl Catalog {
529556
+ 'static,
530557
K: DBData + Send + Sync + From<KD> + Default,
531558
V: DBData + Send + Sync + From<VD> + Default,
559+
{
560+
self.register_output_map_persistent_inner(persistent_id, stream, schema, true)
561+
}
562+
563+
pub fn register_output_map_persistent_inner<K, KD, V, VD>(
564+
&mut self,
565+
persistent_id: Option<&str>,
566+
stream: Stream<RootCircuit, OrdIndexedZSet<K, V>>,
567+
schema: &str,
568+
materialized: bool,
569+
) where
570+
KD: for<'de> DeserializeWithContext<'de, SqlSerdeConfig>
571+
+ SerializeWithContext<SqlSerdeConfig>
572+
+ From<K>
573+
+ Send
574+
+ Sync
575+
+ Debug
576+
+ 'static,
577+
VD: for<'de> DeserializeWithContext<'de, SqlSerdeConfig>
578+
+ SerializeWithContext<SqlSerdeConfig>
579+
+ From<V>
580+
+ Default
581+
+ Debug
582+
+ Clone
583+
+ Send
584+
+ 'static,
585+
K: DBData + Send + Sync + From<KD> + Default,
586+
V: DBData + Send + Sync + From<VD> + Default,
532587
{
533588
let schema: Relation = Self::parse_relation_schema(schema).unwrap();
534589
let name = schema.name.clone();
@@ -546,16 +601,26 @@ impl Catalog {
546601
let (delta_handle, delta_gid) = delta.accumulate_output_persistent_with_gid(persistent_id);
547602
stream.circuit().set_mir_node_id(&delta_gid, persistent_id);
548603

549-
// `integrate_trace` below should return the existing integral created by the InputUpsert operator.
550-
let (integrate_handle, integral_gid) = stream
551-
.integrate_trace()
552-
.apply(|s| TypedBatch::<K, V, ZWeight, _>::new(s.inner().ro_snapshot()))
553-
.output_persistent_with_gid(
554-
persistent_id.map(|id| format!("{id}.integral")).as_deref(),
555-
);
556-
stream
557-
.circuit()
558-
.set_mir_node_id(&integral_gid, persistent_id);
604+
let integrate_handle = if materialized {
605+
// `integrate_trace` below should return the existing integral created by the InputUpsert operator.
606+
let (integrate_handle, integral_gid) = stream
607+
.integrate_trace()
608+
.apply(|s| TypedBatch::<K, V, ZWeight, _>::new(s.inner().ro_snapshot()))
609+
.output_persistent_with_gid(
610+
persistent_id
611+
.map(|id| format!("{id}.output_integral"))
612+
.as_deref(),
613+
);
614+
stream
615+
.circuit()
616+
.set_mir_node_id(&integral_gid, persistent_id);
617+
Some(
618+
Arc::new(<SerCollectionHandleImpl<_, KD, VD>>::new(integrate_handle))
619+
as Arc<dyn SerBatchReaderHandle>,
620+
)
621+
} else {
622+
None
623+
};
559624

560625
let handles = OutputCollectionHandles {
561626
key_schema: None,
@@ -564,9 +629,7 @@ impl Catalog {
564629
delta_handle: Box::new(<SerCollectionHandleImpl<_, VD, ()>>::new(delta_handle))
565630
as Box<dyn SerBatchReaderHandle>,
566631
integrate_handle_is_indexed: true,
567-
integrate_handle: Some(Arc::new(<SerCollectionHandleImpl<_, KD, VD>>::new(
568-
integrate_handle,
569-
)) as Arc<dyn SerBatchReaderHandle>),
632+
integrate_handle,
570633
};
571634

572635
self.register_output_batch_handles(&name, handles).unwrap();

docs.feldera.com/docs/sql/materialized.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ To declare a materialized table, use the materialized attribute:
1919
CREATE TABLE my_table (...) WITH ('materialized' = 'true');
2020
```
2121

22-
In addition, tables with a `PRIMARY KEY` constraint are automatically materialized
23-
(since the pipeline must in any case maintain an index of such a table used to enforce the constraint)
24-
and do not require an explicit `materialized` attribute.
22+
Tables with a `PRIMARY KEY` constraint are automatically materialized, since the pipeline
23+
must maintain an index to enforce the constraint. Therefore, such tables do not need an explicit
24+
`materialized` attribute. The only exception is when a table has a `LATENESS` attribute defined
25+
on one of its primary key columns—in that case, the table is not materialized automatically, and you must specify the `materialized` attribute explicitly if you want it materialized.
2526

2627
To declare a materialized view, use the `CREATE MATERIALIZED VIEW` syntax:
2728

0 commit comments

Comments
 (0)