|
1 | 1 | package cmd |
2 | 2 |
|
3 | 3 | import ( |
4 | | - "context" |
5 | 4 | "fmt" |
6 | 5 | "strings" |
7 | | - "time" |
8 | 6 |
|
9 | | - "github.com/cloudquery/plugin-sdk/clients" |
| 7 | + discovery "github.com/cloudquery/plugin-sdk/clients/discovery/v0" |
| 8 | + "github.com/cloudquery/plugin-sdk/registry" |
10 | 9 | "github.com/cloudquery/plugin-sdk/specs" |
11 | 10 | "github.com/rs/zerolog/log" |
12 | 11 | "github.com/spf13/cobra" |
| 12 | + "golang.org/x/exp/slices" |
13 | 13 | ) |
14 | 14 |
|
15 | 15 | const ( |
@@ -59,74 +59,42 @@ func migrate(cmd *cobra.Command, args []string) error { |
59 | 59 | } |
60 | 60 | destinationsSpecs = append(destinationsSpecs, *spec) |
61 | 61 | } |
62 | | - if err := migrateConnection(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil { |
63 | | - return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err) |
| 62 | + discoveryClient, err := discovery.NewClient(ctx, sourceSpec.Registry, registry.PluginTypeSource, sourceSpec.Path, sourceSpec.Version) |
| 63 | + if err != nil { |
| 64 | + return fmt.Errorf("failed to create discovery client for source %s: %w", sourceSpec.Name, err) |
64 | 65 | } |
65 | | - } |
66 | | - |
67 | | - return nil |
68 | | -} |
69 | | - |
70 | | -func migrateConnection(ctx context.Context, cqDir string, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error { |
71 | | - destinationNames := make([]string, len(destinationsSpecs)) |
72 | | - for i := range destinationsSpecs { |
73 | | - destinationNames[i] = destinationsSpecs[i].Name |
74 | | - } |
75 | | - migrateTime := time.Now().UTC() |
76 | | - |
77 | | - log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("Start migration") |
78 | | - defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationNames).Time("migrate_time", migrateTime).Msg("End migration") |
79 | | - |
80 | | - sourceClient, err := clients.NewSourceClient(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version, |
81 | | - clients.WithSourceLogger(log.Logger), |
82 | | - clients.WithSourceDirectory(cqDir), |
83 | | - ) |
84 | | - if err != nil { |
85 | | - return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err) |
86 | | - } |
87 | | - defer func() { |
88 | | - if err := sourceClient.Terminate(); err != nil { |
89 | | - log.Error().Err(err).Msg("Failed to terminate source client") |
90 | | - fmt.Println("failed to terminate source client:", err) |
| 66 | + versions, err := discoveryClient.GetVersions(ctx) |
| 67 | + if err != nil { |
| 68 | + if discoveryErr := discoveryClient.Terminate(); err != nil { |
| 69 | + log.Error().Err(discoveryErr).Msg("failed to terminate discovery client") |
| 70 | + fmt.Println("failed to terminate discovery client:", discoveryErr) |
| 71 | + } |
| 72 | + if err := migrateConnectionV0(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil { |
| 73 | + return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err) |
| 74 | + } |
| 75 | + return nil |
91 | 76 | } |
92 | | - }() |
93 | 77 |
|
94 | | - destClients, err := newDestinationClients(ctx, sourceSpec, destinationsSpecs, cqDir) |
95 | | - if err != nil { |
96 | | - return err |
97 | | - } |
98 | | - defer destClients.Close() |
99 | | - |
100 | | - selectedTables, tablesForSpecSupported, err := getTablesForSpec(ctx, sourceClient, sourceSpec) |
101 | | - if err != nil { |
102 | | - return fmt.Errorf("failed to get tables for source %s: %w", sourceSpec.Name, err) |
103 | | - } |
104 | | - tableCount := len(selectedTables.FlattenTables()) |
105 | | - |
106 | | - // Print a count of the tables that will be migrated. |
107 | | - if tablesForSpecSupported { |
108 | | - word := "tables" |
109 | | - if tableCount == 1 { |
110 | | - word = "table" |
| 78 | + if err := discoveryClient.Terminate(); err != nil { |
| 79 | + return fmt.Errorf("failed to terminate discovery client: %w", err) |
111 | 80 | } |
112 | | - fmt.Printf("Source %s will migrate %d %s.\n", sourceSpec.Name, tableCount, word) |
113 | | - } |
114 | 81 |
|
115 | | - fmt.Println("Starting migration for:", sourceSpec.Name, "->", sourceSpec.Destinations) |
116 | | - log.Info().Str("source", sourceSpec.Name).Strs("destinations", sourceSpec.Destinations).Msg("Starting migration") |
| 82 | + if slices.Index(versions, "v1") != -1 { |
| 83 | + if err := migrateConnectionV1(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil { |
| 84 | + return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err) |
| 85 | + } |
| 86 | + return nil |
| 87 | + } |
117 | 88 |
|
118 | | - for i, destinationSpec := range destinationsSpecs { |
119 | | - if err := destClients[i].Migrate(ctx, selectedTables); err != nil { |
120 | | - return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err) |
| 89 | + if slices.Index(versions, "v0") != -1 { |
| 90 | + if err := migrateConnectionV0(ctx, cqDir, *sourceSpec, destinationsSpecs); err != nil { |
| 91 | + return fmt.Errorf("failed to migrate source %s: %w", sourceSpec.Name, err) |
| 92 | + } |
| 93 | + return nil |
121 | 94 | } |
| 95 | + |
| 96 | + return fmt.Errorf("failed to migrate source %s, unknown versions %v", sourceSpec.Name, versions) |
122 | 97 | } |
123 | | - tt := time.Since(migrateTime) |
124 | | - fmt.Printf("Migration completed successfully.\n") |
125 | | - log.Info().Str("source", sourceSpec.Name). |
126 | | - Strs("destinations", sourceSpec.Destinations). |
127 | | - Int("num_tables", tableCount). |
128 | | - Float64("time_took", tt.Seconds()). |
129 | | - Msg("Migration completed successfully") |
130 | 98 |
|
131 | 99 | return nil |
132 | 100 | } |
0 commit comments