diff --git a/.secrets.baseline b/.secrets.baseline index e7126304069..ae768fd1b8c 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -957,7 +957,7 @@ "filename": "infra/feast-operator/api/v1/featurestore_types.go", "hashed_secret": "44e17306b837162269a410204daaa5ecee4ec22c", "is_verified": false, - "line_number": 906 + "line_number": 935 } ], "infra/feast-operator/api/v1/zz_generated.deepcopy.go": [ @@ -980,7 +980,7 @@ "filename": "infra/feast-operator/api/v1/zz_generated.deepcopy.go", "hashed_secret": "c2028031c154bbe86fd69bef740855c74b927dcf", "is_verified": false, - "line_number": 1528 + "line_number": 1570 } ], "infra/feast-operator/api/v1alpha1/featurestore_types.go": [ @@ -1555,5 +1555,5 @@ } ] }, - "generated_at": "2026-06-11T15:45:28Z" + "generated_at": "2026-06-22T17:54:28Z" } diff --git a/docs/reference/openlineage.md b/docs/reference/openlineage.md index 78438082d44..da91bf82f56 100644 --- a/docs/reference/openlineage.md +++ b/docs/reference/openlineage.md @@ -186,6 +186,12 @@ Captures materialization run metadata: ## Lineage Visualization +### Option 1: Feast UI (Built-in) + +Feast includes a built-in OpenLineage consumer that can receive, store, and visualize lineage from **all** OpenLineage producers (Airflow, Spark, dbt, Feast itself, etc.) directly in the Feast UI. See the [OpenLineage Consumer](#openlineage-consumer) section below. + +### Option 2: Marquez + Use [Marquez](https://marquezproject.ai/) to visualize your Feast lineage: ```bash @@ -216,3 +222,201 @@ Then access the Marquez UI at http://localhost:3000 to see your feature lineage. | Entity | InputDataset | | FeatureService | OutputDataset | | Materialization | RunEvent (START/COMPLETE/FAIL) | + +--- + +## OpenLineage Consumer + +Feast can act as an **OpenLineage consumer**, receiving lineage events from any OpenLineage-compatible producer and displaying them in the Feast UI. This eliminates the need for a separate Marquez deployment when you want to visualize cross-system data lineage alongside your feature store. + +### Consumer Architecture + +``` +Producers (Airflow, Spark, dbt, Feast, Flink, …) + │ + ▼ + POST /api/v1/lineage ──→ Event Processor ──→ Lineage Store (SQL) + │ + ▼ + Feast UI + ┌──────────────────────────┐ + │ Lineage tab │ + │ ├─ OpenLineage Graph │ + │ │ (all producers) │ + │ └─ ☐ Feast Only Lineage │ + │ (registry view) │ + │ │ + │ Events tab │ + │ └─ Event browser │ + └──────────────────────────┘ +``` + +When the consumer is **not** enabled, the Feast UI shows only the original registry-based lineage view — no tabs are added. + +### Enabling the Consumer + +Add the `consumer` section under `openlineage` in your `feature_store.yaml`: + +```yaml +project: my_project +registry: + registry_type: sql + path: postgresql://user:****@host:5432/feast # pragma: allowlist secret + +openlineage: + enabled: true + namespace: my_project + consumer: + enabled: true + store_type: sql + # Optional: separate database for lineage storage. + # If omitted, the SQL registry database is reused. + # connection_string: postgresql://user:****@host:5432/feast_lineage + api_key: "change-me" # pragma: allowlist secret + namespace_mapping: + airflow_ns: my_project + spark_ns: my_project +``` + +Or via environment variables: + +```bash +export FEAST_OPENLINEAGE_CONSUMER_ENABLED=true +export FEAST_OPENLINEAGE_CONSUMER_STORE_TYPE=sql +export FEAST_OPENLINEAGE_CONSUMER_API_KEY=change-me # pragma: allowlist secret +# Optional separate DB: +# export FEAST_OPENLINEAGE_CONSUMER_CONNECTION_STRING=postgresql://... +``` + +### Consumer Configuration Options + +| Option | Default | Description | +|--------|---------|-------------| +| `consumer.enabled` | `false` | Enable the OpenLineage consumer | +| `consumer.store_type` | `sql` | Storage backend type. Currently only `sql` is supported | +| `consumer.connection_string` | - | Optional separate database connection string. If omitted, reuses the SQL registry database | +| `consumer.api_key` | - | API key that producers must provide when sending events | +| `consumer.namespace_mapping` | `{}` | Maps OpenLineage namespaces to Feast projects for RBAC scoping | + +### Consumer API Endpoints + +When the consumer is enabled, the following endpoints are available on the Feast REST registry server: + +#### Event Receiver (Producer-facing) + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/api/v1/lineage` | `POST` | Receive a single OpenLineage event (or array of events) | +| `/api/v1/lineage/batch` | `POST` | Receive a batch of OpenLineage events | + +Both endpoints require the `X-API-Key` header (or `Authorization: Bearer `) if `consumer.api_key` is configured. + +#### OpenLineage Query Endpoints (UI-facing) + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/lineage/openlineage/graph` | `GET` | Full lineage graph with all nodes, edges, and symlinks | +| `/lineage/openlineage/graph/{node_type}/{namespace}/{name}` | `GET` | Lineage graph centered on a specific node | +| `/lineage/openlineage/events` | `GET` | Browse stored events with filtering | +| `/lineage/openlineage/jobs` | `GET` | List all known OpenLineage jobs | +| `/lineage/openlineage/datasets` | `GET` | List all known OpenLineage datasets | + +#### Registry Query Endpoints + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/lineage/registry` | `GET` | Feast registry lineage (entities, feature views, services) | +| `/lineage/registry/all` | `GET` | All registry objects with full metadata | +| `/lineage/objects/{object_type}/{object_name}` | `GET` | Detail for a specific registry object | +| `/lineage/complete` | `GET` | Complete registry lineage with relationships | +| `/lineage/complete/all` | `GET` | Complete registry lineage for all objects | + +### Configuring Producers to Send Events to Feast + +Configure any OpenLineage producer to send events to your Feast instance: + +#### Airflow + +```python +# In airflow.cfg or environment +OPENLINEAGE_URL = "http://feast-registry:8080/api" +OPENLINEAGE_API_KEY = "change-me" # pragma: allowlist secret +``` + +#### Spark + +```properties +spark.openlineage.transport.type=http +spark.openlineage.transport.url=http://feast-registry:8080/api +spark.openlineage.transport.endpoint=/v1/lineage +spark.openlineage.transport.auth.type=api_key +spark.openlineage.transport.auth.apiKey=change-me +``` + +#### dbt + +```yaml +# In profiles.yml or environment +OPENLINEAGE_URL: "http://feast-registry:8080/api" +OPENLINEAGE_API_KEY: "change-me" # pragma: allowlist secret +``` + +#### Feast (Self-reporting) + +When both the OpenLineage producer and consumer are enabled, Feast's own events (from `feast apply`, materialization, etc.) are automatically ingested into the local consumer store — no HTTP transport is needed. + +```yaml +# In feature_store.yaml +openlineage: + enabled: true + namespace: my_project + consumer: + enabled: true + api_key: change-me # pragma: allowlist secret +``` + +### Feast UI Lineage Views + +When the consumer is enabled, the lineage page in the Feast UI shows two tabs: + +**Lineage tab** + +- **OpenLineage Graph** (default) — shows lineage from all OpenLineage producers with cross-producer connectivity. Nodes are color-coded by producer (colors generated dynamically). The graph supports filtering by type, producer, and object name. Clicking a node opens a **detail panel** showing description, schema, tags, features, entities, data quality metrics, data source info, and other facets. +- **Feast Only Lineage** (checkbox) — switches to the original Feast registry view (DataSource → FeatureView → FeatureService) powered entirely by the Feast registry. + +**Events tab** + +- Browse individual OpenLineage events with filtering by event type, job name, and run ID. Expand any event to inspect the full JSON payload. + +### Cross-Producer Lineage Connectivity + +The consumer automatically links datasets across different producers when they refer to the same physical data. Linking mechanisms: + +1. **Shared namespace + name** — If Airflow writes to `s3://bucket/path` and Spark reads from the same `s3://bucket/path`, the graph connects them automatically. +2. **SymlinksDatasetFacet** — Producers can declare aliases. For example, Feast can declare that its internal `driver_hourly_stats` is a symlink to the Spark output at `s3://bucket/features/driver_hourly_stats/`. +3. **dataSource URI matching** — Datasets with matching `dataSource.uri` facets are linked even if their namespace or name differ. + +Compatible producers include Airflow, Spark, dbt, Flink, Feast, Dagster, and Great Expectations. + +### RBAC for Lineage + +The OpenLineage consumer integrates with Feast's existing RBAC: + +- **Write access** (producers sending events): Authenticated via API key in the `X-API-Key` header +- **Read access** (UI viewing lineage): Namespace-based filtering maps OpenLineage namespaces to Feast projects. Users see only lineage data for namespaces they have access to via the `namespace_mapping` configuration + +### Database Schema + +The consumer creates the following tables (automatically on first startup): + +| Table | Purpose | +|-------|---------| +| `openlineage_events` | Raw event storage with JSON payloads | +| `openlineage_jobs` | Deduplicated job records with producer, description, and facets | +| `openlineage_datasets` | Deduplicated dataset records with schema, facets, and Feast mapping | +| `openlineage_runs` | Run lifecycle tracking (START/COMPLETE/FAIL) | +| `openlineage_run_io` | Input/output relationships between runs and datasets | +| `openlineage_lineage_edges` | Materialized lineage graph edges for efficient traversal | +| `openlineage_dataset_symlinks` | Cross-producer dataset linking via `SymlinksDatasetFacet` and `dataSource` URI matching | + +By default these tables are created in the **same database** as the SQL registry (hybrid storage). Set `consumer.connection_string` to store them in a separate database instead. diff --git a/infra/feast-operator/api/v1/featurestore_types.go b/infra/feast-operator/api/v1/featurestore_types.go index 3dd68311245..830e0a77e5d 100644 --- a/infra/feast-operator/api/v1/featurestore_types.go +++ b/infra/feast-operator/api/v1/featurestore_types.go @@ -111,6 +111,35 @@ type OpenLineageConfig struct { // Keys must be valid Feast OpenLineageConfig YAML field names. // +optional ExtraConfig map[string]string `json:"extraConfig,omitempty"` + // Consumer configures the OpenLineage consumer (event receiver) that enables + // Feast to receive and display lineage from external producers (Airflow, Spark, dbt, etc.). + // +optional + Consumer *OpenLineageConsumerConfig `json:"consumer,omitempty"` +} + +// OpenLineageConsumerConfig configures the OpenLineage consumer (event receiver). +// When enabled, the Feast REST server exposes POST /api/v1/lineage to receive +// OpenLineage events from any producer, storing them for visualization in the Feast UI. +type OpenLineageConsumerConfig struct { + // Enable the OpenLineage consumer. + Enabled bool `json:"enabled"` + // StoreType is the storage backend for lineage events. Currently only "sql" is supported. + // +kubebuilder:default="sql" + // +kubebuilder:validation:Enum=sql + // +optional + StoreType *string `json:"storeType,omitempty"` + // Reference to a Secret containing the key "connection_string" for a separate + // lineage database. If omitted, the SQL registry database is reused. + // +optional + ConnectionStringSecretRef *corev1.LocalObjectReference `json:"connectionStringSecretRef,omitempty"` + // Reference to a Secret containing the key "api_key" that producers must + // provide in the X-API-Key header when sending events. + // +optional + ApiKeySecretRef *corev1.LocalObjectReference `json:"apiKeySecretRef,omitempty"` + // NamespaceMapping maps OpenLineage namespaces to Feast projects for + // RBAC-based filtering of lineage data in the UI. + // +optional + NamespaceMapping map[string]string `json:"namespaceMapping,omitempty"` } // FeatureStoreSpec defines the desired state of FeatureStore diff --git a/infra/feast-operator/api/v1/zz_generated.deepcopy.go b/infra/feast-operator/api/v1/zz_generated.deepcopy.go index 9402f95e34a..2a6b6a69266 100644 --- a/infra/feast-operator/api/v1/zz_generated.deepcopy.go +++ b/infra/feast-operator/api/v1/zz_generated.deepcopy.go @@ -1043,6 +1043,11 @@ func (in *OpenLineageConfig) DeepCopyInto(out *OpenLineageConfig) { (*out)[key] = val } } + if in.Consumer != nil { + in, out := &in.Consumer, &out.Consumer + *out = new(OpenLineageConsumerConfig) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OpenLineageConfig. @@ -1055,6 +1060,43 @@ func (in *OpenLineageConfig) DeepCopy() *OpenLineageConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OpenLineageConsumerConfig) DeepCopyInto(out *OpenLineageConsumerConfig) { + *out = *in + if in.StoreType != nil { + in, out := &in.StoreType, &out.StoreType + *out = new(string) + **out = **in + } + if in.ConnectionStringSecretRef != nil { + in, out := &in.ConnectionStringSecretRef, &out.ConnectionStringSecretRef + *out = new(corev1.LocalObjectReference) + **out = **in + } + if in.ApiKeySecretRef != nil { + in, out := &in.ApiKeySecretRef, &out.ApiKeySecretRef + *out = new(corev1.LocalObjectReference) + **out = **in + } + if in.NamespaceMapping != nil { + in, out := &in.NamespaceMapping, &out.NamespaceMapping + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OpenLineageConsumerConfig. +func (in *OpenLineageConsumerConfig) DeepCopy() *OpenLineageConsumerConfig { + if in == nil { + return nil + } + out := new(OpenLineageConsumerConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OptionalCtrConfigs) DeepCopyInto(out *OptionalCtrConfigs) { *out = *in diff --git a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml index 0851b9abf96..e0386d063f5 100644 --- a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml +++ b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml @@ -793,6 +793,59 @@ spec: type: string type: object x-kubernetes-map-type: atomic + consumer: + description: |- + Consumer configures the OpenLineage consumer (event receiver) that enables + Feast to receive and display lineage from... + properties: + apiKeySecretRef: + description: |- + Reference to a Secret containing the key "api_key" that producers must + provide in the X-API-Key header when sending... + properties: + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. + type: string + type: object + x-kubernetes-map-type: atomic + connectionStringSecretRef: + description: |- + Reference to a Secret containing the key "connection_string" for a separate + lineage database. + properties: + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. + type: string + type: object + x-kubernetes-map-type: atomic + enabled: + description: Enable the OpenLineage consumer. + type: boolean + namespaceMapping: + additionalProperties: + type: string + description: |- + NamespaceMapping maps OpenLineage namespaces to Feast projects for + RBAC-based filtering of lineage data in the UI. + type: object + storeType: + default: sql + description: StoreType is the storage backend for lineage + events. Currently only "sql" is supported. + enum: + - sql + type: string + required: + - enabled + type: object enabled: description: Enable OpenLineage integration. type: boolean @@ -6799,6 +6852,59 @@ spec: type: string type: object x-kubernetes-map-type: atomic + consumer: + description: |- + Consumer configures the OpenLineage consumer (event receiver) that enables + Feast to receive and display lineage from... + properties: + apiKeySecretRef: + description: |- + Reference to a Secret containing the key "api_key" that producers must + provide in the X-API-Key header when sending... + properties: + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. + type: string + type: object + x-kubernetes-map-type: atomic + connectionStringSecretRef: + description: |- + Reference to a Secret containing the key "connection_string" for a separate + lineage database. + properties: + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. + type: string + type: object + x-kubernetes-map-type: atomic + enabled: + description: Enable the OpenLineage consumer. + type: boolean + namespaceMapping: + additionalProperties: + type: string + description: |- + NamespaceMapping maps OpenLineage namespaces to Feast projects for + RBAC-based filtering of lineage data in the UI. + type: object + storeType: + default: sql + description: StoreType is the storage backend for lineage + events. Currently only "sql" is supported. + enum: + - sql + type: string + required: + - enabled + type: object enabled: description: Enable OpenLineage integration. type: boolean diff --git a/infra/feast-operator/config/samples/v1_featurestore_openlineage_consumer.yaml b/infra/feast-operator/config/samples/v1_featurestore_openlineage_consumer.yaml new file mode 100644 index 00000000000..9e242896135 --- /dev/null +++ b/infra/feast-operator/config/samples/v1_featurestore_openlineage_consumer.yaml @@ -0,0 +1,63 @@ +apiVersion: v1 +kind: Secret +metadata: + name: openlineage-producer-secret + namespace: feast +stringData: + api_key: "your-marquez-api-key" #pragma: allowlist secret +--- +apiVersion: v1 +kind: Secret +metadata: + name: openlineage-consumer-secret + namespace: feast +stringData: + api_key: "consumer-api-key-for-producers" #pragma: allowlist secret +--- +apiVersion: feast.dev/v1 +kind: FeatureStore +metadata: + name: sample-openlineage-consumer + namespace: feast +spec: + feastProject: my_project + services: + registry: + local: + persistence: + store: + type: sql + secretRef: + name: registry-db-secret + openlineage: + enabled: true + transportType: http + transportUrl: "http://localhost:8080/api" + transportEndpoint: "v1/lineage" + apiKeySecretRef: + name: openlineage-producer-secret + extraConfig: + namespace: "my_project" + producer: "feast-operator" + emit_on_apply: "true" + emit_on_materialize: "true" + # consumer enables Feast as an OpenLineage event receiver. + # External producers (Airflow, Spark, dbt) can POST events to + # the Feast REST server at POST /api/v1/lineage. + # The Feast UI then displays lineage from all producers in + # Registry, OpenLineage, and Merged views. + consumer: + enabled: true + storeType: sql + # Optional: use a separate database for lineage storage. + # If omitted, the SQL registry database is reused. + # connectionStringSecretRef: + # name: lineage-db-secret + apiKeySecretRef: + name: openlineage-consumer-secret + # namespaceMapping maps OL namespaces to Feast projects + # for RBAC-scoped visibility in the UI. + namespaceMapping: + airflow_production: my_project + spark_etl: my_project + dbt_analytics: my_project diff --git a/infra/feast-operator/dist/install.yaml b/infra/feast-operator/dist/install.yaml index a6453c3e6a3..ca60530055a 100644 --- a/infra/feast-operator/dist/install.yaml +++ b/infra/feast-operator/dist/install.yaml @@ -801,6 +801,59 @@ spec: type: string type: object x-kubernetes-map-type: atomic + consumer: + description: |- + Consumer configures the OpenLineage consumer (event receiver) that enables + Feast to receive and display lineage from... + properties: + apiKeySecretRef: + description: |- + Reference to a Secret containing the key "api_key" that producers must + provide in the X-API-Key header when sending... + properties: + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. + type: string + type: object + x-kubernetes-map-type: atomic + connectionStringSecretRef: + description: |- + Reference to a Secret containing the key "connection_string" for a separate + lineage database. + properties: + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. + type: string + type: object + x-kubernetes-map-type: atomic + enabled: + description: Enable the OpenLineage consumer. + type: boolean + namespaceMapping: + additionalProperties: + type: string + description: |- + NamespaceMapping maps OpenLineage namespaces to Feast projects for + RBAC-based filtering of lineage data in the UI. + type: object + storeType: + default: sql + description: StoreType is the storage backend for lineage + events. Currently only "sql" is supported. + enum: + - sql + type: string + required: + - enabled + type: object enabled: description: Enable OpenLineage integration. type: boolean @@ -6807,6 +6860,59 @@ spec: type: string type: object x-kubernetes-map-type: atomic + consumer: + description: |- + Consumer configures the OpenLineage consumer (event receiver) that enables + Feast to receive and display lineage from... + properties: + apiKeySecretRef: + description: |- + Reference to a Secret containing the key "api_key" that producers must + provide in the X-API-Key header when sending... + properties: + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. + type: string + type: object + x-kubernetes-map-type: atomic + connectionStringSecretRef: + description: |- + Reference to a Secret containing the key "connection_string" for a separate + lineage database. + properties: + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. + type: string + type: object + x-kubernetes-map-type: atomic + enabled: + description: Enable the OpenLineage consumer. + type: boolean + namespaceMapping: + additionalProperties: + type: string + description: |- + NamespaceMapping maps OpenLineage namespaces to Feast projects for + RBAC-based filtering of lineage data in the UI. + type: object + storeType: + default: sql + description: StoreType is the storage backend for lineage + events. Currently only "sql" is supported. + enum: + - sql + type: string + required: + - enabled + type: object enabled: description: Enable OpenLineage integration. type: boolean diff --git a/infra/feast-operator/docs/api/markdown/ref.md b/infra/feast-operator/docs/api/markdown/ref.md index dd7acf55fb3..0a7782feb2a 100644 --- a/infra/feast-operator/docs/api/markdown/ref.md +++ b/infra/feast-operator/docs/api/markdown/ref.md @@ -726,6 +726,31 @@ emit_on_materialize) and transport-specific options (e.g. kafka bootstrap_servers, topic; file path). Boolean values ("true"/"false") and integer values are automatically coerced to their native YAML types. Keys must be valid Feast OpenLineageConfig YAML field names. | +| `consumer` _[OpenLineageConsumerConfig](#openlineageconsumerconfig)_ | Consumer configures the OpenLineage consumer (event receiver) that enables +Feast to receive and display lineage from external producers (Airflow, Spark, dbt, etc.). | + + +#### OpenLineageConsumerConfig + + + +OpenLineageConsumerConfig configures the OpenLineage consumer (event receiver). +When enabled, the Feast REST server exposes POST /api/v1/lineage to receive +OpenLineage events from any producer, storing them for visualization in the Feast UI. + +_Appears in:_ +- [OpenLineageConfig](#openlineageconfig) + +| Field | Description | +| --- | --- | +| `enabled` _boolean_ | Enable the OpenLineage consumer. | +| `storeType` _string_ | StoreType is the storage backend for lineage events. Currently only "sql" is supported. | +| `connectionStringSecretRef` _[LocalObjectReference](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#localobjectreference-v1-core)_ | Reference to a Secret containing the key "connection_string" for a separate +lineage database. If omitted, the SQL registry database is reused. | +| `apiKeySecretRef` _[LocalObjectReference](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#localobjectreference-v1-core)_ | Reference to a Secret containing the key "api_key" that producers must +provide in the X-API-Key header when sending events. | +| `namespaceMapping` _object (keys:string, values:string)_ | NamespaceMapping maps OpenLineage namespaces to Feast projects for +RBAC-based filtering of lineage data in the UI. | #### OptionalCtrConfigs diff --git a/infra/feast-operator/internal/controller/services/repo_config.go b/infra/feast-operator/internal/controller/services/repo_config.go index 454dd5b234a..21d04db7c0e 100644 --- a/infra/feast-operator/internal/controller/services/repo_config.go +++ b/infra/feast-operator/internal/controller/services/repo_config.go @@ -466,6 +466,54 @@ func setRepoConfigOpenLineage( yamlCfg.ApiKey = &apiKeyStr } + if ol.Consumer != nil { + consumerCfg := &OpenLineageConsumerYamlConfig{ + Enabled: ol.Consumer.Enabled, + StoreType: ol.Consumer.StoreType, + NamespaceMapping: ol.Consumer.NamespaceMapping, + } + + if ol.Consumer.ConnectionStringSecretRef != nil { + params, err := secretExtractionFunc("", ol.Consumer.ConnectionStringSecretRef.Name, "") + if err != nil { + return fmt.Errorf("failed to read consumer connection string from secret %s: %w", + ol.Consumer.ConnectionStringSecretRef.Name, err) + } + connStr, exists := params["connection_string"] + if !exists { + return fmt.Errorf("secret %q does not contain the required key \"connection_string\"", + ol.Consumer.ConnectionStringSecretRef.Name) + } + connStrStr, ok := connStr.(string) + if !ok { + return fmt.Errorf("key \"connection_string\" in secret %q must be a string, got %T", + ol.Consumer.ConnectionStringSecretRef.Name, connStr) + } + consumerCfg.ConnectionString = &connStrStr + } + + if ol.Consumer.ApiKeySecretRef != nil { + params, err := secretExtractionFunc("", ol.Consumer.ApiKeySecretRef.Name, "") + if err != nil { + return fmt.Errorf("failed to read consumer API key from secret %s: %w", + ol.Consumer.ApiKeySecretRef.Name, err) + } + apiKey, exists := params["api_key"] + if !exists { + return fmt.Errorf("secret %q does not contain the required key \"api_key\"", + ol.Consumer.ApiKeySecretRef.Name) + } + apiKeyStr, ok := apiKey.(string) + if !ok { + return fmt.Errorf("key \"api_key\" in secret %q must be a string, got %T", + ol.Consumer.ApiKeySecretRef.Name, apiKey) + } + consumerCfg.ApiKey = &apiKeyStr + } + + yamlCfg.Consumer = consumerCfg + } + repoConfig.OpenLineage = yamlCfg return nil } diff --git a/infra/feast-operator/internal/controller/services/services_types.go b/infra/feast-operator/internal/controller/services/services_types.go index 366f0c8d765..098362af96b 100644 --- a/infra/feast-operator/internal/controller/services/services_types.go +++ b/infra/feast-operator/internal/controller/services/services_types.go @@ -362,12 +362,22 @@ type MaterializationYamlConfig struct { // emit_on_apply, emit_on_materialize, transport-specific options, etc.) appear at // the same YAML level as the typed connection fields. type OpenLineageYamlConfig struct { - Enabled bool `yaml:"enabled"` - TransportType *string `yaml:"transport_type,omitempty"` - TransportUrl *string `yaml:"transport_url,omitempty"` - TransportEndpoint *string `yaml:"transport_endpoint,omitempty"` - ApiKey *string `yaml:"api_key,omitempty"` - ExtraConfig map[string]interface{} `yaml:",inline,omitempty"` + Enabled bool `yaml:"enabled"` + TransportType *string `yaml:"transport_type,omitempty"` + TransportUrl *string `yaml:"transport_url,omitempty"` + TransportEndpoint *string `yaml:"transport_endpoint,omitempty"` + ApiKey *string `yaml:"api_key,omitempty"` + ExtraConfig map[string]interface{} `yaml:",inline,omitempty"` + Consumer *OpenLineageConsumerYamlConfig `yaml:"consumer,omitempty"` +} + +// OpenLineageConsumerYamlConfig maps to the openlineage.consumer section of feature_store.yaml. +type OpenLineageConsumerYamlConfig struct { + Enabled bool `yaml:"enabled"` + StoreType *string `yaml:"store_type,omitempty"` + ConnectionString *string `yaml:"connection_string,omitempty"` + ApiKey *string `yaml:"api_key,omitempty"` + NamespaceMapping map[string]string `yaml:"namespace_mapping,omitempty"` } // OfflineStoreConfig is the configuration that relates to reading from and writing to the Feast offline store. diff --git a/sdk/python/feast/api/registry/rest/__init__.py b/sdk/python/feast/api/registry/rest/__init__.py index 24d755386a6..f26c213e81c 100644 --- a/sdk/python/feast/api/registry/rest/__init__.py +++ b/sdk/python/feast/api/registry/rest/__init__.py @@ -1,3 +1,6 @@ +import logging +from typing import Any, Optional + from fastapi import FastAPI from feast.api.registry.rest.data_sources import get_data_source_router @@ -14,6 +17,12 @@ from feast.api.registry.rest.saved_datasets import get_saved_dataset_router from feast.api.registry.rest.search import get_search_router +logger = logging.getLogger(__name__) + +_ol_store_instance: Optional[Any] = None +_ol_config: Optional[Any] = None +_ol_processor: Optional[Any] = None + def register_all_routes(app: FastAPI, grpc_handler, server=None, store=None): app.include_router(get_entity_router(grpc_handler)) @@ -32,3 +41,115 @@ def register_all_routes(app: FastAPI, grpc_handler, server=None, store=None): server.store if server and hasattr(server, "store") else None ) app.include_router(get_monitoring_router(grpc_handler, store=resolved_store)) + + _register_openlineage_consumer(app, resolved_store) + + +def _register_openlineage_consumer(app: FastAPI, feast_store): + """Register the OpenLineage consumer router if consumer is enabled.""" + if feast_store is None: + return + + try: + ol_pydantic = getattr(feast_store.config, "openlineage", None) + if ol_pydantic is None: + return + + from feast.openlineage.config import ( + OpenLineageConfig, + OpenLineageConsumerConfig, + ) + + if hasattr(ol_pydantic, "to_openlineage_config"): + ol_config = ol_pydantic.to_openlineage_config() + elif isinstance(ol_pydantic, dict): + ol_config = OpenLineageConfig.from_dict(ol_pydantic) + elif isinstance(ol_pydantic, OpenLineageConfig): + ol_config = ol_pydantic + else: + consumer_dict = getattr(ol_pydantic, "consumer", None) + if consumer_dict is None: + return + if isinstance(consumer_dict, dict): + consumer = OpenLineageConsumerConfig.from_dict(consumer_dict) + else: + consumer = OpenLineageConsumerConfig( + enabled=getattr(consumer_dict, "enabled", False), + store_type=getattr(consumer_dict, "store_type", "sql"), + connection_string=getattr(consumer_dict, "connection_string", None), + api_key=getattr(consumer_dict, "api_key", None), + namespace_mapping=getattr(consumer_dict, "namespace_mapping", None) + or {}, + ) + ol_config = OpenLineageConfig( + enabled=getattr(ol_pydantic, "enabled", False), + consumer=consumer, + ) + + consumer_config = getattr(ol_config, "consumer", None) + if consumer_config is None or not consumer_config.enabled: + return + + from feast.openlineage.consumer import get_consumer_router + from feast.openlineage.processor import OpenLineageProcessor + from feast.openlineage.store import OpenLineageStore + + if consumer_config.connection_string: + ol_store = OpenLineageStore( + connection_string=consumer_config.connection_string + ) + else: + registry_config = feast_store.config.registry + registry_path = getattr(registry_config, "path", None) + if isinstance(registry_config, dict): + registry_path = registry_config.get("path") + if not registry_path: + logger.warning("No registry path found for OpenLineage consumer store") + return + ol_store = OpenLineageStore(connection_string=registry_path) + + ol_store.initialize() + + import feast.api.registry.rest as _self_module + + _self_module._ol_store_instance = ol_store + _self_module._ol_config = ol_config + + ns_mapping = consumer_config.namespace_mapping + if isinstance(ns_mapping, type(None)): + ns_mapping = {} + + processor = OpenLineageProcessor( + store=ol_store, + namespace_mapping=ns_mapping, + ) + + _self_module._ol_processor = processor + + # Wire the local processor into Feast's own OL emitter so Feast events + # are also stored in the consumer DB automatically. + try: + if feast_store and hasattr(feast_store, "_openlineage_emitter"): + emitter = feast_store._openlineage_emitter + if emitter and hasattr(emitter, "_client") and emitter._client: + emitter._client.set_local_processor(processor) + logger.info("Feast OL emitter wired to local consumer processor") + except Exception as wire_err: + logger.debug(f"Could not wire emitter to local processor: {wire_err}") + + consumer_router = get_consumer_router( + config=ol_config, + store=ol_store, + processor=processor, + ) + + app.include_router(consumer_router) + logger.info("OpenLineage consumer endpoints registered") + + except ImportError as e: + logger.debug(f"OpenLineage consumer not available: {e}") + except Exception as e: + logger.warning(f"Failed to register OpenLineage consumer: {e}") + import traceback + + traceback.print_exc() diff --git a/sdk/python/feast/openlineage/__init__.py b/sdk/python/feast/openlineage/__init__.py index e32ae967004..b3c44849788 100644 --- a/sdk/python/feast/openlineage/__init__.py +++ b/sdk/python/feast/openlineage/__init__.py @@ -59,7 +59,7 @@ """ from feast.openlineage.client import FeastOpenLineageClient -from feast.openlineage.config import OpenLineageConfig +from feast.openlineage.config import OpenLineageConfig, OpenLineageConsumerConfig from feast.openlineage.emitter import FeastOpenLineageEmitter from feast.openlineage.facets import ( FeastDataSourceFacet, @@ -75,6 +75,7 @@ "FeastOpenLineageClient", "FeastOpenLineageEmitter", "OpenLineageConfig", + "OpenLineageConsumerConfig", # Facets (custom Feast metadata in lineage events) "FeastFeatureViewFacet", "FeastFeatureServiceFacet", diff --git a/sdk/python/feast/openlineage/client.py b/sdk/python/feast/openlineage/client.py index 445231aae89..d97bc482663 100644 --- a/sdk/python/feast/openlineage/client.py +++ b/sdk/python/feast/openlineage/client.py @@ -82,6 +82,8 @@ def __init__( load from environment variables. feature_store: Optional FeatureStore instance for context. """ + self._local_processor = None + if not OPENLINEAGE_AVAILABLE: logger.warning( "OpenLineage is not installed. Lineage events will not be emitted. " @@ -132,6 +134,23 @@ def namespace(self) -> str: """Get the default namespace.""" return self._config.namespace + def set_local_processor(self, processor) -> None: + """Attach a local OpenLineageProcessor so emitted events are also ingested locally.""" + self._local_processor = processor + + def _event_to_dict(self, event: Any) -> Optional[Dict[str, Any]]: + """Convert an OL event object to a plain dict for the local processor.""" + try: + if hasattr(event, "to_dict"): + return event.to_dict() + if hasattr(event, "__dict__"): + import json + + return json.loads(json.dumps(event, default=str)) + except Exception as e: + logger.debug(f"Could not serialize event for local processor: {e}") + return None + def emit(self, event: Any) -> bool: """ Emit an OpenLineage event. @@ -148,6 +167,15 @@ def emit(self, event: Any) -> bool: try: self._client.emit(event) + + if self._local_processor is not None: + event_dict = self._event_to_dict(event) + if event_dict: + try: + self._local_processor.process_event(event_dict) + except Exception as le: + logger.debug(f"Local processor ingest failed (non-fatal): {le}") + return True except Exception as e: logger.error(f"Failed to emit OpenLineage event: {e}") diff --git a/sdk/python/feast/openlineage/config.py b/sdk/python/feast/openlineage/config.py index 7c3b1fd9814..9380df3b8cf 100644 --- a/sdk/python/feast/openlineage/config.py +++ b/sdk/python/feast/openlineage/config.py @@ -21,6 +21,45 @@ from typing import Any, Dict, Optional +@dataclass +class OpenLineageConsumerConfig: + """ + Configuration for the OpenLineage consumer (event receiver). + + Attributes: + enabled: Whether the consumer is enabled + store_type: Storage backend type ('sql' uses the SQL registry DB) + connection_string: Optional separate DB connection string + api_key: API key for authenticating producers sending events + namespace_mapping: Map of OL namespace -> Feast project for RBAC scoping + """ + + enabled: bool = False + store_type: str = "sql" + connection_string: Optional[str] = None + api_key: Optional[str] = None + namespace_mapping: Dict[str, str] = field(default_factory=dict) + + @classmethod + def from_dict(cls, config_dict: Dict[str, Any]) -> "OpenLineageConsumerConfig": + return cls( + enabled=config_dict.get("enabled", False), + store_type=config_dict.get("store_type", "sql"), + connection_string=config_dict.get("connection_string"), + api_key=config_dict.get("api_key"), + namespace_mapping=config_dict.get("namespace_mapping", {}), + ) + + def to_dict(self) -> Dict[str, Any]: + return { + "enabled": self.enabled, + "store_type": self.store_type, + "connection_string": self.connection_string, + "api_key": self.api_key, + "namespace_mapping": self.namespace_mapping, + } + + @dataclass class OpenLineageConfig: """ @@ -38,6 +77,7 @@ class OpenLineageConfig: emit_on_apply: Emit lineage events when feast apply is called emit_on_materialize: Emit lineage events during materialization additional_config: Additional transport-specific configuration + consumer: Consumer (event receiver) configuration """ enabled: bool = True @@ -50,6 +90,9 @@ class OpenLineageConfig: emit_on_apply: bool = True emit_on_materialize: bool = True additional_config: Dict[str, Any] = field(default_factory=dict) + consumer: OpenLineageConsumerConfig = field( + default_factory=OpenLineageConsumerConfig + ) @classmethod def from_dict(cls, config_dict: Dict[str, Any]) -> "OpenLineageConfig": @@ -62,6 +105,13 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> "OpenLineageConfig": Returns: OpenLineageConfig instance """ + consumer_dict = config_dict.get("consumer", {}) + consumer = ( + OpenLineageConsumerConfig.from_dict(consumer_dict) + if consumer_dict + else OpenLineageConsumerConfig() + ) + return cls( enabled=config_dict.get("enabled", True), transport_type=config_dict.get("transport_type"), @@ -73,6 +123,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> "OpenLineageConfig": emit_on_apply=config_dict.get("emit_on_apply", True), emit_on_materialize=config_dict.get("emit_on_materialize", True), additional_config=config_dict.get("additional_config", {}), + consumer=consumer, ) @classmethod @@ -92,6 +143,14 @@ def from_env(cls) -> "OpenLineageConfig": Returns: OpenLineageConfig instance """ + consumer = OpenLineageConsumerConfig( + enabled=os.getenv("FEAST_OPENLINEAGE_CONSUMER_ENABLED", "false").lower() + == "true", + store_type=os.getenv("FEAST_OPENLINEAGE_CONSUMER_STORE_TYPE", "sql"), + connection_string=os.getenv("FEAST_OPENLINEAGE_CONSUMER_CONNECTION_STRING"), + api_key=os.getenv("FEAST_OPENLINEAGE_CONSUMER_API_KEY"), + ) + return cls( enabled=os.getenv("FEAST_OPENLINEAGE_ENABLED", "true").lower() == "true", transport_type=os.getenv("FEAST_OPENLINEAGE_TRANSPORT_TYPE"), @@ -108,8 +167,13 @@ def from_env(cls) -> "OpenLineageConfig": "FEAST_OPENLINEAGE_EMIT_ON_MATERIALIZE", "true" ).lower() == "true", + consumer=consumer, ) + @property + def consumer_api_key(self) -> Optional[str]: + return self.consumer.api_key if self.consumer else None + def to_dict(self) -> Dict[str, Any]: """ Convert configuration to dictionary. @@ -117,7 +181,7 @@ def to_dict(self) -> Dict[str, Any]: Returns: Dictionary representation of the configuration """ - return { + result = { "enabled": self.enabled, "transport_type": self.transport_type, "transport_url": self.transport_url, @@ -129,6 +193,9 @@ def to_dict(self) -> Dict[str, Any]: "emit_on_materialize": self.emit_on_materialize, "additional_config": self.additional_config, } + if self.consumer: + result["consumer"] = self.consumer.to_dict() + return result def get_transport_config(self) -> Optional[Dict[str, Any]]: """ diff --git a/sdk/python/feast/openlineage/consumer.py b/sdk/python/feast/openlineage/consumer.py new file mode 100644 index 00000000000..eba4b213d9c --- /dev/null +++ b/sdk/python/feast/openlineage/consumer.py @@ -0,0 +1,294 @@ +# Copyright 2026 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +OpenLineage consumer for Feast. + +Provides FastAPI router endpoints for receiving OpenLineage events +(POST /api/v1/lineage) and querying lineage data, making Feast +act as an OpenLineage consumer alongside its existing producer role. +""" + +import json +import logging +from typing import Any, List, Optional + +from fastapi import APIRouter, Header, HTTPException, Query, Request +from fastapi.responses import JSONResponse + +from feast.openlineage.config import OpenLineageConfig +from feast.openlineage.processor import OpenLineageProcessor +from feast.openlineage.store import OpenLineageStore + +logger = logging.getLogger(__name__) + + +def _safe_parse_json(val: Optional[str]) -> Optional[Any]: + if not val: + return None + try: + return json.loads(val) + except (json.JSONDecodeError, TypeError): + return None + + +def _verify_api_key( + expected_key: Optional[str], + x_api_key: Optional[str] = Header(None, alias="X-API-Key"), + authorization: Optional[str] = Header(None), +) -> bool: + if not expected_key: + return True + + if x_api_key and x_api_key == expected_key: + return True + + if authorization: + parts = authorization.split(" ", 1) + if ( + len(parts) == 2 + and parts[0].lower() == "bearer" + and parts[1] == expected_key + ): + return True + + return False + + +def get_consumer_router( + config: OpenLineageConfig, + store: OpenLineageStore, + processor: OpenLineageProcessor, + get_allowed_namespaces=None, +) -> APIRouter: + """ + Create FastAPI router for the OpenLineage consumer endpoints. + + Args: + config: OpenLineage configuration with consumer settings + store: The lineage store instance + processor: The event processor instance + get_allowed_namespaces: Optional callable that returns allowed namespaces + for the current user (for RBAC filtering). If None, all namespaces visible. + """ + router = APIRouter() + + # ── Producer-facing: receive events ── + + @router.post("/v1/lineage") + async def receive_lineage_event( + request: Request, + x_api_key: Optional[str] = Header(None, alias="X-API-Key"), + authorization: Optional[str] = Header(None), + ): + """ + Receive an OpenLineage event. + + Compatible with the standard OpenLineage API endpoint. + Accepts RunEvent, DatasetEvent, or JobEvent. + """ + api_key = getattr(config, "consumer_api_key", None) + if not _verify_api_key(api_key, x_api_key, authorization): + raise HTTPException(status_code=401, detail="Invalid API key") + + try: + body = await request.json() + except Exception: + raise HTTPException(status_code=400, detail="Invalid JSON body") + + if isinstance(body, list): + results = processor.process_batch(body) + return JSONResponse( + status_code=200 if results["failed"] == 0 else 207, + content={ + "status": "success" + if results["failed"] == 0 + else "partial_success", + "summary": { + "received": results["received"], + "successful": results["successful"], + "failed": results["failed"], + }, + }, + ) + else: + try: + event_id = processor.process_event(body) + return JSONResponse(status_code=201, content={"event_id": event_id}) + except Exception as e: + logger.error(f"Failed to process event: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + @router.post("/v1/lineage/batch") + async def receive_lineage_batch( + request: Request, + x_api_key: Optional[str] = Header(None, alias="X-API-Key"), + authorization: Optional[str] = Header(None), + ): + """Receive a batch of OpenLineage events.""" + api_key = getattr(config, "consumer_api_key", None) + if not _verify_api_key(api_key, x_api_key, authorization): + raise HTTPException(status_code=401, detail="Invalid API key") + + try: + body = await request.json() + except Exception: + raise HTTPException(status_code=400, detail="Invalid JSON body") + + if not isinstance(body, list): + raise HTTPException(status_code=400, detail="Expected array of events") + + results = processor.process_batch(body) + status = 204 if results["failed"] == 0 else 200 + if status == 204: + return JSONResponse(status_code=204, content=None) + + return JSONResponse( + status_code=200, + content={ + "status": "partial_success", + "summary": { + "received": results["received"], + "successful": results["successful"], + "failed": results["failed"], + }, + }, + ) + + # ── Query endpoints ── + + @router.get("/lineage/openlineage/events") + def list_events( + namespace: Optional[str] = Query(None), + job_name: Optional[str] = Query(None), + limit: int = Query(100, ge=1, le=1000), + offset: int = Query(0, ge=0), + ): + """List stored OpenLineage events with optional filtering.""" + ns_filter = _get_namespace_filter(get_allowed_namespaces) + events = store.get_events( + namespace=namespace, + job_name=job_name, + limit=limit, + offset=offset, + namespaces=ns_filter if not namespace else None, + ) + return {"events": events, "total": len(events)} + + @router.get("/lineage/openlineage/jobs") + def list_jobs(): + """List all jobs from all OpenLineage producers.""" + ns_filter = _get_namespace_filter(get_allowed_namespaces) + jobs = store.get_jobs(namespaces=ns_filter) + return {"jobs": jobs} + + @router.get("/lineage/openlineage/datasets") + def list_datasets(): + """List all datasets from OpenLineage events.""" + ns_filter = _get_namespace_filter(get_allowed_namespaces) + datasets = store.get_datasets(namespaces=ns_filter) + return {"datasets": datasets} + + @router.get("/lineage/openlineage/graph/{node_type}/{namespace}/{name}") + def get_lineage_graph( + node_type: str, + namespace: str, + name: str, + depth: int = Query(10, ge=1, le=50), + direction: str = Query("both", pattern="^(both|upstream|downstream)$"), + ): + """ + Get the lineage graph for a specific node. + + Traverses upstream and/or downstream from the given node + up to the specified depth. + """ + valid_types = ["job", "dataset"] + if node_type not in valid_types: + raise HTTPException( + status_code=400, + detail=f"node_type must be one of: {valid_types}", + ) + + ns_filter = _get_namespace_filter(get_allowed_namespaces) + graph = store.get_lineage_graph( + node_type=node_type, + namespace=namespace, + name=name, + depth=depth, + direction=direction, + allowed_namespaces=ns_filter, + ) + return graph + + @router.get("/lineage/openlineage/graph") + def get_full_lineage_graph(): + """ + Get all lineage edges (RBAC-filtered by namespace). + + Includes symlink edges that connect datasets across producers + when they reference the same physical data (via SymlinksDatasetFacet + or matching dataSource URIs). + """ + ns_filter = _get_namespace_filter(get_allowed_namespaces) + edges = store.get_all_lineage_edges(namespaces=ns_filter) + datasets = store.get_datasets(namespaces=ns_filter) + jobs = store.get_jobs(namespaces=ns_filter) + + nodes = [] + for ds in datasets: + facets = _safe_parse_json(ds.get("facets_json")) + schema = _safe_parse_json(ds.get("schema_json")) + nodes.append( + { + "type": "dataset", + "namespace": ds["dataset_namespace"], + "name": ds["dataset_name"], + "producer": ds.get("producer"), + "feast_object_type": ds.get("feast_object_type"), + "feast_object_name": ds.get("feast_object_name"), + "feast_project": ds.get("feast_project"), + "schema": schema, + "description": ds.get("description"), + "source_type": ds.get("source_type"), + "facets": facets, + } + ) + for job in jobs: + facets = _safe_parse_json(job.get("facets_json")) + nodes.append( + { + "type": "job", + "namespace": job["job_namespace"], + "name": job["job_name"], + "producer": job.get("producer"), + "job_type": job.get("job_type"), + "description": job.get("description"), + "facets": facets, + } + ) + + symlinks = store.get_all_symlinks() + + return {"nodes": nodes, "edges": edges, "symlinks": symlinks} + + def _get_namespace_filter(ns_callable) -> Optional[List[str]]: + if ns_callable: + try: + return ns_callable() + except Exception: + return None + return None + + return router diff --git a/sdk/python/feast/openlineage/models.py b/sdk/python/feast/openlineage/models.py new file mode 100644 index 00000000000..ec79cf3620d --- /dev/null +++ b/sdk/python/feast/openlineage/models.py @@ -0,0 +1,218 @@ +# Copyright 2026 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +SQLAlchemy table definitions for the OpenLineage consumer lineage store. + +Uses SQLAlchemy Core (Table + Column) pattern consistent with Feast's +existing SQL registry in feast/infra/registry/sql.py. +""" + +from sqlalchemy import ( + BigInteger, + Column, + Index, + MetaData, + String, + Text, + UniqueConstraint, +) + +ol_metadata = MetaData() + +openlineage_events = ( + "openlineage_events", + ol_metadata, + Column("event_id", String(255), primary_key=True), + Column("event_type", String(50), nullable=False), + Column("event_time", BigInteger, nullable=False), + Column("producer", String(512), nullable=True), + Column("job_namespace", String(512), nullable=False), + Column("job_name", String(512), nullable=False), + Column("run_id", String(255), nullable=True), + Column("event_json", Text, nullable=False), + Column("created_at", BigInteger, nullable=False), +) + +openlineage_jobs = ( + "openlineage_jobs", + ol_metadata, + Column("job_namespace", String(512), primary_key=True), + Column("job_name", String(512), primary_key=True), + Column("job_type", String(100), nullable=True), + Column("producer", String(512), nullable=True), + Column("description", Text, nullable=True), + Column("facets_json", Text, nullable=True), + Column("latest_run_id", String(255), nullable=True), + Column("updated_at", BigInteger, nullable=False), +) + +openlineage_datasets = ( + "openlineage_datasets", + ol_metadata, + Column("dataset_namespace", String(512), primary_key=True), + Column("dataset_name", String(512), primary_key=True), + Column("source_type", String(100), nullable=True), + Column("producer", String(512), nullable=True), + Column("description", Text, nullable=True), + Column("schema_json", Text, nullable=True), + Column("facets_json", Text, nullable=True), + Column("feast_object_type", String(100), nullable=True), + Column("feast_object_name", String(255), nullable=True), + Column("feast_project", String(255), nullable=True), + Column("updated_at", BigInteger, nullable=False), +) + +openlineage_runs = ( + "openlineage_runs", + ol_metadata, + Column("run_id", String(255), primary_key=True), + Column("job_namespace", String(512), nullable=False), + Column("job_name", String(512), nullable=False), + Column("state", String(50), nullable=False), + Column("started_at", BigInteger, nullable=True), + Column("ended_at", BigInteger, nullable=True), + Column("facets_json", Text, nullable=True), + Column("updated_at", BigInteger, nullable=False), +) + +openlineage_run_io = ( + "openlineage_run_io", + ol_metadata, + Column("run_id", String(255), nullable=False), + Column("dataset_namespace", String(512), nullable=False), + Column("dataset_name", String(512), nullable=False), + Column("io_type", String(50), nullable=False), + Column("facets_json", Text, nullable=True), + UniqueConstraint( + "run_id", + "dataset_namespace", + "dataset_name", + "io_type", + name="uq_run_io", + ), +) + +openlineage_lineage_edges = ( + "openlineage_lineage_edges", + ol_metadata, + Column("source_type", String(50), nullable=False), + Column("source_namespace", String(512), nullable=False), + Column("source_name", String(512), nullable=False), + Column("target_type", String(50), nullable=False), + Column("target_namespace", String(512), nullable=False), + Column("target_name", String(512), nullable=False), + Column("edge_type", String(50), nullable=True), + Column("updated_at", BigInteger, nullable=False), + UniqueConstraint( + "source_type", + "source_namespace", + "source_name", + "target_type", + "target_namespace", + "target_name", + name="uq_lineage_edge", + ), +) + +openlineage_dataset_symlinks = ( + "openlineage_dataset_symlinks", + ol_metadata, + Column("dataset_namespace", String(512), nullable=False), + Column("dataset_name", String(512), nullable=False), + Column("linked_namespace", String(512), nullable=False), + Column("linked_name", String(512), nullable=False), + Column("link_type", String(50), nullable=False), + Column("updated_at", BigInteger, nullable=False), + UniqueConstraint( + "dataset_namespace", + "dataset_name", + "linked_namespace", + "linked_name", + name="uq_dataset_symlink", + ), +) + + +def _build_tables(): + """Build Table objects from the tuple definitions above.""" + from sqlalchemy import Table + + tables = {} + for name, tbl_def in [ + ("events", openlineage_events), + ("jobs", openlineage_jobs), + ("datasets", openlineage_datasets), + ("runs", openlineage_runs), + ("run_io", openlineage_run_io), + ("lineage_edges", openlineage_lineage_edges), + ("dataset_symlinks", openlineage_dataset_symlinks), + ]: + tbl_name = tbl_def[0] + meta = tbl_def[1] + columns = tbl_def[2:] + tables[name] = Table(tbl_name, meta, *columns) + return tables + + +OL_TABLES = _build_tables() + +idx_events_time = Index( + "idx_ol_events_time", + OL_TABLES["events"].c.event_time, +) +idx_events_job = Index( + "idx_ol_events_job", + OL_TABLES["events"].c.job_namespace, + OL_TABLES["events"].c.job_name, +) +idx_runs_job = Index( + "idx_ol_runs_job", + OL_TABLES["runs"].c.job_namespace, + OL_TABLES["runs"].c.job_name, +) +idx_run_io_run = Index( + "idx_ol_run_io_run", + OL_TABLES["run_io"].c.run_id, +) +idx_run_io_dataset = Index( + "idx_ol_run_io_dataset", + OL_TABLES["run_io"].c.dataset_namespace, + OL_TABLES["run_io"].c.dataset_name, +) +idx_lineage_source = Index( + "idx_ol_lineage_source", + OL_TABLES["lineage_edges"].c.source_namespace, + OL_TABLES["lineage_edges"].c.source_name, +) +idx_lineage_target = Index( + "idx_ol_lineage_target", + OL_TABLES["lineage_edges"].c.target_namespace, + OL_TABLES["lineage_edges"].c.target_name, +) +idx_datasets_feast = Index( + "idx_ol_datasets_feast", + OL_TABLES["datasets"].c.feast_project, + OL_TABLES["datasets"].c.feast_object_type, +) +idx_symlinks_dataset = Index( + "idx_ol_symlinks_dataset", + OL_TABLES["dataset_symlinks"].c.dataset_namespace, + OL_TABLES["dataset_symlinks"].c.dataset_name, +) +idx_symlinks_linked = Index( + "idx_ol_symlinks_linked", + OL_TABLES["dataset_symlinks"].c.linked_namespace, + OL_TABLES["dataset_symlinks"].c.linked_name, +) diff --git a/sdk/python/feast/openlineage/processor.py b/sdk/python/feast/openlineage/processor.py new file mode 100644 index 00000000000..bbacf377b96 --- /dev/null +++ b/sdk/python/feast/openlineage/processor.py @@ -0,0 +1,383 @@ +# Copyright 2026 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +OpenLineage event processor for Feast. + +Parses incoming OpenLineage events (RunEvent, DatasetEvent, JobEvent), +extracts metadata, and builds the lineage graph in the store. +""" + +import logging +import uuid +from typing import Any, Dict, List, Optional + +from feast.openlineage.store import OpenLineageStore + +logger = logging.getLogger(__name__) + + +class OpenLineageProcessor: + """ + Processes OpenLineage events and stores them in the lineage store. + + Handles RunEvent, DatasetEvent, and JobEvent types, extracting + jobs, datasets, runs, I/O relationships, and building lineage edges. + """ + + def __init__( + self, + store: OpenLineageStore, + namespace_mapping: Optional[Dict[str, str]] = None, + ): + self._store = store + self._namespace_mapping = namespace_mapping or {} + + def process_event(self, event: Dict[str, Any]) -> str: + """ + Process a single OpenLineage event. + + Determines the event type and delegates to the appropriate handler. + Returns the event ID. + """ + event_id = str(uuid.uuid4()) + + if "run" in event and "job" in event: + self._process_run_event(event_id, event) + elif "dataset" in event and "job" not in event and "run" not in event: + self._process_dataset_event(event_id, event) + elif "job" in event and "run" not in event: + self._process_job_event(event_id, event) + else: + logger.warning(f"Unknown event structure, storing as raw event: {event_id}") + self._store.store_event(event_id, event) + + return event_id + + def process_batch(self, events: List[Dict[str, Any]]) -> Dict[str, Any]: + """Process a batch of OpenLineage events.""" + successful = 0 + failed = 0 + event_ids: List[str] = [] + + for event in events: + try: + event_id = self.process_event(event) + successful += 1 + event_ids.append(event_id) + except Exception as e: + logger.error(f"Failed to process event: {e}") + failed += 1 + + return { + "received": len(events), + "successful": successful, + "failed": failed, + "event_ids": event_ids, + } + + def _process_run_event(self, event_id: str, event: Dict[str, Any]): + job = event.get("job", {}) + run = event.get("run", {}) + event_type = event.get("eventType", "OTHER") + producer = event.get("producer") + + job_namespace = job.get("namespace", "") + job_name = job.get("name", "") + run_id = run.get("runId", "") + + self._store.store_event(event_id, event) + + self._store.upsert_job(job_namespace, job_name, job, producer=producer) + + run_facets = run.get("facets", {}) + self._store.upsert_run(run_id, job_namespace, job_name, event_type, run_facets) + + inputs = event.get("inputs", []) + outputs = event.get("outputs", []) + + for inp in inputs: + ds_namespace = inp.get("namespace", job_namespace) + ds_name = inp.get("name", "") + ds_facets = inp.get("facets", {}) + + feast_mapping = self._resolve_feast_mapping(ds_namespace, ds_name) + self._store.upsert_dataset( + ds_namespace, ds_name, ds_facets, feast_mapping, producer=producer + ) + self._store.store_run_io(run_id, ds_namespace, ds_name, "INPUT", ds_facets) + self._process_dataset_symlinks(ds_namespace, ds_name, ds_facets) + + self._store.upsert_lineage_edge( + source_type="dataset", + source_namespace=ds_namespace, + source_name=ds_name, + target_type="job", + target_namespace=job_namespace, + target_name=job_name, + edge_type="input", + ) + + for out in outputs: + ds_namespace = out.get("namespace", job_namespace) + ds_name = out.get("name", "") + ds_facets = out.get("facets", {}) + + feast_mapping = self._resolve_feast_mapping(ds_namespace, ds_name) + self._store.upsert_dataset( + ds_namespace, ds_name, ds_facets, feast_mapping, producer=producer + ) + self._store.store_run_io(run_id, ds_namespace, ds_name, "OUTPUT", ds_facets) + self._process_dataset_symlinks(ds_namespace, ds_name, ds_facets) + + self._store.upsert_lineage_edge( + source_type="job", + source_namespace=job_namespace, + source_name=job_name, + target_type="dataset", + target_namespace=ds_namespace, + target_name=ds_name, + edge_type="output", + ) + + self._build_dataset_to_dataset_edges(inputs, outputs, job_namespace) + + def _process_dataset_event(self, event_id: str, event: Dict[str, Any]): + dataset = event.get("dataset", {}) + ds_namespace = dataset.get("namespace", "") + ds_name = dataset.get("name", "") + ds_facets = dataset.get("facets", {}) + producer = event.get("producer") + + self._store.store_event(event_id, event) + + feast_mapping = self._resolve_feast_mapping(ds_namespace, ds_name) + self._store.upsert_dataset( + ds_namespace, ds_name, ds_facets, feast_mapping, producer=producer + ) + self._process_dataset_symlinks(ds_namespace, ds_name, ds_facets) + + def _process_job_event(self, event_id: str, event: Dict[str, Any]): + job = event.get("job", {}) + job_namespace = job.get("namespace", "") + job_name = job.get("name", "") + producer = event.get("producer") + + self._store.store_event(event_id, event) + + self._store.upsert_job(job_namespace, job_name, job, producer=producer) + + inputs = event.get("inputs", []) + outputs = event.get("outputs", []) + + for inp in inputs: + ds_namespace = inp.get("namespace", job_namespace) + ds_name = inp.get("name", "") + ds_facets = inp.get("facets", {}) + + feast_mapping = self._resolve_feast_mapping(ds_namespace, ds_name) + self._store.upsert_dataset( + ds_namespace, ds_name, ds_facets, feast_mapping, producer=producer + ) + self._process_dataset_symlinks(ds_namespace, ds_name, ds_facets) + + self._store.upsert_lineage_edge( + source_type="dataset", + source_namespace=ds_namespace, + source_name=ds_name, + target_type="job", + target_namespace=job_namespace, + target_name=job_name, + edge_type="input", + ) + + for out in outputs: + ds_namespace = out.get("namespace", job_namespace) + ds_name = out.get("name", "") + ds_facets = out.get("facets", {}) + + feast_mapping = self._resolve_feast_mapping(ds_namespace, ds_name) + self._store.upsert_dataset( + ds_namespace, ds_name, ds_facets, feast_mapping, producer=producer + ) + self._process_dataset_symlinks(ds_namespace, ds_name, ds_facets) + + self._store.upsert_lineage_edge( + source_type="job", + source_namespace=job_namespace, + source_name=job_name, + target_type="dataset", + target_namespace=ds_namespace, + target_name=ds_name, + edge_type="output", + ) + + def _build_dataset_to_dataset_edges( + self, + inputs: List[Dict], + outputs: List[Dict], + job_namespace: str, + ): + """ + Build transitive dataset-to-dataset edges through a job. + + For each input dataset and output dataset of the same job, + create an edge: input_dataset -> output_dataset. + This enables dataset-level lineage traversal without + requiring the UI to understand jobs. + """ + for inp in inputs: + in_ns = inp.get("namespace", job_namespace) + in_name = inp.get("name", "") + for out in outputs: + out_ns = out.get("namespace", job_namespace) + out_name = out.get("name", "") + if in_name and out_name: + self._store.upsert_lineage_edge( + source_type="dataset", + source_namespace=in_ns, + source_name=in_name, + target_type="dataset", + target_namespace=out_ns, + target_name=out_name, + edge_type="derived", + ) + + def _process_dataset_symlinks( + self, + ds_namespace: str, + ds_name: str, + ds_facets: Dict[str, Any], + ): + """ + Process SymlinksDatasetFacet and dataSource URI to create + cross-producer dataset links. + + OpenLineage SymlinksDatasetFacet spec: + { + "symlinks": { + "identifiers": [ + {"namespace": "...", "name": "...", "type": "TABLE"} + ] + } + } + + Also auto-links datasets sharing the same dataSource URI + across different namespaces. + """ + symlinks_facet = ds_facets.get("symlinks", {}) + identifiers = symlinks_facet.get("identifiers", []) + for ident in identifiers: + linked_ns = ident.get("namespace", "") + linked_name = ident.get("name", "") + link_type = ident.get("type", "symlink") + if ( + linked_ns + and linked_name + and (linked_ns != ds_namespace or linked_name != ds_name) + ): + self._store.upsert_dataset_symlink( + ds_namespace, + ds_name, + linked_ns, + linked_name, + link_type, + ) + self._store.upsert_dataset(linked_ns, linked_name) + self._store.upsert_lineage_edge( + source_type="dataset", + source_namespace=ds_namespace, + source_name=ds_name, + target_type="dataset", + target_namespace=linked_ns, + target_name=linked_name, + edge_type="symlink", + ) + self._store.upsert_lineage_edge( + source_type="dataset", + source_namespace=linked_ns, + source_name=linked_name, + target_type="dataset", + target_namespace=ds_namespace, + target_name=ds_name, + edge_type="symlink", + ) + + ds_uri = ds_facets.get("dataSource", {}).get("uri", "") + if ds_uri: + existing = self._store.find_datasets_by_uri(ds_uri) + for match in existing: + m_ns = match["namespace"] + m_name = match["name"] + if m_ns != ds_namespace or m_name != ds_name: + self._store.upsert_dataset_symlink( + ds_namespace, + ds_name, + m_ns, + m_name, + "dataSource_uri", + ) + self._store.upsert_lineage_edge( + source_type="dataset", + source_namespace=ds_namespace, + source_name=ds_name, + target_type="dataset", + target_namespace=m_ns, + target_name=m_name, + edge_type="symlink", + ) + self._store.upsert_lineage_edge( + source_type="dataset", + source_namespace=m_ns, + source_name=m_name, + target_type="dataset", + target_namespace=ds_namespace, + target_name=ds_name, + edge_type="symlink", + ) + + def _resolve_feast_mapping( + self, namespace: str, dataset_name: str + ) -> Optional[Dict[str, str]]: + """ + Attempt to map an OpenLineage dataset to a Feast registry object. + + Mapping rules: + 1. If the namespace matches a Feast project (directly or via namespace_mapping), + check if the dataset name matches a known Feast object naming pattern. + 2. Feast apply emits datasets with names like the FeatureView/FeatureService name. + 3. Feast materialize emits datasets named 'online_store_{fv_name}'. + """ + feast_project = self._namespace_mapping.get(namespace, namespace) + + if dataset_name.startswith("online_store_"): + fv_name = dataset_name[len("online_store_") :] + return { + "type": "featureView", + "name": fv_name, + "project": feast_project, + } + + if dataset_name.startswith("request_source_"): + return { + "type": "dataSource", + "name": dataset_name, + "project": feast_project, + } + + return { + "type": "unknown", + "name": dataset_name, + "project": feast_project, + } diff --git a/sdk/python/feast/openlineage/store.py b/sdk/python/feast/openlineage/store.py new file mode 100644 index 00000000000..4d1d331ce6f --- /dev/null +++ b/sdk/python/feast/openlineage/store.py @@ -0,0 +1,681 @@ +# Copyright 2026 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +OpenLineage lineage store for Feast. + +Provides CRUD operations for OpenLineage events, jobs, datasets, runs, +and lineage edges using SQLAlchemy Core, consistent with Feast's SQL registry. +""" + +import json +import logging +import time +from typing import Any, Dict, List, Optional + +from sqlalchemy import create_engine, select +from sqlalchemy.engine import Engine + +from feast.openlineage.models import OL_TABLES, ol_metadata + +logger = logging.getLogger(__name__) + + +class OpenLineageStore: + """ + Storage layer for OpenLineage lineage data. + + Stores events, jobs, datasets, runs, and lineage graph edges + in PostgreSQL (or SQLite for dev/testing). + """ + + def __init__( + self, engine: Optional[Engine] = None, connection_string: Optional[str] = None + ): + if engine: + self._engine = engine + elif connection_string: + self._engine = create_engine(connection_string) + else: + raise ValueError("Either engine or connection_string must be provided") + + def initialize(self): + ol_metadata.create_all(self._engine) + logger.info("OpenLineage store tables created/verified") + + @property + def engine(self) -> Engine: + return self._engine + + def store_event(self, event_id: str, event_data: Dict[str, Any]): + now = int(time.time() * 1000) + job = event_data.get("job", {}) + run = event_data.get("run", {}) + + row = { + "event_id": event_id, + "event_type": event_data.get("eventType", "UNKNOWN"), + "event_time": _parse_timestamp(event_data.get("eventTime", "")), + "producer": event_data.get("producer"), + "job_namespace": job.get("namespace", ""), + "job_name": job.get("name", ""), + "run_id": run.get("runId"), + "event_json": json.dumps(event_data), + "created_at": now, + } + + tbl = OL_TABLES["events"] + with self._engine.begin() as conn: + conn.execute(tbl.insert().values(**row)) + + def upsert_job( + self, + namespace: str, + name: str, + job_data: Dict[str, Any], + producer: Optional[str] = None, + ): + now = int(time.time() * 1000) + facets = job_data.get("facets", {}) + job_type = None + if "jobType" in facets: + jt = facets["jobType"] + job_type = jt.get("processingType", jt.get("integration")) + + description = None + if "documentation" in facets: + description = facets["documentation"].get("description") + + tbl = OL_TABLES["jobs"] + with self._engine.begin() as conn: + existing = conn.execute( + select(tbl).where( + tbl.c.job_namespace == namespace, + tbl.c.job_name == name, + ) + ).first() + + if existing: + update_vals = { + "job_type": job_type or existing.job_type, + "description": description or existing.description, + "facets_json": json.dumps(facets) + if facets + else existing.facets_json, + "updated_at": now, + } + if producer: + update_vals["producer"] = producer + conn.execute( + tbl.update() + .where(tbl.c.job_namespace == namespace, tbl.c.job_name == name) + .values(**update_vals) + ) + else: + conn.execute( + tbl.insert().values( + job_namespace=namespace, + job_name=name, + job_type=job_type, + producer=producer, + description=description, + facets_json=json.dumps(facets) if facets else None, + updated_at=now, + ) + ) + + def upsert_dataset( + self, + namespace: str, + name: str, + facets: Optional[Dict[str, Any]] = None, + feast_mapping: Optional[Dict[str, str]] = None, + producer: Optional[str] = None, + ): + now = int(time.time() * 1000) + facets = facets or {} + + schema_json = None + if "schema" in facets: + schema_json = json.dumps(facets["schema"]) + + description = None + if "documentation" in facets: + description = facets["documentation"].get("description") + + source_type = None + if "dataSource" in facets: + source_type = facets["dataSource"].get("name") + + feast_obj_type = feast_mapping.get("type") if feast_mapping else None + feast_obj_name = feast_mapping.get("name") if feast_mapping else None + feast_project = feast_mapping.get("project") if feast_mapping else None + + tbl = OL_TABLES["datasets"] + with self._engine.begin() as conn: + existing = conn.execute( + select(tbl).where( + tbl.c.dataset_namespace == namespace, + tbl.c.dataset_name == name, + ) + ).first() + + values = { + "source_type": source_type, + "description": description, + "schema_json": schema_json, + "facets_json": json.dumps(facets) if facets else None, + "updated_at": now, + } + if producer: + values["producer"] = producer + if feast_obj_type: + values["feast_object_type"] = feast_obj_type + if feast_obj_name: + values["feast_object_name"] = feast_obj_name + if feast_project: + values["feast_project"] = feast_project + + if existing: + conn.execute( + tbl.update() + .where( + tbl.c.dataset_namespace == namespace, + tbl.c.dataset_name == name, + ) + .values(**values) + ) + else: + values["dataset_namespace"] = namespace + values["dataset_name"] = name + conn.execute(tbl.insert().values(**values)) + + def upsert_run( + self, + run_id: str, + job_namespace: str, + job_name: str, + state: str, + facets: Optional[Dict] = None, + ): + now = int(time.time() * 1000) + tbl = OL_TABLES["runs"] + + with self._engine.begin() as conn: + existing = conn.execute(select(tbl).where(tbl.c.run_id == run_id)).first() + + if existing: + update_vals: Dict[str, Any] = {"state": state, "updated_at": now} + if state in ("COMPLETE", "FAIL", "ABORT"): + update_vals["ended_at"] = now + if facets: + update_vals["facets_json"] = json.dumps(facets) + conn.execute( + tbl.update().where(tbl.c.run_id == run_id).values(**update_vals) + ) + else: + conn.execute( + tbl.insert().values( + run_id=run_id, + job_namespace=job_namespace, + job_name=job_name, + state=state, + started_at=now if state == "START" else None, + ended_at=now + if state in ("COMPLETE", "FAIL", "ABORT") + else None, + facets_json=json.dumps(facets) if facets else None, + updated_at=now, + ) + ) + + tbl_jobs = OL_TABLES["jobs"] + conn.execute( + tbl_jobs.update() + .where( + tbl_jobs.c.job_namespace == job_namespace, + tbl_jobs.c.job_name == job_name, + ) + .values(latest_run_id=run_id, updated_at=now) + ) + + def store_run_io( + self, + run_id: str, + dataset_namespace: str, + dataset_name: str, + io_type: str, + facets: Optional[Dict] = None, + ): + tbl = OL_TABLES["run_io"] + with self._engine.begin() as conn: + existing = conn.execute( + select(tbl).where( + tbl.c.run_id == run_id, + tbl.c.dataset_namespace == dataset_namespace, + tbl.c.dataset_name == dataset_name, + tbl.c.io_type == io_type, + ) + ).first() + + if not existing: + conn.execute( + tbl.insert().values( + run_id=run_id, + dataset_namespace=dataset_namespace, + dataset_name=dataset_name, + io_type=io_type, + facets_json=json.dumps(facets) if facets else None, + ) + ) + + def upsert_lineage_edge( + self, + source_type: str, + source_namespace: str, + source_name: str, + target_type: str, + target_namespace: str, + target_name: str, + edge_type: Optional[str] = None, + ): + now = int(time.time() * 1000) + tbl = OL_TABLES["lineage_edges"] + with self._engine.begin() as conn: + existing = conn.execute( + select(tbl).where( + tbl.c.source_type == source_type, + tbl.c.source_namespace == source_namespace, + tbl.c.source_name == source_name, + tbl.c.target_type == target_type, + tbl.c.target_namespace == target_namespace, + tbl.c.target_name == target_name, + ) + ).first() + + if existing: + conn.execute( + tbl.update() + .where( + tbl.c.source_type == source_type, + tbl.c.source_namespace == source_namespace, + tbl.c.source_name == source_name, + tbl.c.target_type == target_type, + tbl.c.target_namespace == target_namespace, + tbl.c.target_name == target_name, + ) + .values(edge_type=edge_type, updated_at=now) + ) + else: + conn.execute( + tbl.insert().values( + source_type=source_type, + source_namespace=source_namespace, + source_name=source_name, + target_type=target_type, + target_namespace=target_namespace, + target_name=target_name, + edge_type=edge_type, + updated_at=now, + ) + ) + + # ── Query methods ── + + def get_events( + self, + namespace: Optional[str] = None, + job_name: Optional[str] = None, + limit: int = 100, + offset: int = 0, + namespaces: Optional[List[str]] = None, + ) -> List[Dict[str, Any]]: + tbl = OL_TABLES["events"] + query = ( + select(tbl).order_by(tbl.c.event_time.desc()).limit(limit).offset(offset) + ) + + if namespace: + query = query.where(tbl.c.job_namespace == namespace) + elif namespaces: + query = query.where(tbl.c.job_namespace.in_(namespaces)) + if job_name: + query = query.where(tbl.c.job_name == job_name) + + with self._engine.connect() as conn: + rows = conn.execute(query).fetchall() + return [dict(row._mapping) for row in rows] + + def get_jobs(self, namespaces: Optional[List[str]] = None) -> List[Dict[str, Any]]: + tbl = OL_TABLES["jobs"] + query = select(tbl).order_by(tbl.c.updated_at.desc()) + if namespaces: + query = query.where(tbl.c.job_namespace.in_(namespaces)) + + with self._engine.connect() as conn: + rows = conn.execute(query).fetchall() + return [dict(row._mapping) for row in rows] + + def get_datasets( + self, namespaces: Optional[List[str]] = None + ) -> List[Dict[str, Any]]: + tbl = OL_TABLES["datasets"] + query = select(tbl).order_by(tbl.c.updated_at.desc()) + if namespaces: + query = query.where(tbl.c.dataset_namespace.in_(namespaces)) + + with self._engine.connect() as conn: + rows = conn.execute(query).fetchall() + return [dict(row._mapping) for row in rows] + + def get_lineage_graph( + self, + node_type: str, + namespace: str, + name: str, + depth: int = 10, + direction: str = "both", + allowed_namespaces: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """ + Get the lineage graph for a node, traversing upstream and/or downstream. + + Returns a dict with 'nodes' and 'edges' suitable for UI rendering. + """ + nodes: Dict[str, Dict[str, Any]] = {} + edges: List[Dict[str, Any]] = [] + + tbl = OL_TABLES["lineage_edges"] + + with self._engine.connect() as conn: + if direction in ("both", "downstream"): + self._traverse( + conn, + tbl, + node_type, + namespace, + name, + depth, + "downstream", + nodes, + edges, + allowed_namespaces, + ) + if direction in ("both", "upstream"): + self._traverse( + conn, + tbl, + node_type, + namespace, + name, + depth, + "upstream", + nodes, + edges, + allowed_namespaces, + ) + + root_key = f"{node_type}:{namespace}:{name}" + if root_key not in nodes: + nodes[root_key] = { + "type": node_type, + "namespace": namespace, + "name": name, + } + + return { + "nodes": list(nodes.values()), + "edges": edges, + } + + def _traverse( + self, + conn, + tbl, + node_type: str, + namespace: str, + name: str, + depth: int, + direction: str, + nodes: Dict[str, Dict], + edges: List[Dict], + allowed_namespaces: Optional[List[str]], + ): + visited = set() + queue = [(node_type, namespace, name, 0)] + + while queue: + n_type, n_ns, n_name, d = queue.pop(0) + key = f"{n_type}:{n_ns}:{n_name}" + + if key in visited or d > depth: + continue + visited.add(key) + + if direction == "downstream": + query = select(tbl).where( + tbl.c.source_type == n_type, + tbl.c.source_namespace == n_ns, + tbl.c.source_name == n_name, + ) + else: + query = select(tbl).where( + tbl.c.target_type == n_type, + tbl.c.target_namespace == n_ns, + tbl.c.target_name == n_name, + ) + + rows = conn.execute(query).fetchall() + for row in rows: + r = row._mapping + src_key = ( + f"{r['source_type']}:{r['source_namespace']}:{r['source_name']}" + ) + tgt_key = ( + f"{r['target_type']}:{r['target_namespace']}:{r['target_name']}" + ) + + if allowed_namespaces: + if ( + r["source_namespace"] not in allowed_namespaces + or r["target_namespace"] not in allowed_namespaces + ): + continue + + edge = { + "source_type": r["source_type"], + "source_namespace": r["source_namespace"], + "source_name": r["source_name"], + "target_type": r["target_type"], + "target_namespace": r["target_namespace"], + "target_name": r["target_name"], + "edge_type": r.get("edge_type"), + } + if edge not in edges: + edges.append(edge) + + for node_key, nt, ns, nn in [ + ( + src_key, + r["source_type"], + r["source_namespace"], + r["source_name"], + ), + ( + tgt_key, + r["target_type"], + r["target_namespace"], + r["target_name"], + ), + ]: + if node_key not in nodes: + nodes[node_key] = { + "type": nt, + "namespace": ns, + "name": nn, + } + + if direction == "downstream": + next_key = tgt_key + next_type = r["target_type"] + next_ns = r["target_namespace"] + next_name = r["target_name"] + else: + next_key = src_key + next_type = r["source_type"] + next_ns = r["source_namespace"] + next_name = r["source_name"] + + if next_key not in visited: + queue.append((next_type, next_ns, next_name, d + 1)) + + def upsert_dataset_symlink( + self, + dataset_namespace: str, + dataset_name: str, + linked_namespace: str, + linked_name: str, + link_type: str = "symlink", + ): + """Store a symlink between two dataset identifiers (bidirectional lookup).""" + now = int(time.time() * 1000) + tbl = OL_TABLES["dataset_symlinks"] + with self._engine.begin() as conn: + existing = conn.execute( + select(tbl).where( + tbl.c.dataset_namespace == dataset_namespace, + tbl.c.dataset_name == dataset_name, + tbl.c.linked_namespace == linked_namespace, + tbl.c.linked_name == linked_name, + ) + ).first() + if existing: + conn.execute( + tbl.update() + .where( + tbl.c.dataset_namespace == dataset_namespace, + tbl.c.dataset_name == dataset_name, + tbl.c.linked_namespace == linked_namespace, + tbl.c.linked_name == linked_name, + ) + .values(link_type=link_type, updated_at=now) + ) + else: + conn.execute( + tbl.insert().values( + dataset_namespace=dataset_namespace, + dataset_name=dataset_name, + linked_namespace=linked_namespace, + linked_name=linked_name, + link_type=link_type, + updated_at=now, + ) + ) + + def get_dataset_aliases(self, namespace: str, name: str) -> List[Dict[str, str]]: + """Get all known aliases for a dataset (both directions).""" + tbl = OL_TABLES["dataset_symlinks"] + results = [] + with self._engine.connect() as conn: + rows = conn.execute( + select(tbl).where( + tbl.c.dataset_namespace == namespace, + tbl.c.dataset_name == name, + ) + ).fetchall() + for r in rows: + results.append( + { + "namespace": r._mapping["linked_namespace"], + "name": r._mapping["linked_name"], + "link_type": r._mapping["link_type"], + } + ) + + rows = conn.execute( + select(tbl).where( + tbl.c.linked_namespace == namespace, + tbl.c.linked_name == name, + ) + ).fetchall() + for r in rows: + results.append( + { + "namespace": r._mapping["dataset_namespace"], + "name": r._mapping["dataset_name"], + "link_type": r._mapping["link_type"], + } + ) + return results + + def find_datasets_by_uri(self, uri: str) -> List[Dict[str, str]]: + """Find all datasets whose dataSource facet contains the given URI.""" + tbl = OL_TABLES["datasets"] + with self._engine.connect() as conn: + rows = conn.execute( + select( + tbl.c.dataset_namespace, tbl.c.dataset_name, tbl.c.facets_json + ).where(tbl.c.facets_json.isnot(None)) + ).fetchall() + + matches = [] + for r in rows: + try: + facets = json.loads(r._mapping["facets_json"]) + ds_uri = facets.get("dataSource", {}).get("uri", "") + if ds_uri and ds_uri == uri: + matches.append( + { + "namespace": r._mapping["dataset_namespace"], + "name": r._mapping["dataset_name"], + } + ) + except (json.JSONDecodeError, AttributeError): + pass + return matches + + def get_all_symlinks(self) -> List[Dict[str, Any]]: + """Get all dataset symlinks.""" + tbl = OL_TABLES["dataset_symlinks"] + with self._engine.connect() as conn: + rows = conn.execute(select(tbl)).fetchall() + return [dict(r._mapping) for r in rows] + + def get_all_lineage_edges( + self, namespaces: Optional[List[str]] = None + ) -> List[Dict[str, Any]]: + tbl = OL_TABLES["lineage_edges"] + query = select(tbl) + if namespaces: + query = query.where( + tbl.c.source_namespace.in_(namespaces) + | tbl.c.target_namespace.in_(namespaces) + ) + with self._engine.connect() as conn: + rows = conn.execute(query).fetchall() + return [dict(row._mapping) for row in rows] + + +def _parse_timestamp(ts_str: str) -> int: + if not ts_str: + return int(time.time() * 1000) + try: + from datetime import datetime + + if ts_str.endswith("Z"): + ts_str = ts_str[:-1] + "+00:00" + dt = datetime.fromisoformat(ts_str) + return int(dt.timestamp() * 1000) + except (ValueError, TypeError): + return int(time.time() * 1000) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 06529cea0f2..59ac9885fa0 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -239,6 +239,25 @@ class DataQualityMonitoringConfig(FeastConfigBaseModel): """Whether baseline distribution is computed automatically on ``feast apply``.""" +class OpenLineageConsumerConfig(FeastBaseModel): + """Configuration for the OpenLineage consumer (event receiver).""" + + enabled: StrictBool = False + """ bool: Whether the consumer is enabled. """ + + store_type: StrictStr = "sql" + """ str: Storage backend type. Currently only 'sql' is supported. """ + + connection_string: Optional[StrictStr] = None + """ str: Optional separate database connection string. """ + + api_key: Optional[StrictStr] = None + """ str: API key for authenticating producers sending events. """ + + namespace_mapping: Optional[Dict[str, str]] = None + """ dict: Map of OL namespace -> Feast project for RBAC scoping. """ + + class OpenLineageConfig(FeastBaseModel): """Configuration for OpenLineage integration. @@ -281,9 +300,25 @@ class OpenLineageConfig(FeastBaseModel): emit_on_materialize: StrictBool = True """ bool: Emit lineage events during materialization. """ + consumer: Optional[OpenLineageConsumerConfig] = None + """ OpenLineageConsumerConfig: Consumer (event receiver) configuration. """ + def to_openlineage_config(self): """Convert to feast.openlineage.OpenLineageConfig.""" from feast.openlineage.config import OpenLineageConfig as OLConfig + from feast.openlineage.config import ( + OpenLineageConsumerConfig as OLConsumerConfig, + ) + + consumer = None + if self.consumer: + consumer = OLConsumerConfig( + enabled=self.consumer.enabled, + store_type=self.consumer.store_type, + connection_string=self.consumer.connection_string, + api_key=self.consumer.api_key, + namespace_mapping=self.consumer.namespace_mapping or {}, + ) return OLConfig( enabled=self.enabled, @@ -295,6 +330,7 @@ def to_openlineage_config(self): producer=self.producer, emit_on_apply=self.emit_on_apply, emit_on_materialize=self.emit_on_materialize, + consumer=consumer or OLConsumerConfig(), ) diff --git a/ui/package-lock.json b/ui/package-lock.json index 2552b4367bc..f082919e432 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -1,12 +1,12 @@ { "name": "@feast-dev/feast-ui", - "version": "0.63.0", + "version": "0.64.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@feast-dev/feast-ui", - "version": "0.63.0", + "version": "0.64.0", "license": "Apache-2.0", "dependencies": { "@elastic/datemath": "^5.0.3", diff --git a/ui/src/components/LineageEventsList.tsx b/ui/src/components/LineageEventsList.tsx new file mode 100644 index 00000000000..af9c2ce709d --- /dev/null +++ b/ui/src/components/LineageEventsList.tsx @@ -0,0 +1,222 @@ +import React, { useState } from "react"; +import { + EuiPanel, + EuiTitle, + EuiSpacer, + EuiBasicTable, + EuiLoadingSpinner, + EuiEmptyPrompt, + EuiBadge, + EuiFieldSearch, + EuiFlexGroup, + EuiFlexItem, + EuiSelect, + EuiFlyout, + EuiFlyoutHeader, + EuiFlyoutBody, + EuiCodeBlock, + EuiButtonEmpty, +} from "@elastic/eui"; +import type { OpenLineageEvent } from "../queries/useLoadOpenLineageGraph"; +import { useLoadOpenLineageEvents } from "../queries/useLoadOpenLineageGraph"; + +const eventTypeColor = (eventType: string) => { + switch (eventType) { + case "START": + return "primary"; + case "COMPLETE": + return "success"; + case "FAIL": + return "danger"; + case "ABORT": + return "warning"; + default: + return "default"; + } +}; + +const formatTimestamp = (ts: number) => { + if (!ts) return "-"; + const date = new Date(ts); + return date.toLocaleString(); +}; + +const LineageEventsList: React.FC = () => { + const [namespace, setNamespace] = useState(""); + const [jobFilter, setJobFilter] = useState(""); + const [selectedEvent, setSelectedEvent] = useState( + null, + ); + + const { data, isLoading, isError } = useLoadOpenLineageEvents( + namespace || undefined, + jobFilter || undefined, + 100, + ); + + const columns = [ + { + field: "event_type", + name: "Type", + width: "100px", + render: (val: string) => ( + {val} + ), + }, + { + field: "event_time", + name: "Event Time", + width: "200px", + render: (val: number) => formatTimestamp(val), + }, + { + field: "job_namespace", + name: "Namespace", + width: "200px", + }, + { + field: "job_name", + name: "Job", + width: "250px", + }, + { + field: "run_id", + name: "Run ID", + width: "150px", + render: (val: string) => (val ? val.substring(0, 8) + "..." : "-"), + }, + { + field: "producer", + name: "Producer", + width: "150px", + }, + { + name: "Actions", + width: "80px", + render: (event: OpenLineageEvent) => ( + setSelectedEvent(event)} + > + View + + ), + }, + ]; + + if (isLoading) { + return ( + +
+ +
+
+ ); + } + + if (isError) { + return ( + + Events Unavailable} + body={ +

+ Could not load OpenLineage events. The consumer may not be + enabled. +

+ } + /> +
+ ); + } + + const events = data?.events || []; + + return ( + + +

OpenLineage Events ({events.length})

+
+ + + + + setNamespace(e.target.value)} + aria-label="Filter by namespace" + /> + + + setJobFilter(e.target.value)} + aria-label="Filter by job name" + /> + + + + + + {events.length === 0 ? ( + No Events} + body={

No OpenLineage events match the current filters.

} + /> + ) : ( + + )} + + {selectedEvent && ( + setSelectedEvent(null)} + size="m" + aria-labelledby="eventDetailTitle" + > + + +

+ Event: {selectedEvent.event_type} - {selectedEvent.job_name} +

+
+
+ + + +

Event Details

+
+ +
+ Event ID: {selectedEvent.event_id} +
+
+ Time: {formatTimestamp(selectedEvent.event_time)} +
+
+ Run ID: {selectedEvent.run_id || "-"} +
+
+ Producer: {selectedEvent.producer || "-"} +
+ + +

Raw Event JSON

+
+ + + {JSON.stringify(JSON.parse(selectedEvent.event_json), null, 2)} + +
+
+ )} +
+ ); +}; + +export default LineageEventsList; diff --git a/ui/src/components/OpenLineageGraph.tsx b/ui/src/components/OpenLineageGraph.tsx new file mode 100644 index 00000000000..5b6ceb36500 --- /dev/null +++ b/ui/src/components/OpenLineageGraph.tsx @@ -0,0 +1,957 @@ +import React, { useEffect, useMemo, useState } from "react"; +import { + ReactFlow, + Node, + Edge, + Controls, + Background, + useNodesState, + useEdgesState, + ConnectionLineType, + MarkerType, + Handle, + Position, +} from "reactflow"; + +import "reactflow/dist/style.css"; +import dagre from "dagre"; +import { + EuiPanel, + EuiTitle, + EuiSpacer, + EuiLoadingSpinner, + EuiToolTip, + EuiEmptyPrompt, + EuiFlexGroup, + EuiFlexItem, + EuiFormRow, + EuiSelect, + EuiBadge, +} from "@elastic/eui"; +import { useTheme } from "../contexts/ThemeContext"; +import type { + OpenLineageGraphData, + OpenLineageNode, +} from "../queries/useLoadOpenLineageGraph"; + +const nodeWidth = 280; +const nodeHeight = 65; + +// ── Producer-based colors (generated dynamically) ── + +const normalizeProducer = (producer?: string | null): string => { + if (!producer) return "unknown"; + const p = producer.toLowerCase().trim(); + + // Extract the last meaningful path segment from URLs like + // "https://github.com/OpenLineage/OpenLineage/tree/1.0.0/integration/airflow" + try { + const url = new URL(p); + const segments = url.pathname.split("/").filter(Boolean); + if (segments.length > 0) { + return segments[segments.length - 1].replace(/-/g, "_"); + } + } catch { + // not a URL + } + + // For plain names like "feast" or "my-custom-producer", just clean up + return p.replace(/-/g, "_").replace(/\s+/g, "_"); +}; + +const hashString = (str: string): number => { + let hash = 0; + for (let i = 0; i < str.length; i++) { + hash = str.charCodeAt(i) + ((hash << 5) - hash); + } + return Math.abs(hash); +}; + +const generateProducerColor = ( + producer: string, +): { main: string; light: string } => { + if (producer === "unknown") return { main: "#888888", light: "#f0f0f0" }; + const hue = hashString(producer) % 360; + const main = `hsl(${hue}, 70%, 45%)`; + const light = `hsl(${hue}, 60%, 94%)`; + return { main, light }; +}; + +const producerColorCache: Record = {}; + +const getProducerColors = (producer?: string | null) => { + const key = normalizeProducer(producer); + if (!producerColorCache[key]) { + producerColorCache[key] = generateProducerColor(key); + } + return producerColorCache[key]; +}; + +const getNodeIcon = (type: string) => { + return type === "job" ? "\u2699" : "\u2B21"; +}; + +// ── Custom Node ── + +interface LineageNodeData { + label: string; + type: string; + producer?: string; + namespace?: string; + nodeRef?: OpenLineageNode; + onNodeClick?: (node: OpenLineageNode) => void; +} + +const LineageCustomNode = ({ data }: { data: LineageNodeData }) => { + const [isHovered, setIsHovered] = useState(false); + const colors = getProducerColors(data.producer); + const icon = getNodeIcon(data.type); + const producerLabel = normalizeProducer(data.producer); + + const handleClick = () => { + if (data.onNodeClick && data.nodeRef) { + data.onNodeClick(data.nodeRef); + } + }; + + return ( +
setIsHovered(true)} + onMouseLeave={() => setIsHovered(false)} + onClick={handleClick} + > +
+ {producerLabel} +
+ + {data.namespace && isHovered && ( + +
+ {data.namespace} +
+
+ )} + + +
+
{icon}
+
+
+ {data.label} +
+ +
+ ); +}; + +const lineageNodeTypes = { lineageCustom: LineageCustomNode }; + +// ── Dagre layout ── + +const layoutGraph = (nodes: Node[], edges: Edge[], direction = "LR") => { + const dagreGraph = new dagre.graphlib.Graph(); + dagreGraph.setDefaultEdgeLabel(() => ({})); + dagreGraph.setGraph({ + rankdir: direction, + nodesep: 80, + ranksep: 120, + marginx: 50, + marginy: 50, + }); + + nodes.forEach((node) => { + dagreGraph.setNode(node.id, { width: nodeWidth, height: nodeHeight }); + }); + + edges.forEach((edge) => { + if (dagreGraph.hasNode(edge.source) && dagreGraph.hasNode(edge.target)) { + dagreGraph.setEdge(edge.source, edge.target); + } + }); + + dagre.layout(dagreGraph); + + return { + nodes: nodes.map((node) => { + const pos = dagreGraph.node(node.id); + return { + ...node, + position: { + x: (pos?.x ?? 0) - nodeWidth / 2, + y: (pos?.y ?? 0) - nodeHeight / 2, + }, + sourcePosition: direction === "TB" ? Position.Bottom : Position.Right, + targetPosition: direction === "TB" ? Position.Top : Position.Left, + }; + }), + edges, + }; +}; + +// ── Legend ── + +const ProducerLegend: React.FC<{ producers: string[] }> = ({ producers }) => { + const { colorMode } = useTheme(); + const isDarkMode = colorMode === "dark"; + const bg = isDarkMode ? "#1D1E24" : "white"; + const border = isDarkMode ? "#343741" : "#ddd"; + const text = isDarkMode ? "#DFE5EF" : "#333"; + + const items = producers.map((p) => ({ + key: p, + label: p.charAt(0).toUpperCase() + p.slice(1), + color: getProducerColors(p).main, + })); + + return ( +
+
+ Producers +
+ {items.map((item) => ( +
+
+
{item.label}
+
+ ))} +
+ ); +}; + +// ── Node Detail Panel ── + +const NodeDetailPanel: React.FC<{ + node: OpenLineageNode; + onClose: () => void; +}> = ({ node, onClose }) => { + const { colorMode } = useTheme(); + const isDarkMode = colorMode === "dark"; + const colors = getProducerColors(node.producer); + + const schema = node.schema; + const fields = schema?.fields || []; + const facets = node.facets || {}; + + const tags: string[] = []; + if (facets.feast_featureView?.tags) { + Object.entries(facets.feast_featureView.tags).forEach(([k, v]) => + tags.push(`${k}:${v}`), + ); + } + if (facets.feast_entity?.tags) { + Object.entries(facets.feast_entity.tags).forEach(([k, v]) => + tags.push(`${k}:${v}`), + ); + } + + const features: string[] = facets.feast_featureView?.features || []; + const entities: string[] = facets.feast_featureView?.entities || []; + const fvDescription = + node.description || + facets.documentation?.description || + facets.feast_featureView?.description || + facets.feast_featureService?.description || + facets.feast_entity?.description; + + const featureViews: string[] = + facets.feast_featureService?.feature_views || []; + const dqMetrics = facets.dataQualityMetrics; + const sqlFacet = facets.sql; + const dataSource = facets.dataSource; + const ownership = facets.ownership; + + const knownFacetKeys = new Set([ + "schema", + "documentation", + "feast_featureView", + "feast_featureService", + "feast_entity", + "dataQualityMetrics", + "sql", + "dataSource", + "ownership", + "symlinks", + "jobType", + ]); + const otherFacets = Object.keys(facets).filter((k) => !knownFacetKeys.has(k)); + + return ( +
+
+
+
+ {node.type} +
+
+ {node.name} +
+
+ +
+ + + {normalizeProducer(node.producer)} + + {node.job_type && ( + + {node.job_type} + + )} + {node.source_type && ( + + {node.source_type} + + )} + +
+ {node.namespace} +
+ + {fvDescription && ( +
+
Description
+
+ {fvDescription} +
+
+ )} + + {tags.length > 0 && ( +
+
Tags
+
+ {tags.map((t) => ( + + {t} + + ))} +
+
+ )} + + {entities.length > 0 && ( +
+
Entities
+ {entities.map((e) => ( +
+ {e} +
+ ))} +
+ )} + + {features.length > 0 && ( +
+
Features
+ {features.map((f) => ( +
+ {f} +
+ ))} +
+ )} + + {featureViews.length > 0 && ( +
+
Feature Views
+ {featureViews.map((fv) => ( +
+ {fv} +
+ ))} +
+ )} + + {fields.length > 0 && ( +
+
+ Schema ({fields.length} fields) +
+ + + + + + + + + {fields.map((f: any) => ( + + + + + ))} + +
FieldType
+ {f.name} + + {f.type || "—"} +
+
+ )} + + {node.feast_object_type && node.feast_object_type !== "unknown" && ( +
+
Feast Mapping
+
+ Type:{" "} + {node.feast_object_type} +
+ {node.feast_object_name && ( +
+ Name:{" "} + {node.feast_object_name} +
+ )} + {node.feast_project && ( +
+ Project:{" "} + {node.feast_project} +
+ )} +
+ )} + + {dataSource && ( +
+
Data Source
+ {dataSource.name && ( +
+ Name: {dataSource.name} +
+ )} + {dataSource.uri && ( +
+ {dataSource.uri} +
+ )} +
+ )} + + {ownership && ownership.owners && ( +
+
Owners
+ {ownership.owners.map((o: any, i: number) => ( +
+ {o.name || o.owner} + {o.type ? ` (${o.type})` : ""} +
+ ))} +
+ )} + + {sqlFacet && sqlFacet.query && ( +
+
SQL
+
+            {sqlFacet.query}
+          
+
+ )} + + {dqMetrics && ( +
+
Data Quality
+ {dqMetrics.rowCount != null && ( +
+ Row count:{" "} + {dqMetrics.rowCount.toLocaleString()} +
+ )} + {dqMetrics.columnMetrics && + Object.entries(dqMetrics.columnMetrics).map( + ([col, metrics]: [string, any]) => ( +
+ {col} + + {metrics.nullCount != null && `nulls: ${metrics.nullCount}`} + {metrics.distinctCount != null && + ` distinct: ${metrics.distinctCount}`} + {metrics.min != null && ` min: ${metrics.min}`} + {metrics.max != null && ` max: ${metrics.max}`} + +
+ ), + )} +
+ )} + + {otherFacets.length > 0 && ( +
+
Other Facets
+ {otherFacets.map((key) => ( +
+ {key} +
+ ))} +
+ )} +
+ ); +}; + +// ── Main LineageGraph component ── + +interface LineageGraphProps { + olData?: OpenLineageGraphData; + olLoading: boolean; + olError: boolean; + feastOnlyCheckbox?: React.ReactNode; +} + +const LineageGraph: React.FC = ({ + olData, + olLoading, + olError, + feastOnlyCheckbox, +}) => { + const [nodes, setNodes, onNodesChange] = useNodesState([]); + const [edges, setEdges, onEdgesChange] = useEdgesState([]); + + const [filterType, setFilterType] = useState(""); + const [filterProducer, setFilterProducer] = useState(""); + const [filterObject, setFilterObject] = useState(""); + const [selectedNode, setSelectedNode] = useState( + null, + ); + + const producers = useMemo(() => { + if (!olData) return []; + const set = new Set(); + for (const n of olData.nodes) { + set.add(normalizeProducer(n.producer)); + } + return Array.from(set).sort(); + }, [olData]); + + const objectOptions = useMemo(() => { + if (!olData) return []; + return olData.nodes + .filter((n) => { + if (filterType && n.type !== filterType) return false; + if (filterProducer && normalizeProducer(n.producer) !== filterProducer) + return false; + return true; + }) + .map((n) => n.name) + .filter((v, i, a) => a.indexOf(v) === i) + .sort(); + }, [olData, filterType, filterProducer]); + + useEffect(() => { + setFilterObject(""); + }, [filterType, filterProducer]); + + useEffect(() => { + if (!olData) return; + + let filteredNodes = olData.nodes; + if (filterType) { + filteredNodes = filteredNodes.filter((n) => n.type === filterType); + } + if (filterProducer) { + filteredNodes = filteredNodes.filter( + (n) => normalizeProducer(n.producer) === filterProducer, + ); + } + + const makeId = (type: string, ns: string, name: string) => + `${type}:${ns}:${name}`; + + if (filterObject) { + const focusIds = new Set( + filteredNodes + .filter((n) => n.name === filterObject) + .map((n) => makeId(n.type, n.namespace, n.name)), + ); + + const connectedIds = new Set(); + for (const e of olData.edges) { + const srcId = makeId(e.source_type, e.source_namespace, e.source_name); + const tgtId = makeId(e.target_type, e.target_namespace, e.target_name); + if (focusIds.has(srcId)) connectedIds.add(tgtId); + if (focusIds.has(tgtId)) connectedIds.add(srcId); + } + + const visibleIds = new Set( + Array.from(focusIds).concat(Array.from(connectedIds)), + ); + filteredNodes = olData.nodes.filter((n) => + visibleIds.has(makeId(n.type, n.namespace, n.name)), + ); + } + + const filteredNodeIds = new Set( + filteredNodes.map((n) => makeId(n.type, n.namespace, n.name)), + ); + + const flowNodes: Node[] = filteredNodes.map((n) => ({ + id: makeId(n.type, n.namespace, n.name), + type: "lineageCustom", + data: { + label: n.name, + type: n.type, + producer: n.producer, + namespace: n.namespace, + nodeRef: n, + onNodeClick: setSelectedNode, + } as LineageNodeData, + position: { x: 0, y: 0 }, + })); + + const flowEdges: Edge[] = olData.edges + .filter((e) => { + const srcId = makeId(e.source_type, e.source_namespace, e.source_name); + const tgtId = makeId(e.target_type, e.target_namespace, e.target_name); + return filteredNodeIds.has(srcId) && filteredNodeIds.has(tgtId); + }) + .map((e, i) => { + const isSymlink = e.edge_type === "symlink"; + const color = isSymlink + ? "#999999" + : e.edge_type === "derived" + ? "#3366cc" + : "#e67300"; + return { + id: `ol-edge-${i}`, + source: makeId(e.source_type, e.source_namespace, e.source_name), + sourceHandle: "source", + target: makeId(e.target_type, e.target_namespace, e.target_name), + targetHandle: "target", + animated: !isSymlink, + style: { + strokeWidth: isSymlink ? 1 : 2, + stroke: color, + strokeDasharray: isSymlink + ? "3 3" + : e.edge_type === "derived" + ? "5 3" + : "none", + }, + type: "smoothstep", + markerEnd: { + type: MarkerType.ArrowClosed, + width: 16, + height: 16, + color, + }, + }; + }); + + const { nodes: ln, edges: le } = layoutGraph(flowNodes, flowEdges); + setNodes(ln); + setEdges(le); + }, [olData, filterType, filterProducer, filterObject, setNodes, setEdges]); + + if (olLoading) { + return ( + +
+ +
+
+ ); + } + + if (olError || !olData) { + return ( + + OpenLineage Data Unavailable} + body={ +

+ The OpenLineage consumer is not enabled or no events have been + received yet. Enable the consumer in your{" "} + feature_store.yaml configuration. +

+ } + /> +
+ ); + } + + if (olData.nodes.length === 0) { + return ( + + No Lineage Events} + body={ +

+ No OpenLineage events have been received yet. Configure your data + pipeline producers (Airflow, Spark, dbt, Feast) to send events to + this instance. +

+ } + /> +
+ ); + } + + return ( + +
+ +

OpenLineage Graph

+
+ {feastOnlyCheckbox && ( +
+ {feastOnlyCheckbox} +
+ )} +
+ + + + + setFilterType(e.target.value)} + aria-label="Filter by type" + /> + + + + + ({ + value: p, + text: p.charAt(0).toUpperCase() + p.slice(1), + })), + ]} + value={filterProducer} + onChange={(e) => setFilterProducer(e.target.value)} + aria-label="Filter by producer" + /> + + + + + ({ value: name, text: name })), + ]} + value={filterObject} + onChange={(e) => setFilterObject(e.target.value)} + aria-label="Select object" + /> + + + +
+
+ setSelectedNode(null)} + > + + + + +
+ {selectedNode && ( + setSelectedNode(null)} + /> + )} +
+
+ ); +}; + +export { LineageGraph }; +export default LineageGraph; diff --git a/ui/src/components/RegistryVisualization.tsx b/ui/src/components/RegistryVisualization.tsx index a974f335bda..b8730e59e0b 100644 --- a/ui/src/components/RegistryVisualization.tsx +++ b/ui/src/components/RegistryVisualization.tsx @@ -86,6 +86,10 @@ const getNodeColor = (type: FEAST_FCO_TYPES) => { return "#0194e2"; // MLflow brand blue case FEAST_FCO_TYPES.mlflowModel: return "#7b2d8e"; // Purple + case FEAST_FCO_TYPES.openlineageJob: + return "#e67300"; // Deep orange for OL jobs + case FEAST_FCO_TYPES.openlineageDataset: + return "#3366cc"; // Steel blue for OL datasets default: return "#666666"; // Gray } @@ -107,6 +111,10 @@ const getLightNodeColor = (type: FEAST_FCO_TYPES) => { return "#e6f6fd"; // Light MLflow blue case FEAST_FCO_TYPES.mlflowModel: return "#f3e6f9"; // Light purple + case FEAST_FCO_TYPES.openlineageJob: + return "#fff0e0"; // Light deep orange + case FEAST_FCO_TYPES.openlineageDataset: + return "#e0ecff"; // Light steel blue default: return "#f0f0f0"; // Light gray } @@ -128,6 +136,10 @@ const getNodeIcon = (type: FEAST_FCO_TYPES) => { return "⬡"; // Hexagon for MLflow run case FEAST_FCO_TYPES.mlflowModel: return "⬢"; // Filled hexagon for registered model + case FEAST_FCO_TYPES.openlineageJob: + return "⚙"; // Gear for OL job + case FEAST_FCO_TYPES.openlineageDataset: + return "⬡"; // Hexagon for OL dataset default: return "●"; // Default circle } @@ -434,6 +446,8 @@ const getLayoutedElements = ( [FEAST_FCO_TYPES.labelView]: [], [FEAST_FCO_TYPES.mlflowRun]: [], [FEAST_FCO_TYPES.mlflowModel]: [], + [FEAST_FCO_TYPES.openlineageJob]: [], + [FEAST_FCO_TYPES.openlineageDataset]: [], }; isolatedNodes.forEach((node) => { @@ -928,6 +942,10 @@ const getNodePrefix = (type: FEAST_FCO_TYPES) => { return "mlflow"; case FEAST_FCO_TYPES.mlflowModel: return "model"; + case FEAST_FCO_TYPES.openlineageJob: + return "ol-job"; + case FEAST_FCO_TYPES.openlineageDataset: + return "ol-ds"; default: return "unknown"; } @@ -940,6 +958,8 @@ interface RegistryVisualizationProps { filterNode?: { type: FEAST_FCO_TYPES; name: string }; permissions?: any[]; mlflowRuns?: MlflowRunData[]; + extraCheckboxes?: React.ReactNode; + filterControls?: React.ReactNode; } const RegistryVisualization: React.FC = ({ @@ -949,6 +969,8 @@ const RegistryVisualization: React.FC = ({ filterNode, permissions, mlflowRuns, + extraCheckboxes, + filterControls, }) => { const [nodes, setNodes, onNodesChange] = useNodesState([]); const [edges, setEdges, onEdgesChange] = useEdgesState([]); @@ -1066,7 +1088,15 @@ const RegistryVisualization: React.FC = ({

Lineage

-
+
+ {extraCheckboxes}
+ {filterControls} {loading ? (
diff --git a/ui/src/components/RegistryVisualizationTab.tsx b/ui/src/components/RegistryVisualizationTab.tsx index 5afdce0c223..4fed3c4f856 100644 --- a/ui/src/components/RegistryVisualizationTab.tsx +++ b/ui/src/components/RegistryVisualizationTab.tsx @@ -16,7 +16,13 @@ import RegistryVisualization from "./RegistryVisualization"; import { FEAST_FCO_TYPES } from "../parsers/types"; import { filterPermissionsByAction } from "../utils/permissionUtils"; -const RegistryVisualizationTab = () => { +interface RegistryVisualizationTabProps { + feastOnlyCheckbox?: React.ReactNode; +} + +const RegistryVisualizationTab: React.FC = ({ + feastOnlyCheckbox, +}) => { const registryUrl = useContext(RegistryPathContext); const { projectName } = useParams(); const { isLoading, isSuccess, isError, data } = useLoadRegistry( @@ -93,70 +99,6 @@ const RegistryVisualizationTab = () => { {isSuccess && data && ( <> - - - - { - setSelectedObjectType(e.target.value); - setSelectedObjectName(""); // Reset name when type changes - }} - aria-label="Select object type" - /> - - - - - ({ - value: name, - text: name, - }), - ), - ]} - value={selectedObjectName} - onChange={(e) => setSelectedObjectName(e.target.value)} - aria-label="Select object" - disabled={selectedObjectType === ""} - /> - - - - - setSelectedPermissionAction(e.target.value)} - aria-label="Filter by permissions" - /> - - - { : undefined } mlflowRuns={mlflowData?.runs?.length ? mlflowData.runs : undefined} + extraCheckboxes={feastOnlyCheckbox} + filterControls={ + + + + { + setSelectedObjectType(e.target.value); + setSelectedObjectName(""); + }} + aria-label="Select object type" + /> + + + + + ({ + value: name, + text: name, + })), + ]} + value={selectedObjectName} + onChange={(e) => setSelectedObjectName(e.target.value)} + aria-label="Select object" + disabled={selectedObjectType === ""} + /> + + + + + + setSelectedPermissionAction(e.target.value) + } + aria-label="Filter by permissions" + /> + + + + } /> )} diff --git a/ui/src/hooks/useFCOExploreSuggestions.ts b/ui/src/hooks/useFCOExploreSuggestions.ts index 9068eefa0d4..e9ab456f72b 100644 --- a/ui/src/hooks/useFCOExploreSuggestions.ts +++ b/ui/src/hooks/useFCOExploreSuggestions.ts @@ -25,6 +25,8 @@ const FCO_TO_URL_NAME_MAP: Record = { labelView: "/label-view", mlflowRun: "/mlflow-run", mlflowModel: "/mlflow-model", + openlineageJob: "/lineage", + openlineageDataset: "/lineage", }; const createSearchLink = ( diff --git a/ui/src/pages/lineage/Index.tsx b/ui/src/pages/lineage/Index.tsx index a3a9ca19296..40bf7f83afc 100644 --- a/ui/src/pages/lineage/Index.tsx +++ b/ui/src/pages/lineage/Index.tsx @@ -1,18 +1,29 @@ -import React, { useContext } from "react"; +import React, { useContext, useState } from "react"; import { EuiPageTemplate, EuiTitle, EuiSpacer, EuiSkeletonText, EuiEmptyPrompt, + EuiButtonGroup, } from "@elastic/eui"; import { useDocumentTitle } from "../../hooks/useDocumentTitle"; import useLoadRegistry from "../../queries/useLoadRegistry"; import RegistryPathContext from "../../contexts/RegistryPathContext"; import RegistryVisualizationTab from "../../components/RegistryVisualizationTab"; +import { LineageGraph } from "../../components/OpenLineageGraph"; +import LineageEventsList from "../../components/LineageEventsList"; +import { useLoadOpenLineageGraph } from "../../queries/useLoadOpenLineageGraph"; import { useParams } from "react-router-dom"; +type ActiveTab = "lineage" | "events"; + +const tabButtons = [ + { id: "lineage", label: "Lineage" }, + { id: "events", label: "Events" }, +]; + const LineagePage = () => { useDocumentTitle("Feast Lineage"); const registryUrl = useContext(RegistryPathContext); @@ -22,7 +33,14 @@ const LineagePage = () => { projectName, ); - // Show message for "All Projects" view + const [activeTab, setActiveTab] = useState("lineage"); + const [registryOnly, setRegistryOnly] = useState(false); + + const olGraphQuery = useLoadOpenLineageGraph(); + + const olConsumerAvailable = + !olGraphQuery.isError && olGraphQuery.data !== undefined; + if (projectName === "all") { return ( @@ -81,7 +99,66 @@ const LineagePage = () => { /> )} - {isSuccess && } + {isSuccess && ( + <> + {olConsumerAvailable ? ( + <> + setActiveTab(id as ActiveTab)} + buttonSize="m" + isFullWidth={false} + /> + + + {activeTab === "lineage" && ( + <> + {registryOnly ? ( + + + setRegistryOnly(e.target.checked) + } + /> + {" Feast Only Lineage"} + + } + /> + ) : ( + + + setRegistryOnly(e.target.checked) + } + /> + {" Feast Only Lineage"} + + } + /> + )} + + )} + + {activeTab === "events" && } + + ) : ( + + )} + + )} ); diff --git a/ui/src/parsers/types.ts b/ui/src/parsers/types.ts index ee0020bc8e5..fc9f88d045f 100644 --- a/ui/src/parsers/types.ts +++ b/ui/src/parsers/types.ts @@ -6,6 +6,8 @@ enum FEAST_FCO_TYPES { labelView = "labelView", mlflowRun = "mlflowRun", mlflowModel = "mlflowModel", + openlineageJob = "openlineageJob", + openlineageDataset = "openlineageDataset", } export { FEAST_FCO_TYPES }; diff --git a/ui/src/queries/useLoadOpenLineageGraph.ts b/ui/src/queries/useLoadOpenLineageGraph.ts new file mode 100644 index 00000000000..25bd4e12389 --- /dev/null +++ b/ui/src/queries/useLoadOpenLineageGraph.ts @@ -0,0 +1,132 @@ +import { useContext } from "react"; +import { useQuery } from "react-query"; +import RegistryPathContext from "../contexts/RegistryPathContext"; +import { useDataMode } from "../contexts/DataModeContext"; +import restFetch from "./restApiClient"; + +export interface OpenLineageNode { + type: string; + namespace: string; + name: string; + producer?: string; + feast_object_type?: string; + feast_object_name?: string; + feast_project?: string; + schema?: any; + description?: string; + job_type?: string; + source_type?: string; + facets?: Record; +} + +export interface OpenLineageEdge { + source_type: string; + source_namespace: string; + source_name: string; + target_type: string; + target_namespace: string; + target_name: string; + edge_type?: string; + updated_at?: number; +} + +export interface OpenLineageSymlink { + dataset_namespace: string; + dataset_name: string; + linked_namespace: string; + linked_name: string; + link_type: string; +} + +export interface OpenLineageGraphData { + nodes: OpenLineageNode[]; + edges: OpenLineageEdge[]; + symlinks?: OpenLineageSymlink[]; +} + +export interface OpenLineageEvent { + event_id: string; + event_type: string; + event_time: number; + producer?: string; + job_namespace: string; + job_name: string; + run_id?: string; + event_json: string; + created_at: number; +} + +export interface RegistryRelationship { + source: { type: string; name: string }; + target: { type: string; name: string }; + type: string; + project?: string; +} + +export interface RegistryLineageData { + relationships: RegistryRelationship[]; + indirect_relationships: RegistryRelationship[]; +} + +const useLoadOpenLineageGraph = () => { + const registryUrl = useContext(RegistryPathContext); + const { fetchOptions } = useDataMode(); + + return useQuery( + ["openlineage-graph"], + () => + restFetch( + registryUrl, + "/lineage/openlineage/graph", + fetchOptions, + ), + { enabled: !!registryUrl }, + ); +}; + +const useLoadOpenLineageEvents = ( + namespace?: string, + jobName?: string, + limit: number = 100, +) => { + const registryUrl = useContext(RegistryPathContext); + const { fetchOptions } = useDataMode(); + + const params = new URLSearchParams(); + if (namespace) params.set("namespace", namespace); + if (jobName) params.set("job_name", jobName); + params.set("limit", limit.toString()); + + return useQuery<{ events: OpenLineageEvent[]; total: number }>( + ["openlineage-events", namespace, jobName, limit], + () => + restFetch( + registryUrl, + `/lineage/openlineage/events?${params.toString()}`, + fetchOptions, + ), + { enabled: !!registryUrl }, + ); +}; + +const useLoadRegistryLineage = (project?: string) => { + const registryUrl = useContext(RegistryPathContext); + const { fetchOptions } = useDataMode(); + + return useQuery( + ["registry-lineage", project], + () => + restFetch( + registryUrl, + `/lineage/registry?project=${project}`, + fetchOptions, + ), + { enabled: !!registryUrl && !!project }, + ); +}; + +export { + useLoadOpenLineageGraph, + useLoadOpenLineageEvents, + useLoadRegistryLineage, +};