diff --git a/.github/workflows/dest_kafka.yml b/.github/workflows/dest_kafka.yml index 757a413dcf9c4f..52200e35be1fb2 100644 --- a/.github/workflows/dest_kafka.yml +++ b/.github/workflows/dest_kafka.yml @@ -67,6 +67,12 @@ jobs: args: "--config ../../.golangci.yml" skip-pkg-cache: true skip-build-cache: true + - name: gen + if: github.event_name == 'pull_request' + run: make gen + - name: Fail if generation updated files + if: github.event_name == 'pull_request' + run: test "$(git status -s | wc -l)" -eq 0 || (git status -s; exit 1) - name: Build run: go build . - name: Test diff --git a/plugins/destination/kafka/Makefile b/plugins/destination/kafka/Makefile index c957b06653ed16..5c3df7adfed0c8 100644 --- a/plugins/destination/kafka/Makefile +++ b/plugins/destination/kafka/Makefile @@ -7,3 +7,13 @@ test: .PHONY: lint lint: golangci-lint run --config ../../.golangci.yml + +.PHONY: gen-spec-schema +gen-spec-schema: + # required for loading comments from filetypes + go mod vendor + go run client/spec/gen/main.go + +# All gen targets +.PHONY: gen +gen: gen-spec-schema diff --git a/plugins/destination/kafka/client/client.go b/plugins/destination/kafka/client/client.go index e0df2da9db26ae..2cc6fdc731060f 100644 --- a/plugins/destination/kafka/client/client.go +++ b/plugins/destination/kafka/client/client.go @@ -8,6 +8,7 @@ import ( "time" "github.com/Shopify/sarama" + "github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec" "github.com/cloudquery/filetypes/v4" "github.com/cloudquery/plugin-sdk/v4/plugin" "github.com/rs/zerolog" @@ -20,12 +21,12 @@ type Client struct { producer sarama.SyncProducer logger zerolog.Logger - spec *Spec + spec *spec.Spec *filetypes.Client } -func New(_ context.Context, logger zerolog.Logger, spec []byte, opts plugin.NewClientOptions) (plugin.Client, error) { +func New(_ context.Context, logger zerolog.Logger, s []byte, opts plugin.NewClientOptions) (plugin.Client, error) { c := &Client{ logger: logger.With().Str("module", "dest-kafka").Logger(), } @@ -33,7 +34,7 @@ func New(_ context.Context, logger zerolog.Logger, spec []byte, opts plugin.NewC return c, nil } - if err := json.Unmarshal(spec, &c.spec); err != nil { + if err := json.Unmarshal(s, &c.spec); err != nil { return nil, fmt.Errorf("failed to unmarshal spec: %w", err) } if err := c.spec.Validate(); err != nil { @@ -55,10 +56,10 @@ func New(_ context.Context, logger zerolog.Logger, spec []byte, opts plugin.NewC c.conf.Metadata.Full = true c.conf.ClientID = c.spec.ClientID - if c.spec.SaslUsername != "" { + if c.spec.SASLUsername != "" { c.conf.Net.SASL.Enable = true - c.conf.Net.SASL.User = c.spec.SaslUsername - c.conf.Net.SASL.Password = c.spec.SaslPassword + c.conf.Net.SASL.User = c.spec.SASLUsername + c.conf.Net.SASL.Password = c.spec.SASLPassword c.conf.Net.TLS.Enable = true c.conf.Net.TLS.Config = &tls.Config{InsecureSkipVerify: true} c.conf.Net.SASL.Handshake = true @@ -70,7 +71,7 @@ func New(_ context.Context, logger zerolog.Logger, spec []byte, opts plugin.NewC return nil, err } - filetypesClient, err := filetypes.NewClient(c.spec.FileSpec) + filetypesClient, err := filetypes.NewClient(&c.spec.FileSpec) if err != nil { return nil, fmt.Errorf("failed to create filetypes client: %w", err) } diff --git a/plugins/destination/kafka/client/client_test.go b/plugins/destination/kafka/client/client_test.go index 7c157fcdbfd114..bf2021267bbc48 100644 --- a/plugins/destination/kafka/client/client_test.go +++ b/plugins/destination/kafka/client/client_test.go @@ -7,6 +7,7 @@ import ( "strings" 
"testing" + "github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec" "github.com/cloudquery/filetypes/v4" "github.com/cloudquery/plugin-sdk/v4/plugin" ) @@ -26,12 +27,12 @@ func getenv(key, fallback string) string { func TestPlugin(t *testing.T) { ctx := context.Background() p := plugin.NewPlugin("kafka", "development", New) - b, err := json.Marshal(&Spec{ + b, err := json.Marshal(&spec.Spec{ Brokers: strings.Split(getenv("CQ_DEST_KAFKA_CONNECTION_STRING", defaultConnectionString), ","), - SaslUsername: getenv("CQ_DEST_KAFKA_SASL_USERNAME", ""), - SaslPassword: getenv("CQ_DEST_KAFKA_SASL_PASSWORD", ""), + SASLUsername: getenv("CQ_DEST_KAFKA_SASL_USERNAME", ""), + SASLPassword: getenv("CQ_DEST_KAFKA_SASL_PASSWORD", ""), Verbose: true, - FileSpec: &filetypes.FileSpec{Format: filetypes.FormatTypeJSON}, + FileSpec: filetypes.FileSpec{Format: filetypes.FormatTypeJSON}, }) if err != nil { t.Fatal(err) diff --git a/plugins/destination/kafka/client/spec.go b/plugins/destination/kafka/client/spec.go deleted file mode 100644 index b65b4708b20b21..00000000000000 --- a/plugins/destination/kafka/client/spec.go +++ /dev/null @@ -1,47 +0,0 @@ -package client - -import ( - "fmt" - - "github.com/cloudquery/filetypes/v4" -) - -type Spec struct { - Brokers []string `json:"brokers,omitempty"` - Verbose bool `json:"verbose,omitempty"` - - SaslUsername string `json:"sasl_username,omitempty"` - SaslPassword string `json:"sasl_password,omitempty"` - - ClientID string `json:"client_id,omitempty"` - - *filetypes.FileSpec - - BatchSize int `json:"batch_size"` -} - -func (s *Spec) SetDefaults() { - if s.FileSpec == nil { - s.FileSpec = &filetypes.FileSpec{} - } - s.FileSpec.SetDefaults() - - if s.ClientID == "" { - s.ClientID = "cq-destination-kafka" - } - - if s.BatchSize == 0 { - s.BatchSize = 1000 - } -} - -func (s *Spec) Validate() error { - if len(s.Brokers) == 0 { - return fmt.Errorf("at least one broker is required") - } - if s.Format == "" { - return fmt.Errorf("format is required") - } - - return nil -} diff --git a/plugins/destination/kafka/client/spec/gen/main.go b/plugins/destination/kafka/client/spec/gen/main.go new file mode 100644 index 00000000000000..4b8f0a96a7011a --- /dev/null +++ b/plugins/destination/kafka/client/spec/gen/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "fmt" + "log" + "path" + "runtime" + + "github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec" + cqjsonschema "github.com/cloudquery/codegen/jsonschema" + "github.com/cloudquery/filetypes/v4" + "github.com/invopop/jsonschema" +) + +func main() { + fmt.Println("Generating JSON schema for plugin spec") + cqjsonschema.GenerateIntoFile(new(spec.Spec), path.Join(currDir(), "..", "schema.json"), + append(filetypes.FileSpec{}.JSONSchemaOptions(), + cqjsonschema.WithAddGoComments("github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec", path.Join(currDir(), "..")), + cqjsonschema.WithAddGoComments("github.com/cloudquery/filetypes/v4", path.Join(currDir(), "..", "..", "..", "vendor", "github.com/cloudquery/filetypes/v4")), + func(r *jsonschema.Reflector) { + // not required for this plugin + r.NullableFromType = false + }, + )..., + ) +} + +func currDir() string { + _, filename, _, ok := runtime.Caller(0) + if !ok { + log.Fatal("Failed to get caller information") + } + return path.Dir(filename) +} diff --git a/plugins/destination/kafka/client/spec/schema.go b/plugins/destination/kafka/client/spec/schema.go new file mode 100644 index 00000000000000..01c80173c7f63a --- /dev/null +++ 
b/plugins/destination/kafka/client/spec/schema.go @@ -0,0 +1,46 @@ +package spec + +import ( + _ "embed" + + "github.com/invopop/jsonschema" + orderedmap "github.com/wk8/go-ordered-map/v2" +) + +func (s Spec) JSONSchemaExtend(sc *jsonschema.Schema) { + s.FileSpec.JSONSchemaExtend(sc) // need to call manually + + strValueIsSet := func(property string) *jsonschema.Schema { + return &jsonschema.Schema{ + Title: "`" + property + "` value is set", + Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] { + p := *sc.Properties.Value(property) + p.Default = nil + p.Description = "" + p.MinLength = &([]uint64{1}[0]) + properties := orderedmap.New[string, *jsonschema.Schema]() + properties.Set(property, &p) + return properties + }(), + Required: []string{property}, + } + } + usernamePresent := strValueIsSet("sasl_username") + passwordPresent := strValueIsSet("sasl_password") + + sc.AllOf = append(sc.AllOf, + &jsonschema.Schema{ + Title: "Require `sasl_password` when `sasl_username` is set", + If: usernamePresent, + Then: passwordPresent, + }, + &jsonschema.Schema{ + Title: "Require `sasl_username` when `sasl_password` is set", + If: passwordPresent, + Then: usernamePresent, + }, + ) +} + +//go:embed schema.json +var JSONSchema string diff --git a/plugins/destination/kafka/client/spec/schema.json b/plugins/destination/kafka/client/spec/schema.json new file mode 100644 index 00000000000000..4648bbeadaece1 --- /dev/null +++ b/plugins/destination/kafka/client/spec/schema.json @@ -0,0 +1,226 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec/spec", + "$ref": "#/$defs/Spec", + "$defs": { + "CSVSpec": { + "properties": { + "skip_header": { + "type": "boolean", + "description": "Specifies if the first line of a file should be the header.", + "default": false + }, + "delimiter": { + "type": "string", + "pattern": "^.$", + "description": "Character that will be used as the delimiter.", + "default": "," + } + }, + "additionalProperties": false, + "type": "object", + "description": "CloudQuery CSV file output spec." + }, + "JSONSpec": { + "additionalProperties": false, + "type": "object", + "description": "CloudQuery JSON file output spec." + }, + "ParquetSpec": { + "additionalProperties": false, + "type": "object", + "description": "CloudQuery Parquet file output spec." 
+ }, + "Spec": { + "allOf": [ + { + "if": { + "properties": { + "sasl_username": { + "type": "string", + "minLength": 1 + } + }, + "required": [ + "sasl_username" + ], + "title": "`sasl_username` value is set" + }, + "then": { + "properties": { + "sasl_password": { + "type": "string", + "minLength": 1 + } + }, + "required": [ + "sasl_password" + ], + "title": "`sasl_password` value is set" + }, + "title": "Require `sasl_password` when `sasl_username` is set" + }, + { + "if": { + "properties": { + "sasl_password": { + "type": "string", + "minLength": 1 + } + }, + "required": [ + "sasl_password" + ], + "title": "`sasl_password` value is set" + }, + "then": { + "properties": { + "sasl_username": { + "type": "string", + "minLength": 1 + } + }, + "required": [ + "sasl_username" + ], + "title": "`sasl_username` value is set" + }, + "title": "Require `sasl_username` when `sasl_password` is set" + } + ], + "oneOf": [ + { + "properties": { + "format": { + "type": "string", + "const": "csv" + }, + "format_spec": { + "oneOf": [ + { + "$ref": "#/$defs/CSVSpec" + }, + { + "type": "null" + } + ] + } + } + }, + { + "properties": { + "format": { + "type": "string", + "const": "json" + }, + "format_spec": { + "oneOf": [ + { + "$ref": "#/$defs/JSONSpec" + }, + { + "type": "null" + } + ] + } + } + }, + { + "properties": { + "format": { + "type": "string", + "const": "parquet" + }, + "format_spec": { + "oneOf": [ + { + "$ref": "#/$defs/ParquetSpec" + }, + { + "type": "null" + } + ] + } + } + } + ], + "properties": { + "format": { + "type": "string", + "enum": [ + "csv", + "json", + "parquet" + ], + "description": "Output format." + }, + "format_spec": { + "oneOf": [ + { + "anyOf": [ + { + "$ref": "#/$defs/CSVSpec" + }, + { + "$ref": "#/$defs/JSONSpec" + }, + { + "$ref": "#/$defs/ParquetSpec" + } + ] + }, + { + "type": "null" + } + ] + }, + "compression": { + "type": "string", + "enum": [ + "", + "gzip" + ], + "description": "Compression type.\nEmpty or missing stands for no compression." + }, + "brokers": { + "items": { + "type": "string", + "minLength": 1 + }, + "type": "array", + "minItems": 1, + "description": "List of brokers to connect to." + }, + "verbose": { + "type": "boolean", + "description": "If `true`, the plugin will log all underlying Kafka client messages to the log." + }, + "sasl_username": { + "type": "string", + "description": "If connecting via SASL/PLAIN, the username to use." + }, + "sasl_password": { + "type": "string", + "description": "If connecting via SASL/PLAIN, the password to use." 
+ }, + "client_id": { + "type": "string", + "description": "Client ID to be set for Kafka API calls.", + "default": "cq-destination-kafka" + }, + "batch_size": { + "type": "integer", + "minimum": 1, + "description": "Number of records to write before starting a new object.", + "default": 1000 + } + }, + "additionalProperties": false, + "type": "object", + "required": [ + "format", + "brokers" + ] + } + } +} diff --git a/plugins/destination/kafka/client/spec/schema_test.go b/plugins/destination/kafka/client/spec/schema_test.go new file mode 100644 index 00000000000000..e91c3640600f0e --- /dev/null +++ b/plugins/destination/kafka/client/spec/schema_test.go @@ -0,0 +1,183 @@ +package spec + +import ( + "testing" + + "github.com/cloudquery/codegen/jsonschema" +) + +func TestSpecJSONSchema(t *testing.T) { + // cases about embedded filetypes.FileSpec are tested in the corresponding package + // However, we add some tests to verify that it actually is properly working + jsonschema.TestJSONSchema(t, JSONSchema, []jsonschema.TestCase{ + { + Name: "csv file spec", + Spec: `{ + "format": "csv", + "format_spec": { + "skip_header": true, + "delimiter": "#" + }, + "brokers": ["abc"] +}`, + }, + { + Name: "bad format value", // also a part of embedded FileSpec testing + Spec: `{"format": "cs22v", "brokers": ["abc"]}`, + Err: true, + }, + { + Name: "missing brokers", + Spec: `{"format": "csv"}`, + Err: true, + }, + { + Name: "empty brokers", + Spec: `{"format": "csv", "brokers": []}`, + Err: true, + }, + { + Name: "null brokers", + Spec: `{"format": "csv", "brokers": null}`, + Err: true, + }, + { + Name: "integer brokers", + Spec: `{"format": "csv", "brokers": 123}`, + Err: true, + }, + { + Name: "empty brokers value", + Spec: `{"format": "csv", "brokers": [""]}`, + Err: true, + }, + { + Name: "null brokers value", + Spec: `{"format": "csv", "brokers": [null]}`, + Err: true, + }, + { + Name: "integer brokers value", + Spec: `{"format": "csv", "brokers": [123]}`, + Err: true, + }, + { + Name: "proper brokers", + Spec: `{"format": "csv", "brokers": ["abc"]}`, + }, + { + Name: "null verbose", + Spec: `{"format": "csv", "brokers": ["abc"], "verbose": null}`, + Err: true, + }, + { + Name: "integer verbose", + Spec: `{"format": "csv", "brokers": ["abc"], "verbose": 123}`, + Err: true, + }, + { + Name: "verbose:true", + Spec: `{"format": "csv", "brokers": ["abc"], "verbose": true}`, + }, + { + Name: "verbose:false", + Spec: `{"format": "csv", "brokers": ["abc"], "verbose": false}`, + }, + // sasl_username & sasl_password have to go together + { + Name: "empty sasl_username with empty sasl_password", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "", "sasl_password": ""}`, + }, + { + Name: "non-empty sasl_username with non-empty sasl_password", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "user", "sasl_password": "password"}`, + }, + { + Name: "non-empty sasl_username without sasl_password", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "user"}`, + Err: true, + }, + { + Name: "non-empty sasl_username with empty sasl_password", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "user", "sasl_password": ""}`, + Err: true, + }, + { + Name: "non-empty sasl_username with integer sasl_password", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "user", "sasl_password": 123}`, + Err: true, + }, + { + Name: "non-empty sasl_username with null sasl_password", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "user", 
"sasl_password": null}`, + Err: true, + }, + { + Name: "non-empty sasl_password without sasl_username", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_password": "password"}`, + Err: true, + }, + { + Name: "non-empty sasl_password with empty sasl_username", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "", "sasl_password": "password"}`, + Err: true, + }, + { + Name: "non-empty sasl_password with integer sasl_username", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": 123, "sasl_password": "password"}`, + Err: true, + }, + { + Name: "non-empty sasl_password with null sasl_username", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": null, "sasl_password": "password"}`, + Err: true, + }, + { + Name: "empty client_id", + Spec: `{"format": "csv", "brokers": ["abc"], "client_id": ""}`, + }, + { + Name: "null client_id", + Spec: `{"format": "csv", "brokers": ["abc"], "client_id": null}`, + Err: true, + }, + { + Name: "integer client_id", + Spec: `{"format": "csv", "brokers": ["abc"], "client_id": 123}`, + Err: true, + }, + { + Name: "proper client_id", + Spec: `{"format": "csv", "brokers": ["abc"], "client_id": "abc"}`, + }, + { + Name: "zero batch_size", + Spec: `{"format": "csv", "brokers": ["abc"], "batch_size": 0}`, + Err: true, + }, + { + Name: "float batch_size", + Spec: `{"format": "csv", "brokers": ["abc"], "batch_size": 1.5}`, + Err: true, + }, + { + Name: "negative batch_size", + Spec: `{"format": "csv", "brokers": ["abc"], "batch_size": -1}`, + Err: true, + }, + { + Name: "null batch_size", + Spec: `{"format": "csv", "brokers": ["abc"], "batch_size": null}`, + Err: true, + }, + { + Name: "string batch_size", + Spec: `{"format": "csv", "brokers": ["abc"], "batch_size": "abc"}`, + Err: true, + }, + { + Name: "proper batch_size", + Spec: `{"format": "csv", "brokers": ["abc"], "batch_size": 100}`, + }, + }) +} diff --git a/plugins/destination/kafka/client/spec/spec.go b/plugins/destination/kafka/client/spec/spec.go new file mode 100644 index 00000000000000..5d6d636553047c --- /dev/null +++ b/plugins/destination/kafka/client/spec/spec.go @@ -0,0 +1,56 @@ +package spec + +import ( + "fmt" + + "github.com/cloudquery/filetypes/v4" +) + +type Spec struct { + filetypes.FileSpec + + // List of brokers to connect to. + Brokers []string `json:"brokers,omitempty" jsonschema:"required,minLength=1,minItems=1"` + + // If `true`, the plugin will log all underlying Kafka client messages to the log. + Verbose bool `json:"verbose,omitempty"` + + // If connecting via SASL/PLAIN, the username to use. + SASLUsername string `json:"sasl_username,omitempty"` + + // If connecting via SASL/PLAIN, the password to use. + SASLPassword string `json:"sasl_password,omitempty"` + + // Client ID to be set for Kafka API calls. + ClientID string `json:"client_id,omitempty" jsonschema:"default=cq-destination-kafka"` + + // Number of records to write before starting a new object. 
+ BatchSize int `json:"batch_size" jsonschema:"minimum=1,default=1000"` +} + +func (s *Spec) SetDefaults() { + s.FileSpec.SetDefaults() + + if s.ClientID == "" { + s.ClientID = "cq-destination-kafka" + } + + if s.BatchSize < 1 { + s.BatchSize = 1000 + } +} + +func (s *Spec) Validate() error { + if len(s.Brokers) == 0 { + return fmt.Errorf("at least one broker is required") + } + + // required for s.FileSpec.Validate call + err := s.FileSpec.UnmarshalSpec() + if err != nil { + return err + } + s.FileSpec.SetDefaults() + + return s.FileSpec.Validate() +} diff --git a/plugins/destination/kafka/docs/overview.md b/plugins/destination/kafka/docs/overview.md index f2444f3931e4ae..cae085db0e3966 100644 --- a/plugins/destination/kafka/docs/overview.md +++ b/plugins/destination/kafka/docs/overview.md @@ -20,6 +20,7 @@ This is the (nested) plugin spec - `brokers` (`[]string`) (required) List of brokers to connect to. + - `format` (`string`) (required) Format of the output file. Supported values are `csv`, `json` and `parquet`. diff --git a/plugins/destination/kafka/go.mod b/plugins/destination/kafka/go.mod index f4876150952866..c5af9e605d28d8 100644 --- a/plugins/destination/kafka/go.mod +++ b/plugins/destination/kafka/go.mod @@ -7,9 +7,12 @@ toolchain go1.21.6 require ( github.com/Shopify/sarama v1.37.2 github.com/apache/arrow/go/v15 v15.0.0-20240115115805-d7bc55542e61 + github.com/cloudquery/codegen v0.3.12 github.com/cloudquery/filetypes/v4 v4.2.10 github.com/cloudquery/plugin-sdk/v4 v4.29.1 + github.com/invopop/jsonschema v0.12.0 github.com/rs/zerolog v1.31.0 + github.com/wk8/go-ordered-map/v2 v2.1.8 ) require ( @@ -32,7 +35,6 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.1 // indirect github.com/cloudquery/cloudquery-api-go v1.7.2 // indirect - github.com/cloudquery/codegen v0.3.12 // indirect github.com/cloudquery/plugin-pb-go v1.16.7 // indirect github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -65,7 +67,6 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/invopop/jsonschema v0.12.0 // indirect github.com/iris-contrib/schema v0.0.6 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect @@ -115,7 +116,6 @@ require ( github.com/valyala/fasttemplate v1.2.2 // indirect github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect - github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/yosssi/ace v0.0.5 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/otel v1.20.0 // indirect diff --git a/plugins/destination/kafka/main.go b/plugins/destination/kafka/main.go index bf81ac86f4dabe..77e0436b2c4b11 100644 --- a/plugins/destination/kafka/main.go +++ b/plugins/destination/kafka/main.go @@ -5,6 +5,7 @@ import ( "log" "github.com/cloudquery/cloudquery/plugins/destination/kafka/client" + "github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec" internalPlugin "github.com/cloudquery/cloudquery/plugins/destination/kafka/resources/plugin" "github.com/cloudquery/plugin-sdk/v4/plugin" "github.com/cloudquery/plugin-sdk/v4/serve" @@ -18,6 +19,7 @@ func main() { p := plugin.NewPlugin(internalPlugin.Name, internalPlugin.Version, client.New, plugin.WithKind(internalPlugin.Kind), 
plugin.WithTeam(internalPlugin.Team), + plugin.WithJSONSchema(spec.JSONSchema), ) if err := serve.Plugin(p, serve.WithPluginSentryDSN(sentryDSN), serve.WithDestinationV0V1Server()).Serve(context.Background()); err != nil { log.Fatalf("failed to serve plugin: %v", err)
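
Illustrative usage sketch (not part of the diff above): the generated schema.json is wired in via plugin.WithJSONSchema(spec.JSONSchema) so invalid configs are rejected up front, while the spec package's Validate and SetDefaults remain the runtime path used by client.New. A minimal Go example of that runtime path, assuming only the spec package introduced in this PR; the broker address and SASL credentials are placeholders:

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec"
)

func main() {
	// sasl_username and sasl_password are provided together, matching the schema's allOf rule.
	raw := []byte(`{"format":"json","brokers":["localhost:9092"],"sasl_username":"user","sasl_password":"pass"}`)

	var s spec.Spec
	if err := json.Unmarshal(raw, &s); err != nil {
		log.Fatalf("unmarshal: %v", err)
	}
	// Validate checks that at least one broker is set and delegates
	// format validation to the embedded filetypes.FileSpec.
	if err := s.Validate(); err != nil {
		log.Fatalf("invalid spec: %v", err)
	}
	// SetDefaults fills client_id and batch_size when they are unset.
	s.SetDefaults()
	fmt.Println(s.ClientID, s.BatchSize) // cq-destination-kafka 1000
}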