Skip to content

Commit 0afd883

Browse files
authored
feat: Add JSON schema to gcs destination plugin (#16439)
Closes #16407 Basically, an adapted version of #16404
1 parent 967e4a7 commit 0afd883

12 files changed

Lines changed: 634 additions & 89 deletions

File tree

.github/workflows/dest_gcs.yml

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,6 @@ jobs:
3131
- uses: actions/checkout@v4
3232
with:
3333
fetch-depth: 2
34-
- name: Authenticate to Google Cloud
35-
uses: 'google-github-actions/auth@v1'
36-
with:
37-
workload_identity_provider: 'projects/151868820337/locations/global/workloadIdentityPools/integration-test-pool/providers/integration-test-provider'
38-
service_account: 'integration-service-account@cq-integration-tests.iam.gserviceaccount.com'
3934
- name: Set up Go 1.x
4035
uses: actions/setup-go@v4
4136
with:
@@ -50,6 +45,17 @@ jobs:
5045
args: "--config ../../.golangci.yml"
5146
skip-pkg-cache: true
5247
skip-build-cache: true
48+
- name: gen
49+
if: github.event_name == 'pull_request'
50+
run: make gen
51+
- name: Fail if generation updated files
52+
if: github.event_name == 'pull_request'
53+
run: test "$(git status -s | wc -l)" -eq 0 || (git status -s; exit 1)
54+
- name: Authenticate to Google Cloud
55+
uses: 'google-github-actions/auth@v1'
56+
with:
57+
workload_identity_provider: 'projects/151868820337/locations/global/workloadIdentityPools/integration-test-pool/providers/integration-test-provider'
58+
service_account: 'integration-service-account@cq-integration-tests.iam.gserviceaccount.com'
5359
- name: Build
5460
run: go build .
5561
- name: Test file plugin

plugins/destination/gcs/Makefile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,13 @@ test:
55
.PHONY: lint
66
lint:
77
golangci-lint run --config ../../.golangci.yml
8+
9+
.PHONY: gen-spec-schema
10+
gen-spec-schema:
11+
# required for loading comments from filetypes
12+
go mod vendor
13+
go run client/spec/gen/main.go
14+
15+
# All gen targets
16+
.PHONY: gen
17+
gen: gen-spec-schema

plugins/destination/gcs/client/client.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77

88
"cloud.google.com/go/storage"
9+
"github.com/cloudquery/cloudquery/plugins/destination/gcs/client/spec"
910
"github.com/cloudquery/filetypes/v4"
1011
"github.com/cloudquery/plugin-sdk/v4/writers/streamingbatchwriter"
1112

@@ -21,7 +22,7 @@ type Client struct {
2122
streamingbatchwriter.UnimplementedDeleteRecords
2223

2324
logger zerolog.Logger
24-
spec *Spec
25+
spec *spec.Spec
2526

2627
gcsClient *storage.Client
2728
bucket *storage.BucketHandle
@@ -30,20 +31,20 @@ type Client struct {
3031
writer *streamingbatchwriter.StreamingBatchWriter
3132
}
3233

33-
func New(ctx context.Context, logger zerolog.Logger, spec []byte, _ plugin.NewClientOptions) (plugin.Client, error) {
34+
func New(ctx context.Context, logger zerolog.Logger, s []byte, _ plugin.NewClientOptions) (plugin.Client, error) {
3435
c := &Client{
3536
logger: logger.With().Str("module", "gcs").Logger(),
3637
}
3738

38-
if err := json.Unmarshal(spec, &c.spec); err != nil {
39+
if err := json.Unmarshal(s, &c.spec); err != nil {
3940
return nil, fmt.Errorf("failed to unmarshal gcs spec: %w", err)
4041
}
4142
if err := c.spec.Validate(); err != nil {
4243
return nil, err
4344
}
4445
c.spec.SetDefaults()
4546

46-
filetypesClient, err := filetypes.NewClient(c.spec.FileSpec)
47+
filetypesClient, err := filetypes.NewClient(&c.spec.FileSpec)
4748
if err != nil {
4849
return nil, fmt.Errorf("failed to create filetypes client: %w", err)
4950
}

plugins/destination/gcs/client/client_test.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/apache/arrow/go/v15/arrow"
1111
"github.com/apache/arrow/go/v15/arrow/array"
1212
"github.com/apache/arrow/go/v15/arrow/memory"
13+
spec "github.com/cloudquery/cloudquery/plugins/destination/gcs/client/spec"
1314
"github.com/cloudquery/filetypes/v4"
1415
"github.com/cloudquery/plugin-sdk/v4/message"
1516
"github.com/cloudquery/plugin-sdk/v4/plugin"
@@ -26,29 +27,27 @@ func TestPlugin(t *testing.T) {
2627
filetypes.FormatTypeJSON,
2728
filetypes.FormatTypeParquet,
2829
} {
29-
spec := Spec{
30+
s := spec.Spec{
3031
Bucket: bucket,
3132
Path: t.TempDir(),
3233
NoRotate: true,
33-
FileSpec: &filetypes.FileSpec{
34-
Format: ft,
35-
},
34+
FileSpec: filetypes.FileSpec{Format: ft},
3635
}
3736

3837
t.Run("generic/"+string(ft), func(t *testing.T) {
39-
testPlugin(t, &spec)
38+
testPlugin(t, &s)
4039
})
4140

4241
t.Run("write/"+string(ft), func(t *testing.T) {
43-
testPluginCustom(t, &spec)
42+
testPluginCustom(t, &s)
4443
})
4544
}
4645
}
4746

48-
func testPlugin(t *testing.T, spec *Spec) {
47+
func testPlugin(t *testing.T, s *spec.Spec) {
4948
ctx := context.Background()
5049
p := plugin.NewPlugin("gcs", "development", New)
51-
b, err := json.Marshal(spec)
50+
b, err := json.Marshal(s)
5251
if err != nil {
5352
t.Fatal(err)
5453
}
@@ -67,7 +66,7 @@ func testPlugin(t *testing.T, spec *Spec) {
6766
)
6867
}
6968

70-
func testPluginCustom(t *testing.T, spec *Spec) {
69+
func testPluginCustom(t *testing.T, s *spec.Spec) {
7170
ctx := context.Background()
7271

7372
var client plugin.Client
@@ -77,7 +76,7 @@ func testPluginCustom(t *testing.T, spec *Spec) {
7776
client, err = New(ctx, logger, spec, opts)
7877
return client, err
7978
})
80-
b, err := json.Marshal(spec)
79+
b, err := json.Marshal(s)
8180
if err != nil {
8281
t.Fatal(err)
8382
}

plugins/destination/gcs/client/spec.go

Lines changed: 0 additions & 68 deletions
This file was deleted.
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"path"
7+
"runtime"
8+
9+
"github.com/cloudquery/cloudquery/plugins/destination/gcs/client/spec"
10+
"github.com/cloudquery/codegen/jsonschema"
11+
"github.com/cloudquery/filetypes/v4"
12+
)
13+
14+
func main() {
15+
fmt.Println("Generating JSON schema for plugin spec")
16+
jsonschema.GenerateIntoFile(new(spec.Spec), path.Join(currDir(), "..", "schema.json"),
17+
append(filetypes.FileSpec{}.JSONSchemaOptions(),
18+
jsonschema.WithAddGoComments("github.com/cloudquery/cloudquery/plugins/destination/gcs/client/spec", path.Join(currDir(), "..")),
19+
jsonschema.WithAddGoComments("github.com/cloudquery/filetypes/v4", path.Join(currDir(), "..", "..", "..", "vendor", "github.com/cloudquery/filetypes/v4")),
20+
)...,
21+
)
22+
}
23+
24+
func currDir() string {
25+
_, filename, _, ok := runtime.Caller(0)
26+
if !ok {
27+
log.Fatal("Failed to get caller information")
28+
}
29+
return path.Dir(filename)
30+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package spec
2+
3+
import (
4+
_ "embed"
5+
6+
"github.com/invopop/jsonschema"
7+
orderedmap "github.com/wk8/go-ordered-map/v2"
8+
)
9+
10+
func (s Spec) JSONSchemaExtend(sc *jsonschema.Schema) {
11+
s.FileSpec.JSONSchemaExtend(sc) // need to call manually
12+
13+
batchTimeout := sc.Properties.Value("batch_timeout").OneOf[0] // 0 - val, 1 - null
14+
batchTimeout.Default = "30s"
15+
16+
// no_rotate:true -> only nulls for batch options
17+
noRotateNoBatch := &jsonschema.Schema{
18+
Title: "Disallow batching when using no_rotate",
19+
If: &jsonschema.Schema{
20+
Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] {
21+
noRotate := *sc.Properties.Value("no_rotate")
22+
noRotate.Default = nil
23+
noRotate.Const = true
24+
noRotate.Description = ""
25+
properties := orderedmap.New[string, *jsonschema.Schema]()
26+
properties.Set("no_rotate", &noRotate)
27+
return properties
28+
}(),
29+
Required: []string{"no_rotate"},
30+
},
31+
Then: &jsonschema.Schema{
32+
Properties: func() *orderedmap.OrderedMap[string, *jsonschema.Schema] {
33+
// we make the non-zero requirement, so we want to allow only null here
34+
null := &jsonschema.Schema{Type: "null"}
35+
properties := orderedmap.New[string, *jsonschema.Schema]()
36+
properties.Set("batch_size", null)
37+
properties.Set("batch_size_bytes", null)
38+
properties.Set("batch_timeout", null)
39+
return properties
40+
}(),
41+
},
42+
}
43+
44+
sc.AllOf = append(sc.AllOf, noRotateNoBatch)
45+
}
46+
47+
//go:embed schema.json
48+
var JSONSchema string

0 commit comments

Comments
 (0)