Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
815b178
Implement support for transformations in CLI.
marianogappa Jul 23, 2024
192d4bf
Keep plugins folder as it is in main branch.
marianogappa Jul 23, 2024
ebdad9e
Remove test transformer.
marianogappa Jul 23, 2024
23ec33c
Remove incorrect rebase changes.
marianogappa Jul 23, 2024
826f02f
Merge branch 'main' into mariano/transformations-support
marianogappa Jul 23, 2024
beabe30
Add transformerpipeline tests.
marianogappa Jul 23, 2024
088df8a
Merge branch 'main' into mariano/transformations-support
marianogappa Jul 25, 2024
c1266e6
Also transform MigrateTable message, so that schemas can change.
marianogappa Jul 26, 2024
6b7009e
Merge branch 'main' into mariano/transformations-support
marianogappa Jul 29, 2024
4da8ca9
Implement review comments. Use TransformSchema method.
marianogappa Jul 31, 2024
22febc7
Merge branch 'main' into mariano/transformations-support
marianogappa Jul 31, 2024
6deb9b2
Run go mod tidy.
marianogappa Jul 31, 2024
84b3256
feat: Add basic transformer plugin.
marianogappa Jul 26, 2024
8fc3731
Rename function for clarity.
marianogappa Jul 26, 2024
c106934
Also transform MigrateTable message, so that schemas can change.
marianogappa Jul 26, 2024
d0b15a3
Update plugins/transformer/basic/client/client.go
marianogappa Jul 30, 2024
2f61c2e
Reuse arrow's existing AddField method.
marianogappa Jul 30, 2024
174a6f2
Upgrade sdk; remove "destination" hack.
marianogappa Jul 30, 2024
38979c4
Return all validation errors; not just the first one.
marianogappa Jul 30, 2024
35a29ce
Use switch statement.
marianogappa Jul 30, 2024
820cb82
Use memory.DefaultAllocator.
marianogappa Jul 30, 2024
9cfa7f5
Remove underscore from function name.
marianogappa Jul 30, 2024
e505bcc
Implement TransformSchema in basic transformer.
marianogappa Jul 31, 2024
bc42ff3
Update plugins/transformer/basic/README.md
marianogappa Jul 31, 2024
46f83f8
Add sample docs to ba able to publish.
marianogappa Jul 31, 2024
6428509
Fix panic when spec is empty.
marianogappa Aug 1, 2024
a3eca50
Follow docs conventions.
marianogappa Aug 1, 2024
c2ff2e8
Fix outdated logic for calculating slice cap.
marianogappa Aug 1, 2024
2ce76a2
Add plugin to CQ Monorepo steps.
marianogappa Aug 1, 2024
2e95627
Add transformer plugins to `wait_for_required_workflows`.
marianogappa Aug 1, 2024
94edcfc
Merge branch 'main' into mariano/transformations-support-transformer
marianogappa Aug 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion cli/cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func sync(cmd *cobra.Command, args []string) error {

sources := specReader.Sources
destinations := specReader.Destinations
transformers := specReader.Transformers

sourcePluginClients := make(managedplugin.Clients, 0)
defer func() {
if err := sourcePluginClients.Terminate(); err != nil {
Expand Down Expand Up @@ -220,6 +222,43 @@ func sync(cmd *cobra.Command, args []string) error {
destinationPluginClients = append(destinationPluginClients, destPluginClient)
}

transformerPluginClients := make(managedplugin.Clients, 0)
defer func() {
if err := transformerPluginClients.Terminate(); err != nil {
fmt.Println(err)
}
}()
for _, transformer := range transformers {
opts := []managedplugin.Option{
managedplugin.WithLogger(log.Logger),
managedplugin.WithAuthToken(authToken.Value),
managedplugin.WithTeamName(teamName),
managedplugin.WithLicenseFile(licenseFile),
}
if logConsole {
opts = append(opts, managedplugin.WithNoProgress())
}
if cqDir != "" {
opts = append(opts, managedplugin.WithDirectory(cqDir))
}
if disableSentry {
opts = append(opts, managedplugin.WithNoSentry())
}

cfg := managedplugin.Config{
Name: transformer.Name,
Registry: SpecRegistryToPlugin(transformer.Registry),
Version: transformer.Version,
Path: transformer.Path,
DockerAuth: transformer.DockerRegistryAuthToken,
}
transPluginClient, err := managedplugin.NewClient(ctx, managedplugin.PluginTransformer, cfg, opts...)
if err != nil {
return enrichClientError(managedplugin.Clients{}, []bool{transformer.RegistryInferred()}, err)
}
transformerPluginClients = append(transformerPluginClients, transPluginClient)
}

for _, source := range sources {
cl := sourcePluginClients.ClientByName(source.Name)
versions, err := cl.Versions(ctx)
Expand All @@ -230,12 +269,26 @@ func sync(cmd *cobra.Command, args []string) error {

var destinationClientsForSource []*managedplugin.Client
var destinationForSourceSpec []specs.Destination
var transformerClientsForDestination = map[string][]*managedplugin.Client{}
var transformerForDestinationSpec = map[string][]specs.Transformer{}
var backendClientForSource *managedplugin.Client
var destinationForSourceBackendSpec *specs.Destination
for _, destination := range destinations {
if slices.Contains(source.Destinations, destination.Name) {
destinationClientsForSource = append(destinationClientsForSource, destinationPluginClients.ClientByName(destination.Name))
destinationForSourceSpec = append(destinationForSourceSpec, *destination)

// Each destination defines their own transformers
ts := []*managedplugin.Client{}
tsSpecs := []specs.Transformer{}
for _, transformer := range transformers {
if slices.Contains(destination.Transformers, transformer.Name) {
ts = append(ts, transformerPluginClients.ClientByName(transformer.Name))
tsSpecs = append(tsSpecs, *transformer)
}
}
transformerClientsForDestination[destination.Name] = ts
transformerForDestinationSpec[destination.Name] = tsSpecs
continue
}

Expand Down Expand Up @@ -264,6 +317,12 @@ func sync(cmd *cobra.Command, args []string) error {
for field, msg := range destWarnings {
log.Warn().Str("destination", destination.Name()).Str("field", field).Msg(msg)
}
for _, transformer := range transformerClientsForDestination[destination.Name()] {
transformerWarnings := specReader.GetTransformerWarningsByName(source.Name)
for field, msg := range transformerWarnings {
log.Warn().Str("transformer", transformer.Name()).Str("field", field).Msg(msg)
}
}
}

src := v3source{
Expand All @@ -277,6 +336,16 @@ func sync(cmd *cobra.Command, args []string) error {
spec: destinationForSourceSpec[i],
})
}
transfs := map[string][]v3transformer{}
for destinationName, transformerClients := range transformerClientsForDestination {
for i, transformer := range transformerClients {
transfs[destinationName] = append(transfs[destinationName], v3transformer{
client: transformer,
spec: transformerForDestinationSpec[destinationName][i],
})
}
}

var backend *v3destination
if backendClientForSource != nil && destinationForSourceBackendSpec != nil {
backend = &v3destination{
Expand All @@ -290,7 +359,7 @@ func sync(cmd *cobra.Command, args []string) error {
return err
}

if err := syncConnectionV3(ctx, src, dests, backend, invocationUUID.String(), noMigrate, summaryLocation); err != nil {
if err := syncConnectionV3(ctx, src, dests, transfs, backend, invocationUUID.String(), noMigrate, summaryLocation); err != nil {
return fmt.Errorf("failed to sync v3 source %s: %w", cl.Name(), err)
}

Expand Down
Loading