Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions cli/cmd/plugin_spec_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package cmd

import (
"errors"
"fmt"
"os"
"path/filepath"

cqapiauth "github.com/cloudquery/cloudquery-api-go/auth"
"github.com/cloudquery/cloudquery/cli/v6/internal/auth"
"github.com/cloudquery/cloudquery/cli/v6/internal/hub"
"github.com/cloudquery/plugin-pb-go/managedplugin"
"github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)

const (
pluginSpecSchemaShort = "Export a plugin's spec JSON schema."
pluginSpecSchemaLong = `Export a plugin's spec JSON schema.

Without --schemas-dir the schema is printed to stdout. With --schemas-dir the
schema is written to <dir>/<plugin-name>@<version>.json, which is the
filename format expected by ` + "`cloudquery validate-config --schemas-dir`" + `.
Including the version in the filename ensures validation always runs against
the schema matching the plugin version in the config.`
pluginSpecSchemaExample = `
# Print schema to stdout
cloudquery plugin spec-schema cloudquery/source/aws@v33.0.0

# Write to ./schemas/aws@v33.0.0.json
cloudquery plugin spec-schema cloudquery/source/aws@v33.0.0 -D ./schemas`
)

func newCmdPluginSpecSchema() *cobra.Command {
cmd := &cobra.Command{
Use: "spec-schema <team_name>/<plugin_kind>/<plugin_name>@<version>",
Short: pluginSpecSchemaShort,
Long: pluginSpecSchemaLong,
Example: pluginSpecSchemaExample,
Args: cobra.ExactArgs(1),
RunE: runPluginSpecSchema,
}
cmd.Flags().StringP("schemas-dir", "D", "", "Write schema to <dir>/<plugin-name>@<version>.json. If omitted, the schema is printed to stdout.")
return cmd
}

func runPluginSpecSchema(cmd *cobra.Command, args []string) error {
schemasDir, err := cmd.Flags().GetString("schemas-dir")
if err != nil {
return err
}
cqDir, err := cmd.Flags().GetString("cq-dir")
if err != nil {
return err
}

ref, err := hub.ParseHubPluginRef(args[0])
if err != nil {
return err
}

pluginType, err := pluginTypeFromKind(ref.Kind)
if err != nil {
return err
}

ctx := cmd.Context()

pluginCfg := managedplugin.Config{
Name: ref.Name,
Version: ref.Version,
Path: fmt.Sprintf("%s/%s", ref.TeamName, ref.Name),
Registry: managedplugin.RegistryCloudQuery,
}

// CloudQuery-registry plugins always need an auth token.
tc := cqapiauth.NewTokenClient()
authToken, err := tc.GetToken()
if err != nil {
return fmt.Errorf("failed to get auth token: %w", err)
}
teamName, err := auth.GetTeamForToken(ctx, authToken)
if err != nil {
return fmt.Errorf("failed to get team name: %w", err)
}

opts := []managedplugin.Option{
managedplugin.WithLogger(log.Logger),
managedplugin.WithAuthToken(authToken.Value),
managedplugin.WithTeamName(teamName),
}
if logConsole {
opts = append(opts, managedplugin.WithNoProgress())
}
if cqDir != "" {
opts = append(opts, managedplugin.WithDirectory(cqDir))
}
if disableSentry {
opts = append(opts, managedplugin.WithNoSentry())
}

clients, err := managedplugin.NewClients(ctx, pluginType, []managedplugin.Config{pluginCfg}, opts...)
if err != nil {
return enrichClientError(clients, []bool{false}, err)
}
defer func() {
if err := clients.Terminate(); err != nil {
fmt.Println(err)
}
}()
if len(clients) == 0 {
return errors.New("plugin client not initialized")
}

pluginClient := plugin.NewPluginClient(clients[0].Conn)
jsonSchema, err := getSpecSchemaFromPlugin(ctx, pluginClient)
if err != nil {
return fmt.Errorf("failed to fetch spec schema: %w", err)
}
if len(jsonSchema) == 0 {
return fmt.Errorf("plugin %s did not return a spec schema", ref.String())
}

return writeSchemaOutput(jsonSchema, ref.Name, ref.Version, schemasDir)
}

func pluginTypeFromKind(kind string) (managedplugin.PluginType, error) {
switch kind {
case "source":
return managedplugin.PluginSource, nil
case "destination":
return managedplugin.PluginDestination, nil
default:
return 0, fmt.Errorf("unsupported plugin kind %q (expected source or destination)", kind)
}
}

func writeSchemaOutput(jsonSchema, pluginName, pluginVersion, schemasDir string) error {
if schemasDir == "" {
_, err := fmt.Print(jsonSchema)
return err
}
if err := os.MkdirAll(schemasDir, 0o755); err != nil {
return err
}
return os.WriteFile(filepath.Join(schemasDir, schemaFileName(pluginName, pluginVersion)), []byte(jsonSchema), 0o644)
}

// schemaFileName returns the canonical filename for a plugin's schema under --schemas-dir.
// Version is included whenever non-empty so consumers can pin validation to the right plugin version.
func schemaFileName(pluginName, pluginVersion string) string {
if pluginVersion == "" {
return pluginName + ".json"
}
return pluginName + "@" + pluginVersion + ".json"
}
49 changes: 49 additions & 0 deletions cli/cmd/plugin_spec_schema_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package cmd

import (
"os"
"path"
"testing"

"github.com/cloudquery/plugin-pb-go/managedplugin"
"github.com/stretchr/testify/require"
)

func TestPluginTypeFromKind(t *testing.T) {
src, err := pluginTypeFromKind("source")
require.NoError(t, err)
require.Equal(t, managedplugin.PluginSource, src)

dst, err := pluginTypeFromKind("destination")
require.NoError(t, err)
require.Equal(t, managedplugin.PluginDestination, dst)

_, err = pluginTypeFromKind("transformer")
require.Error(t, err)
}

func TestSchemaFileName(t *testing.T) {
require.Equal(t, "aws@v33.0.0.json", schemaFileName("aws", "v33.0.0"))
require.Equal(t, "aws.json", schemaFileName("aws", ""))
}

func TestWriteSchemaOutput(t *testing.T) {
const schema = `{"type":"object"}`

t.Run("to schemas dir with versioned name", func(t *testing.T) {
dir := t.TempDir()
sub := path.Join(dir, "nested")
require.NoError(t, writeSchemaOutput(schema, "aws", "v33.0.0", sub))
got, err := os.ReadFile(path.Join(sub, "aws@v33.0.0.json"))
require.NoError(t, err)
require.Equal(t, schema, string(got))
})

t.Run("to schemas dir without version falls back to unversioned name", func(t *testing.T) {
dir := t.TempDir()
require.NoError(t, writeSchemaOutput(schema, "aws", "", dir))
got, err := os.ReadFile(path.Join(dir, "aws.json"))
require.NoError(t, err)
require.Equal(t, schema, string(got))
})
}
1 change: 1 addition & 0 deletions cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func NewCmdRoot() *cobra.Command {
pluginCmd.AddCommand(
newCmdPluginInstall(false),
newCmdPluginPublish(),
newCmdPluginSpecSchema(),
pluginDocCmd,
pluginUIAssetsCmd,
)
Expand Down
16 changes: 13 additions & 3 deletions cli/cmd/specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,32 @@ func initPlugin(ctx context.Context, client plugin.PluginClient, spec map[string
// 3. If the schema isn't empty but not valid, print the error message & skip the validation.
// 4. Finally, return the validation result.
func validatePluginSpec(ctx context.Context, client plugin.PluginClient, spec any) error {
jsonSchema, err := getSpecSchemaFromPlugin(ctx, client)
if err != nil {
return err
}
return validateSpecAgainstSchema(jsonSchema, spec)
}

func getSpecSchemaFromPlugin(ctx context.Context, client plugin.PluginClient) (string, error) {
schema, err := client.GetSpecSchema(ctx, &plugin.GetSpecSchema_Request{})
if err != nil {
st, ok := status.FromError(err)
if !ok {
// not a gRPC-compatible error
log.Err(err).Msg("failed to get spec schema")
return err
return "", err
}
if st.Code() != codes.Unimplemented {
// unimplemented is OK, treat as empty schema
log.Err(err).Msg("failed to get spec schema")
return err
return "", err
}
}
return schema.GetJsonSchema(), nil
}

jsonSchema := schema.GetJsonSchema()
func validateSpecAgainstSchema(jsonSchema string, spec any) error {
if len(jsonSchema) == 0 {
// This will also be true for Unimplemented response (schema = nil => schema.GetJsonSchema() = "")
Comment thread
disq marked this conversation as resolved.
log.Info().Msg("empty JSON schema for plugin spec, skipping validation")
Expand Down
4 changes: 4 additions & 0 deletions cli/cmd/testdata/schemas-dir/dst.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"type": ["object", "null"]
}
8 changes: 8 additions & 0 deletions cli/cmd/testdata/schemas-dir/src.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"type": ["object", "null"],
"additionalProperties": false,
"properties": {
"field": { "type": "string" }
}
}
15 changes: 15 additions & 0 deletions cli/cmd/testdata/validate-config-schemas-dir-bad.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
kind: source
spec:
name: src
path: ./nonexistent-src-binary
registry: local
destinations: [dst]
tables: ["*"]
spec:
bogus: field
---
kind: destination
spec:
name: dst
path: ./nonexistent-dst-binary
registry: local
13 changes: 13 additions & 0 deletions cli/cmd/testdata/validate-config-schemas-dir.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
kind: source
spec:
name: src
path: ./nonexistent-src-binary
registry: local
destinations: [dst]
tables: ["*"]
---
kind: destination
spec:
name: dst
path: ./nonexistent-dst-binary
registry: local
Loading
Loading