Commit 81e2c72

Add support for OIDC auth in packaged demo runner
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
1 parent f8dd308 commit 81e2c72

5 files changed: +257 -156 lines changed

CLAUDE.md

Lines changed: 41 additions & 50 deletions
@@ -8279,6 +8279,7 @@ make clean
 - **Pipeline**: Main interface for pipeline operations and data I/O
 - **PipelineBuilder**: Declarative pipeline construction
 - **Output Handlers**: Stream processors for pipeline outputs
+- **OIDC Authentication**: `testutils_oidc.py` - OIDC token management and authentication utilities (see `tests/CLAUDE.md` for details)
 
 ## Important Implementation Details
 
@@ -8375,10 +8376,12 @@ pipeline = client.create_pipeline("my_pipeline")
 - `pandas>=2.1.2` - DataFrame operations and data manipulation
 - `numpy>=2.2.4` - Numerical computing support
 - `typing-extensions` - Enhanced type annotations
+- `PyJWT>=2.8.0` - JWT token handling for OIDC authentication
+- `pretty-errors` - Enhanced error formatting
+- `ruff>=0.6.9` - Fast Python linter and formatter
 
 #### Development Dependencies
 - `pytest>=8.3.5` - Testing framework with timeout support
-- `ruff>=0.6.9` - Fast Python linter and formatter
 - `sphinx==7.3.7` - Documentation generation
 - `kafka-python-ng==2.2.2` - Kafka integration for testing
 
@@ -8448,51 +8451,26 @@ This completely eliminates auth server rate limiting issues and cross-process to
 
 ##### Implementation Details
 
-**Master-Only Token Fetching**:
-```python
-def pytest_configure(config):
-    """Fetch OIDC token on master node only."""
-    if is_master(config):
-        token_data = _fetch_oidc_token()
-        # Store in environment variable for cross-process access
-        import base64
-        token_json = json.dumps(token_data)
-        token_b64 = base64.b64encode(token_json.encode()).decode()
-        os.environ['FELDERA_PYTEST_OIDC_TOKEN'] = token_b64
-```
-
-**Cross-Process Token Access**:
-```python
-def obtain_access_token(self):
-    """Get token from environment variable set by master node."""
-    env_token = os.getenv('FELDERA_PYTEST_OIDC_TOKEN')
-    if env_token:
-        import base64
-        token_json = base64.b64decode(env_token.encode()).decode()
-        token_data = json.loads(token_json)
-        return token_data['access_token']
-```
-
-**Session Fixture Verification**:
-- Session-scoped `oidc_token_fixture` verifies token is available and valid
-- Validates token expiration with 30-second buffer
-- Serves as early validation that authentication is properly set up
-
-**Header Merging for Custom Requests**:
-```python
-def http_request(method: str, path: str, **kwargs):
-    """Merge authentication headers with custom headers."""
-    base_headers = _base_headers()  # Contains Authorization token
-    custom_headers = kwargs.pop("headers", None) or {}
-    headers = {**base_headers, **custom_headers}  # Merge both
-```
+The OIDC token caching system uses pytest-xdist hooks and environment variables for cross-process token sharing:
+
+- **Master-Only Fetching**: `pytest_configure()` hook fetches OIDC token once on master node
+- **Environment Storage**: Token cached in `FELDERA_PYTEST_OIDC_TOKEN` with base64 encoding
+- **Cross-Process Access**: Worker processes inherit environment variable automatically
+- **Session Validation**: `oidc_token_fixture` verifies token setup with 30-second expiration buffer
+- **Header Integration**: `http_request()` merges authentication headers with custom headers
 
 **Key Components**:
 - **`pytest_configure()` (conftest.py)**: Master-only hook that fetches OIDC token once and stores in environment
 - **`pytest_configure_node()` (conftest.py)**: xdist hook that passes token data via workerinput (backup mechanism)
 - **`oidc_token_fixture()` (conftest.py)**: Session fixture that verifies token setup
-- **`obtain_access_token()` (oidc_test_helper.py)**: Returns token from environment variable or fails fast
+- **`obtain_access_token()` (testutils_oidc.py)**: Returns token from environment variable or fails fast
 - **`http_request()` (helper.py)**: Merges authentication headers with custom headers for ingress/egress requests
+- **Reusable Token Functions (testutils_oidc.py)**:
+  - `setup_token_cache()`: High-level function that sets up token caching (used by both pytest and demo runners)
+  - `get_cached_token_from_env()`: Retrieves and validates cached tokens
+  - `parse_cached_token()`: Parses base64-encoded token data
+  - `is_token_valid()`: Checks token expiration with configurable buffer
+  - `encode_token_for_env()`: Encodes tokens for environment storage
 
 ### API Key Authentication (Fallback)
 
@@ -8504,17 +8482,9 @@ export FELDERA_API_KEY="your-api-key"
 
 ### Usage in Tests
 
-Platform tests automatically detect and use the appropriate authentication method:
+Platform tests automatically detect and use the appropriate authentication method via `_base_headers()` helper function.
 
-```python
-from tests.platform.helper import _base_headers
-
-# Headers automatically include OIDC token or API key
-headers = _base_headers()
-response = requests.get(url, headers=headers)
-```
-
-The authentication selection follows this priority:
+Authentication priority:
 1. **OIDC** (if `OIDC_TEST_ISSUER` and related vars are set)
 2. **API Key** (if `FELDERA_API_KEY` is set)
 3. **No Auth** (for local testing without authentication)
@@ -8533,6 +8503,27 @@ env:
 ```
 
 This ensures consistent authentication across both "Runtime Integration Tests" and "Platform Integration Tests (OSS Docker Image)" workflows that run in parallel.
+
+### OIDC Usage Beyond Testing
+
+The OIDC infrastructure is designed to be reusable outside of the test suite:
+
+#### Demo Runners and External Tools
+
+External tools can reuse the OIDC infrastructure by calling `setup_token_cache()` followed by `_get_effective_api_key()` to get cached tokens or fallback API keys.
+
+#### Token Caching for Multiple Processes
+
+The token caching system is designed to work across multiple processes:
+
+1. **First Process**: Calls `setup_token_cache()` → fetches and caches token in environment
+2. **Subsequent Processes**: Call `setup_token_cache()` → reuses cached token if still valid
+3. **Automatic Refresh**: Fetches new token only when cached token expires
+
+This pattern is used by:
+- **Pytest Test Runs**: Master node fetches token, workers reuse it
+- **Demo Runners**: `demo/all-packaged/run.py` uses the same caching mechanism
+- **CI Workflows**: Multiple demos in sequence reuse the same token
 <!-- SECTION:python/tests/CLAUDE.md END -->
 
 ---

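Note on the cache format documented above: the CLAUDE.md text describes a token stored as base64-encoded JSON and accepted only while it is at least 30 seconds from expiry. The sketch below illustrates that round trip in isolation. The names `encode_token`, `decode_token`, and `is_fresh` are hypothetical stand-ins chosen for this note, not the functions added by this commit, and `now` is passed in explicitly (an assumption made here) so the expiry check can be exercised without waiting on the clock.

```python
import base64
import json
from typing import Any, Dict, Optional


def encode_token(token_data: Dict[str, Any]) -> str:
    """Serialize token data to JSON, then base64, so it fits in an env var."""
    return base64.b64encode(json.dumps(token_data).encode()).decode()


def decode_token(env_value: str) -> Optional[Dict[str, Any]]:
    """Reverse of encode_token; returns None if the value is malformed."""
    try:
        return json.loads(base64.b64decode(env_value.encode()).decode())
    except ValueError:
        # binascii.Error, UnicodeDecodeError and json.JSONDecodeError
        # are all subclasses of ValueError.
        return None


def is_fresh(token_data: Dict[str, Any], now: float, buffer_seconds: int = 30) -> bool:
    """True if an access token is present and not within the expiry buffer."""
    has_token = bool(token_data.get("access_token"))
    return has_token and now < token_data.get("expires_at", 0) - buffer_seconds


# Round trip with a made-up token that expires at t=1000 seconds.
token = {"access_token": "abc", "expires_at": 1000.0}
env_value = encode_token(token)
restored = decode_token(env_value)
```

With the default 30-second buffer, the restored token counts as fresh at `now=960.0` and as stale at `now=975.0`, even though it does not formally expire until `1000.0`.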
demo/all-packaged/run.py

Lines changed: 22 additions & 3 deletions
@@ -16,23 +16,42 @@
 import argparse
 from feldera import FelderaClient, PipelineBuilder
 from feldera.enums import PipelineStatus
+from feldera.testutils import _get_effective_api_key
+from feldera.testutils_oidc import setup_token_cache
 
 
 def main():
+    # Initialize OIDC token cache before any API calls (reuses pytest logic)
+    setup_token_cache()
+
     # Parse command-line arguments
     parser = argparse.ArgumentParser()
     parser.add_argument(
         "--api-url", required=True, help="Feldera API URL (e.g., http://localhost:8080)"
     )
     args = parser.parse_args()
 
-    # Create Feldera client using the API URL
+    # Create Feldera client using the API URL with OIDC token caching support
     api_url = args.api_url
     print(f"Feldera API URL: {api_url}")
-    client = FelderaClient(api_url)
+
+    # Get effective API key (OIDC token takes precedence over static API key)
+    # This reuses the same token caching infrastructure as the Python test suite
+    effective_api_key = _get_effective_api_key()
+    if effective_api_key:
+        print("🔐 AUTH: Using cached OIDC authentication")
+        client = FelderaClient(api_url, api_key=effective_api_key)
+    else:
+        print("🔐 AUTH: Using no authentication")
+        client = FelderaClient(api_url)
 
     # Retrieve and run all packaged demos
-    demos = requests.get(f"{api_url}/v0/config/demos").json()
+    # Use the same authentication headers for the demos request
+    headers = {"Accept": "application/json"}
+    if effective_api_key:
+        headers["Authorization"] = f"Bearer {effective_api_key}"
+
+    demos = requests.get(f"{api_url}/v0/config/demos", headers=headers).json()
     print(f"Total {len(demos)} packages demos were found and will be run")
     for demo in demos:
         print(f"Running packaged demo: {demo['name']}...")

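Note on the credential handling in this hunk: the diff does not show the body of `_get_effective_api_key()`, so the sketch below only illustrates the documented priority (OIDC token, then static API key, then no auth) together with the header construction used for the demos request. `pick_credential` and `build_headers` are hypothetical names introduced for this note, and the real helper may well behave differently.

```python
from typing import Dict, Optional


def pick_credential(oidc_token: Optional[str], api_key: Optional[str]) -> Optional[str]:
    """Prefer the OIDC token, fall back to the API key, else no credential."""
    return oidc_token or api_key or None


def build_headers(credential: Optional[str]) -> Dict[str, str]:
    """Build request headers, adding a Bearer token only when one is available."""
    headers = {"Accept": "application/json"}
    if credential:
        headers["Authorization"] = f"Bearer {credential}"
    return headers


# Illustrative values only; neither string is a real credential.
with_both = build_headers(pick_credential("oidc-token", "static-key"))
key_only = build_headers(pick_credential(None, "static-key"))
anonymous = build_headers(pick_credential(None, None))
```

Under this reading, a static API key alone also yields a non-empty credential, so a runner that wants accurate log output would need to tell the two cases apart before printing which method is in use.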
python/feldera/testutils_oidc.py

Lines changed: 186 additions & 19 deletions
@@ -101,25 +101,13 @@ def obtain_access_token(self, pytest_cache=None) -> str:
         current_time = time.time()
 
         # Check environment variable for cross-process token sharing
-        env_token = os.getenv("FELDERA_PYTEST_OIDC_TOKEN")
-        if env_token:
-            try:
-                import base64
-
-                token_json = base64.b64decode(env_token.encode()).decode()
-                token_data = json.loads(token_json)
-
-                if (
-                    token_data.get("access_token")
-                    and current_time < token_data.get("expires_at", 0) - 30
-                ):
-                    logger.info("Using environment variable cached access token")
-                    # Cache in instance for future calls to avoid repeated parsing
-                    self._access_token = token_data["access_token"]
-                    self._token_expires_at = token_data["expires_at"]
-                    return token_data["access_token"]
-            except Exception as e:
-                logger.warning(f"Failed to parse environment token: {e}")
+        token_data = get_cached_token_from_env()
+        if token_data:
+            logger.info("Using environment variable cached access token")
+            # Cache in instance for future calls to avoid repeated parsing
+            self._access_token = token_data["access_token"]
+            self._token_expires_at = token_data["expires_at"]
+            return token_data["access_token"]
 
         # Fallback: Check instance cache
         if self._access_token and current_time < self._token_expires_at - 30:
@@ -195,3 +183,182 @@ def get_oidc_test_helper() -> Optional[OidcTestHelper]:
     if config:
         _test_helper = OidcTestHelper(config)
     return _test_helper
+
+
+def parse_cached_token(env_token: str) -> Optional[Dict[str, Any]]:
+    """
+    Parse and validate a base64-encoded token from environment variable.
+
+    Args:
+        env_token: Base64-encoded JSON token data from environment variable
+
+    Returns:
+        Parsed token data dict if valid, None if invalid or expired
+    """
+    try:
+        import base64
+        token_json = base64.b64decode(env_token.encode()).decode()
+        token_data = json.loads(token_json)
+        return token_data
+    except Exception as e:
+        logging.getLogger(__name__).warning(f"Failed to parse cached token: {e}")
+        return None
+
+
+def is_token_valid(token_data: Dict[str, Any], buffer_seconds: int = 30) -> bool:
+    """
+    Check if token data is valid and not expired.
+
+    Args:
+        token_data: Dictionary containing token information
+        buffer_seconds: Safety buffer before expiration (default 30 seconds)
+
+    Returns:
+        True if token is valid and not expired, False otherwise
+    """
+    if not token_data:
+        return False
+
+    access_token = token_data.get("access_token")
+    expires_at = token_data.get("expires_at", 0)
+
+    if not access_token:
+        return False
+
+    current_time = time.time()
+    return current_time < expires_at - buffer_seconds
+
+
+def encode_token_for_env(token_data: Dict[str, Any]) -> str:
+    """
+    Encode token data as base64 for storage in environment variables.
+
+    Args:
+        token_data: Dictionary containing token information
+
+    Returns:
+        Base64-encoded JSON string suitable for environment variable storage
+    """
+    import base64
+    token_json = json.dumps(token_data)
+    return base64.b64encode(token_json.encode()).decode()
+
+
+def get_cached_token_from_env(env_var_name: str = "FELDERA_PYTEST_OIDC_TOKEN") -> Optional[Dict[str, Any]]:
+    """
+    Retrieve and validate cached token from environment variable.
+
+    Args:
+        env_var_name: Name of environment variable containing cached token
+
+    Returns:
+        Valid token data if available and not expired, None otherwise
+    """
+    import os
+
+    env_token = os.getenv(env_var_name)
+    if not env_token:
+        return None
+
+    token_data = parse_cached_token(env_token)
+    if token_data and is_token_valid(token_data):
+        return token_data
+
+    return None
+
+
+def setup_token_cache() -> Optional[Dict[str, Any]]:
+    """
+    Set up OIDC token cache in environment variable if not already present.
+
+    This function:
+    1. Checks if a valid token is already cached
+    2. If not, fetches a new token
+    3. Stores the token in environment variable for cross-process access
+
+    Used by both pytest hooks and demo runners to ensure consistent token caching.
+
+    Returns:
+        Token data if successfully cached, None if OIDC not configured
+    """
+    import os
+
+    # Check if token is already cached and still valid
+    cached_token = get_cached_token_from_env()
+    if cached_token:
+        print("🔐 AUTH: Using existing cached OIDC token")
+        return cached_token
+
+    # Fetch new token if needed
+    token_data = fetch_oidc_token()
+    if token_data:
+        # Store in environment variable for reuse by subsequent processes
+        token_b64 = encode_token_for_env(token_data)
+        os.environ["FELDERA_PYTEST_OIDC_TOKEN"] = token_b64
+        print("🔐 AUTH: Token cached in environment for cross-process access")
+        return token_data
+    else:
+        print("🔐 AUTH: No OIDC configuration found, using fallback authentication")
+        return None
+
+
+def fetch_oidc_token() -> Optional[Dict[str, Any]]:
+    """
+    Fetch OIDC token using Resource Owner Password Grant flow.
+
+    This function is used by both pytest hooks and demo runners to ensure
+    consistent token fetching behavior across the entire test infrastructure.
+
+    Returns:
+        Dict containing access_token, expires_at, and cached_at if successful,
+        None if OIDC is not configured.
+    """
+    oidc_helper = get_oidc_test_helper()
+    if oidc_helper is None:
+        return None
+
+    print("🔐 AUTH: Fetching OIDC token")
+
+    try:
+        token_endpoint = oidc_helper.get_token_endpoint()
+        data = {
+            "grant_type": "password",
+            "username": oidc_helper.config.username,
+            "password": oidc_helper.config.password,
+            "client_id": oidc_helper.config.client_id,
+            "client_secret": oidc_helper.config.client_secret,
+            "scope": oidc_helper.config.scope,
+            "audience": "feldera-api",
+        }
+
+        headers = {
+            "Content-Type": "application/x-www-form-urlencoded",
+            "Accept": "application/json",
+        }
+
+        response = requests.post(token_endpoint, data=data, headers=headers, timeout=30)
+
+        if not response.ok:
+            print(f"🔐 AUTH: ❌ Token request FAILED: {response.status_code}")
+            raise Exception(
+                f"Token request failed: {response.status_code} - {response.text}"
+            )
+
+        token_response = response.json()
+        print("🔐 AUTH: ✅ Token request SUCCESS!")
+
+        access_token = token_response["access_token"]
+        expires_in = token_response.get("expires_in", 3600)
+        expires_at = time.time() + expires_in
+
+        return {
+            "access_token": access_token,
+            "expires_at": expires_at,
+            "cached_at": time.time(),
+        }
+
+    except Exception as e:
+        print(f"🔐 AUTH: CRITICAL FAILURE - Failed to fetch OIDC token: {e}")
+        raise RuntimeError(
+            f"OIDC authentication is configured but token retrieval failed: {e}"
+        ) from e

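Note on "cross-process access": `setup_token_cache()` writes the token into `os.environ`, which affects the current process and any child processes it starts afterwards, but not its parent or unrelated processes. The sketch below demonstrates that standard-library behavior with a throwaway variable. `DEMO_CACHED_TOKEN` and `read_in_child` are hypothetical names used only in this note.

```python
import os
import subprocess
import sys

ENV_VAR = "DEMO_CACHED_TOKEN"  # hypothetical name, not used by the commit


def read_in_child(env_var: str) -> str:
    """Spawn a child Python process and return the value it sees for env_var."""
    code = f"import os; print(os.environ.get({env_var!r}, '<unset>'))"
    result = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


# Start from a clean state, then compare what a child sees before and after.
os.environ.pop(ENV_VAR, None)
before = read_in_child(ENV_VAR)

os.environ[ENV_VAR] = "cached-value"
after = read_in_child(ENV_VAR)

os.environ.pop(ENV_VAR, None)  # leave the environment as we found it
```

A child started after the assignment inherits the value, which matches the pytest-xdist case where workers are spawned by the master. Separate runner invocations launched one after another from a shell or CI step would not see a value set this way unless a common parent exports it, so reuse across sequential demo runs depends on how those runs are launched.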