Skip to content

Commit 8441ddf

Browse files
authored
py: use per-instance variables instead of static (#1771)
Fixes: #1770 Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent 0e24687 commit 8441ddf

File tree

10 files changed

+104
-107
lines changed

10 files changed

+104
-107
lines changed

python/feldera/_sql_table.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,13 @@
33

44

55
class SQLTable:
6-
name: str
7-
ddl: str
8-
schema: Optional[SQLSchema]
9-
106
def __init__(self, name: str, ddl: Optional[str] = None, schema: Optional[SQLSchema] = None):
117
if ddl is None and schema is None:
128
raise ValueError("Either ddl or schema must be provided")
139

14-
self.name = name
15-
self.ddl = ddl
16-
self.schema = schema
10+
self.name: str = name
11+
self.ddl: str = ddl
12+
self.schema: Optional[SQLSchema] = schema
1713

1814
def build_ddl(self):
1915
"""

python/feldera/output_handler.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,13 @@ class _OutputHandlerInstruction(Enum):
1313

1414

1515
class OutputHandler(Thread):
16-
client: FelderaClient
17-
pipeline_name: str
18-
view_name: str
19-
queue: Queue
20-
buffer: list[list[dict]] = []
21-
2216
def __init__(self, client: FelderaClient, pipeline_name: str, view_name: str, queue: Queue):
2317
super().__init__()
24-
self.client = client
25-
self.pipeline_name = pipeline_name
26-
self.view_name = view_name
27-
self.queue = queue
18+
self.client: FelderaClient = client
19+
self.pipeline_name: str = pipeline_name
20+
self.view_name: str = view_name
21+
self.queue: Queue = queue
22+
self.buffer: list[list[dict]] = []
2823

2924
def run(self):
3025
"""

python/feldera/rest/attached_connector.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,11 @@ class AttachedConnector:
77
A connector that is attached to a pipeline.
88
"""
99

10-
name: str
11-
is_input: bool
12-
connector_name: str
13-
relation_name: str
14-
1510
def __init__(self, connector_name: str, relation_name: str, is_input: bool, name: Optional[str] = None):
16-
self.name = name or str(uuid.uuid4())
17-
self.is_input = is_input
18-
self.connector_name = connector_name
19-
self.relation_name = relation_name
11+
self.name: str = name or str(uuid.uuid4())
12+
self.is_input: bool = is_input
13+
self.connector_name: str = connector_name
14+
self.relation_name: str = relation_name
2015

2116
def to_json(self):
2217
return {

python/feldera/rest/config.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ def __init__(
2020
:param timeout: The timeout for the HTTP requests
2121
"""
2222

23-
version = version or "v0"
24-
25-
self.url = url
26-
self.api_key = api_key
27-
self.version = version
28-
self.timeout = timeout
23+
self.url: str = url
24+
self.api_key: Optional[str] = api_key
25+
self.version: Optional[str] = version or "v0"
26+
self.timeout: Optional[float] = timeout

python/feldera/rest/connector.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,6 @@ class Connector:
66
"""
77
A generic connector class that can be used to represent any Feldera connector
88
"""
9-
name: str
10-
description: Optional[str]
11-
config: Mapping[str, Any]
12-
id: Optional[str]
139

1410
def __init__(
1511
self,
@@ -18,10 +14,10 @@ def __init__(
1814
config: Optional[Mapping[str, Any]] = None,
1915
id: Optional[str] = None,
2016
):
21-
self.name = name
22-
self.config = config or {}
23-
self.description = description
24-
self.id = id
17+
self.name: str = name
18+
self.config: Mapping[str, Any] = config or {}
19+
self.description: Optional[str] = description
20+
self.id: Optional[str] = id
2521

2622
def attach_relation(self, relation_name: str, is_input: bool) -> AttachedConnector:
2723
return AttachedConnector(self.name, relation_name, is_input)

python/feldera/rest/pipeline.py

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,6 @@ class Pipeline:
66
"""
77
Represents a Feldera pipeline
88
"""
9-
id: Optional[str]
10-
name: str
11-
program_name: str
12-
description: Optional[str]
13-
attached_connectors: list[AttachedConnector]
14-
config: Mapping[str, Any]
15-
state: Optional[Mapping[str, Any]]
16-
version: int
179

1810
def __init__(
1911
self,
@@ -26,14 +18,14 @@ def __init__(
2618
version: int = 0,
2719
id: Optional[str] = None
2820
):
29-
self.name = name
30-
self.program_name = program_name
31-
self.description = description
32-
self.attached_connectors = attached_connectors or []
33-
self.config = config or Pipeline.default_config()
34-
self.state = state
35-
self.version = version
36-
self.id = id
21+
self.name: str = name
22+
self.program_name: str = program_name
23+
self.description: Optional[str] = description
24+
self.attached_connectors: list[AttachedConnector] = attached_connectors or []
25+
self.config: Mapping[str, Any] = config or Pipeline.default_config()
26+
self.state: Optional[Mapping[str, Any]] = state
27+
self.version: int = version
28+
self.id: Optional[str] = id
3729

3830
@staticmethod
3931
def default_config() -> Mapping[str, Any]:

python/feldera/rest/program.py

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,11 @@ class Program:
66
Represents a Feldera SQL program
77
"""
88

9-
name: Optional[str] = None
10-
code: Optional[str] = None
11-
description: Optional[str] = None
12-
id: Optional[str] = None
13-
version: Optional[int] = None
14-
status: Optional[str] = None
15-
169
def __init__(self, name: str, code: str, description: str = None, status: Optional[str] = None, version: Optional[int] = None, id: Optional[str] = None) -> None:
17-
self.name = name
18-
self.code = code
19-
self.description = description
20-
self.status = status
21-
self.version = version
22-
self.id = id
10+
self.name: Optional[str] = name
11+
self.code: Optional[str] = code
12+
self.description: Optional[str] = description
13+
self.status: Optional[str] = status
14+
self.version: Optional[int] = version
15+
self.id: Optional[str] = id
2316

python/feldera/sql_context.py

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -38,36 +38,6 @@ class SQLContext:
3838
The SQLContext is the main entry point for the Feldera SQL API.
3939
Abstracts the interaction with the Feldera API and provides a high-level interface for SQL pipelines.
4040
"""
41-
client: FelderaClient
42-
pipeline_name: str
43-
program_name: str
44-
build_mode: BuildMode
45-
state: PipelineState = PipelineState.SHUTDOWN
46-
47-
pipeline_description: str = ""
48-
program_description: str = ""
49-
ddl: str = ""
50-
51-
# In the SQL DDL declaration, the order of the tables and views is important.
52-
# From python 3.7 onwards, the order of insertion is preserved in dictionaries.
53-
# https://softwaremaniacs.org/blog/2020/02/05/dicts-ordered/en/
54-
views: Dict[str, str] = {}
55-
tables: Dict[str, SQLTable] = {}
56-
57-
# TODO: to be used for schema inference
58-
todo_tables: Dict[str, Optional[SQLTable]] = {}
59-
60-
http_input_buffer: list[Dict[str, dict | list[dict] | str]] = []
61-
62-
# buffer that stores all input connectors to be created
63-
# this is a Mapping[table_name -> list[Connector]]
64-
input_connectors_buffer: Dict[str, list[Connector]] = {}
65-
66-
# buffer that stores all output connectors to be created
67-
# this is a Mapping[view_name -> list[Connector]]
68-
output_connectors_buffer: Dict[str, list[Connector]] = {}
69-
70-
views_tx: list[Dict[str, Queue]] = []
7141

7242
def __init__(
7343
self,
@@ -77,13 +47,39 @@ def __init__(
7747
program_name: str = None,
7848
program_description: str = None,
7949
):
80-
self.client = client
50+
self.build_mode: Optional[BuildMode] = None
51+
self.state: PipelineState = PipelineState.SHUTDOWN
52+
53+
self.ddl: str = ""
54+
55+
# In the SQL DDL declaration, the order of the tables and views is important.
56+
# From python 3.7 onwards, the order of insertion is preserved in dictionaries.
57+
# https://softwaremaniacs.org/blog/2020/02/05/dicts-ordered/en/
58+
self.views: Dict[str, str] = {}
59+
self.tables: Dict[str, SQLTable] = {}
60+
61+
# TODO: to be used for schema inference
62+
self.todo_tables: Dict[str, Optional[SQLTable]] = {}
63+
64+
self.http_input_buffer: list[Dict[str, dict | list[dict] | str]] = []
65+
66+
# buffer that stores all input connectors to be created
67+
# this is a Mapping[table_name -> list[Connector]]
68+
self.input_connectors_buffer: Dict[str, list[Connector]] = {}
69+
70+
# buffer that stores all output connectors to be created
71+
# this is a Mapping[view_name -> list[Connector]]
72+
self.output_connectors_buffer: Dict[str, list[Connector]] = {}
73+
74+
self.views_tx: list[Dict[str, Queue]] = []
75+
76+
self.client: FelderaClient = client
8177

82-
self.pipeline_name = pipeline_name
83-
self.pipeline_description = pipeline_description or ""
78+
self.pipeline_name: str = pipeline_name
79+
self.pipeline_description: str = pipeline_description or ""
8480

85-
self.program_name = program_name or pipeline_name
86-
self.program_description = program_description or ""
81+
self.program_name: str = program_name or pipeline_name
82+
self.program_description: str = program_description or ""
8783

8884
def __build_ddl(self):
8985
"""

python/feldera/sql_schema.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22

33

44
class SQLSchema:
5-
# TODO: should capture nullability of columns, model after the rust `Relation` type
6-
schema: Mapping[str, str]
75

86
def __init__(self, schema: Mapping[str, str]):
9-
self.schema = schema
7+
# TODO: should capture nullability of columns, model after the rust `Relation` type
8+
self.schema: Mapping[str, str] = schema
109

1110
def build_ddl(self, table_name: str) -> str:
1211
ddl = f"CREATE TABLE {table_name} (\n"

python/tests/test_wireframes.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,43 @@ def test_local(self):
3737
print()
3838
print(df)
3939

40+
def test_two_SQLContexts(self):
41+
# https://github.com/feldera/feldera/issues/1770
42+
43+
sql = SQLContext('sql_context1', TEST_CLIENT).get_or_create()
44+
sql2 = SQLContext('sql_context2', TEST_CLIENT).get_or_create()
45+
46+
TBL_NAMES = ['students', 'grades']
47+
VIEW_NAMES = [n + "_view" for n in TBL_NAMES]
48+
49+
df_students = pd.read_csv('students.csv')
50+
df_grades = pd.read_csv('grades.csv')
51+
52+
sql.register_table(TBL_NAMES[0], SQLSchema({"name": "STRING", "id": "INT"}))
53+
sql2.register_table(TBL_NAMES[1], SQLSchema({
54+
"student_id": "INT",
55+
"science": "INT",
56+
"maths": "INT",
57+
"art": "INT"
58+
}))
59+
60+
sql.register_view(VIEW_NAMES[0], f"SELECT * FROM {TBL_NAMES[0]}")
61+
sql2.register_view(VIEW_NAMES[1], f"SELECT * FROM {TBL_NAMES[1]}")
62+
63+
sql.connect_source_pandas(TBL_NAMES[0], df_students)
64+
sql2.connect_source_pandas(TBL_NAMES[1], df_grades)
65+
66+
out = sql.listen(VIEW_NAMES[0])
67+
out2 = sql2.listen(VIEW_NAMES[1])
68+
69+
sql.run_to_completion()
70+
sql2.run_to_completion()
71+
72+
df = out.to_pandas()
73+
df2 = out2.to_pandas()
74+
75+
assert df.columns.tolist() not in df2.columns.tolist()
76+
4077

4178
if __name__ == '__main__':
4279
unittest.main()

0 commit comments

Comments
 (0)