-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtest_table.py
More file actions
104 lines (93 loc) · 3.21 KB
/
test_table.py
File metadata and controls
104 lines (93 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import pyarrow as pa
import pytest
from cloudquery.sdk.schema import Table, Column, filter_dfs
from cloudquery.sdk.schema.table import flatten_tables
def test_table():
    """Round-trip a Table through its Arrow schema and verify metadata survives."""
    original = Table(
        name="test_table",
        columns=[Column("test_column", pa.int32())],
        title="Test Table",
        description="Test description",
        parent=Table(name="parent_table", columns=[]),
        relations=[],
        is_incremental=True,
    )
    # Serialize to an Arrow schema, then reconstruct and compare field by field.
    restored = Table.from_arrow_schema(original.to_arrow_schema())
    assert restored.name == original.name
    assert restored.title == original.title
    assert restored.description == original.description
    assert restored.is_incremental == original.is_incremental
    assert restored.parent.name == original.parent.name
def test_filter_dfs_warns_no_matches():
    """filter_dfs must raise ValueError when include/skip patterns match no table.

    The Table fixtures are built *before* entering ``pytest.raises`` so that
    only the ``filter_dfs`` call can satisfy the expected exception — if the
    Table constructor itself raised ValueError the test would otherwise pass
    spuriously.
    """
    # Include pattern that matches no table.
    tables = [Table("test1", []), Table("test2", [])]
    with pytest.raises(ValueError):
        filter_dfs(tables, include_tables=["test3"], skip_tables=[])

    # Skip pattern that matches no table.
    tables = [Table("test1", []), Table("test2", [])]
    with pytest.raises(ValueError):
        filter_dfs(tables, include_tables=["*"], skip_tables=["test3"])
def test_filter_dfs():
    """Exercise filter_dfs over a three-level table tree with several pattern combos."""
    # Build a chain: test_top1 -> test_child -> test_grandchild, plus a
    # second unrelated top-level table test_top2.
    grandchild = Table("test_grandchild", [Column("test_column", pa.int32())])
    child = Table(
        "test_child",
        [Column("test_column", pa.int32())],
        relations=[
            grandchild,
        ],
    )
    top1 = Table(
        "test_top1",
        [Column("test_column", pa.int32())],
        relations=[
            child,
        ],
    )
    top2 = Table("test_top2", [Column("test_column", pa.int32())])
    all_tables = [top1, top2]

    # Each scenario: (include patterns, skip patterns, skip_dependent_tables,
    #                 expected top-level names, expected flattened names).
    scenarios = [
        (
            ["*"],
            [],
            False,
            ["test_top1", "test_top2"],
            ["test_top1", "test_top2", "test_child", "test_grandchild"],
        ),
        (
            ["*"],
            ["test_top1"],
            False,
            ["test_top2"],
            ["test_top2"],
        ),
        (
            ["test_top1"],
            ["test_top2"],
            True,
            ["test_top1"],
            ["test_top1"],
        ),
        (
            ["test_child"],
            [],
            True,
            ["test_top1"],
            ["test_top1", "test_child"],
        ),
    ]
    for include, skip, skip_dependent, want_top, want_flat in scenarios:
        filtered = filter_dfs(
            tables=all_tables,
            include_tables=include,
            skip_tables=skip,
            skip_dependent_tables=skip_dependent,
        )
        ctx = (include, skip, skip_dependent)
        assert sorted(t.name for t in filtered) == sorted(want_top), ctx
        flat_names = sorted(t.name for t in flatten_tables(filtered))
        assert flat_names == sorted(want_flat), ctx