Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion airbyte_cdk/manifest_server/routers/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
INJECTED_COMPONENTS_PY,
INJECTED_COMPONENTS_PY_CHECKSUMS,
AirbyteCustomCodeNotPermittedError,
)
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets

Expand Down Expand Up @@ -57,6 +58,11 @@ def safe_build_source(
)
except jsonschema.exceptions.ValidationError as e:
raise HTTPException(status_code=400, detail=f"Invalid manifest: {e.message}")
except AirbyteCustomCodeNotPermittedError as e:
raise HTTPException(
status_code=400,
detail="Custom connector code is not permitted in this environment.",
) from e


router = APIRouter(
Expand Down Expand Up @@ -85,6 +91,7 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
)

config_dict = request.config.model_dump()
manifest = request.manifest.model_dump()

catalog = build_catalog(request.stream_name)
converted_state = [AirbyteStateMessageSerializer.load(state) for state in request.state]
Expand All @@ -97,7 +104,6 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:

# We enforce a concurrency level of 1 so that the stream is processed on a single thread
# to retain ordering for the grouping of the builder message responses.
manifest = request.manifest.model_dump()
if "concurrency_level" in manifest:
manifest["concurrency_level"]["default_concurrency"] = 1
else:
Expand Down
49 changes: 49 additions & 0 deletions unit_tests/manifest_server/routers/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

from airbyte_cdk.connector_builder.models import StreamRead as CDKStreamRead
from airbyte_cdk.manifest_server.app import app
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
AirbyteCustomCodeNotPermittedError,
)

client = TestClient(app)

Expand Down Expand Up @@ -188,6 +191,52 @@ def test_test_read_with_custom_components(
2, # slice_limit
)

@patch("airbyte_cdk.manifest_server.routers.manifest.build_catalog")
@patch("airbyte_cdk.manifest_server.routers.manifest.build_source")
def test_test_read_returns_bad_request_for_disallowed_custom_components(
self,
mock_build_source,
mock_build_catalog,
sample_manifest,
sample_config,
):
manifest = {
**sample_manifest,
"streams": [
{
**sample_manifest["streams"][0],
"retriever": {
**sample_manifest["streams"][0]["retriever"],
"record_selector": {
"type": "RecordSelector",
"extractor": {
"type": "CustomRecordExtractor",
"class_name": "components.CustomRecordExtractor",
},
},
},
}
],
}
request_data = {
"manifest": manifest,
"config": sample_config,
"stream_name": "products",
"state": [],
"custom_components_code": "class CustomRecordExtractor: pass",
}

mock_build_catalog.return_value = Mock()
mock_build_source.side_effect = AirbyteCustomCodeNotPermittedError

response = client.post("/v1/manifest/test_read", json=request_data)

assert response.status_code == 400
assert (
response.json()["detail"]
== "Custom connector code is not permitted in this environment."
)

@patch("airbyte_cdk.manifest_server.routers.manifest.AirbyteStateMessageSerializer")
@patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor")
@patch("airbyte_cdk.manifest_server.routers.manifest.build_catalog")
Expand Down
Loading