diff --git a/clients/destination.go b/clients/destination.go index 09bac7f302..aaee881955 100644 --- a/clients/destination.go +++ b/clients/destination.go @@ -15,7 +15,7 @@ import ( "time" "github.com/cloudquery/plugin-sdk/internal/pb" - "github.com/cloudquery/plugin-sdk/plugins" + "github.com/cloudquery/plugin-sdk/plugins/destination" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog" @@ -196,12 +196,12 @@ func (c *DestinationClient) GetProtocolVersion(ctx context.Context) (uint64, err return res.Version, nil } -func (c *DestinationClient) GetMetrics(ctx context.Context) (*plugins.DestinationMetrics, error) { +func (c *DestinationClient) GetMetrics(ctx context.Context) (*destination.Metrics, error) { res, err := c.pbClient.GetMetrics(ctx, &pb.GetDestinationMetrics_Request{}) if err != nil { return nil, fmt.Errorf("failed to call GetMetrics: %w", err) } - var stats plugins.DestinationMetrics + var stats destination.Metrics if err := json.Unmarshal(res.Metrics, &stats); err != nil { return nil, fmt.Errorf("failed to unmarshal destination metrics: %w", err) } diff --git a/clients/source.go b/clients/source.go index fe32f483a7..fa8e61b4ef 100644 --- a/clients/source.go +++ b/clients/source.go @@ -14,7 +14,7 @@ import ( "sync" "github.com/cloudquery/plugin-sdk/internal/pb" - "github.com/cloudquery/plugin-sdk/plugins" + "github.com/cloudquery/plugin-sdk/plugins/source" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog" @@ -217,12 +217,12 @@ func (c *SourceClient) Version(ctx context.Context) (string, error) { return res.Version, nil } -func (c *SourceClient) GetMetrics(ctx context.Context) (*plugins.SourceMetrics, error) { +func (c *SourceClient) GetMetrics(ctx context.Context) (*source.Metrics, error) { res, err := c.pbClient.GetMetrics(ctx, &pb.GetSourceMetrics_Request{}) if err != nil { return nil, fmt.Errorf("failed to call GetMetrics: %w", err) } - var stats plugins.SourceMetrics + var stats source.Metrics if err := json.Unmarshal(res.Metrics, &stats); err != nil { return nil, fmt.Errorf("failed to unmarshal source stats: %w", err) } diff --git a/internal/servers/destinations.go b/internal/servers/destinations.go index 515dd9211b..08d1610c36 100644 --- a/internal/servers/destinations.go +++ b/internal/servers/destinations.go @@ -7,7 +7,7 @@ import ( "io" "github.com/cloudquery/plugin-sdk/internal/pb" - "github.com/cloudquery/plugin-sdk/plugins" + "github.com/cloudquery/plugin-sdk/plugins/destination" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog" @@ -18,7 +18,7 @@ import ( type DestinationServer struct { pb.UnimplementedDestinationServer - Plugin *plugins.DestinationPlugin + Plugin *destination.Plugin Logger zerolog.Logger } diff --git a/internal/servers/source.go b/internal/servers/source.go index debf99888e..d2571ce5b0 100644 --- a/internal/servers/source.go +++ b/internal/servers/source.go @@ -8,7 +8,7 @@ import ( "fmt" "github.com/cloudquery/plugin-sdk/internal/pb" - "github.com/cloudquery/plugin-sdk/plugins" + "github.com/cloudquery/plugin-sdk/plugins/source" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" "github.com/getsentry/sentry-go" @@ -20,7 +20,7 @@ import ( type SourceServer struct { pb.UnimplementedSourceServer - Plugin *plugins.SourcePlugin + Plugin *source.Plugin Logger zerolog.Logger } @@ -139,7 +139,7 @@ func (s *SourceServer) GetMetrics(context.Context, *pb.GetSourceMetrics_Request) // Aggregate metrics before sending to keep response size small. // Temporary fix for https://github.com/cloudquery/cloudquery/issues/3962 m := s.Plugin.Metrics() - agg := &plugins.TableClientMetrics{} + agg := &source.TableClientMetrics{} for _, table := range m.TableClient { for _, tableClient := range table { agg.Resources += tableClient.Resources @@ -147,8 +147,8 @@ func (s *SourceServer) GetMetrics(context.Context, *pb.GetSourceMetrics_Request) agg.Panics += tableClient.Panics } } - b, err := json.Marshal(&plugins.SourceMetrics{ - TableClient: map[string]map[string]*plugins.TableClientMetrics{"": {"": agg}}, + b, err := json.Marshal(&source.Metrics{ + TableClient: map[string]map[string]*source.TableClientMetrics{"": {"": agg}}, }) if err != nil { return nil, fmt.Errorf("failed to marshal source metrics: %w", err) diff --git a/plugins/destination_memdb_plugin.go b/plugins/destination/memdb_plugin.go similarity index 93% rename from plugins/destination_memdb_plugin.go rename to plugins/destination/memdb_plugin.go index c60553188b..cc869abb0b 100644 --- a/plugins/destination_memdb_plugin.go +++ b/plugins/destination/memdb_plugin.go @@ -1,4 +1,4 @@ -package plugins +package destination import ( "context" @@ -33,25 +33,25 @@ func withBlockingWrite() TestDestinationOption { } } -func getNewTestDestinationMemDBClient(options ...TestDestinationOption) NewDestinationClientFunc { +func getNewTestDestinationMemDBClient(options ...TestDestinationOption) NewClientFunc { c := &TestDestinationMemDBClient{ memoryDB: make(map[string][][]interface{}), } for _, opt := range options { opt(c) } - return func(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error) { + return func(context.Context, zerolog.Logger, specs.Destination) (Client, error) { return c, nil } } -func NewTestDestinationMemDBClient(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error) { +func NewTestDestinationMemDBClient(context.Context, zerolog.Logger, specs.Destination) (Client, error) { return &TestDestinationMemDBClient{ memoryDB: make(map[string][][]interface{}), }, nil } -func newTestDestinationMemDBClientErrOnNew(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error) { +func newTestDestinationMemDBClientErrOnNew(context.Context, zerolog.Logger, specs.Destination) (Client, error) { return nil, fmt.Errorf("newTestDestinationMemDBClientErrOnNew") } @@ -132,8 +132,8 @@ func (c *TestDestinationMemDBClient) Write(ctx context.Context, tables schema.Ta return nil } -func (*TestDestinationMemDBClient) Metrics() DestinationMetrics { - return DestinationMetrics{} +func (*TestDestinationMemDBClient) Metrics() Metrics { + return Metrics{} } func (c *TestDestinationMemDBClient) Close(context.Context) error { diff --git a/plugins/destination_metrics.go b/plugins/destination/metrics.go similarity index 70% rename from plugins/destination_metrics.go rename to plugins/destination/metrics.go index 40d2663a03..d00613ecf8 100644 --- a/plugins/destination_metrics.go +++ b/plugins/destination/metrics.go @@ -1,6 +1,6 @@ -package plugins +package destination -type DestinationMetrics struct { +type Metrics struct { // Errors number of errors / failed writes Errors uint64 // Writes number of successful writes diff --git a/plugins/destination.go b/plugins/destination/plugin.go similarity index 73% rename from plugins/destination.go rename to plugins/destination/plugin.go index 1ead0d5c43..697be753f6 100644 --- a/plugins/destination.go +++ b/plugins/destination/plugin.go @@ -1,4 +1,4 @@ -package plugins +package destination import ( "context" @@ -11,15 +11,15 @@ import ( "golang.org/x/sync/errgroup" ) -type NewDestinationClientFunc func(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error) +type NewClientFunc func(context.Context, zerolog.Logger, specs.Destination) (Client, error) -type DestinationClient interface { +type Client interface { schema.CQTypeTransformer ReverseTransformValues(table *schema.Table, values []interface{}) (schema.CQTypes, error) Migrate(ctx context.Context, tables schema.Tables) error Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []interface{}) error Write(ctx context.Context, tables schema.Tables, res <-chan *ClientResource) error - Metrics() DestinationMetrics + Metrics() Metrics DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error Close(ctx context.Context) error } @@ -29,15 +29,15 @@ type ClientResource struct { Data []interface{} } -type DestinationPlugin struct { +type Plugin struct { // Name of destination plugin i.e postgresql,snowflake name string // Version of the destination plugin version string // Called upon configure call to validate and init configuration - newDestinationClient NewDestinationClientFunc + newDestinationClient NewClientFunc // initialized destination client - client DestinationClient + client Client // spec the client was initialized with spec specs.Destination // Logger to call, this logger is passed to the serve.Serve Client, if not define Serve will create one instead. @@ -46,8 +46,8 @@ type DestinationPlugin struct { const writeWorkers = 1 -func NewDestinationPlugin(name string, version string, newDestinationClient NewDestinationClientFunc) *DestinationPlugin { - p := &DestinationPlugin{ +func NewDestinationPlugin(name string, version string, newDestinationClient NewClientFunc) *Plugin { + p := &Plugin{ name: name, version: version, newDestinationClient: newDestinationClient, @@ -55,20 +55,20 @@ func NewDestinationPlugin(name string, version string, newDestinationClient NewD return p } -func (p *DestinationPlugin) Name() string { +func (p *Plugin) Name() string { return p.name } -func (p *DestinationPlugin) Version() string { +func (p *Plugin) Version() string { return p.version } -func (p *DestinationPlugin) Metrics() DestinationMetrics { +func (p *Plugin) Metrics() Metrics { return p.client.Metrics() } // we need lazy loading because we want to be able to initialize after -func (p *DestinationPlugin) Init(ctx context.Context, logger zerolog.Logger, spec specs.Destination) error { +func (p *Plugin) Init(ctx context.Context, logger zerolog.Logger, spec specs.Destination) error { var err error p.logger = logger p.spec = spec @@ -83,12 +83,12 @@ func (p *DestinationPlugin) Init(ctx context.Context, logger zerolog.Logger, spe } // we implement all DestinationClient functions so we can hook into pre-post behavior -func (p *DestinationPlugin) Migrate(ctx context.Context, tables schema.Tables) error { +func (p *Plugin) Migrate(ctx context.Context, tables schema.Tables) error { SetDestinationManagedCqColumns(tables) return p.client.Migrate(ctx, tables) } -func (p *DestinationPlugin) readAll(ctx context.Context, table *schema.Table, sourceName string) ([]schema.CQTypes, error) { +func (p *Plugin) readAll(ctx context.Context, table *schema.Table, sourceName string) ([]schema.CQTypes, error) { var readErr error ch := make(chan schema.CQTypes) go func() { @@ -103,7 +103,7 @@ func (p *DestinationPlugin) readAll(ctx context.Context, table *schema.Table, so return resources, readErr } -func (p *DestinationPlugin) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- schema.CQTypes) error { +func (p *Plugin) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- schema.CQTypes) error { SetDestinationManagedCqColumns(schema.Tables{table}) ch := make(chan []interface{}) var err error @@ -122,13 +122,13 @@ func (p *DestinationPlugin) Read(ctx context.Context, table *schema.Table, sourc } // this function is currently used mostly for testing so it's not a public api -func (p *DestinationPlugin) writeOne(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resource schema.DestinationResource) error { +func (p *Plugin) writeOne(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resource schema.DestinationResource) error { resources := []schema.DestinationResource{resource} return p.writeAll(ctx, tables, sourceName, syncTime, resources) } // this function is currently used mostly for testing so it's not a public api -func (p *DestinationPlugin) writeAll(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resources []schema.DestinationResource) error { +func (p *Plugin) writeAll(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resources []schema.DestinationResource) error { ch := make(chan schema.DestinationResource, len(resources)) for _, resource := range resources { ch <- resource @@ -137,7 +137,7 @@ func (p *DestinationPlugin) writeAll(ctx context.Context, tables schema.Tables, return p.Write(ctx, tables, sourceName, syncTime, ch) } -func (p *DestinationPlugin) Write(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan schema.DestinationResource) error { +func (p *Plugin) Write(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan schema.DestinationResource) error { syncTime = syncTime.UTC() SetDestinationManagedCqColumns(tables) ch := make(chan *ClientResource) @@ -179,12 +179,12 @@ func (p *DestinationPlugin) Write(ctx context.Context, tables schema.Tables, sou return nil } -func (p *DestinationPlugin) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error { +func (p *Plugin) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error { syncTime = syncTime.UTC() return p.client.DeleteStale(ctx, tables, sourceName, syncTime) } -func (p *DestinationPlugin) Close(ctx context.Context) error { +func (p *Plugin) Close(ctx context.Context) error { return p.client.Close(ctx) } diff --git a/plugins/destination_test.go b/plugins/destination/plugin_test.go similarity index 87% rename from plugins/destination_test.go rename to plugins/destination/plugin_test.go index ef2175be23..219da4410c 100644 --- a/plugins/destination_test.go +++ b/plugins/destination/plugin_test.go @@ -1,4 +1,4 @@ -package plugins +package destination import ( "context" @@ -10,13 +10,13 @@ import ( "github.com/cloudquery/plugin-sdk/specs" ) -func TestDestinationPlugin(t *testing.T) { +func TestPlugin(t *testing.T) { p := NewDestinationPlugin("test", "development", NewTestDestinationMemDBClient) - DestinationPluginTestSuiteRunner(t, p, nil, - DestinationTestSuiteTests{}) + PluginTestSuiteRunner(t, p, nil, + TestSuiteTests{}) } -func TestDestinationOnNewError(t *testing.T) { +func TestOnNewError(t *testing.T) { ctx := context.Background() p := NewDestinationPlugin("test", "development", newTestDestinationMemDBClientErrOnNew) err := p.Init(ctx, getTestLogger(t), specs.Destination{}) @@ -26,7 +26,7 @@ func TestDestinationOnNewError(t *testing.T) { } } -func TestDestinationOnWriteError(t *testing.T) { +func TestOnWriteError(t *testing.T) { ctx := context.Background() newClientFunc := getNewTestDestinationMemDBClient(withErrOnWrite()) p := NewDestinationPlugin("test", "development", newClientFunc) @@ -53,7 +53,7 @@ func TestDestinationOnWriteError(t *testing.T) { } } -func TestDestinationOnWriteCtxCancelled(t *testing.T) { +func TestOnWriteCtxCancelled(t *testing.T) { ctx := context.Background() newClientFunc := getNewTestDestinationMemDBClient(withBlockingWrite()) p := NewDestinationPlugin("test", "development", newClientFunc) diff --git a/plugins/destination_reverse_transformer.go b/plugins/destination/reverse_transformer.go similarity index 97% rename from plugins/destination_reverse_transformer.go rename to plugins/destination/reverse_transformer.go index ed51fa32a4..c6939bf73d 100644 --- a/plugins/destination_reverse_transformer.go +++ b/plugins/destination/reverse_transformer.go @@ -1,4 +1,4 @@ -package plugins +package destination import ( "fmt" diff --git a/plugins/destination_testing.go b/plugins/destination/testing.go similarity index 91% rename from plugins/destination_testing.go rename to plugins/destination/testing.go index d04864ba9a..a97302a869 100644 --- a/plugins/destination_testing.go +++ b/plugins/destination/testing.go @@ -1,4 +1,4 @@ -package plugins +package destination import ( "context" @@ -14,11 +14,11 @@ import ( "github.com/rs/zerolog" ) -type destinationTestSuite struct { - tests DestinationTestSuiteTests +type TestSuite struct { + tests TestSuiteTests } -type DestinationTestSuiteTests struct { +type TestSuiteTests struct { // SkipOverwrite skips testing for "overwrite" mode. Use if the destination // // plugin doesn't support this feature. SkipOverwrite bool @@ -45,7 +45,7 @@ func getTestLogger(t *testing.T) zerolog.Logger { ).Level(zerolog.DebugLevel).With().Timestamp().Logger() } -func (s *destinationTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *DestinationPlugin, logger zerolog.Logger, spec specs.Destination) error { +func (s *TestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { spec.WriteMode = specs.WriteModeOverwrite if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) @@ -140,7 +140,7 @@ func (s *destinationTestSuite) destinationPluginTestWriteOverwrite(ctx context.C return nil } -func (s *destinationTestSuite) destinationPluginTestWriteAppend(ctx context.Context, p *DestinationPlugin, logger zerolog.Logger, spec specs.Destination) error { +func (s *TestSuite) destinationPluginTestWriteAppend(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error { spec.WriteMode = specs.WriteModeAppend if err := p.Init(ctx, logger, spec); err != nil { return fmt.Errorf("failed to init plugin: %w", err) @@ -207,13 +207,13 @@ func (s *destinationTestSuite) destinationPluginTestWriteAppend(ctx context.Cont return nil } -func DestinationPluginTestSuiteRunner(t *testing.T, p *DestinationPlugin, spec interface{}, tests DestinationTestSuiteTests) { +func PluginTestSuiteRunner(t *testing.T, p *Plugin, spec interface{}, tests TestSuiteTests) { t.Helper() destSpec := specs.Destination{ Name: "testsuite", Spec: spec, } - suite := &destinationTestSuite{ + suite := &TestSuite{ tests: tests, } ctx := context.Background() diff --git a/plugins/.snapshots/TestGenerateSourcePluginDocs-JSON-__tables.json b/plugins/source/.snapshots/TestGeneratePluginDocs-JSON-__tables.json similarity index 100% rename from plugins/.snapshots/TestGenerateSourcePluginDocs-JSON-__tables.json rename to plugins/source/.snapshots/TestGeneratePluginDocs-JSON-__tables.json diff --git a/plugins/.snapshots/TestGenerateSourcePluginDocs-Markdown-README.md b/plugins/source/.snapshots/TestGeneratePluginDocs-Markdown-README.md similarity index 100% rename from plugins/.snapshots/TestGenerateSourcePluginDocs-Markdown-README.md rename to plugins/source/.snapshots/TestGeneratePluginDocs-Markdown-README.md diff --git a/plugins/.snapshots/TestGenerateSourcePluginDocs-Markdown-relation_relation_table.md b/plugins/source/.snapshots/TestGeneratePluginDocs-Markdown-relation_relation_table.md similarity index 100% rename from plugins/.snapshots/TestGenerateSourcePluginDocs-Markdown-relation_relation_table.md rename to plugins/source/.snapshots/TestGeneratePluginDocs-Markdown-relation_relation_table.md diff --git a/plugins/.snapshots/TestGenerateSourcePluginDocs-Markdown-relation_table.md b/plugins/source/.snapshots/TestGeneratePluginDocs-Markdown-relation_table.md similarity index 100% rename from plugins/.snapshots/TestGenerateSourcePluginDocs-Markdown-relation_table.md rename to plugins/source/.snapshots/TestGeneratePluginDocs-Markdown-relation_table.md diff --git a/plugins/.snapshots/TestGenerateSourcePluginDocs-Markdown-test_table.md b/plugins/source/.snapshots/TestGeneratePluginDocs-Markdown-test_table.md similarity index 100% rename from plugins/.snapshots/TestGenerateSourcePluginDocs-Markdown-test_table.md rename to plugins/source/.snapshots/TestGeneratePluginDocs-Markdown-test_table.md diff --git a/plugins/source/.snapshots/TestGenerateSourcePluginDocs-JSON-__tables.json b/plugins/source/.snapshots/TestGenerateSourcePluginDocs-JSON-__tables.json new file mode 100644 index 0000000000..4ed34ef55c --- /dev/null +++ b/plugins/source/.snapshots/TestGenerateSourcePluginDocs-JSON-__tables.json @@ -0,0 +1,125 @@ +[ + { + "name": "test_table", + "description": "Description for test table", + "columns": [ + { + "name": "_cq_source_name", + "type": "TypeString" + }, + { + "name": "_cq_sync_time", + "type": "TypeTimestamp" + }, + { + "name": "_cq_id", + "type": "TypeUUID" + }, + { + "name": "_cq_parent_id", + "type": "TypeUUID" + }, + { + "name": "int_col", + "type": "TypeInt" + }, + { + "name": "id_col", + "type": "TypeInt", + "is_primary_key": true + }, + { + "name": "id_col2", + "type": "TypeInt", + "is_primary_key": true + } + ], + "relations": [ + { + "name": "relation_table", + "description": "Description for relational table", + "columns": [ + { + "name": "_cq_source_name", + "type": "TypeString" + }, + { + "name": "_cq_sync_time", + "type": "TypeTimestamp" + }, + { + "name": "_cq_id", + "type": "TypeUUID", + "is_primary_key": true + }, + { + "name": "_cq_parent_id", + "type": "TypeUUID" + }, + { + "name": "string_col", + "type": "TypeString" + } + ], + "relations": [ + { + "name": "relation_relation_table", + "description": "Description for relational table's relation", + "columns": [ + { + "name": "_cq_source_name", + "type": "TypeString" + }, + { + "name": "_cq_sync_time", + "type": "TypeTimestamp" + }, + { + "name": "_cq_id", + "type": "TypeUUID", + "is_primary_key": true + }, + { + "name": "_cq_parent_id", + "type": "TypeUUID" + }, + { + "name": "string_col", + "type": "TypeString" + } + ], + "relations": [] + } + ] + }, + { + "name": "relation_table2", + "description": "Description for second relational table", + "columns": [ + { + "name": "_cq_source_name", + "type": "TypeString" + }, + { + "name": "_cq_sync_time", + "type": "TypeTimestamp" + }, + { + "name": "_cq_id", + "type": "TypeUUID", + "is_primary_key": true + }, + { + "name": "_cq_parent_id", + "type": "TypeUUID" + }, + { + "name": "string_col", + "type": "TypeString" + } + ], + "relations": [] + } + ] + } +] diff --git a/plugins/source/.snapshots/TestGenerateSourcePluginDocs-Markdown-README.md b/plugins/source/.snapshots/TestGenerateSourcePluginDocs-Markdown-README.md new file mode 100644 index 0000000000..5ef4c70d71 --- /dev/null +++ b/plugins/source/.snapshots/TestGenerateSourcePluginDocs-Markdown-README.md @@ -0,0 +1,6 @@ +# Source Plugin: test +## Tables +- [test_table](test_table.md) + - [relation_table](relation_table.md) + - [relation_relation_table](relation_relation_table.md) + - [relation_table2](relation_table2.md) diff --git a/plugins/source/.snapshots/TestGenerateSourcePluginDocs-Markdown-relation_relation_table.md b/plugins/source/.snapshots/TestGenerateSourcePluginDocs-Markdown-relation_relation_table.md new file mode 100644 index 0000000000..a4cce162a8 --- /dev/null +++ b/plugins/source/.snapshots/TestGenerateSourcePluginDocs-Markdown-relation_relation_table.md @@ -0,0 +1,18 @@ +# Table: relation_relation_table + +Description for relational table's relation + +The primary key for this table is **_cq_id**. + +## Relations +This table depends on [relation_table](relation_table.md). + + +## Columns +| Name | Type | +| ------------- | ------------- | +|_cq_source_name|String| +|_cq_sync_time|Timestamp| +|_cq_id (PK)|UUID| +|_cq_parent_id|UUID| +|string_col|String| diff --git a/plugins/source/.snapshots/TestGenerateSourcePluginDocs-Markdown-relation_table.md b/plugins/source/.snapshots/TestGenerateSourcePluginDocs-Markdown-relation_table.md new file mode 100644 index 0000000000..71c4b3fae8 --- /dev/null +++ b/plugins/source/.snapshots/TestGenerateSourcePluginDocs-Markdown-relation_table.md @@ -0,0 +1,20 @@ +# Table: relation_table + +Description for relational table + +The primary key for this table is **_cq_id**. + +## Relations +This table depends on [test_table](test_table.md). + +The following tables depend on relation_table: + - [relation_relation_table](relation_relation_table.md) + +## Columns +| Name | Type | +| ------------- | ------------- | +|_cq_source_name|String| +|_cq_sync_time|Timestamp| +|_cq_id (PK)|UUID| +|_cq_parent_id|UUID| +|string_col|String| diff --git a/plugins/source/.snapshots/TestGenerateSourcePluginDocs-Markdown-test_table.md b/plugins/source/.snapshots/TestGenerateSourcePluginDocs-Markdown-test_table.md new file mode 100644 index 0000000000..9c1fd5e91a --- /dev/null +++ b/plugins/source/.snapshots/TestGenerateSourcePluginDocs-Markdown-test_table.md @@ -0,0 +1,22 @@ +# Table: test_table + +Description for test table + +The composite primary key for this table is (**id_col**, **id_col2**). + +## Relations + +The following tables depend on test_table: + - [relation_table](relation_table.md) + - [relation_table2](relation_table2.md) + +## Columns +| Name | Type | +| ------------- | ------------- | +|_cq_source_name|String| +|_cq_sync_time|Timestamp| +|_cq_id|UUID| +|_cq_parent_id|UUID| +|int_col|Int| +|id_col (PK)|Int| +|id_col2 (PK)|Int| diff --git a/plugins/benchmark_test.go b/plugins/source/benchmark_test.go similarity index 93% rename from plugins/benchmark_test.go rename to plugins/source/benchmark_test.go index fea7f4f4cf..e749e5d4c9 100644 --- a/plugins/benchmark_test.go +++ b/plugins/source/benchmark_test.go @@ -1,4 +1,4 @@ -package plugins +package source import ( "context" @@ -55,19 +55,19 @@ func (s *BenchmarkScenario) SetDefaults() { } } -type SourceBenchmark struct { +type Benchmark struct { *BenchmarkScenario b *testing.B tables []*schema.Table - plugin *SourcePlugin + plugin *Plugin apiCalls atomic.Int64 } -func NewSourceBenchmark(b *testing.B, scenario BenchmarkScenario) *SourceBenchmark { +func NewBenchmark(b *testing.B, scenario BenchmarkScenario) *Benchmark { scenario.SetDefaults() - sb := &SourceBenchmark{ + sb := &Benchmark{ BenchmarkScenario: &scenario, b: b, tables: nil, @@ -77,7 +77,7 @@ func NewSourceBenchmark(b *testing.B, scenario BenchmarkScenario) *SourceBenchma return sb } -func (s *SourceBenchmark) setup(b *testing.B) { +func (s *Benchmark) setup(b *testing.B) { tableResolver := func(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error { s.simulateAPICall(s.ResolverMin, s.ResolverStdDev, s.ResolverMean) total := 0 @@ -145,8 +145,8 @@ func (s *SourceBenchmark) setup(b *testing.B) { } } - plugin := NewSourcePlugin( - "testSourcePlugin", + plugin := NewPlugin( + "testPlugin", "1.0.0", s.tables, newTestExecutionClient, @@ -156,7 +156,7 @@ func (s *SourceBenchmark) setup(b *testing.B) { s.b = b } -func (s *SourceBenchmark) simulateAPICall(min, stdDev, mean time.Duration) { +func (s *Benchmark) simulateAPICall(min, stdDev, mean time.Duration) { s.apiCalls.Add(1) sample := int(rand.NormFloat64()*float64(stdDev) + float64(mean)) duration := time.Duration(sample) @@ -174,7 +174,7 @@ func min(a, b int) int { return b } -func (s *SourceBenchmark) Run() { +func (s *Benchmark) Run() { for n := 0; n < s.b.N; n++ { s.b.StopTimer() ctx := context.Background() @@ -220,7 +220,7 @@ func (s *SourceBenchmark) Run() { // lowerBound calculates a rough lower bound on the sync time so that we know how // much room there is for optimization. This does not currently take the "concurrency" // value into account. Use this number only as a rough guide. -func (s *SourceBenchmark) lowerBound() time.Duration { +func (s *Benchmark) lowerBound() time.Duration { // we require one API call per page pages := s.ResourcesPerTable / s.ResourcesPerPage if s.ResourcesPerTable%s.ResourcesPerPage == 0 { @@ -288,7 +288,7 @@ func benchmarkWithConcurrency(b *testing.B, concurrency uint64) { ResolverStdDev: 100 * time.Millisecond, Concurrency: concurrency, } - sb := NewSourceBenchmark(b, bs) + sb := NewBenchmark(b, bs) sb.Run() } @@ -311,6 +311,6 @@ func benchmarkTablesWithChildrenConcurrency(b *testing.B, concurrency uint64) { ResolverStdDev: 100 * time.Millisecond, Concurrency: concurrency, } - sb := NewSourceBenchmark(b, bs) + sb := NewBenchmark(b, bs) sb.Run() } diff --git a/plugins/source_docs.go b/plugins/source/docs.go similarity index 87% rename from plugins/source_docs.go rename to plugins/source/docs.go index ddc0c4d10f..d54e8e009f 100644 --- a/plugins/source_docs.go +++ b/plugins/source/docs.go @@ -1,4 +1,4 @@ -package plugins +package source import ( "embed" @@ -9,19 +9,20 @@ import ( "strings" "text/template" + "github.com/cloudquery/plugin-sdk/plugins/destination" "github.com/cloudquery/plugin-sdk/schema" ) //go:embed templates/*.go.tpl var templatesFS embed.FS -// GenerateSourcePluginDocs creates table documentation for the source plugin based on its list of tables -func (p *SourcePlugin) GenerateSourcePluginDocs(dir, format string) error { +// GeneratePluginDocs creates table documentation for the source plugin based on its list of tables +func (p *Plugin) GeneratePluginDocs(dir, format string) error { if err := os.MkdirAll(dir, os.ModePerm); err != nil { return err } - SetDestinationManagedCqColumns(p.Tables()) + destination.SetDestinationManagedCqColumns(p.Tables()) switch format { case "markdown": @@ -46,7 +47,7 @@ type jsonColumn struct { IsPrimaryKey bool `json:"is_primary_key,omitempty"` } -func (p *SourcePlugin) renderTablesAsJSON(dir string) error { +func (p *Plugin) renderTablesAsJSON(dir string) error { tables := p.jsonifyTables(p.Tables()) b, err := json.MarshalIndent(tables, "", " ") if err != nil { @@ -56,7 +57,7 @@ func (p *SourcePlugin) renderTablesAsJSON(dir string) error { return os.WriteFile(outputPath, b, 0644) } -func (p *SourcePlugin) jsonifyTables(tables schema.Tables) []jsonTable { +func (p *Plugin) jsonifyTables(tables schema.Tables) []jsonTable { jsonTables := make([]jsonTable, len(tables)) for i, table := range tables { jsonColumns := make([]jsonColumn, len(table.Columns)) @@ -77,7 +78,7 @@ func (p *SourcePlugin) jsonifyTables(tables schema.Tables) []jsonTable { return jsonTables } -func (p *SourcePlugin) renderTablesAsMarkdown(dir string) error { +func (p *Plugin) renderTablesAsMarkdown(dir string) error { for _, table := range p.Tables() { if err := renderAllTables(table, dir); err != nil { return err diff --git a/plugins/source_docs_test.go b/plugins/source/docs_test.go similarity index 84% rename from plugins/source_docs_test.go rename to plugins/source/docs_test.go index 10d1d00555..83e68ef202 100644 --- a/plugins/source_docs_test.go +++ b/plugins/source/docs_test.go @@ -1,6 +1,6 @@ //go:build !windows -package plugins +package source import ( "os" @@ -69,15 +69,15 @@ var testTables = []*schema.Table{ }, } -func TestGenerateSourcePluginDocs(t *testing.T) { - p := NewSourcePlugin("test", "v1.0.0", testTables, newTestExecutionClient) +func TestGeneratePluginDocs(t *testing.T) { + p := NewPlugin("test", "v1.0.0", testTables, newTestExecutionClient) t.Run("Markdown", func(t *testing.T) { tmpdir := t.TempDir() - err := p.GenerateSourcePluginDocs(tmpdir, "markdown") + err := p.GeneratePluginDocs(tmpdir, "markdown") if err != nil { - t.Fatalf("unexpected error calling GenerateSourcePluginDocs: %v", err) + t.Fatalf("unexpected error calling GeneratePluginDocs: %v", err) } expectFiles := []string{"test_table.md", "relation_table.md", "relation_relation_table.md", "README.md"} @@ -94,9 +94,9 @@ func TestGenerateSourcePluginDocs(t *testing.T) { t.Run("JSON", func(t *testing.T) { tmpdir := t.TempDir() - err := p.GenerateSourcePluginDocs(tmpdir, "json") + err := p.GeneratePluginDocs(tmpdir, "json") if err != nil { - t.Fatalf("unexpected error calling GenerateSourcePluginDocs: %v", err) + t.Fatalf("unexpected error calling GeneratePluginDocs: %v", err) } expectFiles := []string{"__tables.json"} diff --git a/plugins/source_metrics.go b/plugins/source/metrics.go similarity index 84% rename from plugins/source_metrics.go rename to plugins/source/metrics.go index 4dabb9dd18..440ef63ace 100644 --- a/plugins/source_metrics.go +++ b/plugins/source/metrics.go @@ -1,4 +1,4 @@ -package plugins +package source import ( "time" @@ -6,7 +6,7 @@ import ( "github.com/cloudquery/plugin-sdk/schema" ) -type SourceMetrics struct { +type Metrics struct { TableClient map[string]map[string]*TableClientMetrics } @@ -23,7 +23,7 @@ func (s *TableClientMetrics) Equal(other *TableClientMetrics) bool { } // Equal compares to stats. Mostly useful in testing -func (s *SourceMetrics) Equal(other *SourceMetrics) bool { +func (s *Metrics) Equal(other *Metrics) bool { for table, clientStats := range s.TableClient { for client, stats := range clientStats { if _, ok := other.TableClient[table]; !ok { @@ -53,7 +53,7 @@ func (s *SourceMetrics) Equal(other *SourceMetrics) bool { return true } -func (s *SourceMetrics) initWithClients(table *schema.Table, clients []schema.ClientMeta) { +func (s *Metrics) initWithClients(table *schema.Table, clients []schema.ClientMeta) { s.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients)) for _, client := range clients { s.TableClient[table.Name][client.ID()] = &TableClientMetrics{} @@ -63,7 +63,7 @@ func (s *SourceMetrics) initWithClients(table *schema.Table, clients []schema.Cl } } -func (s *SourceMetrics) TotalErrors() uint64 { +func (s *Metrics) TotalErrors() uint64 { var total uint64 for _, clientMetrics := range s.TableClient { for _, metrics := range clientMetrics { @@ -73,7 +73,7 @@ func (s *SourceMetrics) TotalErrors() uint64 { return total } -func (s *SourceMetrics) TotalPanics() uint64 { +func (s *Metrics) TotalPanics() uint64 { var total uint64 for _, clientMetrics := range s.TableClient { for _, metrics := range clientMetrics { @@ -83,7 +83,7 @@ func (s *SourceMetrics) TotalPanics() uint64 { return total } -func (s *SourceMetrics) TotalResources() uint64 { +func (s *Metrics) TotalResources() uint64 { var total uint64 for _, clientMetrics := range s.TableClient { for _, metrics := range clientMetrics { diff --git a/plugins/source_metrics_test.go b/plugins/source/metrics_test.go similarity index 88% rename from plugins/source_metrics_test.go rename to plugins/source/metrics_test.go index 61a2909753..b18cdb387a 100644 --- a/plugins/source_metrics_test.go +++ b/plugins/source/metrics_test.go @@ -1,9 +1,9 @@ -package plugins +package source import "testing" -func TestSourceMetrics(t *testing.T) { - s := &SourceMetrics{ +func TestMetrics(t *testing.T) { + s := &Metrics{ TableClient: make(map[string]map[string]*TableClientMetrics), } s.TableClient["test_table"] = make(map[string]*TableClientMetrics) @@ -22,7 +22,7 @@ func TestSourceMetrics(t *testing.T) { t.Fatal("expected 3 panics") } - other := &SourceMetrics{ + other := &Metrics{ TableClient: make(map[string]map[string]*TableClientMetrics), } other.TableClient["test_table"] = make(map[string]*TableClientMetrics) diff --git a/plugins/source.go b/plugins/source/plugin.go similarity index 80% rename from plugins/source.go rename to plugins/source/plugin.go index 94f7da4a3d..dd28b7f460 100644 --- a/plugins/source.go +++ b/plugins/source/plugin.go @@ -1,4 +1,4 @@ -package plugins +package source import ( "context" @@ -12,21 +12,21 @@ import ( "golang.org/x/sync/semaphore" ) -type SourceNewExecutionClientFunc func(context.Context, zerolog.Logger, specs.Source) (schema.ClientMeta, error) +type NewExecutionClientFunc func(context.Context, zerolog.Logger, specs.Source) (schema.ClientMeta, error) -// SourcePlugin is the base structure required to pass to sdk.serve +// Plugin is the base structure required to pass to sdk.serve // We take a declarative approach to API here similar to Cobra -type SourcePlugin struct { +type Plugin struct { // Name of plugin i.e aws,gcp, azure etc' name string // Version of the plugin version string // Called upon configure call to validate and init configuration - newExecutionClient SourceNewExecutionClientFunc + newExecutionClient NewExecutionClientFunc // Tables is all tables supported by this source plugin tables schema.Tables // status sync metrics - metrics *SourceMetrics + metrics *Metrics // Logger to call, this logger is passed to the serve.Serve Client, if not defined Serve will create one instead. logger zerolog.Logger // resourceSem is a semaphore that limits the number of concurrent resources being fetched @@ -80,15 +80,15 @@ func maxDepth(tables schema.Tables) uint64 { return depth } -// NewSourcePlugin returns a new plugin with a given name, version, tables, newExecutionClient +// NewPlugin returns a new plugin with a given name, version, tables, newExecutionClient // and additional options. -func NewSourcePlugin(name string, version string, tables []*schema.Table, newExecutionClient SourceNewExecutionClientFunc) *SourcePlugin { - p := SourcePlugin{ +func NewPlugin(name string, version string, tables []*schema.Table, newExecutionClient NewExecutionClientFunc) *Plugin { + p := Plugin{ name: name, version: version, tables: tables, newExecutionClient: newExecutionClient, - metrics: &SourceMetrics{TableClient: make(map[string]map[string]*TableClientMetrics)}, + metrics: &Metrics{TableClient: make(map[string]map[string]*TableClientMetrics)}, caser: caser.New(), } addInternalColumns(p.tables) @@ -103,18 +103,18 @@ func NewSourcePlugin(name string, version string, tables []*schema.Table, newExe return &p } -func (p *SourcePlugin) SetLogger(logger zerolog.Logger) { +func (p *Plugin) SetLogger(logger zerolog.Logger) { p.logger = logger.With().Str("module", p.name+"-src").Logger() } // Tables returns all tables supported by this source plugin -func (p *SourcePlugin) Tables() schema.Tables { +func (p *Plugin) Tables() schema.Tables { return p.tables } // TablesForSpec returns all tables supported by this source plugin that match the given spec. // It validates the tables part of the spec and will return an error if it is found to be invalid. -func (p *SourcePlugin) TablesForSpec(spec specs.Source) (schema.Tables, error) { +func (p *Plugin) TablesForSpec(spec specs.Source) (schema.Tables, error) { spec.SetDefaults() if err := spec.Validate(); err != nil { return nil, fmt.Errorf("invalid spec: %w", err) @@ -127,21 +127,21 @@ func (p *SourcePlugin) TablesForSpec(spec specs.Source) (schema.Tables, error) { } // Name return the name of this plugin -func (p *SourcePlugin) Name() string { +func (p *Plugin) Name() string { return p.name } // Version returns the version of this plugin -func (p *SourcePlugin) Version() string { +func (p *Plugin) Version() string { return p.version } -func (p *SourcePlugin) Metrics() *SourceMetrics { +func (p *Plugin) Metrics() *Metrics { return p.metrics } // Sync is syncing data from the requested tables in spec to the given channel -func (p *SourcePlugin) Sync(ctx context.Context, spec specs.Source, res chan<- *schema.Resource) error { +func (p *Plugin) Sync(ctx context.Context, spec specs.Source, res chan<- *schema.Resource) error { spec.SetDefaults() if err := spec.Validate(); err != nil { return fmt.Errorf("invalid spec: %w", err) diff --git a/plugins/source_test.go b/plugins/source/plugin_test.go similarity index 97% rename from plugins/source_test.go rename to plugins/source/plugin_test.go index c5557c2132..d171291c79 100644 --- a/plugins/source_test.go +++ b/plugins/source/plugin_test.go @@ -1,4 +1,4 @@ -package plugins +package source import ( "context" @@ -121,14 +121,14 @@ func newTestExecutionClient(context.Context, zerolog.Logger, specs.Source) (sche type syncTestCase struct { table *schema.Table - stats SourceMetrics + stats Metrics data []schema.CQTypes } var syncTestCases = []syncTestCase{ { table: testTableSuccess(), - stats: SourceMetrics{ + stats: Metrics{ TableClient: map[string]map[string]*TableClientMetrics{ "test_table_success": { "testExecutionClient": { @@ -147,7 +147,7 @@ var syncTestCases = []syncTestCase{ }, { table: testTableResolverPanic(), - stats: SourceMetrics{ + stats: Metrics{ TableClient: map[string]map[string]*TableClientMetrics{ "test_table_resolver_panic": { "testExecutionClient": { @@ -160,7 +160,7 @@ var syncTestCases = []syncTestCase{ }, { table: testTablePreResourceResolverPanic(), - stats: SourceMetrics{ + stats: Metrics{ TableClient: map[string]map[string]*TableClientMetrics{ "test_table_pre_resource_resolver_panic": { "testExecutionClient": { @@ -173,7 +173,7 @@ var syncTestCases = []syncTestCase{ }, { table: testTableColumnResolverPanic(), - stats: SourceMetrics{ + stats: Metrics{ TableClient: map[string]map[string]*TableClientMetrics{ "test_table_column_resolver_panic": { "testExecutionClient": { @@ -194,7 +194,7 @@ var syncTestCases = []syncTestCase{ }, { table: testTableRelationSuccess(), - stats: SourceMetrics{ + stats: Metrics{ TableClient: map[string]map[string]*TableClientMetrics{ "test_table_relation_success": { "testExecutionClient": { @@ -248,7 +248,7 @@ func testSyncTable(t *testing.T, tc syncTestCase) { tc.table, } - plugin := NewSourcePlugin( + plugin := NewPlugin( "testSourcePlugin", "1.0.0", tables, diff --git a/plugins/source_scheduler_dfs.go b/plugins/source/scheduler_dfs.go similarity index 88% rename from plugins/source_scheduler_dfs.go rename to plugins/source/scheduler_dfs.go index dd27d1df5e..42961b1c81 100644 --- a/plugins/source_scheduler_dfs.go +++ b/plugins/source/scheduler_dfs.go @@ -1,4 +1,4 @@ -package plugins +package source import ( "context" @@ -22,7 +22,7 @@ const ( minResourceConcurrency = 100 ) -func (p *SourcePlugin) syncDfs(ctx context.Context, spec specs.Source, client schema.ClientMeta, tables schema.Tables, resolvedResources chan<- *schema.Resource) { +func (p *Plugin) syncDfs(ctx context.Context, spec specs.Source, client schema.ClientMeta, tables schema.Tables, resolvedResources chan<- *schema.Resource) { // This is very similar to the concurrent web crawler problem with some minor changes. // We are using DFS to make sure memory usage is capped at O(h) where h is the height of the tree. tableConcurrency := max(spec.Concurrency/minResourceConcurrency, minTableConcurrency) @@ -74,7 +74,7 @@ func (p *SourcePlugin) syncDfs(ctx context.Context, spec specs.Source, client sc wg.Wait() } -func (p *SourcePlugin) resolveTableDfs(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resolvedResources chan<- *schema.Resource, depth int) { +func (p *Plugin) resolveTableDfs(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resolvedResources chan<- *schema.Resource, depth int) { clientName := client.ID() logger := p.logger.With().Str("table", table.Name).Str("client", clientName).Logger() @@ -114,7 +114,7 @@ func (p *SourcePlugin) resolveTableDfs(ctx context.Context, table *schema.Table, } } -func (p *SourcePlugin) resolveResourcesDfs(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resources interface{}, resolvedResources chan<- *schema.Resource, depth int) { +func (p *Plugin) resolveResourcesDfs(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resources interface{}, resolvedResources chan<- *schema.Resource, depth int) { resourcesSlice := helpers.InterfaceSlice(resources) if len(resourcesSlice) == 0 { return @@ -168,7 +168,7 @@ func (p *SourcePlugin) resolveResourcesDfs(ctx context.Context, table *schema.Ta wg.Wait() } -func (p *SourcePlugin) resolveResource(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, item interface{}) *schema.Resource { +func (p *Plugin) resolveResource(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, item interface{}) *schema.Resource { ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() resource := schema.NewResourceData(table, parent, item) @@ -205,7 +205,7 @@ func (p *SourcePlugin) resolveResource(ctx context.Context, table *schema.Table, return resource } -func (p *SourcePlugin) resolveColumn(ctx context.Context, logger zerolog.Logger, tableMetrics *TableClientMetrics, client schema.ClientMeta, resource *schema.Resource, c schema.Column) { +func (p *Plugin) resolveColumn(ctx context.Context, logger zerolog.Logger, tableMetrics *TableClientMetrics, client schema.ClientMeta, resource *schema.Resource, c schema.Column) { columnStartTime := time.Now() defer func() { if err := recover(); err != nil { diff --git a/plugins/templates/all_tables.md.go.tpl b/plugins/source/templates/all_tables.md.go.tpl similarity index 100% rename from plugins/templates/all_tables.md.go.tpl rename to plugins/source/templates/all_tables.md.go.tpl diff --git a/plugins/templates/all_tables_entry.md.go.tpl b/plugins/source/templates/all_tables_entry.md.go.tpl similarity index 100% rename from plugins/templates/all_tables_entry.md.go.tpl rename to plugins/source/templates/all_tables_entry.md.go.tpl diff --git a/plugins/templates/table.md.go.tpl b/plugins/source/templates/table.md.go.tpl similarity index 100% rename from plugins/templates/table.md.go.tpl rename to plugins/source/templates/table.md.go.tpl diff --git a/plugins/source_testing.go b/plugins/source/testing.go similarity index 87% rename from plugins/source_testing.go rename to plugins/source/testing.go index 5cd054e66c..96302b8148 100644 --- a/plugins/source_testing.go +++ b/plugins/source/testing.go @@ -1,4 +1,4 @@ -package plugins +package source import ( "context" @@ -8,10 +8,10 @@ import ( "github.com/cloudquery/plugin-sdk/specs" ) -func TestSourcePluginSync(t *testing.T, plugin *SourcePlugin, spec specs.Source, opts ...TestSourcePluginOption) { +func TestPluginSync(t *testing.T, plugin *Plugin, spec specs.Source, opts ...TestPluginOption) { t.Helper() - o := &testSourcePluginOptions{ + o := &testPluginOptions{ parallel: true, } for _, opt := range opts { @@ -40,15 +40,15 @@ func TestSourcePluginSync(t *testing.T, plugin *SourcePlugin, spec specs.Source, validateTables(t, plugin.Tables(), syncedResources) } -type TestSourcePluginOption func(*testSourcePluginOptions) +type TestPluginOption func(*testPluginOptions) -func WithTestSourcePluginNoParallel() TestSourcePluginOption { - return func(f *testSourcePluginOptions) { +func WithTestPluginNoParallel() TestPluginOption { + return func(f *testPluginOptions) { f.parallel = false } } -type testSourcePluginOptions struct { +type testPluginOptions struct { parallel bool } diff --git a/plugins/source_validate.go b/plugins/source/validate.go similarity index 91% rename from plugins/source_validate.go rename to plugins/source/validate.go index 9b620a9354..835b798c7e 100644 --- a/plugins/source_validate.go +++ b/plugins/source/validate.go @@ -1,10 +1,10 @@ -package plugins +package source import ( "fmt" ) -func (p *SourcePlugin) validate() error { +func (p *Plugin) validate() error { if err := p.tables.ValidateDuplicateColumns(); err != nil { return fmt.Errorf("found duplicate columns in source plugin: %s: %w", p.name, err) } diff --git a/serve/destination.go b/serve/destination.go index d0c8111a6a..257c2a3366 100644 --- a/serve/destination.go +++ b/serve/destination.go @@ -10,7 +10,7 @@ import ( "github.com/cloudquery/plugin-sdk/internal/pb" "github.com/cloudquery/plugin-sdk/internal/servers" - "github.com/cloudquery/plugin-sdk/plugins" + "github.com/cloudquery/plugin-sdk/plugins/destination" "github.com/getsentry/sentry-go" grpczerolog "github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" @@ -23,7 +23,7 @@ import ( ) type destinationServe struct { - plugin *plugins.DestinationPlugin + plugin *destination.Plugin sentryDSN string } @@ -40,7 +40,7 @@ var testDestinationListenerLock sync.Mutex const serveDestinationShort = `Start destination plugin server` -func Destination(plugin *plugins.DestinationPlugin, opts ...DestinationOption) { +func Destination(plugin *destination.Plugin, opts ...DestinationOption) { s := &destinationServe{ plugin: plugin, } @@ -55,7 +55,7 @@ func Destination(plugin *plugins.DestinationPlugin, opts ...DestinationOption) { } // nolint:dupl -func newCmdDestinationServe(destination *destinationServe) *cobra.Command { +func newCmdDestinationServe(serve *destinationServe) *cobra.Command { var address string var network string var noSentry bool @@ -109,14 +109,14 @@ func newCmdDestinationServe(destination *destinationServe) *cobra.Command { grpc.MaxSendMsgSize(servers.MaxMsgSize), ) pb.RegisterDestinationServer(s, &servers.DestinationServer{ - Plugin: destination.plugin, + Plugin: serve.plugin, Logger: logger, }) - version := destination.plugin.Version() + version := serve.plugin.Version() - if destination.sentryDSN != "" && !strings.EqualFold(version, "development") && !noSentry { + if serve.sentryDSN != "" && !strings.EqualFold(version, "development") && !noSentry { err = sentry.Init(sentry.ClientOptions{ - Dsn: destination.sentryDSN, + Dsn: serve.sentryDSN, Debug: false, AttachStacktrace: false, Release: version, @@ -175,12 +175,12 @@ func newCmdDestinationServe(destination *destinationServe) *cobra.Command { return cmd } -func newCmdDestinationRoot(destination *destinationServe) *cobra.Command { +func newCmdDestinationRoot(serve *destinationServe) *cobra.Command { cmd := &cobra.Command{ - Use: fmt.Sprintf("%s ", destination.plugin.Name()), + Use: fmt.Sprintf("%s ", serve.plugin.Name()), } - cmd.AddCommand(newCmdDestinationServe(destination)) + cmd.AddCommand(newCmdDestinationServe(serve)) cmd.CompletionOptions.DisableDefaultCmd = true - cmd.Version = destination.plugin.Version() + cmd.Version = serve.plugin.Version() return cmd } diff --git a/serve/destination_test.go b/serve/destination_test.go index 84e9fd573f..23c1200215 100644 --- a/serve/destination_test.go +++ b/serve/destination_test.go @@ -10,7 +10,7 @@ import ( "github.com/cloudquery/plugin-sdk/clients" "github.com/cloudquery/plugin-sdk/internal/testdata" - "github.com/cloudquery/plugin-sdk/plugins" + "github.com/cloudquery/plugin-sdk/plugins/destination" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" "google.golang.org/grpc" @@ -24,7 +24,7 @@ func bufDestinationDialer(context.Context, string) (net.Conn, error) { } func TestDestination(t *testing.T) { - plugin := plugins.NewDestinationPlugin("testDestinationPlugin", "development", plugins.NewTestDestinationMemDBClient) + plugin := destination.NewDestinationPlugin("testDestinationPlugin", "development", destination.NewTestDestinationMemDBClient) s := &destinationServe{ plugin: plugin, } diff --git a/serve/source.go b/serve/source.go index a8eb2f7707..d7ef87aac8 100644 --- a/serve/source.go +++ b/serve/source.go @@ -10,7 +10,7 @@ import ( "github.com/cloudquery/plugin-sdk/internal/pb" "github.com/cloudquery/plugin-sdk/internal/servers" - "github.com/cloudquery/plugin-sdk/plugins" + "github.com/cloudquery/plugin-sdk/plugins/source" "github.com/getsentry/sentry-go" grpczerolog "github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" @@ -24,7 +24,7 @@ import ( ) type sourceServe struct { - plugin *plugins.SourcePlugin + plugin *source.Plugin sentryDSN string } @@ -42,7 +42,7 @@ var testSourceListenerLock sync.Mutex const serveSourceShort = `Start source plugin server` -func Source(plugin *plugins.SourcePlugin, opts ...SourceOption) { +func Source(plugin *source.Plugin, opts ...SourceOption) { s := &sourceServe{ plugin: plugin, } @@ -57,7 +57,7 @@ func Source(plugin *plugins.SourcePlugin, opts ...SourceOption) { } // nolint:dupl -func newCmdSourceServe(source *sourceServe) *cobra.Command { +func newCmdSourceServe(serve *sourceServe) *cobra.Command { var address string var network string var noSentry bool @@ -114,16 +114,16 @@ func newCmdSourceServe(source *sourceServe) *cobra.Command { grpc.MaxRecvMsgSize(servers.MaxMsgSize), grpc.MaxSendMsgSize(servers.MaxMsgSize), ) - source.plugin.SetLogger(logger) + serve.plugin.SetLogger(logger) pb.RegisterSourceServer(s, &servers.SourceServer{ - Plugin: source.plugin, + Plugin: serve.plugin, Logger: logger, }) - version := source.plugin.Version() + version := serve.plugin.Version() - if source.sentryDSN != "" && !strings.EqualFold(version, "development") && !noSentry { + if serve.sentryDSN != "" && !strings.EqualFold(version, "development") && !noSentry { err = sentry.Init(sentry.ClientOptions{ - Dsn: source.sentryDSN, + Dsn: serve.sentryDSN, Debug: false, AttachStacktrace: false, Release: version, @@ -198,7 +198,7 @@ doc --format json . ` ) -func newCmdSourceDoc(source *sourceServe) *cobra.Command { +func newCmdSourceDoc(serve *sourceServe) *cobra.Command { format := newEnum([]string{"json", "markdown"}, "markdown") cmd := &cobra.Command{ Use: "doc ", @@ -206,20 +206,20 @@ func newCmdSourceDoc(source *sourceServe) *cobra.Command { Long: sourceDocLong, Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - return source.plugin.GenerateSourcePluginDocs(args[0], format.Value) + return serve.plugin.GeneratePluginDocs(args[0], format.Value) }, } cmd.Flags().Var(format, "format", fmt.Sprintf("output format. one of: %s", strings.Join(format.Allowed, ","))) return cmd } -func newCmdSourceRoot(source *sourceServe) *cobra.Command { +func newCmdSourceRoot(serve *sourceServe) *cobra.Command { cmd := &cobra.Command{ - Use: fmt.Sprintf("%s ", source.plugin.Name()), + Use: fmt.Sprintf("%s ", serve.plugin.Name()), } - cmd.AddCommand(newCmdSourceServe(source)) - cmd.AddCommand(newCmdSourceDoc(source)) + cmd.AddCommand(newCmdSourceServe(serve)) + cmd.AddCommand(newCmdSourceDoc(serve)) cmd.CompletionOptions.DisableDefaultCmd = true - cmd.Version = source.plugin.Version() + cmd.Version = serve.plugin.Version() return cmd } diff --git a/serve/source_test.go b/serve/source_test.go index 818f186942..1f688dde39 100644 --- a/serve/source_test.go +++ b/serve/source_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/cloudquery/plugin-sdk/clients" - "github.com/cloudquery/plugin-sdk/plugins" + "github.com/cloudquery/plugin-sdk/plugins/source" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog" @@ -76,8 +76,8 @@ func bufSourceDialer(context.Context, string) (net.Conn, error) { } func TestSourceSuccess(t *testing.T) { - plugin := plugins.NewSourcePlugin( - "testSourcePlugin", + plugin := source.NewPlugin( + "testPlugin", "v1.0.0", []*schema.Table{testTable("test_table"), testTable("test_table2")}, newTestExecutionClient) @@ -129,8 +129,8 @@ func TestSourceSuccess(t *testing.T) { if err != nil { t.Fatal(err) } - if name != "testSourcePlugin" { - t.Fatalf("expected name to be testSourcePlugin but got %s", name) + if name != "testPlugin" { + t.Fatalf("expected name to be testPlugin but got %s", name) } version, err := c.Version(ctx) @@ -233,7 +233,7 @@ func TestSourceSuccess(t *testing.T) { const testSourceFailExpectedErr = "failed to fetch resources from stream: rpc error: code = Unknown desc = failed to sync resources: failed to create execution client for source plugin testSourcePlugin: error in newTestExecutionClientErr" func TestSourceFail(t *testing.T) { - plugin := plugins.NewSourcePlugin( + plugin := source.NewPlugin( "testSourcePlugin", "v1.0.0", []*schema.Table{testTable("test_table")},