4848import org .dbsp .sqlCompiler .circuit .operator .DBSPDistinctOperator ;
4949import org .dbsp .sqlCompiler .circuit .operator .DBSPJoinBaseOperator ;
5050import org .dbsp .sqlCompiler .circuit .operator .DBSPOperator ;
51+ import org .dbsp .sqlCompiler .circuit .operator .DBSPSourceMultisetOperator ;
5152import org .dbsp .sqlCompiler .circuit .operator .DBSPStreamJoinIndexOperator ;
5253import org .dbsp .sqlCompiler .circuit .operator .DBSPIndexedTopKOperator ;
5354import org .dbsp .sqlCompiler .circuit .operator .DBSPIntegrateTraceRetainKeysOperator ;
8990import org .dbsp .sqlCompiler .compiler .visitors .outer .DeclareComparators ;
9091import org .dbsp .sqlCompiler .ir .IDBSPInnerNode ;
9192import org .dbsp .sqlCompiler .ir .IDBSPNode ;
93+ import org .dbsp .sqlCompiler .ir .IDBSPOuterNode ;
9294import org .dbsp .sqlCompiler .ir .expression .DBSPBinaryExpression ;
9395import org .dbsp .sqlCompiler .ir .expression .DBSPClosureExpression ;
9496import org .dbsp .sqlCompiler .ir .expression .DBSPComparatorExpression ;
112114import org .dbsp .sqlCompiler .ir .type .derived .DBSPTypeStruct ;
113115import org .dbsp .sqlCompiler .ir .type .user .DBSPTypeZSet ;
114116import org .dbsp .sqlCompiler .ir .type .primitive .DBSPTypeBool ;
117+ import org .dbsp .util .Bijection ;
115118import org .dbsp .util .HashString ;
116119import org .dbsp .util .IIndentStream ;
117120import org .dbsp .util .IndentStream ;
@@ -508,6 +511,36 @@ public VisitDecision preorder(DBSPDeltaOperator delta) {
508511 return VisitDecision .STOP ;
509512 }
510513
514+ void registerTable (DBSPSourceMultisetOperator operator , DBSPOperator materialized ) {
515+ // Register a table. If the table is materialized and has lateness,
516+ // the "materialized" operator may actually be the controlled filter following the table.
517+ // In other cases, materialized == operator.
518+ // The "materialized" operator's output stream is used to build the integral that serves the table contents,
519+ // and that integral should not contain the late values.
520+ // Note that this is never a problem for SourceMapOperators with lateness, due to the
521+ // nature of the circuits synthesized for them.
522+ this .builder .newline ();
523+ this .generateStructHelpers (operator .originalRowType , operator .metadata );
524+ String registerFunction = operator .metadata .materialized ?
525+ "register_materialized_input_zset" : "register_input_zset" ;
526+ this .builder .append ("catalog." )
527+ .append (registerFunction )
528+ .append ("::<_, " );
529+ IHasSchema tableDescription = this .metadata .getTableDescription (operator .tableName );
530+ JsonNode j = tableDescription .asJson (true );
531+ j = this .stripProperties (j );
532+ DBSPStrLiteral json = new DBSPStrLiteral (j .toString (), true );
533+ operator .originalRowType .accept (this .innerVisitor );
534+ this .builder .append (">(" )
535+ .append (materialized .getOutput (0 ).getName (this .preferHash ))
536+ .append (".clone(), " )
537+ .append (this .handleName (operator ))
538+ .append (", " );
539+ json .accept (this .innerVisitor );
540+ this .innerVisitor .setOperatorContext (null );
541+ this .builder .append (");" );
542+ }
543+
511544 @ Override
512545 public VisitDecision preorder (DBSPSourceMultisetOperator operator ) {
513546 this .computeHash (operator );
@@ -532,26 +565,9 @@ public VisitDecision preorder(DBSPSourceMultisetOperator operator) {
532565 }
533566 this .tagStream (operator );
534567 if (!this .useHandles ) {
535- this .builder .newline ();
536- this .generateStructHelpers (operator .originalRowType , operator .metadata );
537- String registerFunction = operator .metadata .materialized ?
538- "register_materialized_input_zset" : "register_input_zset" ;
539- this .builder .append ("catalog." )
540- .append (registerFunction )
541- .append ("::<_, " );
542- IHasSchema tableDescription = this .metadata .getTableDescription (operator .tableName );
543- JsonNode j = tableDescription .asJson (true );
544- j = this .stripProperties (j );
545- DBSPStrLiteral json = new DBSPStrLiteral (j .toString (), true );
546- operator .originalRowType .accept (this .innerVisitor );
547- this .builder .append (">(" )
548- .append (operator .getNodeName (this .preferHash ))
549- .append (".clone(), " )
550- .append (this .handleName (operator ))
551- .append (", " );
552- json .accept (this .innerVisitor );
553- this .innerVisitor .setOperatorContext (null );
554- this .builder .append (");" );
568+ if (!this .materialization .has (operator )) {
569+ this .registerTable (operator , operator );
570+ }
555571 }
556572 return VisitDecision .STOP ;
557573 }
@@ -798,6 +814,13 @@ public VisitDecision preorder(DBSPControlledKeyFilterOperator operator) {
798814 this .builder .newline ()
799815 .append (operator .getOutput (0 ).getName (this .preferHash ))
800816 .append (".set_persistent_id(hash);" );
817+ if (!this .useHandles ) {
818+ if (this .materialization .hasRight (operator )) {
819+ // Materialize now; otherwise, materialize at the ControlledFilter operator
820+ DBSPSourceMultisetOperator table = this .materialization .getLeft (operator );
821+ this .registerTable (table , operator );
822+ }
823+ }
801824 return VisitDecision .STOP ;
802825 }
803826
@@ -1542,14 +1565,44 @@ public VisitDecision preorder(DBSPConstantOperator operator) {
15421565 return this .constantLike (operator );
15431566 }
15441567
1568+ // Keeps track of tables that have to be materialized at a different point in the circuit.
1569+ Bijection <DBSPSourceMultisetOperator , DBSPControlledKeyFilterOperator > materialization = new Bijection <>();
1570+
1571+ void discoverLateMaterializations (DBSPCircuit circuit ) {
1572+ // Discover input nodes that immediately feed the left input of a controlled_key_filter operator.
1573+ // This pattern is generated by inputs with LATENESS annotations.
1574+ // If the input table is materialized, the materialized stream has
1575+ // to be the output of the controlled_key_filter operator.
1576+ for (DBSPOperator op : circuit .getAllOperators ()) {
1577+ // I don't think we need to recurse for nested nodes; the input nodes are all the outer level
1578+ if (op .is (DBSPControlledKeyFilterOperator .class )) {
1579+ DBSPControlledKeyFilterOperator filter = op .to (DBSPControlledKeyFilterOperator .class );
1580+ DBSPOperator left = filter .left ().operator ;
1581+ if (!left .is (DBSPSourceMultisetOperator .class )) continue ;
1582+ DBSPSourceMultisetOperator source = left .to (DBSPSourceMultisetOperator .class );
1583+ if (source .metadata .materialized )
1584+ this .materialization .map (source , filter );
1585+ }
1586+ }
1587+ }
1588+
1589+ @ Override
1590+ public Token startVisit (IDBSPOuterNode node ) {
1591+ Token result = super .startVisit (node );
1592+ DBSPCircuit circuit = node .to (DBSPCircuit .class );
1593+ this .discoverLateMaterializations (circuit );
1594+ return result ;
1595+ }
1596+
15451597 @ Override
15461598 public VisitDecision preorder (DBSPNowOperator operator ) {
15471599 return this .constantLike (operator );
15481600 }
15491601
15501602 public static void toRustString (DBSPCompiler compiler , IIndentStream stream ,
15511603 DBSPCircuit circuit , ProjectDeclarations projectDeclarations ) {
1552- ToRustVisitor visitor = new ToRustVisitor (compiler , stream , circuit .getMetadata (), projectDeclarations );
1604+ ToRustVisitor visitor = new ToRustVisitor (
1605+ compiler , stream , circuit .getMetadata (), projectDeclarations );
15531606 visitor .apply (circuit );
15541607 }
15551608}
0 commit comments