diff --git a/sdk/python/feast/api/registry/rest/data_sources.py b/sdk/python/feast/api/registry/rest/data_sources.py
index bbc120707fc..4e522644ed3 100644
--- a/sdk/python/feast/api/registry/rest/data_sources.py
+++ b/sdk/python/feast/api/registry/rest/data_sources.py
@@ -1,7 +1,9 @@
 import logging
-from typing import Dict
+from typing import Dict, Optional
 
 from fastapi import APIRouter, Depends, Query
+from fastapi.responses import JSONResponse
+from pydantic import BaseModel
 
 from feast.api.registry.rest.codegen_utils import (
     render_data_source_code,
@@ -19,6 +21,7 @@
     grpc_call,
     parse_tags,
 )
+from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
 from feast.protos.feast.registry import RegistryServer_pb2
 from feast.type_map import _convert_value_type_str_to_value_type
 from feast.types import from_value_type
@@ -26,6 +29,54 @@
 logger = logging.getLogger(__name__)
 
 
+class FileOptionsModel(BaseModel):
+    uri: str = ""
+
+
+class BigQueryOptionsModel(BaseModel):
+    table: str = ""
+    query: str = ""
+
+
+class SnowflakeOptionsModel(BaseModel):
+    table: str = ""
+    database: str = ""
+    schema_: str = ""
+
+
+class RedshiftOptionsModel(BaseModel):
+    table: str = ""
+    database: str = ""
+    schema_: str = ""
+
+
+class KafkaOptionsModel(BaseModel):
+    kafka_bootstrap_servers: str = ""
+    topic: str = ""
+
+
+class SparkOptionsModel(BaseModel):
+    table: str = ""
+    path: str = ""
+
+
+class ApplyDataSourceRequestBody(BaseModel):
+    name: str
+    project: str
+    type: Optional[int] = None
+    timestamp_field: Optional[str] = ""
+    created_timestamp_column: Optional[str] = ""
+    description: Optional[str] = ""
+    tags: Optional[Dict[str, str]] = {}
+    owner: Optional[str] = ""
+    file_options: Optional[FileOptionsModel] = None
+    bigquery_options: Optional[BigQueryOptionsModel] = None
+    snowflake_options: Optional[SnowflakeOptionsModel] = None
+    redshift_options: Optional[RedshiftOptionsModel] = None
+    kafka_options: Optional[KafkaOptionsModel] = None
+    spark_options: Optional[SparkOptionsModel] = None
+
+
 def get_data_source_router(grpc_handler) -> APIRouter:
     router = APIRouter()
 
@@ -157,4 +208,69 @@ def get_data_source(
             result["featureDefinition"] = render_data_source_code(context)
         return result
 
+    @router.post("/data_sources", status_code=201)
+    def apply_data_source(body: ApplyDataSourceRequestBody):
+        ds_proto = DataSourceProto(
+            name=body.name,
+            timestamp_field=body.timestamp_field or "",
+            created_timestamp_column=body.created_timestamp_column or "",
+            description=body.description or "",
+            tags=body.tags or {},
+            owner=body.owner or "",
+        )
+        if body.type is not None:
+            ds_proto.type = body.type  # type: ignore[assignment]
+
+        if body.file_options:
+            ds_proto.file_options.uri = body.file_options.uri
+        elif body.bigquery_options:
+            ds_proto.bigquery_options.table = body.bigquery_options.table
+            ds_proto.bigquery_options.query = body.bigquery_options.query
+        elif body.snowflake_options:
+            ds_proto.snowflake_options.table = body.snowflake_options.table
+            ds_proto.snowflake_options.database = body.snowflake_options.database
+            ds_proto.snowflake_options.schema = body.snowflake_options.schema_
+        elif body.redshift_options:
+            ds_proto.redshift_options.table = body.redshift_options.table
+            ds_proto.redshift_options.database = body.redshift_options.database
+            ds_proto.redshift_options.schema = body.redshift_options.schema_
+        elif body.kafka_options:
+            ds_proto.kafka_options.kafka_bootstrap_servers = (
+                body.kafka_options.kafka_bootstrap_servers
+            )
+            ds_proto.kafka_options.topic = body.kafka_options.topic
+        elif body.spark_options:
+            ds_proto.spark_options.table = body.spark_options.table
+            ds_proto.spark_options.path = body.spark_options.path
+
+        req = RegistryServer_pb2.ApplyDataSourceRequest(
+            data_source=ds_proto,
+            project=body.project,
+            commit=True,
+        )
+        grpc_call(grpc_handler.ApplyDataSource, req)
+
+        return JSONResponse(
+            status_code=201,
+            content={
+                "name": body.name,
+                "project": body.project,
+                "status": "applied",
+            },
+        )
+
+    @router.delete("/data_sources/{name}")
+    def delete_data_source(
+        name: str,
+        project: str = Query(...),
+    ):
+        req = RegistryServer_pb2.DeleteDataSourceRequest(
+            name=name,
+            project=project,
+            commit=True,
+        )
+        grpc_call(grpc_handler.DeleteDataSource, req)
+
+        return {"name": name, "project": project, "status": "deleted"}
+
     return router
diff --git a/sdk/python/feast/api/registry/rest/entities.py b/sdk/python/feast/api/registry/rest/entities.py
index d2943ea74c4..5326433c0aa 100644
--- a/sdk/python/feast/api/registry/rest/entities.py
+++ b/sdk/python/feast/api/registry/rest/entities.py
@@ -1,6 +1,9 @@
 import logging
+from typing import Dict, Optional
 
 from fastapi import APIRouter, Depends, Query
+from fastapi.responses import JSONResponse
+from pydantic import BaseModel
 
 from feast.api.registry.rest.codegen_utils import render_entity_code
 from feast.api.registry.rest.rest_utils import (
@@ -13,11 +16,23 @@
     get_sorting_params,
     grpc_call,
 )
+from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
+from feast.protos.feast.core.Entity_pb2 import EntitySpecV2 as EntitySpecProto
 from feast.protos.feast.registry import RegistryServer_pb2
 
 logger = logging.getLogger(__name__)
 
 
+class ApplyEntityRequestBody(BaseModel):
+    name: str
+    project: str
+    join_key: Optional[str] = None
+    value_type: Optional[int] = 2
+    description: Optional[str] = ""
+    tags: Optional[Dict[str, str]] = {}
+    owner: Optional[str] = ""
+
+
 def get_entity_router(grpc_handler) -> APIRouter:
     router = APIRouter()
 
@@ -136,4 +151,44 @@ def get_entity(
             result["featureDefinition"] = render_entity_code(context)
         return result
 
+    @router.post("/entities", status_code=201)
+    def apply_entity(body: ApplyEntityRequestBody):
+        join_key = body.join_key if body.join_key else body.name
+
+        spec = EntitySpecProto(
+            name=body.name,
+            value_type=body.value_type,
+            join_key=join_key,
+            description=body.description or "",
+            tags=body.tags or {},
+            owner=body.owner or "",
+        )
+        entity_proto = EntityProto(spec=spec)
+
+        req = RegistryServer_pb2.ApplyEntityRequest(
+            entity=entity_proto,
+            project=body.project,
+            commit=True,
+        )
+        grpc_call(grpc_handler.ApplyEntity, req)
+
+        return JSONResponse(
+            status_code=201,
+            content={"name": body.name, "project": body.project, "status": "applied"},
+        )
+
+    @router.delete("/entities/{name}")
+    def delete_entity(
+        name: str,
+        project: str = Query(...),
+    ):
+        req = RegistryServer_pb2.DeleteEntityRequest(
+            name=name,
+            project=project,
+            commit=True,
+        )
+        grpc_call(grpc_handler.DeleteEntity, req)
+
+        return {"name": name, "project": project, "status": "deleted"}
+
     return router
diff --git a/sdk/python/feast/api/registry/rest/feature_views.py b/sdk/python/feast/api/registry/rest/feature_views.py
index 0c921a20870..04c1f42cefa 100644
--- a/sdk/python/feast/api/registry/rest/feature_views.py
+++ b/sdk/python/feast/api/registry/rest/feature_views.py
@@ -1,6 +1,10 @@
-from typing import Dict
+import logging
+from typing import Dict, List, Optional
 
 from fastapi import APIRouter, Depends, Query
+from fastapi.responses import JSONResponse
+from google.protobuf.duration_pb2 import Duration
+from pydantic import BaseModel
 
 from feast.api.registry.rest.codegen_utils import render_feature_view_code
 from feast.api.registry.rest.rest_utils import (
@@ -14,10 +18,34 @@
     paginate_and_sort,
     parse_tags,
 )
+from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
+from feast.protos.feast.core.Feature_pb2 import FeatureSpecV2
+from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
+from feast.protos.feast.core.FeatureView_pb2 import FeatureViewSpec
 from feast.registry_server import RegistryServer_pb2
 from feast.type_map import _convert_value_type_str_to_value_type
 from feast.types import from_value_type
 
+logger = logging.getLogger(__name__)
+
+
+class FeatureModel(BaseModel):
+    name: str
+    value_type: int = 2
+
+
+class ApplyFeatureViewRequestBody(BaseModel):
+    name: str
+    project: str
+    entities: Optional[List[str]] = []
+    features: Optional[List[FeatureModel]] = []
+    batch_source: Optional[str] = ""
+    ttl_seconds: Optional[int] = None
+    online: Optional[bool] = True
+    description: Optional[str] = ""
+    tags: Optional[Dict[str, str]] = {}
+    owner: Optional[str] = ""
+
 
 def _extract_feature_view_from_any(any_feature_view: dict) -> dict:
     """Extract the specific feature view type and data from an AnyFeatureView object.
@@ -275,4 +303,64 @@ def list_all_feature_views(
 
         return result
 
+    @router.post("/feature_views", status_code=201)
+    def apply_feature_view(body: ApplyFeatureViewRequestBody):
+        feature_specs = []
+        for f in body.features or []:
+            feature_specs.append(FeatureSpecV2(name=f.name, value_type=f.value_type))
+
+        batch_source_proto = (
+            DataSourceProto(name=body.batch_source) if body.batch_source else None
+        )
+
+        ttl = (
+            Duration(seconds=body.ttl_seconds) if body.ttl_seconds is not None else None
+        )
+
+        spec = FeatureViewSpec(
+            name=body.name,
+            entities=body.entities or [],
+            features=feature_specs,
+            tags=body.tags or {},
+            online=body.online if body.online is not None else True,
+            description=body.description or "",
+            owner=body.owner or "",
+        )
+        if ttl is not None:
+            spec.ttl.CopyFrom(ttl)
+        if batch_source_proto:
+            spec.batch_source.CopyFrom(batch_source_proto)
+
+        fv_proto = FeatureViewProto(spec=spec)
+
+        req = RegistryServer_pb2.ApplyFeatureViewRequest(
+            feature_view=fv_proto,
+            project=body.project,
+            commit=True,
+        )
+        grpc_call(grpc_handler.ApplyFeatureView, req)
+
+        return JSONResponse(
+            status_code=201,
+            content={
+                "name": body.name,
+                "project": body.project,
+                "status": "applied",
+            },
+        )
+
+    @router.delete("/feature_views/{name}")
+    def delete_feature_view(
+        name: str,
+        project: str = Query(...),
+    ):
+        req = RegistryServer_pb2.DeleteFeatureViewRequest(
+            name=name,
+            project=project,
+            commit=True,
+        )
+        grpc_call(grpc_handler.DeleteFeatureView, req)
+
+        return {"name": name, "project": project, "status": "deleted"}
+
     return router
diff --git a/sdk/python/tests/unit/api/test_api_rest_registry.py b/sdk/python/tests/unit/api/test_api_rest_registry.py
index 3e6cb5e7e3d..04eacb6cb9b 100644
--- a/sdk/python/tests/unit/api/test_api_rest_registry.py
+++ b/sdk/python/tests/unit/api/test_api_rest_registry.py
@@ -2002,6 +2002,127 @@ def test_all_endpoints_return_404_for_invalid_objects(fastapi_test_app):
         assert data["error_type"] == "FeastObjectNotFoundException"
 
 
+def test_apply_and_delete_entity_via_rest(fastapi_test_app):
+    """Test POST /entities and DELETE /entities/{name} endpoints."""
+    # Apply a new entity
+    response = fastapi_test_app.post(
+        "/entities",
+        json={
+            "name": "driver_id",
+            "project": "demo_project",
+            "join_key": "driver_id",
+            "value_type": 2,
+            "description": "Driver entity",
+            "owner": "ml-team",
+        },
+    )
+    assert response.status_code == 201
+    data = response.json()
+    assert data["name"] == "driver_id"
+    assert data["status"] == "applied"
+
+    # Verify it exists
+    response = fastapi_test_app.get("/entities/driver_id?project=demo_project")
+    assert response.status_code == 200
+    assert response.json()["spec"]["name"] == "driver_id"
+
+    # Delete it
+    response = fastapi_test_app.delete("/entities/driver_id?project=demo_project")
+    assert response.status_code == 200
+    data = response.json()
+    assert data["name"] == "driver_id"
+    assert data["status"] == "deleted"
+
+    # Verify it's gone
+    response = fastapi_test_app.get("/entities/driver_id?project=demo_project")
+    assert response.status_code == 404
+
+
+def test_apply_and_delete_data_source_via_rest(fastapi_test_app):
+    """Test POST /data_sources and DELETE /data_sources/{name} endpoints."""
+    # Apply a new file data source
+    response = fastapi_test_app.post(
+        "/data_sources",
+        json={
+            "name": "test_file_source",
+            "project": "demo_project",
+            "type": 1,
+            "timestamp_field": "event_timestamp",
+            "description": "Test file source",
+            "file_options": {"uri": "s3://bucket/path/data.parquet"},
+        },
+    )
+    assert response.status_code == 201
+    data = response.json()
+    assert data["name"] == "test_file_source"
+    assert data["status"] == "applied"
+
+    # Verify it exists
+    response = fastapi_test_app.get(
+        "/data_sources/test_file_source?project=demo_project"
+    )
+    assert response.status_code == 200
+    assert response.json()["name"] == "test_file_source"
+
+    # Delete it
+    response = fastapi_test_app.delete(
+        "/data_sources/test_file_source?project=demo_project"
+    )
+    assert response.status_code == 200
+    data = response.json()
+    assert data["name"] == "test_file_source"
+    assert data["status"] == "deleted"
+
+    # Verify it's gone
+    response = fastapi_test_app.get(
+        "/data_sources/test_file_source?project=demo_project"
+    )
+    assert response.status_code == 404
+
+
+def test_apply_and_delete_feature_view_via_rest(fastapi_test_app):
+    """Test POST /feature_views and DELETE /feature_views/{name} endpoints."""
+    # Apply a new feature view (no batch_source: a bare DataSourceProto with only a
+    # name but no type is rejected by the registry's source-type validation)
+    response = fastapi_test_app.post(
+        "/feature_views",
+        json={
+            "name": "driver_stats",
+            "project": "demo_project",
+            "entities": ["user_id"],
+            "features": [
+                {"name": "trip_count", "value_type": 2},
+                {"name": "avg_rating", "value_type": 4},
+            ],
+            "ttl_seconds": 86400,
+            "online": True,
+            "description": "Driver statistics feature view",
+        },
+    )
+    assert response.status_code == 201
+    data = response.json()
+    assert data["name"] == "driver_stats"
+    assert data["status"] == "applied"
+
+    # Verify it exists
+    response = fastapi_test_app.get("/feature_views/driver_stats?project=demo_project")
+    assert response.status_code == 200
+    assert response.json()["spec"]["name"] == "driver_stats"
+
+    # Delete it
+    response = fastapi_test_app.delete(
+        "/feature_views/driver_stats?project=demo_project"
+    )
+    assert response.status_code == 200
+    data = response.json()
+    assert data["name"] == "driver_stats"
+    assert data["status"] == "deleted"
+
+    # Verify it's gone
+    response = fastapi_test_app.get("/feature_views/driver_stats?project=demo_project")
+    assert response.status_code == 404
+
+
 def test_metrics_resource_counts_nonexistent_project(fastapi_test_app):
     """Test /metrics/resource_counts with a non-existent project returns empty data."""
     response = fastapi_test_app.get(