Skip to content

Commit fe8ff12

Browse files
authored
feat(cli): Update CLI to support source v1 protocol (#6986)
Ready for review
1 parent 67f39fb commit fe8ff12

14 files changed

Lines changed: 633 additions & 367 deletions

File tree

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ package cmd
33
import (
44
"context"
55
"fmt"
6-
"github.com/cloudquery/plugin-sdk/clients"
6+
7+
destination "github.com/cloudquery/plugin-sdk/clients/destination/v0"
78
"github.com/cloudquery/plugin-sdk/specs"
89
"github.com/rs/zerolog/log"
910
)
1011

11-
type destinationClients []*clients.DestinationClient
12+
type destinationClients []*destination.Client
1213

13-
func newDestinationClients(ctx context.Context, sourceSpec specs.Source, destinationsSpecs []specs.Destination, cqDir string) (destinationClients, error) {
14+
func newDestinationClientsV0(ctx context.Context, sourceSpec specs.Source, destinationsSpecs []specs.Destination, cqDir string) (destinationClients, error) {
1415
var err error
1516
destClients := make(destinationClients, len(sourceSpec.Destinations))
1617
defer func() {
@@ -22,14 +23,14 @@ func newDestinationClients(ctx context.Context, sourceSpec specs.Source, destina
2223
}()
2324

2425
for i, destinationSpec := range destinationsSpecs {
25-
opts := []clients.DestinationClientOption{
26-
clients.WithDestinationLogger(log.Logger),
27-
clients.WithDestinationDirectory(cqDir),
26+
opts := []destination.ClientOption{
27+
destination.WithLogger(log.Logger),
28+
destination.WithDirectory(cqDir),
2829
}
2930
if disableSentry {
30-
opts = append(opts, clients.WithDestinationNoSentry())
31+
opts = append(opts, destination.WithNoSentry())
3132
}
32-
destClients[i], err = clients.NewDestinationClient(ctx, destinationSpec.Registry, destinationSpec.Path, destinationSpec.Version, opts...)
33+
destClients[i], err = destination.NewClient(ctx, destinationSpec.Registry, destinationSpec.Path, destinationSpec.Version, opts...)
3334
if err != nil {
3435
return nil, fmt.Errorf("failed to create destination plugin client for %s: %w", destinationSpec.Name, err)
3536
}

cli/cmd/errors.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package cmd
2+
3+
import (
4+
"errors"
5+
6+
"google.golang.org/grpc/codes"
7+
"google.golang.org/grpc/status"
8+
)
9+
10+
// isUnimplemented returns true if an error indicates that the underlying grpc call
11+
// was unimplemented on the server side.
12+
func isUnimplemented(err error) bool {
13+
if err == nil {
14+
return false
15+
}
16+
st, ok := status.FromError(err)
17+
if ok && st.Code() == codes.Unimplemented {
18+
return true
19+
}
20+
err = errors.Unwrap(err)
21+
return isUnimplemented(err)
22+
}

cli/cmd/migrate.go

Lines changed: 31 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package cmd
22

33
import (
4-
"context"
54
"fmt"
65
"strings"
7-
"time"
86

9-
"github.com/cloudquery/plugin-sdk/clients"
7+
discovery "github.com/cloudquery/plugin-sdk/clients/discovery/v0"
8+
"github.com/cloudquery/plugin-sdk/registry"
109
"github.com/cloudquery/plugin-sdk/specs"
1110
"github.com/rs/zerolog/log"
1211
"github.com/spf13/cobra"
12+
"golang.org/x/exp/slices"
1313
)
1414

1515
const (
@@ -59,74 +59,42 @@ func migrate(cmd *cobra.Command, args []string) error {
5959
}
6060
destinationsSpecs = append(destinationsSpecs, *spec)
6161
}
62-
if err := migrateConnection(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil {
63-
return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err)
62+
discoveryClient, err := discovery.NewClient(ctx, sourceSpec.Registry, registry.PluginTypeSource, sourceSpec.Path, sourceSpec.Version)
63+
if err != nil {
64+
return fmt.Errorf("failed to create discovery client for source %s: %w", sourceSpec.Name, err)
6465
}
65-
}
66-
67-
return nil
68-
}
69-
70-
func migrateConnection(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error {
71-
destinationNames := make([]string, len(destinationsSpecs))
72-
for i := range destinationsSpecs {
73-
destinationNames[i] = destinationsSpecs[i].Name
74-
}
75-
migrateTime := time.Now().UTC()
76-
77-
log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("Start migration")
78-
defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("End migration")
79-
80-
sourceClient, err := clients.NewSourceClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version,
81-
clients.WithSourceLogger(log.Logger),
82-
clients.WithSourceDirectory(cqDir),
83-
)
84-
if err != nil {
85-
return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err)
86-
}
87-
defer func() {
88-
if err := sourceClient.Terminate(); err != nil {
89-
log.Error().Err(err).Msg("Failed to terminate source client")
90-
fmt.Println("failed to terminate source client:", err)
66+
versions, err := discoveryClient.GetVersions(ctx)
67+
if err != nil {
68+
if discoveryErr := discoveryClient.Terminate(); err != nil {
69+
log.Error().Err(discoveryErr).Msg("failed to terminate discovery client")
70+
fmt.Println("failed to terminate discovery client:", discoveryErr)
71+
}
72+
if err := migrateConnectionV0(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil {
73+
return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err)
74+
}
75+
return nil
9176
}
92-
}()
9377

94-
destClients, err := newDestinationClients(ctx, sourceSpec, destinationsSpecs, cqDir)
95-
if err != nil {
96-
return err
97-
}
98-
defer destClients.Close()
99-
100-
selectedTables, tablesForSpecSupported, err := getTablesForSpec(ctx, sourceClient, sourceSpec)
101-
if err != nil {
102-
return fmt.Errorf("failed to get tables for source %s: %w", sourceSpec.Name, err)
103-
}
104-
tableCount := len(selectedTables.FlattenTables())
105-
106-
// Print a count of the tables that will be migrated.
107-
if tablesForSpecSupported {
108-
word := "tables"
109-
if tableCount == 1 {
110-
word = "table"
78+
if err := discoveryClient.Terminate(); err != nil {
79+
return fmt.Errorf("failed to terminate discovery client: %w", err)
11180
}
112-
fmt.Printf("Source %s will migrate %d %s.\n", sourceSpec.Name, tableCount, word)
113-
}
11481

115-
fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations)
116-
log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Starting migration")
82+
if slices.Index(versions, "v1") != -1 {
83+
if err := migrateConnectionV1(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil {
84+
return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err)
85+
}
86+
return nil
87+
}
11788

118-
for i, destinationSpec := range destinationsSpecs {
119-
if err := destClients[i].Migrate(ctx, selectedTables); err != nil {
120-
return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err)
89+
if slices.Index(versions, "v0") != -1 {
90+
if err := migrateConnectionV0(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil {
91+
return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err)
92+
}
93+
return nil
12194
}
95+
96+
return fmt.Errorf("failed to migrate source %s, unknown versions %v", sourceSpec.Name, versions)
12297
}
123-
tt := time.Since(migrateTime)
124-
fmt.Printf("Migration completed successfully.\n")
125-
log.Info().Str("source", sourceSpec.Name).
126-
Strs("destinations", sourceSpec.Destinations).
127-
Int("num_tables", tableCount).
128-
Float64("time_took", tt.Seconds()).
129-
Msg("Migration completed successfully")
13098

13199
return nil
132100
}

cli/cmd/migrate_v0.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
source "github.com/cloudquery/plugin-sdk/clients/source/v0"
9+
"github.com/cloudquery/plugin-sdk/specs"
10+
"github.com/rs/zerolog/log"
11+
)
12+
13+
func migrateConnectionV0(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error {
14+
destinationNames := make([]string, len(destinationsSpecs))
15+
for i := range destinationsSpecs {
16+
destinationNames[i] = destinationsSpecs[i].Name
17+
}
18+
migrateTime := time.Now().UTC()
19+
20+
log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("Start migration")
21+
defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("End migration")
22+
23+
sourceClient, err := source.NewClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version,
24+
source.WithLogger(log.Logger),
25+
source.WithDirectory(cqDir),
26+
)
27+
if err != nil {
28+
return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err)
29+
}
30+
defer func() {
31+
if err := sourceClient.Terminate(); err != nil {
32+
log.Error().Err(err).Msg("Failed to terminate source client")
33+
fmt.Println("failed to terminate source client:", err)
34+
}
35+
}()
36+
37+
destClients, err := newDestinationClientsV0(ctx, sourceSpec, destinationsSpecs, cqDir)
38+
if err != nil {
39+
return err
40+
}
41+
defer destClients.Close()
42+
43+
selectedTables, tablesForSpecSupported, err := getTablesForSpec(ctx, sourceClient, sourceSpec)
44+
if err != nil {
45+
return fmt.Errorf("failed to get tables for source %s: %w", sourceSpec.Name, err)
46+
}
47+
tableCount := len(selectedTables.FlattenTables())
48+
49+
// Print a count of the tables that will be migrated.
50+
if tablesForSpecSupported {
51+
word := "tables"
52+
if tableCount == 1 {
53+
word = "table"
54+
}
55+
fmt.Printf("Source %s will migrate %d %s.\n", sourceSpec.Name, tableCount, word)
56+
}
57+
58+
fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations)
59+
log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Starting migration")
60+
61+
for i, destinationSpec := range destinationsSpecs {
62+
if err := destClients[i].Migrate(ctx, selectedTables); err != nil {
63+
return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err)
64+
}
65+
}
66+
tt := time.Since(migrateTime)
67+
fmt.Printf("Migration completed successfully.\n")
68+
log.Info().Str("source", sourceSpec.Name).
69+
Strs("destinations", sourceSpec.Destinations).
70+
Int("num_tables", tableCount).
71+
Float64("time_took", tt.Seconds()).
72+
Msg("Migration completed successfully")
73+
74+
return nil
75+
}

cli/cmd/migrate_v1.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
source "github.com/cloudquery/plugin-sdk/clients/source/v1"
9+
"github.com/cloudquery/plugin-sdk/specs"
10+
"github.com/rs/zerolog/log"
11+
)
12+
13+
func migrateConnectionV1(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error {
14+
destinationNames := make([]string, len(destinationsSpecs))
15+
for i := range destinationsSpecs {
16+
destinationNames[i] = destinationsSpecs[i].Name
17+
}
18+
migrateTime := time.Now().UTC()
19+
20+
log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("Start migration")
21+
defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("End migration")
22+
23+
sourceClient, err := source.NewClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version,
24+
source.WithLogger(log.Logger),
25+
source.WithDirectory(cqDir),
26+
)
27+
if err != nil {
28+
return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err)
29+
}
30+
defer func() {
31+
if err := sourceClient.Terminate(); err != nil {
32+
log.Error().Err(err).Msg("Failed to terminate source client")
33+
fmt.Println("failed to terminate source client:", err)
34+
}
35+
}()
36+
37+
destClients, err := newDestinationClientsV0(ctx, sourceSpec, destinationsSpecs, cqDir)
38+
if err != nil {
39+
return err
40+
}
41+
defer destClients.Close()
42+
43+
if err := sourceClient.Init(ctx, sourceSpec); err != nil {
44+
return fmt.Errorf("failed to init source %s: %w", sourceSpec.Name, err)
45+
}
46+
47+
tables, err := sourceClient.GetDynamicTables(ctx)
48+
if err != nil {
49+
return fmt.Errorf("failed to get dynamic tables for source %s: %w", sourceSpec.Name, err)
50+
}
51+
52+
tableCount := len(tables.FlattenTables())
53+
fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations, "with", tableCount, "tables")
54+
log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Starting migration")
55+
56+
for i, destinationSpec := range destinationsSpecs {
57+
if err := destClients[i].Migrate(ctx, tables); err != nil {
58+
return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err)
59+
}
60+
}
61+
tt := time.Since(migrateTime)
62+
fmt.Printf("Migration completed successfully.\n")
63+
log.Info().Str("source", sourceSpec.Name).
64+
Strs("destinations", sourceSpec.Destinations).
65+
Int("num_tables", tableCount).
66+
Float64("time_took", tt.Seconds()).
67+
Msg("Migration completed successfully")
68+
69+
return nil
70+
}

0 commit comments

Comments
 (0)