Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/google-auth/google/auth/_agent_identity_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ def get_agent_identity_certificate_path():
if not cert_config_path and not has_well_known_dir:
return None

# If ECP config path is specified but does not exist, and we are on a workstation, fail-fast immediately.
if (
cert_config_path
and not has_well_known_dir
and not os.path.exists(cert_config_path)
):
return None

has_logged_config_warning = False
has_logged_cert_warning = False

Expand Down
35 changes: 33 additions & 2 deletions packages/google-auth/tests/test_agent_identity_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,29 +165,47 @@ def test_get_agent_identity_certificate_path_success(self, tmpdir, monkeypatch):
assert result == str(cert_path)

@mock.patch("time.sleep")
@mock.patch("google.auth._agent_identity_utils.os.path.exists")
def test_get_agent_identity_certificate_path_retry(
self, mock_sleep, tmpdir, monkeypatch
self, mock_exists, mock_sleep, tmpdir, monkeypatch
):
config_path = tmpdir.join("config.json")
monkeypatch.setenv(
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path)
)

# Simulate workload env (well_known_dir exists) to avoid fail-fast
def exists_side_effect(path):
if path == "/var/run/secrets/workload-spiffe-credentials":
return True
return False

mock_exists.side_effect = exists_side_effect
Comment on lines +178 to +183
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The nested helper function exists_side_effect can be simplified to a concise lambda expression to improve readability and reduce boilerplate.

        mock_exists.side_effect = lambda path: path == "/var/run/secrets/workload-spiffe-credentials"


# File doesn't exist initially
with pytest.raises(exceptions.RefreshError):
_agent_identity_utils.get_agent_identity_certificate_path()

assert mock_sleep.call_count == len(_agent_identity_utils._POLLING_INTERVALS)

@mock.patch("time.sleep")
@mock.patch("google.auth._agent_identity_utils.os.path.exists")
def test_get_agent_identity_certificate_path_failure(
self, mock_sleep, tmpdir, monkeypatch
self, mock_exists, mock_sleep, tmpdir, monkeypatch
):
config_path = tmpdir.join("non_existent_config.json")
monkeypatch.setenv(
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path)
)

# Simulate workload env (well_known_dir exists) to avoid fail-fast
def exists_side_effect(path):
if path == "/var/run/secrets/workload-spiffe-credentials":
return True
return False

mock_exists.side_effect = exists_side_effect
Comment on lines +202 to +207
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The nested helper function exists_side_effect can be simplified to a concise lambda expression to improve readability and reduce boilerplate.

        mock_exists.side_effect = lambda path: path == "/var/run/secrets/workload-spiffe-credentials"


with pytest.raises(exceptions.RefreshError) as excinfo:
_agent_identity_utils.get_agent_identity_certificate_path()

Expand All @@ -198,6 +216,19 @@ def test_get_agent_identity_certificate_path_failure(
)
assert mock_sleep.call_count == len(_agent_identity_utils._POLLING_INTERVALS)

def test_get_agent_identity_certificate_path_workstation_fail_fast(
self, tmpdir, monkeypatch
):
config_path = tmpdir.join("non_existent_config.json")
monkeypatch.setenv(
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path)
)

# On a workstation, well_known_dir does not exist, and config file is missing.
# It should fail-fast and return None immediately.
result = _agent_identity_utils.get_agent_identity_certificate_path()
assert result is None

@mock.patch("time.sleep")
@mock.patch("os.path.exists")
def test_get_agent_identity_certificate_path_cert_not_found(
Expand Down
4 changes: 3 additions & 1 deletion packages/google-auth/tests/transport/test_mtls.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,10 @@ def test_default_client_encrypted_cert_source(
# Test good callback.
get_client_ssl_credentials.return_value = (True, b"cert", b"key", b"passphrase")
callback = mtls.default_client_encrypted_cert_source("cert_path", "key_path")
with mock.patch("{}.open".format(__name__), return_value=mock.MagicMock()):
with mock.patch("google.auth.transport.mtls.open", mock.mock_open()) as mock_file:
assert callback() == ("cert_path", "key_path", b"passphrase")
mock_file.assert_any_call("cert_path", "wb")
mock_file.assert_any_call("key_path", "wb")

# Test bad callback which throws exception.
get_client_ssl_credentials.side_effect = exceptions.ClientCertError()
Expand Down
Loading