diff --git a/cli/cmd/clients.go b/cli/cmd/destinations_v0.go similarity index 65% rename from cli/cmd/clients.go rename to cli/cmd/destinations_v0.go index a3cc221afd6178..b3b65d058e4100 100644 --- a/cli/cmd/clients.go +++ b/cli/cmd/destinations_v0.go @@ -3,14 +3,15 @@ package cmd import ( "context" "fmt" - "github.com/cloudquery/plugin-sdk/clients" + + destination "github.com/cloudquery/plugin-sdk/clients/destination/v0" "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog/log" ) -type destinationClients []*clients.DestinationClient +type destinationClients []*destination.Client -func newDestinationClients(ctx context.Context, sourceSpec specs.Source, destinationsSpecs []specs.Destination, cqDir string) (destinationClients, error) { +func newDestinationClientsV0(ctx context.Context, sourceSpec specs.Source, destinationsSpecs []specs.Destination, cqDir string) (destinationClients, error) { var err error destClients := make(destinationClients, len(sourceSpec.Destinations)) defer func() { @@ -22,14 +23,14 @@ func newDestinationClients(ctx context.Context, sourceSpec specs.Source, destina }() for i, destinationSpec := range destinationsSpecs { - opts := []clients.DestinationClientOption{ - clients.WithDestinationLogger(log.Logger), - clients.WithDestinationDirectory(cqDir), + opts := []destination.ClientOption{ + destination.WithLogger(log.Logger), + destination.WithDirectory(cqDir), } if disableSentry { - opts = append(opts, clients.WithDestinationNoSentry()) + opts = append(opts, destination.WithNoSentry()) } - destClients[i], err = clients.NewDestinationClient(ctx, destinationSpec.Registry, destinationSpec.Path, destinationSpec.Version, opts...) + destClients[i], err = destination.NewClient(ctx, destinationSpec.Registry, destinationSpec.Path, destinationSpec.Version, opts...) if err != nil { return nil, fmt.Errorf("failed to create destination plugin client for %s: %w", destinationSpec.Name, err) } diff --git a/cli/cmd/errors.go b/cli/cmd/errors.go new file mode 100644 index 00000000000000..c1e3d8c577d4d9 --- /dev/null +++ b/cli/cmd/errors.go @@ -0,0 +1,22 @@ +package cmd + +import ( + "errors" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// isUnimplemented returns true if an error indicates that the underlying grpc call +// was unimplemented on the server side. +func isUnimplemented(err error) bool { + if err == nil { + return false + } + st, ok := status.FromError(err) + if ok && st.Code() == codes.Unimplemented { + return true + } + err = errors.Unwrap(err) + return isUnimplemented(err) +} diff --git a/cli/cmd/migrate.go b/cli/cmd/migrate.go index 34e6c43e68329e..5c5a79e0b7204b 100644 --- a/cli/cmd/migrate.go +++ b/cli/cmd/migrate.go @@ -1,15 +1,15 @@ package cmd import ( - "context" "fmt" "strings" - "time" - "github.com/cloudquery/plugin-sdk/clients" + discovery "github.com/cloudquery/plugin-sdk/clients/discovery/v0" + "github.com/cloudquery/plugin-sdk/registry" "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog/log" "github.com/spf13/cobra" + "golang.org/x/exp/slices" ) const ( @@ -59,74 +59,42 @@ func migrate(cmd *cobra.Command, args []string) error { } destinationsSpecs = append(destinationsSpecs, *spec) } - if err := migrateConnection(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil { - return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err) + discoveryClient, err := discovery.NewClient(ctx, sourceSpec.Registry, registry.PluginTypeSource, sourceSpec.Path, sourceSpec.Version) + if err != nil { + return fmt.Errorf("failed to create discovery client for source %s: %w", sourceSpec.Name, err) } - } - - return nil -} - -func migrateConnection(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error { - destinationNames := make([]string, len(destinationsSpecs)) - for i := range destinationsSpecs { - destinationNames[i] = destinationsSpecs[i].Name - } - migrateTime := time.Now().UTC() - - log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("Start migration") - defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("End migration") - - sourceClient, err := clients.NewSourceClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version, - clients.WithSourceLogger(log.Logger), - clients.WithSourceDirectory(cqDir), - ) - if err != nil { - return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err) - } - defer func() { - if err := sourceClient.Terminate(); err != nil { - log.Error().Err(err).Msg("Failed to terminate source client") - fmt.Println("failed to terminate source client:", err) + versions, err := discoveryClient.GetVersions(ctx) + if err != nil { + if discoveryErr := discoveryClient.Terminate(); err != nil { + log.Error().Err(discoveryErr).Msg("failed to terminate discovery client") + fmt.Println("failed to terminate discovery client:", discoveryErr) + } + if err := migrateConnectionV0(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil { + return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err) + } + return nil } - }() - destClients, err := newDestinationClients(ctx, sourceSpec, destinationsSpecs, cqDir) - if err != nil { - return err - } - defer destClients.Close() - - selectedTables, tablesForSpecSupported, err := getTablesForSpec(ctx, sourceClient, sourceSpec) - if err != nil { - return fmt.Errorf("failed to get tables for source %s: %w", sourceSpec.Name, err) - } - tableCount := len(selectedTables.FlattenTables()) - - // Print a count of the tables that will be migrated. - if tablesForSpecSupported { - word := "tables" - if tableCount == 1 { - word = "table" + if err := discoveryClient.Terminate(); err != nil { + return fmt.Errorf("failed to terminate discovery client: %w", err) } - fmt.Printf("Source %s will migrate %d %s.\n", sourceSpec.Name, tableCount, word) - } - fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations) - log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Starting migration") + if slices.Index(versions, "v1") != -1 { + if err := migrateConnectionV1(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil { + return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err) + } + return nil + } - for i, destinationSpec := range destinationsSpecs { - if err := destClients[i].Migrate(ctx, selectedTables); err != nil { - return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err) + if slices.Index(versions, "v0") != -1 { + if err := migrateConnectionV0(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil { + return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err) + } + return nil } + + return fmt.Errorf("failed to migrate source %s, unknown versions %v", sourceSpec.Name, versions) } - tt := time.Since(migrateTime) - fmt.Printf("Migration completed successfully.\n") - log.Info().Str("source", sourceSpec.Name). - Strs("destinations", sourceSpec.Destinations). - Int("num_tables", tableCount). - Float64("time_took", tt.Seconds()). - Msg("Migration completed successfully") return nil } diff --git a/cli/cmd/migrate_v0.go b/cli/cmd/migrate_v0.go new file mode 100644 index 00000000000000..2b2bdfc6de3aca --- /dev/null +++ b/cli/cmd/migrate_v0.go @@ -0,0 +1,75 @@ +package cmd + +import ( + "context" + "fmt" + "time" + + source "github.com/cloudquery/plugin-sdk/clients/source/v0" + "github.com/cloudquery/plugin-sdk/specs" + "github.com/rs/zerolog/log" +) + +func migrateConnectionV0(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error { + destinationNames := make([]string, len(destinationsSpecs)) + for i := range destinationsSpecs { + destinationNames[i] = destinationsSpecs[i].Name + } + migrateTime := time.Now().UTC() + + log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("Start migration") + defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("End migration") + + sourceClient, err := source.NewClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version, + source.WithLogger(log.Logger), + source.WithDirectory(cqDir), + ) + if err != nil { + return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err) + } + defer func() { + if err := sourceClient.Terminate(); err != nil { + log.Error().Err(err).Msg("Failed to terminate source client") + fmt.Println("failed to terminate source client:", err) + } + }() + + destClients, err := newDestinationClientsV0(ctx, sourceSpec, destinationsSpecs, cqDir) + if err != nil { + return err + } + defer destClients.Close() + + selectedTables, tablesForSpecSupported, err := getTablesForSpec(ctx, sourceClient, sourceSpec) + if err != nil { + return fmt.Errorf("failed to get tables for source %s: %w", sourceSpec.Name, err) + } + tableCount := len(selectedTables.FlattenTables()) + + // Print a count of the tables that will be migrated. + if tablesForSpecSupported { + word := "tables" + if tableCount == 1 { + word = "table" + } + fmt.Printf("Source %s will migrate %d %s.\n", sourceSpec.Name, tableCount, word) + } + + fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations) + log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Starting migration") + + for i, destinationSpec := range destinationsSpecs { + if err := destClients[i].Migrate(ctx, selectedTables); err != nil { + return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err) + } + } + tt := time.Since(migrateTime) + fmt.Printf("Migration completed successfully.\n") + log.Info().Str("source", sourceSpec.Name). + Strs("destinations", sourceSpec.Destinations). + Int("num_tables", tableCount). + Float64("time_took", tt.Seconds()). + Msg("Migration completed successfully") + + return nil +} diff --git a/cli/cmd/migrate_v1.go b/cli/cmd/migrate_v1.go new file mode 100644 index 00000000000000..60b80db4196c6c --- /dev/null +++ b/cli/cmd/migrate_v1.go @@ -0,0 +1,70 @@ +package cmd + +import ( + "context" + "fmt" + "time" + + source "github.com/cloudquery/plugin-sdk/clients/source/v1" + "github.com/cloudquery/plugin-sdk/specs" + "github.com/rs/zerolog/log" +) + +func migrateConnectionV1(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error { + destinationNames := make([]string, len(destinationsSpecs)) + for i := range destinationsSpecs { + destinationNames[i] = destinationsSpecs[i].Name + } + migrateTime := time.Now().UTC() + + log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("Start migration") + defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("End migration") + + sourceClient, err := source.NewClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version, + source.WithLogger(log.Logger), + source.WithDirectory(cqDir), + ) + if err != nil { + return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err) + } + defer func() { + if err := sourceClient.Terminate(); err != nil { + log.Error().Err(err).Msg("Failed to terminate source client") + fmt.Println("failed to terminate source client:", err) + } + }() + + destClients, err := newDestinationClientsV0(ctx, sourceSpec, destinationsSpecs, cqDir) + if err != nil { + return err + } + defer destClients.Close() + + if err := sourceClient.Init(ctx, sourceSpec); err != nil { + return fmt.Errorf("failed to init source %s: %w", sourceSpec.Name, err) + } + + tables, err := sourceClient.GetDynamicTables(ctx) + if err != nil { + return fmt.Errorf("failed to get dynamic tables for source %s: %w", sourceSpec.Name, err) + } + + tableCount := len(tables.FlattenTables()) + fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations, "with", tableCount, "tables") + log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Starting migration") + + for i, destinationSpec := range destinationsSpecs { + if err := destClients[i].Migrate(ctx, tables); err != nil { + return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err) + } + } + tt := time.Since(migrateTime) + fmt.Printf("Migration completed successfully.\n") + log.Info().Str("source", sourceSpec.Name). + Strs("destinations", sourceSpec.Destinations). + Int("num_tables", tableCount). + Float64("time_took", tt.Seconds()). + Msg("Migration completed successfully") + + return nil +} diff --git a/cli/cmd/sync.go b/cli/cmd/sync.go index e6274b2c1a90cd..6bca52d0ce0615 100644 --- a/cli/cmd/sync.go +++ b/cli/cmd/sync.go @@ -1,19 +1,17 @@ package cmd import ( - "context" "fmt" "strings" - "time" "github.com/google/uuid" + "golang.org/x/exp/slices" - "github.com/cloudquery/plugin-sdk/clients" + discovery "github.com/cloudquery/plugin-sdk/clients/discovery/v0" + "github.com/cloudquery/plugin-sdk/registry" "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog/log" - "github.com/schollz/progressbar/v3" "github.com/spf13/cobra" - "golang.org/x/sync/errgroup" ) const ( @@ -75,189 +73,43 @@ func sync(cmd *cobra.Command, args []string) error { } destinationsSpecs = append(destinationsSpecs, *spec) } - opts := []clients.SourceClientOption{ - clients.WithSourceLogger(log.Logger), - clients.WithSourceDirectory(cqDir), - } - if disableSentry { - opts = append(opts, clients.WithSourceNoSentry()) - } - sourceClient, err := clients.NewSourceClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version, opts...) + discoveryClient, err := discovery.NewClient(ctx, sourceSpec.Registry, registry.PluginTypeSource, sourceSpec.Path, sourceSpec.Version) if err != nil { - return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err) + return fmt.Errorf("failed to create discovery client for source %s: %w", sourceSpec.Name, err) } - //nolint:revive - defer func() { - if err := sourceClient.Terminate(); err != nil { - log.Error().Err(err).Msg("Failed to terminate source client") - fmt.Println("failed to terminate source client: ", err) - } - }() - v, err := sourceClient.GetProtocolVersion(ctx) + versions, err := discoveryClient.GetVersions(ctx) if err != nil { - return fmt.Errorf("failed to get protocol version for source %s: %w", sourceSpec.Name, err) - } - switch v { - case 1: - if err := syncConnectionV1(ctx, cqDir, sourceClient, *sourceSpec, destinationsSpecs, invocationUUID.String(), noMigrate); err != nil { - return err + if discoveryErr := discoveryClient.Terminate(); err != nil { + log.Error().Err(discoveryErr).Msg("failed to terminate discovery client") + fmt.Println("failed to terminate discovery client:", discoveryErr) } - case 2: - if err := syncConnectionV2(ctx, cqDir, sourceClient, *sourceSpec, destinationsSpecs, invocationUUID.String(), noMigrate); err != nil { - return err + // If we get an error here, we assume that the plugin is not a v1 plugin and we try to sync it as a v0 plugin + if err := syncConnectionV0(ctx, cqDir, *sourceSpec, destinationsSpecs, invocationUUID.String(), noMigrate); err != nil { + return fmt.Errorf("failed to sync source %s: %w", sourceSpec.Name, err) } - default: - return fmt.Errorf("unknown protocol version %d for source %s", v, sourceSpec.Name) - } - } - - return nil -} - -func syncConnectionV1(ctx context.Context, cqDir string, sourceClient *clients.SourceClient, sourceSpec specs.Source, destinationsSpecs []specs.Destination, uid string, noMigrate bool) error { - var err error - destinationNames := make([]string, len(destinationsSpecs)) - for i := range destinationsSpecs { - destinationNames[i] = destinationsSpecs[i].Name - } - syncTime := time.Now().UTC() - - log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("sync_time", syncTime).Msg("Start sync") - defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("sync_time", syncTime).Msg("End sync") - - sourceClient, err = clients.NewSourceClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version, - clients.WithSourceLogger(log.Logger), - clients.WithSourceDirectory(cqDir), - ) - if err != nil { - return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err) - } - defer func() { - if err := sourceClient.Terminate(); err != nil { - log.Error().Err(err).Msg("Failed to terminate source client") - fmt.Println("failed to terminate source client: ", err) - } - }() - - destClients, err := newDestinationClients(ctx, sourceSpec, destinationsSpecs, cqDir) - if err != nil { - return err - } - defer destClients.Close() - - if !noMigrate { - fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations) - log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Start migration") - tables, err := sourceClient.GetTables(ctx) - if err != nil { - return fmt.Errorf("failed to get tables for source %s: %w", sourceSpec.Name, err) + return nil } - for i, destinationSpec := range destinationsSpecs { - if err := destClients[i].Migrate(ctx, tables); err != nil { - return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err) - } + if err := discoveryClient.Terminate(); err != nil { + return fmt.Errorf("failed to terminate discovery client: %w", err) } - migrateTimeTook := time.Since(syncTime) - fmt.Printf("Migration completed successfully.\n") - log.Info(). - Str("source", sourceSpec.Name). - Strs("destinations", sourceSpec.Destinations). - Int("num_tables", len(tables)). - Float64("time_took", migrateTimeTook.Seconds()). - Msg("End migration") - } - resources := make(chan []byte) - g, gctx := errgroup.WithContext(ctx) - log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Start fetching resources") - fmt.Println("Starting sync for:", sourceSpec.Name, "->", sourceSpec.Destinations) - g.Go(func() error { - defer close(resources) - if err := sourceClient.Sync(gctx, sourceSpec, resources); err != nil { - if isUnknownConcurrencyFieldError(err) { - return fmt.Errorf("unsupported version of source %s@%s. Please update to the latest version from https://cloudquery.io/docs/plugins/sources", sourceSpec.Name, sourceSpec.Version) + if slices.Index(versions, "v1") != -1 { + if err := syncConnectionV1(ctx, cqDir, *sourceSpec, destinationsSpecs, invocationUUID.String(), noMigrate); err != nil { + return fmt.Errorf("failed to sync v1 source %s: %w", sourceSpec.Name, err) } - return fmt.Errorf("failed to sync source %s: %w", sourceSpec.Name, err) + return nil } - return nil - }) - bar := progressbar.NewOptions(-1, - progressbar.OptionSetDescription("Syncing resources..."), - progressbar.OptionSetItsString("resources"), - progressbar.OptionShowIts(), - progressbar.OptionSetElapsedTime(true), - progressbar.OptionShowCount(), - progressbar.OptionClearOnFinish(), - ) - failedWrites := uint64(0) - totalResources := uint64(0) - destSubscriptions := make([]chan []byte, len(sourceSpec.Destinations)) - for i := range destSubscriptions { - destSubscriptions[i] = make(chan []byte) - } - for i, destination := range sourceSpec.Destinations { - i := i - destination := destination - g.Go(func() error { - var destFailedWrites uint64 - var err error - if destFailedWrites, err = destClients[i].Write(gctx, sourceSpec.Name, syncTime, destSubscriptions[i]); err != nil { - return fmt.Errorf("failed to write for %s->%s: %w", sourceSpec.Name, destination, err) - } - if err := destClients[i].Close(ctx); err != nil { - return fmt.Errorf("failed to close destination client for %s->%s: %w", sourceSpec.Name, destination, err) + if slices.Index(versions, "v0") != -1 { + if err := syncConnectionV1(ctx, cqDir, *sourceSpec, destinationsSpecs, invocationUUID.String(), noMigrate); err != nil { + return fmt.Errorf("failed to sync v0 source %s: %w", sourceSpec.Name, err) } - failedWrites += destFailedWrites return nil - }) - } - - g.Go(func() error { - for resource := range resources { - totalResources++ - _ = bar.Add(1) - for i := range destSubscriptions { - select { - case <-gctx.Done(): - return gctx.Err() - case destSubscriptions[i] <- resource: - } - } - } - for i := range destSubscriptions { - close(destSubscriptions[i]) } - return nil - }) - if err := g.Wait(); err != nil { - _ = bar.Finish() - return err - } - summary, err := sourceClient.GetSyncSummary(ctx) - if err != nil { - return fmt.Errorf("failed to get sync summary: %w", err) + return fmt.Errorf("failed to sync source %s, unknown versions %v", sourceSpec.Name, versions) } - _ = bar.Finish() - syncTimeTook := time.Since(syncTime) - fmt.Println("Sync completed successfully.") - fmt.Printf("Summary: resources: %d, errors: %d, panic: %d, failed_writes: %d, time: %s\n", summary.Resources, summary.Errors, summary.Panics, failedWrites, syncTimeTook.Truncate(time.Second).String()) - log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations). - Uint64("resources", totalResources).Uint64("errors", summary.Errors).Uint64("panic", summary.Panics).Uint64("failed_writes", failedWrites).Float64("time_took", syncTimeTook.Seconds()).Msg("Sync completed successfully") - - // Send analytics, if activated. We only send if the source plugin registry is GitHub, mostly to avoid sending data from development machines. - if analyticsClient != nil && sourceSpec.Registry == specs.RegistryGithub { - log.Info().Msg("Sending sync summary to " + analyticsHost) - if err := analyticsClient.SendSyncSummary(ctx, sourceSpec, destinationsSpecs, uid, *summary); err != nil { - log.Warn().Err(err).Msg("Failed to send sync summary") - } - } return nil } - -func isUnknownConcurrencyFieldError(err error) bool { - return strings.Contains(err.Error(), unknownFieldErrorPrefix+`"table_concurrency"`) || strings.Contains(err.Error(), unknownFieldErrorPrefix+`"resource_concurrency"`) -} diff --git a/cli/cmd/sync_v0.go b/cli/cmd/sync_v0.go new file mode 100644 index 00000000000000..8b906ce82c0e6e --- /dev/null +++ b/cli/cmd/sync_v0.go @@ -0,0 +1,49 @@ +package cmd + +import ( + "context" + "fmt" + + "github.com/cloudquery/plugin-sdk/clients/source/v0" + "github.com/cloudquery/plugin-sdk/specs" + "github.com/rs/zerolog/log" +) + +func syncConnectionV0(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination, uid string, noMigrate bool) error { + opts := []source.ClientOption{ + source.WithLogger(log.Logger), + source.WithDirectory(cqDir), + } + if disableSentry { + opts = append(opts, source.WithNoSentry()) + } + sourceClient, err := source.NewClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version, opts...) + if err != nil { + return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err) + } + //nolint:revive + defer func() { + if err := sourceClient.Terminate(); err != nil { + log.Error().Err(err).Msg("Failed to terminate source client") + fmt.Println("failed to terminate source client: ", err) + } + }() + + v, err := sourceClient.GetProtocolVersion(ctx) + if err != nil { + return fmt.Errorf("failed to get protocol version for source %s: %w", sourceSpec.Name, err) + } + switch v { + case 1: + if err := syncConnectionV0_1(ctx, cqDir, sourceClient, sourceSpec, destinationsSpecs, uid, noMigrate); err != nil { + return err + } + case 2: + if err := syncConnectionV0_2(ctx, cqDir, sourceClient, sourceSpec, destinationsSpecs, uid, noMigrate); err != nil { + return err + } + default: + return fmt.Errorf("unknown protocol version %d for source %s", v, sourceSpec.Name) + } + return nil +} diff --git a/cli/cmd/sync_v0_1.go b/cli/cmd/sync_v0_1.go new file mode 100644 index 00000000000000..e60af1a220e0b4 --- /dev/null +++ b/cli/cmd/sync_v0_1.go @@ -0,0 +1,161 @@ +package cmd + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/cloudquery/plugin-sdk/clients/source/v0" + "github.com/cloudquery/plugin-sdk/specs" + "github.com/rs/zerolog/log" + "github.com/schollz/progressbar/v3" + "golang.org/x/sync/errgroup" +) + +func syncConnectionV0_1(ctx context.Context, cqDir string, sourceClient *source.Client, sourceSpec specs.Source, destinationsSpecs []specs.Destination, uid string, noMigrate bool) error { + var err error + destinationNames := make([]string, len(destinationsSpecs)) + for i := range destinationsSpecs { + destinationNames[i] = destinationsSpecs[i].Name + } + syncTime := time.Now().UTC() + + log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("sync_time", syncTime).Msg("Start sync") + defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("sync_time", syncTime).Msg("End sync") + + sourceClient, err = source.NewClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version, + source.WithLogger(log.Logger), + source.WithDirectory(cqDir), + ) + if err != nil { + return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err) + } + defer func() { + if err := sourceClient.Terminate(); err != nil { + log.Error().Err(err).Msg("Failed to terminate source client") + fmt.Println("failed to terminate source client: ", err) + } + }() + + destClients, err := newDestinationClientsV0(ctx, sourceSpec, destinationsSpecs, cqDir) + if err != nil { + return err + } + defer destClients.Close() + + if !noMigrate { + fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations) + log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Start migration") + tables, err := sourceClient.GetTables(ctx) + if err != nil { + return fmt.Errorf("failed to get tables for source %s: %w", sourceSpec.Name, err) + } + for i, destinationSpec := range destinationsSpecs { + if err := destClients[i].Migrate(ctx, tables); err != nil { + return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err) + } + } + migrateTimeTook := time.Since(syncTime) + fmt.Printf("Migration completed successfully.\n") + log.Info(). + Str("source", sourceSpec.Name). + Strs("destinations", sourceSpec.Destinations). + Int("num_tables", len(tables)). + Float64("time_took", migrateTimeTook.Seconds()). + Msg("End migration") + } + + resources := make(chan []byte) + g, gctx := errgroup.WithContext(ctx) + log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Start fetching resources") + fmt.Println("Starting sync for:", sourceSpec.Name, "->", sourceSpec.Destinations) + g.Go(func() error { + defer close(resources) + if err := sourceClient.Sync(gctx, sourceSpec, resources); err != nil { + if isUnknownConcurrencyFieldError(err) { + return fmt.Errorf("unsupported version of source %s@%s. Please update to the latest version from https://cloudquery.io/docs/plugins/sources", sourceSpec.Name, sourceSpec.Version) + } + return fmt.Errorf("failed to sync source %s: %w", sourceSpec.Name, err) + } + return nil + }) + + bar := progressbar.NewOptions(-1, + progressbar.OptionSetDescription("Syncing resources..."), + progressbar.OptionSetItsString("resources"), + progressbar.OptionShowIts(), + progressbar.OptionSetElapsedTime(true), + progressbar.OptionShowCount(), + progressbar.OptionClearOnFinish(), + ) + failedWrites := uint64(0) + totalResources := uint64(0) + destSubscriptions := make([]chan []byte, len(sourceSpec.Destinations)) + for i := range destSubscriptions { + destSubscriptions[i] = make(chan []byte) + } + for i, destination := range sourceSpec.Destinations { + i := i + destination := destination + g.Go(func() error { + var destFailedWrites uint64 + var err error + if destFailedWrites, err = destClients[i].Write(gctx, sourceSpec.Name, syncTime, destSubscriptions[i]); err != nil { + return fmt.Errorf("failed to write for %s->%s: %w", sourceSpec.Name, destination, err) + } + if err := destClients[i].Close(ctx); err != nil { + return fmt.Errorf("failed to close destination client for %s->%s: %w", sourceSpec.Name, destination, err) + } + failedWrites += destFailedWrites + return nil + }) + } + + g.Go(func() error { + for resource := range resources { + totalResources++ + _ = bar.Add(1) + for i := range destSubscriptions { + select { + case <-gctx.Done(): + return gctx.Err() + case destSubscriptions[i] <- resource: + } + } + } + for i := range destSubscriptions { + close(destSubscriptions[i]) + } + return nil + }) + + if err := g.Wait(); err != nil { + _ = bar.Finish() + return err + } + summary, err := sourceClient.GetSyncSummary(ctx) + if err != nil { + return fmt.Errorf("failed to get sync summary: %w", err) + } + _ = bar.Finish() + syncTimeTook := time.Since(syncTime) + + fmt.Println("Sync completed successfully.") + fmt.Printf("Summary: resources: %d, errors: %d, panic: %d, failed_writes: %d, time: %s\n", summary.Resources, summary.Errors, summary.Panics, failedWrites, syncTimeTook.Truncate(time.Second).String()) + log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations). + Uint64("resources", totalResources).Uint64("errors", summary.Errors).Uint64("panic", summary.Panics).Uint64("failed_writes", failedWrites).Float64("time_took", syncTimeTook.Seconds()).Msg("Sync completed successfully") + + // Send analytics, if activated. We only send if the source plugin registry is GitHub, mostly to avoid sending data from development machines. + if analyticsClient != nil && sourceSpec.Registry == specs.RegistryGithub { + log.Info().Msg("Sending sync summary to " + analyticsHost) + if err := analyticsClient.SendSyncSummary(ctx, sourceSpec, destinationsSpecs, uid, *summary); err != nil { + log.Warn().Err(err).Msg("Failed to send sync summary") + } + } + return nil +} + +func isUnknownConcurrencyFieldError(err error) bool { + return strings.Contains(err.Error(), unknownFieldErrorPrefix+`"table_concurrency"`) || strings.Contains(err.Error(), unknownFieldErrorPrefix+`"resource_concurrency"`) +} diff --git a/cli/cmd/sync2.go b/cli/cmd/sync_v0_2.go similarity index 93% rename from cli/cmd/sync2.go rename to cli/cmd/sync_v0_2.go index 160ddb9bbc6f87..61fda469449e81 100644 --- a/cli/cmd/sync2.go +++ b/cli/cmd/sync_v0_2.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/cloudquery/plugin-sdk/clients" + "github.com/cloudquery/plugin-sdk/clients/source/v0" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" "github.com/rs/zerolog/log" @@ -13,7 +13,7 @@ import ( "golang.org/x/sync/errgroup" ) -func syncConnectionV2(ctx context.Context, cqDir string, sourceClient *clients.SourceClient, sourceSpec specs.Source, destinationsSpecs []specs.Destination, uid string, noMigrate bool) error { +func syncConnectionV0_2(ctx context.Context, cqDir string, sourceClient *source.Client, sourceSpec specs.Source, destinationsSpecs []specs.Destination, uid string, noMigrate bool) error { var err error destinationStrings := make([]string, len(destinationsSpecs)) for i := range destinationsSpecs { @@ -24,7 +24,7 @@ func syncConnectionV2(ctx context.Context, cqDir string, sourceClient *clients.S log.Info().Str("source", sourceSpec.VersionString()).Strs("destinations", destinationStrings).Time("sync_time", syncTime).Msg("Start sync") defer log.Info().Str("source", sourceSpec.VersionString()).Strs("destinations", destinationStrings).Time("sync_time", syncTime).Msg("End sync") - destClients, err := newDestinationClients(ctx, sourceSpec, destinationsSpecs, cqDir) + destClients, err := newDestinationClientsV0(ctx, sourceSpec, destinationsSpecs, cqDir) if err != nil { return err } @@ -176,9 +176,9 @@ func syncConnectionV2(ctx context.Context, cqDir string, sourceClient *clients.S // getTablesForSpec first tries the newer GetTablesForSpec call, but if it is not available, falls back to // GetTables. The returned `supported` value indicates whether GetTablesForSpec was supported by the server. -func getTablesForSpec(ctx context.Context, sourceClient *clients.SourceClient, sourceSpec specs.Source) (tables schema.Tables, supported bool, err error) { +func getTablesForSpec(ctx context.Context, sourceClient *source.Client, sourceSpec specs.Source) (tables schema.Tables, supported bool, err error) { tables, err = sourceClient.GetTablesForSpec(ctx, &sourceSpec) - if clients.IsUnimplemented(err) { + if isUnimplemented(err) { // the plugin server does not support GetTablesForSpec. Fall back to GetTables. tables, err = sourceClient.GetTables(ctx) return tables, false, err diff --git a/cli/cmd/sync_v1.go b/cli/cmd/sync_v1.go new file mode 100644 index 00000000000000..3824c3830e73a6 --- /dev/null +++ b/cli/cmd/sync_v1.go @@ -0,0 +1,183 @@ +package cmd + +import ( + "context" + "fmt" + "time" + + "github.com/cloudquery/plugin-sdk/clients/source/v1" + "github.com/cloudquery/plugin-sdk/specs" + "github.com/rs/zerolog/log" + "github.com/schollz/progressbar/v3" + "golang.org/x/sync/errgroup" +) + +func syncConnectionV1(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination, uid string, noMigrate bool) error { + opts := []source.ClientOption{ + source.WithLogger(log.Logger), + source.WithDirectory(cqDir), + } + if disableSentry { + opts = append(opts, source.WithNoSentry()) + } + sourceClient, err := source.NewClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version, opts...) + if err != nil { + return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err) + } + //nolint:revive + defer func() { + if err := sourceClient.Terminate(); err != nil { + log.Error().Err(err).Msg("Failed to terminate source client") + fmt.Println("failed to terminate source client: ", err) + } + }() + + syncTime := time.Now().UTC() + destinationStrings := make([]string, len(destinationsSpecs)) + for i := range destinationsSpecs { + destinationStrings[i] = destinationsSpecs[i].VersionString() + } + + log.Info().Str("source", sourceSpec.VersionString()).Strs("destinations", destinationStrings).Time("sync_time", syncTime).Msg("Start sync") + defer log.Info().Str("source", sourceSpec.VersionString()).Strs("destinations", destinationStrings).Time("sync_time", syncTime).Msg("End sync") + + destClients, err := newDestinationClientsV0(ctx, sourceSpec, destinationsSpecs, cqDir) + if err != nil { + return err + } + defer destClients.Close() + + if err := sourceClient.Init(ctx, sourceSpec); err != nil { + return fmt.Errorf("failed to init source %s: %w", sourceSpec.VersionString(), err) + } + + tables, err := sourceClient.GetDynamicTables(ctx) + if err != nil { + return fmt.Errorf("failed to get dynamic tables for source %s: %w", sourceSpec.VersionString(), err) + } + + tableCount := len(tables.FlattenTables()) + + if !noMigrate { + fmt.Printf("Starting migration with %d tables for: %s -> %s\n", tableCount, sourceSpec.VersionString(), destinationStrings) + log.Info().Str("source", sourceSpec.VersionString()).Strs("destinations", destinationStrings).Msg("Start migration") + migrateStart := time.Now() + + for i, destinationSpec := range destinationsSpecs { + // Currently we migrate all tables, but this is subject to change once policies + // are adapted to handle non-existent tables in some way. + if err := destClients[i].Migrate(ctx, tables); err != nil { + return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.VersionString(), destinationSpec.VersionString(), err) + } + } + migrateTimeTook := time.Since(migrateStart) + fmt.Printf("Migration completed successfully.\n") + log.Info(). + Str("source", sourceSpec.VersionString()). + Strs("destinations", destinationStrings). + Int("num_tables", tableCount). + Float64("time_took", migrateTimeTook.Seconds()). + Msg("End migration") + } + + resources := make(chan []byte) + g, gctx := errgroup.WithContext(ctx) + log.Info().Str("source", sourceSpec.VersionString()).Strs("destinations", destinationStrings).Msg("Start fetching resources") + fmt.Printf("Starting sync for: %s -> %s\n", sourceSpec.VersionString(), destinationStrings) + g.Go(func() error { + defer close(resources) + if err := sourceClient.Sync(gctx, resources); err != nil { + if isUnknownConcurrencyFieldError(err) { + return fmt.Errorf("unsupported version of source %s. Please update to the latest version from https://cloudquery.io/docs/plugins/sources", sourceSpec.VersionString()) + } + return fmt.Errorf("failed to sync source %s: %w", sourceSpec.VersionString(), err) + } + return nil + }) + + destSubscriptions := make([]chan []byte, len(destinationsSpecs)) + for i := range destSubscriptions { + destSubscriptions[i] = make(chan []byte) + } + bar := progressbar.NewOptions(-1, + progressbar.OptionSetDescription("Syncing resources..."), + progressbar.OptionSetItsString("resources"), + progressbar.OptionShowIts(), + progressbar.OptionSetElapsedTime(true), + progressbar.OptionShowCount(), + progressbar.OptionClearOnFinish(), + ) + failedWrites := uint64(0) + totalResources := uint64(0) + for i, destination := range destinationsSpecs { + i := i + destination := destination + g.Go(func() error { + var destFailedWrites uint64 + var err error + if err = destClients[i].Write2(gctx, sourceSpec, tables, syncTime, destSubscriptions[i]); err != nil { + return fmt.Errorf("failed to write for %s -> %s: %w", sourceSpec.VersionString(), destination.VersionString(), err) + } + // call Close on destination client using the outer context, so that it happens even if writes get cancelled + if err := destClients[i].Close(ctx); err != nil { + return fmt.Errorf("failed to close destination client for %s -> %s: %w", sourceSpec.VersionString(), destination.VersionString(), err) + } + failedWrites += destFailedWrites + return nil + }) + } + + g.Go(func() error { + t := time.NewTicker(1 * time.Second) + defer func() { + for i := range destSubscriptions { + close(destSubscriptions[i]) + } + t.Stop() + }() + for { + select { + case resource, ok := <-resources: + if !ok { + return nil + } + totalResources++ + _ = bar.Add(1) + for i := range destSubscriptions { + select { + case <-gctx.Done(): + return gctx.Err() + case destSubscriptions[i] <- resource: + } + } + case <-t.C: + _ = bar.Add(0) + case <-gctx.Done(): + return nil + } + } + }) + + if err := g.Wait(); err != nil { + _ = bar.Finish() + return err + } + _ = bar.Finish() + syncTimeTook := time.Since(syncTime) + + metrics, err := sourceClient.GetMetrics(ctx) + if err != nil { + return fmt.Errorf("failed to get metrics for source %s: %w", sourceSpec.VersionString(), err) + } + + fmt.Printf("Sync completed successfully. Resources: %d, Errors: %d, Panics: %d, Time: %s\n", metrics.TotalResources(), metrics.TotalErrors(), metrics.TotalPanics(), syncTimeTook.Truncate(time.Second).String()) + + // Send analytics, if activated. We only send if the source plugin registry is GitHub, mostly to avoid sending data from development machines. + if analyticsClient != nil && sourceSpec.Registry == specs.RegistryGithub { + log.Info().Msg("Sending sync summary to " + analyticsHost) + if err := analyticsClient.SendSyncMetrics(ctx, sourceSpec, destinationsSpecs, uid, metrics); err != nil { + log.Warn().Err(err).Msg("Failed to send sync summary") + } + } + return nil +} diff --git a/cli/go.mod b/cli/go.mod index c44c503503e1c7..6092f9fd0498c7 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -3,7 +3,7 @@ module github.com/cloudquery/cloudquery/cli go 1.19 require ( - github.com/cloudquery/plugin-sdk v1.27.0 + github.com/cloudquery/plugin-sdk v1.28.0 github.com/getsentry/sentry-go v0.16.0 github.com/google/uuid v1.3.0 github.com/rs/zerolog v1.28.0 @@ -11,6 +11,7 @@ require ( github.com/spf13/cobra v1.6.1 github.com/stretchr/testify v1.8.1 github.com/thoas/go-funk v0.9.3 + golang.org/x/exp v0.0.0-20221230185412-738e83a70c30 golang.org/x/sync v0.1.0 google.golang.org/grpc v1.51.0 google.golang.org/protobuf v1.28.1 diff --git a/cli/go.sum b/cli/go.sum index 9f37017510c12f..98f3b4e9d7afc8 100644 --- a/cli/go.sum +++ b/cli/go.sum @@ -1,8 +1,8 @@ github.com/avast/retry-go/v4 v4.3.1 h1:Mtg11F9PdAIMkMiio2RKcYauoVHjl2aB3zQJJlzD4cE= github.com/avast/retry-go/v4 v4.3.1/go.mod h1:rg6XFaiuFYII0Xu3RDbZQkxCofFwruZKW8oEF1jpWiU= github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oMMlVBbn9M= -github.com/cloudquery/plugin-sdk v1.27.0 h1:DXuvnBt1gOB98umBZU6jltZEV6oxfsdEBIAbQXFcIx4= -github.com/cloudquery/plugin-sdk v1.27.0/go.mod h1:teMPyCON3uPdMsHvzpSiOg+IK2sOR5Tf9dYLreoURzI= +github.com/cloudquery/plugin-sdk v1.28.0 h1:txt4/ahmzlhi9EnvvJw9+9da7k9H5cYA34WXj8iFneY= +github.com/cloudquery/plugin-sdk v1.28.0/go.mod h1:teMPyCON3uPdMsHvzpSiOg+IK2sOR5Tf9dYLreoURzI= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -74,6 +74,7 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= golang.org/x/exp v0.0.0-20221230185412-738e83a70c30 h1:m9O6OTJ627iFnN2JIWfdqlZCzneRO6EEBsHXI25P8ws= +golang.org/x/exp v0.0.0-20221230185412-738e83a70c30/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= diff --git a/cli/internal/versions/versions.go b/cli/internal/versions/versions.go deleted file mode 100644 index 9d0b6b5b7d6360..00000000000000 --- a/cli/internal/versions/versions.go +++ /dev/null @@ -1,86 +0,0 @@ -package versions - -import ( - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "strings" - - "github.com/cloudquery/plugin-sdk/clients" -) - -type manifestResponse struct { - Latest string `json:"latest"` -} - -type githubLatestResponse struct { - TagName string `json:"tag_name"` - // other fields are ignored -} - -const ( - CloudQueryOrg = "cloudquery" - GithubBaseURL = "https://github.com" - CloudQueryBaseURL = "https://versions.cloudquery.io" -) - -// GetLatestPluginRelease returns the latest release version string for the given organization, plugin type -// and plugin. -func GetLatestPluginRelease(ctx context.Context, org, name string, typ clients.PluginType) (string, error) { - if org == CloudQueryOrg { - return getLatestCQPluginRelease(ctx, name, typ) - } - return getLatestCommunityPluginRelease(ctx, org, name, typ) -} - -func getLatestCQPluginRelease(ctx context.Context, name string, typ clients.PluginType) (string, error) { - url := fmt.Sprintf(CloudQueryBaseURL+"/v2/%s-%s.json", typ, name) - b, err := doRequest(ctx, url) - if err != nil { - return "", fmt.Errorf("reading manifest for %v: %w", name, err) - } - mr := &manifestResponse{} - err = json.Unmarshal(b, mr) - if err != nil { - return "", fmt.Errorf("unmarshaling manifest response: %w", err) - } - version := strings.TrimPrefix(mr.Latest, fmt.Sprintf("plugins-%s-%s-", string(typ), name)) - return version, nil -} - -func getLatestCommunityPluginRelease(ctx context.Context, org, name string, typ clients.PluginType) (string, error) { - url := fmt.Sprintf(GithubBaseURL+"/%s/cq-%s-%s/releases/latest", org, typ, name) - b, err := doRequest(ctx, url) - if err != nil { - return "", fmt.Errorf("reading %v: %w", url, err) - } - gr := &githubLatestResponse{} - err = json.Unmarshal(b, gr) - if err != nil { - return "", fmt.Errorf("unmarshaling GitHub latest response %s: %w", url, err) - } - return gr.TagName, nil -} - -func doRequest(ctx context.Context, url string) ([]byte, error) { - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return nil, err - } - req.Header.Add("Accept", "application/json") - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("status code %v (%v)", resp.StatusCode, resp.Status) - } - b, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("reading response body: %w", err) - } - return b, nil -} diff --git a/cli/internal/versions/versions_test.go b/cli/internal/versions/versions_test.go deleted file mode 100644 index 241e81c9f9955f..00000000000000 --- a/cli/internal/versions/versions_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package versions - -import ( - "context" - "strings" - "testing" - - "github.com/cloudquery/plugin-sdk/clients" -) - -func TestGetLatestCQPluginRelease(t *testing.T) { - ctx := context.Background() - version, err := getLatestCQPluginRelease(ctx, "test", clients.PluginTypeSource) - if err != nil { - t.Fatalf("error calling GetLatestPluginRelease: %v", err) - } - if !strings.HasPrefix(version, "v") { - t.Errorf("got version = %q, want a version starting with 'v'", version) - } -} - -func TestGetLatestCommunityPluginRelease(t *testing.T) { - ctx := context.Background() - version, err := getLatestCommunityPluginRelease(ctx, "yevgenypats", "test", clients.PluginTypeSource) - if err != nil { - t.Fatalf("error calling GetLatestPluginRelease: %v", err) - } - if !strings.HasPrefix(version, "v") { - t.Errorf("got version = %q, want a version starting with 'v'", version) - } -}