Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions clients/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"time"

"github.com/cloudquery/plugin-sdk/internal/pb"
"github.com/cloudquery/plugin-sdk/plugins"
"github.com/cloudquery/plugin-sdk/plugins/destination"
"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -196,12 +196,12 @@ func (c *DestinationClient) GetProtocolVersion(ctx context.Context) (uint64, err
return res.Version, nil
}

func (c *DestinationClient) GetMetrics(ctx context.Context) (*plugins.DestinationMetrics, error) {
func (c *DestinationClient) GetMetrics(ctx context.Context) (*destination.Metrics, error) {
res, err := c.pbClient.GetMetrics(ctx, &pb.GetDestinationMetrics_Request{})
if err != nil {
return nil, fmt.Errorf("failed to call GetMetrics: %w", err)
}
var stats plugins.DestinationMetrics
var stats destination.Metrics
if err := json.Unmarshal(res.Metrics, &stats); err != nil {
return nil, fmt.Errorf("failed to unmarshal destination metrics: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions clients/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"sync"

"github.com/cloudquery/plugin-sdk/internal/pb"
"github.com/cloudquery/plugin-sdk/plugins"
"github.com/cloudquery/plugin-sdk/plugins/source"
"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -217,12 +217,12 @@ func (c *SourceClient) Version(ctx context.Context) (string, error) {
return res.Version, nil
}

func (c *SourceClient) GetMetrics(ctx context.Context) (*plugins.SourceMetrics, error) {
func (c *SourceClient) GetMetrics(ctx context.Context) (*source.Metrics, error) {
res, err := c.pbClient.GetMetrics(ctx, &pb.GetSourceMetrics_Request{})
if err != nil {
return nil, fmt.Errorf("failed to call GetMetrics: %w", err)
}
var stats plugins.SourceMetrics
var stats source.Metrics
if err := json.Unmarshal(res.Metrics, &stats); err != nil {
return nil, fmt.Errorf("failed to unmarshal source stats: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/servers/destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"io"

"github.com/cloudquery/plugin-sdk/internal/pb"
"github.com/cloudquery/plugin-sdk/plugins"
"github.com/cloudquery/plugin-sdk/plugins/destination"
"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/rs/zerolog"
Expand All @@ -18,7 +18,7 @@ import (

type DestinationServer struct {
pb.UnimplementedDestinationServer
Plugin *plugins.DestinationPlugin
Plugin *destination.Plugin
Logger zerolog.Logger
}

Expand Down
10 changes: 5 additions & 5 deletions internal/servers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"fmt"

"github.com/cloudquery/plugin-sdk/internal/pb"
"github.com/cloudquery/plugin-sdk/plugins"
"github.com/cloudquery/plugin-sdk/plugins/source"
"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/getsentry/sentry-go"
Expand All @@ -20,7 +20,7 @@ import (

type SourceServer struct {
pb.UnimplementedSourceServer
Plugin *plugins.SourcePlugin
Plugin *source.Plugin
Logger zerolog.Logger
}

Expand Down Expand Up @@ -139,16 +139,16 @@ func (s *SourceServer) GetMetrics(context.Context, *pb.GetSourceMetrics_Request)
// Aggregate metrics before sending to keep response size small.
// Temporary fix for https://github.com/cloudquery/cloudquery/issues/3962
m := s.Plugin.Metrics()
agg := &plugins.TableClientMetrics{}
agg := &source.TableClientMetrics{}
for _, table := range m.TableClient {
for _, tableClient := range table {
agg.Resources += tableClient.Resources
agg.Errors += tableClient.Errors
agg.Panics += tableClient.Panics
}
}
b, err := json.Marshal(&plugins.SourceMetrics{
TableClient: map[string]map[string]*plugins.TableClientMetrics{"": {"": agg}},
b, err := json.Marshal(&source.Metrics{
TableClient: map[string]map[string]*source.TableClientMetrics{"": {"": agg}},
})
if err != nil {
return nil, fmt.Errorf("failed to marshal source metrics: %w", err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package plugins
package destination

import (
"context"
Expand Down Expand Up @@ -33,25 +33,25 @@ func withBlockingWrite() TestDestinationOption {
}
}

func getNewTestDestinationMemDBClient(options ...TestDestinationOption) NewDestinationClientFunc {
func getNewTestDestinationMemDBClient(options ...TestDestinationOption) NewClientFunc {
c := &TestDestinationMemDBClient{
memoryDB: make(map[string][][]interface{}),
}
for _, opt := range options {
opt(c)
}
return func(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error) {
return func(context.Context, zerolog.Logger, specs.Destination) (Client, error) {
return c, nil
}
}

func NewTestDestinationMemDBClient(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error) {
func NewTestDestinationMemDBClient(context.Context, zerolog.Logger, specs.Destination) (Client, error) {
return &TestDestinationMemDBClient{
memoryDB: make(map[string][][]interface{}),
}, nil
}

func newTestDestinationMemDBClientErrOnNew(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error) {
func newTestDestinationMemDBClientErrOnNew(context.Context, zerolog.Logger, specs.Destination) (Client, error) {
return nil, fmt.Errorf("newTestDestinationMemDBClientErrOnNew")
}

Expand Down Expand Up @@ -132,8 +132,8 @@ func (c *TestDestinationMemDBClient) Write(ctx context.Context, tables schema.Ta
return nil
}

func (*TestDestinationMemDBClient) Metrics() DestinationMetrics {
return DestinationMetrics{}
func (*TestDestinationMemDBClient) Metrics() Metrics {
return Metrics{}
}

func (c *TestDestinationMemDBClient) Close(context.Context) error {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package plugins
package destination

type DestinationMetrics struct {
type Metrics struct {
// Errors number of errors / failed writes
Errors uint64
// Writes number of successful writes
Expand Down
42 changes: 21 additions & 21 deletions plugins/destination.go → plugins/destination/plugin.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package plugins
package destination

import (
"context"
Expand All @@ -11,15 +11,15 @@ import (
"golang.org/x/sync/errgroup"
)

type NewDestinationClientFunc func(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error)
type NewClientFunc func(context.Context, zerolog.Logger, specs.Destination) (Client, error)

type DestinationClient interface {
type Client interface {
schema.CQTypeTransformer
ReverseTransformValues(table *schema.Table, values []interface{}) (schema.CQTypes, error)
Migrate(ctx context.Context, tables schema.Tables) error
Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []interface{}) error
Write(ctx context.Context, tables schema.Tables, res <-chan *ClientResource) error
Metrics() DestinationMetrics
Metrics() Metrics
DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error
Close(ctx context.Context) error
}
Expand All @@ -29,15 +29,15 @@ type ClientResource struct {
Data []interface{}
}

type DestinationPlugin struct {
type Plugin struct {
// Name of destination plugin i.e postgresql,snowflake
name string
// Version of the destination plugin
version string
// Called upon configure call to validate and init configuration
newDestinationClient NewDestinationClientFunc
newDestinationClient NewClientFunc
// initialized destination client
client DestinationClient
client Client
// spec the client was initialized with
spec specs.Destination
// Logger to call, this logger is passed to the serve.Serve Client, if not define Serve will create one instead.
Expand All @@ -46,29 +46,29 @@ type DestinationPlugin struct {

const writeWorkers = 1

func NewDestinationPlugin(name string, version string, newDestinationClient NewDestinationClientFunc) *DestinationPlugin {
p := &DestinationPlugin{
func NewDestinationPlugin(name string, version string, newDestinationClient NewClientFunc) *Plugin {
p := &Plugin{
name: name,
version: version,
newDestinationClient: newDestinationClient,
}
return p
}

func (p *DestinationPlugin) Name() string {
func (p *Plugin) Name() string {
return p.name
}

func (p *DestinationPlugin) Version() string {
func (p *Plugin) Version() string {
return p.version
}

func (p *DestinationPlugin) Metrics() DestinationMetrics {
func (p *Plugin) Metrics() Metrics {
return p.client.Metrics()
}

// we need lazy loading because we want to be able to initialize after
func (p *DestinationPlugin) Init(ctx context.Context, logger zerolog.Logger, spec specs.Destination) error {
func (p *Plugin) Init(ctx context.Context, logger zerolog.Logger, spec specs.Destination) error {
var err error
p.logger = logger
p.spec = spec
Expand All @@ -83,12 +83,12 @@ func (p *DestinationPlugin) Init(ctx context.Context, logger zerolog.Logger, spe
}

// we implement all DestinationClient functions so we can hook into pre-post behavior
func (p *DestinationPlugin) Migrate(ctx context.Context, tables schema.Tables) error {
func (p *Plugin) Migrate(ctx context.Context, tables schema.Tables) error {
SetDestinationManagedCqColumns(tables)
return p.client.Migrate(ctx, tables)
}

func (p *DestinationPlugin) readAll(ctx context.Context, table *schema.Table, sourceName string) ([]schema.CQTypes, error) {
func (p *Plugin) readAll(ctx context.Context, table *schema.Table, sourceName string) ([]schema.CQTypes, error) {
var readErr error
ch := make(chan schema.CQTypes)
go func() {
Expand All @@ -103,7 +103,7 @@ func (p *DestinationPlugin) readAll(ctx context.Context, table *schema.Table, so
return resources, readErr
}

func (p *DestinationPlugin) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- schema.CQTypes) error {
func (p *Plugin) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- schema.CQTypes) error {
SetDestinationManagedCqColumns(schema.Tables{table})
ch := make(chan []interface{})
var err error
Expand All @@ -122,13 +122,13 @@ func (p *DestinationPlugin) Read(ctx context.Context, table *schema.Table, sourc
}

// this function is currently used mostly for testing so it's not a public api
func (p *DestinationPlugin) writeOne(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resource schema.DestinationResource) error {
func (p *Plugin) writeOne(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resource schema.DestinationResource) error {
resources := []schema.DestinationResource{resource}
return p.writeAll(ctx, tables, sourceName, syncTime, resources)
}

// this function is currently used mostly for testing so it's not a public api
func (p *DestinationPlugin) writeAll(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resources []schema.DestinationResource) error {
func (p *Plugin) writeAll(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resources []schema.DestinationResource) error {
ch := make(chan schema.DestinationResource, len(resources))
for _, resource := range resources {
ch <- resource
Expand All @@ -137,7 +137,7 @@ func (p *DestinationPlugin) writeAll(ctx context.Context, tables schema.Tables,
return p.Write(ctx, tables, sourceName, syncTime, ch)
}

func (p *DestinationPlugin) Write(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan schema.DestinationResource) error {
func (p *Plugin) Write(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan schema.DestinationResource) error {
syncTime = syncTime.UTC()
SetDestinationManagedCqColumns(tables)
ch := make(chan *ClientResource)
Expand Down Expand Up @@ -179,12 +179,12 @@ func (p *DestinationPlugin) Write(ctx context.Context, tables schema.Tables, sou
return nil
}

func (p *DestinationPlugin) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error {
func (p *Plugin) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error {
syncTime = syncTime.UTC()
return p.client.DeleteStale(ctx, tables, sourceName, syncTime)
}

func (p *DestinationPlugin) Close(ctx context.Context) error {
func (p *Plugin) Close(ctx context.Context) error {
return p.client.Close(ctx)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package plugins
package destination

import (
"context"
Expand All @@ -10,13 +10,13 @@ import (
"github.com/cloudquery/plugin-sdk/specs"
)

func TestDestinationPlugin(t *testing.T) {
func TestPlugin(t *testing.T) {
p := NewDestinationPlugin("test", "development", NewTestDestinationMemDBClient)
DestinationPluginTestSuiteRunner(t, p, nil,
DestinationTestSuiteTests{})
PluginTestSuiteRunner(t, p, nil,
TestSuiteTests{})
}

func TestDestinationOnNewError(t *testing.T) {
func TestOnNewError(t *testing.T) {
ctx := context.Background()
p := NewDestinationPlugin("test", "development", newTestDestinationMemDBClientErrOnNew)
err := p.Init(ctx, getTestLogger(t), specs.Destination{})
Expand All @@ -26,7 +26,7 @@ func TestDestinationOnNewError(t *testing.T) {
}
}

func TestDestinationOnWriteError(t *testing.T) {
func TestOnWriteError(t *testing.T) {
ctx := context.Background()
newClientFunc := getNewTestDestinationMemDBClient(withErrOnWrite())
p := NewDestinationPlugin("test", "development", newClientFunc)
Expand All @@ -53,7 +53,7 @@ func TestDestinationOnWriteError(t *testing.T) {
}
}

func TestDestinationOnWriteCtxCancelled(t *testing.T) {
func TestOnWriteCtxCancelled(t *testing.T) {
ctx := context.Background()
newClientFunc := getNewTestDestinationMemDBClient(withBlockingWrite())
p := NewDestinationPlugin("test", "development", newClientFunc)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package plugins
package destination

import (
"fmt"
Expand Down
Loading