|
11 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | | -import time |
15 | 14 |
|
16 | 15 | import grpc |
17 | 16 | import pandas as pd |
| 17 | +import numpy as np |
18 | 18 | import feast.core.CoreService_pb2_grpc as Core |
19 | 19 | import feast.serving.ServingService_pb2_grpc as Serving |
20 | | -from feast.entity import Entity |
21 | 20 | from feast.core.CoreService_pb2 import GetFeastCoreVersionResponse |
22 | | -from feast.core.CoreService_pb2 import GetFeatureSetsResponse |
23 | | -from feast.core.FeatureSet_pb2 import FeatureSetSpec |
24 | | -from feast.serving.ServingService_pb2 import GetFeastServingVersionResponse |
| 21 | +from feast.serving.ServingService_pb2 import ( |
| 22 | + GetFeastServingVersionResponse, |
| 23 | + GetOnlineFeaturesResponse, |
| 24 | +) |
| 25 | +from google.protobuf.timestamp_pb2 import Timestamp |
25 | 26 | import pytest |
26 | 27 | from feast.client import Client |
27 | | -from feast.tests import dataframes |
28 | 28 | from concurrent import futures |
29 | 29 | from feast.tests.feast_core_server import CoreServicer |
30 | 30 | from feast.tests.feast_serving_server import ServingServicer |
31 | | -from feast.feature import Feature |
32 | | -from feast.feature_set import FeatureSet |
33 | | -from feast.value_type import ValueType |
34 | | -from feast.source import KafkaSource |
35 | | -from feast.tests.fake_kafka import FakeKafka |
| 31 | +from feast.types import FeatureRow_pb2 as FeatureRowProto |
| 32 | +from feast.types import Field_pb2 as FieldProto |
| 33 | +from feast.types import Value_pb2 as ValueProto |
36 | 34 |
|
37 | 35 | CORE_URL = "core.feast.example.com" |
38 | 36 | SERVING_URL = "serving.example.com" |
@@ -94,63 +92,56 @@ def test_version(self, mock_client, mocker): |
94 | 92 | and status["serving"]["version"] == "0.3.0" |
95 | 93 | ) |
96 | 94 |
|
97 | | - @pytest.mark.parametrize("dataframe", [dataframes.GOOD]) |
98 | | - def test_ingest_then_get_one_feature_set_success( |
99 | | - self, core_server, dataframe: pd.DataFrame |
100 | | - ): |
101 | | - # Create and register Fake Kafka |
102 | | - fake_kafka = FakeKafka() |
| 95 | + def test_get_feature(self, mock_client, mocker): |
| 96 | + ROW_COUNT = 300 |
103 | 97 |
|
104 | | - # Set up Feast Serving with Fake Kafka |
105 | | - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) |
106 | | - Serving.add_ServingServiceServicer_to_server( |
107 | | - ServingServicer(kafka=fake_kafka), server |
| 98 | + mock_client._serving_service_stub = Serving.ServingServiceStub( |
| 99 | + grpc.insecure_channel("") |
108 | 100 | ) |
109 | | - server.add_insecure_port("[::]:50052") |
110 | | - server.start() |
111 | 101 |
|
112 | | - # Create Feast client and register with Core and Serving |
113 | | - client = Client(core_url="localhost:50051", serving_url="localhost:50052") |
114 | | - |
115 | | - # Create feature set and update based on dataframe |
116 | | - feature_set_1 = FeatureSet( |
117 | | - name="feature_set_1", |
118 | | - entities=[Entity(name="entity_id", dtype=ValueType.INT32)], |
119 | | - features=[ |
120 | | - Feature(name="feature_1", dtype=ValueType.FLOAT), |
121 | | - Feature(name="feature_2", dtype=ValueType.STRING), |
122 | | - Feature(name="feature_3", dtype=ValueType.INT32), |
123 | | - ], |
| 102 | + response = GetOnlineFeaturesResponse() |
| 103 | + feature_row = FeatureRowProto.FeatureRow( |
| 104 | + event_timestamp=Timestamp(), feature_set="feature_set_1:1" |
| 105 | + ) |
| 106 | + for feature_num in range(1, 10): |
| 107 | + field = FieldProto.Field( |
| 108 | + name="feature_" + str(feature_num), |
| 109 | + value=ValueProto.Value(int64_val=feature_num), |
| 110 | + ) |
| 111 | + feature_row.fields.append(field) |
| 112 | + |
| 113 | + feature_data_set = GetOnlineFeaturesResponse.FeatureDataset( |
| 114 | + name="feature_set_1", version=1 |
124 | 115 | ) |
125 | 116 |
|
126 | | - # Register feature set with Feast core |
127 | | - client.apply(feature_set_1) |
| 117 | + for row_number in range(1, ROW_COUNT + 1): |
| 118 | + feature_data_set.feature_rows.append(feature_row) |
128 | 119 |
|
129 | | - # Register Fake Kafka with feature set |
130 | | - feature_set_1._message_producer = fake_kafka |
| 120 | + response.feature_datasets.append(feature_data_set) |
131 | 121 |
|
132 | | - # Ingest data into Feast using Fake Kafka |
133 | | - feature_set_1.ingest(dataframe) |
| 122 | + mocker.patch.object( |
| 123 | + mock_client._serving_service_stub, |
| 124 | + "GetOnlineFeatures", |
| 125 | + return_value=response, |
| 126 | + ) |
134 | 127 |
|
135 | | - time.sleep(2) |
| 128 | + entity_data = pd.DataFrame( |
| 129 | + {"datetime": np.repeat(4, ROW_COUNT), "entity_id": np.repeat(4, ROW_COUNT)} |
| 130 | + ) |
136 | 131 |
|
137 | | - # Retrieve feature values from Feast serving |
138 | | - feature_dataframe = client.get( |
139 | | - entity_data=dataframe[["datetime", "entity_id"]], |
| 132 | + feature_dataframe = mock_client.get( |
| 133 | + entity_data=entity_data, |
140 | 134 | feature_ids=[ |
141 | | - "feature_set_1.feature_1", |
142 | | - "feature_set_1.feature_2", |
143 | | - "feature_set_1.feature_3", |
| 135 | + "feature_set_1:1.feature_1", |
| 136 | + "feature_set_1:1.feature_2", |
| 137 | + "feature_set_1:1.feature_3", |
| 138 | + "feature_set_1:1.feature_4", |
| 139 | + "feature_set_1:1.feature_5", |
| 140 | + "feature_set_1:1.feature_6", |
| 141 | + "feature_set_1:1.feature_7", |
| 142 | + "feature_set_1:1.feature_8", |
| 143 | + "feature_set_1:1.feature_9", |
144 | 144 | ], |
145 | 145 | ) |
146 | 146 |
|
147 | | - assert True |
148 | | - |
149 | | - # assert ( |
150 | | - # feature_dataframe["feature_set_1.feature_1"][0] == 0.2 |
151 | | - # and feature_dataframe["feature_set_1.feature_2"][0] == "string1" |
152 | | - # and feature_dataframe["feature_set_1.feature_3"][0] == 1 |
153 | | - # ) |
154 | | - |
155 | | - # Stop Feast Serving server |
156 | | - server.stop(0) |
| 147 | + feature_dataframe.head() |
0 commit comments