Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@ dependencies = [
"opentelemetry-processor-baggage~=0.61b0",
"traceloop-sdk~=0.52.0",
"PyJWT~=2.10.1",
"protobuf>=4.25.0",
"protovalidate>=0.13.0",
"grpcio>=1.60.0",
"opentelemetry-api>=1.28.0",
"opentelemetry-sdk>=1.28.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/sap_cloud_sdk"]
packages = ["src/sap_cloud_sdk", "src/buf"]

[dependency-groups]
dev = [
Expand Down
Empty file added src/buf/__init__.py
Empty file.
Empty file added src/buf/validate/__init__.py
Empty file.
465 changes: 465 additions & 0 deletions src/buf/validate/validate_pb2.py

Large diffs are not rendered by default.

650 changes: 650 additions & 0 deletions src/buf/validate/validate_pb2.pyi

Large diffs are not rendered by default.

126 changes: 126 additions & 0 deletions src/sap_cloud_sdk/core/auditlog_ng/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""SAP Cloud SDK for Python - Audit Log NG (OTLP/gRPC) module
Comment thread
simeongelovski marked this conversation as resolved.

Sends audit log events as OpenTelemetry LogRecords over gRPC.
Supports mTLS (client certificates) and insecure (no-auth) modes.

The create_client() function accepts an AuditLogNGConfig and returns a
ready-to-use AuditClient.

Usage:
from sap_cloud_sdk.core.auditlog_ng import create_client, AuditLogNGConfig

config = AuditLogNGConfig(
endpoint="audit.example.com:443",
deployment_id="my-deployment",
namespace="namespace-123",
cert_file="client.pem",
key_file="client.key",
)
client = create_client(config=config)

# Send an audit event (protobuf message)
event_id = client.send(event, "DataAccess")
client.close()
"""

from typing import Optional

from sap_cloud_sdk.core.auditlog_ng.client import AuditClient
from sap_cloud_sdk.core.auditlog_ng.config import (
AuditLogNGConfig,
SCHEMA_URL,
validate_source_arg,

Check failure on line 32 in src/sap_cloud_sdk/core/auditlog_ng/__init__.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

ruff (F401)

src/sap_cloud_sdk/core/auditlog_ng/__init__.py:32:5: F401 `sap_cloud_sdk.core.auditlog_ng.config.validate_source_arg` imported but unused; consider removing, adding to `__all__`, or using a redundant alias help: Add unused import `validate_source_arg` to __all__
Comment thread
simeongelovski marked this conversation as resolved.
Outdated
)
from sap_cloud_sdk.core.auditlog_ng.exceptions import (
AuditLogNGError,
ClientCreationError,
TransportError,
Comment thread
simeongelovski marked this conversation as resolved.
Outdated
ValidationError,
)


def create_client(
*,
config: Optional[AuditLogNGConfig] = None,
endpoint: Optional[str] = None,
deployment_id: Optional[str] = None,
namespace: Optional[str] = None,
cert_file: Optional[str] = None,
key_file: Optional[str] = None,
ca_file: Optional[str] = None,
insecure: bool = False,
service_name: str = "audit-client",
batch: bool = False,
compression: bool = True,
schema_url: str = SCHEMA_URL,
) -> AuditClient:
"""Create an AuditClient for sending audit events over OTLP/gRPC.

Either pass a pre-built ``config`` **or** the individual keyword arguments.
When ``config`` is provided the remaining keyword arguments are ignored.

Args:
config: Optional explicit configuration. If provided, all other
keyword arguments are ignored.
endpoint: OTLP gRPC endpoint (``host:port``).
deployment_id: Deployment identifier.
namespace: Namespace identifier.
cert_file: Path to client certificate (PEM) for mTLS.
key_file: Path to client private key (PEM) for mTLS.
ca_file: Path to CA certificate (PEM) for server verification.
insecure: Use insecure connection (no TLS).
service_name: OpenTelemetry ``service.name`` resource attribute.
batch: Use batch processing (better throughput, slight delay).
compression: Enable gzip compression.
schema_url: OpenTelemetry schema URL for the logger.

Returns:
AuditClient: Configured client ready for audit operations.

Raises:
ClientCreationError: If client creation fails.
ValueError: If required parameters are missing.
"""
try:
if config is None:
if not endpoint or not deployment_id or not namespace:
raise ValueError(
"endpoint, deployment_id, and namespace are required "
"when config is not provided"
)
config = AuditLogNGConfig(
endpoint=endpoint,
deployment_id=deployment_id,
namespace=namespace,
cert_file=cert_file,
key_file=key_file,
ca_file=ca_file,
insecure=insecure,
service_name=service_name,
batch=batch,
compression=compression,
schema_url=schema_url,
)

return AuditClient(config)

except (ValueError, ValidationError) as e:
raise e
except Exception as e:
raise ClientCreationError(f"Failed to create audit log NG client: {e}") from e


__all__ = [
# Factory function
"create_client",
# Client
"AuditClient",
# Configuration
"AuditLogNGConfig",
"SCHEMA_URL",
Comment thread
simeongelovski marked this conversation as resolved.
Outdated
# Exceptions
"AuditLogNGError",
"ClientCreationError",
"TransportError",
Comment thread
simeongelovski marked this conversation as resolved.
Outdated
"ValidationError",
]
238 changes: 238 additions & 0 deletions src/sap_cloud_sdk/core/auditlog_ng/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
"""Audit Log OTLP Client.

Sends audit log events as OpenTelemetry LogRecords over gRPC.
Supports mTLS (client certificates) and insecure (no-auth) modes.
"""

import json
import uuid
from typing import Optional

import protovalidate
from protovalidate import ValidationError as ProtoValidationError

import grpc
from google.protobuf.message import Message
from google.protobuf.json_format import MessageToDict
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import (
SimpleLogRecordProcessor,
BatchLogRecordProcessor,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry._logs.severity import SeverityNumber

from sap_cloud_sdk.core.auditlog_ng.config import (
AuditLogNGConfig,
validate_source_arg,
)
from sap_cloud_sdk.core.auditlog_ng.exceptions import (
TransportError,

Check failure on line 31 in src/sap_cloud_sdk/core/auditlog_ng/client.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

ruff (F401)

src/sap_cloud_sdk/core/auditlog_ng/client.py:31:5: F401 `sap_cloud_sdk.core.auditlog_ng.exceptions.TransportError` imported but unused help: Remove unused import: `sap_cloud_sdk.core.auditlog_ng.exceptions.TransportError`
ValidationError,
)
from sap_cloud_sdk.core.telemetry import Module, Operation, record_metrics

Check failure on line 34 in src/sap_cloud_sdk/core/auditlog_ng/client.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

ruff (F401)

src/sap_cloud_sdk/core/auditlog_ng/client.py:34:61: F401 `sap_cloud_sdk.core.telemetry.record_metrics` imported but unused help: Remove unused import

Check failure on line 34 in src/sap_cloud_sdk/core/auditlog_ng/client.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

ruff (F401)

src/sap_cloud_sdk/core/auditlog_ng/client.py:34:50: F401 `sap_cloud_sdk.core.telemetry.Operation` imported but unused help: Remove unused import


class AuditClient:
"""OTLP-based audit log client.

Wraps protobuf audit events in OpenTelemetry LogRecords and sends
them over gRPC to an OTLP-compatible endpoint.

Note:
Do not instantiate this class directly. Use the
:func:`~sap_cloud_sdk.core.auditlog_ng.create_client` factory function
instead, which handles proper configuration.

Example::

from sap_cloud_sdk.core.auditlog_ng import create_client

client = create_client(config=AuditLogNGConfig(
endpoint="audit.example.com:443",
deployment_id="my-deployment",
namespace="namespace-123",
cert_file="client.pem",
key_file="client.key",
))

event_id = client.send(event, "DataAccess")
client.close()
"""

def __init__(self, config: AuditLogNGConfig, _telemetry_source: Optional[Module] = None) -> None:
"""Initialize the audit client from a config object.

Args:
config: Fully-validated :class:`AuditLogNGConfig`.
"""
self._config = config
self._telemetry_source = _telemetry_source
self._closed = False

# Build gRPC credentials
credentials = self._build_credentials(config)

# Create OTLP exporter
self._exporter = OTLPLogExporter(
endpoint=config.endpoint,
insecure=config.insecure,
credentials=credentials,
compression=(
grpc.Compression.Gzip
if config.compression
else grpc.Compression.NoCompression
),
)

# Create logger provider
self._provider = LoggerProvider(
resource=Resource.create(
{
"service.name": config.service_name,
"sap.ucl.deployment_id": config.deployment_id,
"sap.ucl.system_namespace": config.namespace,
}
)
)

# Add processor
processor = (
BatchLogRecordProcessor(self._exporter)
if config.batch
else SimpleLogRecordProcessor(self._exporter)
)
self._provider.add_log_record_processor(processor)

self._logger = self._provider.get_logger(
"auditlog",
schema_url=config.schema_url,
)

# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------

def send(
self,
event: Message,
event_type: Optional[str] = None,
format: str = "protobuf-binary",
) -> str:
"""Send an audit log event.

Args:
event: Protobuf message (audit event).
event_type: Event type name (defaults to message type name).
format: Serialization format (``"protobuf-binary"`` or ``"json"``).

Returns:
Generated event ID (UUID).

Raises:
RuntimeError: If the client has already been closed.
ValueError: If *format* is not a supported value.
ValidationError: If the protobuf event fails validation.
"""
Comment thread
simeongelovski marked this conversation as resolved.
if self._closed:
raise RuntimeError("Client is closed")

if format not in {"protobuf-binary", "json"}:
raise ValueError("format must be 'protobuf-binary' or 'json'")

try:
protovalidate.validate(event)
except ProtoValidationError as e:
raise ValidationError(f"Audit event validation failed: {e}") from e

tenant_id = event.common.tenant_id
validate_source_arg(tenant_id, "tenant_id")

event_id = str(uuid.uuid4())

# Determine event type from message descriptor if not provided
if event_type is None:
event_type = event.DESCRIPTOR.name

event_type = f"sap.als.AuditEvent.{event_type}.v2"

if format == "json":
mime_type = "application/json"
event_dict = MessageToDict(event, preserving_proto_field_name=False)
body = json.dumps(event_dict)
else:
mime_type = "application/protobuf"
body = event.SerializeToString()

# Emit log record
self._logger.emit(
severity_number=SeverityNumber.INFO,
event_name=event_type,
body=body,
attributes={
"cloudevents.event_id": event_id,
"sap.tenancy.tenant_id": tenant_id,
"sap.auditlogging.mime_type": mime_type,
},
)

return event_id

def send_json(self, event: Message, event_type: Optional[str] = None) -> str:
"""Send event in JSON format."""
return self.send(event, event_type, format="json")

def flush(self) -> None:
"""Flush pending events (for batch mode)."""
if not self._closed:
self._provider.force_flush()

def close(self) -> None:
"""Shutdown the client and flush pending events."""
if not self._closed:
self._provider.shutdown()
self._closed = True

# ------------------------------------------------------------------
# Context manager
# ------------------------------------------------------------------

def __enter__(self) -> "AuditClient":
return self

def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
self.close()
return False

# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------

@staticmethod
def _build_credentials(
config: AuditLogNGConfig,
) -> Optional[grpc.ChannelCredentials]:
"""Build gRPC channel credentials from config."""
if config.insecure:
return None

root_certs = None
private_key = None
cert_chain = None

if config.ca_file:
with open(config.ca_file, "rb") as f:
root_certs = f.read()

if config.cert_file and config.key_file:
with open(config.key_file, "rb") as f:
private_key = f.read()
with open(config.cert_file, "rb") as f:
cert_chain = f.read()

return grpc.ssl_channel_credentials(
root_certificates=root_certs,
private_key=private_key,
certificate_chain=cert_chain,
)
Loading
Loading