Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions cli/internal/platform/inject.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ func pluginCoords() pluginCoordinates {
return p
}

// sourceVersion is one entry of the platform destination's `source_versions`
// spec field — the source plugin path+version the platform gates on. JSON tags
// match the platform's CreateExternalSync `sources` items.
type sourceVersion struct {
Name string `json:"name"`
Path string `json:"path"`
Version string `json:"version"`
}

// MaybeInjectDestination appends a `platform` destination carrying a freshly
// minted cqpd_ token when the team has an active platform tenant. Tenant/network
// failures skip injection silently; a pre-existing `platform` destination is a
Expand Down Expand Up @@ -138,6 +147,12 @@ func MaybeInjectDestination(ctx context.Context, logger zerolog.Logger, token, t
}

apiURL := platformAPIURL(session.ApiUrl)
// Report each source's plugin path+version so the platform can reject (before
// any upload) sources whose version the asset view can't process.
sourceVersions := make([]sourceVersion, 0, len(sources))
for _, s := range sources {
sourceVersions = append(sourceVersions, sourceVersion{Name: s.Name, Path: s.Path, Version: s.Version})
}
dest := &specs.Destination{
Metadata: specs.Metadata{
Name: destinationName,
Expand All @@ -151,8 +166,9 @@ func MaybeInjectDestination(ctx context.Context, logger zerolog.Logger, token, t
// Unique per invocation so concurrent runs don't wipe each other's rows.
SyncGroupId: strconv.FormatUint(allocateSyncGroupID(time.Now()), 10),
Spec: map[string]any{
"api_url": apiURL,
"token": session.Token,
"api_url": apiURL,
"token": session.Token,
"source_versions": sourceVersions,
},
}
dest.SetDefaults()
Expand Down
14 changes: 14 additions & 0 deletions cli/internal/platform/inject_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,27 @@ func TestInject_Active_AppendsDestinationAndWiresSources(t *testing.T) {
require.Equal(t, destinationName, got[1].Name)
require.Equal(t, "https://x.us.platform.cloudquery.io/api", got[1].Spec["api_url"], "external-syncs endpoints are served under /api")
require.Equal(t, "cqpd_payload.sig", got[1].Spec["token"], "destination must get the minted cqpd_ token, not the cloud credential")
srcVersionsJSON, err := json.Marshal(got[1].Spec["source_versions"])
require.NoError(t, err)
require.JSONEq(t, `[{"name":"aws","path":"cloudquery/aws","version":"v1.0.0"}]`, string(srcVersionsJSON),
"each source's path+version must be reported for the platform gate")
require.Equal(t, defaultPlugin.Version, got[1].Version)
require.Equal(t, defaultPlugin.Path, got[1].Path)
require.Equal(t, specs.RegistryCloudQuery, got[1].Registry)
require.True(t, got[1].SyncSummary, "send_sync_summary must be set so the destination receives finalize signals")
require.Equal(t, specs.WriteModeAppend, got[1].WriteMode, "sync_group_id requires a write mode other than overwrite-delete-stale")
require.NotEmpty(t, got[1].SyncGroupId)
require.Contains(t, sources[0].Destinations, destinationName)

// Multiple sources are reported in order, none dropped.
twoGot := mustInject(t, "tok", "team-x", []*specs.Source{
{Metadata: specs.Metadata{Name: "aws", Path: "cloudquery/aws", Version: "v1.0.0", Registry: specs.RegistryCloudQuery}},
{Metadata: specs.Metadata{Name: "gcp", Path: "cloudquery/gcp", Version: "v2.3.4", Registry: specs.RegistryCloudQuery}},
}, testDestinations())
twoJSON, err := json.Marshal(twoGot[1].Spec["source_versions"])
require.NoError(t, err)
require.JSONEq(t, `[{"name":"aws","path":"cloudquery/aws","version":"v1.0.0"},{"name":"gcp","path":"cloudquery/gcp","version":"v2.3.4"}]`,
string(twoJSON), "sources reported in order, none dropped")
}

func TestInject_CreatedTenant_Injects(t *testing.T) {
Expand Down