|
26 | 26 | import pytest |
27 | 27 | from pytest_lazyfixture import lazy_fixture |
28 | 28 | from testcontainers.core.container import DockerContainer |
| 29 | +from testcontainers.core.network import Network |
| 30 | +from testcontainers.core.waiting_utils import wait_for_logs |
29 | 31 | from testcontainers.mysql import MySqlContainer |
30 | 32 | from testcontainers.postgres import PostgresContainer |
31 | 33 |
|
@@ -282,27 +284,58 @@ def sqlite_registry(): |
282 | 284 | yield SqlRegistry(registry_config, "project", None) |
283 | 285 |
|
284 | 286 |
|
285 | | -HDFS_IMAGE = "bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8" |
286 | | -HDFS_PORT = 8020 |
287 | | - |
288 | | - |
@pytest.fixture(scope="function")
def hdfs_registry():
    """Yield a ``Registry`` backed by a registry file on a throwaway HDFS cluster.

    Starts a namenode and a datanode container on a shared Docker network
    (the datanode reaches the namenode via its ``namenode`` network alias),
    waits for both daemons' IPC servers to come up, creates an empty
    ``/feast/registry.db`` file over ``pyarrow.fs.HadoopFileSystem``, and
    yields a ``Registry`` pointing at it. Both containers are stopped on
    teardown, even if the test or either stop call raises.
    """
    HADOOP_NAMENODE_IMAGE = "bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8"
    HADOOP_DATANODE_IMAGE = "bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8"
    HDFS_CLUSTER_NAME = "feast-hdfs-cluster"
    # Log lines that signal each daemon's IPC server is accepting connections.
    HADOOP_NAMENODE_WAIT_LOG = "ipc.Server: IPC Server listener on 8020: starting"
    HADOOP_DATANODE_WAIT_LOG = "ipc.Server: IPC Server listener on 9867: starting"
    with Network() as network:
        namenode = None
        datanode = None

        try:
            namenode = (
                DockerContainer(HADOOP_NAMENODE_IMAGE)
                .with_network(network)
                .with_env("CLUSTER_NAME", HDFS_CLUSTER_NAME)
                .with_exposed_ports(8020)
                .with_network_aliases("namenode")
                # Fixed hostname so HDFS-internal URIs resolve on the network.
                .with_kwargs(hostname="namenode")
                .start()
            )
            wait_for_logs(namenode, HADOOP_NAMENODE_WAIT_LOG, timeout=120)
            namenode_ip = namenode.get_container_host_ip()
            namenode_port = int(namenode.get_exposed_port(8020))

            datanode = (
                DockerContainer(HADOOP_DATANODE_IMAGE)
                .with_network(network)
                .with_exposed_ports(9867)
                .with_env("CLUSTER_NAME", HDFS_CLUSTER_NAME)
                # Point the datanode at the namenode via its network alias.
                .with_env("CORE_CONF_fs_defaultFS", "hdfs://namenode:8020")
                .with_network_aliases("datanode")
                .with_kwargs(hostname="datanode")
                .start()
            )

            wait_for_logs(datanode, HADOOP_DATANODE_WAIT_LOG, timeout=120)

            hdfs = fs.HadoopFileSystem(host=namenode_ip, port=namenode_port)
            hdfs.create_dir("/feast")
            registry_path = f"hdfs://{namenode_ip}:{namenode_port}/feast/registry.db"
            # Seed an empty registry file for Registry to load.
            with hdfs.open_output_stream(registry_path) as f:
                f.write(b"")

            registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600)
            reg = Registry("project", registry_config, None)
            yield reg
        finally:
            # Nested try/finally so a failure stopping the datanode cannot
            # leak the namenode container (sequential stops would skip the
            # second stop when the first one raised).
            try:
                if datanode:
                    datanode.stop()
            finally:
                if namenode:
                    namenode.stop()
306 | 339 |
|
307 | 340 |
|
308 | 341 | class GrpcMockChannel: |
|
0 commit comments