diff --git a/cli/internal/platform/inject.go b/cli/internal/platform/inject.go index 3087467f2f..8f85c1a0cf 100644 --- a/cli/internal/platform/inject.go +++ b/cli/internal/platform/inject.go @@ -76,6 +76,15 @@ func pluginCoords() pluginCoordinates { return p } +// sourceVersion is one entry of the platform destination's `source_versions` +// spec field — the source plugin path+version the platform gates on. JSON tags +// match the platform's CreateExternalSync `sources` items. +type sourceVersion struct { + Name string `json:"name"` + Path string `json:"path"` + Version string `json:"version"` +} + // MaybeInjectDestination appends a `platform` destination carrying a freshly // minted cqpd_ token when the team has an active platform tenant. Tenant/network // failures skip injection silently; a pre-existing `platform` destination is a @@ -138,6 +147,12 @@ func MaybeInjectDestination(ctx context.Context, logger zerolog.Logger, token, t } apiURL := platformAPIURL(session.ApiUrl) + // Report each source's plugin path+version so the platform can reject (before + // any upload) sources whose version the asset view can't process. + sourceVersions := make([]sourceVersion, 0, len(sources)) + for _, s := range sources { + sourceVersions = append(sourceVersions, sourceVersion{Name: s.Name, Path: s.Path, Version: s.Version}) + } dest := &specs.Destination{ Metadata: specs.Metadata{ Name: destinationName, @@ -151,8 +166,9 @@ func MaybeInjectDestination(ctx context.Context, logger zerolog.Logger, token, t // Unique per invocation so concurrent runs don't wipe each other's rows. SyncGroupId: strconv.FormatUint(allocateSyncGroupID(time.Now()), 10), Spec: map[string]any{ - "api_url": apiURL, - "token": session.Token, + "api_url": apiURL, + "token": session.Token, + "source_versions": sourceVersions, }, } dest.SetDefaults() diff --git a/cli/internal/platform/inject_test.go b/cli/internal/platform/inject_test.go index aead539b3e..298641f95e 100644 --- a/cli/internal/platform/inject_test.go +++ b/cli/internal/platform/inject_test.go @@ -99,6 +99,10 @@ func TestInject_Active_AppendsDestinationAndWiresSources(t *testing.T) { require.Equal(t, destinationName, got[1].Name) require.Equal(t, "https://x.us.platform.cloudquery.io/api", got[1].Spec["api_url"], "external-syncs endpoints are served under /api") require.Equal(t, "cqpd_payload.sig", got[1].Spec["token"], "destination must get the minted cqpd_ token, not the cloud credential") + srcVersionsJSON, err := json.Marshal(got[1].Spec["source_versions"]) + require.NoError(t, err) + require.JSONEq(t, `[{"name":"aws","path":"cloudquery/aws","version":"v1.0.0"}]`, string(srcVersionsJSON), + "each source's path+version must be reported for the platform gate") require.Equal(t, defaultPlugin.Version, got[1].Version) require.Equal(t, defaultPlugin.Path, got[1].Path) require.Equal(t, specs.RegistryCloudQuery, got[1].Registry) @@ -106,6 +110,16 @@ func TestInject_Active_AppendsDestinationAndWiresSources(t *testing.T) { require.Equal(t, specs.WriteModeAppend, got[1].WriteMode, "sync_group_id requires a write mode other than overwrite-delete-stale") require.NotEmpty(t, got[1].SyncGroupId) require.Contains(t, sources[0].Destinations, destinationName) + + // Multiple sources are reported in order, none dropped. + twoGot := mustInject(t, "tok", "team-x", []*specs.Source{ + {Metadata: specs.Metadata{Name: "aws", Path: "cloudquery/aws", Version: "v1.0.0", Registry: specs.RegistryCloudQuery}}, + {Metadata: specs.Metadata{Name: "gcp", Path: "cloudquery/gcp", Version: "v2.3.4", Registry: specs.RegistryCloudQuery}}, + }, testDestinations()) + twoJSON, err := json.Marshal(twoGot[1].Spec["source_versions"]) + require.NoError(t, err) + require.JSONEq(t, `[{"name":"aws","path":"cloudquery/aws","version":"v1.0.0"},{"name":"gcp","path":"cloudquery/gcp","version":"v2.3.4"}]`, + string(twoJSON), "sources reported in order, none dropped") } func TestInject_CreatedTenant_Injects(t *testing.T) {