forked from feast-dev/feast
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path test_feature_views.py
More file actions
119 lines (98 loc) · 3.23 KB
/
test_feature_views.py
File metadata and controls
119 lines (98 loc) · 3.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from datetime import timedelta
import pytest
from typeguard import TypeCheckError
from feast.batch_feature_view import BatchFeatureView
from feast.data_format import AvroFormat
from feast.data_source import KafkaSource
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.field import Field
from feast.infra.offline_stores.file_source import FileSource
from feast.protos.feast.types.Value_pb2 import ValueType
from feast.types import Float32
def test_create_feature_view_with_conflicting_entities():
    """Two distinct entities that share the same join key must be rejected."""
    first_entity = Entity(name="user1", join_keys=["user_id"])
    second_entity = Entity(name="user2", join_keys=["user_id"])
    source = FileSource(path="some path")

    # Both entities map to the join key "user_id"; FeatureView raises on the clash.
    with pytest.raises(ValueError):
        _ = FeatureView(
            name="test",
            entities=[first_entity, second_entity],
            ttl=timedelta(days=30),
            source=source,
        )
def test_create_batch_feature_view():
    """A BatchFeatureView needs a batch source; missing or stream sources fail."""
    file_source = FileSource(path="some path")

    # Happy path: a file (batch) source is accepted without error.
    BatchFeatureView(
        name="test batch feature view",
        entities=[],
        ttl=timedelta(days=30),
        source=file_source,
    )

    # Omitting the source entirely is a TypeError.
    with pytest.raises(TypeError):
        BatchFeatureView(
            name="test batch feature view", entities=[], ttl=timedelta(days=30)
        )

    kafka_source = KafkaSource(
        name="kafka",
        timestamp_field="event_timestamp",
        kafka_bootstrap_servers="",
        message_format=AvroFormat(""),
        topic="topic",
        batch_source=FileSource(path="some path"),
    )

    # A streaming source is not a valid source for a batch feature view.
    with pytest.raises(ValueError):
        BatchFeatureView(
            name="test batch feature view",
            entities=[],
            ttl=timedelta(days=30),
            source=kafka_source,
        )
def simple_udf(x: int):
    """Return *x* shifted by a fixed offset of 3 (trivial UDF used in tests)."""
    offset = 3
    return x + offset
def test_hash():
    """FeatureViews hash by definition: duplicates collapse in a set while any
    schema or metadata difference yields a distinct element."""
    source = FileSource(name="my-file-source", path="test.parquet")

    view_a = FeatureView(
        name="my-feature-view",
        entities=[],
        schema=[
            Field(name="feature1", dtype=Float32),
            Field(name="feature2", dtype=Float32),
        ],
        source=source,
    )
    view_b = FeatureView(
        name="my-feature-view",
        entities=[],
        schema=[
            Field(name="feature1", dtype=Float32),
            Field(name="feature2", dtype=Float32),
        ],
        source=source,
    )
    view_c = FeatureView(
        name="my-feature-view",
        entities=[],
        schema=[Field(name="feature1", dtype=Float32)],
        source=source,
    )
    view_d = FeatureView(
        name="my-feature-view",
        entities=[],
        schema=[Field(name="feature1", dtype=Float32)],
        source=source,
        description="test",
    )

    # Identical definitions deduplicate to one element.
    assert len({view_a, view_b}) == 1
    # A schema difference produces a distinct hash.
    assert len({view_a, view_c}) == 2
    # A description difference also produces a distinct hash.
    assert len({view_c, view_d}) == 2
    # Across all four views there are exactly three unique definitions.
    assert len({view_a, view_b, view_c, view_d}) == 3
# TODO(felixwang9817): Add tests for proto conversion.
# TODO(felixwang9817): Add tests for field mapping logic.
def test_field_types():
    """Field.dtype must be a feast type; a raw proto enum value is rejected."""
    # typeguard enforces the annotation, so the proto ValueType enum fails.
    with pytest.raises(TypeCheckError):
        Field(name="name", dtype=ValueType.INT32)