-
Notifications
You must be signed in to change notification settings - Fork 544
feat(cli): Update CLI to support source v1 protocol #6986
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| package cmd | ||
|
|
||
| import ( | ||
| "errors" | ||
|
|
||
| "google.golang.org/grpc/codes" | ||
| "google.golang.org/grpc/status" | ||
| ) | ||
|
|
||
| // isUnimplemented returns true if an error indicates that the underlying grpc call | ||
| // was unimplemented on the server side. | ||
| func isUnimplemented(err error) bool { | ||
| if err == nil { | ||
| return false | ||
| } | ||
| st, ok := status.FromError(err) | ||
| if ok && st.Code() == codes.Unimplemented { | ||
| return true | ||
| } | ||
| err = errors.Unwrap(err) | ||
| return isUnimplemented(err) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| package cmd | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| source "github.com/cloudquery/plugin-sdk/clients/source/v0" | ||
| "github.com/cloudquery/plugin-sdk/specs" | ||
| "github.com/rs/zerolog/log" | ||
| ) | ||
|
|
||
| func migrateConnectionV0(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we should put these in separate packages? instead of postfixing the methods something like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would avoid this for now I think it will cause all things like cyclical imports but also on the CLI I don't think it should be different packages as we don't import it so it should only be on the SDK. |
||
| destinationNames := make([]string, len(destinationsSpecs)) | ||
| for i := range destinationsSpecs { | ||
| destinationNames[i] = destinationsSpecs[i].Name | ||
| } | ||
| migrateTime := time.Now().UTC() | ||
|
|
||
| log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("Start migration") | ||
| defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("End migration") | ||
|
|
||
| sourceClient, err := source.NewClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version, | ||
| source.WithLogger(log.Logger), | ||
| source.WithDirectory(cqDir), | ||
| ) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err) | ||
| } | ||
| defer func() { | ||
| if err := sourceClient.Terminate(); err != nil { | ||
| log.Error().Err(err).Msg("Failed to terminate source client") | ||
| fmt.Println("failed to terminate source client:", err) | ||
| } | ||
| }() | ||
|
|
||
| destClients, err := newDestinationClientsV0(ctx, sourceSpec, destinationsSpecs, cqDir) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer destClients.Close() | ||
|
|
||
| selectedTables, tablesForSpecSupported, err := getTablesForSpec(ctx, sourceClient, sourceSpec) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get tables for source %s: %w", sourceSpec.Name, err) | ||
| } | ||
| tableCount := len(selectedTables.FlattenTables()) | ||
|
|
||
| // Print a count of the tables that will be migrated. | ||
| if tablesForSpecSupported { | ||
| word := "tables" | ||
| if tableCount == 1 { | ||
| word = "table" | ||
| } | ||
| fmt.Printf("Source %s will migrate %d %s.\n", sourceSpec.Name, tableCount, word) | ||
| } | ||
|
|
||
| fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations) | ||
| log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Starting migration") | ||
|
|
||
| for i, destinationSpec := range destinationsSpecs { | ||
| if err := destClients[i].Migrate(ctx, selectedTables); err != nil { | ||
| return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err) | ||
| } | ||
| } | ||
| tt := time.Since(migrateTime) | ||
| fmt.Printf("Migration completed successfully.\n") | ||
| log.Info().Str("source", sourceSpec.Name). | ||
| Strs("destinations", sourceSpec.Destinations). | ||
| Int("num_tables", tableCount). | ||
| Float64("time_took", tt.Seconds()). | ||
| Msg("Migration completed successfully") | ||
|
|
||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| package cmd | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| source "github.com/cloudquery/plugin-sdk/clients/source/v1" | ||
| "github.com/cloudquery/plugin-sdk/specs" | ||
| "github.com/rs/zerolog/log" | ||
| ) | ||
|
|
||
| func migrateConnectionV1(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error { | ||
| destinationNames := make([]string, len(destinationsSpecs)) | ||
| for i := range destinationsSpecs { | ||
| destinationNames[i] = destinationsSpecs[i].Name | ||
| } | ||
| migrateTime := time.Now().UTC() | ||
|
|
||
| log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("Start migration") | ||
| defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("End migration") | ||
|
|
||
| sourceClient, err := source.NewClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version, | ||
| source.WithLogger(log.Logger), | ||
| source.WithDirectory(cqDir), | ||
| ) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err) | ||
| } | ||
| defer func() { | ||
| if err := sourceClient.Terminate(); err != nil { | ||
| log.Error().Err(err).Msg("Failed to terminate source client") | ||
| fmt.Println("failed to terminate source client:", err) | ||
| } | ||
| }() | ||
|
|
||
| destClients, err := newDestinationClientsV0(ctx, sourceSpec, destinationsSpecs, cqDir) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer destClients.Close() | ||
|
|
||
| if err := sourceClient.Init(ctx, sourceSpec); err != nil { | ||
| return fmt.Errorf("failed to init source %s: %w", sourceSpec.Name, err) | ||
| } | ||
|
|
||
| tables, err := sourceClient.GetDynamicTables(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get dynamic tables for source %s: %w", sourceSpec.Name, err) | ||
| } | ||
|
|
||
| tableCount := len(tables.FlattenTables()) | ||
| fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations, "with", tableCount, "tables") | ||
| log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Starting migration") | ||
|
|
||
| for i, destinationSpec := range destinationsSpecs { | ||
| if err := destClients[i].Migrate(ctx, tables); err != nil { | ||
| return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err) | ||
| } | ||
| } | ||
| tt := time.Since(migrateTime) | ||
| fmt.Printf("Migration completed successfully.\n") | ||
| log.Info().Str("source", sourceSpec.Name). | ||
| Strs("destinations", sourceSpec.Destinations). | ||
| Int("num_tables", tableCount). | ||
| Float64("time_took", tt.Seconds()). | ||
| Msg("Migration completed successfully") | ||
|
|
||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can simplify this a bit using
status.Convert:https://discord.com/channels/872925471417962546/873606591335759872/1065556442800734268