Skip to content

Commit d37e180

Browse files
committed
[manager] Only keep relations with lateness in the schema.
We include program IR and schema in pipeline config. The config is passed to the pipeline via ConfigMap, which has a size limit of 1MB. A large program can easily exceed this limit. Previously we reduced IR size by only keeping source and sink nodes. A program with a very large number of relations can still exceed the limit. Since this information is currently only used as part of backfill avoidance, which only cares about relations with lateness, we prune all relations without lateness to further reduce pipeline config size. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent a2bd0d6 commit d37e180

File tree

2 files changed

+17
-3
lines changed

2 files changed

+17
-3
lines changed

crates/feldera-types/src/program_schema.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ impl ProgramSchema {
174174
self.inputs
175175
.iter()
176176
.chain(self.outputs.iter())
177-
.filter(|rel| rel.fields.iter().any(|f| f.lateness.is_some()))
177+
.filter(|rel| rel.has_lateness())
178178
.map(|rel| rel.name.clone())
179179
.collect()
180180
}
@@ -234,6 +234,10 @@ impl Relation {
234234
let name = canonical_identifier(name);
235235
self.fields.iter().find(|f| f.name == name)
236236
}
237+
238+
pub fn has_lateness(&self) -> bool {
239+
self.fields.iter().any(|f| f.lateness.is_some())
240+
}
237241
}
238242

239243
/// A SQL field.

crates/pipeline-manager/src/db/types/program.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ pub fn generate_pipeline_config(
754754
// Only keep tables and views, ignoring intermediate nodes.
755755
// These are currently the only nodes used by the pipeline
756756
// (to compute pipeline diffs). Including all nodes can cause the IR
757-
// to exceed the maximum ConfigMap size supported by k8s (3145728).
757+
// to exceed the maximum ConfigMap size supported by k8s (1MB).
758758
let mir = program_info
759759
.dataflow
760760
.as_ref()
@@ -772,9 +772,19 @@ pub fn generate_pipeline_config(
772772
})
773773
.unwrap_or_default();
774774

775+
// Remove inputs and outputs that do not have lateness.
776+
// This field is currently only used for backfill avoidance, which only cares about
777+
// relations with lateness. Including the entire schema would cause the IR to exceed the
778+
// maximum ConfigMap size supported by k8s (1MB).
779+
let mut program_schema = program_info.schema.clone();
780+
program_schema.inputs.retain(|input| input.has_lateness());
781+
program_schema
782+
.outputs
783+
.retain(|output| output.has_lateness());
784+
775785
let program_ir = ProgramIr {
776786
mir,
777-
program_schema: program_info.schema.clone(),
787+
program_schema,
778788
};
779789

780790
PipelineConfig {

0 commit comments

Comments
 (0)