diff --git a/docs/how-to-guides/feast-operator/07-openlineage-and-materialization.md b/docs/how-to-guides/feast-operator/07-openlineage-and-materialization.md index 72b2fe92a7e..8d46ed03ef8 100644 --- a/docs/how-to-guides/feast-operator/07-openlineage-and-materialization.md +++ b/docs/how-to-guides/feast-operator/07-openlineage-and-materialization.md @@ -103,7 +103,7 @@ openlineage: | Field | Type | Description | |-------|------|-------------| | `enabled` | bool | Activates OpenLineage. Must be `true` | -| `transportType` | string | `http` / `console` / `file` / `kafka` | +| `transportType` | string | `http` / `console` / `file` / `kafka` (omit to use OpenLineage SDK defaults) | | `transportUrl` | string | Base URL for HTTP transport | | `transportEndpoint` | string | API path appended to `transportUrl` | | `apiKeySecretRef.name` | string | Name of a Secret containing key `api_key` | diff --git a/docs/reference/openlineage.md b/docs/reference/openlineage.md index 01837c9936a..78438082d44 100644 --- a/docs/reference/openlineage.md +++ b/docs/reference/openlineage.md @@ -88,7 +88,7 @@ fs.materialize( | Option | Default | Description | |--------|---------|-------------| | `enabled` | `false` | Enable/disable OpenLineage integration | -| `transport_type` | `http` | Transport type: `http`, `file`, `kafka` | +| `transport_type` | `None` | Transport type: `http`, `console`, `file`, `kafka`. When unset, defers to OpenLineage SDK defaults. 
| | `transport_url` | - | URL for HTTP transport (required) | | `transport_endpoint` | `api/v1/lineage` | API endpoint for HTTP transport | | `api_key` | - | Optional API key for authentication | diff --git a/sdk/python/feast/openlineage/client.py b/sdk/python/feast/openlineage/client.py index 45d021f2f07..445231aae89 100644 --- a/sdk/python/feast/openlineage/client.py +++ b/sdk/python/feast/openlineage/client.py @@ -106,9 +106,12 @@ def __init__( # Initialize the OpenLineage client try: transport_config = self._config.get_transport_config() - self._client = OpenLineageClient(config={"transport": transport_config}) + if transport_config is None: + self._client = OpenLineageClient() + else: + self._client = OpenLineageClient(config={"transport": transport_config}) logger.info( - f"OpenLineage client initialized with {self._config.transport_type} transport" + f"OpenLineage client initialized with {self._config.transport_type or 'default'} transport" ) except Exception as e: logger.error(f"Failed to initialize OpenLineage client: {e}") diff --git a/sdk/python/feast/openlineage/config.py b/sdk/python/feast/openlineage/config.py index 4d8b7684179..7c3b1fd9814 100644 --- a/sdk/python/feast/openlineage/config.py +++ b/sdk/python/feast/openlineage/config.py @@ -28,7 +28,8 @@ class OpenLineageConfig: Attributes: enabled: Whether OpenLineage integration is enabled - transport_type: Type of transport (http, console, file, kafka) + transport_type: Type of transport (http, console, file, kafka), or None to use + OpenLineage SDK defaults transport_url: URL for HTTP transport transport_endpoint: API endpoint for HTTP transport api_key: Optional API key for authentication @@ -40,7 +41,7 @@ class OpenLineageConfig: """ enabled: bool = True - transport_type: str = "console" + transport_type: Optional[str] = None transport_url: Optional[str] = None transport_endpoint: str = "api/v1/lineage" api_key: Optional[str] = None @@ -63,7 +64,7 @@ def from_dict(cls, config_dict: Dict[str, 
Any]) -> "OpenLineageConfig": """ return cls( enabled=config_dict.get("enabled", True), - transport_type=config_dict.get("transport_type", "console"), + transport_type=config_dict.get("transport_type"), transport_url=config_dict.get("transport_url"), transport_endpoint=config_dict.get("transport_endpoint", "api/v1/lineage"), api_key=config_dict.get("api_key"), @@ -81,7 +82,7 @@ def from_env(cls) -> "OpenLineageConfig": Environment variables: FEAST_OPENLINEAGE_ENABLED: Enable/disable OpenLineage (default: true) - FEAST_OPENLINEAGE_TRANSPORT_TYPE: Transport type (default: console) + FEAST_OPENLINEAGE_TRANSPORT_TYPE: Transport type (default: unset, uses OpenLineage SDK defaults) FEAST_OPENLINEAGE_URL: HTTP transport URL FEAST_OPENLINEAGE_ENDPOINT: API endpoint (default: api/v1/lineage) FEAST_OPENLINEAGE_API_KEY: API key for authentication @@ -93,7 +94,7 @@ def from_env(cls) -> "OpenLineageConfig": """ return cls( enabled=os.getenv("FEAST_OPENLINEAGE_ENABLED", "true").lower() == "true", - transport_type=os.getenv("FEAST_OPENLINEAGE_TRANSPORT_TYPE", "console"), + transport_type=os.getenv("FEAST_OPENLINEAGE_TRANSPORT_TYPE"), transport_url=os.getenv("FEAST_OPENLINEAGE_URL"), transport_endpoint=os.getenv( "FEAST_OPENLINEAGE_ENDPOINT", "api/v1/lineage" @@ -129,13 +130,17 @@ def to_dict(self) -> Dict[str, Any]: "additional_config": self.additional_config, } - def get_transport_config(self) -> Dict[str, Any]: + def get_transport_config(self) -> Optional[Dict[str, Any]]: """ Get transport-specific configuration for OpenLineage client. Returns: - Dictionary with transport configuration + Dictionary with transport configuration, or None if transport_type + is not set (allowing the OpenLineage SDK to use its own defaults). 
""" + if not self.transport_type: + return None + config: Dict[str, Any] = {"type": self.transport_type} if self.transport_type == "http": diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index b8bc794aaf5..7518f613788 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -249,8 +249,8 @@ class OpenLineageConfig(FeastBaseModel): enabled: StrictBool = False """ bool: Whether OpenLineage integration is enabled. Defaults to False. """ - transport_type: StrictStr = "console" - """ str: Type of transport (http, console, file, kafka). Defaults to console. """ + transport_type: Optional[StrictStr] = None + """ str: Type of transport (http, console, file, kafka). Defaults to None (uses OpenLineage SDK defaults). """ transport_url: Optional[StrictStr] = None """ str: URL for HTTP transport. Required when transport_type is 'http'. """ diff --git a/sdk/python/tests/unit/test_openlineage_client.py b/sdk/python/tests/unit/test_openlineage_client.py new file mode 100644 index 00000000000..9be050ce473 --- /dev/null +++ b/sdk/python/tests/unit/test_openlineage_client.py @@ -0,0 +1,151 @@ +# Copyright 2026 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from pathlib import Path + +import pytest + +# --------------------------------------------------------------------------- +# Guard: skip entire module if openlineage-python is not installed +# --------------------------------------------------------------------------- +ol = pytest.importorskip( + "openlineage.client", reason="openlineage-python not installed" +) + +from openlineage.client.transport.console import ConsoleTransport # noqa: E402 +from openlineage.client.transport.transform import TransformTransport # noqa: E402 + +from feast.openlineage.client import FeastOpenLineageClient # noqa: E402 +from feast.openlineage.config import OpenLineageConfig # noqa: E402 + +_TRANSFORM_YML = """\ +transport: + type: transform + transformer_class: openlineage.client.transport.transform.JobNamespaceReplaceTransformer + transformer_properties: + new_job_namespace: new_value + transport: + type: console +""" + + +def _write_openlineage_yml(tmp_path: Path, content: str = _TRANSFORM_YML) -> str: + """Write an openlineage.yml file and return its path.""" + yml = tmp_path / "openlineage.yml" + yml.write_text(content) + return str(yml) + + +class TestDefaultConsoleTransport: + """When transport_type is None and there is no openlineage.yml, + the OpenLineage SDK should fall back to ConsoleTransport.""" + + def test_default_config_uses_console_transport( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + # Ensure no openlineage.yml is found by changing cwd to an empty dir + monkeypatch.chdir(tmp_path) + monkeypatch.delenv("OPENLINEAGE_CONFIG", raising=False) + monkeypatch.delenv("OPENLINEAGE_URL", raising=False) + monkeypatch.delenv("OPENLINEAGE_DISABLED", raising=False) + + config = OpenLineageConfig(enabled=True) # transport_type defaults to None + client = FeastOpenLineageClient(config=config) + + assert client.is_enabled + assert isinstance(client._client.transport, ConsoleTransport) + + def test_default_from_dict_uses_console_transport( + self, 
tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.chdir(tmp_path) + monkeypatch.delenv("OPENLINEAGE_CONFIG", raising=False) + monkeypatch.delenv("OPENLINEAGE_URL", raising=False) + monkeypatch.delenv("OPENLINEAGE_DISABLED", raising=False) + + config = OpenLineageConfig.from_dict({"enabled": True}) + client = FeastOpenLineageClient(config=config) + + assert client.is_enabled + assert isinstance(client._client.transport, ConsoleTransport) + + +class TestTransformTransportFromYml: + def test_transform_yml_is_respected( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + yml_path = _write_openlineage_yml(tmp_path) + monkeypatch.setenv("OPENLINEAGE_CONFIG", yml_path) + monkeypatch.delenv("OPENLINEAGE_URL", raising=False) + monkeypatch.delenv("OPENLINEAGE_DISABLED", raising=False) + + config = OpenLineageConfig(enabled=True) # transport_type=None + client = FeastOpenLineageClient(config=config) + + assert client.is_enabled + assert isinstance(client._client.transport, TransformTransport) + # The inner transport should be console + assert isinstance(client._client.transport.transport, ConsoleTransport) + + def test_transform_yml_in_cwd_is_respected( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.delenv("OPENLINEAGE_CONFIG", raising=False) + monkeypatch.delenv("OPENLINEAGE_URL", raising=False) + monkeypatch.delenv("OPENLINEAGE_DISABLED", raising=False) + + # Write openlineage.yml in the dir we'll chdir to + _write_openlineage_yml(tmp_path) + monkeypatch.chdir(tmp_path) + + config = OpenLineageConfig(enabled=True) # transport_type=None + client = FeastOpenLineageClient(config=config) + + assert client.is_enabled + assert isinstance(client._client.transport, TransformTransport) + + +class TestExplicitConfigOverridesYml: + def test_explicit_console_ignores_yml_env( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + yml_path = _write_openlineage_yml(tmp_path) + 
monkeypatch.setenv("OPENLINEAGE_CONFIG", yml_path) + monkeypatch.delenv("OPENLINEAGE_URL", raising=False) + monkeypatch.delenv("OPENLINEAGE_DISABLED", raising=False) + + config = OpenLineageConfig(enabled=True, transport_type="console") + client = FeastOpenLineageClient(config=config) + + assert client.is_enabled + # Must be plain ConsoleTransport, NOT TransformTransport + assert isinstance(client._client.transport, ConsoleTransport) + assert not isinstance(client._client.transport, TransformTransport) + + def test_explicit_console_ignores_yml_in_cwd( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.delenv("OPENLINEAGE_CONFIG", raising=False) + monkeypatch.delenv("OPENLINEAGE_URL", raising=False) + monkeypatch.delenv("OPENLINEAGE_DISABLED", raising=False) + + _write_openlineage_yml(tmp_path) + monkeypatch.chdir(tmp_path) + + config = OpenLineageConfig(enabled=True, transport_type="console") + client = FeastOpenLineageClient(config=config) + + assert client.is_enabled + assert isinstance(client._client.transport, ConsoleTransport) + assert not isinstance(client._client.transport, TransformTransport) diff --git a/skills/references/configuration.md b/skills/references/configuration.md index 9a1720984b1..7a63b8429bd 100644 --- a/skills/references/configuration.md +++ b/skills/references/configuration.md @@ -258,7 +258,7 @@ materialization: ```yaml openlineage: enabled: true - transport_type: http # http, console, file, kafka + transport_type: http # http, console, file, kafka (omit to use OpenLineage SDK defaults) transport_url: http://marquez:5000 transport_endpoint: api/v1/lineage namespace: feast