Skip to content

Commit 69f28c4

Browse files
committed
py: add Pipeline.errors: return list of all errors
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent bbf5600 commit 69f28c4

File tree

3 files changed

+58
-0
lines changed

3 files changed

+58
-0
lines changed

python/docs/examples.rst

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,36 @@ Starting a Pipeline
8383
8484
pipeline.start()
8585
86+
Analyzing Existing Feldera Pipeline for Errors
87+
==============================================
88+
89+
First let's create a Feldera pipeline that errors from the web console, with the
90+
name ``check_error`` and invalid SQL as follows:
91+
92+
.. code-block:: sql
93+
94+
SELECT invalid
95+
96+
This will fail to compile.
97+
98+
We can use this Python SDK to connect to this Feldera pipeline to check if it has
99+
any errors as follows:
100+
101+
.. code-block:: python
102+
103+
pipeline = Pipeline.get("check_error", client)
104+
err = pipeline.errors()
105+
106+
if len(err) != 0:
107+
print("got err: ", err)
108+
109+
Here, ``err`` is a list of all errors in this pipeline. The above code will emit
110+
the following output:
111+
112+
.. code-block:: text
113+
114+
got err: [{'sql_compilation': {'exit_code': 1, 'messages': [{'start_line_number': 1, 'start_column': 1, 'end_line_number': 1, 'end_column': 14, 'warning': False, 'error_type': 'Not supported', 'message': 'Raw \'SELECT\' statements are not supported; did you forget to CREATE VIEW?: SELECT "invalid"', 'snippet': ' 1|SELECT invalid\n ^^^^^^^^^^^^^^\n'}]}}]
115+
86116
87117
Using Pandas DataFrames
88118
=======================

python/feldera/pipeline.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,3 +816,18 @@ def program_error(self) -> Mapping[str, Any]:
816816

817817
self.refresh()
818818
return self._inner.program_error
819+
820+
def errors(self) -> List[Mapping[str, Any]]:
821+
"""
822+
Returns a list of all errors in this pipeline.
823+
"""
824+
errors = []
825+
perr = self.program_error()
826+
for e in perr.keys():
827+
err = perr.get(e)
828+
if err and err.get("exit_code", 0) != 0:
829+
errors.append({e: err})
830+
derr = self.deployment_error()
831+
if derr:
832+
errors.append(derr)
833+
return errors

python/tests/test_pipeline_builder.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,6 +1173,19 @@ def test_program_error1(self):
11731173
pipeline.shutdown()
11741174
pipeline.delete()
11751175

1176+
def test_errors0(self):
1177+
sql = "SELECT invalid"
1178+
name = "test_errors0"
1179+
1180+
try:
1181+
_ = PipelineBuilder(TEST_CLIENT, name, sql).create_or_replace()
1182+
except Exception:
1183+
pass
1184+
1185+
pipeline = Pipeline.get(name, TEST_CLIENT)
1186+
1187+
assert pipeline.errors()[0]["sql_compilation"]["exit_code"] != 0
1188+
11761189

11771190
if __name__ == "__main__":
11781191
unittest.main()

0 commit comments

Comments
 (0)