Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: Added Lineage APIs to get registry objects relationships
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
  • Loading branch information
ntkathole committed Jun 29, 2025
commit 55de37e63f0137234928aa3885b29e079497a390
40 changes: 40 additions & 0 deletions protos/feast/registry/RegistryServer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ service RegistryServer{
rpc Refresh (RefreshRequest) returns (google.protobuf.Empty) {}
rpc Proto (google.protobuf.Empty) returns (feast.core.Registry) {}

// Lineage RPCs
rpc GetRegistryLineage (GetRegistryLineageRequest) returns (GetRegistryLineageResponse) {}
rpc GetObjectRelationships (GetObjectRelationshipsRequest) returns (GetObjectRelationshipsResponse) {}

}

message RefreshRequest {
Expand Down Expand Up @@ -424,3 +428,39 @@ message DeleteProjectRequest {
string name = 1;
bool commit = 2;
}

// Lineage

message EntityReference {
string type = 1; // "dataSource", "entity", "featureView", "featureService"
string name = 2;
}

message EntityRelation {
EntityReference source = 1;
EntityReference target = 2;
}

message GetRegistryLineageRequest {
string project = 1;
bool allow_cache = 2;
string filter_object_type = 3;
string filter_object_name = 4;
}

message GetRegistryLineageResponse {
repeated EntityRelation relationships = 1;
repeated EntityRelation indirect_relationships = 2;
}

message GetObjectRelationshipsRequest {
string project = 1;
string object_type = 2;
string object_name = 3;
bool include_indirect = 4;
bool allow_cache = 5;
}

message GetObjectRelationshipsResponse {
repeated EntityRelation relationships = 1;
}
2 changes: 2 additions & 0 deletions sdk/python/feast/api/registry/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from feast.api.registry.rest.entities import get_entity_router
from feast.api.registry.rest.feature_services import get_feature_service_router
from feast.api.registry.rest.feature_views import get_feature_view_router
from feast.api.registry.rest.lineage import get_lineage_router
from feast.api.registry.rest.permissions import get_permission_router
from feast.api.registry.rest.projects import get_project_router
from feast.api.registry.rest.saved_datasets import get_saved_dataset_router
Expand All @@ -14,6 +15,7 @@ def register_all_routes(app: FastAPI, grpc_handler):
app.include_router(get_data_source_router(grpc_handler))
app.include_router(get_feature_service_router(grpc_handler))
app.include_router(get_feature_view_router(grpc_handler))
app.include_router(get_lineage_router(grpc_handler))
app.include_router(get_permission_router(grpc_handler))
app.include_router(get_project_router(grpc_handler))
app.include_router(get_saved_dataset_router(grpc_handler))
142 changes: 142 additions & 0 deletions sdk/python/feast/api/registry/rest/lineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""REST API endpoints for registry lineage and relationships."""

from typing import Optional

from fastapi import APIRouter, HTTPException, Query

from feast.api.registry.rest.rest_utils import grpc_call
from feast.protos.feast.registry import RegistryServer_pb2


def get_lineage_router(grpc_handler) -> APIRouter:
router = APIRouter()

@router.get("/lineage/registry")
def get_registry_lineage(
project: str = Query(...),
allow_cache: bool = Query(True),
filter_object_type: Optional[str] = Query(None),
filter_object_name: Optional[str] = Query(None),
):
"""
Get complete registry lineage with relationships and indirect relationships.
Args:
project: Project name
allow_cache: Whether to allow cached data
filter_object_type: Optional filter by object type (dataSource, entity, featureView, featureService)
filter_object_name: Optional filter by object name
Returns:
Dictionary containing relationships and indirect_relationships arrays
"""
req = RegistryServer_pb2.GetRegistryLineageRequest(
project=project,
allow_cache=allow_cache,
filter_object_type=filter_object_type or "",
filter_object_name=filter_object_name or "",
)
response = grpc_call(grpc_handler.GetRegistryLineage, req)

return {
"relationships": response.get("relationships", []),
"indirect_relationships": response.get("indirectRelationships", []),
}

@router.get("/lineage/objects/{object_type}/{object_name}")
def get_object_relationships(
object_type: str,
object_name: str,
project: str = Query(...),
include_indirect: bool = Query(False),
allow_cache: bool = Query(True),
):
"""
Get relationships for a specific object.
Args:
object_type: Type of object (dataSource, entity, featureView, featureService)
object_name: Name of the object
project: Project name
include_indirect: Whether to include indirect relationships
allow_cache: Whether to allow cached data
Returns:
Dictionary containing relationships array for the specific object
"""
valid_types = ["dataSource", "entity", "featureView", "featureService"]
if object_type not in valid_types:
raise HTTPException(
status_code=400,
detail=f"Invalid object_type. Must be one of: {valid_types}",
)

req = RegistryServer_pb2.GetObjectRelationshipsRequest(
project=project,
object_type=object_type,
object_name=object_name,
include_indirect=include_indirect,
allow_cache=allow_cache,
)
response = grpc_call(grpc_handler.GetObjectRelationships, req)

return {"relationships": response.get("relationships", [])}

@router.get("/lineage/complete")
def get_complete_registry_data(
project: str = Query(...),
allow_cache: bool = Query(True),
):
"""
Get complete registry data.
This endpoint provides all the data the UI currently loads:
- All registry objects
- Relationships
- Indirect relationships
- Merged feature view data
Returns:
Complete registry data structure.
"""
# Get lineage data
lineage_req = RegistryServer_pb2.GetRegistryLineageRequest(
project=project,
allow_cache=allow_cache,
)
lineage_response = grpc_call(grpc_handler.GetRegistryLineage, lineage_req)

# Get all registry objects
entities_req = RegistryServer_pb2.ListEntitiesRequest(
project=project, allow_cache=allow_cache
)
entities_response = grpc_call(grpc_handler.ListEntities, entities_req)

data_sources_req = RegistryServer_pb2.ListDataSourcesRequest(
project=project, allow_cache=allow_cache
)
data_sources_response = grpc_call(
grpc_handler.ListDataSources, data_sources_req
)

feature_views_req = RegistryServer_pb2.ListAllFeatureViewsRequest(
project=project, allow_cache=allow_cache
)
feature_views_response = grpc_call(
grpc_handler.ListAllFeatureViews, feature_views_req
)

feature_services_req = RegistryServer_pb2.ListFeatureServicesRequest(
project=project, allow_cache=allow_cache
)
feature_services_response = grpc_call(
grpc_handler.ListFeatureServices, feature_services_req
)

return {
"project": project,
"objects": {
"entities": entities_response.get("entities", []),
"dataSources": data_sources_response.get("dataSources", []),
"featureViews": feature_views_response.get("featureViews", []),
"featureServices": feature_services_response.get("featureServices", []),
},
"relationships": lineage_response.get("relationships", []),
"indirectRelationships": lineage_response.get("indirectRelationships", []),
}

return router
140 changes: 138 additions & 2 deletions sdk/python/feast/infra/registry/base_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,16 +788,152 @@ def refresh(self, project: Optional[str] = None):
"""Refreshes the state of the registry cache by fetching the registry state from the remote registry store."""
raise NotImplementedError

# Lineage operations
def get_registry_lineage(
self,
project: str,
allow_cache: bool = False,
filter_object_type: Optional[str] = None,
filter_object_name: Optional[str] = None,
) -> tuple[List[Any], List[Any]]:
"""
Get complete registry lineage with relationships and indirect relationships.
Args:
project: Feast project name
allow_cache: Whether to allow returning data from a cached registry
filter_object_type: Optional filter by object type (dataSource, entity, featureView, featureService)
filter_object_name: Optional filter by object name
Returns:
Tuple of (direct_relationships, indirect_relationships)
"""
from feast.lineage.registry_lineage import RegistryLineageGenerator

# Create a registry proto with all objects
registry_proto = self._build_registry_proto(project, allow_cache)

# Generate lineage
lineage_generator = RegistryLineageGenerator()
relationships, indirect_relationships = lineage_generator.generate_lineage(
registry_proto
)

# Apply filtering if specified
if filter_object_type and filter_object_name:
relationships = [
rel
for rel in relationships
if (
(
rel.source.type.value == filter_object_type
and rel.source.name == filter_object_name
)
or (
rel.target.type.value == filter_object_type
and rel.target.name == filter_object_name
)
)
]
indirect_relationships = [
rel
for rel in indirect_relationships
if (
(
rel.source.type.value == filter_object_type
and rel.source.name == filter_object_name
)
or (
rel.target.type.value == filter_object_type
and rel.target.name == filter_object_name
)
)
]

return relationships, indirect_relationships

def get_object_relationships(
self,
project: str,
object_type: str,
object_name: str,
include_indirect: bool = False,
allow_cache: bool = False,
) -> List[Any]:
"""
Get relationships for a specific object.
Args:
project: Feast project name
object_type: Type of object (dataSource, entity, featureView, featureService)
object_name: Name of the object
include_indirect: Whether to include indirect relationships
allow_cache: Whether to allow returning data from a cached registry
Returns:
List of relationships involving the specified object
"""
from feast.lineage.registry_lineage import (
RegistryLineageGenerator,
)

registry_proto = self._build_registry_proto(project, allow_cache)
lineage_generator = RegistryLineageGenerator()

return lineage_generator.get_object_relationships(
registry_proto, object_type, object_name, include_indirect=include_indirect
)

def _build_registry_proto(
self, project: str, allow_cache: bool = False
) -> RegistryProto:
"""Helper method to build a registry proto with all objects."""
registry = RegistryProto()

# Add all entities
entities = self.list_entities(project=project, allow_cache=allow_cache)
for entity in entities:
registry.entities.append(entity.to_proto())

# Add all data sources
data_sources = self.list_data_sources(project=project, allow_cache=allow_cache)
for data_source in data_sources:
registry.data_sources.append(data_source.to_proto())

# Add all feature views
feature_views = self.list_feature_views(
project=project, allow_cache=allow_cache
)
for feature_view in feature_views:
registry.feature_views.append(feature_view.to_proto())

# Add all stream feature views
stream_feature_views = self.list_stream_feature_views(
project=project, allow_cache=allow_cache
)
for stream_feature_view in stream_feature_views:
registry.stream_feature_views.append(stream_feature_view.to_proto())

# Add all on-demand feature views
on_demand_feature_views = self.list_on_demand_feature_views(
project=project, allow_cache=allow_cache
)
for on_demand_feature_view in on_demand_feature_views:
registry.on_demand_feature_views.append(on_demand_feature_view.to_proto())

# Add all feature services
feature_services = self.list_feature_services(
project=project, allow_cache=allow_cache
)
for feature_service in feature_services:
registry.feature_services.append(feature_service.to_proto())

return registry

@staticmethod
def _message_to_sorted_dict(message: Message) -> Dict[str, Any]:
return json.loads(MessageToJson(message, sort_keys=True))

def to_dict(self, project: str) -> Dict[str, List[Any]]:
"""Returns a dictionary representation of the registry contents for the specified project.

For each list in the dictionary, the elements are sorted by name, so this
method can be used to compare two registries.

Args:
project: Feast project to convert to a dict
"""
Expand Down
Loading