Skip to content

Commit 89e3b42

Browse files
committed
Use JOINs and fix test
Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
1 parent 2f0cf0a commit 89e3b42

File tree

4 files changed

+33
-37
lines changed

4 files changed

+33
-37
lines changed

crates/pipeline-manager/migrations/V32__pipeline_monitor_event.sql

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
-- The rows in this table are regularly cleaned up to prevent it growing unbound.
33
CREATE TABLE IF NOT EXISTS pipeline_monitor_event (
44
id UUID PRIMARY KEY NOT NULL, -- Unique event identifier.
5-
tenant_id UUID NOT NULL, -- Tenant identifier
65
pipeline_id UUID NOT NULL, -- Identifier of the pipeline the event corresponds to.
76
recorded_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL, -- Timestamp when status recording started.
87
resources_status VARCHAR NOT NULL, -- Resources status.
@@ -13,6 +12,5 @@ CREATE TABLE IF NOT EXISTS pipeline_monitor_event (
1312
runtime_desired_status VARCHAR NULL, -- Runtime desired status (only set during deployment).
1413
program_status VARCHAR NOT NULL, -- Program status.
1514
storage_status VARCHAR NOT NULL, -- Storage status.
16-
FOREIGN KEY (tenant_id) REFERENCES tenant(id) ON DELETE CASCADE,
1715
FOREIGN KEY (pipeline_id) REFERENCES pipeline(id) ON DELETE CASCADE
1816
);

crates/pipeline-manager/proptest-regressions/db/test.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,4 @@ cc 126dfeb51d3f8f4707aaf4d12a3d6f90a0ac9fe7f12cffcaa014e8604dfb5f3c # shrinks to
4646
cc 849625f53652fb9e6c8f1438bcc512d6d2f5a8abe49286048fce5fd6a1a71c87
4747
cc 4058ca3fe2e46f2fb8bf26a0f86d8e2036b07b1ddd469b6af343004ae67373c0 # shrinks to [NewOrUpdatePipeline(TenantId(01000000-0000-0000-0000-000000000000), 03000000-0000-0000-0000-000000000000, "pipeline-3", "v0", false, PipelineDescr { name: "pipeline-3", description: "", runtime_config: Object {"workers": Number(0), "storage": Null, "fault_tolerance": Object {"model": String("none"), "checkpoint_interval_secs": Number(60)}, "cpu_profiler": Bool(false), "tracing": Bool(false), "tracing_endpoint_jaeger": String(""), "min_batch_size_records": Number(0), "max_buffering_delay_usecs": Number(0), "resources": Object {"cpu_cores_min": Null, "cpu_cores_max": Null, "memory_mb_min": Null, "memory_mb_max": Null, "storage_mb_max": Null, "storage_class": Null, "service_account_name": Null, "namespace": Null}, "clock_resolution_usecs": Null, "pin_cpus": Array [], "provisioning_timeout_secs": Null, "max_parallel_connector_init": Null, "init_containers": Null, "checkpoint_during_suspend": Bool(false), "http_workers": Null, "io_workers": Null, "dev_tweaks": Object {}, "logging": Null}, program_code: "", udf_rust: "", udf_toml: "", program_config: Object {"profile": String("optimized"), "cache": Bool(false), "runtime_version": Null} }), TransitProgramStatusToCompilingSql(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1)), TransitProgramStatusToSqlCompiled(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1), SqlCompilationInfo { exit_code: 1, messages: [] }, Object {"schema": Object {"inputs": Array [], "outputs": Array []}, "main_rust": String("main-rust-0"), "udf_stubs": String("udf-stubs-0"), "dataflow": Null, "input_connectors": Object {}, "output_connectors": Object {}}), TransitProgramStatusToCompilingRust(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1)), ClearOngoingRustCompilation("v0"), GetNextRustCompilation("v0")]
4848
cc 4e812fc686724fb39aff5c4f0dbde67e36f477522457ca2b9fa5e5374c9ce61f # shrinks to [NewOrUpdatePipeline(TenantId(03000000-0000-0000-0000-000000000000), 03000000-0000-0000-0000-000000000000, "pipeline-4", "v2", false, PipelineDescr { name: "pipeline-4", description: "", runtime_config: Object {"workers": Number(0), "storage": Null, "fault_tolerance": Object {"model": String("none"), "checkpoint_interval_secs": Number(60)}, "cpu_profiler": Bool(false), "tracing": Bool(false), "tracing_endpoint_jaeger": String(""), "min_batch_size_records": Number(0), "max_buffering_delay_usecs": Number(0), "resources": Object {"cpu_cores_min": Null, "cpu_cores_max": Null, "memory_mb_min": Null, "memory_mb_max": Null, "storage_mb_max": Null, "storage_class": Null, "service_account_name": Null, "namespace": Null}, "clock_resolution_usecs": Null, "pin_cpus": Array [], "provisioning_timeout_secs": Null, "max_parallel_connector_init": Null, "init_containers": Null, "checkpoint_during_suspend": Bool(false), "http_workers": Null, "io_workers": Null, "dev_tweaks": Object {}, "logging": Null, "pipeline_template_configmap": Null}, program_code: "", udf_rust: "", udf_toml: "", program_config: Object {"profile": String("optimized"), "cache": Bool(false), "runtime_version": Null} }), ClearOngoingSqlCompilation("v0"), SetDeploymentResourcesDesiredStatusProvisioned(TenantId(03000000-0000-0000-0000-000000000000), "pipeline-4", Paused), TransitDeploymentResourcesStatusToStopping(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(2), None, None), ListPipelineMonitorEvents(TenantId(03000000-0000-0000-0000-000000000000), "pipeline-4")]
49+
cc db90e416d47dfca52aa161e45eb457083c8b2009dcce37f072d3173889981432 # shrinks to [NewPipeline(TenantId(03000000-0000-0000-0000-000000000000), 01000000-0000-0000-0000-000000000000, "v0", PipelineDescr { name: "pipeline-1", description: "", runtime_config: Object {"workers": Number(0), "hosts": Number(0), "storage": Null, "fault_tolerance": Object {"model": String("none"), "checkpoint_interval_secs": Number(60)}, "cpu_profiler": Bool(false), "tracing": Bool(false), "tracing_endpoint_jaeger": String(""), "min_batch_size_records": Number(0), "max_buffering_delay_usecs": Number(0), "resources": Object {"cpu_cores_min": Null, "cpu_cores_max": Null, "memory_mb_min": Null, "memory_mb_max": Null, "storage_mb_max": Null, "storage_class": Null, "service_account_name": Null, "namespace": Null}, "clock_resolution_usecs": Null, "pin_cpus": Array [], "provisioning_timeout_secs": Null, "max_parallel_connector_init": Null, "init_containers": Null, "checkpoint_during_suspend": Bool(false), "http_workers": Null, "io_workers": Null, "dev_tweaks": Object {}, "logging": Null, "pipeline_template_configmap": Null}, program_code: "", udf_rust: "", udf_toml: "", program_config: Object {"profile": String("optimized"), "cache": Bool(false), "runtime_version": Null} }), UpdatePipeline(TenantId(03000000-0000-0000-0000-000000000000), "pipeline-1", None, None, "v0", false, None, None, None, None, None), ListPipelineMonitorEvents(TenantId(03000000-0000-0000-0000-000000000000), "pipeline-1")]

crates/pipeline-manager/src/db/operations/pipeline_monitor.rs

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ pub(crate) async fn get_pipeline_monitor_event_short(
9696
let stmt = txn
9797
.prepare_cached(&format!(
9898
"SELECT {RETRIEVE_SHORT_PIPELINE_EVENT_COLUMNS}
99-
FROM pipeline_monitor_event AS e
100-
WHERE e.tenant_id = $1 AND e.pipeline_id = $2 AND e.id = $3
99+
FROM pipeline_monitor_event AS e, pipeline AS p
100+
WHERE p.tenant_id = $1 AND p.id = e.pipeline_id AND e.pipeline_id = $2 AND e.id = $3
101101
"
102102
))
103103
.await?;
@@ -122,8 +122,8 @@ pub(crate) async fn get_pipeline_monitor_event_extended(
122122
let stmt = txn
123123
.prepare_cached(&format!(
124124
"SELECT {RETRIEVE_EXTENDED_PIPELINE_EVENT_COLUMNS}
125-
FROM pipeline_monitor_event AS e
126-
WHERE e.tenant_id = $1 AND e.pipeline_id = $2 AND e.id = $3
125+
FROM pipeline_monitor_event AS e, pipeline AS p
126+
WHERE p.tenant_id = $1 AND p.id = e.pipeline_id AND e.pipeline_id = $2 AND e.id = $3
127127
"
128128
))
129129
.await?;
@@ -147,8 +147,8 @@ pub(crate) async fn get_latest_pipeline_monitor_event_short(
147147
let stmt = txn
148148
.prepare_cached(&format!(
149149
"SELECT {RETRIEVE_SHORT_PIPELINE_EVENT_COLUMNS}
150-
FROM pipeline_monitor_event AS e
151-
WHERE e.tenant_id = $1 AND e.pipeline_id = $2
150+
FROM pipeline_monitor_event AS e, pipeline AS p
151+
WHERE p.tenant_id = $1 AND p.id = e.pipeline_id AND e.pipeline_id = $2
152152
ORDER BY e.recorded_at DESC, e.id DESC
153153
LIMIT 1
154154
"
@@ -174,8 +174,8 @@ pub(crate) async fn get_latest_pipeline_monitor_event_extended(
174174
let stmt = txn
175175
.prepare_cached(&format!(
176176
"SELECT {RETRIEVE_EXTENDED_PIPELINE_EVENT_COLUMNS}
177-
FROM pipeline_monitor_event AS e
178-
WHERE e.tenant_id = $1 AND e.pipeline_id = $2
177+
FROM pipeline_monitor_event AS e, pipeline AS p
178+
WHERE p.tenant_id = $1 AND p.id = e.pipeline_id AND e.pipeline_id = $2
179179
ORDER BY e.recorded_at DESC, e.id DESC
180180
LIMIT 1
181181
"
@@ -201,8 +201,8 @@ pub(crate) async fn list_pipeline_monitor_events_short(
201201
let stmt = txn
202202
.prepare_cached(&format!(
203203
"SELECT {RETRIEVE_SHORT_PIPELINE_EVENT_COLUMNS}
204-
FROM pipeline_monitor_event AS e
205-
WHERE e.tenant_id = $1 AND e.pipeline_id = $2
204+
FROM pipeline_monitor_event AS e, pipeline AS p
205+
WHERE p.tenant_id = $1 AND p.id = e.pipeline_id AND e.pipeline_id = $2
206206
ORDER BY e.recorded_at DESC, e.id DESC
207207
"
208208
))
@@ -229,43 +229,42 @@ pub(crate) async fn new_pipeline_monitor_event(
229229
.prepare_cached(
230230
"INSERT INTO pipeline_monitor_event
231231
(
232-
id, tenant_id, pipeline_id,
232+
id, pipeline_id,
233233
resources_status, resources_status_details, resources_desired_status,
234234
runtime_status, runtime_status_details, runtime_desired_status,
235235
program_status, storage_status
236236
)
237237
VALUES
238238
(
239-
$1, $2, $3,
240-
$4, $5, $6,
241-
$7, $8, $9,
242-
$10, $11
239+
$1, $2,
240+
$3, $4, $5,
241+
$6, $7, $8,
242+
$9, $10
243243
)",
244244
)
245245
.await?;
246246
txn.execute(
247247
&stmt,
248248
&[
249249
&new_id, // $1: id
250-
&tenant_id.0, // $2: tenant_id
251-
&pipeline_id.0, // $3: pipeline_id
252-
&pipeline.deployment_resources_status.to_string(), // $4: resources_status
250+
&pipeline_id.0, // $2: pipeline_id
251+
&pipeline.deployment_resources_status.to_string(), // $3: resources_status
253252
&pipeline
254253
.deployment_resources_status_details
255254
.unwrap_or(json!({}))
256-
.to_string(), // $5: resources_status_details
257-
&pipeline.deployment_resources_desired_status.to_string(), // $6: resources_desired_status
255+
.to_string(), // $3: resources_status_details
256+
&pipeline.deployment_resources_desired_status.to_string(), // $4: resources_desired_status
258257
&pipeline
259258
.deployment_runtime_status
260-
.map(runtime_status_to_string), // $7: runtime_status
259+
.map(runtime_status_to_string), // $6: runtime_status
261260
&pipeline
262261
.deployment_runtime_status_details
263-
.map(|v| v.to_string()), // $8: runtime_status_details
262+
.map(|v| v.to_string()), // $7: runtime_status_details
264263
&pipeline
265264
.deployment_runtime_desired_status
266-
.map(runtime_desired_status_to_string), // $9: runtime_desired_status
267-
&pipeline.program_status.to_string(), // $10: program_status
268-
&pipeline.storage_status.to_string(), // $11: storage_status
265+
.map(runtime_desired_status_to_string), // $8: runtime_desired_status
266+
&pipeline.program_status.to_string(), // $9: program_status
267+
&pipeline.storage_status.to_string(), // $10: storage_status
269268
],
270269
)
271270
.await
@@ -278,21 +277,17 @@ pub(crate) async fn new_pipeline_monitor_event(
278277
WHERE e1.id NOT IN ( \
279278
SELECT e2.id \
280279
FROM pipeline_monitor_event AS e2 \
281-
WHERE tenant_id = $1 AND pipeline_id = $2
280+
WHERE pipeline_id = $1
282281
ORDER BY e2.recorded_at DESC, e2.id DESC \
283-
LIMIT $3 \
284-
) AND tenant_id = $1 AND pipeline_id = $2
282+
LIMIT $2 \
283+
) AND pipeline_id = $1
285284
",
286285
)
287286
.await?;
288287
let num_deleted_due_to_limit = txn
289288
.execute(
290289
&stmt,
291-
&[
292-
&tenant_id.0,
293-
&pipeline_id.0,
294-
&(MONITOR_PIPELINE_RETENTION_NUM as i64),
295-
],
290+
&[&pipeline_id.0, &(MONITOR_PIPELINE_RETENTION_NUM as i64)],
296291
)
297292
.await?;
298293
Ok(num_deleted_due_to_limit)

crates/pipeline-manager/src/db/test.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3282,8 +3282,10 @@ impl ModelHelpers for Mutex<DbModel> {
32823282
.insert((tenant_id, pipeline.id), pipeline.clone());
32833283

32843284
// Return the final extended pipeline descriptor
3285-
self.new_pipeline_monitor_event(tenant_id, pipeline.id)
3286-
.await?;
3285+
if version_increment {
3286+
self.new_pipeline_monitor_event(tenant_id, pipeline.id)
3287+
.await?;
3288+
}
32873289
Ok(pipeline)
32883290
}
32893291
}

0 commit comments

Comments
 (0)