Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions cli/cmd/clients.go → cli/cmd/destinations_v0.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package cmd
import (
"context"
"fmt"
"github.com/cloudquery/plugin-sdk/clients"

destination "github.com/cloudquery/plugin-sdk/clients/destination/v0"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/rs/zerolog/log"
)

type destinationClients []*clients.DestinationClient
type destinationClients []*destination.Client

func newDestinationClients(ctx context.Context, sourceSpec specs.Source, destinationsSpecs []specs.Destination, cqDir string) (destinationClients, error) {
func newDestinationClientsV0(ctx context.Context, sourceSpec specs.Source, destinationsSpecs []specs.Destination, cqDir string) (destinationClients, error) {
var err error
destClients := make(destinationClients, len(sourceSpec.Destinations))
defer func() {
Expand All @@ -22,14 +23,14 @@ func newDestinationClients(ctx context.Context, sourceSpec specs.Source, destina
}()

for i, destinationSpec := range destinationsSpecs {
opts := []clients.DestinationClientOption{
clients.WithDestinationLogger(log.Logger),
clients.WithDestinationDirectory(cqDir),
opts := []destination.ClientOption{
destination.WithLogger(log.Logger),
destination.WithDirectory(cqDir),
}
if disableSentry {
opts = append(opts, clients.WithDestinationNoSentry())
opts = append(opts, destination.WithNoSentry())
}
destClients[i], err = clients.NewDestinationClient(ctx, destinationSpec.Registry, destinationSpec.Path, destinationSpec.Version, opts...)
destClients[i], err = destination.NewClient(ctx, destinationSpec.Registry, destinationSpec.Path, destinationSpec.Version, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create destination plugin client for %s: %w", destinationSpec.Name, err)
}
Expand Down
22 changes: 22 additions & 0 deletions cli/cmd/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package cmd

import (
"errors"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// isUnimplemented returns true if an error indicates that the underlying grpc call
// was unimplemented on the server side.
func isUnimplemented(err error) bool {
if err == nil {
return false
}
st, ok := status.FromError(err)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if ok && st.Code() == codes.Unimplemented {
return true
}
err = errors.Unwrap(err)
return isUnimplemented(err)
}
94 changes: 31 additions & 63 deletions cli/cmd/migrate.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package cmd

import (
"context"
"fmt"
"strings"
"time"

"github.com/cloudquery/plugin-sdk/clients"
discovery "github.com/cloudquery/plugin-sdk/clients/discovery/v0"
"github.com/cloudquery/plugin-sdk/registry"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"golang.org/x/exp/slices"
)

const (
Expand Down Expand Up @@ -59,74 +59,42 @@ func migrate(cmd *cobra.Command, args []string) error {
}
destinationsSpecs = append(destinationsSpecs, *spec)
}
if err := migrateConnection(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil {
return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err)
discoveryClient, err := discovery.NewClient(ctx, sourceSpec.Registry, registry.PluginTypeSource, sourceSpec.Path, sourceSpec.Version)
if err != nil {
return fmt.Errorf("failed to create discovery client for source %s: %w", sourceSpec.Name, err)
}
}

return nil
}

func migrateConnection(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error {
destinationNames := make([]string, len(destinationsSpecs))
for i := range destinationsSpecs {
destinationNames[i] = destinationsSpecs[i].Name
}
migrateTime := time.Now().UTC()

log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("Start migration")
defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("End migration")

sourceClient, err := clients.NewSourceClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version,
clients.WithSourceLogger(log.Logger),
clients.WithSourceDirectory(cqDir),
)
if err != nil {
return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err)
}
defer func() {
if err := sourceClient.Terminate(); err != nil {
log.Error().Err(err).Msg("Failed to terminate source client")
fmt.Println("failed to terminate source client:", err)
versions, err := discoveryClient.GetVersions(ctx)
if err != nil {
if discoveryErr := discoveryClient.Terminate(); err != nil {
log.Error().Err(discoveryErr).Msg("failed to terminate discovery client")
fmt.Println("failed to terminate discovery client:", discoveryErr)
}
if err := migrateConnectionV0(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil {
return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err)
}
return nil
}
}()

destClients, err := newDestinationClients(ctx, sourceSpec, destinationsSpecs, cqDir)
if err != nil {
return err
}
defer destClients.Close()

selectedTables, tablesForSpecSupported, err := getTablesForSpec(ctx, sourceClient, sourceSpec)
if err != nil {
return fmt.Errorf("failed to get tables for source %s: %w", sourceSpec.Name, err)
}
tableCount := len(selectedTables.FlattenTables())

// Print a count of the tables that will be migrated.
if tablesForSpecSupported {
word := "tables"
if tableCount == 1 {
word = "table"
if err := discoveryClient.Terminate(); err != nil {
return fmt.Errorf("failed to terminate discovery client: %w", err)
}
fmt.Printf("Source %s will migrate %d %s.\n", sourceSpec.Name, tableCount, word)
}

fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations)
log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Starting migration")
if slices.Index(versions, "v1") != -1 {
if err := migrateConnectionV1(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil {
return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err)
}
return nil
}

for i, destinationSpec := range destinationsSpecs {
if err := destClients[i].Migrate(ctx, selectedTables); err != nil {
return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err)
if slices.Index(versions, "v0") != -1 {
if err := migrateConnectionV0(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil {
return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err)
}
return nil
}

return fmt.Errorf("failed to migrate source %s, unknown versions %v", sourceSpec.Name, versions)
}
tt := time.Since(migrateTime)
fmt.Printf("Migration completed successfully.\n")
log.Info().Str("source", sourceSpec.Name).
Strs("destinations", sourceSpec.Destinations).
Int("num_tables", tableCount).
Float64("time_took", tt.Seconds()).
Msg("Migration completed successfully")

return nil
}
75 changes: 75 additions & 0 deletions cli/cmd/migrate_v0.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package cmd

import (
"context"
"fmt"
"time"

source "github.com/cloudquery/plugin-sdk/clients/source/v0"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/rs/zerolog/log"
)

func migrateConnectionV0(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should put these in separate packages? instead of postfixing the methods something like v1/migrate and v0/migrate

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would avoid this for now I think it will cause all things like cyclical imports but also on the CLI I don't think it should be different packages as we don't import it so it should only be on the SDK.

destinationNames := make([]string, len(destinationsSpecs))
for i := range destinationsSpecs {
destinationNames[i] = destinationsSpecs[i].Name
}
migrateTime := time.Now().UTC()

log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("Start migration")
defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("End migration")

sourceClient, err := source.NewClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version,
source.WithLogger(log.Logger),
source.WithDirectory(cqDir),
)
if err != nil {
return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err)
}
defer func() {
if err := sourceClient.Terminate(); err != nil {
log.Error().Err(err).Msg("Failed to terminate source client")
fmt.Println("failed to terminate source client:", err)
}
}()

destClients, err := newDestinationClientsV0(ctx, sourceSpec, destinationsSpecs, cqDir)
if err != nil {
return err
}
defer destClients.Close()

selectedTables, tablesForSpecSupported, err := getTablesForSpec(ctx, sourceClient, sourceSpec)
if err != nil {
return fmt.Errorf("failed to get tables for source %s: %w", sourceSpec.Name, err)
}
tableCount := len(selectedTables.FlattenTables())

// Print a count of the tables that will be migrated.
if tablesForSpecSupported {
word := "tables"
if tableCount == 1 {
word = "table"
}
fmt.Printf("Source %s will migrate %d %s.\n", sourceSpec.Name, tableCount, word)
}

fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations)
log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Starting migration")

for i, destinationSpec := range destinationsSpecs {
if err := destClients[i].Migrate(ctx, selectedTables); err != nil {
return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err)
}
}
tt := time.Since(migrateTime)
fmt.Printf("Migration completed successfully.\n")
log.Info().Str("source", sourceSpec.Name).
Strs("destinations", sourceSpec.Destinations).
Int("num_tables", tableCount).
Float64("time_took", tt.Seconds()).
Msg("Migration completed successfully")

return nil
}
70 changes: 70 additions & 0 deletions cli/cmd/migrate_v1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package cmd

import (
"context"
"fmt"
"time"

source "github.com/cloudquery/plugin-sdk/clients/source/v1"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/rs/zerolog/log"
)

func migrateConnectionV1(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error {
destinationNames := make([]string, len(destinationsSpecs))
for i := range destinationsSpecs {
destinationNames[i] = destinationsSpecs[i].Name
}
migrateTime := time.Now().UTC()

log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("Start migration")
defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("End migration")

sourceClient, err := source.NewClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version,
source.WithLogger(log.Logger),
source.WithDirectory(cqDir),
)
if err != nil {
return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err)
}
defer func() {
if err := sourceClient.Terminate(); err != nil {
log.Error().Err(err).Msg("Failed to terminate source client")
fmt.Println("failed to terminate source client:", err)
}
}()

destClients, err := newDestinationClientsV0(ctx, sourceSpec, destinationsSpecs, cqDir)
if err != nil {
return err
}
defer destClients.Close()

if err := sourceClient.Init(ctx, sourceSpec); err != nil {
return fmt.Errorf("failed to init source %s: %w", sourceSpec.Name, err)
}

tables, err := sourceClient.GetDynamicTables(ctx)
if err != nil {
return fmt.Errorf("failed to get dynamic tables for source %s: %w", sourceSpec.Name, err)
}

tableCount := len(tables.FlattenTables())
fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations, "with", tableCount, "tables")
log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Starting migration")

for i, destinationSpec := range destinationsSpecs {
if err := destClients[i].Migrate(ctx, tables); err != nil {
return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err)
}
}
tt := time.Since(migrateTime)
fmt.Printf("Migration completed successfully.\n")
log.Info().Str("source", sourceSpec.Name).
Strs("destinations", sourceSpec.Destinations).
Int("num_tables", tableCount).
Float64("time_took", tt.Seconds()).
Msg("Migration completed successfully")

return nil
}
Loading