diff --git a/.github/workflows/dest_azblob.yml b/.github/workflows/dest_azblob.yml index 34c0bc874273a6..685659779a1d4c 100644 --- a/.github/workflows/dest_azblob.yml +++ b/.github/workflows/dest_azblob.yml @@ -42,6 +42,12 @@ jobs: args: "--config ../../.golangci.yml" skip-pkg-cache: true skip-build-cache: true + - name: gen + if: github.event_name == 'pull_request' + run: make gen + - name: Fail if generation updated files + if: github.event_name == 'pull_request' + run: test "$(git status -s | wc -l)" -eq 0 || (git status -s; exit 1) - name: Build run: go build . - name: Test azblob plugin diff --git a/plugins/destination/azblob/Makefile b/plugins/destination/azblob/Makefile index f9ab6c2eac145b..0693013f2cba0d 100644 --- a/plugins/destination/azblob/Makefile +++ b/plugins/destination/azblob/Makefile @@ -5,3 +5,13 @@ test: .PHONY: lint lint: golangci-lint run --config ../../.golangci.yml + +.PHONY: gen-spec-schema +gen-spec-schema: + # required for loading comments from filetypes + go mod vendor + go run client/spec/gen/main.go + +# All gen targets +.PHONY: gen +gen: gen-spec-schema diff --git a/plugins/destination/azblob/client/client.go b/plugins/destination/azblob/client/client.go index e323875b4bf104..085e21532f224f 100644 --- a/plugins/destination/azblob/client/client.go +++ b/plugins/destination/azblob/client/client.go @@ -8,6 +8,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/cloudquery/cloudquery/plugins/destination/azblob/client/spec" "github.com/cloudquery/filetypes/v4" "github.com/cloudquery/plugin-sdk/v4/plugin" "github.com/cloudquery/plugin-sdk/v4/writers/streamingbatchwriter" @@ -21,14 +22,14 @@ type Client struct { streamingbatchwriter.UnimplementedDeleteRecords logger zerolog.Logger - spec *Spec + spec *spec.Spec *filetypes.Client writer *streamingbatchwriter.StreamingBatchWriter storageClient *azblob.Client } -func New(ctx context.Context, logger zerolog.Logger, 
spec []byte, opts plugin.NewClientOptions) (plugin.Client, error) { +func New(ctx context.Context, logger zerolog.Logger, s []byte, opts plugin.NewClientOptions) (plugin.Client, error) { c := &Client{ logger: logger.With().Str("module", "azb").Logger(), } @@ -36,7 +37,7 @@ func New(ctx context.Context, logger zerolog.Logger, spec []byte, opts plugin.Ne return c, nil } - if err := json.Unmarshal(spec, &c.spec); err != nil { + if err := json.Unmarshal(s, &c.spec); err != nil { return nil, fmt.Errorf("failed to unmarshal azblob spec: %w", err) } if err := c.spec.Validate(); err != nil { @@ -44,7 +45,7 @@ func New(ctx context.Context, logger zerolog.Logger, spec []byte, opts plugin.Ne } c.spec.SetDefaults() - filetypesClient, err := filetypes.NewClient(c.spec.FileSpec) + filetypesClient, err := filetypes.NewClient(&c.spec.FileSpec) if err != nil { return nil, fmt.Errorf("failed to create filetypes client: %w", err) } diff --git a/plugins/destination/azblob/client/client_test.go b/plugins/destination/azblob/client/client_test.go index d47c270545b6f1..ca9a47186a1ba6 100644 --- a/plugins/destination/azblob/client/client_test.go +++ b/plugins/destination/azblob/client/client_test.go @@ -10,6 +10,7 @@ import ( "github.com/apache/arrow/go/v15/arrow" "github.com/apache/arrow/go/v15/arrow/array" "github.com/apache/arrow/go/v15/arrow/memory" + "github.com/cloudquery/cloudquery/plugins/destination/azblob/client/spec" "github.com/cloudquery/filetypes/v4" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/plugin" @@ -19,8 +20,8 @@ import ( ) const ( - storage_account = "cqdestinationazblob" - container = "test" + storageAccount = "cqdestinationazblob" + container = "test" ) func TestPlugin(t *testing.T) { @@ -29,30 +30,28 @@ func TestPlugin(t *testing.T) { filetypes.FormatTypeJSON, filetypes.FormatTypeParquet, } { - spec := Spec{ - StorageAccount: storage_account, + s := spec.Spec{ + StorageAccount: storageAccount, Container: container, Path: 
t.TempDir(), NoRotate: true, - FileSpec: &filetypes.FileSpec{ - Format: ft, - }, + FileSpec: filetypes.FileSpec{Format: ft}, } t.Run("generic/"+string(ft), func(t *testing.T) { - testPlugin(t, &spec) + testPlugin(t, &s) }) t.Run("write/"+string(ft), func(t *testing.T) { - testPluginCustom(t, &spec) + testPluginCustom(t, &s) }) } } -func testPlugin(t *testing.T, spec *Spec) { +func testPlugin(t *testing.T, s *spec.Spec) { ctx := context.Background() p := plugin.NewPlugin("azblob", "development", New) - b, err := json.Marshal(spec) + b, err := json.Marshal(s) if err != nil { t.Fatal(err) } @@ -71,7 +70,7 @@ func testPlugin(t *testing.T, spec *Spec) { ) } -func testPluginCustom(t *testing.T, spec *Spec) { +func testPluginCustom(t *testing.T, s *spec.Spec) { ctx := context.Background() var client plugin.Client @@ -81,7 +80,7 @@ func testPluginCustom(t *testing.T, spec *Spec) { client, err = New(ctx, logger, spec, opts) return client, err }) - b, err := json.Marshal(spec) + b, err := json.Marshal(s) if err != nil { t.Fatal(err) } diff --git a/plugins/destination/azblob/client/spec.go b/plugins/destination/azblob/client/spec.go deleted file mode 100644 index c3e326c9462b67..00000000000000 --- a/plugins/destination/azblob/client/spec.go +++ /dev/null @@ -1,71 +0,0 @@ -package client - -import ( - "fmt" - "time" - - "github.com/cloudquery/filetypes/v4" - "github.com/cloudquery/plugin-sdk/v4/configtype" -) - -type Spec struct { - StorageAccount string `json:"storage_account,omitempty"` - Container string `json:"container,omitempty"` - Path string `json:"path,omitempty"` - NoRotate bool `json:"no_rotate,omitempty"` - *filetypes.FileSpec - - BatchSize *int64 `json:"batch_size"` - BatchSizeBytes *int64 `json:"batch_size_bytes"` - BatchTimeout *configtype.Duration `json:"batch_timeout"` -} - -func (s *Spec) SetDefaults() { - if s.BatchSize == nil { - if s.NoRotate { - s.BatchSize = int64ptr(0) - } else { - s.BatchSize = int64ptr(10000) - } - } - if s.BatchSizeBytes == nil { - 
if s.NoRotate { - s.BatchSizeBytes = int64ptr(0) - } else { - s.BatchSizeBytes = int64ptr(50 * 1024 * 1024) // 50 MiB - } - } - if s.BatchTimeout == nil { - if s.NoRotate { - d := configtype.NewDuration(0) - s.BatchTimeout = &d - } else { - d := configtype.NewDuration(30 * time.Second) - s.BatchTimeout = &d - } - } -} - -func (s *Spec) Validate() error { - if s.StorageAccount == "" { - return fmt.Errorf("`storage_account` is required") - } - if s.Container == "" { - return fmt.Errorf("`container` is required") - } - if s.Path == "" { - return fmt.Errorf("`path` is required") - } - if s.Format == "" { - return fmt.Errorf("`format` is required") - } - if s.NoRotate && ((s.BatchSize != nil && *s.BatchSize > 0) || (s.BatchSizeBytes != nil && *s.BatchSizeBytes > 0) || (s.BatchTimeout != nil && s.BatchTimeout.Duration() > 0)) { - return fmt.Errorf("`no_rotate` cannot be used with non-zero `batch_size`, `batch_size_bytes` or `batch_timeout_ms`") - } - - return nil -} - -func int64ptr(i int64) *int64 { - return &i -} diff --git a/plugins/destination/azblob/client/spec/gen/main.go b/plugins/destination/azblob/client/spec/gen/main.go new file mode 100644 index 00000000000000..51933b75593bd4 --- /dev/null +++ b/plugins/destination/azblob/client/spec/gen/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" + "log" + "path" + "runtime" + + "github.com/cloudquery/cloudquery/plugins/destination/azblob/client/spec" + "github.com/cloudquery/codegen/jsonschema" + "github.com/cloudquery/filetypes/v4" +) + +func main() { + fmt.Println("Generating JSON schema for plugin spec") + jsonschema.GenerateIntoFile(new(spec.Spec), path.Join(currDir(), "..", "schema.json"), + append(filetypes.FileSpec{}.JSONSchemaOptions(), + jsonschema.WithAddGoComments("github.com/cloudquery/cloudquery/plugins/destination/azblob/client/spec", path.Join(currDir(), "..")), + jsonschema.WithAddGoComments("github.com/cloudquery/filetypes/v4", path.Join(currDir(), "..", "..", "..", "vendor", 
"github.com/cloudquery/filetypes/v4")), + )..., + ) +} + +func currDir() string { + _, filename, _, ok := runtime.Caller(0) + if !ok { + log.Fatal("Failed to get caller information") + } + return path.Dir(filename) +} diff --git a/plugins/destination/azblob/client/spec/schema.go b/plugins/destination/azblob/client/spec/schema.go new file mode 100644 index 00000000000000..4d029a8f32e9b7 --- /dev/null +++ b/plugins/destination/azblob/client/spec/schema.go @@ -0,0 +1,48 @@ +package spec + +import ( + _ "embed" + + "github.com/invopop/jsonschema" + orderedmap "github.com/wk8/go-ordered-map/v2" +) + +func (s Spec) JSONSchemaExtend(sc *jsonschema.Schema) { + s.FileSpec.JSONSchemaExtend(sc) // need to call manually + + batchTimeout := sc.Properties.Value("batch_timeout").OneOf[0] // 0 - val, 1 - null + batchTimeout.Default = "30s" + + // no_rotate:true -> only nulls for batch options + noRotateNoBatch := &jsonschema.Schema{ + Title: "Disallow batching when using no_rotate", + If: &jsonschema.Schema{ + Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] { + noRotate := *sc.Properties.Value("no_rotate") + noRotate.Default = nil + noRotate.Const = true + noRotate.Description = "" + properties := orderedmap.New[string, *jsonschema.Schema]() + properties.Set("no_rotate", &noRotate) + return properties + }(), + Required: []string{"no_rotate"}, + }, + Then: &jsonschema.Schema{ + Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] { + // we make the non-zero requirement, so we want to allow only null here + null := &jsonschema.Schema{Type: "null"} + properties := orderedmap.New[string, *jsonschema.Schema]() + properties.Set("batch_size", null) + properties.Set("batch_size_bytes", null) + properties.Set("batch_timeout", null) + return properties + }(), + }, + } + + sc.AllOf = append(sc.AllOf, noRotateNoBatch) +} + +//go:embed schema.json +var JSONSchema string diff --git a/plugins/destination/azblob/client/spec/schema.json 
b/plugins/destination/azblob/client/spec/schema.json new file mode 100644 index 00000000000000..49d94aca60a70c --- /dev/null +++ b/plugins/destination/azblob/client/spec/schema.json @@ -0,0 +1,232 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/cloudquery/cloudquery/plugins/destination/azblob/client/spec/spec", + "$ref": "#/$defs/Spec", + "$defs": { + "CSVSpec": { + "properties": { + "skip_header": { + "type": "boolean", + "description": "Specifies if the first line of a file should be the header.", + "default": false + }, + "delimiter": { + "type": "string", + "pattern": "^.$", + "description": "Character that will be used as the delimiter.", + "default": "," + } + }, + "additionalProperties": false, + "type": "object", + "description": "CloudQuery CSV file output spec." + }, + "Duration": { + "type": "string", + "pattern": "^[-+]?([0-9]*(\\.[0-9]*)?[a-z]+)+$", + "title": "CloudQuery configtype.Duration" + }, + "JSONSpec": { + "additionalProperties": false, + "type": "object", + "description": "CloudQuery JSON file output spec." + }, + "ParquetSpec": { + "additionalProperties": false, + "type": "object", + "description": "CloudQuery Parquet file output spec." 
+ }, + "Spec": { + "allOf": [ + { + "if": { + "properties": { + "no_rotate": { + "type": "boolean", + "const": true + } + }, + "required": [ + "no_rotate" + ] + }, + "then": { + "properties": { + "batch_size": { + "type": "null" + }, + "batch_size_bytes": { + "type": "null" + }, + "batch_timeout": { + "type": "null" + } + } + }, + "title": "Disallow batching when using no_rotate" + } + ], + "oneOf": [ + { + "properties": { + "format": { + "type": "string", + "const": "csv" + }, + "format_spec": { + "oneOf": [ + { + "$ref": "#/$defs/CSVSpec" + }, + { + "type": "null" + } + ] + } + } + }, + { + "properties": { + "format": { + "type": "string", + "const": "json" + }, + "format_spec": { + "oneOf": [ + { + "$ref": "#/$defs/JSONSpec" + }, + { + "type": "null" + } + ] + } + } + }, + { + "properties": { + "format": { + "type": "string", + "const": "parquet" + }, + "format_spec": { + "oneOf": [ + { + "$ref": "#/$defs/ParquetSpec" + }, + { + "type": "null" + } + ] + } + } + } + ], + "properties": { + "format": { + "type": "string", + "enum": [ + "csv", + "json", + "parquet" + ], + "description": "Output format." + }, + "format_spec": { + "oneOf": [ + { + "anyOf": [ + { + "$ref": "#/$defs/CSVSpec" + }, + { + "$ref": "#/$defs/JSONSpec" + }, + { + "$ref": "#/$defs/ParquetSpec" + } + ] + }, + { + "type": "null" + } + ] + }, + "compression": { + "type": "string", + "enum": [ + "", + "gzip" + ], + "description": "Compression type.\nEmpty or missing stands for no compression." + }, + "storage_account": { + "type": "string", + "minLength": 1, + "description": "Storage account where to sync the files." + }, + "container": { + "type": "string", + "minLength": 1, + "description": "Storage container inside the storage account where to sync the files." + }, + "path": { + "type": "string", + "minLength": 1, + "description": "Path to where the files will be uploaded in the storage container." 
+ }, + "no_rotate": { + "type": "boolean", + "description": "If set to `true`, the plugin will write to one file per table.\nOtherwise, for every batch a new file will be created with a different `.\u003cUUID\u003e` suffix.", + "default": false + }, + "batch_size": { + "oneOf": [ + { + "type": "integer", + "minimum": 1, + "description": "This parameter controls the maximum amount of items may be grouped together to be written in a single object.\n\nDefaults to `10000` unless `no_rotate` is `true` (will be `0` then).", + "default": 10000 + }, + { + "type": "null" + } + ] + }, + "batch_size_bytes": { + "oneOf": [ + { + "type": "integer", + "minimum": 1, + "description": "This parameter controls the maximum size of items that may be grouped together to be written in a single object.\n\nDefaults to `52428800` (50 MiB) unless `no_rotate` is `true` (will be `0` then).", + "default": 52428800 + }, + { + "type": "null" + } + ] + }, + "batch_timeout": { + "oneOf": [ + { + "$ref": "#/$defs/Duration", + "description": "This parameter controls the maximum interval between batch writes.\n\nDefaults to `30s` unless `no_rotate` is `true` (will be `0s` then).", + "default": "30s" + }, + { + "type": "null" + } + ] + } + }, + "additionalProperties": false, + "type": "object", + "required": [ + "format", + "storage_account", + "container", + "path" + ] + } + } +} diff --git a/plugins/destination/azblob/client/spec/schema_test.go b/plugins/destination/azblob/client/spec/schema_test.go new file mode 100644 index 00000000000000..6123d6deac09d1 --- /dev/null +++ b/plugins/destination/azblob/client/spec/schema_test.go @@ -0,0 +1,220 @@ +package spec + +import ( + "testing" + + "github.com/cloudquery/codegen/jsonschema" +) + +func TestSpecJSONSchema(t *testing.T) { + // cases about embedded filetypes.FileSpec are tested in the corresponding package + // However, we add some tests to verify that it actually is properly working + jsonschema.TestJSONSchema(t, JSONSchema, 
[]jsonschema.TestCase{ + { + Name: "csv file spec", + Spec: `{ + "format": "csv", + "format_spec": { + "skip_header": true, + "delimiter": "#" + }, + "path": "abc", + "storage_account": "sa", + "container": "c" +}`, + }, + { + Name: "bad format value", // also a part of embedded FileSpec testing + Spec: `{"format": "cs22v", "path": "abc"}`, + Err: true, + }, + { + Name: "minimal", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c"}`, + }, + { + Name: "missing path", + Spec: `{"format": "csv", "storage_account": "sa", "container": "c"}`, + Err: true, + }, + { + Name: "empty path", + Spec: `{"format": "csv", "path": "", "storage_account": "sa", "container": "c"}`, + Err: true, + }, + { + Name: "null path", + Spec: `{"format": "csv", "path": null, "storage_account": "sa", "container": "c"}`, + Err: true, + }, + { + Name: "integer path", + Spec: `{"format": "csv", "path": 123, "storage_account": "sa", "container": "c"}`, + Err: true, + }, + { + Name: "missing storage_account", + Spec: `{"format": "csv", "path": "abc", "container": "c"}`, + Err: true, + }, + { + Name: "empty storage_account", + Spec: `{"format": "csv", "path": "abc", "storage_account": "", "container": "c"}`, + Err: true, + }, + { + Name: "null storage_account", + Spec: `{"format": "csv", "path": "abc", "storage_account": null, "container": "c"}`, + Err: true, + }, + { + Name: "integer storage_account", + Spec: `{"format": "csv", "path": "abc", "storage_account": 123, "container": "c"}`, + Err: true, + }, + { + Name: "missing container", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa"}`, + Err: true, + }, + { + Name: "empty container", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": ""}`, + Err: true, + }, + { + Name: "null container", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": null}`, + Err: true, + }, + { + Name: "integer container", + Spec: `{"format": "csv", "path": "abc", 
"storage_account": "sa", "container": 123}`, + Err: true, + }, + + { + Name: "null no_rotate", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate": null}`, + Err: true, + }, + { + Name: "bad no_rotate", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate": 123}`, + Err: true, + }, + { + Name: "no_rotate:true", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate": true}`, + }, + { + Name: "no_rotate:false", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate": false}`, + }, + { + Name: "zero batch_size", + Err: true, + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "batch_size":0}`, + }, + { + Name: "float batch_size", + Err: true, + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "batch_size":5.3}`, + }, + { + Name: "bad batch_size", + Err: true, + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "batch_size":false}`, + }, + { + Name: "null batch_size", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "batch_size":null}`, + }, + { + Name: "proper batch_size", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "batch_size":123}`, + }, + { + Name: "zero batch_size_bytes", + Err: true, + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "batch_size_bytes":0}`, + }, + { + Name: "float batch_size_bytes", + Err: true, + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "batch_size_bytes":5.3}`, + }, + { + Name: "bad batch_size_bytes", + Err: true, + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "batch_size_bytes":false}`, + }, + { + Name: "null batch_size_bytes", + Spec: `{"format": "csv", "path": "abc", "storage_account": 
"sa", "container": "c", "batch_size_bytes":null}`, + }, + { + Name: "proper batch_size_bytes", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "batch_size_bytes":123}`, + }, + // configtype.Duration is tested in plugin-sdk + // test only null here + { + Name: "null batch_timeout", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "batch_timeout":null}`, + }, + + // no_rotate + batching + { + Name: "no_rotate:false & batch_size:100", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate":false, "batch_size":100}`, + }, + { + Name: "no_rotate:true & batch_size:100", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate":true, "batch_size":100}`, + Err: true, + }, + { + Name: "no_rotate:false & batch_size:null", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate":false, "batch_size":null}`, + }, + { + Name: "no_rotate:true & batch_size:null", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate":true, "batch_size":null}`, + }, + { + Name: "no_rotate:false & batch_size_bytes:100", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate":false, "batch_size_bytes":100}`, + }, + { + Name: "no_rotate:true & batch_size_bytes:100", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate":true, "batch_size_bytes":100}`, + Err: true, + }, + { + Name: "no_rotate:false & batch_size_bytes:null", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate":false, "batch_size_bytes":null}`, + }, + { + Name: "no_rotate:true & batch_size_bytes:null", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate":true, "batch_size_bytes":null}`, + }, + { + Name: "no_rotate:false & 
batch_timeout:100s", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate":false, "batch_timeout":"100s"}`, + }, + { + Name: "no_rotate:true & batch_timeout:100s", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate":true, "batch_timeout":"100s"}`, + Err: true, + }, + { + Name: "no_rotate:false & batch_timeout:null", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate":false, "batch_timeout":null}`, + }, + { + Name: "no_rotate:true & batch_timeout:null", + Spec: `{"format": "csv", "path": "abc", "storage_account": "sa", "container": "c", "no_rotate":true, "batch_timeout":null}`, + }, + }) +} diff --git a/plugins/destination/azblob/client/spec/spec.go b/plugins/destination/azblob/client/spec/spec.go new file mode 100644 index 00000000000000..d524eba3fd9ed8 --- /dev/null +++ b/plugins/destination/azblob/client/spec/spec.go @@ -0,0 +1,96 @@ +package spec + +import ( + "fmt" + "time" + + "github.com/cloudquery/filetypes/v4" + "github.com/cloudquery/plugin-sdk/v4/configtype" +) + +type Spec struct { + filetypes.FileSpec + + // Storage account where to sync the files. + StorageAccount string `json:"storage_account,omitempty" jsonschema:"required,minLength=1"` + + // Storage container inside the storage account where to sync the files. + Container string `json:"container,omitempty" jsonschema:"required,minLength=1"` + + // Path to where the files will be uploaded in the storage container. + Path string `json:"path,omitempty" jsonschema:"required,minLength=1"` + + // If set to `true`, the plugin will write to one file per table. + // Otherwise, for every batch a new file will be created with a different `.<UUID>` suffix. + NoRotate bool `json:"no_rotate,omitempty" jsonschema:"default=false"` + + // This parameter controls the maximum amount of items may be grouped together to be written in a single object. 
+ // + // Defaults to `10000` unless `no_rotate` is `true` (will be `0` then). + BatchSize *int64 `json:"batch_size" jsonschema:"minimum=1,default=10000"` + + // This parameter controls the maximum size of items that may be grouped together to be written in a single object. + // + // Defaults to `52428800` (50 MiB) unless `no_rotate` is `true` (will be `0` then). + BatchSizeBytes *int64 `json:"batch_size_bytes" jsonschema:"minimum=1,default=52428800"` + + // This parameter controls the maximum interval between batch writes. + // + // Defaults to `30s` unless `no_rotate` is `true` (will be `0s` then). + BatchTimeout *configtype.Duration `json:"batch_timeout" jsonschema:"default=30s"` +} + +func (s *Spec) SetDefaults() { + if s.BatchSize == nil { + if s.NoRotate { + s.BatchSize = int64ptr(0) + } else { + s.BatchSize = int64ptr(10000) + } + } + if s.BatchSizeBytes == nil { + if s.NoRotate { + s.BatchSizeBytes = int64ptr(0) + } else { + s.BatchSizeBytes = int64ptr(50 * 1024 * 1024) // 50 MiB + } + } + if s.BatchTimeout == nil { + if s.NoRotate { + d := configtype.NewDuration(0) + s.BatchTimeout = &d + } else { + d := configtype.NewDuration(30 * time.Second) + s.BatchTimeout = &d + } + } +} + +func (s *Spec) Validate() error { + if len(s.StorageAccount) == 0 { + return fmt.Errorf("`storage_account` is required") + } + if len(s.Container) == 0 { + return fmt.Errorf("`container` is required") + } + if len(s.Path) == 0 { + return fmt.Errorf("`path` is required") + } + + if s.NoRotate && ((s.BatchSize != nil && *s.BatchSize > 0) || (s.BatchSizeBytes != nil && *s.BatchSizeBytes > 0) || (s.BatchTimeout != nil && s.BatchTimeout.Duration() > 0)) { + return fmt.Errorf("`no_rotate` cannot be used with non-zero `batch_size`, `batch_size_bytes` or `batch_timeout`") + } + + // required for s.FileSpec.Validate call + err := s.FileSpec.UnmarshalSpec() + if err != nil { + return err + } + s.FileSpec.SetDefaults() + + return s.FileSpec.Validate() +} + +func int64ptr(i int64) *int64 { + 
return &i +} diff --git a/plugins/destination/azblob/client/spec/spec_test.go b/plugins/destination/azblob/client/spec/spec_test.go new file mode 100644 index 00000000000000..25e6343b9c4807 --- /dev/null +++ b/plugins/destination/azblob/client/spec/spec_test.go @@ -0,0 +1,74 @@ +package spec + +import ( + "fmt" + "testing" + "time" + + "github.com/cloudquery/filetypes/v4" + "github.com/cloudquery/plugin-sdk/v4/configtype" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" +) + +const ( + storageAccount = "cqdestinationazblob" + container = "test" +) + +func TestSpec_SetDefaults(t *testing.T) { + dur0, dur30 := configtype.NewDuration(0), configtype.NewDuration(30*time.Second) + + cases := []struct { + Give Spec + Want Spec + }{ + + { + Give: Spec{Path: "test/path", FileSpec: filetypes.FileSpec{Format: "json"}}, + Want: Spec{Path: "test/path", FileSpec: filetypes.FileSpec{Format: "json"}, BatchSize: int64Ptr(10000), BatchSizeBytes: int64Ptr(50 * 1024 * 1024), BatchTimeout: &dur30}, + }, + { + Give: Spec{Path: "test/path", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: true}, + Want: Spec{Path: "test/path", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: true, BatchSize: int64Ptr(0), BatchSizeBytes: int64Ptr(0), BatchTimeout: &dur0}, + }, + } + for _, tc := range cases { + got := tc.Give + got.SetDefaults() + if diff := cmp.Diff(tc.Want, got, cmpopts.IgnoreUnexported(filetypes.FileSpec{}, configtype.Duration{})); diff != "" { + t.Errorf("SetDefaults() mismatch (-want +got):\n%s", diff) + } + require.Equal(t, tc.Want.BatchTimeout, got.BatchTimeout) + } +} + +func TestSpec_Validate(t *testing.T) { + zero, one, dur0 := int64(0), int64(1), configtype.NewDuration(0) + cases := []struct { + Give Spec + WantErr bool + }{ + {Give: Spec{Path: "test/path", FileSpec: filetypes.FileSpec{Format: "json"}, StorageAccount: storageAccount, Container: container, BatchSize: &zero, BatchSizeBytes: &zero}, 
WantErr: false}, + {Give: Spec{Path: "test/path", FileSpec: filetypes.FileSpec{Format: "json"}, Container: container, BatchSize: &zero, BatchSizeBytes: &zero}, WantErr: true}, // no StorageAccount + {Give: Spec{Path: "test/path", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: false, StorageAccount: storageAccount, Container: container, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: false}, + {Give: Spec{Path: "test/path", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: true, StorageAccount: storageAccount, Container: container, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: false}, + {Give: Spec{Path: "test/path", FileSpec: filetypes.FileSpec{Format: "json"}, NoRotate: true, StorageAccount: storageAccount, Container: container, BatchSize: &one, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, + } + for i, tc := range cases { + tc := tc + t.Run(fmt.Sprintf("Case %d", i+1), func(t *testing.T) { + err := tc.Give.Validate() + if tc.WantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func int64Ptr(i int64) *int64 { + return &i +} diff --git a/plugins/destination/azblob/client/spec_test.go b/plugins/destination/azblob/client/spec_test.go deleted file mode 100644 index 9875fbc77f7535..00000000000000 --- a/plugins/destination/azblob/client/spec_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package client - -import ( - "fmt" - "testing" - "time" - - "github.com/cloudquery/filetypes/v4" - "github.com/cloudquery/plugin-sdk/v4/configtype" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/stretchr/testify/require" -) - -func TestSpec_SetDefaults(t *testing.T) { - dur0, dur30 := configtype.NewDuration(0), configtype.NewDuration(30*time.Second) - - cases := []struct { - Give Spec - Want Spec - }{ - - { - Give: Spec{Path: "test/path", FileSpec: &filetypes.FileSpec{Format: "json"}}, - Want: Spec{Path: "test/path", FileSpec: 
&filetypes.FileSpec{Format: "json"}, BatchSize: int64Ptr(10000), BatchSizeBytes: int64Ptr(50 * 1024 * 1024), BatchTimeout: &dur30}, - }, - { - Give: Spec{Path: "test/path", FileSpec: &filetypes.FileSpec{Format: "json"}, NoRotate: true}, - Want: Spec{Path: "test/path", FileSpec: &filetypes.FileSpec{Format: "json"}, NoRotate: true, BatchSize: int64Ptr(0), BatchSizeBytes: int64Ptr(0), BatchTimeout: &dur0}, - }, - } - for _, tc := range cases { - got := tc.Give - got.SetDefaults() - if diff := cmp.Diff(tc.Want, got, cmpopts.IgnoreUnexported(filetypes.FileSpec{}, configtype.Duration{})); diff != "" { - t.Errorf("SetDefaults() mismatch (-want +got):\n%s", diff) - } - require.Equal(t, tc.Want.BatchTimeout, got.BatchTimeout) - } -} - -func TestSpec_Validate(t *testing.T) { - zero, one, dur0 := int64(0), int64(1), configtype.NewDuration(0) - cases := []struct { - Give Spec - WantErr bool - }{ - {Give: Spec{Path: "test/path", FileSpec: &filetypes.FileSpec{Format: "json"}, StorageAccount: storage_account, Container: container, BatchSize: &zero, BatchSizeBytes: &zero}, WantErr: false}, - {Give: Spec{Path: "test/path", FileSpec: &filetypes.FileSpec{Format: "json"}, Container: container, BatchSize: &zero, BatchSizeBytes: &zero}, WantErr: true}, // no StorageAccount - {Give: Spec{Path: "test/path", FileSpec: &filetypes.FileSpec{Format: "json"}, NoRotate: false, StorageAccount: storage_account, Container: container, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: false}, - {Give: Spec{Path: "test/path", FileSpec: &filetypes.FileSpec{Format: "json"}, NoRotate: true, StorageAccount: storage_account, Container: container, BatchSize: &zero, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: false}, - {Give: Spec{Path: "test/path", FileSpec: &filetypes.FileSpec{Format: "json"}, NoRotate: true, StorageAccount: storage_account, Container: container, BatchSize: &one, BatchSizeBytes: &zero, BatchTimeout: &dur0}, WantErr: true}, - } - for i, tc := range cases { - 
tc := tc - t.Run(fmt.Sprintf("Case %d", i+1), func(t *testing.T) { - err := tc.Give.Validate() - if tc.WantErr { - require.Error(t, err) - } else { - require.NoError(t, err) - } - }) - } -} - -func int64Ptr(i int64) *int64 { - return &i -} diff --git a/plugins/destination/azblob/client/write.go b/plugins/destination/azblob/client/write.go index e9f3eda3a21b9f..f20b99325e937e 100644 --- a/plugins/destination/azblob/client/write.go +++ b/plugins/destination/azblob/client/write.go @@ -18,9 +18,9 @@ func (c *Client) WriteTable(ctx context.Context, msgs <-chan *message.WriteInser if s == nil { table := msg.GetTable() - name := fmt.Sprintf("%s/%s.%s%s.%s", c.spec.Path, table.Name, c.spec.Format, c.spec.FileSpec.Compression.Extension(), uuid.NewString()) - if c.spec.NoRotate { - name = fmt.Sprintf("%s/%s.%s%s", c.spec.Path, table.Name, c.spec.Format, c.spec.FileSpec.Compression.Extension()) + name := fmt.Sprintf("%s/%s.%s%s", c.spec.Path, table.Name, c.spec.Format, c.spec.FileSpec.Compression.Extension()) + if !c.spec.NoRotate { + name += "." + uuid.NewString() } var err error diff --git a/plugins/destination/azblob/docs/overview.md b/plugins/destination/azblob/docs/overview.md index 44625d36f4e547..b12d1d0ec9f4f4 100644 --- a/plugins/destination/azblob/docs/overview.md +++ b/plugins/destination/azblob/docs/overview.md @@ -40,6 +40,11 @@ This is the (nested) spec used by the Azure blob destination Plugin. Path to where the files will be uploaded in the above bucket. +- `no_rotate` (`boolean`) (optional) (default: `false`) + + If set to `true`, the plugin will write to one file per table. + Otherwise, for every batch a new file will be created with a different `.<UUID>` suffix. + - `format` (`string`) (required) Format of the output file. Supported values are `csv`, `json` and `parquet`.