diff --git a/CHANGELOG.md b/CHANGELOG.md index 9de459c415..2bbfd9442e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,19 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [4.24.0](https://github.com/cloudquery/plugin-sdk/compare/v4.23.0...v4.24.0) (2023-12-29) + + +### Features + +* Offline licensing support ([1fdf892](https://github.com/cloudquery/plugin-sdk/commit/1fdf892b8b4e4a90da4e69a463af0dd7d8b6a420)) + + +### Bug Fixes + +* **deps:** Update module github.com/cloudquery/plugin-pb-go to v1.15.0 ([#1438](https://github.com/cloudquery/plugin-sdk/issues/1438)) ([e0c2a4b](https://github.com/cloudquery/plugin-sdk/commit/e0c2a4bbf6248294ae62e47e129a65ed8dc01277)) +* **deps:** Update module github.com/cloudquery/plugin-pb-go to v1.16.0 ([#1440](https://github.com/cloudquery/plugin-sdk/issues/1440)) ([d2a5850](https://github.com/cloudquery/plugin-sdk/commit/d2a5850e126368fd3e03f0d993383ac0e355c8bc)) + ## [4.23.0](https://github.com/cloudquery/plugin-sdk/compare/v4.22.0...v4.23.0) (2023-12-27) diff --git a/examples/simple_plugin/go.mod b/examples/simple_plugin/go.mod index de3274025f..a7d8981647 100644 --- a/examples/simple_plugin/go.mod +++ b/examples/simple_plugin/go.mod @@ -4,7 +4,7 @@ go 1.21.1 require ( github.com/apache/arrow/go/v15 v15.0.0-20231223155039-ec41209ea02b - github.com/cloudquery/plugin-sdk/v4 v4.22.0 + github.com/cloudquery/plugin-sdk/v4 v4.23.0 github.com/rs/zerolog v1.30.0 ) @@ -21,6 +21,7 @@ require ( github.com/CloudyKit/jet/v6 v6.2.0 // indirect github.com/Joker/jade v1.1.3 // indirect github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect + github.com/adrg/xdg v0.4.0 // indirect github.com/andybalholm/brotli v1.0.6 // indirect github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect @@ -32,7 +33,7 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.1 // indirect github.com/cloudquery/cloudquery-api-go v1.6.3 // indirect - github.com/cloudquery/plugin-pb-go v1.14.6 // indirect + github.com/cloudquery/plugin-pb-go v1.16.0 // indirect github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/deepmap/oapi-codegen v1.15.0 // indirect diff --git a/examples/simple_plugin/go.sum b/examples/simple_plugin/go.sum index 61fec184b6..22f42515a6 100644 --- a/examples/simple_plugin/go.sum +++ b/examples/simple_plugin/go.sum @@ -11,6 +11,8 @@ github.com/Joker/jade v1.1.3/go.mod h1:T+2WLyt7VH6Lp0TRxQrUYEs64nRc83wkMQrfeIQKd github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 h1:KkH3I3sJuOLP3TjA/dfr4NAY8bghDwnXiU7cTKxQqo0= github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06/go.mod h1:7erjKLwalezA0k99cWs5L11HWOAPNjdUZ6RxH1BXbbM= +github.com/adrg/xdg v0.4.0 h1:RzRqFcjH4nE5C6oTAxhBtoE2IRyjBSa62SCbyPidvls= +github.com/adrg/xdg v0.4.0/go.mod h1:N6ag73EX4wyxeaoeHctc1mas01KZgsj5tYiAIwqJE/E= github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI= @@ -45,8 +47,8 @@ github.com/chenzhuoyu/iasm v0.9.1 h1:tUHQJXo3NhBqw6s33wkGn9SP3bvrWLdlVIJ3hQBL7P0 github.com/chenzhuoyu/iasm v0.9.1/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= github.com/cloudquery/cloudquery-api-go v1.6.3 h1:PnT0S57dN24nE86XGVM+5+ePTtMeXsEQqOR3kT2+YhY= github.com/cloudquery/cloudquery-api-go v1.6.3/go.mod h1:03fojQg0UpdgqXZ9tzZ5gF5CPad/F0sok66bsX6u4RA= -github.com/cloudquery/plugin-pb-go v1.14.6 h1:qFJ+611laD1PooMsaSuWfJZdVXG+Ul3/U7MlAjZXrXw= -github.com/cloudquery/plugin-pb-go v1.14.6/go.mod h1:/1KqTM4g8wG0uO0KQBct2t0vJrIxPY8npEZC6tdC2Es= +github.com/cloudquery/plugin-pb-go v1.16.0 h1:IoEIfskvZ07cTHnM5ma0KUVZtZSGtjXawZRH1nA5brQ= +github.com/cloudquery/plugin-pb-go v1.16.0/go.mod h1:p9qa3KC23payNjDiahMRh4f9Wu0uBPjDK8oHCqSp8eU= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -320,6 +322,7 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/go.mod b/go.mod index 6f2b1dd1c3..48a182d0e4 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/apache/arrow/go/v15 v15.0.0-20231223155039-ec41209ea02b github.com/bradleyjkemp/cupaloy/v2 v2.8.0 github.com/cloudquery/cloudquery-api-go v1.6.3 - github.com/cloudquery/plugin-pb-go v1.14.6 + github.com/cloudquery/plugin-pb-go v1.16.0 github.com/cloudquery/plugin-sdk/v2 v2.7.0 github.com/getsentry/sentry-go v0.24.1 github.com/goccy/go-json v0.10.2 diff --git a/go.sum b/go.sum index 8adb7c9233..53788d19e0 100644 --- a/go.sum +++ b/go.sum @@ -47,8 +47,8 @@ github.com/chenzhuoyu/iasm v0.9.1 h1:tUHQJXo3NhBqw6s33wkGn9SP3bvrWLdlVIJ3hQBL7P0 github.com/chenzhuoyu/iasm v0.9.1/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= github.com/cloudquery/cloudquery-api-go v1.6.3 h1:PnT0S57dN24nE86XGVM+5+ePTtMeXsEQqOR3kT2+YhY= github.com/cloudquery/cloudquery-api-go v1.6.3/go.mod h1:03fojQg0UpdgqXZ9tzZ5gF5CPad/F0sok66bsX6u4RA= -github.com/cloudquery/plugin-pb-go v1.14.6 h1:qFJ+611laD1PooMsaSuWfJZdVXG+Ul3/U7MlAjZXrXw= -github.com/cloudquery/plugin-pb-go v1.14.6/go.mod h1:/1KqTM4g8wG0uO0KQBct2t0vJrIxPY8npEZC6tdC2Es= +github.com/cloudquery/plugin-pb-go v1.16.0 h1:IoEIfskvZ07cTHnM5ma0KUVZtZSGtjXawZRH1nA5brQ= +github.com/cloudquery/plugin-pb-go v1.16.0/go.mod h1:p9qa3KC23payNjDiahMRh4f9Wu0uBPjDK8oHCqSp8eU= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= diff --git a/plugin/meta.go b/plugin/meta.go new file mode 100644 index 0000000000..e609a13d49 --- /dev/null +++ b/plugin/meta.go @@ -0,0 +1,10 @@ +package plugin + +import cqapi "github.com/cloudquery/cloudquery-api-go" + +type Meta struct { + Team cqapi.PluginTeam + Kind cqapi.PluginKind + Name cqapi.PluginName + SkipUsageClient bool +} diff --git a/plugin/plugin.go b/plugin/plugin.go index fe84e8e2ce..66573dc42a 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/apache/arrow/go/v15/arrow" + cqapi "github.com/cloudquery/cloudquery-api-go" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/rs/zerolog" @@ -17,6 +18,7 @@ var ErrNotImplemented = fmt.Errorf("not implemented") type NewClientOptions struct { NoConnection bool + PluginMeta Meta } type NewClientFunc func(context.Context, zerolog.Logger, []byte, NewClientOptions) (Client, error) @@ -78,6 +80,8 @@ type Plugin struct { schema string // validator object to validate specs schemaValidator *jsonschema.Schema + // skips the usage client + skipUsageClient bool } // NewPlugin returns a new CloudQuery Plugin with the given name, version and implementation. @@ -124,6 +128,11 @@ func (p *Plugin) Version() string { return p.version } +// SetSkipUsageClient sets whether the usage client should be skipped +func (p *Plugin) SetSkipUsageClient(v bool) { + p.skipUsageClient = v +} + type OnBeforeSender interface { OnBeforeSend(context.Context, message.SyncMessage) (message.SyncMessage, error) } @@ -196,6 +205,13 @@ func (p *Plugin) Init(ctx context.Context, spec []byte, options NewClientOptions } } + options.PluginMeta = Meta{ + Team: p.team, + Kind: cqapi.PluginKind(p.kind), + Name: p.name, + SkipUsageClient: p.skipUsageClient, + } + p.client, err = p.newClient(ctx, p.logger, spec, options) if err != nil { return fmt.Errorf("failed to initialize client: %w", err) diff --git a/premium/offline.go b/premium/offline.go new file mode 100644 index 0000000000..31dab33df3 --- /dev/null +++ b/premium/offline.go @@ -0,0 +1,94 @@ +package premium + +import ( + "crypto/ed25519" + _ "embed" + "encoding/hex" + "encoding/json" + "errors" + "os" + "time" + + "github.com/rs/zerolog" +) + +type License struct { + LicensedTo string `json:"licensed_to"` // Customers name, e.g. "Acme Inc" + IssuedAt time.Time `json:"issued_at"` + ValidFrom time.Time `json:"valid_from"` + ExpiresAt time.Time `json:"expires_at"` +} + +type LicenseWrapper struct { + LicenseBytes []byte `json:"license"` + Signature string `json:"signature"` // crypto +} + +var ( + ErrInvalidLicenseSignature = errors.New("invalid license signature") + ErrLicenseNotValidYet = errors.New("license not valid yet") + ErrLicenseExpired = errors.New("license expired") +) + +//go:embed offline.key +var publicKey string + +func ValidateLicense(logger zerolog.Logger, licenseFile string) error { + licenseContents, err := os.ReadFile(licenseFile) + if err != nil { + return err + } + + l, err := UnpackLicense(licenseContents) + if err != nil { + return err + } + + return l.IsValid(logger) +} + +func UnpackLicense(lic []byte) (*License, error) { + publicKeyBytes, err := hex.DecodeString(publicKey) + if err != nil { + return nil, err + } + + var lw LicenseWrapper + if err := json.Unmarshal(lic, &lw); err != nil { + return nil, err + } + + signatureBytes, err := hex.DecodeString(lw.Signature) + if err != nil { + return nil, err + } + + if !ed25519.Verify(publicKeyBytes, lw.LicenseBytes, signatureBytes) { + return nil, ErrInvalidLicenseSignature + } + + var l License + if err := json.Unmarshal(lw.LicenseBytes, &l); err != nil { + return nil, err + } + + return &l, nil +} + +func (l *License) IsValid(logger zerolog.Logger) error { + now := time.Now().UTC() + if now.Before(l.ValidFrom) { + return ErrLicenseNotValidYet + } + if now.After(l.ExpiresAt) { + return ErrLicenseExpired + } + + msg := logger.Info() + if now.Add(15 * 24 * time.Hour).After(l.ExpiresAt) { + msg = logger.Warn() + } + + msg.Time("expires_at", l.ExpiresAt).Msgf("Offline license for %s loaded.", l.LicensedTo) + return nil +} diff --git a/premium/offline.key b/premium/offline.key new file mode 100644 index 0000000000..5c93806ae6 --- /dev/null +++ b/premium/offline.key @@ -0,0 +1 @@ +fd81a64351452e0ada99c05c7e44bdf104cc583eb3ed44bf5545fe82b2f0a615 \ No newline at end of file diff --git a/premium/offline_test.go b/premium/offline_test.go new file mode 100644 index 0000000000..92e37135c1 --- /dev/null +++ b/premium/offline_test.go @@ -0,0 +1,27 @@ +package premium + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestUnpackLicense(t *testing.T) { + publicKey = "eacdff4866c8bc0d97de3c2d7668d0970c61aa16c3f12d6ba8083147ff92c9a6" + + t.Run("Success", func(t *testing.T) { + licData := `{"license":"eyJsaWNlbnNlZF90byI6IlVOTElDRU5TRUQgVEVTVCIsImlzc3VlZF9hdCI6IjIwMjMtMTItMjhUMTk6MDI6MjguODM4MzY3WiIsInZhbGlkX2Zyb20iOiIyMDIzLTEyLTI4VDE5OjAyOjI4LjgzODM2N1oiLCJleHBpcmVzX2F0IjoiMjAyMy0xMi0yOVQxOTowMjoyOC44MzgzNjdaIn0=","signature":"8687a858463764b052455b3c783d979d364b5fb653b86d88a7463e495480db62fdec7ae1a84d1e30dddee77eb769a0e498ecfc836538c53e410aeb1a0c04d102"}` + + l, err := UnpackLicense([]byte(licData)) + require.NoError(t, err) + require.Equal(t, "UNLICENSED TEST", l.LicensedTo) + require.Equal(t, l.ExpiresAt.Add(-24*time.Hour).Truncate(time.Hour), l.ValidFrom.Truncate(time.Hour)) + }) + t.Run("Fail", func(t *testing.T) { + licData := `{"license":"eyJsaWNlbnNlZF90byI6IlVOTElDRU5TRUQgVEVTVCIsImlzc3VlZF9hdCI6IjIwMjMtMTItMjhUMTk6MDI6MjguODM4MzY3WiIsInZhbGlkX2Zyb20iOiIyMDIzLTEyLTI4VDE5OjAyOjI4LjgzODM2N1oiLCJleHBpcmVzX2F0IjoiMjAyMy0xMi0yOVQxOTowMjoyOC44MzgzNjdaIn0=","signature":"9687a858463764b052455b3c783d979d364b5fb653b86d88a7463e495480db62fdec7ae1a84d1e30dddee77eb769a0e498ecfc836538c53e410aeb1a0c04d102"}` + l, err := UnpackLicense([]byte(licData)) + require.ErrorIs(t, err, ErrInvalidLicenseSignature) + require.Nil(t, l) + }) +} diff --git a/premium/usage.go b/premium/usage.go index 7080cf09a3..3a27d3eb21 100644 --- a/premium/usage.go +++ b/premium/usage.go @@ -13,6 +13,7 @@ import ( cqapi "github.com/cloudquery/cloudquery-api-go" "github.com/cloudquery/cloudquery-api-go/auth" "github.com/cloudquery/cloudquery-api-go/config" + "github.com/cloudquery/plugin-sdk/v4/plugin" "github.com/google/uuid" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -119,19 +120,10 @@ func withTokenClient(tokenClient TokenClient) UsageClientOptions { } } -func WithPluginTeam(pluginTeam string) cqapi.PluginTeam { - return pluginTeam -} - -func WithPluginKind(pluginKind string) cqapi.PluginKind { - return cqapi.PluginKind(pluginKind) -} - -func WithPluginName(pluginName string) cqapi.PluginName { - return pluginName -} - -var _ UsageClient = (*BatchUpdater)(nil) +var ( + _ UsageClient = (*BatchUpdater)(nil) + _ UsageClient = (*NoOpUsageClient)(nil) +) type BatchUpdater struct { logger zerolog.Logger @@ -141,9 +133,7 @@ type BatchUpdater struct { // Plugin details teamName cqapi.TeamName - pluginTeam cqapi.PluginTeam - pluginKind cqapi.PluginKind - pluginName cqapi.PluginName + pluginMeta plugin.Meta // Configuration batchLimit uint32 @@ -161,14 +151,12 @@ type BatchUpdater struct { isClosed bool } -func NewUsageClient(pluginTeam cqapi.PluginTeam, pluginKind cqapi.PluginKind, pluginName cqapi.PluginName, ops ...UsageClientOptions) (*BatchUpdater, error) { +func NewUsageClient(meta plugin.Meta, ops ...UsageClientOptions) (UsageClient, error) { u := &BatchUpdater{ logger: zerolog.Nop(), url: defaultAPIURL, - pluginTeam: pluginTeam, - pluginKind: pluginKind, - pluginName: pluginName, + pluginMeta: meta, batchLimit: defaultBatchLimit, minTimeBetweenFlushes: defaultMinTimeBetweenFlushes, @@ -183,6 +171,13 @@ func NewUsageClient(pluginTeam cqapi.PluginTeam, pluginKind cqapi.PluginKind, pl op(u) } + if meta.SkipUsageClient { + u.logger.Debug().Msg("Disabling usage client") + return &NoOpUsageClient{ + TeamNameValue: u.teamName, + }, nil + } + if u.tokenClient == nil { u.tokenClient = auth.NewTokenClient() } @@ -248,8 +243,8 @@ func (u *BatchUpdater) TeamName() string { } func (u *BatchUpdater) HasQuota(ctx context.Context) (bool, error) { - u.logger.Debug().Str("url", u.url).Str("team", u.teamName).Str("pluginTeam", u.pluginTeam).Str("pluginKind", string(u.pluginKind)).Str("pluginName", u.pluginName).Msg("checking quota") - usage, err := u.apiClient.GetTeamPluginUsageWithResponse(ctx, u.teamName, u.pluginTeam, u.pluginKind, u.pluginName) + u.logger.Debug().Str("url", u.url).Str("team", u.teamName).Str("pluginTeam", u.pluginMeta.Team).Str("pluginKind", string(u.pluginMeta.Kind)).Str("pluginName", u.pluginMeta.Name).Msg("checking quota") + usage, err := u.apiClient.GetTeamPluginUsageWithResponse(ctx, u.teamName, u.pluginMeta.Team, u.pluginMeta.Kind, u.pluginMeta.Name) if err != nil { return false, fmt.Errorf("failed to get usage: %w", err) } @@ -333,9 +328,9 @@ func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, numbe resp, err := u.apiClient.IncreaseTeamPluginUsageWithResponse(ctx, u.teamName, cqapi.IncreaseTeamPluginUsageJSONRequestBody{ RequestId: uuid.New(), - PluginTeam: u.pluginTeam, - PluginKind: u.pluginKind, - PluginName: u.pluginName, + PluginTeam: u.pluginMeta.Team, + PluginKind: u.pluginMeta.Kind, + PluginName: u.pluginMeta.Name, Rows: int(numberToUpdate), }) if err != nil { @@ -414,3 +409,23 @@ func (u *BatchUpdater) getTeamNameByTokenType(tokenType auth.TokenType) (string, return "", fmt.Errorf("unsupported token type: %v", tokenType) } } + +type NoOpUsageClient struct { + TeamNameValue string +} + +func (n *NoOpUsageClient) TeamName() string { + return n.TeamNameValue +} + +func (NoOpUsageClient) HasQuota(_ context.Context) (bool, error) { + return true, nil +} + +func (NoOpUsageClient) Increase(_ uint32) error { + return nil +} + +func (NoOpUsageClient) Close() error { + return nil +} diff --git a/premium/usage_test.go b/premium/usage_test.go index a8bd519492..b658e3b905 100644 --- a/premium/usage_test.go +++ b/premium/usage_test.go @@ -7,12 +7,14 @@ import ( "math" "net/http" "net/http/httptest" + "sync" "testing" "time" cqapi "github.com/cloudquery/cloudquery-api-go" "github.com/cloudquery/cloudquery-api-go/auth" "github.com/cloudquery/cloudquery-api-go/config" + "github.com/cloudquery/plugin-sdk/v4/plugin" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -37,19 +39,23 @@ func TestUsageService_NewUsageClient_Defaults(t *testing.T) { require.NoError(t, err) uc, err := NewUsageClient( - WithPluginTeam("plugin-team"), - WithPluginKind("source"), - WithPluginName("vault"), + plugin.Meta{ + Team: "plugin-team", + Kind: cqapi.Source, + Name: "vault", + }, withTokenClient(&MockTokenClient{}), ) require.NoError(t, err) - assert.NotNil(t, uc.apiClient) - assert.Equal(t, "config-team", uc.teamName) - assert.Equal(t, zerolog.Nop(), uc.logger) - assert.Equal(t, 5, uc.maxRetries) - assert.Equal(t, 60*time.Second, uc.maxWaitTime) - assert.Equal(t, 30*time.Second, uc.maxTimeBetweenFlushes) + bu := uc.(*BatchUpdater) + + assert.NotNil(t, bu.apiClient) + assert.Equal(t, "config-team", bu.teamName) + assert.Equal(t, zerolog.Nop(), bu.logger) + assert.Equal(t, 5, bu.maxRetries) + assert.Equal(t, 60*time.Second, bu.maxWaitTime) + assert.Equal(t, 30*time.Second, bu.maxTimeBetweenFlushes) } func TestUsageService_NewUsageClient_Override(t *testing.T) { @@ -59,9 +65,11 @@ func TestUsageService_NewUsageClient_Override(t *testing.T) { logger := zerolog.New(zerolog.NewTestWriter(t)) uc, err := NewUsageClient( - WithPluginTeam("plugin-team"), - WithPluginKind("source"), - WithPluginName("vault"), + plugin.Meta{ + Team: "plugin-team", + Kind: cqapi.Source, + Name: "vault", + }, WithLogger(logger), WithAPIClient(ac), withTeamName("override-team-name"), @@ -72,12 +80,14 @@ func TestUsageService_NewUsageClient_Override(t *testing.T) { ) require.NoError(t, err) - assert.Equal(t, ac, uc.apiClient) - assert.Equal(t, "override-team-name", uc.teamName) - assert.Equal(t, logger, uc.logger) - assert.Equal(t, 10, uc.maxRetries) - assert.Equal(t, 120*time.Second, uc.maxWaitTime) - assert.Equal(t, 10*time.Second, uc.maxTimeBetweenFlushes) + bu := uc.(*BatchUpdater) + + assert.Equal(t, ac, bu.apiClient) + assert.Equal(t, "override-team-name", bu.teamName) + assert.Equal(t, logger, bu.logger) + assert.Equal(t, 10, bu.maxRetries) + assert.Equal(t, 120*time.Second, bu.maxWaitTime) + assert.Equal(t, 10*time.Second, bu.maxTimeBetweenFlushes) } func TestUsageService_HasQuota_NoRowsRemaining(t *testing.T) { @@ -367,13 +377,15 @@ func TestUsageService_CalculateRetryDuration_ServerBackPressure(t *testing.T) { func newClient(t *testing.T, apiClient *cqapi.ClientWithResponses, ops ...UsageClientOptions) *BatchUpdater { client, err := NewUsageClient( - WithPluginTeam("plugin-team"), - WithPluginKind("source"), - WithPluginName("vault"), + plugin.Meta{ + Team: "plugin-team", + Kind: cqapi.Source, + Name: "vault", + }, append(ops, withTeamName("team-name"), WithAPIClient(apiClient), withTokenClient(&MockTokenClient{}))...) require.NoError(t, err) - return client + return client.(*BatchUpdater) } func createTestServerWithRemainingRows(t *testing.T, remainingRows int) *testStage { @@ -397,6 +409,8 @@ func createTestServerWithRemainingRows(t *testing.T, remainingRows int) *testSta err := dec.Decode(&req) require.NoError(t, err) + stage.mu.Lock() + defer stage.mu.Unlock() stage.update = append(stage.update, req.Rows) w.WriteHeader(http.StatusOK) @@ -418,13 +432,18 @@ type testStage struct { remainingRows int update []int + mu sync.RWMutex } func (s *testStage) numberOfUpdates() int { + s.mu.RLock() + defer s.mu.RUnlock() return len(s.update) } func (s *testStage) sumOfUpdates() int { + s.mu.RLock() + defer s.mu.RUnlock() sum := 0 for _, val := range s.update { sum += val @@ -433,6 +452,8 @@ func (s *testStage) sumOfUpdates() int { } func (s *testStage) minExcludingClose() int { + s.mu.RLock() + defer s.mu.RUnlock() m := math.MaxInt for i := 0; i < len(s.update); i++ { if s.update[i] < m { diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index b5ab8dd2ae..dfaa4cf97a 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -3,8 +3,6 @@ package scheduler import ( "context" "fmt" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "testing" "github.com/apache/arrow/go/v15/arrow" @@ -13,6 +11,8 @@ import ( "github.com/cloudquery/plugin-sdk/v4/scalar" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type testExecutionClient struct { @@ -276,10 +276,15 @@ func TestScheduler_Cancellation(t *testing.T) { } for _, strategy := range AllStrategies { + strategy := strategy for _, tc := range tests { tc := tc t.Run(fmt.Sprintf("%s_%s", tc.name, strategy.String()), func(t *testing.T) { - sc := NewScheduler(WithLogger(zerolog.New(zerolog.NewTestWriter(t))), WithStrategy(strategy)) + logger := zerolog.New(zerolog.NewTestWriter(t)) + if tc.cancel { + logger = zerolog.Nop() // FIXME without this, zerolog usage causes a race condition when tests are run with `-race -count=100` + } + sc := NewScheduler(WithLogger(logger), WithStrategy(strategy)) messages := make(chan message.SyncMessage) ctx, cancel := context.WithCancel(context.Background()) diff --git a/serve/plugin.go b/serve/plugin.go index 191dcc353b..adff0e3c82 100644 --- a/serve/plugin.go +++ b/serve/plugin.go @@ -11,6 +11,7 @@ import ( "github.com/cloudquery/plugin-sdk/v4/helpers/grpczerolog" "github.com/cloudquery/plugin-sdk/v4/plugin" + "github.com/cloudquery/plugin-sdk/v4/premium" "github.com/cloudquery/plugin-sdk/v4/types" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" @@ -121,6 +122,7 @@ func (s *PluginServe) newCmdPluginServe() *cobra.Command { var otelEndpointHeaders []string var otelEndpointInsecure bool var otelEndpointURLPath string + var licenseFile string logLevel := newEnum([]string{"trace", "debug", "info", "warn", "error"}, "info") logFormat := newEnum([]string{"text", "json"}, "text") telemetryLevel := newEnum([]string{"none", "errors", "stats", "all"}, "all") @@ -186,8 +188,13 @@ func (s *PluginServe) newCmdPluginServe() *cobra.Command { }() otel.SetTracerProvider(tp) } + if licenseFile != "" { + if err := premium.ValidateLicense(logger, licenseFile); err != nil { + return fmt.Errorf("failed to validate license: %w", err) + } + s.plugin.SetSkipUsageClient(true) + } - // opts.Plugin.Logger = logger var listener net.Listener if s.testListener { listener = s.testListenerConn @@ -296,6 +303,7 @@ func (s *PluginServe) newCmdPluginServe() *cobra.Command { cmd.Flags().StringArrayVar(&otelEndpointHeaders, "otel-endpoint-headers", []string{}, "Open Telemetry HTTP collector endpoint headers") cmd.Flags().BoolVar(&otelEndpointInsecure, "otel-endpoint-insecure", false, "use Open Telemetry HTTP endpoint (for development only)") cmd.Flags().BoolVar(&noSentry, "no-sentry", false, "disable sentry") + cmd.Flags().StringVar(&licenseFile, "license", "", "Path to offline license file") sendErrors := funk.ContainsString([]string{"all", "errors"}, telemetryLevel.String()) if !sendErrors { noSentry = true