Skip to content

Commit a81bc78

Browse files
committed
types: fix openapi spec of StartFromCheckpoint
* py: set `runtime_version` in `PipelineBuilder` Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> [ci] apply automatic fixes Signed-off-by: feldera-bot <feldera-bot@feldera.com>
1 parent 82af9d5 commit a81bc78

File tree

5 files changed

+45
-19
lines changed

5 files changed

+45
-19
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ tracing-subscriber = "0.3.18"
238238
typedmap = "0.3.0"
239239
url = "2.5.0"
240240
urlencoding = "2.1.3"
241-
utoipa = "4.2"
241+
utoipa = { version = "4.2", features = ["uuid"] }
242242
utoipa-swagger-ui = { version = "7.1", features = ["vendored"] }
243243
uuid = { version = "1.17.0", features = ["serde"] }
244244
wiremock = "0.6"

crates/feldera-types/src/config.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,12 +272,42 @@ pub enum StorageCompression {
272272
Snappy,
273273
}
274274

275-
#[derive(Debug, Clone, Eq, PartialEq, ToSchema)]
275+
#[derive(Debug, Clone, Eq, PartialEq)]
276276
pub enum StartFromCheckpoint {
277277
Latest,
278278
Uuid(uuid::Uuid),
279279
}
280280

281+
impl ToSchema<'_> for StartFromCheckpoint {
282+
fn schema() -> (
283+
&'static str,
284+
utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
285+
) {
286+
(
287+
"StartFromCheckpoint",
288+
utoipa::openapi::RefOr::T(Schema::OneOf(
289+
OneOfBuilder::new()
290+
.item(
291+
ObjectBuilder::new()
292+
.schema_type(SchemaType::String)
293+
.enum_values(Some(["latest"].into_iter()))
294+
.build(),
295+
)
296+
.item(
297+
ObjectBuilder::new()
298+
.schema_type(SchemaType::String)
299+
.format(Some(utoipa::openapi::SchemaFormat::KnownFormat(
300+
utoipa::openapi::KnownFormat::Uuid,
301+
)))
302+
.build(),
303+
)
304+
.nullable(true)
305+
.build(),
306+
)),
307+
)
308+
}
309+
}
310+
281311
impl<'de> Deserialize<'de> for StartFromCheckpoint {
282312
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
283313
where

openapi.json

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7238,22 +7238,15 @@
72387238
{
72397239
"type": "string",
72407240
"enum": [
7241-
"Latest"
7241+
"latest"
72427242
]
72437243
},
72447244
{
7245-
"type": "object",
7246-
"required": [
7247-
"Uuid"
7248-
],
7249-
"properties": {
7250-
"Uuid": {
7251-
"type": "string",
7252-
"format": "uuid"
7253-
}
7254-
}
7245+
"type": "string",
7246+
"format": "uuid"
72557247
}
7256-
]
7248+
],
7249+
"nullable": true
72577250
},
72587251
"StorageBackendConfig": {
72597252
"oneOf": [

python/feldera/pipeline_builder.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import os
2+
from typing import Optional
3+
14
from feldera.rest.feldera_client import FelderaClient
25
from feldera.rest.pipeline import Pipeline as InnerPipeline
36
from feldera.pipeline import Pipeline
@@ -32,6 +35,7 @@ def __init__(
3235
description: str = "",
3336
compilation_profile: CompilationProfile = CompilationProfile.OPTIMIZED,
3437
runtime_config: RuntimeConfig = RuntimeConfig.default(),
38+
runtime_version: Optional[str] = None,
3539
):
3640
self.client: FelderaClient = client
3741
self.name: str | None = name
@@ -41,6 +45,9 @@ def __init__(
4145
self.udf_toml: str = udf_toml
4246
self.compilation_profile: CompilationProfile = compilation_profile
4347
self.runtime_config: RuntimeConfig = runtime_config
48+
self.runtime_version: Optional[str] = runtime_version or os.environ.get(
49+
"FELDERA_RUNTIME_VERSION"
50+
)
4451

4552
def create(self) -> Pipeline:
4653
"""
@@ -67,6 +74,7 @@ def create(self) -> Pipeline:
6774
udf_toml=self.udf_toml,
6875
program_config={
6976
"profile": self.compilation_profile.value,
77+
"runtime_version": self.runtime_version,
7078
},
7179
runtime_config=self.runtime_config.to_dict(),
7280
)

python/feldera/runtime_config.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import os
21
from typing import Optional, Any, Mapping
32
from feldera.enums import FaultToleranceModel
43

@@ -79,7 +78,6 @@ def __init__(
7978
clock_resolution_usecs: Optional[int] = None,
8079
provisioning_timeout_secs: Optional[int] = None,
8180
resources: Optional[Resources] = None,
82-
runtime_version: Optional[str] = None,
8381
fault_tolerance_model: Optional[FaultToleranceModel] = None,
8482
checkpoint_interval_secs: Optional[int] = None,
8583
):
@@ -91,9 +89,6 @@ def __init__(
9189
self.min_batch_size_records = min_batch_size_records
9290
self.clock_resolution_usecs = clock_resolution_usecs
9391
self.provisioning_timeout_secs = provisioning_timeout_secs
94-
self.runtime_version = runtime_version or os.environ.get(
95-
"FELDERA_RUNTIME_VERSION"
96-
)
9792
if fault_tolerance_model is not None:
9893
self.fault_tolerance = {
9994
"model": str(fault_tolerance_model),

0 commit comments

Comments
 (0)