diff --git a/cli/cmd/doc_test.go b/cli/cmd/doc_test.go
index 488f1df1316ded..8e09336906fd6f 100644
--- a/cli/cmd/doc_test.go
+++ b/cli/cmd/doc_test.go
@@ -31,6 +31,7 @@ var docFiles = []string{
"cloudquery_plugin.md",
"cloudquery_plugin_install.md",
"cloudquery_plugin_publish.md",
+ "cloudquery_plugin_spec-schema.md",
"cloudquery_switch.md",
}
diff --git a/cli/cmd/plugin_spec_schema.go b/cli/cmd/plugin_spec_schema.go
new file mode 100644
index 00000000000000..8c52dd84b26433
--- /dev/null
+++ b/cli/cmd/plugin_spec_schema.go
@@ -0,0 +1,168 @@
+package cmd
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "path/filepath"
+
+ cqapiauth "github.com/cloudquery/cloudquery-api-go/auth"
+ "github.com/cloudquery/cloudquery/cli/v6/internal/auth"
+ "github.com/cloudquery/cloudquery/cli/v6/internal/hub"
+ "github.com/cloudquery/plugin-pb-go/managedplugin"
+ "github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
+ "github.com/rs/zerolog/log"
+ "github.com/spf13/cobra"
+)
+
+const (
+ pluginSpecSchemaShort = "Export a plugin's spec JSON schema."
+ pluginSpecSchemaLong = `Export a plugin's spec JSON schema.
+
+Only plugins published to the CloudQuery hub are supported. registry: local,
+registry: grpc, and registry: docker plugins are not exportable because they
+have no stable (path, version) identity to anchor the generated filename.
+For those registries the plugin binary is already accessible locally, so
+'cloudquery validate-config' can validate them in-place without --schemas-dir.
+
+Without --schemas-dir the schema is printed to stdout. With --schemas-dir the
+schema is written to <schemas-dir>/<plugin-name>@<version>.json, which is the
+filename format expected by ` + "`cloudquery validate-config --schemas-dir`" + `.
+Including the version in the filename ensures validation always runs against
+the schema matching the plugin version in the config.`
+ pluginSpecSchemaExample = `
+# Print schema to stdout
+cloudquery plugin spec-schema cloudquery/source/aws@v33.0.0
+
+# Write to ./schemas/aws@v33.0.0.json
+cloudquery plugin spec-schema cloudquery/source/aws@v33.0.0 -D ./schemas`
+)
+
+func newCmdPluginSpecSchema() *cobra.Command {
+ cmd := &cobra.Command{
+ Use: "spec-schema <team>/<kind>/<plugin-name>@<version>",
+ Short: pluginSpecSchemaShort,
+ Long: pluginSpecSchemaLong,
+ Example: pluginSpecSchemaExample,
+ Args: cobra.ExactArgs(1),
+ RunE: runPluginSpecSchema,
+ }
+ cmd.Flags().StringP("schemas-dir", "D", "", "Write schema to <schemas-dir>/<plugin-name>@<version>.json. If omitted, the schema is printed to stdout.")
+ return cmd
+}
+
+func runPluginSpecSchema(cmd *cobra.Command, args []string) error {
+ schemasDir, err := cmd.Flags().GetString("schemas-dir")
+ if err != nil {
+ return err
+ }
+ cqDir, err := cmd.Flags().GetString("cq-dir")
+ if err != nil {
+ return err
+ }
+
+ ref, err := hub.ParseHubPluginRef(args[0])
+ if err != nil {
+ return err
+ }
+
+ pluginType, err := pluginTypeFromKind(ref.Kind)
+ if err != nil {
+ return err
+ }
+
+ ctx := cmd.Context()
+
+ // Only registry: cloudquery is supported. The hub-ref input format already
+ // constrains us to this registry, but the hardcoded value below is the
+ // single source of truth — do not add a flag that lets callers select
+ // local / grpc / docker without first defining how the exported filename
+ // (which is keyed on a stable name+version) should be derived for those.
+ pluginCfg := managedplugin.Config{
+ Name: ref.Name,
+ Version: ref.Version,
+ Path: fmt.Sprintf("%s/%s", ref.TeamName, ref.Name),
+ Registry: managedplugin.RegistryCloudQuery,
+ }
+
+ // CloudQuery-registry plugins always need an auth token.
+ tc := cqapiauth.NewTokenClient()
+ authToken, err := tc.GetToken()
+ if err != nil {
+ return fmt.Errorf("failed to get auth token: %w", err)
+ }
+ teamName, err := auth.GetTeamForToken(ctx, authToken)
+ if err != nil {
+ return fmt.Errorf("failed to get team name: %w", err)
+ }
+
+ opts := []managedplugin.Option{
+ managedplugin.WithLogger(log.Logger),
+ managedplugin.WithAuthToken(authToken.Value),
+ managedplugin.WithTeamName(teamName),
+ }
+ if logConsole {
+ opts = append(opts, managedplugin.WithNoProgress())
+ }
+ if cqDir != "" {
+ opts = append(opts, managedplugin.WithDirectory(cqDir))
+ }
+ if disableSentry {
+ opts = append(opts, managedplugin.WithNoSentry())
+ }
+
+ clients, err := managedplugin.NewClients(ctx, pluginType, []managedplugin.Config{pluginCfg}, opts...)
+ if err != nil {
+ return enrichClientError(clients, []bool{false}, err)
+ }
+ defer func() {
+ if err := clients.Terminate(); err != nil {
+ fmt.Println(err)
+ }
+ }()
+ if len(clients) == 0 {
+ return errors.New("plugin client not initialized")
+ }
+
+ pluginClient := plugin.NewPluginClient(clients[0].Conn)
+ jsonSchema, err := getSpecSchemaFromPlugin(ctx, pluginClient)
+ if err != nil {
+ return fmt.Errorf("failed to fetch spec schema: %w", err)
+ }
+ if len(jsonSchema) == 0 {
+ return fmt.Errorf("plugin %s did not return a spec schema", ref.String())
+ }
+
+ return writeSchemaOutput(jsonSchema, ref.Name, ref.Version, schemasDir)
+}
+
+func pluginTypeFromKind(kind string) (managedplugin.PluginType, error) {
+ switch kind {
+ case "source":
+ return managedplugin.PluginSource, nil
+ case "destination":
+ return managedplugin.PluginDestination, nil
+ default:
+ return 0, fmt.Errorf("unsupported plugin kind %q (expected source or destination)", kind)
+ }
+}
+
+func writeSchemaOutput(jsonSchema, pluginName, pluginVersion, schemasDir string) error {
+ if schemasDir == "" {
+ _, err := fmt.Print(jsonSchema)
+ return err
+ }
+ if err := os.MkdirAll(schemasDir, 0o755); err != nil {
+ return err
+ }
+ return os.WriteFile(filepath.Join(schemasDir, schemaFileName(pluginName, pluginVersion)), []byte(jsonSchema), 0o644)
+}
+
+// schemaFileName returns the canonical filename for a plugin's schema under --schemas-dir.
+// Version is included whenever non-empty so consumers can pin validation to the right plugin version.
+func schemaFileName(pluginName, pluginVersion string) string {
+ if pluginVersion == "" {
+ return pluginName + ".json"
+ }
+ return pluginName + "@" + pluginVersion + ".json"
+}
diff --git a/cli/cmd/plugin_spec_schema_test.go b/cli/cmd/plugin_spec_schema_test.go
new file mode 100644
index 00000000000000..29ddea957c77d3
--- /dev/null
+++ b/cli/cmd/plugin_spec_schema_test.go
@@ -0,0 +1,49 @@
+package cmd
+
+import (
+ "os"
+ "path"
+ "testing"
+
+ "github.com/cloudquery/plugin-pb-go/managedplugin"
+ "github.com/stretchr/testify/require"
+)
+
+func TestPluginTypeFromKind(t *testing.T) {
+ src, err := pluginTypeFromKind("source")
+ require.NoError(t, err)
+ require.Equal(t, managedplugin.PluginSource, src)
+
+ dst, err := pluginTypeFromKind("destination")
+ require.NoError(t, err)
+ require.Equal(t, managedplugin.PluginDestination, dst)
+
+ _, err = pluginTypeFromKind("transformer")
+ require.Error(t, err)
+}
+
+func TestSchemaFileName(t *testing.T) {
+ require.Equal(t, "aws@v33.0.0.json", schemaFileName("aws", "v33.0.0"))
+ require.Equal(t, "aws.json", schemaFileName("aws", ""))
+}
+
+func TestWriteSchemaOutput(t *testing.T) {
+ const schema = `{"type":"object"}`
+
+ t.Run("to schemas dir with versioned name", func(t *testing.T) {
+ dir := t.TempDir()
+ sub := path.Join(dir, "nested")
+ require.NoError(t, writeSchemaOutput(schema, "aws", "v33.0.0", sub))
+ got, err := os.ReadFile(path.Join(sub, "aws@v33.0.0.json"))
+ require.NoError(t, err)
+ require.Equal(t, schema, string(got))
+ })
+
+ t.Run("to schemas dir without version falls back to unversioned name", func(t *testing.T) {
+ dir := t.TempDir()
+ require.NoError(t, writeSchemaOutput(schema, "aws", "", dir))
+ got, err := os.ReadFile(path.Join(dir, "aws.json"))
+ require.NoError(t, err)
+ require.Equal(t, schema, string(got))
+ })
+}
diff --git a/cli/cmd/root.go b/cli/cmd/root.go
index 199687a968e3c4..7e1019e5b6255c 100644
--- a/cli/cmd/root.go
+++ b/cli/cmd/root.go
@@ -190,6 +190,7 @@ func NewCmdRoot() *cobra.Command {
pluginCmd.AddCommand(
newCmdPluginInstall(false),
newCmdPluginPublish(),
+ newCmdPluginSpecSchema(),
pluginDocCmd,
pluginUIAssetsCmd,
)
diff --git a/cli/cmd/specs.go b/cli/cmd/specs.go
index 0bfeb7633fa2a1..62df10115665aa 100644
--- a/cli/cmd/specs.go
+++ b/cli/cmd/specs.go
@@ -115,22 +115,37 @@ func initPlugin(ctx context.Context, client plugin.PluginClient, spec map[string
// 3. If the schema isn't empty but not valid, print the error message & skip the validation.
// 4. Finally, return the validation result.
func validatePluginSpec(ctx context.Context, client plugin.PluginClient, spec any) error {
+ jsonSchema, err := getSpecSchemaFromPlugin(ctx, client)
+ if err != nil {
+ return err
+ }
+ return validateSpecAgainstSchema(jsonSchema, spec)
+}
+
+func getSpecSchemaFromPlugin(ctx context.Context, client plugin.PluginClient) (string, error) {
schema, err := client.GetSpecSchema(ctx, &plugin.GetSpecSchema_Request{})
if err != nil {
st, ok := status.FromError(err)
if !ok {
// not a gRPC-compatible error
log.Err(err).Msg("failed to get spec schema")
- return err
+ return "", err
}
if st.Code() != codes.Unimplemented {
// unimplemented is OK, treat as empty schema
log.Err(err).Msg("failed to get spec schema")
- return err
+ return "", err
}
}
+ return schema.GetJsonSchema(), nil
+}
- jsonSchema := schema.GetJsonSchema()
+// validateSpecAgainstSchema validates spec against a plugin-supplied JSON schema.
+// Intended for schemas obtained from a running plugin over gRPC: a malformed or empty
+// schema is treated as "skip validation" so a buggy plugin can't block sync/validate-config.
+// For user-supplied schema files use validateSpecAgainstSchemaStrict instead, which fails
+// loudly on a malformed schema rather than silently passing.
+func validateSpecAgainstSchema(jsonSchema string, spec any) error {
if len(jsonSchema) == 0 {
// This will also be true for Unimplemented response (schema = nil => schema.GetJsonSchema() = "")
log.Info().Msg("empty JSON schema for plugin spec, skipping validation")
@@ -146,6 +161,22 @@ func validatePluginSpec(ctx context.Context, client plugin.PluginClient, spec an
return sc.Validate(spec)
}
+// validateSpecAgainstSchemaStrict validates spec against a JSON schema and treats parse
+// failures as errors. Use this when the schema is authoritative (e.g. a local file
+// passed via --schemas-dir) so a corrupt or empty schema cannot produce a false pass.
+func validateSpecAgainstSchemaStrict(jsonSchema string, spec any) error {
+ if len(jsonSchema) == 0 {
+ return errors.New("schema is empty")
+ }
+
+ sc, err := parseJSONSchema(jsonSchema)
+ if err != nil {
+ return fmt.Errorf("failed to parse JSON schema: %w", err)
+ }
+
+ return sc.Validate(spec)
+}
+
func parseJSONSchema(jsonSchema string) (*jsonschema.Schema, error) {
c := jsonschema.NewCompiler()
c.DefaultDraft(jsonschema.Draft2020)
diff --git a/cli/cmd/specs_test.go b/cli/cmd/specs_test.go
new file mode 100644
index 00000000000000..f0cfc256e8225a
--- /dev/null
+++ b/cli/cmd/specs_test.go
@@ -0,0 +1,40 @@
+package cmd
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestValidateSpecAgainstSchema_LenientForBuggyPlugins(t *testing.T) {
+ // Empty schema (e.g. plugin returned Unimplemented over gRPC) is treated as skip.
+ require.NoError(t, validateSpecAgainstSchema("", map[string]any{}))
+ // Unparseable schema is logged and skipped (lenient path used for plugin gRPC results).
+ require.NoError(t, validateSpecAgainstSchema(`{not-valid-json`, map[string]any{}))
+}
+
+func TestValidateSpecAgainstSchemaStrict_FailsOnBadSchemaAndBadSpec(t *testing.T) {
+ const goodSchema = `{
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "type": "object",
+ "properties": {"field": {"type": "string"}},
+ "required": ["field"],
+ "additionalProperties": false
+ }`
+
+ // Good spec passes.
+ require.NoError(t, validateSpecAgainstSchemaStrict(goodSchema, map[string]any{"field": "ok"}))
+
+ // Bad spec is rejected.
+ err := validateSpecAgainstSchemaStrict(goodSchema, map[string]any{"field": 42})
+ require.Error(t, err)
+
+ // Corrupt schema is rejected, NOT silently passed.
+ err = validateSpecAgainstSchemaStrict(`{not-valid-json`, map[string]any{})
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "failed to parse JSON schema")
+
+ // Empty schema is rejected, NOT silently passed.
+ err = validateSpecAgainstSchemaStrict("", map[string]any{})
+ require.Error(t, err)
+}
diff --git a/cli/cmd/testdata/schemas-dir/dst.json b/cli/cmd/testdata/schemas-dir/dst.json
new file mode 100644
index 00000000000000..13de23e823755d
--- /dev/null
+++ b/cli/cmd/testdata/schemas-dir/dst.json
@@ -0,0 +1,4 @@
+{
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "type": ["object", "null"]
+}
diff --git a/cli/cmd/testdata/schemas-dir/src.json b/cli/cmd/testdata/schemas-dir/src.json
new file mode 100644
index 00000000000000..1b3a9dc0a0a225
--- /dev/null
+++ b/cli/cmd/testdata/schemas-dir/src.json
@@ -0,0 +1,8 @@
+{
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "type": ["object", "null"],
+ "additionalProperties": false,
+ "properties": {
+ "field": { "type": "string" }
+ }
+}
diff --git a/cli/cmd/testdata/validate-config-schemas-dir-bad.yml b/cli/cmd/testdata/validate-config-schemas-dir-bad.yml
new file mode 100644
index 00000000000000..0fee47c8d6dede
--- /dev/null
+++ b/cli/cmd/testdata/validate-config-schemas-dir-bad.yml
@@ -0,0 +1,15 @@
+kind: source
+spec:
+ name: src
+ path: ./nonexistent-src-binary
+ registry: local
+ destinations: [dst]
+ tables: ["*"]
+ spec:
+ bogus: field
+---
+kind: destination
+spec:
+ name: dst
+ path: ./nonexistent-dst-binary
+ registry: local
diff --git a/cli/cmd/testdata/validate-config-schemas-dir.yml b/cli/cmd/testdata/validate-config-schemas-dir.yml
new file mode 100644
index 00000000000000..a9e48808087f99
--- /dev/null
+++ b/cli/cmd/testdata/validate-config-schemas-dir.yml
@@ -0,0 +1,13 @@
+kind: source
+spec:
+ name: src
+ path: ./nonexistent-src-binary
+ registry: local
+ destinations: [dst]
+ tables: ["*"]
+---
+kind: destination
+spec:
+ name: dst
+ path: ./nonexistent-dst-binary
+ registry: local
diff --git a/cli/cmd/validate_config.go b/cli/cmd/validate_config.go
index 1a90a904402b3c..c393525f08a414 100644
--- a/cli/cmd/validate_config.go
+++ b/cli/cmd/validate_config.go
@@ -3,6 +3,9 @@ package cmd
import (
"errors"
"fmt"
+ "io/fs"
+ "os"
+ "path/filepath"
"strings"
"github.com/cloudquery/cloudquery/cli/v6/internal/auth"
@@ -20,6 +23,8 @@ const (
cloudquery validate-config ./directory
# Validate configs from directories and files
cloudquery validate-config ./directory ./aws.yml ./pg.yml
+# Validate fully offline using locally-stored plugin JSON schemas
+cloudquery validate-config --schemas-dir ./schemas ./aws.yml
`
)
@@ -33,6 +38,7 @@ func newCmdValidateConfig() *cobra.Command {
RunE: validateConfig,
Hidden: false,
}
+ cmd.Flags().String("schemas-dir", "", "Directory of pre-fetched .json schema files. Plugins with a matching file are validated offline (no plugin spawn, no auth). Use 'cloudquery plugin spec-schema' to generate these files.")
return cmd
}
@@ -42,6 +48,10 @@ func validateConfig(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
+ schemasDir, err := cmd.Flags().GetString("schemas-dir")
+ if err != nil {
+ return err
+ }
ctx := cmd.Context()
@@ -54,75 +64,147 @@ func validateConfig(cmd *cobra.Command, args []string) error {
sources := specReader.Sources
destinations := specReader.Destinations
- authToken, err := auth.GetAuthTokenIfNeeded(log.Logger, sources, destinations, nil)
- if err != nil {
- return fmt.Errorf("failed to get auth token: %w", err)
- }
- teamName, err := auth.GetTeamForToken(ctx, authToken)
- if err != nil {
- return fmt.Errorf("failed to get team name: %w", err)
- }
- opts := []managedplugin.Option{
- managedplugin.WithLogger(log.Logger),
- managedplugin.WithAuthToken(authToken.Value),
- managedplugin.WithTeamName(teamName),
- }
- if logConsole {
- opts = append(opts, managedplugin.WithNoProgress())
- }
- if cqDir != "" {
- opts = append(opts, managedplugin.WithDirectory(cqDir))
- }
- if disableSentry {
- opts = append(opts, managedplugin.WithNoSentry())
+ // Resolve local schema files when --schemas-dir is set. Empty string means "no file, spawn plugin".
+ sourceSchemaFiles := make([]string, len(sources))
+ destinationSchemaFiles := make([]string, len(destinations))
+ if schemasDir != "" {
+ for i, source := range sources {
+ sourceSchemaFiles[i], err = lookupSchemaFile(schemasDir, source.Name, source.Version)
+ if err != nil {
+ return err
+ }
+ }
+ for i, destination := range destinations {
+ destinationSchemaFiles[i], err = lookupSchemaFile(schemasDir, destination.Name, destination.Version)
+ if err != nil {
+ return err
+ }
+ }
}
- sourcePluginConfigs := make([]managedplugin.Config, len(sources))
- sourceRegInferred := make([]bool, len(sources))
+ // Partition plugin spawn list to those without a local schema file.
+ sourcePluginConfigs := make([]managedplugin.Config, 0, len(sources))
+ sourcePluginIdx := make([]int, 0, len(sources))
+ sourceRegInferred := make([]bool, 0, len(sources))
+ sourcesNeedingPlugin := make([]*specs.Source, 0, len(sources))
for i, source := range sources {
- sourcePluginConfigs[i] = managedplugin.Config{
+ if sourceSchemaFiles[i] != "" {
+ continue
+ }
+ sourcePluginConfigs = append(sourcePluginConfigs, managedplugin.Config{
Name: source.Name,
Version: source.Version,
Path: source.Path,
Registry: SpecRegistryToPlugin(source.Registry),
DockerAuth: source.DockerRegistryAuthToken,
- }
- sourceRegInferred[i] = source.RegistryInferred()
+ })
+ sourcePluginIdx = append(sourcePluginIdx, i)
+ sourceRegInferred = append(sourceRegInferred, source.RegistryInferred())
+ sourcesNeedingPlugin = append(sourcesNeedingPlugin, source)
}
- destinationPluginConfigs := make([]managedplugin.Config, len(destinations))
- destinationRegInferred := make([]bool, len(destinations))
+ destinationPluginConfigs := make([]managedplugin.Config, 0, len(destinations))
+ destinationPluginIdx := make([]int, 0, len(destinations))
+ destinationRegInferred := make([]bool, 0, len(destinations))
+ destinationsNeedingPlugin := make([]*specs.Destination, 0, len(destinations))
for i, destination := range destinations {
- destinationPluginConfigs[i] = managedplugin.Config{
+ if destinationSchemaFiles[i] != "" {
+ continue
+ }
+ destinationPluginConfigs = append(destinationPluginConfigs, managedplugin.Config{
Name: destination.Name,
Version: destination.Version,
Path: destination.Path,
Registry: SpecRegistryToPlugin(destination.Registry),
DockerAuth: destination.DockerRegistryAuthToken,
- }
- destinationRegInferred[i] = destination.RegistryInferred()
+ })
+ destinationPluginIdx = append(destinationPluginIdx, i)
+ destinationRegInferred = append(destinationRegInferred, destination.RegistryInferred())
+ destinationsNeedingPlugin = append(destinationsNeedingPlugin, destination)
}
- sourceClients, err := managedplugin.NewClients(ctx, managedplugin.PluginSource, sourcePluginConfigs, opts...)
- if err != nil {
- return enrichClientError(sourceClients, sourceRegInferred, err)
+ var sourceClients, destinationClients managedplugin.Clients
+ if len(sourcePluginConfigs) > 0 || len(destinationPluginConfigs) > 0 {
+ authToken, err := auth.GetAuthTokenIfNeeded(log.Logger, sourcesNeedingPlugin, destinationsNeedingPlugin, nil)
+ if err != nil {
+ return fmt.Errorf("failed to get auth token: %w", err)
+ }
+ teamName, err := auth.GetTeamForToken(ctx, authToken)
+ if err != nil {
+ return fmt.Errorf("failed to get team name: %w", err)
+ }
+ opts := []managedplugin.Option{
+ managedplugin.WithLogger(log.Logger),
+ managedplugin.WithAuthToken(authToken.Value),
+ managedplugin.WithTeamName(teamName),
+ }
+ if logConsole {
+ opts = append(opts, managedplugin.WithNoProgress())
+ }
+ if cqDir != "" {
+ opts = append(opts, managedplugin.WithDirectory(cqDir))
+ }
+ if disableSentry {
+ opts = append(opts, managedplugin.WithNoSentry())
+ }
+
+ sourceClients, err = managedplugin.NewClients(ctx, managedplugin.PluginSource, sourcePluginConfigs, opts...)
+ if err != nil {
+ return enrichClientError(sourceClients, sourceRegInferred, err)
+ }
+ defer func() {
+ if err := sourceClients.Terminate(); err != nil {
+ fmt.Println(err)
+ }
+ }()
+ destinationClients, err = managedplugin.NewClients(ctx, managedplugin.PluginDestination, destinationPluginConfigs, opts...)
+ if err != nil {
+ return enrichClientError(destinationClients, destinationRegInferred, err)
+ }
+ defer func() {
+ if err := destinationClients.Terminate(); err != nil {
+ fmt.Println(err)
+ }
+ }()
}
- defer func() {
- if err := sourceClients.Terminate(); err != nil {
- fmt.Println(err)
+
+ var initErrors []error
+ // File-based validation (offline; no plugin spawn).
+ for i, source := range sources {
+ if sourceSchemaFiles[i] == "" {
+ continue
+ }
+ log.Info().Str("source", source.VersionString()).Str("schema", sourceSchemaFiles[i]).Msg("Validating source against local schema")
+ schemaBytes, err := os.ReadFile(sourceSchemaFiles[i])
+ if err != nil {
+ initErrors = append(initErrors, fmt.Errorf("failed to read schema file for source %v: %w", source.VersionString(), err))
+ continue
+ }
+ if err := validateSpecAgainstSchemaStrict(string(schemaBytes), source.Spec); err != nil {
+ initErrors = append(initErrors, fmt.Errorf("failed to validate source config %v: %w", source.VersionString(), err))
+ } else {
+ log.Info().Str("source", source.VersionString()).Msg("validated successfully")
}
- }()
- destinationClients, err := managedplugin.NewClients(ctx, managedplugin.PluginDestination, destinationPluginConfigs, opts...)
- if err != nil {
- return enrichClientError(destinationClients, destinationRegInferred, err)
}
- defer func() {
- if err := destinationClients.Terminate(); err != nil {
- fmt.Println(err)
+ for i, destination := range destinations {
+ if destinationSchemaFiles[i] == "" {
+ continue
}
- }()
+ log.Info().Str("destination", destination.VersionString()).Str("schema", destinationSchemaFiles[i]).Msg("Validating destination against local schema")
+ schemaBytes, err := os.ReadFile(destinationSchemaFiles[i])
+ if err != nil {
+ initErrors = append(initErrors, fmt.Errorf("failed to read schema file for destination %v: %w", destination.VersionString(), err))
+ continue
+ }
+ if err := validateSpecAgainstSchemaStrict(string(schemaBytes), destination.Spec); err != nil {
+ initErrors = append(initErrors, fmt.Errorf("failed to validate destination config %v: %w", destination.VersionString(), err))
+ } else {
+ log.Info().Str("destination", destination.VersionString()).Msg("validated successfully")
+ }
+ }
- var initErrors []error
- for i, client := range sourceClients {
+ // Plugin-based validation for entries without a local schema file.
+ for ci, client := range sourceClients {
+ i := sourcePluginIdx[ci]
pluginClient := plugin.NewPluginClient(client.Conn)
log.Info().Str("source", sources[i].VersionString()).Msg("Initializing source")
err := validatePluginSpec(ctx, pluginClient, sources[i].Spec)
@@ -132,7 +214,8 @@ func validateConfig(cmd *cobra.Command, args []string) error {
log.Info().Str("source", sources[i].VersionString()).Msg("validated successfully")
}
}
- for i, client := range destinationClients {
+ for ci, client := range destinationClients {
+ i := destinationPluginIdx[ci]
pluginClient := plugin.NewPluginClient(client.Conn)
log.Info().Str("destination", destinations[i].VersionString()).Msg("Initializing destination")
err = validatePluginSpec(ctx, pluginClient, destinations[i].Spec)
@@ -145,3 +228,38 @@ func validateConfig(cmd *cobra.Command, args []string) error {
return errors.Join(initErrors...)
}
+
+// lookupSchemaFile resolves a plugin's pre-fetched schema file under dir.
+// Prefers <name>@<version>.json so validation can pin to the configured plugin version,
+// falling back to <name>.json when version is empty (e.g. for registry: local) or when
+// only the unversioned file exists. Returns "" with no error when neither file exists;
+// surfaces all other os.Stat errors (e.g. permission denied) so the caller cannot
+// silently fall back to the online path on an unreadable file.
+func lookupSchemaFile(dir, name, version string) (string, error) {
+ if dir == "" {
+ return "", nil
+ }
+ // Reject spec names that could escape dir via path traversal. Plugin spec
+ // names are simple identifiers in practice, so anything containing a
+ // separator or '..' is treated as not-found rather than silently rebased.
+ if name == "" || name == ".." || strings.ContainsAny(name, `/\`) {
+ return "", nil
+ }
+
+ candidates := make([]string, 0, 2)
+ if version != "" {
+ candidates = append(candidates, filepath.Join(dir, name+"@"+version+".json"))
+ }
+ candidates = append(candidates, filepath.Join(dir, name+".json"))
+
+ for _, p := range candidates {
+ _, err := os.Stat(p)
+ if err == nil {
+ return p, nil
+ }
+ if !errors.Is(err, fs.ErrNotExist) {
+ return "", fmt.Errorf("failed to stat schema file %s: %w", p, err)
+ }
+ }
+ return "", nil
+}
diff --git a/cli/cmd/validate_config_test.go b/cli/cmd/validate_config_test.go
index 4feb77136f83b4..3fc23e88b9bf71 100644
--- a/cli/cmd/validate_config_test.go
+++ b/cli/cmd/validate_config_test.go
@@ -54,3 +54,76 @@ func TestValidateConfig(t *testing.T) {
})
}
}
+
+func TestValidateConfigSchemasDir(t *testing.T) {
+ _, filename, _, _ := runtime.Caller(0)
+ currentDir := path.Dir(filename)
+ schemasDir := path.Join(currentDir, "testdata", "schemas-dir")
+
+ t.Run("good spec validates offline without plugin spawn", func(t *testing.T) {
+ cmd := NewCmdRoot()
+ testConfig := path.Join(currentDir, "testdata", "validate-config-schemas-dir.yml")
+ baseArgs := testCommandArgs(t)
+
+ args := append([]string{"validate-config", testConfig, "--schemas-dir", schemasDir}, baseArgs...)
+ cmd.SetArgs(args)
+ err := cmd.Execute()
+ require.NoError(t, err)
+
+ b, logFileError := os.ReadFile(baseArgs[3])
+ require.NoError(t, logFileError, "failed to read cloudquery.log")
+ logContent := string(b)
+ require.Contains(t, logContent, "Validating source against local schema")
+ require.Contains(t, logContent, "Validating destination against local schema")
+ // No plugin spawn happened, so no "Initializing source/destination" lines.
+ require.NotContains(t, logContent, "Initializing source")
+ require.NotContains(t, logContent, "Initializing destination")
+ })
+
+ t.Run("spec violating schema fails offline", func(t *testing.T) {
+ cmd := NewCmdRoot()
+ testConfig := path.Join(currentDir, "testdata", "validate-config-schemas-dir-bad.yml")
+ baseArgs := testCommandArgs(t)
+
+ args := append([]string{"validate-config", testConfig, "--schemas-dir", schemasDir}, baseArgs...)
+ cmd.SetArgs(args)
+ err := cmd.Execute()
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "failed to validate source config src")
+ })
+}
+
+func TestLookupSchemaFile(t *testing.T) {
+ dir := t.TempDir()
+ unversioned := path.Join(dir, "aws.json")
+ versioned := path.Join(dir, "aws@v33.0.0.json")
+ require.NoError(t, os.WriteFile(unversioned, []byte("{}"), 0o644))
+ require.NoError(t, os.WriteFile(versioned, []byte("{}"), 0o644))
+
+ check := func(t *testing.T, wantPath string, gotPath string, err error) {
+ t.Helper()
+ require.NoError(t, err)
+ require.Equal(t, wantPath, gotPath)
+ }
+
+ // Versioned file takes precedence when both exist.
+ got, err := lookupSchemaFile(dir, "aws", "v33.0.0")
+ check(t, versioned, got, err)
+ // Falls back to unversioned when version-specific file is missing.
+ got, err = lookupSchemaFile(dir, "aws", "v99.0.0")
+ check(t, unversioned, got, err)
+ // Empty version uses unversioned name (e.g. registry: local without a version).
+ got, err = lookupSchemaFile(dir, "aws", "")
+ check(t, unversioned, got, err)
+ // Unknown plugin returns empty.
+ got, err = lookupSchemaFile(dir, "gcp", "v1.0.0")
+ check(t, "", got, err)
+ // Empty dir returns empty.
+ got, err = lookupSchemaFile("", "aws", "v33.0.0")
+ check(t, "", got, err)
+ // Path-traversal-shaped names are rejected as not-found.
+ got, err = lookupSchemaFile(dir, "../aws", "v33.0.0")
+ check(t, "", got, err)
+ got, err = lookupSchemaFile(dir, "..", "")
+ check(t, "", got, err)
+}
diff --git a/cli/docs/reference/cloudquery_plugin.md b/cli/docs/reference/cloudquery_plugin.md
index 34451d9f1766cd..e67457e1a1a02c 100644
--- a/cli/docs/reference/cloudquery_plugin.md
+++ b/cli/docs/reference/cloudquery_plugin.md
@@ -30,6 +30,7 @@ Plugin commands
* [cloudquery](/cli/cli-reference/cloudquery) - CloudQuery CLI
* [cloudquery plugin install](/cli/cli-reference/cloudquery_plugin_install) - Install required plugin images from your configuration
* [cloudquery plugin publish](/cli/cli-reference/cloudquery_plugin_publish) - Publish to CloudQuery Hub.
+* [cloudquery plugin spec-schema](/cli/cli-reference/cloudquery_plugin_spec-schema) - Export a plugin's spec JSON schema.
- [Integration Concepts](/cli/core-concepts/integrations) - How integrations work
- [Managing Versions](/cli/advanced/managing-versions) - Integration versioning
diff --git a/cli/docs/reference/cloudquery_plugin_spec-schema.md b/cli/docs/reference/cloudquery_plugin_spec-schema.md
new file mode 100644
index 00000000000000..3a515bf5013e16
--- /dev/null
+++ b/cli/docs/reference/cloudquery_plugin_spec-schema.md
@@ -0,0 +1,63 @@
+---
+title: "plugin_spec-schema"
+---
+# cloudquery plugin spec-schema
+
+Export a plugin's spec JSON schema.
+
+## Synopsis
+
+Export a plugin's spec JSON schema.
+
+Only plugins published to the CloudQuery hub are supported. registry: local,
+registry: grpc, and registry: docker plugins are not exportable because they
+have no stable (path, version) identity to anchor the generated filename.
+For those registries the plugin binary is already accessible locally, so
+'cloudquery validate-config' can validate them in-place without --schemas-dir.
+
+Without --schemas-dir the schema is printed to stdout. With --schemas-dir the
+schema is written to <schemas-dir>/<plugin-name>@<version>.json, which is the
+filename format expected by `cloudquery validate-config --schemas-dir`.
+Including the version in the filename ensures validation always runs against
+the schema matching the plugin version in the config.
+
+```
+cloudquery plugin spec-schema <team>/<kind>/<plugin-name>@<version> [flags]
+```
+
+## Examples
+
+```
+
+# Print schema to stdout
+cloudquery plugin spec-schema cloudquery/source/aws@v33.0.0
+
+# Write to ./schemas/aws@v33.0.0.json
+cloudquery plugin spec-schema cloudquery/source/aws@v33.0.0 -D ./schemas
+```
+
+## Options
+
+```
+ -h, --help help for spec-schema
+ -D, --schemas-dir string Write schema to <schemas-dir>/<plugin-name>@<version>.json. If omitted, the schema is printed to stdout.
+```
+
+## Options inherited from parent commands
+
+```
+ --cq-dir string directory to store cloudquery files, such as downloaded plugins (default ".cq")
+ --invocation-id uuid useful for when using Open Telemetry integration for tracing and logging to be able to correlate logs and traces through many services (default )
+ --log-console enable console logging
+ --log-file-name string Log filename (default "cloudquery.log")
+ --log-file-overwrite Overwrite log file on each run instead of appending. Use this if your filesystem does not support append mode (e.g. FUSE-mounted cloud storage).
+ --log-format string Logging format (json, text) (default "text")
+ --log-level string Logging level (trace, debug, info, warn, error) (default "info")
+ --no-log-file Disable logging to file
+ --telemetry-level string Telemetry level (none, errors, stats, all) (default "all")
+```
+
+## See Also
+
+* [cloudquery plugin](/cli/cli-reference/cloudquery_plugin) - Plugin commands
+
diff --git a/cli/docs/reference/cloudquery_validate-config.md b/cli/docs/reference/cloudquery_validate-config.md
index 536445f76a9380..3f9d9c856031b7 100644
--- a/cli/docs/reference/cloudquery_validate-config.md
+++ b/cli/docs/reference/cloudquery_validate-config.md
@@ -20,13 +20,16 @@ cloudquery validate-config [files or directories] [flags]
cloudquery validate-config ./directory
# Validate configs from directories and files
cloudquery validate-config ./directory ./aws.yml ./pg.yml
+# Validate fully offline using locally-stored plugin JSON schemas
+cloudquery validate-config --schemas-dir ./schemas ./aws.yml
```
## Options
```
- -h, --help help for validate-config
+ -h, --help help for validate-config
+ --schemas-dir string Directory of pre-fetched .json schema files. Plugins with a matching file are validated offline (no plugin spawn, no auth). Use 'cloudquery plugin spec-schema' to generate these files.
```
## Options inherited from parent commands