diff --git a/cli/cmd/sync_v3.go b/cli/cmd/sync_v3.go index 0af5da45efcc1b..a9a6d0cab3f8de 100644 --- a/cli/cmd/sync_v3.go +++ b/cli/cmd/sync_v3.go @@ -15,6 +15,7 @@ import ( "github.com/cloudquery/cloudquery/cli/internal/analytics" "github.com/cloudquery/cloudquery/cli/internal/api" "github.com/cloudquery/cloudquery/cli/internal/specs/v0" + "github.com/cloudquery/cloudquery/cli/internal/tablenamechanger" "github.com/cloudquery/cloudquery/cli/internal/transformer" "github.com/cloudquery/cloudquery/cli/internal/transformerpipeline" "github.com/cloudquery/plugin-pb-go/managedplugin" @@ -330,6 +331,12 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des defer remoteProgressReporter.Cancel() } + // Transformers can change table names. We need to keep track of table name changes + // in case we do things that depend on table names. + // + // Note that transformers run per-destination, so we need to keep track of table name changes per-destination. + tableNameChanger := tablenamechanger.New(destinationSpecs) + // Note: we want to stop this errorgroup if ctx is cancelled, but we don't want to cancel ctx if gctx is cancelled. // gctx is always cancelled when the errorgroup returns, and this isn't necessarily an error. eg, gctx := errgroup.WithContext(ctx) @@ -412,11 +419,14 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des } case *plugin.Sync_Response_DeleteRecord: for i := range destinationsPbClients { + // Table name might have changed due to a transformation. 
+ tableName := tableNameChanger.UpdateTableName(destinationSpecs[i].Name, m.DeleteRecord.TableName) + wr := &plugin.Write_Request{} // Transformations aren't required here because DeleteRecord is only in V3 wr.Message = &plugin.Write_Request_DeleteRecord{ DeleteRecord: &plugin.Write_MessageDeleteRecord{ - TableName: m.DeleteRecord.TableName, + TableName: tableName, TableRelations: m.DeleteRecord.TableRelations, WhereClause: m.DeleteRecord.WhereClause, }, @@ -460,6 +470,11 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des transformedSchemaBytes = resp.Schema } + // Table name might have changed due to a transformation. + if err := tableNameChanger.LearnTableNameChange(destinationName, table.Name, transformedSchemaBytes); err != nil { + return err + } + wr := &plugin.Write_Request{} wr.Message = &plugin.Write_Request_MigrateTable{ MigrateTable: &plugin.Write_MessageMigrateTable{ @@ -526,7 +541,9 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des for i := range destinationsClients { if destinationSpecs[i].WriteMode == specs.WriteModeOverwriteDeleteStale { - if err := deleteStale(writeClients[i], tablesForDeleteStale, sourceName, syncTime); err != nil { + // Table names might have changed due to transformers + updatedTablesForDeleteStale := tableNameChanger.UpdateTableNames(destinationSpecs[i].Name, tablesForDeleteStale) + if err := deleteStale(writeClients[i], updatedTablesForDeleteStale, sourceName, syncTime); err != nil { return err } } diff --git a/cli/internal/tablenamechanger/table_name_changer.go b/cli/internal/tablenamechanger/table_name_changer.go new file mode 100644 index 00000000000000..f8f27d1022ae04 --- /dev/null +++ b/cli/internal/tablenamechanger/table_name_changer.go @@ -0,0 +1,62 @@ +package tablenamechanger + +import ( + "github.com/cloudquery/cloudquery/cli/internal/specs/v0" + "github.com/cloudquery/plugin-pb-go/pb/plugin/v3" + "github.com/cloudquery/plugin-sdk/v4/schema" +) + +// 
TableNameChanger manages table name changes due to transformers, on a per-destination basis. +// It can learn about table name changes and apply those changes on further steps. +type TableNameChanger struct { + tableNameChanges map[string]map[string]string +} + +func New(destinationSpecs []specs.Destination) *TableNameChanger { + tnc := &TableNameChanger{ + tableNameChanges: make(map[string]map[string]string), + } + for _, destinationSpec := range destinationSpecs { + tnc.tableNameChanges[destinationSpec.Name] = make(map[string]string) + } + return tnc +} + +func (c TableNameChanger) UpdateTableNames(destinationName string, tables map[string]bool) map[string]bool { + newTables := make(map[string]bool, len(tables)) + for oldTableName := range tables { + if newTableName, ok := c.tableNameChanges[destinationName][oldTableName]; ok { + // Never mutate tables: callers pass the same map for every destination, so deleting here would hide the table from the next one. + newTables[newTableName] = true + } else { + newTables[oldTableName] = true + } + } + return newTables +} + +func (c TableNameChanger) UpdateTableName(destinationName string, oldTableName string) string { + if newTableName, ok := c.tableNameChanges[destinationName][oldTableName]; ok { + return newTableName + } + return oldTableName +} + +func (c *TableNameChanger) LearnTableNameChange(destinationName, oldTableName string, schemaBytes []byte) error { + // Implicit assumption that table name changes are deterministic. + // Therefore, once we learned about a table name change, we don't need to learn it again. 
+ if c.tableNameChanges[destinationName][oldTableName] != "" { + return nil + } + + sc, err := plugin.NewSchemaFromBytes(schemaBytes) + if err != nil { + return err + } + newTableName, ok := sc.Metadata().GetValue(schema.MetadataTableName) + if !ok { + return nil // this would be an error, but let it fail at a more relevant step + } + c.tableNameChanges[destinationName][oldTableName] = newTableName + return nil +} diff --git a/cli/internal/tablenamechanger/table_name_changer_test.go b/cli/internal/tablenamechanger/table_name_changer_test.go new file mode 100644 index 00000000000000..e6acc32ac08ed4 --- /dev/null +++ b/cli/internal/tablenamechanger/table_name_changer_test.go @@ -0,0 +1,149 @@ +package tablenamechanger + +import ( + "testing" + + "github.com/apache/arrow/go/v17/arrow" + "github.com/cloudquery/cloudquery/cli/internal/specs/v0" + "github.com/cloudquery/plugin-pb-go/pb/plugin/v3" + "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/stretchr/testify/require" +) + +func TestTableNameChanger(t *testing.T) { + destinationSpecs := []specs.Destination{ + {Metadata: specs.Metadata{Name: "dest1"}}, + {Metadata: specs.Metadata{Name: "dest2"}}, + } + + t.Run("UpdateTableNames", func(t *testing.T) { + tests := []struct { + name string + destinationName string + tables map[string]bool + expectedNewTables map[string]bool + }{ + { + name: "NoChanges", + destinationName: "dest1", + tables: map[string]bool{"table1": true, "table2": true}, + expectedNewTables: map[string]bool{"table1": true, "table2": true}, + }, + { + name: "WithChanges", + destinationName: "dest1", + tables: map[string]bool{"old_table1": true, "table2": true}, + expectedNewTables: map[string]bool{"new_table1": true, "table2": true}, + }, + } + + tnc := New(destinationSpecs) + err := tnc.LearnTableNameChange("dest1", "old_table1", createSchemaBytes(t, "new_table1")) + require.NoError(t, err) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + newTables := 
tnc.UpdateTableNames(tt.destinationName, tt.tables) + require.Equal(t, tt.expectedNewTables, newTables) + }) + } + }) + + t.Run("UpdateTableName", func(t *testing.T) { + tests := []struct { + name string + destinationName string + oldTableName string + expectedNewName string + }{ + { + name: "NoChange", + destinationName: "dest1", + oldTableName: "table1", + expectedNewName: "table1", + }, + { + name: "WithChange", + destinationName: "dest1", + oldTableName: "old_table1", + expectedNewName: "new_table1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tnc := New(destinationSpecs) + err := tnc.LearnTableNameChange("dest1", "old_table1", createSchemaBytes(t, "new_table1")) + require.NoError(t, err) + + newName := tnc.UpdateTableName(tt.destinationName, tt.oldTableName) + require.Equal(t, tt.expectedNewName, newName) + }) + } + }) + + t.Run("LearnTableNameChange", func(t *testing.T) { + tests := []struct { + name string + learnDestName string + learnOldTableName string + learnNewTableName string + updateDestName string + updateOldTableName string + updateNewTableName string + }{ + { + name: "ValidChange", + learnDestName: "dest1", + learnOldTableName: "old_table", + learnNewTableName: "new_table", + updateDestName: "dest1", + updateOldTableName: "old_table", + updateNewTableName: "new_table", + }, + { + name: "NoChangeOnSameDestination", + learnDestName: "dest1", + learnOldTableName: "old_table", + learnNewTableName: "old_table", + updateDestName: "dest1", + updateOldTableName: "old_table", + updateNewTableName: "old_table", + }, + { + name: "ValidChangeButDoesntUpdateOnDifferentDestination", + learnDestName: "dest1", + learnOldTableName: "old_table", + learnNewTableName: "new_table", + updateDestName: "dest2", + updateOldTableName: "old_table", + updateNewTableName: "old_table", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tnc := New(destinationSpecs) + + err := 
tnc.LearnTableNameChange(tt.learnDestName, tt.learnOldTableName, createSchemaBytes(t, tt.learnNewTableName)) + require.NoError(t, err) + newName := tnc.UpdateTableName(tt.updateDestName, tt.updateOldTableName) + require.Equal(t, tt.updateNewTableName, newName) + }) + } + }) +} + +func createSchemaBytes(t *testing.T, tableName string) []byte { + md := arrow.NewMetadata([]string{schema.MetadataTableName}, []string{tableName}) + sc := arrow.NewSchema( + []arrow.Field{ + {Name: "col1", Type: arrow.BinaryTypes.String, Nullable: true}, + {Name: "col2", Type: arrow.BinaryTypes.String, Nullable: true}, + }, + &md, + ) + bytes, err := plugin.SchemaToBytes(sc) + require.NoError(t, err) + return bytes +} diff --git a/plugins/transformer/basic/client/recordupdater/record_updater.go b/plugins/transformer/basic/client/recordupdater/record_updater.go index e02e3af75a04e1..9627dd2c8ec33c 100644 --- a/plugins/transformer/basic/client/recordupdater/record_updater.go +++ b/plugins/transformer/basic/client/recordupdater/record_updater.go @@ -96,6 +96,15 @@ func (r *RecordUpdater) ObfuscateColumns(columnNames []string) (arrow.Record, er return r.record, nil } +func (r *RecordUpdater) ChangeTableName(newTableNamePattern string) (arrow.Record, error) { + newSchema, err := r.schemaUpdater.ChangeTableName(newTableNamePattern) + if err != nil { + return nil, err + } + r.record = array.NewRecord(newSchema, r.record.Columns(), r.record.NumRows()) + return r.record, nil +} + func (r *RecordUpdater) colIndicesByNames(columnNames []string) (map[int]struct{}, error) { colNameMap := make(map[string]struct{}) for _, columnName := range columnNames { diff --git a/plugins/transformer/basic/client/recordupdater/record_updater_test.go b/plugins/transformer/basic/client/recordupdater/record_updater_test.go index e907a9399b69b1..300d02221cecc8 100644 --- a/plugins/transformer/basic/client/recordupdater/record_updater_test.go +++ b/plugins/transformer/basic/client/recordupdater/record_updater_test.go @@ 
-6,6 +6,7 @@ import ( "github.com/apache/arrow/go/v17/arrow" "github.com/apache/arrow/go/v17/arrow/array" "github.com/apache/arrow/go/v17/arrow/memory" + "github.com/cloudquery/plugin-sdk/v2/schema" "github.com/stretchr/testify/require" ) @@ -53,13 +54,30 @@ func TestObfuscateColumns(t *testing.T) { require.Equal(t, "528e5290f8ff0eb0325f0472b9c1a9ef4fac0b02ff6094b64d9382af4a10444b", updatedRecord.Column(0).(*array.String).Value(1)) } +func TestChangeTableName(t *testing.T) { + record := createTestRecord() + updater := New(record) + + updatedRecord, err := updater.ChangeTableName("cq_sync_{{.OldName}}") + require.NoError(t, err) + + require.Equal(t, int64(2), updatedRecord.NumCols()) + require.Equal(t, int64(2), updatedRecord.NumRows()) + requireAllColsLenMatchRecordsLen(t, updatedRecord) + + newTableName, ok := updatedRecord.Schema().Metadata().GetValue(schema.MetadataTableName) + require.True(t, ok, "Expected table name to be present in metadata") + require.Equal(t, "cq_sync_testTable", newTableName) +} + func createTestRecord() arrow.Record { + md := arrow.NewMetadata([]string{schema.MetadataTableName}, []string{"testTable"}) bld := array.NewRecordBuilder(memory.DefaultAllocator, arrow.NewSchema( []arrow.Field{ {Name: "col1", Type: arrow.BinaryTypes.String}, {Name: "col2", Type: arrow.BinaryTypes.String}, }, - nil, + &md, )) defer bld.Release() diff --git a/plugins/transformer/basic/client/schemaupdater/schema_updater.go b/plugins/transformer/basic/client/schemaupdater/schema_updater.go index 0b89093835a928..b6bcc01ff6692f 100644 --- a/plugins/transformer/basic/client/schemaupdater/schema_updater.go +++ b/plugins/transformer/basic/client/schemaupdater/schema_updater.go @@ -1,6 +1,13 @@ package schemaupdater -import "github.com/apache/arrow/go/v17/arrow" +import ( + "bytes" + "errors" + "text/template" + + "github.com/apache/arrow/go/v17/arrow" + "github.com/cloudquery/plugin-sdk/v2/schema" +) // SchemaUpdater takes an `arrow.Schema` and knows how to make simple 
subsequent changes to it. // It doesn't know which table it belongs to or if the changes make sense. @@ -8,8 +15,8 @@ type SchemaUpdater struct { schema *arrow.Schema } -func New(schema *arrow.Schema) *SchemaUpdater { - return &SchemaUpdater{schema: schema} +func New(sc *arrow.Schema) *SchemaUpdater { + return &SchemaUpdater{schema: sc} } func (s *SchemaUpdater) RemoveColumnIndices(colIndices map[int]struct{}) *arrow.Schema { @@ -35,3 +42,32 @@ func (s *SchemaUpdater) AddStringColumnAtPos(columnName string, zeroIndexedPosit arrow.Field{Name: columnName, Type: arrow.BinaryTypes.String, Nullable: isNullable}, ) } + +func (s *SchemaUpdater) ChangeTableName(newTableNamePattern string) (*arrow.Schema, error) { + existingMetadata := s.schema.Metadata() + tableName, ok := existingMetadata.GetValue(schema.MetadataTableName) + if !ok { + return nil, errors.New("table name not found in record's metadata") + } + + type tpl struct { + OldName string + } + + t, err := template.New("table_name").Parse(newTableNamePattern) + if err != nil { + return nil, err + } + + var tplBuf bytes.Buffer + if err := t.Execute(&tplBuf, tpl{OldName: tableName}); err != nil { + return nil, err + } + + newName := tplBuf.String() + + m := existingMetadata.ToMap() + m[schema.MetadataTableName] = newName + newMetadata := arrow.MetadataFrom(m) + return arrow.NewSchema(s.schema.Fields(), &newMetadata), nil +} diff --git a/plugins/transformer/basic/client/schemaupdater/schemaupdater_test.go b/plugins/transformer/basic/client/schemaupdater/schemaupdater_test.go index 107983606b200d..7dea8af4ae8f0f 100644 --- a/plugins/transformer/basic/client/schemaupdater/schemaupdater_test.go +++ b/plugins/transformer/basic/client/schemaupdater/schemaupdater_test.go @@ -4,12 +4,13 @@ import ( "testing" "github.com/apache/arrow/go/v17/arrow" + "github.com/cloudquery/plugin-sdk/v2/schema" "github.com/stretchr/testify/require" ) func TestRemoveColumnIndices(t *testing.T) { - schema := createTestSchema() - updater := 
New(schema) + sc := createTestSchema() + updater := New(sc) colIndices := map[int]struct{}{0: {}} updatedSchema := updater.RemoveColumnIndices(colIndices) @@ -19,8 +20,8 @@ func TestRemoveColumnIndices(t *testing.T) { } func TestAddStringColumnAtPos(t *testing.T) { - schema := createTestSchema() - updater := New(schema) + sc := createTestSchema() + updater := New(sc) updatedSchema, err := updater.AddStringColumnAtPos("col3", 1, false) require.NoError(t, err) @@ -31,8 +32,8 @@ func TestAddStringColumnAtPos(t *testing.T) { } func TestAddStringColumnAtEnd(t *testing.T) { - schema := createTestSchema() - updater := New(schema) + sc := createTestSchema() + updater := New(sc) updatedSchema, err := updater.AddStringColumnAtPos("col3", -1, true) require.NoError(t, err) @@ -46,28 +47,44 @@ func TestTransformMaintainsMetadata(t *testing.T) { md1 := arrow.NewMetadata([]string{"key1", "key2"}, []string{"value1", "value2"}) md2 := arrow.NewMetadata([]string{"key3", "key4"}, []string{"value3", "value4"}) md3 := arrow.NewMetadata([]string{"key5", "key6"}, []string{"value5", "value6"}) - schema := arrow.NewSchema( + sc := arrow.NewSchema( []arrow.Field{ {Name: "col1", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: md1}, {Name: "col2", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: md2}, }, &md3, ) - updater := New(schema) + updater := New(sc) updatedSchema, err := updater.AddStringColumnAtPos("col3", -1, true) require.NoError(t, err) - require.Equal(t, schema.Metadata(), updatedSchema.Metadata(), "Expected metadata to be retained") - require.Equal(t, schema.Field(0).Metadata, updatedSchema.Field(0).Metadata, "Expected metadata to be retained") - require.Equal(t, schema.Field(1).Metadata, updatedSchema.Field(1).Metadata, "Expected metadata to be retained") + require.Equal(t, sc.Metadata(), updatedSchema.Metadata(), "Expected metadata to be retained") + require.Equal(t, sc.Field(0).Metadata, updatedSchema.Field(0).Metadata, "Expected metadata to be retained") + 
require.Equal(t, sc.Field(1).Metadata, updatedSchema.Field(1).Metadata, "Expected metadata to be retained") +} + +func TestChangeTableName(t *testing.T) { + testSchema := createTestSchema() + updater := New(testSchema) + + updatedSchema, err := updater.ChangeTableName("cq_sync_{{.OldName}}") + require.NoError(t, err) + + newTableName, ok := updatedSchema.Metadata().GetValue(schema.MetadataTableName) + require.True(t, ok, "Expected table name to be present in metadata") + require.Equal(t, "cq_sync_testTable", newTableName, "Expected table name to be 'cq_sync_testTable'") + require.Equal(t, testSchema.NumFields(), updatedSchema.NumFields(), "Expected number of fields to remain the same") + require.Equal(t, testSchema.Field(0).Name, updatedSchema.Field(0).Name, "Expected field name to remain the same") + require.Equal(t, testSchema.Field(1).Name, updatedSchema.Field(1).Name, "Expected field name to remain the same") } func createTestSchema() *arrow.Schema { + md := arrow.NewMetadata([]string{schema.MetadataTableName}, []string{"testTable"}) return arrow.NewSchema( []arrow.Field{ {Name: "col1", Type: arrow.BinaryTypes.String, Nullable: true}, {Name: "col2", Type: arrow.BinaryTypes.String, Nullable: true}, }, - nil, + &md, ) } diff --git a/plugins/transformer/basic/client/spec/spec.go b/plugins/transformer/basic/client/spec/spec.go index 8a40934abb37f9..1d103048c041c8 100644 --- a/plugins/transformer/basic/client/spec/spec.go +++ b/plugins/transformer/basic/client/spec/spec.go @@ -3,12 +3,14 @@ package spec import ( "errors" "fmt" + "text/template" ) const ( KindRemoveColumns = "remove_columns" KindAddColumn = "add_column" KindObfuscateColumns = "obfuscate_columns" + KindChangeTableNames = "change_table_names" ) type TransformationSpec struct { @@ -17,6 +19,9 @@ type TransformationSpec struct { Columns []string `json:"columns"` Name string `json:"name"` Value string `json:"value"` + + // For change_table_names transformation + NewTableNameTemplate string 
`json:"new_table_name_template"` } type Spec struct { @@ -37,37 +42,43 @@ func (s *Spec) Validate() error { switch t.Kind { case KindRemoveColumns: if len(t.Columns) == 0 { - err = errors.Join(err, fmt.Errorf("'columns' field must be specified for remove_columns transformation")) - } - if t.Name != "" { - err = errors.Join(err, fmt.Errorf("'name' field must not be specified for remove_columns transformation")) + err = errors.Join(err, fmt.Errorf("'%s' field must be specified for %s transformation", "columns", t.Kind)) } - if t.Value != "" { - err = errors.Join(err, fmt.Errorf("'value' field must not be specified for remove_columns transformation")) + if t.Name != "" || t.Value != "" || t.NewTableNameTemplate != "" { + err = errors.Join(err, fmt.Errorf("name/value/new_table_name_template fields must not be specified for %s transformation", t.Kind)) } case KindAddColumn: - if t.Name == "" { - err = errors.Join(err, fmt.Errorf("'name' field must be specified for add_column transformation")) + if t.Name == "" || t.Value == "" { + err = errors.Join(err, fmt.Errorf("'%s' and '%s' fields must be specified for %s transformation", "name", "value", t.Kind)) } - if t.Value == "" { - err = errors.Join(err, fmt.Errorf("'value' field must be specified for add_column transformation")) - } - if len(t.Columns) > 0 { - err = errors.Join(err, fmt.Errorf("'columns' field must not be specified for add_column transformation")) + if len(t.Columns) > 0 || t.NewTableNameTemplate != "" { + err = errors.Join(err, fmt.Errorf("columns/new_table_name_template fields must not be specified for %s transformation", t.Kind)) } case KindObfuscateColumns: if len(t.Columns) == 0 { - err = errors.Join(err, fmt.Errorf("'columns' field must be specified for obfuscate_columns transformation")) + err = errors.Join(err, fmt.Errorf("'%s' field must be specified for %s transformation", "columns", t.Kind)) + } + if t.Name != "" || t.Value != "" || t.NewTableNameTemplate != "" { + err = errors.Join(err, 
fmt.Errorf("name/value/new_table_name_template fields must not be specified for %s transformation", t.Kind)) } - if t.Name != "" { - err = errors.Join(err, fmt.Errorf("'name' field must not be specified for obfuscate_columns transformation")) + case KindChangeTableNames: + if t.NewTableNameTemplate == "" { + err = errors.Join(err, fmt.Errorf("'%s' field must be specified for %s transformation", "new_table_name_template", t.Kind)) } - if t.Value != "" { - err = errors.Join(err, fmt.Errorf("'value' field must not be specified for obfuscate_columns transformation")) + if t.Name != "" || t.Value != "" || len(t.Columns) > 0 { + err = errors.Join(err, fmt.Errorf("name/value/columns fields must not be specified for %s transformation", t.Kind)) } default: - err = errors.Join(err, fmt.Errorf("invalid transformation kind: %s; must be one of: remove_columns, add_column, obfuscate_columns", t.Kind)) + err = errors.Join(err, fmt.Errorf("unknown transformation kind: %s", t.Kind)) + } + + // Non-trivial validations: the template must parse; check the parse error itself, not the accumulated err. + if t.Kind == KindChangeTableNames { + if _, tplErr := template.New("table_name").Parse(t.NewTableNameTemplate); tplErr != nil { + err = errors.Join(err, fmt.Errorf("error parsing new_table_name_template: %w", tplErr)) + } } } + return err } diff --git a/plugins/transformer/basic/client/transformers/transformers.go b/plugins/transformer/basic/client/transformers/transformers.go index a1f36b9f0fbf1b..226efae9297203 100644 --- a/plugins/transformer/basic/client/transformers/transformers.go +++ b/plugins/transformer/basic/client/transformers/transformers.go @@ -30,6 +30,8 @@ func NewFromSpec(sp spec.TransformationSpec) (*Transformer, error) { tr.fn = RemoveColumns(sp.Columns) case spec.KindObfuscateColumns: tr.fn = ObfuscateColumns(sp.Columns) + case spec.KindChangeTableNames: + tr.fn = ChangeTableName(sp.NewTableNameTemplate) default: return nil, fmt.Errorf("unknown transformation kind: %s", sp.Kind) } @@ -88,6 +90,12 @@ func ObfuscateColumns(columnNames []string) 
TransformationFn { } } +func ChangeTableName(newTableNamePattern string) TransformationFn { + return func(record arrow.Record) (arrow.Record, error) { + return recordupdater.New(record).ChangeTableName(newTableNamePattern) + } +} + func transformSchema(tf TransformationFn) SchemaTransformationFn { return func(schema *arrow.Schema) (*arrow.Schema, error) { newRecord, err := tf(makeEmptyRecord(schema)) diff --git a/plugins/transformer/basic/client/transformers/transformers_test.go b/plugins/transformer/basic/client/transformers/transformers_test.go index 8008a29d497e2c..5694fa02d2b033 100644 --- a/plugins/transformer/basic/client/transformers/transformers_test.go +++ b/plugins/transformer/basic/client/transformers/transformers_test.go @@ -42,6 +42,14 @@ func TestNewFromSpec(t *testing.T) { }, wantErr: false, }, + { + name: "ChangeTableNames", + spec: spec.TransformationSpec{ + Kind: spec.KindChangeTableNames, + NewTableNameTemplate: "cq_sync_{{.OldName}}", + }, + wantErr: false, + }, { name: "InvalidKind", spec: spec.TransformationSpec{ @@ -112,6 +120,20 @@ func TestTransform(t *testing.T) { require.Equal(t, int64(2), record.NumRows(), "Expected 2 rows") }, }, + { + name: "ChangeTableName", + spec: spec.TransformationSpec{ + Kind: spec.KindChangeTableNames, + NewTableNameTemplate: "cq_sync_{{.OldName}}", + Tables: []string{"*"}, + }, + record: createTestRecord(), + validate: func(t *testing.T, record arrow.Record) { + newTableName, ok := record.Schema().Metadata().GetValue(schema.MetadataTableName) + require.True(t, ok, "Expected table name to be present in metadata") + require.Equal(t, "cq_sync_table1", newTableName) + }, + }, } for _, tt := range tests { diff --git a/plugins/transformer/basic/docs/_configuration.md b/plugins/transformer/basic/docs/_configuration.md index d210e04429ac8d..cfcedb4998755f 100644 --- a/plugins/transformer/basic/docs/_configuration.md +++ b/plugins/transformer/basic/docs/_configuration.md @@ -16,4 +16,7 @@ spec: tables: ["xkcd_comics"] 
name: "source" value: "xkcd" + - kind: change_table_names + tables: ["*"] + new_table_name_template: "cq_sync_{{.OldName}}" ``` \ No newline at end of file diff --git a/plugins/transformer/basic/docs/overview.md b/plugins/transformer/basic/docs/overview.md index 26da219d52a7b6..f85a38d2b1a4a0 100644 --- a/plugins/transformer/basic/docs/overview.md +++ b/plugins/transformer/basic/docs/overview.md @@ -3,6 +3,7 @@ This CloudQuery transformer plugin provides basic transformation capabilities: - Removing columns - Adding literal string columns - Obfuscating string columns +- Renaming tables using a name template (use `{{.OldName}}` to refer to the original name, see example below) ## Configuration @@ -28,4 +29,6 @@ The `migrate_mode: forced` setting might make sense if you plan on modifying the Then, add your transformer spec. Here's an example that transforms the XKCD source table: -:configuration \ No newline at end of file +:configuration + +Note: transformations are applied sequentially. If you rename tables, the table matcher configuration of subsequent transformations will need to be updated to the new names. \ No newline at end of file