From 139b7dad0e733b662760e6c1011118fd8f346ef8 Mon Sep 17 00:00:00 2001 From: erezrokah Date: Thu, 12 Jan 2023 15:43:40 +0200 Subject: [PATCH 1/7] feat(sqlite): Collect and report migration errors before starting the migration --- plugins/destination/sqlite/client/migrate.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index 172fc905334fe5..591ac036970e0f 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -175,6 +175,7 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { for _, colChange := range tableChange.columnChanges { if colChange.new { c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("Column doesn't exist, creating") + table := tableChange.table sql := "alter table \"" + table.Name + "\" add column \"" + colChange.name + "\" \"" + colChange.newType + `"` if _, err := c.db.Exec(sql); err != nil { return fmt.Errorf("failed to add column %s on table %s: %w", colChange.name, table.Name, err) From 104d777e1ac489d7e1f21b6ec07f23775bf3beaa Mon Sep 17 00:00:00 2001 From: erezrokah Date: Sun, 15 Jan 2023 18:39:56 +0200 Subject: [PATCH 2/7] feat(sqlite): Support force migration --- .../destination/sqlite/client/client_test.go | 164 ++++++++++-------- plugins/destination/sqlite/client/migrate.go | 37 +++- plugins/destination/sqlite/go.mod | 2 + 3 files changed, 126 insertions(+), 77 deletions(-) diff --git a/plugins/destination/sqlite/client/client_test.go b/plugins/destination/sqlite/client/client_test.go index e7563876c8dac3..dfd70e328c14cc 100644 --- a/plugins/destination/sqlite/client/client_test.go +++ b/plugins/destination/sqlite/client/client_test.go @@ -52,81 +52,101 @@ func TestPluginMigrateMultiplePKs(t *testing.T) { } func TestMigrateErrors(t *testing.T) { - table := schema.Table{ - Name: "table_1", - Columns: []schema.Column{ - {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, - {Name: "name", Type: schema.TypeString}, - {Name: "age", Type: schema.TypeInt}, - {Name: "created_at", Type: schema.TypeString}, - }, - } - p := destination.NewPlugin("sqlite", "development", New) - ctx := context.Background() - - spec := Spec{ - ConnectionString: ":memory:", - } - - // Init the plugin so we can call migrate - if err := p.Init(ctx, zerolog.Logger{}, specs.Destination{Name: "cq_test_migrate", Spec: spec}); err != nil { - t.Fatal(err) - } - - if err := p.Migrate(ctx, []*schema.Table{&table}); err != nil { - t.Fatal(err) - } - - tableWithMigratableChange := schema.Table{ - Name: "table_1", - Columns: []schema.Column{ - {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, - {Name: "name", Type: schema.TypeString}, - {Name: "age", Type: schema.TypeInt}, - {Name: "created_at", Type: schema.TypeString}, - {Name: "new_column", Type: schema.TypeString}, + tests := []struct { + name string + spec specs.Destination + wantErr string + }{ + { + name: "should fail on migrate mode safe", + spec: specs.Destination{Name: "cq_test_migrate", Spec: Spec{ConnectionString: ":memory:"}}, + wantErr: `failed to migrate schema: +can't migrate table "table_1" since adding the new PK column "new_pk_column" is not supported. Try dropping this table +can't migrate table "table_2" since changing the type of column "age" from "integer" to "text" is not supported. Try dropping this column for this table +can't migrate table "table_2" since changing the type of column "created_at" from "text" to "timestamp" is not supported. Try dropping this column for this table + +To force a migration add "migrate_mode: forced" to your destination spec`, }, - } - newTable := schema.Table{ - Name: "table_2", - Columns: []schema.Column{ - {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, - {Name: "name", Type: schema.TypeString}, - {Name: "age", Type: schema.TypeInt}, - {Name: "created_at", Type: schema.TypeString}, - }, - } - - if err := p.Migrate(ctx, []*schema.Table{&tableWithMigratableChange, &newTable}); err != nil { - t.Fatal(err) - } - - tableWithNonTableDropNeeded := schema.Table{ - Name: "table_1", - Columns: []schema.Column{ - {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, - {Name: "name", Type: schema.TypeString}, - {Name: "age", Type: schema.TypeInt}, - {Name: "new_pk_column", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, - {Name: "created_at", Type: schema.TypeTimestamp}, + { + name: "should succeed on migrate mode force", + spec: specs.Destination{Name: "cq_test_migrate", Spec: Spec{ConnectionString: ":memory:"}, MigrateMode: specs.MigrateModeForced}, }, } - tableWithColumnsDropNeeded := schema.Table{ - Name: "table_2", - Columns: []schema.Column{ - {Name: "id", Type: schema.TypeUUID}, - {Name: "name", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, - {Name: "age", Type: schema.TypeString}, - {Name: "created_at", Type: schema.TypeTimestamp}, - }, + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + table := schema.Table{ + Name: "table_1", + Columns: []schema.Column{ + {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, + {Name: "name", Type: schema.TypeString}, + {Name: "age", Type: schema.TypeInt}, + {Name: "created_at", Type: schema.TypeString}, + }, + } + p := destination.NewPlugin("sqlite", "development", New) + ctx := context.Background() + + // Init the plugin so we can call migrate + if err := p.Init(ctx, zerolog.Logger{}, tt.spec); err != nil { + t.Fatal(err) + } + + if err := p.Migrate(ctx, []*schema.Table{&table}); err != nil { + t.Fatal(err) + } + + tableWithMigratableChange := schema.Table{ + Name: "table_1", + Columns: []schema.Column{ + {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, + {Name: "name", Type: schema.TypeString}, + {Name: "age", Type: schema.TypeInt}, + {Name: "created_at", Type: schema.TypeString}, + {Name: "new_column", Type: schema.TypeString}, + }, + } + newTable := schema.Table{ + Name: "table_2", + Columns: []schema.Column{ + {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, + {Name: "name", Type: schema.TypeString}, + {Name: "age", Type: schema.TypeInt}, + {Name: "created_at", Type: schema.TypeString}, + }, + } + + if err := p.Migrate(ctx, []*schema.Table{&tableWithMigratableChange, &newTable}); err != nil { + t.Fatal(err) + } + + tableWithNonTableDropNeeded := schema.Table{ + Name: "table_1", + Columns: []schema.Column{ + {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, + {Name: "name", Type: schema.TypeString}, + {Name: "age", Type: schema.TypeInt}, + {Name: "new_pk_column", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, + {Name: "created_at", Type: schema.TypeTimestamp}, + }, + } + + tableWithColumnsDropNeeded := schema.Table{ + Name: "table_2", + Columns: []schema.Column{ + {Name: "id", Type: schema.TypeUUID}, + {Name: "name", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, + {Name: "age", Type: schema.TypeString}, + {Name: "created_at", Type: schema.TypeTimestamp}, + }, + } + + err := p.Migrate(ctx, []*schema.Table{&tableWithNonTableDropNeeded, &tableWithColumnsDropNeeded}) + if tt.wantErr != "" { + require.ErrorContains(t, err, tt.wantErr) + } else { + require.NoError(t, err) + } + }) } - - err := p.Migrate(ctx, []*schema.Table{&tableWithNonTableDropNeeded, &tableWithColumnsDropNeeded}) - expectedError := `failed to migrate schema: - can't migrate table "table_1" since adding the new PK column "new_pk_column" is not supported. Try dropping this table - can't migrate table "table_2" since changing the type of column "age" from "integer" to "text" is not supported. Try dropping this column for this table - can't migrate table "table_2" since changing the type of column "created_at" from "text" to "timestamp" is not supported. Try dropping this column for this table - ` - require.Errorf(t, err, expectedError) } diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index 591ac036970e0f..aa81aea7961bee 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -157,8 +157,8 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { } migrationErrors := getMigrationErrors(schemaChanges) - if len(migrationErrors) > 0 { - return fmt.Errorf("failed to migrate schema:\n%s", strings.Join(migrationErrors, "\n")) + if c.spec.MigrateMode == specs.MigrateModeSafe && len(migrationErrors) > 0 { + return fmt.Errorf("failed to migrate schema:\n%s\n\nTo force a migration add \"migrate_mode: %s\" to your destination spec", strings.Join(migrationErrors, "\n"), specs.MigrateModeForced.String()) } for _, tableChange := range schemaChanges { @@ -173,12 +173,39 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { } else { c.logger.Debug().Str("table", table.Name).Msg("Table exists, auto-migrating") for _, colChange := range tableChange.columnChanges { + tableName := tableChange.table.Name + columnName := colChange.name + columnType := colChange.newType + // If this is a new PK column we need to drop the table + if colChange.new && colChange.newPk { + c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("New column is a primary key, dropping and adding table since in forced migrate mode") + sql := "drop table if exists \"" + tableName + "\"" + if _, err := c.db.Exec(sql); err != nil { + return fmt.Errorf("failed to drop table %s: %w", tableName, err) + } + err := c.createTableIfNotExist(ctx, tableChange.table) + if err != nil { + return err + } + break + } if colChange.new { c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("Column doesn't exist, creating") - table := tableChange.table - sql := "alter table \"" + table.Name + "\" add column \"" + colChange.name + "\" \"" + colChange.newType + `"` + sql := "alter table \"" + tableName + "\" add column \"" + columnName + "\" \"" + columnType + `"` + if _, err := c.db.Exec(sql); err != nil { + return fmt.Errorf("failed to add column %s on table %s: %w", colChange.name, tableName, err) + } + } + // if this is an existing column with type change we need to drop and add it + if !colChange.new && colChange.oldType != colChange.newType { + c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("Existing column type changed, dropping and adding column since in forced migrate mode") + sql := "alter table " + tableName + " drop column " + columnName + if _, err := c.db.Exec(sql); err != nil { + return fmt.Errorf("failed to drop column %s on table %s: %w", columnName, tableName, err) + } + sql = "alter table \"" + tableName + "\" add column \"" + columnName + "\" \"" + columnType + `"` if _, err := c.db.Exec(sql); err != nil { - return fmt.Errorf("failed to add column %s on table %s: %w", colChange.name, table.Name, err) + return fmt.Errorf("failed to add column %s on table %s: %w", colChange.name, tableName, err) } } } diff --git a/plugins/destination/sqlite/go.mod b/plugins/destination/sqlite/go.mod index 16f4a821c67fcb..ed33b6b6380b38 100644 --- a/plugins/destination/sqlite/go.mod +++ b/plugins/destination/sqlite/go.mod @@ -35,3 +35,5 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/cloudquery/plugin-sdk => ../../../../plugin-sdk From 8c652b92ec8c8ced28ea62114b177db096455cc2 Mon Sep 17 00:00:00 2001 From: erezrokah Date: Tue, 17 Jan 2023 14:46:54 +0200 Subject: [PATCH 3/7] chore: Remove replace in go.mod --- plugins/destination/sqlite/go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/plugins/destination/sqlite/go.mod b/plugins/destination/sqlite/go.mod index ed33b6b6380b38..16f4a821c67fcb 100644 --- a/plugins/destination/sqlite/go.mod +++ b/plugins/destination/sqlite/go.mod @@ -35,5 +35,3 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/cloudquery/plugin-sdk => ../../../../plugin-sdk From a7822f87c9d1d82ab1802e80879d5dcb8ada5c5c Mon Sep 17 00:00:00 2001 From: erezrokah Date: Thu, 19 Jan 2023 15:29:58 +0200 Subject: [PATCH 4/7] feat: Log forced changes to schema --- plugins/destination/sqlite/client/migrate.go | 50 ++++++++++++++++---- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index aa81aea7961bee..c48351c2cdd1fc 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -52,6 +52,29 @@ type tableChange struct { columnChanges []*columnChange } +type migrationMessage struct { + err string + info string +} + +type migrationsMessages []migrationMessage + +func (m migrationsMessages) Errors() []string { + var errs []string + for _, msg := range m { + errs = append(errs, msg.err) + } + return errs +} + +func (m migrationsMessages) Infos() []string { + var infos []string + for _, msg := range m { + infos = append(infos, msg.info) + } + return infos +} + func (c *Client) getColumnChange(col schema.Column, sqliteColumn *columnInfo) *columnChange { columnName := col.Name columnType := c.SchemaTypeToSqlite(col.Type) @@ -129,24 +152,30 @@ func (c *Client) getSchemaChanges(ctx context.Context, tables schema.Tables) ([] return changes, nil } -func getMigrationErrors(changes []*tableChange) []string { - var errors []string +func getMigrationMessages(changes []*tableChange) migrationsMessages { + var messages migrationsMessages for _, tableChange := range changes { if tableChange.new { continue } for _, colChange := range tableChange.columnChanges { if colChange.new && colChange.newPk { - errors = append(errors, fmt.Sprintf("can't migrate table %q since adding the new PK column %q is not supported. Try dropping this table", tableChange.table.Name, colChange.name)) + messages = append(messages, migrationMessage{ + err: fmt.Sprintf("can't migrate table %q since adding the new PK column %q is not supported. Try dropping this table", tableChange.table.Name, colChange.name), + info: fmt.Sprintf("table %q will be dropped and recreated", tableChange.table.Name), + }) // no need to report other errors as the user needs to drop the table altogether break } if !colChange.new && colChange.oldType != colChange.newType { - errors = append(errors, fmt.Sprintf("can't migrate table %q since changing the type of column %q from %q to %q is not supported. Try dropping this column for this table", tableChange.table.Name, colChange.name, colChange.oldType, colChange.newType)) + messages = append(messages, migrationMessage{ + err: fmt.Sprintf("can't migrate table %q since changing the type of column %q from %q to %q is not supported. Try dropping this column for this table", tableChange.table.Name, colChange.name, colChange.oldType, colChange.newType), + info: fmt.Sprintf("column %q of table %q will be dropped and recreated", colChange.name, tableChange.table.Name), + }) } } } - return errors + return messages } // This is the responsibility of the CLI of the client to lock before running migration @@ -156,9 +185,14 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { return err } - migrationErrors := getMigrationErrors(schemaChanges) - if c.spec.MigrateMode == specs.MigrateModeSafe && len(migrationErrors) > 0 { - return fmt.Errorf("failed to migrate schema:\n%s\n\nTo force a migration add \"migrate_mode: %s\" to your destination spec", strings.Join(migrationErrors, "\n"), specs.MigrateModeForced.String()) + migrationMessages := getMigrationMessages(schemaChanges) + if len(migrationMessages) > 0 { + if c.spec.MigrateMode == specs.MigrateModeSafe { + return fmt.Errorf("failed to migrate schema:\n%s\n\nTo force a migration add \"migrate_mode: %s\" to your destination spec", strings.Join(migrationMessages.Errors(), "\n"), specs.MigrateModeForced.String()) + } + for _, msg := range migrationMessages.Infos() { + c.logger.Info().Msg(msg) + } } for _, tableChange := range schemaChanges { From d98d12e6161d042ae7546e3b1ec5972d5e98d88e Mon Sep 17 00:00:00 2001 From: erezrokah Date: Thu, 19 Jan 2023 15:36:19 +0200 Subject: [PATCH 5/7] style: Fix linting --- plugins/destination/sqlite/client/migrate.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index c48351c2cdd1fc..817af6c9a21423 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -60,7 +60,7 @@ type migrationMessage struct { type migrationsMessages []migrationMessage func (m migrationsMessages) Errors() []string { - var errs []string + errs := make([]string, 0, len(m)) for _, msg := range m { errs = append(errs, msg.err) } @@ -68,7 +68,7 @@ func (m migrationsMessages) Errors() []string { } func (m migrationsMessages) Infos() []string { - var infos []string + infos := make([]string, 0, len(m)) for _, msg := range m { infos = append(infos, msg.info) } From 1925b9802cdabbd8bbd89d7545739563c8905a6c Mon Sep 17 00:00:00 2001 From: erezrokah Date: Thu, 19 Jan 2023 16:46:30 +0200 Subject: [PATCH 6/7] refactor: Code cleanup --- plugins/destination/sqlite/client/migrate.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index 817af6c9a21423..3cb7f9d43b87e2 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -134,20 +134,18 @@ func (c *Client) getTableChange(ctx context.Context, table *schema.Table) (*tabl } func (c *Client) getSchemaChanges(ctx context.Context, tables schema.Tables) ([]*tableChange, error) { - changes := make([]*tableChange, len(tables)) - for i, table := range tables { + var changes []*tableChange + for _, table := range tables { tableChange, err := c.getTableChange(ctx, table) if err != nil { return nil, err } - changes[i] = tableChange - for _, relation := range table.Relations { - relationChanges, err := c.getTableChange(ctx, relation) - if err != nil { - return nil, err - } - changes = append(changes, relationChanges) + changes = append(changes, tableChange) + relationChanges, err := c.getSchemaChanges(ctx, table.Relations) + if err != nil { + return nil, err } + changes = append(changes, relationChanges...) } return changes, nil } From ed3a225525cbd691f7f02fbc40d686459b644952 Mon Sep 17 00:00:00 2001 From: erezrokah Date: Thu, 19 Jan 2023 16:57:38 +0200 Subject: [PATCH 7/7] style: Fix linting --- plugins/destination/sqlite/client/migrate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index 3cb7f9d43b87e2..9c888b27283e20 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -134,7 +134,7 @@ func (c *Client) getTableChange(ctx context.Context, table *schema.Table) (*tabl } func (c *Client) getSchemaChanges(ctx context.Context, tables schema.Tables) ([]*tableChange, error) { - var changes []*tableChange + changes := make([]*tableChange, 0, len(tables)) for _, table := range tables { tableChange, err := c.getTableChange(ctx, table) if err != nil {