Skip to content

Commit b77ea6b

Browse files
committed
fix(hdfs): use pyarrow FileSystem delete/exists APIs and fix the HDFS registry test
Signed-off-by: Chimey Rock <trinhvanthoai99@gmail.com>
1 parent 943b2d3 commit b77ea6b

File tree

2 files changed

+65
-26
lines changed

2 files changed

+65
-26
lines changed

sdk/python/feast/infra/registry/contrib/hdfs/hdfs_registry_store.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from typing import Optional
55
from urllib.parse import urlparse
66

7+
from pyarrow import fs
8+
79
from feast.infra.registry.registry_store import RegistryStore
810
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
911
from feast.repo_config import RegistryConfig
@@ -35,8 +37,8 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
3537

3638
def get_registry_proto(self):
3739
registry_proto = RegistryProto()
38-
if self._hdfs.exists(str(self._path)):
39-
with self._hdfs.open(str(self._path), "rb") as f:
40+
if _check_hdfs_path_exists(self._hdfs, str(self._path)):
41+
with self._hdfs.open_input_file(str(self._path)) as f:
4042
registry_proto.ParseFromString(f.read())
4143
return registry_proto
4244
raise FileNotFoundError(
@@ -47,11 +49,10 @@ def update_registry_proto(self, registry_proto: RegistryProto):
4749
self._write_registry(registry_proto)
4850

4951
def teardown(self):
50-
try:
51-
self._hdfs.delete(str(self._path))
52-
except FileNotFoundError:
53-
# If the file deletion fails with FileNotFoundError, the file has already
54-
# been deleted.
52+
if _check_hdfs_path_exists(self._hdfs, str(self._path)):
53+
self._hdfs.delete_file(str(self._path))
54+
else:
55+
# Nothing to do
5556
pass
5657

5758
def _write_registry(self, registry_proto: RegistryProto):
@@ -60,10 +61,10 @@ def _write_registry(self, registry_proto: RegistryProto):
6061
registry_proto.last_updated.FromDatetime(_utc_now())
6162

6263
dir_path = self._path.parent
63-
if not self._hdfs.exists(str(dir_path)):
64-
self._hdfs.mkdir(str(dir_path))
64+
if not _check_hdfs_path_exists(self._hdfs, str(dir_path)):
65+
self._hdfs.create_dir(str(dir_path), recursive=True)
6566

66-
with self._hdfs.open(str(self._path), "wb") as f:
67+
with self._hdfs.open_output_stream(str(self._path)) as f:
6768
f.write(registry_proto.SerializeToString())
6869

6970
def set_project_metadata(self, project: str, key: str, value: str):
@@ -113,3 +114,8 @@ def get_project_metadata(self, project: str, key: str) -> Optional[str]:
113114
return None
114115
return meta.get(key, None)
115116
return None
117+
118+
119+
def _check_hdfs_path_exists(hdfs, path: str) -> bool:
    """Return True if *path* exists on the given pyarrow HDFS filesystem.

    pyarrow's ``FileSystem.get_file_info`` does not raise for a missing
    path; it returns a ``FileInfo`` whose ``type`` is
    ``fs.FileType.NotFound``, so existence is tested by comparing against
    that sentinel.
    """
    # get_file_info accepts a single path string and returns a single
    # FileInfo — no need to wrap in a one-element list and index [0].
    info = hdfs.get_file_info(path)
    return info.type != fs.FileType.NotFound

sdk/python/tests/integration/registration/test_universal_registry.py

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import pytest
2727
from pytest_lazyfixture import lazy_fixture
2828
from testcontainers.core.container import DockerContainer
29+
from testcontainers.core.network import Network
30+
from testcontainers.core.waiting_utils import wait_for_logs
2931
from testcontainers.mysql import MySqlContainer
3032
from testcontainers.postgres import PostgresContainer
3133

@@ -282,27 +284,58 @@ def sqlite_registry():
282284
yield SqlRegistry(registry_config, "project", None)
283285

284286

285-
HDFS_IMAGE = "bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8"
286-
HDFS_PORT = 8020
287-
288-
289287
@pytest.fixture(scope="function")
290288
def hdfs_registry():
291-
hdfs_image = HDFS_IMAGE
292-
with DockerContainer(hdfs_image) as hdfs_container:
293-
host = hdfs_container.get_container_host_ip()
294-
port = HDFS_PORT
295-
hdfs = fs.HadoopFileSystem(host=host, port=port)
296-
hdfs.create_dir("/feast")
289+
HADOOP_NAMENODE_IMAGE = "bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8"
290+
HADOOP_DATANODE_IMAGE = "bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8"
291+
HDFS_CLUSTER_NAME = "feast-hdfs-cluster"
292+
HADOOP_NAMENODE_WAIT_LOG = "ipc.Server: IPC Server listener on 8020: starting"
293+
HADOOP_DATANODE_WAIT_LOG = "ipc.Server: IPC Server listener on 9867: starting"
294+
with Network() as network:
295+
namenode = None
296+
datanode = None
297+
298+
try:
299+
namenode = (
300+
DockerContainer(HADOOP_NAMENODE_IMAGE)
301+
.with_network(network)
302+
.with_env("CLUSTER_NAME", HDFS_CLUSTER_NAME)
303+
.with_exposed_ports(8020)
304+
.with_network_aliases("namenode")
305+
.with_kwargs(hostname="namenode")
306+
.start()
307+
)
308+
wait_for_logs(namenode, HADOOP_NAMENODE_WAIT_LOG, timeout=120)
309+
namenode_ip = namenode.get_container_host_ip()
310+
namenode_port = int(namenode.get_exposed_port(8020))
311+
312+
datanode = (
313+
DockerContainer(HADOOP_DATANODE_IMAGE)
314+
.with_network(network)
315+
.with_exposed_ports(9867)
316+
.with_env("CLUSTER_NAME", HDFS_CLUSTER_NAME)
317+
.with_env("CORE_CONF_fs_defaultFS", "hdfs://namenode:8020")
318+
.with_network_aliases("datanode")
319+
.with_kwargs(hostname="datanode")
320+
.start()
321+
)
297322

298-
registry_path = f"hdfs://{host}:{port}/feast/registry.db"
299-
with hdfs.open_output_stream(registry_path) as f:
300-
f.write(b"")
323+
wait_for_logs(datanode, HADOOP_DATANODE_WAIT_LOG, timeout=120)
301324

302-
registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600)
303-
reg = Registry("project", registry_config, None)
325+
hdfs = fs.HadoopFileSystem(host=namenode_ip, port=namenode_port)
326+
hdfs.create_dir("/feast")
327+
registry_path = f"hdfs://{namenode_ip}:{namenode_port}/feast/registry.db"
328+
with hdfs.open_output_stream(registry_path) as f:
329+
f.write(b"")
304330

305-
yield reg
331+
registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600)
332+
reg = Registry("project", registry_config, None)
333+
yield reg
334+
finally:
335+
if datanode:
336+
datanode.stop()
337+
if namenode:
338+
namenode.stop()
306339

307340

308341
class GrpcMockChannel:

0 commit comments

Comments
 (0)