Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 5 additions & 19 deletions cli/cmd/migrate_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -45,28 +44,15 @@ func migrateConnectionV3(ctx context.Context, sourceClient *managedplugin.Client
}

// initialize destinations first, so that their connections may be used as backends by the source
for i := range destinationsClients {
destSpec := destinationSpecs[i]
destSpecBytes, err := json.Marshal(destSpec.Spec)
if err != nil {
return err
}
if _, err := destinationsPbClients[i].Init(ctx, &plugin.Init_Request{
Spec: destSpecBytes,
}); err != nil {
return err
for i, destinationSpec := range destinationSpecs {
if err := initPlugin(ctx, destinationsPbClients[i], destinationSpec.Spec, false); err != nil {
return fmt.Errorf("failed to init destination %v: %w", destinationSpec.Name, err)
}
}

specBytes, err := json.Marshal(sourceSpec.Spec)
err := initPlugin(ctx, sourcePbClient, sourceSpec.Spec, true)
if err != nil {
return err
}
if _, err := sourcePbClient.Init(ctx, &plugin.Init_Request{
Spec: specBytes,
NoConnection: true,
}); err != nil {
return err
return fmt.Errorf("failed to init source %v: %w", sourceSpec.Name, err)
}

writeClients := make([]plugin.Plugin_WriteClient, len(destinationsPbClients))
Expand Down
77 changes: 77 additions & 0 deletions cli/cmd/specs.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package cmd

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/cloudquery/cloudquery/cli/internal/specs/v0"
"github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
pbSpecs "github.com/cloudquery/plugin-pb-go/specs"
"github.com/rs/zerolog/log"
"github.com/santhosh-tekuri/jsonschema/v5"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func CLIRegistryToPbRegistry(registry specs.Registry) pbSpecs.Registry {
Expand Down Expand Up @@ -86,3 +94,72 @@ func CLIDestinationSpecToPbSpec(spec specs.Destination) pbSpecs.Destination {
Spec: spec.Spec,
}
}

// initPlugin is a simple wrapper that will try to validate the spec before actually passing it to Init.
func initPlugin(ctx context.Context, client plugin.PluginClient, spec any, noConnection bool) error {
if !noConnection {
// perform spec validation
if err := validatePluginSpec(ctx, client, spec); err != nil {
return err
}
}

specBytes, err := json.Marshal(spec)
if err != nil {
return err
}

_, err = client.Init(ctx, &plugin.Init_Request{Spec: specBytes, NoConnection: noConnection})
return err
}

// validatePluginSpec encompasses spec validation only:
// 1. Get spec schema from the plugin.
// If the call isn't implemented, just skip the validation.
// 2. Validate that the provided JSON schema is valid & can be used for spec validation.
// If the spec is empty (i.e., the plugin didn't supply the schema) just skip.
// 3. If the schema isn't empty but not valid, print the error message & skip the validation.
// 4. Finally, return the validation result.
func validatePluginSpec(ctx context.Context, client plugin.PluginClient, spec any) error {
schema, err := client.GetSpecSchema(ctx, &plugin.GetSpecSchema_Request{})
if err != nil {
st, ok := status.FromError(err)
if !ok {
// not a gRPC-compatible error
log.Err(err).Msg("failed to get spec schema")
return err
}
if st.Code() != codes.Unimplemented {
// unimplemented is OK, treat as empty schema
log.Err(err).Msg("failed to get spec schema")
return err
}
}

jsonSchema := schema.GetJsonSchema()
if len(jsonSchema) == 0 {
// This will also be true for Unimplemented response (schema = nil => schema.GetJsonSchema() = "")
log.Info().Msg("empty JSON schema for plugin spec, skipping validation")
return nil
}

sc, err := parseJSONSchema(jsonSchema)
if err != nil {
log.Err(err).Msg("failed to parse spec schema, skipping validation")
return nil
}

return sc.Validate(spec)
}

func parseJSONSchema(jsonSchema string) (*jsonschema.Schema, error) {
c := jsonschema.NewCompiler()
c.Draft = jsonschema.Draft2020
c.AssertFormat = true

if err := c.AddResource("schema.json", strings.NewReader(jsonSchema)); err != nil {
return nil, err
}

return c.Compile("schema.json")
Comment thread
candiduslynx marked this conversation as resolved.
Outdated
}
34 changes: 7 additions & 27 deletions cli/cmd/sync_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,28 +126,14 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
}

// initialize destinations first, so that their connections may be used as backends by the source
for i := range destinationsClients {
destSpec := destinationSpecs[i]
destSpecBytes, err := json.Marshal(destSpec.Spec)
if err != nil {
return err
}
if _, err := destinationsPbClients[i].Init(ctx, &plugin.Init_Request{
Spec: destSpecBytes,
}); err != nil {
return fmt.Errorf("failed to init destination %v: %w", destSpec.Name, err)
for i, destinationSpec := range destinationSpecs {
if err := initPlugin(ctx, destinationsPbClients[i], destinationSpec.Spec, false); err != nil {
return fmt.Errorf("failed to init destination %v: %w", destinationSpec.Name, err)
}
}
if backend != nil {
backendSpec := backend.spec
backendSpecBytes, err := json.Marshal(backendSpec.Spec)
if err != nil {
return err
}
if _, err := backendPbClient.Init(ctx, &plugin.Init_Request{
Spec: backendSpecBytes,
}); err != nil {
return fmt.Errorf("failed to init backend %v: %w", backendSpec.Name, err)
if err := initPlugin(ctx, backendPbClient, backend.spec.Spec, false); err != nil {
return fmt.Errorf("failed to init backend %v: %w", backend.spec.Name, err)
}
}

Expand All @@ -165,14 +151,8 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
return fmt.Errorf("failed to unmarshal source spec JSON after variable replacement: %w", err)
}

sourceSpecBytes, err := json.Marshal(sourceSpec.Spec)
if err != nil {
return err
}
if _, err := sourcePbClient.Init(ctx, &plugin.Init_Request{
Spec: sourceSpecBytes,
}); err != nil {
return err
if err = initPlugin(ctx, sourcePbClient, sourceSpec.Spec, false); err != nil {
return fmt.Errorf("failed to init source %v: %w", sourceSpec.Name, err)
}

writeClients := make([]plugin.Plugin_WriteClient, len(destinationsPbClients))
Expand Down
4 changes: 1 addition & 3 deletions cli/cmd/tables_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ func tablesV3(ctx context.Context, sourceClient *managedplugin.Client, path stri
return err
}
sourcePbClient := pluginPb.NewPluginClient(sourceClient.Conn)
if _, err := sourcePbClient.Init(ctx, &pluginPb.Init_Request{
NoConnection: true,
}); err != nil {
if err := initPlugin(ctx, sourcePbClient, nil, true); err != nil {
return fmt.Errorf("failed to init source: %w", err)
}
getTablesResp, err := sourcePbClient.GetTables(ctx, &pluginPb.GetTables_Request{
Expand Down