Skip to content
Merged
21 changes: 19 additions & 2 deletions cli/cmd/sync_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cloudquery/cloudquery/cli/internal/analytics"
"github.com/cloudquery/cloudquery/cli/internal/api"
"github.com/cloudquery/cloudquery/cli/internal/specs/v0"
"github.com/cloudquery/cloudquery/cli/internal/tablenamechanger"
"github.com/cloudquery/cloudquery/cli/internal/transformer"
"github.com/cloudquery/cloudquery/cli/internal/transformerpipeline"
"github.com/cloudquery/plugin-pb-go/managedplugin"
Expand Down Expand Up @@ -330,6 +331,12 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
defer remoteProgressReporter.Cancel()
}

// Transformers can change table names. We need to keep track of table name changes
// in case we do things that depend on table names.
//
// Note that transformers run per-destination, so we need to keep track of table name changes per-destination.
tableNameChanger := tablenamechanger.New(destinationSpecs)

// Note: we want to stop this errorgroup if ctx is cancelled, but we don't want to cancel ctx if gctx is cancelled.
// gctx is always cancelled when the errorgroup returns, and this isn't necessarily an error.
eg, gctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -412,11 +419,14 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
}
case *plugin.Sync_Response_DeleteRecord:
for i := range destinationsPbClients {
// Table name might have changed due to a transformation.
tableName := tableNameChanger.UpdateTableName(destinationSpecs[i].Name, m.DeleteRecord.TableName)

wr := &plugin.Write_Request{}
// Transformations aren't required here because DeleteRecord is only in V3
wr.Message = &plugin.Write_Request_DeleteRecord{
DeleteRecord: &plugin.Write_MessageDeleteRecord{
TableName: m.DeleteRecord.TableName,
TableName: tableName,
TableRelations: m.DeleteRecord.TableRelations,
WhereClause: m.DeleteRecord.WhereClause,
},
Expand Down Expand Up @@ -460,6 +470,11 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
transformedSchemaBytes = resp.Schema
}

// Table name might have changed due to a transformation.
if err := tableNameChanger.LearnTableNameChange(destinationName, table.Name, transformedSchemaBytes); err != nil {
return err
}

wr := &plugin.Write_Request{}
wr.Message = &plugin.Write_Request_MigrateTable{
MigrateTable: &plugin.Write_MessageMigrateTable{
Expand Down Expand Up @@ -526,7 +541,9 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des

for i := range destinationsClients {
if destinationSpecs[i].WriteMode == specs.WriteModeOverwriteDeleteStale {
if err := deleteStale(writeClients[i], tablesForDeleteStale, sourceName, syncTime); err != nil {
// Table names might have changed due to transformers
updatedTablesForDeleteStale := tableNameChanger.UpdateTableNames(destinationSpecs[i].Name, tablesForDeleteStale)
if err := deleteStale(writeClients[i], updatedTablesForDeleteStale, sourceName, syncTime); err != nil {
return err
}
}
Expand Down
62 changes: 62 additions & 0 deletions cli/internal/tablenamechanger/table_name_changer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package tablenamechanger

import (
"github.com/cloudquery/cloudquery/cli/internal/specs/v0"
"github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

// TableNameChanger manages table name changes due to transformers, on a per-destination basis.
// It can learn about table name changes and apply those changes on further steps.
type TableNameChanger struct {
tableNameChanges map[string]map[string]string
Comment thread
erezrokah marked this conversation as resolved.
}

func New(destinationSpecs []specs.Destination) *TableNameChanger {
tnc := &TableNameChanger{
tableNameChanges: make(map[string]map[string]string),
}
for _, destinationSpec := range destinationSpecs {
tnc.tableNameChanges[destinationSpec.Name] = make(map[string]string)
}
return tnc
}

func (c TableNameChanger) UpdateTableNames(destinationName string, tables map[string]bool) map[string]bool {
newTables := make(map[string]bool, len(tables))
for oldTableName := range tables {
if newTableName, ok := c.tableNameChanges[destinationName][oldTableName]; ok {
delete(tables, oldTableName)
newTables[newTableName] = true
} else {
newTables[oldTableName] = true
}
}
return newTables
}

func (c TableNameChanger) UpdateTableName(destinationName string, oldTableName string) string {
if newTableName, ok := c.tableNameChanges[destinationName][oldTableName]; ok {
return newTableName
}
return oldTableName
}

func (c *TableNameChanger) LearnTableNameChange(destinationName, oldTableName string, schemaBytes []byte) error {
// Implicit assumption that table name changes are deterministic.
// Therefore, once we learned about a table name change, we don't need to learn it again.
if c.tableNameChanges[destinationName][oldTableName] != "" {
return nil
}

sc, err := plugin.NewSchemaFromBytes(schemaBytes)
if err != nil {
return err
}
newTableName, ok := sc.Metadata().GetValue(schema.MetadataTableName)
if !ok {
return nil // this would be an error, but let it fail at a more relevant step
}
c.tableNameChanges[destinationName][oldTableName] = newTableName
return nil
}
149 changes: 149 additions & 0 deletions cli/internal/tablenamechanger/table_name_changer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package tablenamechanger

import (
"testing"

"github.com/apache/arrow/go/v17/arrow"
"github.com/cloudquery/cloudquery/cli/internal/specs/v0"
"github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/stretchr/testify/require"
)

func TestTableNameChanger(t *testing.T) {
destinationSpecs := []specs.Destination{
{Metadata: specs.Metadata{Name: "dest1"}},
{Metadata: specs.Metadata{Name: "dest2"}},
}

t.Run("UpdateTableNames", func(t *testing.T) {
tests := []struct {
name string
destinationName string
tables map[string]bool
expectedNewTables map[string]bool
}{
{
name: "NoChanges",
destinationName: "dest1",
tables: map[string]bool{"table1": true, "table2": true},
expectedNewTables: map[string]bool{"table1": true, "table2": true},
},
{
name: "WithChanges",
destinationName: "dest1",
tables: map[string]bool{"old_table1": true, "table2": true},
expectedNewTables: map[string]bool{"new_table1": true, "table2": true},
},
}

tnc := New(destinationSpecs)
err := tnc.LearnTableNameChange("dest1", "old_table1", createSchemaBytes(t, "new_table1"))
require.NoError(t, err)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
newTables := tnc.UpdateTableNames(tt.destinationName, tt.tables)
require.Equal(t, tt.expectedNewTables, newTables)
})
}
})

t.Run("UpdateTableName", func(t *testing.T) {
tests := []struct {
name string
destinationName string
oldTableName string
expectedNewName string
}{
{
name: "NoChange",
destinationName: "dest1",
oldTableName: "table1",
expectedNewName: "table1",
},
{
name: "WithChange",
destinationName: "dest1",
oldTableName: "old_table1",
expectedNewName: "new_table1",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tnc := New(destinationSpecs)
err := tnc.LearnTableNameChange("dest1", "old_table1", createSchemaBytes(t, "new_table1"))
require.NoError(t, err)

newName := tnc.UpdateTableName(tt.destinationName, tt.oldTableName)
require.Equal(t, tt.expectedNewName, newName)
})
}
})

t.Run("LearnTableNameChange", func(t *testing.T) {
tests := []struct {
name string
learnDestName string
learnOldTableName string
learnNewTableName string
updateDestName string
updateOldTableName string
updateNewTableName string
}{
{
name: "ValidChange",
learnDestName: "dest1",
learnOldTableName: "old_table",
learnNewTableName: "new_table",
updateDestName: "dest1",
updateOldTableName: "old_table",
updateNewTableName: "new_table",
},
{
name: "NoChangeOnSameDestination",
learnDestName: "dest1",
learnOldTableName: "old_table",
learnNewTableName: "old_table",
updateDestName: "dest1",
updateOldTableName: "old_table",
updateNewTableName: "old_table",
},
{
name: "ValidChangeButDoesntUpdateOnDifferentDestination",
learnDestName: "dest1",
learnOldTableName: "old_table",
learnNewTableName: "new_table",
updateDestName: "dest2",
updateOldTableName: "old_table",
updateNewTableName: "old_table",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tnc := New(destinationSpecs)

err := tnc.LearnTableNameChange(tt.learnDestName, tt.learnOldTableName, createSchemaBytes(t, tt.learnNewTableName))
require.NoError(t, err)
newName := tnc.UpdateTableName(tt.updateDestName, tt.updateOldTableName)
require.Equal(t, tt.updateNewTableName, newName)
})
}
})
}

func createSchemaBytes(t *testing.T, tableName string) []byte {
md := arrow.NewMetadata([]string{schema.MetadataTableName}, []string{tableName})
sc := arrow.NewSchema(
[]arrow.Field{
{Name: "col1", Type: arrow.BinaryTypes.String, Nullable: true},
{Name: "col2", Type: arrow.BinaryTypes.String, Nullable: true},
},
&md,
)
bytes, err := plugin.SchemaToBytes(sc)
require.NoError(t, err)
return bytes
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ func (r *RecordUpdater) ObfuscateColumns(columnNames []string) (arrow.Record, er
return r.record, nil
}

func (r *RecordUpdater) ChangeTableName(newTableNamePattern string) (arrow.Record, error) {
newSchema, err := r.schemaUpdater.ChangeTableName(newTableNamePattern)
if err != nil {
return nil, err
}
r.record = array.NewRecord(newSchema, r.record.Columns(), r.record.NumRows())
return r.record, nil
}

func (r *RecordUpdater) colIndicesByNames(columnNames []string) (map[int]struct{}, error) {
colNameMap := make(map[string]struct{})
for _, columnName := range columnNames {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -53,13 +54,30 @@ func TestObfuscateColumns(t *testing.T) {
require.Equal(t, "528e5290f8ff0eb0325f0472b9c1a9ef4fac0b02ff6094b64d9382af4a10444b", updatedRecord.Column(0).(*array.String).Value(1))
}

func TestChangeTableName(t *testing.T) {
record := createTestRecord()
updater := New(record)

updatedRecord, err := updater.ChangeTableName("cq_sync_{{.OldName}}")
require.NoError(t, err)

require.Equal(t, int64(2), updatedRecord.NumCols())
require.Equal(t, int64(2), updatedRecord.NumRows())
requireAllColsLenMatchRecordsLen(t, updatedRecord)

newTableName, ok := updatedRecord.Schema().Metadata().GetValue(schema.MetadataTableName)
require.True(t, ok, "Expected table name to be present in metadata")
require.Equal(t, "cq_sync_testTable", newTableName)
}

func createTestRecord() arrow.Record {
md := arrow.NewMetadata([]string{schema.MetadataTableName}, []string{"testTable"})
bld := array.NewRecordBuilder(memory.DefaultAllocator, arrow.NewSchema(
[]arrow.Field{
{Name: "col1", Type: arrow.BinaryTypes.String},
{Name: "col2", Type: arrow.BinaryTypes.String},
},
nil,
&md,
))
defer bld.Release()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package schemaupdater

import "github.com/apache/arrow/go/v17/arrow"
import (
"bytes"
"errors"
"text/template"

"github.com/apache/arrow/go/v17/arrow"
"github.com/cloudquery/plugin-sdk/v2/schema"
)

// SchemaUpdater takes an `arrow.Schema` and knows how to make simple subsequent changes to it.
// It doesn't know which table it belongs to or if the changes make sense.
type SchemaUpdater struct {
schema *arrow.Schema
}

func New(schema *arrow.Schema) *SchemaUpdater {
return &SchemaUpdater{schema: schema}
func New(sc *arrow.Schema) *SchemaUpdater {
return &SchemaUpdater{schema: sc}
}

func (s *SchemaUpdater) RemoveColumnIndices(colIndices map[int]struct{}) *arrow.Schema {
Expand All @@ -35,3 +42,32 @@ func (s *SchemaUpdater) AddStringColumnAtPos(columnName string, zeroIndexedPosit
arrow.Field{Name: columnName, Type: arrow.BinaryTypes.String, Nullable: isNullable},
)
}

func (s *SchemaUpdater) ChangeTableName(newTableNamePattern string) (*arrow.Schema, error) {
existingMetadata := s.schema.Metadata()
tableName, ok := existingMetadata.GetValue(schema.MetadataTableName)
if !ok {
return nil, errors.New("table name not found in record's metadata")
}

type tpl struct {
OldName string
}

t, err := template.New("table_name").Parse(newTableNamePattern)
if err != nil {
return nil, err
}

var tplBuf bytes.Buffer
if err := t.Execute(&tplBuf, tpl{OldName: tableName}); err != nil {
return nil, err
}

newName := tplBuf.String()

m := existingMetadata.ToMap()
m[schema.MetadataTableName] = newName
newMetadata := arrow.MetadataFrom(m)
return arrow.NewSchema(s.schema.Fields(), &newMetadata), nil
}
Loading