From adf52faf02a7f44aa8df01c752e7b4fe4cfa8380 Mon Sep 17 00:00:00 2001 From: Kemal Hadimli Date: Mon, 22 Jun 2026 18:51:11 +0100 Subject: [PATCH 1/2] feat: Report source plugin versions to the injected platform destination Inject a `source_versions` list ({name, path, version}) for the sync's sources into the auto-injected platform destination spec, so the platform can reject sources whose version the asset view can't process before any upload. --- cli/internal/platform/inject.go | 15 +++++++++++++-- cli/internal/platform/inject_test.go | 3 +++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/cli/internal/platform/inject.go b/cli/internal/platform/inject.go index 3087467f2f..adeb8ced10 100644 --- a/cli/internal/platform/inject.go +++ b/cli/internal/platform/inject.go @@ -138,6 +138,16 @@ func MaybeInjectDestination(ctx context.Context, logger zerolog.Logger, token, t } apiURL := platformAPIURL(session.ApiUrl) + // Report each source's plugin path+version so the platform can reject (before + // any upload) sources whose version the asset view can't process. + sourceVersions := make([]map[string]string, 0, len(sources)) + for _, s := range sources { + sourceVersions = append(sourceVersions, map[string]string{ + "name": s.Name, + "path": s.Path, + "version": s.Version, + }) + } dest := &specs.Destination{ Metadata: specs.Metadata{ Name: destinationName, @@ -151,8 +161,9 @@ func MaybeInjectDestination(ctx context.Context, logger zerolog.Logger, token, t // Unique per invocation so concurrent runs don't wipe each other's rows. SyncGroupId: strconv.FormatUint(allocateSyncGroupID(time.Now()), 10), Spec: map[string]any{ - "api_url": apiURL, - "token": session.Token, + "api_url": apiURL, + "token": session.Token, + "source_versions": sourceVersions, }, } dest.SetDefaults() diff --git a/cli/internal/platform/inject_test.go b/cli/internal/platform/inject_test.go index aead539b3e..fd2ea8d5bc 100644 --- a/cli/internal/platform/inject_test.go +++ b/cli/internal/platform/inject_test.go @@ -99,6 +99,9 @@ func TestInject_Active_AppendsDestinationAndWiresSources(t *testing.T) { require.Equal(t, destinationName, got[1].Name) require.Equal(t, "https://x.us.platform.cloudquery.io/api", got[1].Spec["api_url"], "external-syncs endpoints are served under /api") require.Equal(t, "cqpd_payload.sig", got[1].Spec["token"], "destination must get the minted cqpd_ token, not the cloud credential") + require.Equal(t, []map[string]string{ + {"name": "aws", "path": "cloudquery/aws", "version": "v1.0.0"}, + }, got[1].Spec["source_versions"], "each source's path+version must be reported for the platform gate") require.Equal(t, defaultPlugin.Version, got[1].Version) require.Equal(t, defaultPlugin.Path, got[1].Path) require.Equal(t, specs.RegistryCloudQuery, got[1].Registry) From cea18bec1c6eefa40bf0816b4f93807fc53c9b2b Mon Sep 17 00:00:00 2001 From: Kemal Hadimli Date: Mon, 22 Jun 2026 20:21:54 +0100 Subject: [PATCH 2/2] =?UTF-8?q?refactor:=20Address=20review=20=E2=80=94=20?= =?UTF-8?q?typed=20sourceVersion=20struct,=20two-source=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use a local sourceVersion struct instead of []map[string]string (self-documenting, guards against key typos). Add a two-source case asserting order and that none are dropped. --- cli/internal/platform/inject.go | 17 +++++++++++------ cli/internal/platform/inject_test.go | 17 ++++++++++++++--- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/cli/internal/platform/inject.go b/cli/internal/platform/inject.go index adeb8ced10..8f85c1a0cf 100644 --- a/cli/internal/platform/inject.go +++ b/cli/internal/platform/inject.go @@ -76,6 +76,15 @@ func pluginCoords() pluginCoordinates { return p } +// sourceVersion is one entry of the platform destination's `source_versions` +// spec field — the source plugin path+version the platform gates on. JSON tags +// match the platform's CreateExternalSync `sources` items. +type sourceVersion struct { + Name string `json:"name"` + Path string `json:"path"` + Version string `json:"version"` +} + // MaybeInjectDestination appends a `platform` destination carrying a freshly // minted cqpd_ token when the team has an active platform tenant. Tenant/network // failures skip injection silently; a pre-existing `platform` destination is a @@ -140,13 +149,9 @@ func MaybeInjectDestination(ctx context.Context, logger zerolog.Logger, token, t apiURL := platformAPIURL(session.ApiUrl) // Report each source's plugin path+version so the platform can reject (before // any upload) sources whose version the asset view can't process. - sourceVersions := make([]map[string]string, 0, len(sources)) + sourceVersions := make([]sourceVersion, 0, len(sources)) for _, s := range sources { - sourceVersions = append(sourceVersions, map[string]string{ - "name": s.Name, - "path": s.Path, - "version": s.Version, - }) + sourceVersions = append(sourceVersions, sourceVersion{Name: s.Name, Path: s.Path, Version: s.Version}) } dest := &specs.Destination{ Metadata: specs.Metadata{ diff --git a/cli/internal/platform/inject_test.go b/cli/internal/platform/inject_test.go index fd2ea8d5bc..298641f95e 100644 --- a/cli/internal/platform/inject_test.go +++ b/cli/internal/platform/inject_test.go @@ -99,9 +99,10 @@ func TestInject_Active_AppendsDestinationAndWiresSources(t *testing.T) { require.Equal(t, destinationName, got[1].Name) require.Equal(t, "https://x.us.platform.cloudquery.io/api", got[1].Spec["api_url"], "external-syncs endpoints are served under /api") require.Equal(t, "cqpd_payload.sig", got[1].Spec["token"], "destination must get the minted cqpd_ token, not the cloud credential") - require.Equal(t, []map[string]string{ - {"name": "aws", "path": "cloudquery/aws", "version": "v1.0.0"}, - }, got[1].Spec["source_versions"], "each source's path+version must be reported for the platform gate") + srcVersionsJSON, err := json.Marshal(got[1].Spec["source_versions"]) + require.NoError(t, err) + require.JSONEq(t, `[{"name":"aws","path":"cloudquery/aws","version":"v1.0.0"}]`, string(srcVersionsJSON), + "each source's path+version must be reported for the platform gate") require.Equal(t, defaultPlugin.Version, got[1].Version) require.Equal(t, defaultPlugin.Path, got[1].Path) require.Equal(t, specs.RegistryCloudQuery, got[1].Registry) @@ -109,6 +110,16 @@ func TestInject_Active_AppendsDestinationAndWiresSources(t *testing.T) { require.Equal(t, specs.WriteModeAppend, got[1].WriteMode, "sync_group_id requires a write mode other than overwrite-delete-stale") require.NotEmpty(t, got[1].SyncGroupId) require.Contains(t, sources[0].Destinations, destinationName) + + // Multiple sources are reported in order, none dropped. + twoGot := mustInject(t, "tok", "team-x", []*specs.Source{ + {Metadata: specs.Metadata{Name: "aws", Path: "cloudquery/aws", Version: "v1.0.0", Registry: specs.RegistryCloudQuery}}, + {Metadata: specs.Metadata{Name: "gcp", Path: "cloudquery/gcp", Version: "v2.3.4", Registry: specs.RegistryCloudQuery}}, + }, testDestinations()) + twoJSON, err := json.Marshal(twoGot[1].Spec["source_versions"]) + require.NoError(t, err) + require.JSONEq(t, `[{"name":"aws","path":"cloudquery/aws","version":"v1.0.0"},{"name":"gcp","path":"cloudquery/gcp","version":"v2.3.4"}]`, + string(twoJSON), "sources reported in order, none dropped") } func TestInject_CreatedTenant_Injects(t *testing.T) {