-
Notifications
You must be signed in to change notification settings - Fork 131
Expand file tree
/
Copy pathtest_doubles.py
More file actions
60 lines (51 loc) · 1.46 KB
/
Copy pathtest_doubles.py
File metadata and controls
60 lines (51 loc) · 1.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import unittest
import math
from feldera import PipelineBuilder
from tests import TEST_CLIENT
from tests.platform.helper import PipelineTestCase
from feldera.runtime_config import RuntimeConfig
from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS
# Test serialization of doubles
class TestDouble(PipelineTestCase):
def test_local(self):
sql = """
CREATE TABLE t (
d DOUBLE
) WITH ('connectors' = '[{
"name": "t",
"transport": {
"name": "datagen",
"config": {
"seed": 1,
"plan": [{
"limit": 1,
"fields": {
"d": { "values": [0] }
}
}]
}
}
}]');
CREATE MATERIALIZED VIEW v AS
SELECT 1/d as one, 0/d as zero FROM t;
"""
pipeline = PipelineBuilder(
TEST_CLIENT,
name=self.register_for_cleanup("test_doubles"),
sql=sql,
runtime_config=RuntimeConfig(
workers=FELDERA_TEST_NUM_WORKERS,
hosts=FELDERA_TEST_NUM_HOSTS,
),
).create_or_replace()
pipeline.start_paused()
pipeline.resume()
pipeline.wait_for_completion()
data = list(pipeline.query_arrow_dicts("SELECT * FROM v"))
assert len(data) == 1
assert data[0]["one"] == float("inf")
assert math.isnan(data[0]["zero"])
pipeline.stop(force=True)
pipeline.delete(True)
if __name__ == "__main__":
unittest.main()