Skip to content

Commit edcaa7e

Browse files
committed
[python] Add unit tests for Arrow IPC query results
1 parent f14aded commit edcaa7e

File tree

1 file changed

+137
-0
lines changed

1 file changed

+137
-0
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
"""Unit tests for FelderaClient.query_as_arrow_ipc and Pipeline.query_arrow."""
2+
3+
import builtins
4+
import io
5+
import sys
6+
from unittest.mock import MagicMock
7+
8+
import pytest
9+
10+
from feldera.rest.feldera_client import FelderaClient
11+
12+
13+
def _import_arrow_modules():
14+
pa = pytest.importorskip("pyarrow")
15+
ipc = pytest.importorskip("pyarrow.ipc")
16+
return pa, ipc
17+
18+
19+
def _make_ipc_bytes(table) -> bytes:
20+
"""Serialise a ``pyarrow.Table`` to Arrow IPC stream bytes."""
21+
_, ipc = _import_arrow_modules()
22+
buf = io.BytesIO()
23+
with ipc.new_stream(buf, table.schema) as writer:
24+
if table.num_rows > 0:
25+
writer.write_table(table)
26+
return buf.getvalue()
27+
28+
29+
def _mock_response(ipc_bytes: bytes) -> MagicMock:
30+
"""Return a mock response whose ``iter_content`` yields the IPC bytes in 1 KB chunks."""
31+
resp = MagicMock()
32+
chunk_size = 1024
33+
chunks = [
34+
ipc_bytes[i : i + chunk_size]
35+
for i in range(0, max(len(ipc_bytes), 1), chunk_size)
36+
]
37+
resp.iter_content = MagicMock(return_value=iter(chunks))
38+
return resp
39+
40+
41+
@pytest.fixture()
42+
def client() -> FelderaClient:
43+
"""A ``FelderaClient`` with a mocked HTTP layer (no real network calls)."""
44+
c = FelderaClient.__new__(FelderaClient)
45+
c.http = MagicMock()
46+
return c
47+
48+
49+
class TestQueryAsArrowIpc:
50+
def test_non_empty_result_returns_correct_data(self, client: FelderaClient):
51+
pa, _ = _import_arrow_modules()
52+
schema = pa.schema([("id", pa.int64()), ("name", pa.utf8())])
53+
expected = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]}, schema=schema)
54+
ipc_bytes = _make_ipc_bytes(expected)
55+
56+
client.http.get.return_value = _mock_response(ipc_bytes)
57+
result = client.query_as_arrow_ipc("my_pipeline", "SELECT id, name FROM t")
58+
59+
assert isinstance(result, pa.Table)
60+
assert result.schema == schema
61+
assert result.num_rows == 3
62+
assert result.column("id").to_pylist() == [1, 2, 3]
63+
assert result.column("name").to_pylist() == ["a", "b", "c"]
64+
65+
def test_http_called_with_correct_params(self, client: FelderaClient):
66+
pa, _ = _import_arrow_modules()
67+
schema = pa.schema([("id", pa.int64())])
68+
table = pa.table({"id": [42]}, schema=schema)
69+
client.http.get.return_value = _mock_response(_make_ipc_bytes(table))
70+
71+
client.query_as_arrow_ipc("my_pipeline", "SELECT id FROM t")
72+
73+
client.http.get.assert_called_once_with(
74+
path="/pipelines/my_pipeline/query",
75+
params={
76+
"pipeline_name": "my_pipeline",
77+
"sql": "SELECT id FROM t",
78+
"format": "arrow_ipc",
79+
},
80+
stream=True,
81+
)
82+
83+
def test_empty_result_preserves_schema(self, client: FelderaClient):
84+
pa, _ = _import_arrow_modules()
85+
schema = pa.schema([("id", pa.int64()), ("value", pa.float64())])
86+
empty = pa.table(
87+
{
88+
"id": pa.array([], type=pa.int64()),
89+
"value": pa.array([], type=pa.float64()),
90+
},
91+
schema=schema,
92+
)
93+
client.http.get.return_value = _mock_response(_make_ipc_bytes(empty))
94+
95+
result = client.query_as_arrow_ipc(
96+
"my_pipeline", "SELECT id, value FROM t WHERE false"
97+
)
98+
99+
assert isinstance(result, pa.Table)
100+
assert result.schema == schema
101+
assert result.num_rows == 0
102+
103+
def test_missing_pyarrow_raises_helpful_import_error(self, client: FelderaClient, monkeypatch):
104+
real_import = builtins.__import__
105+
106+
def _import(name, globals=None, locals=None, fromlist=(), level=0):
107+
if name == "pyarrow" or name.startswith("pyarrow."):
108+
raise ImportError("No module named 'pyarrow'")
109+
return real_import(name, globals, locals, fromlist, level)
110+
111+
monkeypatch.delitem(sys.modules, "pyarrow", raising=False)
112+
monkeypatch.delitem(sys.modules, "pyarrow.ipc", raising=False)
113+
monkeypatch.setattr(builtins, "__import__", _import)
114+
115+
with pytest.raises(ImportError, match="pip install feldera\\[arrow\\]"):
116+
client.query_as_arrow_ipc("my_pipeline", "SELECT 1")
117+
118+
client.http.get.assert_not_called()
119+
120+
121+
class TestPipelineQueryArrow:
122+
def test_query_arrow_delegates_to_client(self):
123+
"""Pipeline.query_arrow must forward to client.query_as_arrow_ipc."""
124+
from feldera.pipeline import Pipeline
125+
126+
pipeline = Pipeline.__new__(Pipeline)
127+
pipeline._inner = MagicMock()
128+
pipeline._inner.name = "pipe1"
129+
pipeline.client = MagicMock()
130+
131+
expected = object()
132+
pipeline.client.query_as_arrow_ipc.return_value = expected
133+
134+
result = pipeline.query_arrow("SELECT x FROM v")
135+
136+
pipeline.client.query_as_arrow_ipc.assert_called_once_with("pipe1", "SELECT x FROM v")
137+
assert result is expected

0 commit comments

Comments
 (0)