|
18 | 18 | from datetime import datetime, timedelta |
19 | 19 | from typing import Any |
20 | 20 |
|
21 | | -from feast import Entity, FeatureStore, FeatureView, FileSource, RepoConfig |
| 21 | +import pandas as pd |
| 22 | + |
| 23 | +from feast import ( |
| 24 | + Entity, |
| 25 | + FeatureStore, |
| 26 | + FeatureView, |
| 27 | + FileSource, |
| 28 | + RepoConfig, |
| 29 | + RequestSource, |
| 30 | +) |
22 | 31 | from feast.driver_test_data import create_driver_hourly_stats_df |
23 | 32 | from feast.field import Field |
24 | 33 | from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig |
25 | 34 | from feast.on_demand_feature_view import on_demand_feature_view |
26 | | -from feast.types import Float32, Float64, Int64 |
| 35 | +from feast.types import Array, Float32, Float64, Int64, PdfBytes, String, ValueType |
27 | 36 |
|
28 | 37 |
|
29 | 38 | class TestOnlineWrites(unittest.TestCase): |
@@ -144,3 +153,84 @@ def test_online_retrieval(self): |
144 | 153 | "conv_rate_plus_acc", |
145 | 154 | ] |
146 | 155 | ) |
| 156 | + |
| 157 | + |
| 158 | +class TestOnlineWritesWithTransform(unittest.TestCase): |
| 159 | + def test_transform_on_write_pdf(self): |
| 160 | + with tempfile.TemporaryDirectory() as data_dir: |
| 161 | + self.store = FeatureStore( |
| 162 | + config=RepoConfig( |
| 163 | + project="test_write_to_online_store_with_transform", |
| 164 | + registry=os.path.join(data_dir, "registry.db"), |
| 165 | + provider="local", |
| 166 | + entity_key_serialization_version=2, |
| 167 | + online_store=SqliteOnlineStoreConfig( |
| 168 | + path=os.path.join(data_dir, "online.db") |
| 169 | + ), |
| 170 | + ) |
| 171 | + ) |
| 172 | + |
| 173 | + chunk = Entity( |
| 174 | + name="chunk_id", |
| 175 | + description="Chunk ID", |
| 176 | + value_type=ValueType.STRING, |
| 177 | + join_keys=["chunk_id"], |
| 178 | + ) |
| 179 | + |
| 180 | + document = Entity( |
| 181 | + name="document_id", |
| 182 | + description="Document ID", |
| 183 | + value_type=ValueType.STRING, |
| 184 | + join_keys=["document_id"], |
| 185 | + ) |
| 186 | + |
| 187 | + input_request_pdf = RequestSource( |
| 188 | + name="pdf_request_source", |
| 189 | + schema=[ |
| 190 | + Field(name="document_id", dtype=String), |
| 191 | + Field(name="pdf_bytes", dtype=PdfBytes), |
| 192 | + Field(name="file_name", dtype=String), |
| 193 | + ], |
| 194 | + ) |
| 195 | + |
| 196 | + @on_demand_feature_view( |
| 197 | + entities=[chunk, document], |
| 198 | + sources=[input_request_pdf], |
| 199 | + schema=[ |
| 200 | + Field(name="document_id", dtype=String), |
| 201 | + Field(name="chunk_id", dtype=String), |
| 202 | + Field(name="chunk_text", dtype=String), |
| 203 | + Field( |
| 204 | + name="vector", |
| 205 | + dtype=Array(Float32), |
| 206 | + vector_index=True, |
| 207 | + vector_search_metric="L2", |
| 208 | + ), |
| 209 | + ], |
| 210 | + mode="python", |
| 211 | + write_to_online_store=True, |
| 212 | + singleton=True, |
| 213 | + ) |
| 214 | + def transform_pdf_on_write_view(inputs: dict[str, Any]) -> dict[str, Any]: |
| 215 | + k = 10 |
| 216 | + return { |
| 217 | + "document_id": ["doc_1", "doc_2"], |
| 218 | + "chunk_id": ["chunk-1", "chunk-2"], |
| 219 | + "vector": [[0.5] * k, [0.4] * k], |
| 220 | + "chunk_text": ["chunk text 1", "chunk text 2"], |
| 221 | + } |
| 222 | + |
| 223 | + self.store.apply([chunk, document, transform_pdf_on_write_view]) |
| 224 | + |
| 225 | + sample_pdf = b"%PDF-1.3\n3 0 obj\n<</Type /Page\n/Parent 1 0 R\n/Resources 2 0 R\n/Contents 4 0 R>>\nendobj\n4 0 obj\n<</Filter /FlateDecode /Length 115>>\nstream\nx\x9c\x15\xcc1\x0e\x820\x18@\xe1\x9dS\xbcM]jk$\xd5\xd5(\x83!\x86\xa1\x17\xf8\xa3\xa5`LIh+\xd7W\xc6\xf7\r\xef\xc0\xbd\xd2\xaa\xb6,\xd5\xc5\xb1o\x0c\xa6VZ\xe3znn%\xf3o\xab\xb1\xe7\xa3:Y\xdc\x8bm\xeb\xf3&1\xc8\xd7\xd3\x97\xc82\xe6\x81\x87\xe42\xcb\x87Vb(\x12<\xdd<=}Jc\x0cL\x91\xee\xda$\xb5\xc3\xbd\xd7\xe9\x0f\x8d\x97 $\nendstream\nendobj\n1 0 obj\n<</Type /Pages\n/Kids [3 0 R ]\n/Count 1\n/MediaBox [0 0 595.28 841.89]\n>>\nendobj\n5 0 obj\n<</Type /Font\n/BaseFont /Helvetica\n/Subtype /Type1\n/Encoding /WinAnsiEncoding\n>>\nendobj\n2 0 obj\n<<\n/ProcSet [/PDF /Text /ImageB /ImageC /ImageI]\n/Font <<\n/F1 5 0 R\n>>\n/XObject <<\n>>\n>>\nendobj\n6 0 obj\n<<\n/Producer (PyFPDF 1.7.2 http://pyfpdf.googlecode.com/)\n/Title (This is a sample title. And this is another sentence. Finally, this is the third sentence.)\n/Author (Francisco Javier Arceo)\n/CreationDate (D:20250312165548)\n>>\nendobj\n7 0 obj\n<<\n/Type /Catalog\n/Pages 1 0 R\n/OpenAction [3 0 R /FitH null]\n/PageLayout /OneColumn\n>>\nendobj\nxref\n0 8\n0000000000 65535 f \n0000000272 00000 n \n0000000455 00000 n \n0000000009 00000 n \n0000000087 00000 n \n0000000359 00000 n \n0000000559 00000 n \n0000000734 00000 n \ntrailer\n<<\n/Size 8\n/Root 7 0 R\n/Info 6 0 R\n>>\nstartxref\n837\n%%EOF\n" |
| 226 | + sample_input = { |
| 227 | + "pdf_bytes": sample_pdf, |
| 228 | + "file_name": "sample_pdf", |
| 229 | + "document_id": "doc_1", |
| 230 | + } |
| 231 | + input_df = pd.DataFrame([sample_input]) |
| 232 | + |
| 233 | + self.store.write_to_online_store( |
| 234 | + feature_view_name="transform_pdf_on_write_view", |
| 235 | + df=input_df, |
| 236 | + ) |
0 commit comments