Skip to content

Commit 349190e

Browse files
abhizerryzhyk
authored andcommitted
py: implement SQLContext.wait_for_completion
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent af11c56 commit 349190e

File tree

6 files changed

+298
-54
lines changed

6 files changed

+298
-54
lines changed

python/docs/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
'sphinx.ext.viewcode',
2424
'sphinx.ext.todo',
2525
'sphinx.ext.doctest',
26+
'enum_tools.autoenum',
2627
]
2728

2829
templates_path = ['_templates']

python/feldera/enums.py

Lines changed: 157 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,177 @@
11
from enum import Enum
22

33

4-
class CompilationProfile(Enum):
4+
# https://stackoverflow.com/questions/50473951/how-can-i-attach-documentation-to-members-of-a-python-enum/50473952#50473952
5+
class DocEnum(Enum):
6+
"""
7+
:meta private:
8+
"""
9+
10+
def __new__(cls, value, doc=None):
11+
self = object.__new__(cls) # calling super().__new__(value) here would fail
12+
self._value_ = value
13+
if doc is not None:
14+
self.__doc__ = doc
15+
return self
16+
17+
18+
class CompilationProfile(DocEnum):
519
"""
620
The compilation profile to use when compiling the program.
721
"""
822

9-
SERVER_DEFAULT = None
23+
SERVER_DEFAULT = None, "The compiler server default compilation profile."
24+
25+
DEV = "dev", "The development compilation profile."
26+
27+
UNOPTIMIZED = "unoptimized", "The unoptimized compilation profile."
28+
29+
OPTIMIZED = "optimized", "The optimized compilation profile, the default for this API."
30+
31+
32+
class BuildMode(DocEnum):
33+
CREATE = 1
34+
GET = 2
35+
GET_OR_CREATE = 3
36+
37+
38+
class PipelineStatus(DocEnum):
1039
"""
11-
The compiler server default compilation profile.
40+
Represents the state that this pipeline is currently in.
41+
42+
.. code-block:: text
43+
44+
Shutdown ◄────┐
45+
│ │
46+
/deploy │ │
47+
│ ⌛ShuttingDown
48+
▼ ▲
49+
⌛Provisioning │
50+
│ │
51+
Provisioned │
52+
▼ │/shutdown
53+
⌛Initializing │
54+
│ │
55+
┌────────┴─────────┴─┐
56+
│ ▼ │
57+
│ Paused │
58+
│ │ ▲ │
59+
│/start│ │/pause │
60+
│ ▼ │ │
61+
│ Running │
62+
└──────────┬─────────┘
63+
64+
65+
Failed
66+
"""
67+
68+
UNINITIALIZED = 1, """
69+
The pipeline has not been created yet.
1270
"""
1371

14-
DEV = "dev"
72+
SHUTDOWN = 2, """
73+
Pipeline has not been started or has been shut down.
74+
75+
The pipeline remains in this state until the user triggers
76+
a deployment by invoking the `/deploy` endpoint.
1577
"""
16-
The development compilation profile.
78+
79+
PROVISIONING = 3, """
80+
The runner triggered a deployment of the pipeline and is
81+
waiting for the pipeline HTTP server to come up.
82+
83+
In this state, the runner provisions a runtime for the pipeline,
84+
starts the pipeline within this runtime and waits for it to start accepting HTTP requests.
85+
86+
The user is unable to communicate with the pipeline during this
87+
time. The pipeline remains in this state until:
88+
89+
1. Its HTTP server is up and running; the pipeline transitions to the
90+
:py:enum:mem:`PipelineStatus.INITIALIZING` state.
91+
2. A pre-defined timeout has passed. The runner performs forced
92+
shutdown of the pipeline; returns to the :py:enum:mem:`PipelineStatus.SHUTDOWN` state.
93+
3. The user cancels the pipeline by invoking the `/shutdown` endpoint.
94+
The manager performs forced shutdown of the pipeline, returns to the
95+
:py:enum:mem:`PipelineStatus.SHUTDOWN` state.
96+
1797
"""
1898

19-
UNOPTIMIZED = "unoptimized"
99+
INITIALIZING = 4, """
100+
The pipeline is initializing its internal state and connectors.
101+
102+
This state is part of the pipeline's deployment process. In this state,
103+
the pipeline's HTTP server is up and running, but its query engine
104+
and input and output connectors are still initializing.
105+
106+
The pipeline remains in this state until:
107+
108+
1. Initialization completes successfully; the pipeline transitions to the
109+
:py:enum:mem:`PipelineStatus.PAUSED` state.
110+
2. Initialization fails; transitions to the :py:enum:mem:`PipelineStatus.FAILED` state.
111+
3. A pre-defined timeout has passed. The runner performs forced
112+
shutdown of the pipeline; returns to the :py:enum:mem:`PipelineStatus.SHUTDOWN` state.
113+
4. The user cancels the pipeline by invoking the `/shutdown` endpoint.
114+
The manager performs forced shutdown of the pipeline; returns to the
115+
:py:enum:mem:`PipelineStatus.SHUTDOWN` state.
116+
20117
"""
21-
The unoptimized compilation profile.
118+
119+
PAUSED = 5, """
120+
The pipeline is fully initialized, but data processing has been paused.
121+
122+
The pipeline remains in this state until:
123+
124+
1. The user starts the pipeline by invoking the `/start` endpoint. The
125+
manager passes the request to the pipeline; transitions to the
126+
:py:enum:mem:`PipelineStatus.RUNNING` state.
127+
2. The user cancels the pipeline by invoking the `/shutdown` endpoint.
128+
The manager passes the shutdown request to the pipeline to perform a
129+
graceful shutdown; transitions to the :py:enum:mem:`PipelineStatus.SHUTTING_DOWN` state.
130+
3. An unexpected runtime error renders the pipeline :py:enum:mem:`PipelineStatus.FAILED`.
131+
22132
"""
23133

24-
OPTIMIZED = "optimized"
134+
RUNNING = 6, """
135+
The pipeline is processing data.
136+
137+
The pipeline remains in this state until:
138+
139+
1. The user pauses the pipeline by invoking the `/pause` endpoint. The
140+
manager passes the request to the pipeline; transitions to the
141+
:py:enum:mem:`PipelineStatus.PAUSED` state.
142+
2. The user cancels the pipeline by invoking the `/shutdown` endpoint.
143+
The runner passes the shutdown request to the pipeline to perform a
144+
graceful shutdown; transitions to the
145+
:py:enum:mem:`PipelineStatus.SHUTTING_DOWN` state.
146+
3. An unexpected runtime error renders the pipeline
147+
:py:enum:mem:`PipelineStatus.FAILED`.
148+
25149
"""
26-
The optimized compilation profile, the default for this API.
150+
151+
SHUTTING_DOWN = 7, """
152+
Graceful shutdown in progress.
153+
154+
In this state, the pipeline finishes any ongoing data processing,
155+
produces final outputs, shuts down input/output connectors and
156+
terminates.
157+
158+
The pipeline remains in this state until:
159+
160+
1. Shutdown completes successfully; transitions to the :py:enum:mem:`PipelineStatus.SHUTDOWN` state.
161+
2. A pre-defined timeout has passed. The manager performs forced shutdown of the pipeline; returns to the
162+
:py:enum:mem:`PipelineStatus.SHUTDOWN` state.
163+
27164
"""
28165

166+
FAILED = 8, """
167+
The pipeline remains in this state until the users acknowledge the failure
168+
by issuing a call to shutdown the pipeline; transitions to the
169+
:py:enum:mem:`PipelineStatus.SHUTDOWN` state.
170+
"""
29171

30-
class BuildMode(Enum):
31-
CREATE = 1
32-
GET = 2
33-
GET_OR_CREATE = 3
172+
@staticmethod
173+
def from_str(value):
174+
for member in PipelineStatus:
175+
if member.name.lower() == value.lower():
176+
return member
177+
raise ValueError(f"Unknown value '{value}' for enum {PipelineStatus.__name__}")

python/feldera/output_handler.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import pandas as pd
2+
from typing import Optional
23

34
from queue import Queue
45
from feldera import FelderaClient
56
from feldera._callback_runner import CallbackRunner
67

78

89
class OutputHandler:
9-
def __init__(self, client: FelderaClient, pipeline_name: str, view_name: str, queue: Queue):
10+
def __init__(self, client: FelderaClient, pipeline_name: str, view_name: str, queue: Optional[Queue]):
1011
"""
1112
Initializes the output handler, but doesn't start it.
1213
To start the output handler, call the `.OutputHandler.start` method.
@@ -15,7 +16,7 @@ def __init__(self, client: FelderaClient, pipeline_name: str, view_name: str, qu
1516
self.client: FelderaClient = client
1617
self.pipeline_name: str = pipeline_name
1718
self.view_name: str = view_name
18-
self.queue: Queue = queue
19+
self.queue: Optional[Queue] = queue
1920
self.buffer: list[pd.DataFrame] = []
2021

2122
# the callback that is passed to the `CallbackRunner`
@@ -33,13 +34,17 @@ def start(self):
3334

3435
self.handler.start()
3536

36-
def to_pandas(self):
37+
def to_pandas(self, clear_buffer: bool = True):
3738
"""
3839
Returns the output of the pipeline as a pandas DataFrame
39-
"""
4040
41-
self.handler.join()
41+
:param clear_buffer: Whether to clear the buffer after getting the output.
42+
"""
4243

4344
if len(self.buffer) == 0:
4445
return pd.DataFrame()
45-
return pd.concat(self.buffer, ignore_index=True)
46+
res = pd.concat(self.buffer, ignore_index=True)
47+
if clear_buffer:
48+
self.buffer.clear()
49+
50+
return res

0 commit comments

Comments
 (0)