diff --git a/airbyte_cdk/manifest_server/routers/manifest.py b/airbyte_cdk/manifest_server/routers/manifest.py index ec27ca816..323088a88 100644 --- a/airbyte_cdk/manifest_server/routers/manifest.py +++ b/airbyte_cdk/manifest_server/routers/manifest.py @@ -13,6 +13,7 @@ from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import ( INJECTED_COMPONENTS_PY, INJECTED_COMPONENTS_PY_CHECKSUMS, + AirbyteCustomCodeNotPermittedError, ) from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets @@ -57,6 +58,11 @@ def safe_build_source( ) except jsonschema.exceptions.ValidationError as e: raise HTTPException(status_code=400, detail=f"Invalid manifest: {e.message}") + except AirbyteCustomCodeNotPermittedError as e: + raise HTTPException( + status_code=400, + detail="Custom connector code is not permitted in this environment.", + ) from e router = APIRouter( @@ -85,6 +91,7 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse: ) config_dict = request.config.model_dump() + manifest = request.manifest.model_dump() catalog = build_catalog(request.stream_name) converted_state = [AirbyteStateMessageSerializer.load(state) for state in request.state] @@ -97,7 +104,6 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse: # We enforce a concurrency level of 1 so that the stream is processed on a single thread # to retain ordering for the grouping of the builder message responses. - manifest = request.manifest.model_dump() if "concurrency_level" in manifest: manifest["concurrency_level"]["default_concurrency"] = 1 else: diff --git a/unit_tests/manifest_server/routers/test_manifest.py b/unit_tests/manifest_server/routers/test_manifest.py index e488380cb..ff43d19a0 100644 --- a/unit_tests/manifest_server/routers/test_manifest.py +++ b/unit_tests/manifest_server/routers/test_manifest.py @@ -6,6 +6,9 @@ from airbyte_cdk.connector_builder.models import StreamRead as CDKStreamRead from airbyte_cdk.manifest_server.app import app +from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import ( + AirbyteCustomCodeNotPermittedError, +) client = TestClient(app) @@ -188,6 +191,52 @@ def test_test_read_with_custom_components( 2, # slice_limit ) + @patch("airbyte_cdk.manifest_server.routers.manifest.build_catalog") + @patch("airbyte_cdk.manifest_server.routers.manifest.build_source") + def test_test_read_returns_bad_request_for_disallowed_custom_components( + self, + mock_build_source, + mock_build_catalog, + sample_manifest, + sample_config, + ): + manifest = { + **sample_manifest, + "streams": [ + { + **sample_manifest["streams"][0], + "retriever": { + **sample_manifest["streams"][0]["retriever"], + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "CustomRecordExtractor", + "class_name": "components.CustomRecordExtractor", + }, + }, + }, + } + ], + } + request_data = { + "manifest": manifest, + "config": sample_config, + "stream_name": "products", + "state": [], + "custom_components_code": "class CustomRecordExtractor: pass", + } + + mock_build_catalog.return_value = Mock() + mock_build_source.side_effect = AirbyteCustomCodeNotPermittedError + + response = client.post("/v1/manifest/test_read", json=request_data) + + assert response.status_code == 400 + assert ( + response.json()["detail"] + == "Custom connector code is not permitted in this environment." + ) + @patch("airbyte_cdk.manifest_server.routers.manifest.AirbyteStateMessageSerializer") @patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor") @patch("airbyte_cdk.manifest_server.routers.manifest.build_catalog")