From d24981fbe61cd4f916196eaabdfa3d63e87d360b Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Thu, 1 Feb 2024 18:30:37 +0200 Subject: [PATCH 1/2] init --- .github/workflows/dest_kafka.yml | 6 + plugins/destination/kafka/Makefile | 10 + plugins/destination/kafka/client/client.go | 15 +- .../destination/kafka/client/client_test.go | 9 +- plugins/destination/kafka/client/spec.go | 47 ---- .../destination/kafka/client/spec/gen/main.go | 30 +++ .../destination/kafka/client/spec/schema.go | 46 ++++ .../destination/kafka/client/spec/schema.json | 231 ++++++++++++++++++ .../kafka/client/spec/schema_test.go | 199 +++++++++++++++ plugins/destination/kafka/client/spec/spec.go | 56 +++++ plugins/destination/kafka/docs/overview.md | 1 + plugins/destination/kafka/go.mod | 6 +- plugins/destination/kafka/main.go | 2 + 13 files changed, 597 insertions(+), 61 deletions(-) delete mode 100644 plugins/destination/kafka/client/spec.go create mode 100644 plugins/destination/kafka/client/spec/gen/main.go create mode 100644 plugins/destination/kafka/client/spec/schema.go create mode 100644 plugins/destination/kafka/client/spec/schema.json create mode 100644 plugins/destination/kafka/client/spec/schema_test.go create mode 100644 plugins/destination/kafka/client/spec/spec.go diff --git a/.github/workflows/dest_kafka.yml b/.github/workflows/dest_kafka.yml index 757a413dcf9c4f..52200e35be1fb2 100644 --- a/.github/workflows/dest_kafka.yml +++ b/.github/workflows/dest_kafka.yml @@ -67,6 +67,12 @@ jobs: args: "--config ../../.golangci.yml" skip-pkg-cache: true skip-build-cache: true + - name: gen + if: github.event_name == 'pull_request' + run: make gen + - name: Fail if generation updated files + if: github.event_name == 'pull_request' + run: test "$(git status -s | wc -l)" -eq 0 || (git status -s; exit 1) - name: Build run: go build . - name: Test diff --git a/plugins/destination/kafka/Makefile b/plugins/destination/kafka/Makefile index c957b06653ed16..5c3df7adfed0c8 100644 --- a/plugins/destination/kafka/Makefile +++ b/plugins/destination/kafka/Makefile @@ -7,3 +7,13 @@ test: .PHONY: lint lint: golangci-lint run --config ../../.golangci.yml + +.PHONY: gen-spec-schema +gen-spec-schema: + # required for loading comments from filetypes + go mod vendor + go run client/spec/gen/main.go + +# All gen targets +.PHONY: gen +gen: gen-spec-schema diff --git a/plugins/destination/kafka/client/client.go b/plugins/destination/kafka/client/client.go index e0df2da9db26ae..2cc6fdc731060f 100644 --- a/plugins/destination/kafka/client/client.go +++ b/plugins/destination/kafka/client/client.go @@ -8,6 +8,7 @@ import ( "time" "github.com/Shopify/sarama" + "github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec" "github.com/cloudquery/filetypes/v4" "github.com/cloudquery/plugin-sdk/v4/plugin" "github.com/rs/zerolog" @@ -20,12 +21,12 @@ type Client struct { producer sarama.SyncProducer logger zerolog.Logger - spec *Spec + spec *spec.Spec *filetypes.Client } -func New(_ context.Context, logger zerolog.Logger, spec []byte, opts plugin.NewClientOptions) (plugin.Client, error) { +func New(_ context.Context, logger zerolog.Logger, s []byte, opts plugin.NewClientOptions) (plugin.Client, error) { c := &Client{ logger: logger.With().Str("module", "dest-kafka").Logger(), } @@ -33,7 +34,7 @@ func New(_ context.Context, logger zerolog.Logger, spec []byte, opts plugin.NewC return c, nil } - if err := json.Unmarshal(spec, &c.spec); err != nil { + if err := json.Unmarshal(s, &c.spec); err != nil { return nil, fmt.Errorf("failed to unmarshal spec: %w", err) } if err := c.spec.Validate(); err != nil { @@ -55,10 +56,10 @@ func New(_ context.Context, logger zerolog.Logger, spec []byte, opts plugin.NewC c.conf.Metadata.Full = true c.conf.ClientID = c.spec.ClientID - if c.spec.SaslUsername != "" { + if c.spec.SASLUsername != "" { c.conf.Net.SASL.Enable = true - c.conf.Net.SASL.User = c.spec.SaslUsername - c.conf.Net.SASL.Password = c.spec.SaslPassword + c.conf.Net.SASL.User = c.spec.SASLUsername + c.conf.Net.SASL.Password = c.spec.SASLPassword c.conf.Net.TLS.Enable = true c.conf.Net.TLS.Config = &tls.Config{InsecureSkipVerify: true} c.conf.Net.SASL.Handshake = true @@ -70,7 +71,7 @@ func New(_ context.Context, logger zerolog.Logger, spec []byte, opts plugin.NewC return nil, err } - filetypesClient, err := filetypes.NewClient(c.spec.FileSpec) + filetypesClient, err := filetypes.NewClient(&c.spec.FileSpec) if err != nil { return nil, fmt.Errorf("failed to create filetypes client: %w", err) } diff --git a/plugins/destination/kafka/client/client_test.go b/plugins/destination/kafka/client/client_test.go index 7c157fcdbfd114..bf2021267bbc48 100644 --- a/plugins/destination/kafka/client/client_test.go +++ b/plugins/destination/kafka/client/client_test.go @@ -7,6 +7,7 @@ import ( "strings" "testing" + "github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec" "github.com/cloudquery/filetypes/v4" "github.com/cloudquery/plugin-sdk/v4/plugin" ) @@ -26,12 +27,12 @@ func getenv(key, fallback string) string { func TestPlugin(t *testing.T) { ctx := context.Background() p := plugin.NewPlugin("kafka", "development", New) - b, err := json.Marshal(&Spec{ + b, err := json.Marshal(&spec.Spec{ Brokers: strings.Split(getenv("CQ_DEST_KAFKA_CONNECTION_STRING", defaultConnectionString), ","), - SaslUsername: getenv("CQ_DEST_KAFKA_SASL_USERNAME", ""), - SaslPassword: getenv("CQ_DEST_KAFKA_SASL_PASSWORD", ""), + SASLUsername: getenv("CQ_DEST_KAFKA_SASL_USERNAME", ""), + SASLPassword: getenv("CQ_DEST_KAFKA_SASL_PASSWORD", ""), Verbose: true, - FileSpec: &filetypes.FileSpec{Format: filetypes.FormatTypeJSON}, + FileSpec: filetypes.FileSpec{Format: filetypes.FormatTypeJSON}, }) if err != nil { t.Fatal(err) diff --git a/plugins/destination/kafka/client/spec.go b/plugins/destination/kafka/client/spec.go deleted file mode 100644 index b65b4708b20b21..00000000000000 --- a/plugins/destination/kafka/client/spec.go +++ /dev/null @@ -1,47 +0,0 @@ -package client - -import ( - "fmt" - - "github.com/cloudquery/filetypes/v4" -) - -type Spec struct { - Brokers []string `json:"brokers,omitempty"` - Verbose bool `json:"verbose,omitempty"` - - SaslUsername string `json:"sasl_username,omitempty"` - SaslPassword string `json:"sasl_password,omitempty"` - - ClientID string `json:"client_id,omitempty"` - - *filetypes.FileSpec - - BatchSize int `json:"batch_size"` -} - -func (s *Spec) SetDefaults() { - if s.FileSpec == nil { - s.FileSpec = &filetypes.FileSpec{} - } - s.FileSpec.SetDefaults() - - if s.ClientID == "" { - s.ClientID = "cq-destination-kafka" - } - - if s.BatchSize == 0 { - s.BatchSize = 1000 - } -} - -func (s *Spec) Validate() error { - if len(s.Brokers) == 0 { - return fmt.Errorf("at least one broker is required") - } - if s.Format == "" { - return fmt.Errorf("format is required") - } - - return nil -} diff --git a/plugins/destination/kafka/client/spec/gen/main.go b/plugins/destination/kafka/client/spec/gen/main.go new file mode 100644 index 00000000000000..5f0bb03bb449a7 --- /dev/null +++ b/plugins/destination/kafka/client/spec/gen/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" + "log" + "path" + "runtime" + + "github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec" + "github.com/cloudquery/codegen/jsonschema" + "github.com/cloudquery/filetypes/v4" +) + +func main() { + fmt.Println("Generating JSON schema for plugin spec") + jsonschema.GenerateIntoFile(new(spec.Spec), path.Join(currDir(), "..", "schema.json"), + append(filetypes.FileSpec{}.JSONSchemaOptions(), + jsonschema.WithAddGoComments("github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec", path.Join(currDir(), "..")), + jsonschema.WithAddGoComments("github.com/cloudquery/filetypes/v4", path.Join(currDir(), "..", "..", "..", "vendor", "github.com/cloudquery/filetypes/v4")), + )..., + ) +} + +func currDir() string { + _, filename, _, ok := runtime.Caller(0) + if !ok { + log.Fatal("Failed to get caller information") + } + return path.Dir(filename) +} diff --git a/plugins/destination/kafka/client/spec/schema.go b/plugins/destination/kafka/client/spec/schema.go new file mode 100644 index 00000000000000..01c80173c7f63a --- /dev/null +++ b/plugins/destination/kafka/client/spec/schema.go @@ -0,0 +1,46 @@ +package spec + +import ( + _ "embed" + + "github.com/invopop/jsonschema" + orderedmap "github.com/wk8/go-ordered-map/v2" +) + +func (s Spec) JSONSchemaExtend(sc *jsonschema.Schema) { + s.FileSpec.JSONSchemaExtend(sc) // need to call manually + + strValueIsSet := func(property string) *jsonschema.Schema { + return &jsonschema.Schema{ + Title: "`" + property + "` value is set", + Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] { + p := *sc.Properties.Value(property) + p.Default = nil + p.Description = "" + p.MinLength = &([]uint64{1}[0]) + properties := orderedmap.New[string, *jsonschema.Schema]() + properties.Set(property, &p) + return properties + }(), + Required: []string{property}, + } + } + usernamePresent := strValueIsSet("sasl_username") + passwordPresent := strValueIsSet("sasl_password") + + sc.AllOf = append(sc.AllOf, + &jsonschema.Schema{ + Title: "Require `sasl_password` when `sasl_username` is set", + If: usernamePresent, + Then: passwordPresent, + }, + &jsonschema.Schema{ + Title: "Require `sasl_username` when `sasl_password` is set", + If: passwordPresent, + Then: usernamePresent, + }, + ) +} + +//go:embed schema.json +var JSONSchema string diff --git a/plugins/destination/kafka/client/spec/schema.json b/plugins/destination/kafka/client/spec/schema.json new file mode 100644 index 00000000000000..76fd7cb7e9687e --- /dev/null +++ b/plugins/destination/kafka/client/spec/schema.json @@ -0,0 +1,231 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec/spec", + "$ref": "#/$defs/Spec", + "$defs": { + "CSVSpec": { + "properties": { + "skip_header": { + "type": "boolean", + "description": "Specifies if the first line of a file should be the header.", + "default": false + }, + "delimiter": { + "type": "string", + "pattern": "^.$", + "description": "Character that will be used as the delimiter.", + "default": "," + } + }, + "additionalProperties": false, + "type": "object", + "description": "CloudQuery CSV file output spec." + }, + "JSONSpec": { + "additionalProperties": false, + "type": "object", + "description": "CloudQuery JSON file output spec." + }, + "ParquetSpec": { + "additionalProperties": false, + "type": "object", + "description": "CloudQuery Parquet file output spec." + }, + "Spec": { + "allOf": [ + { + "if": { + "properties": { + "sasl_username": { + "type": "string", + "minLength": 1 + } + }, + "required": [ + "sasl_username" + ], + "title": "`sasl_username` value is set" + }, + "then": { + "properties": { + "sasl_password": { + "type": "string", + "minLength": 1 + } + }, + "required": [ + "sasl_password" + ], + "title": "`sasl_password` value is set" + }, + "title": "Require `sasl_password` when `sasl_username` is set" + }, + { + "if": { + "properties": { + "sasl_password": { + "type": "string", + "minLength": 1 + } + }, + "required": [ + "sasl_password" + ], + "title": "`sasl_password` value is set" + }, + "then": { + "properties": { + "sasl_username": { + "type": "string", + "minLength": 1 + } + }, + "required": [ + "sasl_username" + ], + "title": "`sasl_username` value is set" + }, + "title": "Require `sasl_username` when `sasl_password` is set" + } + ], + "oneOf": [ + { + "properties": { + "format": { + "type": "string", + "const": "csv" + }, + "format_spec": { + "oneOf": [ + { + "$ref": "#/$defs/CSVSpec" + }, + { + "type": "null" + } + ] + } + } + }, + { + "properties": { + "format": { + "type": "string", + "const": "json" + }, + "format_spec": { + "oneOf": [ + { + "$ref": "#/$defs/JSONSpec" + }, + { + "type": "null" + } + ] + } + } + }, + { + "properties": { + "format": { + "type": "string", + "const": "parquet" + }, + "format_spec": { + "oneOf": [ + { + "$ref": "#/$defs/ParquetSpec" + }, + { + "type": "null" + } + ] + } + } + } + ], + "properties": { + "format": { + "type": "string", + "enum": [ + "csv", + "json", + "parquet" + ], + "description": "Output format." + }, + "format_spec": { + "oneOf": [ + { + "anyOf": [ + { + "$ref": "#/$defs/CSVSpec" + }, + { + "$ref": "#/$defs/JSONSpec" + }, + { + "$ref": "#/$defs/ParquetSpec" + } + ] + }, + { + "type": "null" + } + ] + }, + "compression": { + "type": "string", + "enum": [ + "", + "gzip" + ], + "description": "Compression type.\nEmpty or missing stands for no compression." + }, + "brokers": { + "oneOf": [ + { + "items": { + "type": "string", + "minLength": 1 + }, + "type": "array", + "minItems": 1, + "description": "List of brokers to connect to." + }, + { + "type": "null" + } + ] + }, + "verbose": { + "type": "boolean", + "description": "If `true`, the plugin will log all underlying Kafka client messages to the log." + }, + "sasl_username": { + "type": "string", + "description": "If connecting via SASL/PLAIN, the username to use." + }, + "sasl_password": { + "type": "string", + "description": "If connecting via SASL/PLAIN, the password to use." + }, + "client_id": { + "type": "string", + "description": "Client ID to be set for Kafka API calls.", + "default": "cq-destination-kafka" + }, + "batch_size": { + "type": "integer", + "description": "Number of records to write before starting a new object." + } + }, + "additionalProperties": false, + "type": "object", + "required": [ + "format", + "brokers" + ] + } + } +} diff --git a/plugins/destination/kafka/client/spec/schema_test.go b/plugins/destination/kafka/client/spec/schema_test.go new file mode 100644 index 00000000000000..3bcf9db3e77941 --- /dev/null +++ b/plugins/destination/kafka/client/spec/schema_test.go @@ -0,0 +1,199 @@ +package spec + +import ( + "testing" + + "github.com/cloudquery/codegen/jsonschema" +) + +func TestSpecJSONSchema(t *testing.T) { + // cases about embedded filetypes.FileSpec are tested in the corresponding package + // However, we add some tests to verify that it actually is properly working + jsonschema.TestJSONSchema(t, JSONSchema, []jsonschema.TestCase{ + { + Name: "csv file spec", + Spec: `{ + "format": "csv", + "format_spec": { + "skip_header": true, + "delimiter": "#" + }, + "path": "abc", + "bucket": "abc" +}`, + }, + { + Name: "bad format value", // also a part of embedded FileSpec testing + Spec: `{"format": "cs22v", "path": "abc"}`, + Err: true, + }, + { + Name: "minimal", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc"}`, + }, + { + Name: "missing path", + Spec: `{"format": "csv", "bucket": "abc"}`, + Err: true, + }, + { + Name: "empty path", + Spec: `{"format": "csv", "path": "", "bucket": "abc"}`, + Err: true, + }, + { + Name: "null path", + Spec: `{"format": "csv", "path": null, "bucket": "abc"}`, + Err: true, + }, + { + Name: "integer path", + Spec: `{"format": "csv", "path": 123, "bucket": "abc"}`, + Err: true, + }, + { + Name: "missing bucket", + Spec: `{"format": "csv", "path": "abc"}`, + Err: true, + }, + { + Name: "empty bucket", + Spec: `{"format": "csv", "path": "abc", "bucket": ""}`, + Err: true, + }, + { + Name: "null bucket", + Spec: `{"format": "csv", "path": "abc", "bucket": null}`, + Err: true, + }, + { + Name: "integer bucket", + Spec: `{"format": "csv", "path": "abc", "bucket": 123}`, + Err: true, + }, + + { + Name: "null no_rotate", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate": null}`, + Err: true, + }, + { + Name: "bad no_rotate", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate": 123}`, + Err: true, + }, + { + Name: "no_rotate:true", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate": true}`, + }, + { + Name: "no_rotate:false", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate": false}`, + }, + { + Name: "zero batch_size", + Err: true, + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size":0}`, + }, + { + Name: "float batch_size", + Err: true, + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size":5.3}`, + }, + { + Name: "bad batch_size", + Err: true, + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size":false}`, + }, + { + Name: "null batch_size", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size":null}`, + }, + { + Name: "proper batch_size", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size":123}`, + }, + { + Name: "zero batch_size_bytes", + Err: true, + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size_bytes":0}`, + }, + { + Name: "float batch_size_bytes", + Err: true, + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size_bytes":5.3}`, + }, + { + Name: "bad batch_size_bytes", + Err: true, + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size_bytes":false}`, + }, + { + Name: "null batch_size_bytes", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size_bytes":null}`, + }, + { + Name: "proper batch_size_bytes", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size_bytes":123}`, + }, + // configtype.Duration is tested in plugin-sdk + // test only null here + { + Name: "null batch_timeout", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_timeout":null}`, + }, + + // no_rotate + batching + { + Name: "no_rotate:false & batch_size:100", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":false, "batch_size":100}`, + }, + { + Name: "no_rotate:true & batch_size:100", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":true, "batch_size":100}`, + Err: true, + }, + { + Name: "no_rotate:false & batch_size:null", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":false, "batch_size":null}`, + }, + { + Name: "no_rotate:true & batch_size:null", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":true, "batch_size":null}`, + }, + { + Name: "no_rotate:false & batch_size_bytes:100", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":false, "batch_size_bytes":100}`, + }, + { + Name: "no_rotate:true & batch_size_bytes:100", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":true, "batch_size_bytes":100}`, + Err: true, + }, + { + Name: "no_rotate:false & batch_size_bytes:null", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":false, "batch_size_bytes":null}`, + }, + { + Name: "no_rotate:true & batch_size_bytes:null", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":true, "batch_size_bytes":null}`, + }, + { + Name: "no_rotate:false & batch_timeout:100s", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":false, "batch_timeout":"100s"}`, + }, + { + Name: "no_rotate:true & batch_timeout:100s", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":true, "batch_timeout":"100s"}`, + Err: true, + }, + { + Name: "no_rotate:false & batch_timeout:null", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":false, "batch_timeout":null}`, + }, + { + Name: "no_rotate:true & batch_timeout:null", + Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":true, "batch_timeout":null}`, + }, + }) +} diff --git a/plugins/destination/kafka/client/spec/spec.go b/plugins/destination/kafka/client/spec/spec.go new file mode 100644 index 00000000000000..b26ec629fc22df --- /dev/null +++ b/plugins/destination/kafka/client/spec/spec.go @@ -0,0 +1,56 @@ +package spec + +import ( + "fmt" + + "github.com/cloudquery/filetypes/v4" +) + +type Spec struct { + filetypes.FileSpec + + // List of brokers to connect to. + Brokers []string `json:"brokers,omitempty" jsonschema:"required,minLength=1,minItems=1"` + + // If `true`, the plugin will log all underlying Kafka client messages to the log. + Verbose bool `json:"verbose,omitempty"` + + // If connecting via SASL/PLAIN, the username to use. + SASLUsername string `json:"sasl_username,omitempty"` + + // If connecting via SASL/PLAIN, the password to use. + SASLPassword string `json:"sasl_password,omitempty"` + + // Client ID to be set for Kafka API calls. + ClientID string `json:"client_id,omitempty" jsonschema:"default=cq-destination-kafka"` + + // Number of records to write before starting a new object. + BatchSize int `json:"batch_size"` +} + +func (s *Spec) SetDefaults() { + s.FileSpec.SetDefaults() + + if s.ClientID == "" { + s.ClientID = "cq-destination-kafka" + } + + if s.BatchSize == 0 { + s.BatchSize = 1000 + } +} + +func (s *Spec) Validate() error { + if len(s.Brokers) == 0 { + return fmt.Errorf("at least one broker is required") + } + + // required for s.FileSpec.Validate call + err := s.FileSpec.UnmarshalSpec() + if err != nil { + return err + } + s.FileSpec.SetDefaults() + + return s.FileSpec.Validate() +} diff --git a/plugins/destination/kafka/docs/overview.md b/plugins/destination/kafka/docs/overview.md index f2444f3931e4ae..cae085db0e3966 100644 --- a/plugins/destination/kafka/docs/overview.md +++ b/plugins/destination/kafka/docs/overview.md @@ -20,6 +20,7 @@ This is the (nested) plugin spec - `brokers` (`[]string`) (required) List of brokers to connect to. + - `format` (`string`) (required) Format of the output file. Supported values are `csv`, `json` and `parquet`. diff --git a/plugins/destination/kafka/go.mod b/plugins/destination/kafka/go.mod index f4876150952866..c5af9e605d28d8 100644 --- a/plugins/destination/kafka/go.mod +++ b/plugins/destination/kafka/go.mod @@ -7,9 +7,12 @@ toolchain go1.21.6 require ( github.com/Shopify/sarama v1.37.2 github.com/apache/arrow/go/v15 v15.0.0-20240115115805-d7bc55542e61 + github.com/cloudquery/codegen v0.3.12 github.com/cloudquery/filetypes/v4 v4.2.10 github.com/cloudquery/plugin-sdk/v4 v4.29.1 + github.com/invopop/jsonschema v0.12.0 github.com/rs/zerolog v1.31.0 + github.com/wk8/go-ordered-map/v2 v2.1.8 ) require ( @@ -32,7 +35,6 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.1 // indirect github.com/cloudquery/cloudquery-api-go v1.7.2 // indirect - github.com/cloudquery/codegen v0.3.12 // indirect github.com/cloudquery/plugin-pb-go v1.16.7 // indirect github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -65,7 +67,6 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/invopop/jsonschema v0.12.0 // indirect github.com/iris-contrib/schema v0.0.6 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect @@ -115,7 +116,6 @@ require ( github.com/valyala/fasttemplate v1.2.2 // indirect github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect - github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/yosssi/ace v0.0.5 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/otel v1.20.0 // indirect diff --git a/plugins/destination/kafka/main.go b/plugins/destination/kafka/main.go index bf81ac86f4dabe..77e0436b2c4b11 100644 --- a/plugins/destination/kafka/main.go +++ b/plugins/destination/kafka/main.go @@ -5,6 +5,7 @@ import ( "log" "github.com/cloudquery/cloudquery/plugins/destination/kafka/client" + "github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec" internalPlugin "github.com/cloudquery/cloudquery/plugins/destination/kafka/resources/plugin" "github.com/cloudquery/plugin-sdk/v4/plugin" "github.com/cloudquery/plugin-sdk/v4/serve" @@ -18,6 +19,7 @@ func main() { p := plugin.NewPlugin(internalPlugin.Name, internalPlugin.Version, client.New, plugin.WithKind(internalPlugin.Kind), plugin.WithTeam(internalPlugin.Team), + plugin.WithJSONSchema(spec.JSONSchema), ) if err := serve.Plugin(p, serve.WithPluginSentryDSN(sentryDSN), serve.WithDestinationV0V1Server()).Serve(context.Background()); err != nil { log.Fatalf("failed to serve plugin: %v", err) From 8a9b8a993e7c92920842120f15a8eef11937fb8e Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Thu, 1 Feb 2024 18:57:01 +0200 Subject: [PATCH 2/2] tests --- .../destination/kafka/client/spec/gen/main.go | 13 +- .../destination/kafka/client/spec/schema.json | 25 ++- .../kafka/client/spec/schema_test.go | 166 ++++++++---------- plugins/destination/kafka/client/spec/spec.go | 4 +- 4 files changed, 96 insertions(+), 112 deletions(-) diff --git a/plugins/destination/kafka/client/spec/gen/main.go b/plugins/destination/kafka/client/spec/gen/main.go index 5f0bb03bb449a7..4b8f0a96a7011a 100644 --- a/plugins/destination/kafka/client/spec/gen/main.go +++ b/plugins/destination/kafka/client/spec/gen/main.go @@ -7,16 +7,21 @@ import ( "runtime" "github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec" - "github.com/cloudquery/codegen/jsonschema" + cqjsonschema "github.com/cloudquery/codegen/jsonschema" "github.com/cloudquery/filetypes/v4" + "github.com/invopop/jsonschema" ) func main() { fmt.Println("Generating JSON schema for plugin spec") - jsonschema.GenerateIntoFile(new(spec.Spec), path.Join(currDir(), "..", "schema.json"), + cqjsonschema.GenerateIntoFile(new(spec.Spec), path.Join(currDir(), "..", "schema.json"), append(filetypes.FileSpec{}.JSONSchemaOptions(), - jsonschema.WithAddGoComments("github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec", path.Join(currDir(), "..")), - jsonschema.WithAddGoComments("github.com/cloudquery/filetypes/v4", path.Join(currDir(), "..", "..", "..", "vendor", "github.com/cloudquery/filetypes/v4")), + cqjsonschema.WithAddGoComments("github.com/cloudquery/cloudquery/plugins/destination/kafka/client/spec", path.Join(currDir(), "..")), + cqjsonschema.WithAddGoComments("github.com/cloudquery/filetypes/v4", path.Join(currDir(), "..", "..", "..", "vendor", "github.com/cloudquery/filetypes/v4")), + func(r *jsonschema.Reflector) { + // not required for this plugin + r.NullableFromType = false + }, )..., ) } diff --git a/plugins/destination/kafka/client/spec/schema.json b/plugins/destination/kafka/client/spec/schema.json index 76fd7cb7e9687e..4648bbeadaece1 100644 --- a/plugins/destination/kafka/client/spec/schema.json +++ b/plugins/destination/kafka/client/spec/schema.json @@ -183,20 +183,13 @@ "description": "Compression type.\nEmpty or missing stands for no compression." }, "brokers": { - "oneOf": [ - { - "items": { - "type": "string", - "minLength": 1 - }, - "type": "array", - "minItems": 1, - "description": "List of brokers to connect to." - }, - { - "type": "null" - } - ] + "items": { + "type": "string", + "minLength": 1 + }, + "type": "array", + "minItems": 1, + "description": "List of brokers to connect to." }, "verbose": { "type": "boolean", @@ -217,7 +210,9 @@ }, "batch_size": { "type": "integer", - "description": "Number of records to write before starting a new object." + "minimum": 1, + "description": "Number of records to write before starting a new object.", + "default": 1000 } }, "additionalProperties": false, diff --git a/plugins/destination/kafka/client/spec/schema_test.go b/plugins/destination/kafka/client/spec/schema_test.go index 3bcf9db3e77941..e91c3640600f0e 100644 --- a/plugins/destination/kafka/client/spec/schema_test.go +++ b/plugins/destination/kafka/client/spec/schema_test.go @@ -18,182 +18,166 @@ func TestSpecJSONSchema(t *testing.T) { "skip_header": true, "delimiter": "#" }, - "path": "abc", - "bucket": "abc" + "brokers": ["abc"] }`, }, { Name: "bad format value", // also a part of embedded FileSpec testing - Spec: `{"format": "cs22v", "path": "abc"}`, + Spec: `{"format": "cs22v", "brokers": ["abc"]}`, Err: true, }, { - Name: "minimal", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc"}`, - }, - { - Name: "missing path", - Spec: `{"format": "csv", "bucket": "abc"}`, + Name: "missing brokers", + Spec: `{"format": "csv"}`, Err: true, }, { - Name: "empty path", - Spec: `{"format": "csv", "path": "", "bucket": "abc"}`, + Name: "empty brokers", + Spec: `{"format": "csv", "brokers": []}`, Err: true, }, { - Name: "null path", - Spec: `{"format": "csv", "path": null, "bucket": "abc"}`, + Name: "null brokers", + Spec: `{"format": "csv", "brokers": null}`, Err: true, }, { - Name: "integer path", - Spec: `{"format": "csv", "path": 123, "bucket": "abc"}`, + Name: "integer brokers", + Spec: `{"format": "csv", "brokers": 123}`, Err: true, }, { - Name: "missing bucket", - Spec: `{"format": "csv", "path": "abc"}`, + Name: "empty brokers value", + Spec: `{"format": "csv", "brokers": [""]}`, Err: true, }, { - Name: "empty bucket", - Spec: `{"format": "csv", "path": "abc", "bucket": ""}`, + Name: "null brokers value", + Spec: `{"format": "csv", "brokers": [null]}`, Err: true, }, { - Name: "null bucket", - Spec: `{"format": "csv", "path": "abc", "bucket": null}`, + Name: "integer brokers value", + Spec: `{"format": "csv", "brokers": [123]}`, Err: true, }, { - Name: "integer bucket", - Spec: `{"format": "csv", "path": "abc", "bucket": 123}`, - Err: true, + Name: "proper brokers", + Spec: `{"format": "csv", "brokers": ["abc"]}`, }, - { - Name: "null no_rotate", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate": null}`, + Name: "null verbose", + Spec: `{"format": "csv", "brokers": ["abc"], "verbose": null}`, Err: true, }, { - Name: "bad no_rotate", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate": 123}`, + Name: "integer verbose", + Spec: `{"format": "csv", "brokers": ["abc"], "verbose": 123}`, Err: true, }, { - Name: "no_rotate:true", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate": true}`, + Name: "verbose:true", + Spec: `{"format": "csv", "brokers": ["abc"], "verbose": true}`, }, { - Name: "no_rotate:false", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate": false}`, + Name: "verbose:false", + Spec: `{"format": "csv", "brokers": ["abc"], "verbose": false}`, }, + // sasl_username & sasl_password have to go together { - Name: "zero batch_size", - Err: true, - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size":0}`, + Name: "empty sasl_username with empty sasl_password", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "", "sasl_password": ""}`, }, { - Name: "float batch_size", - Err: true, - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size":5.3}`, + Name: "non-empty sasl_username with non-empty sasl_password", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "user", "sasl_password": "password"}`, }, { - Name: "bad batch_size", + Name: "non-empty sasl_username without sasl_password", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "user"}`, Err: true, - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size":false}`, }, { - Name: "null batch_size", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size":null}`, - }, - { - Name: "proper batch_size", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size":123}`, + Name: "non-empty sasl_username with empty sasl_password", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "user", "sasl_password": ""}`, + Err: true, }, { - Name: "zero batch_size_bytes", + Name: "non-empty sasl_username with integer sasl_password", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "user", "sasl_password": 123}`, Err: true, - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size_bytes":0}`, }, { - Name: "float batch_size_bytes", + Name: "non-empty sasl_username with null sasl_password", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "user", "sasl_password": null}`, Err: true, - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size_bytes":5.3}`, }, { - Name: "bad batch_size_bytes", + Name: "non-empty sasl_password without sasl_username", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_password": "password"}`, Err: true, - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size_bytes":false}`, }, { - Name: "null batch_size_bytes", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size_bytes":null}`, + Name: "non-empty sasl_password with empty sasl_username", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": "", "sasl_password": "password"}`, + Err: true, }, { - Name: "proper batch_size_bytes", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_size_bytes":123}`, + Name: "non-empty sasl_password with integer sasl_username", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": 123, "sasl_password": "password"}`, + Err: true, }, - // configtype.Duration is tested in plugin-sdk - // test only null here { - Name: "null batch_timeout", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "batch_timeout":null}`, + Name: "non-empty sasl_password with null sasl_username", + Spec: `{"format": "csv", "brokers": ["abc"], "sasl_username": null, "sasl_password": "password"}`, + Err: true, }, - - // no_rotate + batching { - Name: "no_rotate:false & batch_size:100", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":false, "batch_size":100}`, + Name: "empty client_id", + Spec: `{"format": "csv", "brokers": ["abc"], "client_id": ""}`, }, { - Name: "no_rotate:true & batch_size:100", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":true, "batch_size":100}`, + Name: "null client_id", + Spec: `{"format": "csv", "brokers": ["abc"], "client_id": null}`, Err: true, }, { - Name: "no_rotate:false & batch_size:null", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":false, "batch_size":null}`, - }, - { - Name: "no_rotate:true & batch_size:null", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":true, "batch_size":null}`, + Name: "integer client_id", + Spec: `{"format": "csv", "brokers": ["abc"], "client_id": 123}`, + Err: true, }, { - Name: "no_rotate:false & batch_size_bytes:100", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":false, "batch_size_bytes":100}`, + Name: "proper client_id", + Spec: `{"format": "csv", "brokers": ["abc"], "client_id": "abc"}`, }, { - Name: "no_rotate:true & batch_size_bytes:100", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":true, "batch_size_bytes":100}`, + Name: "zero batch_size", + Spec: `{"format": "csv", "brokers": ["abc"], "batch_size": 0}`, Err: true, }, { - Name: "no_rotate:false & batch_size_bytes:null", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":false, "batch_size_bytes":null}`, - }, - { - Name: "no_rotate:true & batch_size_bytes:null", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":true, "batch_size_bytes":null}`, + Name: "float batch_size", + Spec: `{"format": "csv", "brokers": ["abc"], "batch_size": 1.5}`, + Err: true, }, { - Name: "no_rotate:false & batch_timeout:100s", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":false, "batch_timeout":"100s"}`, + Name: "negative batch_size", + Spec: `{"format": "csv", "brokers": ["abc"], "batch_size": -1}`, + Err: true, }, { - Name: "no_rotate:true & batch_timeout:100s", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":true, "batch_timeout":"100s"}`, + Name: "null batch_size", + Spec: `{"format": "csv", "brokers": ["abc"], "batch_size": null}`, Err: true, }, { - Name: "no_rotate:false & batch_timeout:null", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":false, "batch_timeout":null}`, + Name: "string batch_size", + Spec: `{"format": "csv", "brokers": ["abc"], "batch_size": "abc"}`, + Err: true, }, { - Name: "no_rotate:true & batch_timeout:null", - Spec: `{"format": "csv", "path": "abc", "bucket": "abc", "no_rotate":true, "batch_timeout":null}`, + Name: "proper batch_size", + Spec: `{"format": "csv", "brokers": ["abc"], "batch_size": 100}`, }, }) } diff --git a/plugins/destination/kafka/client/spec/spec.go b/plugins/destination/kafka/client/spec/spec.go index b26ec629fc22df..5d6d636553047c 100644 --- a/plugins/destination/kafka/client/spec/spec.go +++ b/plugins/destination/kafka/client/spec/spec.go @@ -25,7 +25,7 @@ type Spec struct { ClientID string `json:"client_id,omitempty" jsonschema:"default=cq-destination-kafka"` // Number of records to write before starting a new object. - BatchSize int `json:"batch_size"` + BatchSize int `json:"batch_size" jsonschema:"minimum=1,default=1000"` } func (s *Spec) SetDefaults() { @@ -35,7 +35,7 @@ func (s *Spec) SetDefaults() { s.ClientID = "cq-destination-kafka" } - if s.BatchSize == 0 { + if s.BatchSize < 1 { s.BatchSize = 1000 } }