Skip to content

Commit 5a85047

Browse files
Leonid Ryzhykryzhyk
authored andcommitted
Materialized view-related fixups.
- Docs - Demos - Python API Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
1 parent 60363a0 commit 5a85047

File tree

21 files changed

+110
-76
lines changed

21 files changed

+110
-76
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/adapters/src/format/json/output.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ mod test {
494494
let mut encoder = JsonEncoder::new(
495495
Box::new(consumer),
496496
config,
497-
&Relation::new("TestStruct", false, TestStruct::schema()),
497+
&Relation::new("TestStruct", false, TestStruct::schema(), false),
498498
);
499499
let zsets = batches
500500
.iter()
@@ -671,7 +671,7 @@ mod test {
671671
let mut encoder = JsonEncoder::new(
672672
Box::new(consumer),
673673
config,
674-
&Relation::new("TestStruct", false, TestStruct::schema()),
674+
&Relation::new("TestStruct", false, TestStruct::schema(), false),
675675
);
676676
let zset = OrdZSet::from_keys((), test_data()[0].clone());
677677

crates/adapters/src/format/parquet/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ fn parquet_output() {
115115
let mut encoder = ParquetEncoder::new(
116116
Box::new(consumer),
117117
config,
118-
Relation::new("TestStruct2", false, TestStruct2::schema()),
118+
Relation::new("TestStruct2", false, TestStruct2::schema(), false),
119119
)
120120
.expect("Can't create encoder");
121121
let zset = OrdZSet::from_keys(

crates/adapters/src/test/kafka.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ impl BufferConsumer {
245245
let buffer = MockDeZSet::new();
246246

247247
// Input parsers don't care about schema yet.
248-
let schema = Relation::new("mock_schema", false, vec![]);
248+
let schema = Relation::new("mock_schema", false, vec![], false);
249249

250250
let mut parser = format
251251
.new_parser(

crates/adapters/src/test/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ where
9797
{
9898
let input_handle = <MockDeZSet<T, U>>::new();
9999
// Input parsers don't care about schema yet.
100-
let schema = Relation::new("mock_schema", false, vec![]);
100+
let schema = Relation::new("mock_schema", false, vec![], false);
101101
let consumer = MockInputConsumer::from_handle(
102102
&InputCollectionHandle::new(schema, input_handle.clone()),
103103
config,
@@ -155,10 +155,11 @@ where
155155
let (input, hinput) = circuit.add_input_zset::<T>();
156156

157157
let input_schema =
158-
serde_json::to_string(&Relation::new("test_input1", false, schema.clone())).unwrap();
158+
serde_json::to_string(&Relation::new("test_input1", false, schema.clone(), false))
159+
.unwrap();
159160

160161
let output_schema =
161-
serde_json::to_string(&Relation::new("test_output1", false, schema)).unwrap();
162+
serde_json::to_string(&Relation::new("test_output1", false, schema, false)).unwrap();
162163

163164
catalog.register_materialized_input_zset(input.clone(), hinput, &input_schema);
164165
catalog.register_materialized_output_zset(input, &output_schema);
@@ -196,7 +197,7 @@ where
196197
let buffer = MockDeZSet::<T, T>::new();
197198

198199
// Input parsers don't care about schema yet.
199-
let schema = Relation::new("mock_schema", false, vec![]);
200+
let schema = Relation::new("mock_schema", false, vec![], false);
200201

201202
let mut parser = format
202203
.new_parser(

crates/pipeline-types/src/program_schema.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,17 @@ pub struct Relation {
5050
pub case_sensitive: bool,
5151
#[cfg_attr(feature = "testing", proptest(value = "Vec::new()"))]
5252
pub fields: Vec<Field>,
53+
#[serde(default)]
54+
pub materialized: bool,
5355
}
5456

5557
impl Relation {
56-
pub fn new(name: &str, case_sensitive: bool, fields: Vec<Field>) -> Self {
58+
pub fn new(name: &str, case_sensitive: bool, fields: Vec<Field>, materialized: bool) -> Self {
5759
Self {
5860
name: name.to_string(),
5961
case_sensitive,
6062
fields,
63+
materialized,
6164
}
6265
}
6366

crates/pipeline-types/src/transport/delta_table.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,10 @@ fn test_delta_reader_config_serde() {
170170

171171
let expected = r#"{"uri":"protocol:/path/to/somewhere","timestamp_column":"ts","mode":"follow","snapshot_filter":"ts BETWEEN '2005-01-01 00:00:00' AND '2010-12-31 23:59:59'","version":null,"datetime":"2010-12-31 00:00:00Z","customoption1":"val1","customoption2":"val2"}"#;
172172

173-
assert_eq!(serialized_config, expected);
173+
assert_eq!(
174+
serde_json::from_str::<serde_json::Value>(&serialized_config).unwrap(),
175+
serde_json::from_str::<serde_json::Value>(&expected).unwrap()
176+
);
174177
}
175178

176179
impl DeltaTableReaderConfig {

crates/pipeline_manager/src/db/test.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -862,8 +862,8 @@ async fn versioning() {
862862
tenant_id,
863863
program_id,
864864
ProgramSchema {
865-
inputs: vec![Relation::new("t1", false, vec![])],
866-
outputs: vec![Relation::new("v1", false, vec![])],
865+
inputs: vec![Relation::new("t1", false, vec![], false)],
866+
outputs: vec![Relation::new("v1", false, vec![], false)],
867867
},
868868
)
869869
.await
@@ -941,10 +941,10 @@ async fn versioning() {
941941
program_id,
942942
ProgramSchema {
943943
inputs: vec![
944-
Relation::new("t1", false, vec![]),
945-
Relation::new("t2", false, vec![]),
944+
Relation::new("t1", false, vec![], false),
945+
Relation::new("t2", false, vec![], false),
946946
],
947-
outputs: vec![Relation::new("v1", false, vec![])],
947+
outputs: vec![Relation::new("v1", false, vec![], false)],
948948
},
949949
)
950950
.await
@@ -963,8 +963,8 @@ async fn versioning() {
963963
tenant_id,
964964
program_id,
965965
ProgramSchema {
966-
inputs: vec![Relation::new("tnew1", false, vec![])],
967-
outputs: vec![Relation::new("vnew1", false, vec![])],
966+
inputs: vec![Relation::new("tnew1", false, vec![], false)],
967+
outputs: vec![Relation::new("vnew1", false, vec![], false)],
968968
},
969969
)
970970
.await

crates/pipeline_manager/src/integration_test.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ async fn deploy_pipeline() {
755755
let config = setup().await;
756756
let _ = deploy_pipeline_without_connectors(
757757
&config,
758-
"create table t1(c1 integer); create view v1 as select * from t1;",
758+
"create table t1(c1 integer) with ('materialized' = 'true'); create view v1 as select * from t1;",
759759
)
760760
.await;
761761

@@ -930,7 +930,7 @@ async fn json_ingress() {
930930
let config = setup().await;
931931
let id = deploy_pipeline_without_connectors(
932932
&config,
933-
"create table t1(c1 integer, c2 bool, c3 varchar); create view v1 as select * from t1;",
933+
"create table t1(c1 integer, c2 bool, c3 varchar) with ('materialized' = 'true'); create materialized view v1 as select * from t1;",
934934
)
935935
.await;
936936

@@ -1101,7 +1101,7 @@ async fn map_column() {
11011101
let config = setup().await;
11021102
let id = deploy_pipeline_without_connectors(
11031103
&config,
1104-
"create table t1(c1 integer, c2 bool, c3 MAP<varchar, varchar>); create view v1 as select * from t1;",
1104+
"create table t1(c1 integer, c2 bool, c3 MAP<varchar, varchar>) with ('materialized' = 'true'); create view v1 as select * from t1;",
11051105
)
11061106
.await;
11071107

@@ -1159,7 +1159,7 @@ async fn parse_datetime() {
11591159
let config = setup().await;
11601160
let _ = deploy_pipeline_without_connectors(
11611161
&config,
1162-
"create table t1(t TIME, ts TIMESTAMP, d DATE);",
1162+
"create table t1(t TIME, ts TIMESTAMP, d DATE) with ('materialized' = 'true');",
11631163
)
11641164
.await;
11651165

@@ -1208,7 +1208,7 @@ async fn quoted_columns() {
12081208
let config = setup().await;
12091209
let _ = deploy_pipeline_without_connectors(
12101210
&config,
1211-
r#"create table t1("c1" integer not null, "C2" bool not null, "😁❤" varchar not null, "αβγ" boolean not null, ΔΘ boolean not null)"#,
1211+
r#"create table t1("c1" integer not null, "C2" bool not null, "😁❤" varchar not null, "αβγ" boolean not null, ΔΘ boolean not null) with ('materialized' = 'true')"#,
12121212
)
12131213
.await;
12141214

@@ -1258,7 +1258,7 @@ async fn primary_keys() {
12581258
let config = setup().await;
12591259
let _ = deploy_pipeline_without_connectors(
12601260
&config,
1261-
r#"create table t1(id bigint not null, s varchar not null, primary key (id))"#,
1261+
r#"create table t1(id bigint not null, s varchar not null, primary key (id)) with ('materialized' = 'true')"#,
12621262
)
12631263
.await;
12641264

@@ -1371,8 +1371,8 @@ async fn case_sensitive_tables() {
13711371
&config,
13721372
r#"create table "TaBle1"(id bigint not null);
13731373
create table table1(id bigint);
1374-
create view "V1" as select * from "TaBle1";
1375-
create view "v1" as select * from table1;"#,
1374+
create materialized view "V1" as select * from "TaBle1";
1375+
create materialized view "v1" as select * from table1;"#,
13761376
)
13771377
.await;
13781378

demo/project_demo00-SecOps/project.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ create view k8scluster_vulnerability (
138138
-- Per-cluster statistics:
139139
-- * Number of vulnerabilities.
140140
-- * Most severe vulnerability.
141-
create view k8scluster_vulnerability_stats (
141+
create materialized view k8scluster_vulnerability_stats (
142142
k8scluster_id,
143143
k8scluster_name,
144144
total_vulnerabilities,

0 commit comments

Comments
 (0)