From 6ea7afc0f6ef5e93f95659205c17ef39699afb94 Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Fri, 2 Aug 2024 14:48:42 +0100 Subject: [PATCH 1/6] feat: Support table name changes on basic transformer. --- cli/cmd/sync_v3.go | 21 ++- .../tablenamechanger/table_name_changer.go | 62 ++++++++ .../table_name_changer_test.go | 149 ++++++++++++++++++ .../client/recordupdater/record_updater.go | 9 ++ .../recordupdater/record_updater_test.go | 20 ++- .../client/schemaupdater/schema_updater.go | 38 ++++- .../schemaupdater/schemaupdater_test.go | 19 ++- plugins/transformer/basic/client/spec/spec.go | 73 ++++++--- .../basic/client/transformers/transformers.go | 8 + .../client/transformers/transformers_test.go | 22 +++ .../transformer/basic/docs/_configuration.md | 3 + plugins/transformer/basic/docs/overview.md | 5 +- 12 files changed, 399 insertions(+), 30 deletions(-) create mode 100644 cli/internal/tablenamechanger/table_name_changer.go create mode 100644 cli/internal/tablenamechanger/table_name_changer_test.go diff --git a/cli/cmd/sync_v3.go b/cli/cmd/sync_v3.go index 0af5da45efcc1b..a9a6d0cab3f8de 100644 --- a/cli/cmd/sync_v3.go +++ b/cli/cmd/sync_v3.go @@ -15,6 +15,7 @@ import ( "github.com/cloudquery/cloudquery/cli/internal/analytics" "github.com/cloudquery/cloudquery/cli/internal/api" "github.com/cloudquery/cloudquery/cli/internal/specs/v0" + "github.com/cloudquery/cloudquery/cli/internal/tablenamechanger" "github.com/cloudquery/cloudquery/cli/internal/transformer" "github.com/cloudquery/cloudquery/cli/internal/transformerpipeline" "github.com/cloudquery/plugin-pb-go/managedplugin" @@ -330,6 +331,12 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des defer remoteProgressReporter.Cancel() } + // Transformers can change table names. We need to keep track of table name changes + // in case we do things that depend on table names. + // + // Note that transformers run per-destination, so we need to keep track of table name changes per-destination. + tableNameChanger := tablenamechanger.New(destinationSpecs) + // Note: we want to stop this errorgroup if ctx is cancelled, but we don't want to cancel ctx if gctx is cancelled. // gctx is always cancelled when the errorgroup returns, and this isn't necessarily an error. eg, gctx := errgroup.WithContext(ctx) @@ -412,11 +419,14 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des } case *plugin.Sync_Response_DeleteRecord: for i := range destinationsPbClients { + // Table name might have changed due to a transformation. + tableName := tableNameChanger.UpdateTableName(destinationSpecs[i].Name, m.DeleteRecord.TableName) + wr := &plugin.Write_Request{} // Transformations aren't required here because DeleteRecord is only in V3 wr.Message = &plugin.Write_Request_DeleteRecord{ DeleteRecord: &plugin.Write_MessageDeleteRecord{ - TableName: m.DeleteRecord.TableName, + TableName: tableName, TableRelations: m.DeleteRecord.TableRelations, WhereClause: m.DeleteRecord.WhereClause, }, @@ -460,6 +470,11 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des transformedSchemaBytes = resp.Schema } + // Table name might have changed due to a transformation. + if err := tableNameChanger.LearnTableNameChange(destinationName, table.Name, transformedSchemaBytes); err != nil { + return err + } + wr := &plugin.Write_Request{} wr.Message = &plugin.Write_Request_MigrateTable{ MigrateTable: &plugin.Write_MessageMigrateTable{ @@ -526,7 +541,9 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des for i := range destinationsClients { if destinationSpecs[i].WriteMode == specs.WriteModeOverwriteDeleteStale { - if err := deleteStale(writeClients[i], tablesForDeleteStale, sourceName, syncTime); err != nil { + // Table names might have changed due to transformers + updatedTablesForDeleteStale := tableNameChanger.UpdateTableNames(destinationSpecs[i].Name, tablesForDeleteStale) + if err := deleteStale(writeClients[i], updatedTablesForDeleteStale, sourceName, syncTime); err != nil { return err } } diff --git a/cli/internal/tablenamechanger/table_name_changer.go b/cli/internal/tablenamechanger/table_name_changer.go new file mode 100644 index 00000000000000..f8f27d1022ae04 --- /dev/null +++ b/cli/internal/tablenamechanger/table_name_changer.go @@ -0,0 +1,62 @@ +package tablenamechanger + +import ( + "github.com/cloudquery/cloudquery/cli/internal/specs/v0" + "github.com/cloudquery/plugin-pb-go/pb/plugin/v3" + "github.com/cloudquery/plugin-sdk/v4/schema" +) + +// TableNameChanger manages table name changes due to transformers, on a per-destination basis. +// It can learn about table name changes and apply those changes on further steps. +type TableNameChanger struct { + tableNameChanges map[string]map[string]string +} + +func New(destinationSpecs []specs.Destination) *TableNameChanger { + tnc := &TableNameChanger{ + tableNameChanges: make(map[string]map[string]string), + } + for _, destinationSpec := range destinationSpecs { + tnc.tableNameChanges[destinationSpec.Name] = make(map[string]string) + } + return tnc +} + +func (c TableNameChanger) UpdateTableNames(destinationName string, tables map[string]bool) map[string]bool { + newTables := make(map[string]bool, len(tables)) + for oldTableName := range tables { + if newTableName, ok := c.tableNameChanges[destinationName][oldTableName]; ok { + delete(tables, oldTableName) + newTables[newTableName] = true + } else { + newTables[oldTableName] = true + } + } + return newTables +} + +func (c TableNameChanger) UpdateTableName(destinationName string, oldTableName string) string { + if newTableName, ok := c.tableNameChanges[destinationName][oldTableName]; ok { + return newTableName + } + return oldTableName +} + +func (c *TableNameChanger) LearnTableNameChange(destinationName, oldTableName string, schemaBytes []byte) error { + // Implicit assumption that table name changes are deterministic. + // Therefore, once we learned about a table name change, we don't need to learn it again. + if c.tableNameChanges[destinationName][oldTableName] != "" { + return nil + } + + sc, err := plugin.NewSchemaFromBytes(schemaBytes) + if err != nil { + return err + } + newTableName, ok := sc.Metadata().GetValue(schema.MetadataTableName) + if !ok { + return nil // this would be an error, but let it fail at a more relevant step + } + c.tableNameChanges[destinationName][oldTableName] = newTableName + return nil +} diff --git a/cli/internal/tablenamechanger/table_name_changer_test.go b/cli/internal/tablenamechanger/table_name_changer_test.go new file mode 100644 index 00000000000000..e6acc32ac08ed4 --- /dev/null +++ b/cli/internal/tablenamechanger/table_name_changer_test.go @@ -0,0 +1,149 @@ +package tablenamechanger + +import ( + "testing" + + "github.com/apache/arrow/go/v17/arrow" + "github.com/cloudquery/cloudquery/cli/internal/specs/v0" + "github.com/cloudquery/plugin-pb-go/pb/plugin/v3" + "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/stretchr/testify/require" +) + +func TestTableNameChanger(t *testing.T) { + destinationSpecs := []specs.Destination{ + {Metadata: specs.Metadata{Name: "dest1"}}, + {Metadata: specs.Metadata{Name: "dest2"}}, + } + + t.Run("UpdateTableNames", func(t *testing.T) { + tests := []struct { + name string + destinationName string + tables map[string]bool + expectedNewTables map[string]bool + }{ + { + name: "NoChanges", + destinationName: "dest1", + tables: map[string]bool{"table1": true, "table2": true}, + expectedNewTables: map[string]bool{"table1": true, "table2": true}, + }, + { + name: "WithChanges", + destinationName: "dest1", + tables: map[string]bool{"old_table1": true, "table2": true}, + expectedNewTables: map[string]bool{"new_table1": true, "table2": true}, + }, + } + + tnc := New(destinationSpecs) + err := tnc.LearnTableNameChange("dest1", "old_table1", createSchemaBytes(t, "new_table1")) + require.NoError(t, err) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + newTables := tnc.UpdateTableNames(tt.destinationName, tt.tables) + require.Equal(t, tt.expectedNewTables, newTables) + }) + } + }) + + t.Run("UpdateTableName", func(t *testing.T) { + tests := []struct { + name string + destinationName string + oldTableName string + expectedNewName string + }{ + { + name: "NoChange", + destinationName: "dest1", + oldTableName: "table1", + expectedNewName: "table1", + }, + { + name: "WithChange", + destinationName: "dest1", + oldTableName: "old_table1", + expectedNewName: "new_table1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tnc := New(destinationSpecs) + err := tnc.LearnTableNameChange("dest1", "old_table1", createSchemaBytes(t, "new_table1")) + require.NoError(t, err) + + newName := tnc.UpdateTableName(tt.destinationName, tt.oldTableName) + require.Equal(t, tt.expectedNewName, newName) + }) + } + }) + + t.Run("LearnTableNameChange", func(t *testing.T) { + tests := []struct { + name string + learnDestName string + learnOldTableName string + learnNewTableName string + updateDestName string + updateOldTableName string + updateNewTableName string + }{ + { + name: "ValidChange", + learnDestName: "dest1", + learnOldTableName: "old_table", + learnNewTableName: "new_table", + updateDestName: "dest1", + updateOldTableName: "old_table", + updateNewTableName: "new_table", + }, + { + name: "NoChangeOnSameDestination", + learnDestName: "dest1", + learnOldTableName: "old_table", + learnNewTableName: "old_table", + updateDestName: "dest1", + updateOldTableName: "old_table", + updateNewTableName: "old_table", + }, + { + name: "ValidChangeButDoesntUpdateOnDifferentDestination", + learnDestName: "dest1", + learnOldTableName: "old_table", + learnNewTableName: "new_table", + updateDestName: "dest2", + updateOldTableName: "old_table", + updateNewTableName: "old_table", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tnc := New(destinationSpecs) + + err := tnc.LearnTableNameChange(tt.learnDestName, tt.learnOldTableName, createSchemaBytes(t, tt.learnNewTableName)) + require.NoError(t, err) + newName := tnc.UpdateTableName(tt.updateDestName, tt.updateOldTableName) + require.Equal(t, tt.updateNewTableName, newName) + }) + } + }) +} + +func createSchemaBytes(t *testing.T, tableName string) []byte { + md := arrow.NewMetadata([]string{schema.MetadataTableName}, []string{tableName}) + sc := arrow.NewSchema( + []arrow.Field{ + {Name: "col1", Type: arrow.BinaryTypes.String, Nullable: true}, + {Name: "col2", Type: arrow.BinaryTypes.String, Nullable: true}, + }, + &md, + ) + bytes, err := plugin.SchemaToBytes(sc) + require.NoError(t, err) + return bytes +} diff --git a/plugins/transformer/basic/client/recordupdater/record_updater.go b/plugins/transformer/basic/client/recordupdater/record_updater.go index e02e3af75a04e1..9627dd2c8ec33c 100644 --- a/plugins/transformer/basic/client/recordupdater/record_updater.go +++ b/plugins/transformer/basic/client/recordupdater/record_updater.go @@ -96,6 +96,15 @@ func (r *RecordUpdater) ObfuscateColumns(columnNames []string) (arrow.Record, er return r.record, nil } +func (r *RecordUpdater) ChangeTableName(newTableNamePattern string) (arrow.Record, error) { + newSchema, err := r.schemaUpdater.ChangeTableName(newTableNamePattern) + if err != nil { + return nil, err + } + r.record = array.NewRecord(newSchema, r.record.Columns(), r.record.NumRows()) + return r.record, nil +} + func (r *RecordUpdater) colIndicesByNames(columnNames []string) (map[int]struct{}, error) { colNameMap := make(map[string]struct{}) for _, columnName := range columnNames { diff --git a/plugins/transformer/basic/client/recordupdater/record_updater_test.go b/plugins/transformer/basic/client/recordupdater/record_updater_test.go index e907a9399b69b1..300d02221cecc8 100644 --- a/plugins/transformer/basic/client/recordupdater/record_updater_test.go +++ b/plugins/transformer/basic/client/recordupdater/record_updater_test.go @@ -6,6 +6,7 @@ import ( "github.com/apache/arrow/go/v17/arrow" "github.com/apache/arrow/go/v17/arrow/array" "github.com/apache/arrow/go/v17/arrow/memory" + "github.com/cloudquery/plugin-sdk/v2/schema" "github.com/stretchr/testify/require" ) @@ -53,13 +54,30 @@ func TestObfuscateColumns(t *testing.T) { require.Equal(t, "528e5290f8ff0eb0325f0472b9c1a9ef4fac0b02ff6094b64d9382af4a10444b", updatedRecord.Column(0).(*array.String).Value(1)) } +func TestChangeTableName(t *testing.T) { + record := createTestRecord() + updater := New(record) + + updatedRecord, err := updater.ChangeTableName("cq_sync_{{.OldName}}") + require.NoError(t, err) + + require.Equal(t, int64(2), updatedRecord.NumCols()) + require.Equal(t, int64(2), updatedRecord.NumRows()) + requireAllColsLenMatchRecordsLen(t, updatedRecord) + + newTableName, ok := updatedRecord.Schema().Metadata().GetValue(schema.MetadataTableName) + require.True(t, ok, "Expected table name to be present in metadata") + require.Equal(t, "cq_sync_testTable", newTableName) +} + func createTestRecord() arrow.Record { + md := arrow.NewMetadata([]string{schema.MetadataTableName}, []string{"testTable"}) bld := array.NewRecordBuilder(memory.DefaultAllocator, arrow.NewSchema( []arrow.Field{ {Name: "col1", Type: arrow.BinaryTypes.String}, {Name: "col2", Type: arrow.BinaryTypes.String}, }, - nil, + &md, )) defer bld.Release() diff --git a/plugins/transformer/basic/client/schemaupdater/schema_updater.go b/plugins/transformer/basic/client/schemaupdater/schema_updater.go index 0b89093835a928..0db34867f3fd8e 100644 --- a/plugins/transformer/basic/client/schemaupdater/schema_updater.go +++ b/plugins/transformer/basic/client/schemaupdater/schema_updater.go @@ -1,6 +1,13 @@ package schemaupdater -import "github.com/apache/arrow/go/v17/arrow" +import ( + "bytes" + "errors" + "text/template" + + "github.com/apache/arrow/go/v17/arrow" + "github.com/cloudquery/plugin-sdk/v2/schema" +) // SchemaUpdater takes an `arrow.Schema` and knows how to make simple subsequent changes to it. // It doesn't know which table it belongs to or if the changes make sense. @@ -35,3 +42,32 @@ func (s *SchemaUpdater) AddStringColumnAtPos(columnName string, zeroIndexedPosit arrow.Field{Name: columnName, Type: arrow.BinaryTypes.String, Nullable: isNullable}, ) } + +func (s *SchemaUpdater) ChangeTableName(newTableNamePattern string) (*arrow.Schema, error) { + existingMetadata := s.schema.Metadata() + tableName, ok := existingMetadata.GetValue(schema.MetadataTableName) + if !ok { + return nil, errors.New("table name not found in record's metadata") + } + + type tpl struct { + OldName string + } + + t, err := template.New("table_name").Parse(newTableNamePattern) + if err != nil { + return nil, err + } + + var tplBuf bytes.Buffer + if err := t.Execute(&tplBuf, tpl{OldName: tableName}); err != nil { + return nil, err + } + + newName := tplBuf.String() + + m := existingMetadata.ToMap() + m[schema.MetadataTableName] = newName + newMetadata := arrow.MetadataFrom(m) + return arrow.NewSchema(s.schema.Fields(), &newMetadata), nil +} diff --git a/plugins/transformer/basic/client/schemaupdater/schemaupdater_test.go b/plugins/transformer/basic/client/schemaupdater/schemaupdater_test.go index 107983606b200d..9b4d3fe1fa9c6d 100644 --- a/plugins/transformer/basic/client/schemaupdater/schemaupdater_test.go +++ b/plugins/transformer/basic/client/schemaupdater/schemaupdater_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/apache/arrow/go/v17/arrow" + "github.com/cloudquery/plugin-sdk/v2/schema" "github.com/stretchr/testify/require" ) @@ -62,12 +63,28 @@ func TestTransformMaintainsMetadata(t *testing.T) { require.Equal(t, schema.Field(1).Metadata, updatedSchema.Field(1).Metadata, "Expected metadata to be retained") } +func TestChangeTableName(t *testing.T) { + testSchema := createTestSchema() + updater := New(testSchema) + + updatedSchema, err := updater.ChangeTableName("cq_sync_{{.OldName}}") + require.NoError(t, err) + + newTableName, ok := updatedSchema.Metadata().GetValue(schema.MetadataTableName) + require.True(t, ok, "Expected table name to be present in metadata") + require.Equal(t, "cq_sync_testTable", newTableName, "Expected table name to be 'cq_sync_testTable'") + require.Equal(t, testSchema.NumFields(), updatedSchema.NumFields(), "Expected number of fields to remain the same") + require.Equal(t, testSchema.Field(0).Name, updatedSchema.Field(0).Name, "Expected field name to remain the same") + require.Equal(t, testSchema.Field(1).Name, updatedSchema.Field(1).Name, "Expected field name to remain the same") +} + func createTestSchema() *arrow.Schema { + md := arrow.NewMetadata([]string{schema.MetadataTableName}, []string{"testTable"}) return arrow.NewSchema( []arrow.Field{ {Name: "col1", Type: arrow.BinaryTypes.String, Nullable: true}, {Name: "col2", Type: arrow.BinaryTypes.String, Nullable: true}, }, - nil, + &md, ) } diff --git a/plugins/transformer/basic/client/spec/spec.go b/plugins/transformer/basic/client/spec/spec.go index 8a40934abb37f9..69e899c2e13853 100644 --- a/plugins/transformer/basic/client/spec/spec.go +++ b/plugins/transformer/basic/client/spec/spec.go @@ -3,12 +3,16 @@ package spec import ( "errors" "fmt" + "reflect" + "strings" + "text/template" ) const ( KindRemoveColumns = "remove_columns" KindAddColumn = "add_column" KindObfuscateColumns = "obfuscate_columns" + KindChangeTableNames = "change_table_names" ) type TransformationSpec struct { @@ -17,6 +21,9 @@ type TransformationSpec struct { Columns []string `json:"columns"` Name string `json:"name"` Value string `json:"value"` + + // For change_table_names transformation + NewTableNameTemplate string `json:"new_table_name_template"` } type Spec struct { @@ -31,42 +38,60 @@ func (s *Spec) SetDefaults() { } } +var kindToRequiredFields = map[string]map[string]struct{}{ + KindRemoveColumns: {"Columns": {}}, + KindAddColumn: {"Name": {}, "Value": {}}, + KindObfuscateColumns: {"Columns": {}}, + KindChangeTableNames: {"NewTableNameTemplate": {}}, +} + +var fieldsToCheck = []string{"Columns", "Name", "Value", "FromTableName", "NewTableNameTemplate"} + func (s *Spec) Validate() error { var err error for _, t := range s.TransformationSpecs { - switch t.Kind { - case KindRemoveColumns: - if len(t.Columns) == 0 { - err = errors.Join(err, fmt.Errorf("'columns' field must be specified for remove_columns transformation")) + requiredFields, ok := kindToRequiredFields[t.Kind] + if !ok { + kinds := make([]string, 0, len(kindToRequiredFields)) + for k := range kindToRequiredFields { + kinds = append(kinds, k) } - if t.Name != "" { - err = errors.Join(err, fmt.Errorf("'name' field must not be specified for remove_columns transformation")) + return fmt.Errorf("unknown transformation kind: %s, supported kinds are: %s", t.Kind, strings.Join(kinds, ", ")) + } + for fieldName := range requiredFields { + value := reflect.ValueOf(t) + if value == reflect.Zero(value.Type()) { + panic(fmt.Sprintf("reflect.ValueOf(%v) is zero", t)) // this would be a nil on s.TransformationSpecs } - if t.Value != "" { - err = errors.Join(err, fmt.Errorf("'value' field must not be specified for remove_columns transformation")) + fieldValue := value.FieldByName(fieldName) + if !fieldValue.IsValid() { + panic(fmt.Sprintf("field %s is not valid", fieldName)) // this would be a bug on kindToRequiredFields/fieldsToCheck } - case KindAddColumn: - if t.Name == "" { - err = errors.Join(err, fmt.Errorf("'name' field must be specified for add_column transformation")) + if fieldValue.Kind() == reflect.String && fieldValue.String() == "" { + err = errors.Join(err, fmt.Errorf("'%s' field must be specified for %s transformation", fieldName, t.Kind)) } - if t.Value == "" { - err = errors.Join(err, fmt.Errorf("'value' field must be specified for add_column transformation")) + if fieldValue.Kind() == reflect.Slice && fieldValue.Len() == 0 { + err = errors.Join(err, fmt.Errorf("'%s' field must be specified for %s transformation", fieldName, t.Kind)) } - if len(t.Columns) > 0 { - err = errors.Join(err, fmt.Errorf("'columns' field must not be specified for add_column transformation")) + } + for _, fieldName := range fieldsToCheck { + if _, ok := requiredFields[fieldName]; ok { + continue } - case KindObfuscateColumns: - if len(t.Columns) == 0 { - err = errors.Join(err, fmt.Errorf("'columns' field must be specified for obfuscate_columns transformation")) + fieldValue := reflect.ValueOf(t).FieldByName(fieldName) + if fieldValue.Kind() == reflect.String && fieldValue.String() != "" { + err = errors.Join(err, fmt.Errorf("'%s' field must not be specified for %s transformation", fieldName, t.Kind)) } - if t.Name != "" { - err = errors.Join(err, fmt.Errorf("'name' field must not be specified for obfuscate_columns transformation")) + if fieldValue.Kind() == reflect.Slice && fieldValue.Len() > 0 { + err = errors.Join(err, fmt.Errorf("'%s' field must not be specified for %s transformation", fieldName, t.Kind)) } - if t.Value != "" { - err = errors.Join(err, fmt.Errorf("'value' field must not be specified for obfuscate_columns transformation")) + } + + // Non-trivial validations + if t.Kind == KindChangeTableNames { + if _, tplErr := template.New("table_name").Parse(t.NewTableNameTemplate); err != nil { + err = errors.Join(err, fmt.Errorf("error parsing new_table_name_template: %v", tplErr)) } - default: - err = errors.Join(err, fmt.Errorf("invalid transformation kind: %s; must be one of: remove_columns, add_column, obfuscate_columns", t.Kind)) } } return err diff --git a/plugins/transformer/basic/client/transformers/transformers.go b/plugins/transformer/basic/client/transformers/transformers.go index a1f36b9f0fbf1b..226efae9297203 100644 --- a/plugins/transformer/basic/client/transformers/transformers.go +++ b/plugins/transformer/basic/client/transformers/transformers.go @@ -30,6 +30,8 @@ func NewFromSpec(sp spec.TransformationSpec) (*Transformer, error) { tr.fn = RemoveColumns(sp.Columns) case spec.KindObfuscateColumns: tr.fn = ObfuscateColumns(sp.Columns) + case spec.KindChangeTableNames: + tr.fn = ChangeTableName(sp.NewTableNameTemplate) default: return nil, fmt.Errorf("unknown transformation kind: %s", sp.Kind) } @@ -88,6 +90,12 @@ func ObfuscateColumns(columnNames []string) TransformationFn { } } +func ChangeTableName(newTableNamePattern string) TransformationFn { + return func(record arrow.Record) (arrow.Record, error) { + return recordupdater.New(record).ChangeTableName(newTableNamePattern) + } +} + func transformSchema(tf TransformationFn) SchemaTransformationFn { return func(schema *arrow.Schema) (*arrow.Schema, error) { newRecord, err := tf(makeEmptyRecord(schema)) diff --git a/plugins/transformer/basic/client/transformers/transformers_test.go b/plugins/transformer/basic/client/transformers/transformers_test.go index 8008a29d497e2c..5694fa02d2b033 100644 --- a/plugins/transformer/basic/client/transformers/transformers_test.go +++ b/plugins/transformer/basic/client/transformers/transformers_test.go @@ -42,6 +42,14 @@ func TestNewFromSpec(t *testing.T) { }, wantErr: false, }, + { + name: "ChangeTableNames", + spec: spec.TransformationSpec{ + Kind: spec.KindChangeTableNames, + NewTableNameTemplate: "cq_sync_{{.OldName}}", + }, + wantErr: false, + }, { name: "InvalidKind", spec: spec.TransformationSpec{ @@ -112,6 +120,20 @@ func TestTransform(t *testing.T) { require.Equal(t, int64(2), record.NumRows(), "Expected 2 rows") }, }, + { + name: "ChangeTableName", + spec: spec.TransformationSpec{ + Kind: spec.KindChangeTableNames, + NewTableNameTemplate: "cq_sync_{{.OldName}}", + Tables: []string{"*"}, + }, + record: createTestRecord(), + validate: func(t *testing.T, record arrow.Record) { + newTableName, ok := record.Schema().Metadata().GetValue(schema.MetadataTableName) + require.True(t, ok, "Expected table name to be present in metadata") + require.Equal(t, "cq_sync_table1", newTableName) + }, + }, } for _, tt := range tests { diff --git a/plugins/transformer/basic/docs/_configuration.md b/plugins/transformer/basic/docs/_configuration.md index d210e04429ac8d..cfcedb4998755f 100644 --- a/plugins/transformer/basic/docs/_configuration.md +++ b/plugins/transformer/basic/docs/_configuration.md @@ -16,4 +16,7 @@ spec: tables: ["xkcd_comics"] name: "source" value: "xkcd" + - kind: change_table_names + tables: ["*"] + new_table_name_template: "cq_sync_{{.OldName}}" ``` \ No newline at end of file diff --git a/plugins/transformer/basic/docs/overview.md b/plugins/transformer/basic/docs/overview.md index 26da219d52a7b6..9255af9d2c3336 100644 --- a/plugins/transformer/basic/docs/overview.md +++ b/plugins/transformer/basic/docs/overview.md @@ -3,6 +3,7 @@ This CloudQuery transformer plugin provides basic transformation capabilities: - Removing columns - Adding literal string columns - Obfuscating string columns +- Renaming tables using a templated name ## Configuration @@ -28,4 +29,6 @@ The `migrate_mode: forced` setting might make sense if you plan on modifying the Then, add your transformer spec. Here's an example that transforms the XKCD source table: -:configuration \ No newline at end of file +:configuration + +Note: transformations are applied sequentially. If you rename tables, the table matcher configuration of subsequent transformations will need to be updated to the new names. \ No newline at end of file From 08239ad75ce1980170117d346944f5f7ee00fac8 Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Fri, 2 Aug 2024 15:14:14 +0100 Subject: [PATCH 2/6] Fix lint. --- .../client/schemaupdater/schema_updater.go | 4 ++-- .../schemaupdater/schemaupdater_test.go | 22 +++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/plugins/transformer/basic/client/schemaupdater/schema_updater.go b/plugins/transformer/basic/client/schemaupdater/schema_updater.go index 0db34867f3fd8e..b6bcc01ff6692f 100644 --- a/plugins/transformer/basic/client/schemaupdater/schema_updater.go +++ b/plugins/transformer/basic/client/schemaupdater/schema_updater.go @@ -15,8 +15,8 @@ type SchemaUpdater struct { schema *arrow.Schema } -func New(schema *arrow.Schema) *SchemaUpdater { - return &SchemaUpdater{schema: schema} +func New(sc *arrow.Schema) *SchemaUpdater { + return &SchemaUpdater{schema: sc} } func (s *SchemaUpdater) RemoveColumnIndices(colIndices map[int]struct{}) *arrow.Schema { diff --git a/plugins/transformer/basic/client/schemaupdater/schemaupdater_test.go b/plugins/transformer/basic/client/schemaupdater/schemaupdater_test.go index 9b4d3fe1fa9c6d..7dea8af4ae8f0f 100644 --- a/plugins/transformer/basic/client/schemaupdater/schemaupdater_test.go +++ b/plugins/transformer/basic/client/schemaupdater/schemaupdater_test.go @@ -9,8 +9,8 @@ import ( ) func TestRemoveColumnIndices(t *testing.T) { - schema := createTestSchema() - updater := New(schema) + sc := createTestSchema() + updater := New(sc) colIndices := map[int]struct{}{0: {}} updatedSchema := updater.RemoveColumnIndices(colIndices) @@ -20,8 +20,8 @@ func TestRemoveColumnIndices(t *testing.T) { } func TestAddStringColumnAtPos(t *testing.T) { - schema := createTestSchema() - updater := New(schema) + sc := createTestSchema() + updater := New(sc) updatedSchema, err := updater.AddStringColumnAtPos("col3", 1, false) require.NoError(t, err) @@ -32,8 +32,8 @@ func TestAddStringColumnAtPos(t *testing.T) { } func TestAddStringColumnAtEnd(t *testing.T) { - schema := createTestSchema() - updater := New(schema) + sc := createTestSchema() + updater := New(sc) updatedSchema, err := updater.AddStringColumnAtPos("col3", -1, true) require.NoError(t, err) @@ -47,20 +47,20 @@ func TestTransformMaintainsMetadata(t *testing.T) { md1 := arrow.NewMetadata([]string{"key1", "key2"}, []string{"value1", "value2"}) md2 := arrow.NewMetadata([]string{"key3", "key4"}, []string{"value3", "value4"}) md3 := arrow.NewMetadata([]string{"key5", "key6"}, []string{"value5", "value6"}) - schema := arrow.NewSchema( + sc := arrow.NewSchema( []arrow.Field{ {Name: "col1", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: md1}, {Name: "col2", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: md2}, }, &md3, ) - updater := New(schema) + updater := New(sc) updatedSchema, err := updater.AddStringColumnAtPos("col3", -1, true) require.NoError(t, err) - require.Equal(t, schema.Metadata(), updatedSchema.Metadata(), "Expected metadata to be retained") - require.Equal(t, schema.Field(0).Metadata, updatedSchema.Field(0).Metadata, "Expected metadata to be retained") - require.Equal(t, schema.Field(1).Metadata, updatedSchema.Field(1).Metadata, "Expected metadata to be retained") + require.Equal(t, sc.Metadata(), updatedSchema.Metadata(), "Expected metadata to be retained") + require.Equal(t, sc.Field(0).Metadata, updatedSchema.Field(0).Metadata, "Expected metadata to be retained") + require.Equal(t, sc.Field(1).Metadata, updatedSchema.Field(1).Metadata, "Expected metadata to be retained") } func TestChangeTableName(t *testing.T) { From d49613f1414d9dfb4a29973e78776ab8aea41030 Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Fri, 2 Aug 2024 15:17:23 +0100 Subject: [PATCH 3/6] Make vale happy. --- plugins/transformer/basic/docs/overview.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/transformer/basic/docs/overview.md b/plugins/transformer/basic/docs/overview.md index 9255af9d2c3336..9b40725e038044 100644 --- a/plugins/transformer/basic/docs/overview.md +++ b/plugins/transformer/basic/docs/overview.md @@ -3,7 +3,7 @@ This CloudQuery transformer plugin provides basic transformation capabilities: - Removing columns - Adding literal string columns - Obfuscating string columns -- Renaming tables using a templated name +- Renaming tables using a name template ## Configuration From 8ce54d357cc3b9de47aa5519c4b6645d967d415f Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Fri, 2 Aug 2024 15:25:38 +0100 Subject: [PATCH 4/6] Fix lint. --- plugins/transformer/basic/client/spec/spec.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/plugins/transformer/basic/client/spec/spec.go b/plugins/transformer/basic/client/spec/spec.go index 69e899c2e13853..4429e7ad57b851 100644 --- a/plugins/transformer/basic/client/spec/spec.go +++ b/plugins/transformer/basic/client/spec/spec.go @@ -59,11 +59,7 @@ func (s *Spec) Validate() error { return fmt.Errorf("unknown transformation kind: %s, supported kinds are: %s", t.Kind, strings.Join(kinds, ", ")) } for fieldName := range requiredFields { - value := reflect.ValueOf(t) - if value == reflect.Zero(value.Type()) { - panic(fmt.Sprintf("reflect.ValueOf(%v) is zero", t)) // this would be a nil on s.TransformationSpecs - } - fieldValue := value.FieldByName(fieldName) + fieldValue := reflect.ValueOf(t).FieldByName(fieldName) if !fieldValue.IsValid() { panic(fmt.Sprintf("field %s is not valid", fieldName)) // this would be a bug on kindToRequiredFields/fieldsToCheck } From 32789b5263e15510d0e116483c86552f82c3fa49 Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Fri, 2 Aug 2024 15:45:12 +0100 Subject: [PATCH 5/6] Clarify templating capabilities in the docs. --- plugins/transformer/basic/docs/overview.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/transformer/basic/docs/overview.md b/plugins/transformer/basic/docs/overview.md index 9b40725e038044..f85a38d2b1a4a0 100644 --- a/plugins/transformer/basic/docs/overview.md +++ b/plugins/transformer/basic/docs/overview.md @@ -3,7 +3,7 @@ This CloudQuery transformer plugin provides basic transformation capabilities: - Removing columns - Adding literal string columns - Obfuscating string columns -- Renaming tables using a name template +- Renaming tables using a name template (use `{{.OldName}}` to refer to the original name, see example below) ## Configuration From be5046378cd2971bf2ad3d6293fe29606eb9a5b2 Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Fri, 2 Aug 2024 16:16:10 +0100 Subject: [PATCH 6/6] Revert to switch/case syntax for spec validation. --- plugins/transformer/basic/client/spec/spec.go | 60 ++++++++----------- 1 file changed, 25 insertions(+), 35 deletions(-) diff --git a/plugins/transformer/basic/client/spec/spec.go b/plugins/transformer/basic/client/spec/spec.go index 4429e7ad57b851..1d103048c041c8 100644 --- a/plugins/transformer/basic/client/spec/spec.go +++ b/plugins/transformer/basic/client/spec/spec.go @@ -3,8 +3,6 @@ package spec import ( "errors" "fmt" - "reflect" - "strings" "text/template" ) @@ -38,49 +36,40 @@ func (s *Spec) SetDefaults() { } } -var kindToRequiredFields = map[string]map[string]struct{}{ - KindRemoveColumns: {"Columns": {}}, - KindAddColumn: {"Name": {}, "Value": {}}, - KindObfuscateColumns: {"Columns": {}}, - KindChangeTableNames: {"NewTableNameTemplate": {}}, -} - -var fieldsToCheck = []string{"Columns", "Name", "Value", "FromTableName", "NewTableNameTemplate"} - func (s *Spec) Validate() error { var err error for _, t := range s.TransformationSpecs { - requiredFields, ok := kindToRequiredFields[t.Kind] - if !ok { - kinds := make([]string, 0, len(kindToRequiredFields)) - for k := range kindToRequiredFields { - kinds = append(kinds, k) + switch t.Kind { + case KindRemoveColumns: + if len(t.Columns) == 0 { + err = errors.Join(err, fmt.Errorf("'%s' field must be specified for %s transformation", "columns", t.Kind)) } - return fmt.Errorf("unknown transformation kind: %s, supported kinds are: %s", t.Kind, strings.Join(kinds, ", ")) - } - for fieldName := range requiredFields { - fieldValue := reflect.ValueOf(t).FieldByName(fieldName) - if !fieldValue.IsValid() { - panic(fmt.Sprintf("field %s is not valid", fieldName)) // this would be a bug on kindToRequiredFields/fieldsToCheck + if t.Name != "" || t.Value != "" || t.NewTableNameTemplate != "" { + err = errors.Join(err, fmt.Errorf("name/value/new_table_name_template fields must not be specified for %s transformation", t.Kind)) } - if fieldValue.Kind() == reflect.String && fieldValue.String() == "" { - err = errors.Join(err, fmt.Errorf("'%s' field must be specified for %s transformation", fieldName, t.Kind)) + case KindAddColumn: + if t.Name == "" || t.Value == "" { + err = errors.Join(err, fmt.Errorf("'%s' and '%s' fields must be specified for %s transformation", "name", "value", t.Kind)) } - if fieldValue.Kind() == reflect.Slice && fieldValue.Len() == 0 { - err = errors.Join(err, fmt.Errorf("'%s' field must be specified for %s transformation", fieldName, t.Kind)) + if t.NewTableNameTemplate != "" { + err = errors.Join(err, fmt.Errorf("new_table_name_template field must not be specified for %s transformation", t.Kind)) } - } - for _, fieldName := range fieldsToCheck { - if _, ok := requiredFields[fieldName]; ok { - continue + case KindObfuscateColumns: + if len(t.Columns) == 0 { + err = errors.Join(err, fmt.Errorf("'%s' field must be specified for %s transformation", "columns", t.Kind)) + } + if t.Name != "" || t.Value != "" || t.NewTableNameTemplate != "" { + err = errors.Join(err, fmt.Errorf("name/value/new_table_name_template fields must not be specified for %s transformation", t.Kind)) } - fieldValue := reflect.ValueOf(t).FieldByName(fieldName) - if fieldValue.Kind() == reflect.String && fieldValue.String() != "" { - err = errors.Join(err, fmt.Errorf("'%s' field must not be specified for %s transformation", fieldName, t.Kind)) + case KindChangeTableNames: + if t.NewTableNameTemplate == "" { + err = errors.Join(err, fmt.Errorf("'%s' field must be specified for %s transformation", "new_table_name_template", t.Kind)) } - if fieldValue.Kind() == reflect.Slice && fieldValue.Len() > 0 { - err = errors.Join(err, fmt.Errorf("'%s' field must not be specified for %s transformation", fieldName, t.Kind)) + if t.Name != "" || t.Value != "" || len(t.Columns) > 0 { + err = errors.Join(err, fmt.Errorf("name/value/columns fields must not be specified for %s transformation", t.Kind)) } + default: + err = errors.Join(err, fmt.Errorf("unknown transformation kind: %s", t.Kind)) } // Non-trivial validations @@ -90,5 +79,6 @@ func (s *Spec) Validate() error { } } } + return err }