Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion packages/google-auth/google/auth/_agent_identity_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ def get_and_parse_agent_identity_certificate():
if is_opted_out:
return None

# Respect explicit opt-out of mTLS / client certs
use_client_cert = os.environ.get(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE)
if use_client_cert and use_client_cert.lower() == "false":
return None
Comment on lines +205 to +207

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If GOOGLE_API_USE_CLIENT_CERTIFICATE is set to an invalid value (e.g., 'foo'), check_use_client_cert() will treat it as False (disabled). However, the current check only returns None if it is explicitly 'false'. To ensure perfect alignment with check_use_client_cert(), we should treat any value other than 'true' as an opt-out/disabled state when the environment variable is set.

Suggested change
use_client_cert = os.environ.get(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE)
if use_client_cert and use_client_cert.lower() == "false":
return None
use_client_cert = os.environ.get(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE)
if use_client_cert is not None and use_client_cert.lower() != "true":
return None


cert_path = get_agent_identity_certificate_path()
if not cert_path:
return None
Expand Down Expand Up @@ -312,7 +317,24 @@ def should_request_bound_token(cert):
).lower()
== "true"
)
return is_agent_cert and is_opted_in
if not (is_agent_cert and is_opted_in):
return False

# Respect explicit opt-out of mTLS / client certs
use_client_cert = os.environ.get(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE)
if use_client_cert and use_client_cert.lower() == "false":
return False
Comment on lines +324 to +326

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the check in get_and_parse_agent_identity_certificate, if GOOGLE_API_USE_CLIENT_CERTIFICATE is set to an invalid value, it should be treated as disabled to prevent mismatched bound tokens. We should check if the variable is set and is not 'true'.

Suggested change
use_client_cert = os.environ.get(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE)
if use_client_cert and use_client_cert.lower() == "false":
return False
use_client_cert = os.environ.get(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE)
if use_client_cert is not None and use_client_cert.lower() != "true":
return False


# Graceful degradation for auto-enablement:
# If GOOGLE_API_USE_CLIENT_CERTIFICATE is not set, we only request a bound token
# if the transport is capable of handling it.
if use_client_cert is None or use_client_cert == "":
from google.auth.transport import _mtls_helper

if not _mtls_helper.is_transport_mtls_capable():
return False

return True


def get_cached_cert_fingerprint(cached_cert):
Expand Down
27 changes: 21 additions & 6 deletions packages/google-auth/google/auth/aio/transport/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ def __init__(
async def configure_mtls_channel(self, client_cert_callback=None):
"""Configure the client certificate and key for SSL connection.

The function does nothing unless `GOOGLE_API_USE_CLIENT_CERTIFICATE` is
explicitly set to `true`. In this case if client certificate and key are
successfully obtained (from the given client_cert_callback or from application
default SSL credentials), the underlying transport will be reconfigured
to use mTLS.
This method configures mTLS if client certificates are explicitly enabled
(via GOOGLE_API_USE_CLIENT_CERTIFICATE=true) or auto-enabled (when the env
variable is unset and workload certificates are discovered). In these cases,
the underlying transport will be reconfigured to use mTLS.

Note: This function does nothing if the `aiohttp` library is not
installed.
Important: Calling this method will close any ongoing API requests associated
Expand All @@ -170,7 +170,8 @@ async def configure_mtls_channel(self, client_cert_callback=None):

Raises:
google.auth.exceptions.MutualTLSChannelError: If mutual TLS channel
creation failed for any reason.
creation failed for any reason (e.g., missing dependencies, custom request
handler limitations, or missing certificates when mTLS was requested).
"""
if self._mtls_init_task is None:

Expand Down Expand Up @@ -207,6 +208,20 @@ async def _do_configure():
self._auth_request = AiohttpRequest(session=new_session)

await old_auth_request.close()
else:
self._is_mtls = False
# If a custom request handler was provided, the library cannot configure
# the connection to use mTLS. Raise an error instead of silently falling back.
raise exceptions.MutualTLSChannelError(
"mTLS was configured/enabled but client certificate or private key could not be loaded."
)
Comment on lines +215 to +217

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The exception message here is misleading. If this block is reached, it means a custom request handler (not an AiohttpRequest) was provided, which cannot be configured for mTLS. The current message states that the client certificate or private key could not be loaded, which will confuse users trying to debug their configuration.

Suggested change
raise exceptions.MutualTLSChannelError(
"mTLS was configured/enabled but client certificate or private key could not be loaded."
)
raise exceptions.MutualTLSChannelError(
"mTLS was configured/enabled but a custom request handler was provided, which is not supported for mTLS configuration."
)

else:
# If mTLS is configured or intended, but we fail to find client certificates,
# we must fail fast by raising an error instead of silently falling back to
# standard TLS.
raise exceptions.MutualTLSChannelError(
"mTLS was configured/enabled but client certificate or private key could not be loaded."
)

except (
exceptions.ClientCertError,
Expand Down
92 changes: 62 additions & 30 deletions packages/google-auth/google/auth/transport/_mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

# Default gcloud config path, to be used with path.expanduser for cross-platform compatibility.
CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
_WELL_KNOWN_SPIFFE_CERT_PATH = (
"/var/run/secrets/workload-spiffe-credentials/certificates.pem"
)
_CERT_PROVIDER_COMMAND = "cert_provider_command"
_CERT_REGEX = re.compile(
b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL
Expand Down Expand Up @@ -200,12 +203,13 @@ def _get_workload_cert_and_key_paths(config_path, include_context_aware=True):
return None, None
workload = cert_configs["workload"]

if "cert_path" not in workload:
return None, None
if "cert_path" not in workload or "key_path" not in workload:
raise exceptions.ClientCertError(
'Workload certificate configuration is missing "cert_path" or "key_path" in {}'.format(
absolute_path
)
)
cert_path = workload["cert_path"]

if "key_path" not in workload:
return None, None
key_path = workload["key_path"]

# == BEGIN Temporary Cloud Run PATCH ==
Expand Down Expand Up @@ -448,16 +452,31 @@ def client_cert_callback():
return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)


def is_transport_mtls_capable():
"""Returns True if the required dependencies for establishing an mTLS connection are present."""
try:
import OpenSSL
from cryptography import x509

return True
except ImportError:
return False


def check_use_client_cert():
"""Returns boolean for whether the client certificate should be used for mTLS.

If GOOGLE_API_USE_CLIENT_CERTIFICATE is set to true or false, a corresponding
bool value will be returned. If the value is set to an unexpected string, it
will default to False.
If GOOGLE_API_USE_CLIENT_CERTIFICATE is unset, the value will be inferred
by reading a file pointed at by GOOGLE_API_CERTIFICATE_CONFIG, and verifying
it contains a "workload" section. If so, the function will return True,
otherwise False.
as True (auto-enabled) if the required transport libraries (pyOpenSSL and cryptography)
are installed AND either:
1. A workload config file exists (pointed at by GOOGLE_API_CERTIFICATE_CONFIG)
containing a "workload" section.
2. A certificate file exists at the well-known SPIFFE path
(/var/run/secrets/workload-spiffe-credentials/certificates.pem).
Otherwise, it returns False.

Returns:
bool: Whether the client certificate should be used for mTLS connection.
Expand All @@ -471,31 +490,44 @@ def check_use_client_cert():
# Check if the value of GOOGLE_API_USE_CLIENT_CERTIFICATE is set.
if use_client_cert:
return use_client_cert.lower() == "true"
else:
# Check if the value of GOOGLE_API_CERTIFICATE_CONFIG is set.
cert_path = getenv(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
if cert_path is None:
cert_path = getenv(
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH
)

if cert_path:
try:
with open(cert_path, "r") as f:
content = json.load(f)
# verify json has workload key
content["cert_configs"]["workload"]
return True
except (
FileNotFoundError,
OSError,
KeyError,
TypeError,
json.JSONDecodeError,
) as e:
_LOGGER.debug("error decoding certificate: %s", e)
# Auto-enablement checks (when GOOGLE_API_USE_CLIENT_CERTIFICATE is not set)
# Gracefully degrade to standard TLS if transport capabilities are missing
if not is_transport_mtls_capable():
return False

# Check if the value of GOOGLE_API_CERTIFICATE_CONFIG is set.
cert_path = getenv(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
if cert_path is None:
cert_path = getenv(
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH
)

if cert_path:
try:
with open(cert_path, "r") as f:
content = json.load(f)
# verify json has workload key
content["cert_configs"]["workload"]
return True
except (
FileNotFoundError,
OSError,
KeyError,
TypeError,
json.JSONDecodeError,
) as e:
_LOGGER.debug("error decoding certificate: %s", e)

# Fallback check for well-known SPIFFE path (agent identity)
if (
path.exists(_WELL_KNOWN_SPIFFE_CERT_PATH)
and path.getsize(_WELL_KNOWN_SPIFFE_CERT_PATH) > 0
):
return True
Comment on lines +523 to +527

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using path.exists can return True for directories, and calling path.getsize on a directory or a file without read permissions can raise an OSError (such as PermissionError). To make this auto-detection robust and prevent unexpected crashes on startup, we should use path.isfile and wrap the check in a try-except OSError block.

Suggested change
if (
path.exists(_WELL_KNOWN_SPIFFE_CERT_PATH)
and path.getsize(_WELL_KNOWN_SPIFFE_CERT_PATH) > 0
):
return True
try:
if (
path.isfile(_WELL_KNOWN_SPIFFE_CERT_PATH)
and path.getsize(_WELL_KNOWN_SPIFFE_CERT_PATH) > 0
):
return True
except OSError:
pass


return False


def check_parameters_for_unauthorized_response(cached_cert):
"""Returns the cached and current cert fingerprint for reconfiguring mTLS.
Expand Down
20 changes: 14 additions & 6 deletions packages/google-auth/google/auth/transport/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,11 @@ def __init__(
def configure_mtls_channel(self, client_cert_callback=None):
"""Configure the client certificate and key for SSL connection.

The function does nothing unless `GOOGLE_API_USE_CLIENT_CERTIFICATE` is
explicitly set to `true`. In this case if client certificate and key are
successfully obtained (from the given client_cert_callback or from application
default SSL credentials), a :class:`_MutualTlsAdapter` instance will be mounted
to "https://" prefix.
This method configures mTLS if client certificates are explicitly enabled
(via GOOGLE_API_USE_CLIENT_CERTIFICATE=true) or auto-enabled (when the env
variable is unset and workload certificates are discovered). In these cases,
if the client certificate and key are successfully obtained, a
:class:`_MutualTlsAdapter` instance will be mounted to the "https://" prefix.

Args:
client_cert_callback (Optional[Callable[[], (bytes, bytes)]]):
Expand All @@ -443,7 +443,8 @@ def configure_mtls_channel(self, client_cert_callback=None):

Raises:
google.auth.exceptions.MutualTLSChannelError: If mutual TLS channel
creation failed for any reason.
creation failed for any reason (e.g. missing dependencies, missing
certificates when explicitly requested, or custom request adapter issues).
"""
use_client_cert = google.auth.transport._mtls_helper.check_use_client_cert()
if not use_client_cert:
Expand All @@ -468,6 +469,13 @@ def configure_mtls_channel(self, client_cert_callback=None):
mtls_adapter = _MutualTlsAdapter(cert, key)
self._cached_cert = cert
self.mount("https://", mtls_adapter)
else:
# If mTLS is configured or intended, but we fail to find client certificates,
# we must fail fast by raising an error instead of silently falling back to
# standard TLS.
raise exceptions.MutualTLSChannelError(
"mTLS channel configuration failed because no client certificates were found."
)
except (
exceptions.ClientCertError,
ImportError,
Expand Down
23 changes: 14 additions & 9 deletions packages/google-auth/google/auth/transport/urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,12 @@ def __init__(

def configure_mtls_channel(self, client_cert_callback=None):
"""Configures mutual TLS channel using the given client_cert_callback or
application default SSL credentials. The behavior is controlled by
`GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable.
(1) If the environment variable value is `true`, the function returns True
if the channel is mutual TLS and False otherwise. The `http` provided
in the constructor will be overwritten.
(2) If the environment variable is not set or `false`, the function does
nothing and it always return False.
application default SSL credentials.

The channel is configured if GOOGLE_API_USE_CLIENT_CERTIFICATE is "true",
or if it is unset and workload certificates are detected in the environment.
If client_cert_callback is None, default SSL credentials (workload or SecureConnect)
are loaded.

Args:
client_cert_callback (Optional[Callable[[], (bytes, bytes)]]):
Expand All @@ -333,7 +332,8 @@ def configure_mtls_channel(self, client_cert_callback=None):

Raises:
google.auth.exceptions.MutualTLSChannelError: If mutual TLS channel
creation failed for any reason.
creation failed for any reason (e.g., missing dependencies, or missing
certificates when mTLS was explicitly/implicitly requested).
"""
use_client_cert = transport._mtls_helper.check_use_client_cert()
if not use_client_cert:
Expand All @@ -356,7 +356,12 @@ def configure_mtls_channel(self, client_cert_callback=None):
self.http = _make_mutual_tls_http(cert, key)
self._cached_cert = cert
else:
self.http = _make_default_http()
# If mTLS is configured or intended, but we fail to find client certificates,
# we must fail fast by raising an error instead of silently falling back to
# standard TLS.
raise exceptions.MutualTLSChannelError(
"mTLS channel configuration failed because no client certificates were found."
)
except (
exceptions.ClientCertError,
ImportError,
Expand Down
51 changes: 51 additions & 0 deletions packages/google-auth/tests/test_agent_identity_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,45 @@ def test_should_request_bound_token(self, mock_is_agent, monkeypatch):
)
assert not _agent_identity_utils.should_request_bound_token(mock.sentinel.cert)

@mock.patch("google.auth._agent_identity_utils._is_agent_identity_certificate")
@mock.patch("google.auth.transport._mtls_helper.is_transport_mtls_capable")
def test_should_request_bound_token_explicit_use_client_cert_false(
self, mock_capable, mock_is_agent, monkeypatch
):
mock_is_agent.return_value = True
mock_capable.return_value = True
monkeypatch.setenv(
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE,
"false",
)
assert not _agent_identity_utils.should_request_bound_token(mock.sentinel.cert)

@mock.patch("google.auth._agent_identity_utils._is_agent_identity_certificate")
@mock.patch("google.auth.transport._mtls_helper.is_transport_mtls_capable")
def test_should_request_bound_token_auto_enablement_capable(
self, mock_capable, mock_is_agent, monkeypatch
):
mock_is_agent.return_value = True
mock_capable.return_value = True
monkeypatch.delenv(
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE,
raising=False,
)
assert _agent_identity_utils.should_request_bound_token(mock.sentinel.cert)

@mock.patch("google.auth._agent_identity_utils._is_agent_identity_certificate")
@mock.patch("google.auth.transport._mtls_helper.is_transport_mtls_capable")
def test_should_request_bound_token_auto_enablement_incapable(
self, mock_capable, mock_is_agent, monkeypatch
):
mock_is_agent.return_value = True
mock_capable.return_value = False
monkeypatch.delenv(
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE,
raising=False,
)
assert not _agent_identity_utils.should_request_bound_token(mock.sentinel.cert)

def test_get_agent_identity_certificate_path_success(self, tmpdir, monkeypatch):
cert_path = tmpdir.join("cert.pem")
cert_path.write("cert_content")
Expand Down Expand Up @@ -439,6 +478,18 @@ def test_get_and_parse_agent_identity_certificate_success(
mock_parse_certificate.assert_called_once_with(b"cert_bytes")
assert result == mock_parse_certificate.return_value

@mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path")
def test_get_and_parse_agent_identity_certificate_use_client_cert_false(
self, mock_get_path, monkeypatch
):
monkeypatch.setenv(
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE,
"false",
)
result = _agent_identity_utils.get_and_parse_agent_identity_certificate()
assert result is None
mock_get_path.assert_not_called()

def test_get_cached_cert_fingerprint_no_cert(self):
with pytest.raises(ValueError, match="mTLS connection is not configured."):
_agent_identity_utils.get_cached_cert_fingerprint(None)
Expand Down
Loading
Loading