diff --git a/plugins/destination/sqlite/client/client_test.go b/plugins/destination/sqlite/client/client_test.go index e7563876c8dac3..dfd70e328c14cc 100644 --- a/plugins/destination/sqlite/client/client_test.go +++ b/plugins/destination/sqlite/client/client_test.go @@ -52,81 +52,101 @@ func TestPluginMigrateMultiplePKs(t *testing.T) { } func TestMigrateErrors(t *testing.T) { - table := schema.Table{ - Name: "table_1", - Columns: []schema.Column{ - {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, - {Name: "name", Type: schema.TypeString}, - {Name: "age", Type: schema.TypeInt}, - {Name: "created_at", Type: schema.TypeString}, - }, - } - p := destination.NewPlugin("sqlite", "development", New) - ctx := context.Background() - - spec := Spec{ - ConnectionString: ":memory:", - } - - // Init the plugin so we can call migrate - if err := p.Init(ctx, zerolog.Logger{}, specs.Destination{Name: "cq_test_migrate", Spec: spec}); err != nil { - t.Fatal(err) - } - - if err := p.Migrate(ctx, []*schema.Table{&table}); err != nil { - t.Fatal(err) - } - - tableWithMigratableChange := schema.Table{ - Name: "table_1", - Columns: []schema.Column{ - {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, - {Name: "name", Type: schema.TypeString}, - {Name: "age", Type: schema.TypeInt}, - {Name: "created_at", Type: schema.TypeString}, - {Name: "new_column", Type: schema.TypeString}, + tests := []struct { + name string + spec specs.Destination + wantErr string + }{ + { + name: "should fail on migrate mode safe", + spec: specs.Destination{Name: "cq_test_migrate", Spec: Spec{ConnectionString: ":memory:"}}, + wantErr: `failed to migrate schema: +can't migrate table "table_1" since adding the new PK column "new_pk_column" is not supported. Try dropping this table +can't migrate table "table_2" since changing the type of column "age" from "integer" to "text" is not supported. Try dropping this column for this table +can't migrate table "table_2" since changing the type of column "created_at" from "text" to "timestamp" is not supported. Try dropping this column for this table + +To force a migration add "migrate_mode: forced" to your destination spec`, }, - } - newTable := schema.Table{ - Name: "table_2", - Columns: []schema.Column{ - {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, - {Name: "name", Type: schema.TypeString}, - {Name: "age", Type: schema.TypeInt}, - {Name: "created_at", Type: schema.TypeString}, - }, - } - - if err := p.Migrate(ctx, []*schema.Table{&tableWithMigratableChange, &newTable}); err != nil { - t.Fatal(err) - } - - tableWithNonTableDropNeeded := schema.Table{ - Name: "table_1", - Columns: []schema.Column{ - {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, - {Name: "name", Type: schema.TypeString}, - {Name: "age", Type: schema.TypeInt}, - {Name: "new_pk_column", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, - {Name: "created_at", Type: schema.TypeTimestamp}, + { + name: "should succeed on migrate mode force", + spec: specs.Destination{Name: "cq_test_migrate", Spec: Spec{ConnectionString: ":memory:"}, MigrateMode: specs.MigrateModeForced}, }, } - tableWithColumnsDropNeeded := schema.Table{ - Name: "table_2", - Columns: []schema.Column{ - {Name: "id", Type: schema.TypeUUID}, - {Name: "name", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, - {Name: "age", Type: schema.TypeString}, - {Name: "created_at", Type: schema.TypeTimestamp}, - }, + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + table := schema.Table{ + Name: "table_1", + Columns: []schema.Column{ + {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, + {Name: "name", Type: schema.TypeString}, + {Name: "age", Type: schema.TypeInt}, + {Name: "created_at", Type: schema.TypeString}, + }, + } + p := destination.NewPlugin("sqlite", "development", New) + ctx := context.Background() + + // Init the plugin so we can call migrate + if err := p.Init(ctx, zerolog.Logger{}, tt.spec); err != nil { + t.Fatal(err) + } + + if err := p.Migrate(ctx, []*schema.Table{&table}); err != nil { + t.Fatal(err) + } + + tableWithMigratableChange := schema.Table{ + Name: "table_1", + Columns: []schema.Column{ + {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, + {Name: "name", Type: schema.TypeString}, + {Name: "age", Type: schema.TypeInt}, + {Name: "created_at", Type: schema.TypeString}, + {Name: "new_column", Type: schema.TypeString}, + }, + } + newTable := schema.Table{ + Name: "table_2", + Columns: []schema.Column{ + {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, + {Name: "name", Type: schema.TypeString}, + {Name: "age", Type: schema.TypeInt}, + {Name: "created_at", Type: schema.TypeString}, + }, + } + + if err := p.Migrate(ctx, []*schema.Table{&tableWithMigratableChange, &newTable}); err != nil { + t.Fatal(err) + } + + tableWithNonTableDropNeeded := schema.Table{ + Name: "table_1", + Columns: []schema.Column{ + {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, + {Name: "name", Type: schema.TypeString}, + {Name: "age", Type: schema.TypeInt}, + {Name: "new_pk_column", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, + {Name: "created_at", Type: schema.TypeTimestamp}, + }, + } + + tableWithColumnsDropNeeded := schema.Table{ + Name: "table_2", + Columns: []schema.Column{ + {Name: "id", Type: schema.TypeUUID}, + {Name: "name", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, + {Name: "age", Type: schema.TypeString}, + {Name: "created_at", Type: schema.TypeTimestamp}, + }, + } + + err := p.Migrate(ctx, []*schema.Table{&tableWithNonTableDropNeeded, &tableWithColumnsDropNeeded}) + if tt.wantErr != "" { + require.ErrorContains(t, err, tt.wantErr) + } else { + require.NoError(t, err) + } + }) } - - err := p.Migrate(ctx, []*schema.Table{&tableWithNonTableDropNeeded, &tableWithColumnsDropNeeded}) - expectedError := `failed to migrate schema: - can't migrate table "table_1" since adding the new PK column "new_pk_column" is not supported. Try dropping this table - can't migrate table "table_2" since changing the type of column "age" from "integer" to "text" is not supported. Try dropping this column for this table - can't migrate table "table_2" since changing the type of column "created_at" from "text" to "timestamp" is not supported. Try dropping this column for this table - ` - require.Errorf(t, err, expectedError) } diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index 172fc905334fe5..9c888b27283e20 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -52,6 +52,29 @@ type tableChange struct { columnChanges []*columnChange } +type migrationMessage struct { + err string + info string +} + +type migrationsMessages []migrationMessage + +func (m migrationsMessages) Errors() []string { + errs := make([]string, 0, len(m)) + for _, msg := range m { + errs = append(errs, msg.err) + } + return errs +} + +func (m migrationsMessages) Infos() []string { + infos := make([]string, 0, len(m)) + for _, msg := range m { + infos = append(infos, msg.info) + } + return infos +} + func (c *Client) getColumnChange(col schema.Column, sqliteColumn *columnInfo) *columnChange { columnName := col.Name columnType := c.SchemaTypeToSqlite(col.Type) @@ -111,42 +134,46 @@ func (c *Client) getTableChange(ctx context.Context, table *schema.Table) (*tabl } func (c *Client) getSchemaChanges(ctx context.Context, tables schema.Tables) ([]*tableChange, error) { - changes := make([]*tableChange, len(tables)) - for i, table := range tables { + changes := make([]*tableChange, 0, len(tables)) + for _, table := range tables { tableChange, err := c.getTableChange(ctx, table) if err != nil { return nil, err } - changes[i] = tableChange - for _, relation := range table.Relations { - relationChanges, err := c.getTableChange(ctx, relation) - if err != nil { - return nil, err - } - changes = append(changes, relationChanges) + changes = append(changes, tableChange) + relationChanges, err := c.getSchemaChanges(ctx, table.Relations) + if err != nil { + return nil, err } + changes = append(changes, relationChanges...) } return changes, nil } -func getMigrationErrors(changes []*tableChange) []string { - var errors []string +func getMigrationMessages(changes []*tableChange) migrationsMessages { + var messages migrationsMessages for _, tableChange := range changes { if tableChange.new { continue } for _, colChange := range tableChange.columnChanges { if colChange.new && colChange.newPk { - errors = append(errors, fmt.Sprintf("can't migrate table %q since adding the new PK column %q is not supported. Try dropping this table", tableChange.table.Name, colChange.name)) + messages = append(messages, migrationMessage{ + err: fmt.Sprintf("can't migrate table %q since adding the new PK column %q is not supported. Try dropping this table", tableChange.table.Name, colChange.name), + info: fmt.Sprintf("table %q will be dropped and recreated", tableChange.table.Name), + }) // no need to report other errors as the user needs to drop the table altogether break } if !colChange.new && colChange.oldType != colChange.newType { - errors = append(errors, fmt.Sprintf("can't migrate table %q since changing the type of column %q from %q to %q is not supported. Try dropping this column for this table", tableChange.table.Name, colChange.name, colChange.oldType, colChange.newType)) + messages = append(messages, migrationMessage{ + err: fmt.Sprintf("can't migrate table %q since changing the type of column %q from %q to %q is not supported. Try dropping this column for this table", tableChange.table.Name, colChange.name, colChange.oldType, colChange.newType), + info: fmt.Sprintf("column %q of table %q will be dropped and recreated", colChange.name, tableChange.table.Name), + }) } } } - return errors + return messages } // This is the responsibility of the CLI of the client to lock before running migration @@ -156,9 +183,14 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { return err } - migrationErrors := getMigrationErrors(schemaChanges) - if len(migrationErrors) > 0 { - return fmt.Errorf("failed to migrate schema:\n%s", strings.Join(migrationErrors, "\n")) + migrationMessages := getMigrationMessages(schemaChanges) + if len(migrationMessages) > 0 { + if c.spec.MigrateMode == specs.MigrateModeSafe { + return fmt.Errorf("failed to migrate schema:\n%s\n\nTo force a migration add \"migrate_mode: %s\" to your destination spec", strings.Join(migrationMessages.Errors(), "\n"), specs.MigrateModeForced.String()) + } + for _, msg := range migrationMessages.Infos() { + c.logger.Info().Msg(msg) + } } for _, tableChange := range schemaChanges { @@ -173,11 +205,39 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { } else { c.logger.Debug().Str("table", table.Name).Msg("Table exists, auto-migrating") for _, colChange := range tableChange.columnChanges { + tableName := tableChange.table.Name + columnName := colChange.name + columnType := colChange.newType + // If this is a new PK column we need to drop the table + if colChange.new && colChange.newPk { + c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("New column is a primary key, dropping and adding table since in forced migrate mode") + sql := "drop table if exists \"" + tableName + "\"" + if _, err := c.db.Exec(sql); err != nil { + return fmt.Errorf("failed to drop table %s: %w", tableName, err) + } + err := c.createTableIfNotExist(ctx, tableChange.table) + if err != nil { + return err + } + break + } if colChange.new { c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("Column doesn't exist, creating") - sql := "alter table \"" + table.Name + "\" add column \"" + colChange.name + "\" \"" + colChange.newType + `"` + sql := "alter table \"" + tableName + "\" add column \"" + columnName + "\" \"" + columnType + `"` + if _, err := c.db.Exec(sql); err != nil { + return fmt.Errorf("failed to add column %s on table %s: %w", colChange.name, tableName, err) + } + } + // if this is an existing column with type change we need to drop and add it + if !colChange.new && colChange.oldType != colChange.newType { + c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("Existing column type changed, dropping and adding column since in forced migrate mode") + sql := "alter table " + tableName + " drop column " + columnName + if _, err := c.db.Exec(sql); err != nil { + return fmt.Errorf("failed to drop column %s on table %s: %w", columnName, tableName, err) + } + sql = "alter table \"" + tableName + "\" add column \"" + columnName + "\" \"" + columnType + `"` if _, err := c.db.Exec(sql); err != nil { - return fmt.Errorf("failed to add column %s on table %s: %w", colChange.name, table.Name, err) + return fmt.Errorf("failed to add column %s on table %s: %w", colChange.name, tableName, err) } } }