diff --git a/.github/workflows/pr_local_integration_tests.yml b/.github/workflows/pr_local_integration_tests.yml
index 0787b8a3f2c..143cfe40973 100644
--- a/.github/workflows/pr_local_integration_tests.yml
+++ b/.github/workflows/pr_local_integration_tests.yml
@@ -42,6 +42,13 @@ jobs:
           enable-cache: true
       - name: Install dependencies
         run: make install-python-dependencies-ci
+      - name: Cache Hadoop tarball
+        uses: actions/cache@v4
+        with:
+          path: ~/hadoop-3.4.2.tar.gz
+          key: hadoop-3.4.2
+      - name: Install Hadoop dependencies
+        run: make install-hadoop-dependencies-ci
       - name: Test local integration tests
         if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak
         run: make test-python-integration-local
diff --git a/Makefile b/Makefile
index e1d10404ded..e773cfd46e6 100644
--- a/Makefile
+++ b/Makefile
@@ -87,7 +87,19 @@ install-python-dependencies-ci: ## Install Python CI dependencies in system envi
 	uv pip sync --system sdk/python/requirements/py$(PYTHON_VERSION)-ci-requirements.txt
 	uv pip install --system --no-deps -e .
 
-# Used by multicloud/Dockerfile.dev
+# Used in GitHub Actions CI
+install-hadoop-dependencies-ci: ## Install Hadoop dependencies
+	@if [ ! -f $$HOME/hadoop-3.4.2.tar.gz ]; then \
+		echo "Downloading Hadoop tarball..."; \
+		wget -q https://dlcdn.apache.org/hadoop/common/hadoop-3.4.2/hadoop-3.4.2.tar.gz -O $$HOME/hadoop-3.4.2.tar.gz; \
+	else \
+		echo "Using cached Hadoop tarball"; \
+	fi
+	@if [ ! -d $$HOME/hadoop ]; then \
+		echo "Extracting Hadoop tarball..."; \
+		tar -xzf $$HOME/hadoop-3.4.2.tar.gz -C $$HOME; \
+		mv $$HOME/hadoop-3.4.2 $$HOME/hadoop; \
+	fi
 install-python-ci-dependencies: ## Install Python CI dependencies in system environment using piptools
 	python -m piptools sync sdk/python/requirements/py$(PYTHON_VERSION)-ci-requirements.txt
 	pip install --no-deps -e .
@@ -146,6 +158,9 @@ test-python-integration: ## Run Python integration tests (CI)
 test-python-integration-local: ## Run Python integration tests (local dev mode)
 	FEAST_IS_LOCAL_TEST=True \
 	FEAST_LOCAL_ONLINE_CONTAINER=True \
+	HADOOP_HOME=$$HOME/hadoop \
+	CLASSPATH="$$( $$HADOOP_HOME/bin/hadoop classpath --glob ):$$CLASSPATH" \
+	HADOOP_USER_NAME=root \
 	python -m pytest --tb=short -v -n 8 --color=yes --integration --durations=10 --timeout=1200 --timeout_method=thread --dist loadgroup \
 	-k "not test_lambda_materialization and not test_snowflake_materialization" \
 	-m "not rbac_remote_integration_test" \
diff --git a/docs/reference/registries/README.md b/docs/reference/registries/README.md
index ac0f58e6135..01671cf2212 100644
--- a/docs/reference/registries/README.md
+++ b/docs/reference/registries/README.md
@@ -26,6 +26,10 @@ Please see [Registry](../../getting-started/components/registry.md) for a concep
 [snowflake.md](snowflake.md)
 {% endcontent-ref %}
 
+{% content-ref url="hdfs.md" %}
+[hdfs.md](hdfs.md)
+{% endcontent-ref %}
+
 {% content-ref url="remote.md" %}
 [remote.md](remote.md)
 {% endcontent-ref %}
diff --git a/docs/reference/registries/hdfs.md b/docs/reference/registries/hdfs.md
new file mode 100644
index 00000000000..c6f6b641aff
--- /dev/null
+++ b/docs/reference/registries/hdfs.md
@@ -0,0 +1,42 @@
+# HDFS Registry
+
+## Description
+
+The HDFS registry provides support for storing the protobuf representation of your feature store objects (data sources, feature views, feature services, etc.) in the Hadoop Distributed File System (HDFS).
+
+While it can be used in production, there are still inherent limitations with file-based registries, since changing a single field in the registry requires re-writing the whole registry file. With multiple concurrent writers, this presents a risk of data loss, or bottlenecks writes to the registry since all changes have to be serialized (e.g. when running materialization for multiple feature views or time ranges concurrently).
+
+### Pre-requisites
+
+The HDFS registry requires Hadoop 3.3+ to be installed and the `HADOOP_HOME` environment variable set.
+
+### Authentication and User Configuration
+
+The HDFS registry uses `pyarrow.fs.HadoopFileSystem` and **does not** support specifying HDFS users or Kerberos credentials directly in the `feature_store.yaml` configuration. It relies entirely on the Hadoop and system environment configuration available to the process running Feast.
+
+By default, `pyarrow.fs.HadoopFileSystem` inherits authentication from the underlying Hadoop client libraries and environment variables, such as:
+
+- `HADOOP_USER_NAME`
+- `KRB5CCNAME`
+- `hadoop.security.authentication`
+- Any other relevant properties in `core-site.xml` and `hdfs-site.xml`
+
+For more information, refer to:
+- [pyarrow.fs.HadoopFileSystem API Reference](https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html)
+- [Hadoop Security: Simple & Kerberos Authentication](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SecureMode.html)
+
+## Example
+
+An example of how to configure this would be:
+
+{% code title="feature_store.yaml" %}
+```yaml
+project: feast_hdfs
+registry:
+  path: hdfs://[YOUR NAMENODE HOST]:[YOUR NAMENODE PORT]/[PATH TO REGISTRY]/registry.pb
+  cache_ttl_seconds: 60
+online_store: null
+offline_store: null
+```
+{% endcode %}
+
diff --git a/sdk/python/feast/infra/registry/contrib/hdfs/__init__.py b/sdk/python/feast/infra/registry/contrib/hdfs/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/sdk/python/feast/infra/registry/contrib/hdfs/hdfs_registry_store.py b/sdk/python/feast/infra/registry/contrib/hdfs/hdfs_registry_store.py
new file mode 100644
index 00000000000..f4c4193d569
--- /dev/null
+++ b/sdk/python/feast/infra/registry/contrib/hdfs/hdfs_registry_store.py
@@ -0,0 +1,121 @@
+import json
+import uuid
+from pathlib import Path, PurePosixPath
+from typing import Optional
+from urllib.parse import urlparse
+
+from pyarrow import fs
+
+from feast.infra.registry.registry_store import RegistryStore
+from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
+from feast.repo_config import RegistryConfig
+from feast.utils import _utc_now
+
+
+class HDFSRegistryStore(RegistryStore):
+    """HDFS implementation of RegistryStore.
+    registry_config.path should be an HDFS path like hdfs://namenode:8020/path/to/registry.db
+    """
+
+    def __init__(self, registry_config: RegistryConfig, repo_path: Path):
+        try:
+            from pyarrow.fs import HadoopFileSystem
+        except ImportError as e:
+            from feast.errors import FeastExtrasDependencyImportError
+
+            raise FeastExtrasDependencyImportError(
+                "pyarrow.fs.HadoopFileSystem", str(e)
+            )
+        uri = registry_config.path
+        self._uri = urlparse(uri)
+        if self._uri.scheme != "hdfs":
+            raise ValueError(
+                f"Unsupported scheme {self._uri.scheme} in HDFS path {uri}"
+            )
+        self._hdfs = HadoopFileSystem(self._uri.hostname, self._uri.port or 8020)
+        self._path = PurePosixPath(self._uri.path)
+
+    def get_registry_proto(self):
+        registry_proto = RegistryProto()
+        if _check_hdfs_path_exists(self._hdfs, str(self._path)):
+            with self._hdfs.open_input_file(str(self._path)) as f:
+                registry_proto.ParseFromString(f.read())
+            return registry_proto
+        raise FileNotFoundError(
+            f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?'
+        )
+
+    def update_registry_proto(self, registry_proto: RegistryProto):
+        self._write_registry(registry_proto)
+
+    def teardown(self):
+        if _check_hdfs_path_exists(self._hdfs, str(self._path)):
+            self._hdfs.delete_file(str(self._path))
+        else:
+            # Nothing to do
+            pass
+
+    def _write_registry(self, registry_proto: RegistryProto):
+        """Write registry protobuf to HDFS."""
+        registry_proto.version_id = str(uuid.uuid4())
+        registry_proto.last_updated.FromDatetime(_utc_now())
+
+        dir_path = self._path.parent
+        if not _check_hdfs_path_exists(self._hdfs, str(dir_path)):
+            self._hdfs.create_dir(str(dir_path), recursive=True)
+
+        with self._hdfs.open_output_stream(str(self._path)) as f:
+            f.write(registry_proto.SerializeToString())
+
+    def set_project_metadata(self, project: str, key: str, value: str):
+        """Set a custom project metadata key-value pair in the registry (HDFS backend)."""
+        registry_proto = self.get_registry_proto()
+        found = False
+
+        for pm in registry_proto.project_metadata:
+            if pm.project == project:
+                # Load JSON metadata from project_uuid
+                try:
+                    meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
+                except Exception:
+                    meta = {}
+
+                if not isinstance(meta, dict):
+                    meta = {}
+
+                meta[key] = value
+                pm.project_uuid = json.dumps(meta)
+                found = True
+                break
+
+        if not found:
+            # Create new ProjectMetadata entry
+            from feast.project_metadata import ProjectMetadata
+
+            pm = ProjectMetadata(project_name=project)
+            pm.project_uuid = json.dumps({key: value})
+            registry_proto.project_metadata.append(pm.to_proto())
+
+        # Write back
+        self.update_registry_proto(registry_proto)
+
+    def get_project_metadata(self, project: str, key: str) -> Optional[str]:
+        """Get custom project metadata key from registry (HDFS backend)."""
+        registry_proto = self.get_registry_proto()
+
+        for pm in registry_proto.project_metadata:
+            if pm.project == project:
+                try:
+                    meta = json.loads(pm.project_uuid) if pm.project_uuid else {}
+                except Exception:
+                    meta = {}
+
+                if not isinstance(meta, dict):
+                    return None
+                return meta.get(key, None)
+        return None
+
+
+def _check_hdfs_path_exists(hdfs, path: str) -> bool:
+    info = hdfs.get_file_info([path])[0]
+    return info.type != fs.FileType.NotFound
diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py
index 38f79f84e7a..9a021744dd1 100644
--- a/sdk/python/feast/infra/registry/registry.py
+++ b/sdk/python/feast/infra/registry/registry.py
@@ -62,12 +62,14 @@
"feast.infra.registry.s3.S3RegistryStore", "FileRegistryStore": "feast.infra.registry.file.FileRegistryStore", "AzureRegistryStore": "feast.infra.registry.contrib.azure.azure_registry_store.AzBlobRegistryStore", + "HDFSRegistryStore": "feast.infra.registry.contrib.hdfs.hdfs_registry_store.HDFSRegistryStore", } REGISTRY_STORE_CLASS_FOR_SCHEME = { "gs": "GCSRegistryStore", "s3": "S3RegistryStore", "file": "FileRegistryStore", + "hdfs": "HDFSRegistryStore", "": "FileRegistryStore", } @@ -143,7 +145,7 @@ def get_registry_store_class_from_scheme(registry_path: str): if uri.scheme not in REGISTRY_STORE_CLASS_FOR_SCHEME: raise Exception( f"Registry path {registry_path} has unsupported scheme {uri.scheme}. " - f"Supported schemes are file, s3 and gs." + f"Supported schemes are file, s3, gs and hdfs." ) else: registry_store_type = REGISTRY_STORE_CLASS_FOR_SCHEME[uri.scheme] diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index eb663d8565a..29b31ef1b75 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -22,8 +22,12 @@ import grpc_testing import pandas as pd +import pyarrow.fs as fs import pytest from pytest_lazyfixture import lazy_fixture +from testcontainers.core.container import DockerContainer +from testcontainers.core.network import Network +from testcontainers.core.waiting_utils import wait_for_logs from testcontainers.mysql import MySqlContainer from testcontainers.postgres import PostgresContainer @@ -280,6 +284,60 @@ def sqlite_registry(): yield SqlRegistry(registry_config, "project", None) +@pytest.fixture(scope="function") +def hdfs_registry(): + HADOOP_NAMENODE_IMAGE = "bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8" + HADOOP_DATANODE_IMAGE = "bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8" + HDFS_CLUSTER_NAME = "feast-hdfs-cluster" + HADOOP_NAMENODE_WAIT_LOG = "namenode.NameNode: NameNode RPC up" + HADOOP_DATANODE_WAIT_LOG = "datanode.DataNode: .*successfully registered with NN" + with Network() as network: + namenode = None + datanode = None + + try: + namenode = ( + DockerContainer(HADOOP_NAMENODE_IMAGE) + .with_network(network) + .with_env("CLUSTER_NAME", HDFS_CLUSTER_NAME) + .with_exposed_ports(8020) + .with_network_aliases("namenode") + .with_kwargs(hostname="namenode") + .start() + ) + wait_for_logs(namenode, HADOOP_NAMENODE_WAIT_LOG, timeout=120) + namenode_ip = namenode.get_container_host_ip() + namenode_port = int(namenode.get_exposed_port(8020)) + + datanode = ( + DockerContainer(HADOOP_DATANODE_IMAGE) + .with_network(network) + .with_exposed_ports(9867) + .with_env("CLUSTER_NAME", HDFS_CLUSTER_NAME) + .with_env("CORE_CONF_fs_defaultFS", "hdfs://namenode:8020") + .with_network_aliases("datanode") + .with_kwargs(hostname="datanode") + .start() + ) + + wait_for_logs(datanode, HADOOP_DATANODE_WAIT_LOG, timeout=120) + + hdfs = fs.HadoopFileSystem(host=namenode_ip, port=namenode_port) + hdfs.create_dir("/feast") + registry_path = f"hdfs://{namenode_ip}:{namenode_port}/feast/registry.db" + with hdfs.open_output_stream(registry_path) as f: + f.write(b"") + + registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600) + reg = Registry("project", registry_config, None) + yield reg + finally: + if datanode: + datanode.stop() + if namenode: + namenode.stop() + + class GrpcMockChannel: def __init__(self, service, servicer): self.service = service @@ -350,6 
@@ -350,6 +408,10 @@ def mock_remote_registry():
         lazy_fixture("mock_remote_registry"),
         marks=pytest.mark.rbac_remote_integration_test,
     ),
+    pytest.param(
+        lazy_fixture("hdfs_registry"),
+        marks=pytest.mark.xdist_group(name="hdfs_registry"),
+    ),
 ]
 
 sql_fixtures = [
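
For reference, a minimal sketch of how the store added in this diff is expected to be exercised end to end, following the `__init__`/`update_registry_proto`/`get_registry_proto`/`teardown` methods above. The namenode endpoint and registry path are placeholders, and the snippet assumes a reachable HDFS cluster plus `HADOOP_HOME` and the Hadoop client libraries on the classpath, as set up by `install-hadoop-dependencies-ci`:

```python
from pathlib import Path

from feast.infra.registry.contrib.hdfs.hdfs_registry_store import HDFSRegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig

# Placeholder namenode host/port and registry path.
config = RegistryConfig(
    path="hdfs://namenode:8020/feast/registry.pb", cache_ttl_seconds=60
)
store = HDFSRegistryStore(config, Path("."))

# Round-trip an empty registry: _write_registry stamps version_id and
# last_updated, creates the parent directory if needed, and writes the
# serialized proto to HDFS.
store.update_registry_proto(RegistryProto())
print(store.get_registry_proto().last_updated)

# teardown() deletes the registry file if it exists.
store.teardown()
```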