Skip to content

Commit dddd852

Browse files
authored
feat(sqlite-migrate): Support PK changes to schema (#7006)
#### Summary Related to #6600, #6763 and #6759 This PR adds support for PK changes migration. Before we would just ignore them, so the following use cases were ignored: 1. Adding a new column as a primary key -> The column was added, but the table PKs were left unchanged 2. Removing a PK from an existing column -> Was ignored 3. Adding a PK to an existing column -> Was ignored 4. Changing the type of an existing PK column -> Was handled by a regular non PK column type change. We dropped the column, but we need to drop the table. SQLite is quite limited in what can be done with PK changes. On any PK change we need to recreate the table, so either `drop + create` or `create new + copy old to new + drop old + rename new`. I went with the former for simplicity. <!--
1 parent 7cc65a4 commit dddd852

2 files changed

Lines changed: 167 additions & 65 deletions

File tree

plugins/destination/sqlite/client/client_test.go

Lines changed: 64 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,11 @@ func TestMigrateErrors(t *testing.T) {
6161
name: "should fail on migrate mode safe",
6262
spec: specs.Destination{Name: "cq_test_migrate", Spec: Spec{ConnectionString: ":memory:"}},
6363
wantErr: `failed to migrate schema:
64-
can't migrate table "table_1" since adding the new PK column "new_pk_column" is not supported. Try dropping this table
65-
can't migrate table "table_2" since changing the type of column "age" from "integer" to "text" is not supported. Try dropping this column for this table
66-
can't migrate table "table_2" since changing the type of column "created_at" from "text" to "timestamp" is not supported. Try dropping this column for this table
67-
68-
To force a migration add "migrate_mode: forced" to your destination spec`,
64+
can't migrate table "table_with_new_pk_column" since adding the new PK column "new_pk_column" is not supported. Try dropping this table
65+
can't migrate table "table_with_pk_addition_existing_column" since making the existing column "id" a PK is not supported. Try dropping this table
66+
can't migrate table "table_with_pk_removal" since removing an existing column "id" as a PK is not supported. Try dropping this table
67+
can't migrate table "table_with_pk_type_change" since changing the type of the PK column "id" from "integer" to "text" is not supported. Try dropping this table
68+
can't migrate table "table_with_non_pk_type_change" since changing the type of column "created_at" from "text" to "timestamp" is not supported. Try dropping this column for this table`,
6969
},
7070
{
7171
name: "should succeed on migrate mode force",
@@ -75,15 +75,39 @@ To force a migration add "migrate_mode: forced" to your destination spec`,
7575

7676
for _, tt := range tests {
7777
t.Run(tt.name, func(t *testing.T) {
78-
table := schema.Table{
79-
Name: "table_1",
80-
Columns: []schema.Column{
81-
{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
82-
{Name: "name", Type: schema.TypeString},
83-
{Name: "age", Type: schema.TypeInt},
84-
{Name: "created_at", Type: schema.TypeString},
78+
cqId := schema.CqIDColumn
79+
cqIdWithPK := cqId
80+
cqIdWithPK.CreationOptions.PrimaryKey = true
81+
beforeSchema := schema.Tables{
82+
{
83+
Name: "table_with_new_column",
84+
Columns: []schema.Column{cqId, {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}},
85+
},
86+
{
87+
Name: "table_with_new_pk_column",
88+
// cq_id is needed for initial migrations of tables without a PK
89+
Columns: []schema.Column{cqIdWithPK},
90+
},
91+
{
92+
Name: "table_with_pk_addition_existing_column",
93+
// cq_id is needed for initial migrations of tables without a PK
94+
Columns: []schema.Column{cqIdWithPK, {Name: "id", Type: schema.TypeUUID}},
95+
},
96+
{
97+
Name: "table_with_pk_removal",
98+
Columns: []schema.Column{cqId, {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}},
99+
},
100+
{
101+
Name: "table_with_pk_type_change",
102+
Columns: []schema.Column{cqId, {Name: "id", Type: schema.TypeInt, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}},
103+
},
104+
{
105+
Name: "table_with_non_pk_type_change",
106+
// cq_id is needed for initial migrations of tables without a PK
107+
Columns: []schema.Column{cqIdWithPK, {Name: "created_at", Type: schema.TypeString}},
85108
},
86109
}
110+
87111
p := destination.NewPlugin("sqlite", "development", New)
88112
ctx := context.Background()
89113

@@ -92,56 +116,44 @@ To force a migration add "migrate_mode: forced" to your destination spec`,
92116
t.Fatal(err)
93117
}
94118

95-
if err := p.Migrate(ctx, []*schema.Table{&table}); err != nil {
119+
if err := p.Migrate(ctx, schema.Tables{beforeSchema[0]}); err != nil {
96120
t.Fatal(err)
97121
}
98122

99-
tableWithMigratableChange := schema.Table{
100-
Name: "table_1",
101-
Columns: []schema.Column{
102-
{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
103-
{Name: "name", Type: schema.TypeString},
104-
{Name: "age", Type: schema.TypeInt},
105-
{Name: "created_at", Type: schema.TypeString},
106-
{Name: "new_column", Type: schema.TypeString},
107-
},
108-
}
109-
newTable := schema.Table{
110-
Name: "table_2",
111-
Columns: []schema.Column{
112-
{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
113-
{Name: "name", Type: schema.TypeString},
114-
{Name: "age", Type: schema.TypeInt},
115-
{Name: "created_at", Type: schema.TypeString},
116-
},
117-
}
118-
119-
if err := p.Migrate(ctx, []*schema.Table{&tableWithMigratableChange, &newTable}); err != nil {
123+
// Adding a new column to the table is the only safe migratable change in SQLite
124+
beforeSchema[0].Columns = append(beforeSchema[0].Columns, schema.Column{Name: "new_column", Type: schema.TypeString})
125+
if err := p.Migrate(ctx, beforeSchema); err != nil {
120126
t.Fatal(err)
121127
}
122128

123-
tableWithNonTableDropNeeded := schema.Table{
124-
Name: "table_1",
125-
Columns: []schema.Column{
126-
{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
127-
{Name: "name", Type: schema.TypeString},
128-
{Name: "age", Type: schema.TypeInt},
129-
{Name: "new_pk_column", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
130-
{Name: "created_at", Type: schema.TypeTimestamp},
129+
afterSchema := schema.Tables{
130+
{
131+
Name: "table_with_new_column",
132+
Columns: []schema.Column{cqId, {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}, {Name: "new_column", Type: schema.TypeString}},
131133
},
132-
}
133-
134-
tableWithColumnsDropNeeded := schema.Table{
135-
Name: "table_2",
136-
Columns: []schema.Column{
137-
{Name: "id", Type: schema.TypeUUID},
138-
{Name: "name", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
139-
{Name: "age", Type: schema.TypeString},
140-
{Name: "created_at", Type: schema.TypeTimestamp},
134+
{
135+
Name: "table_with_new_pk_column",
136+
Columns: []schema.Column{cqId, {Name: "new_pk_column", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}},
137+
},
138+
{
139+
Name: "table_with_pk_addition_existing_column",
140+
Columns: []schema.Column{cqId, {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}},
141+
},
142+
{
143+
Name: "table_with_pk_removal",
144+
Columns: []schema.Column{cqIdWithPK, {Name: "id", Type: schema.TypeUUID}},
145+
},
146+
{
147+
Name: "table_with_pk_type_change",
148+
Columns: []schema.Column{cqId, {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}}},
149+
},
150+
{
151+
Name: "table_with_non_pk_type_change",
152+
Columns: []schema.Column{cqIdWithPK, {Name: "created_at", Type: schema.TypeTimestamp}},
141153
},
142154
}
143155

144-
err := p.Migrate(ctx, []*schema.Table{&tableWithNonTableDropNeeded, &tableWithColumnsDropNeeded})
156+
err := p.Migrate(ctx, afterSchema)
145157
if tt.wantErr != "" {
146158
require.ErrorContains(t, err, tt.wantErr)
147159
} else {

plugins/destination/sqlite/client/migrate.go

Lines changed: 103 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,34 @@ type columnChange struct {
4646
newPk bool
4747
}
4848

49+
func (c *columnChange) isNonPKTypeChange() bool {
50+
return !c.new && c.oldType != c.newType && !c.newPk
51+
}
52+
53+
func (c *columnChange) isPKTypeChange() bool {
54+
return !c.new && c.oldType != c.newType && c.newPk
55+
}
56+
57+
func (c *columnChange) isNewPKColumn() bool {
58+
return c.new && c.newPk
59+
}
60+
61+
func (c *columnChange) isPKAddToExistingColumn() bool {
62+
return !c.new && !c.oldPk && c.newPk
63+
}
64+
65+
func (c *columnChange) isPKRemoveFromExistingColumn() bool {
66+
return !c.new && c.oldPk && !c.newPk
67+
}
68+
69+
func (c *columnChange) needsTableDrop() bool {
70+
return c.isNewPKColumn() || c.isPKAddToExistingColumn() || c.isPKRemoveFromExistingColumn() || c.isPKTypeChange()
71+
}
72+
73+
func (c *columnChange) isInternal() bool {
74+
return strings.HasPrefix(c.name, "_cq_")
75+
}
76+
4977
type tableChange struct {
5078
table *schema.Table
5179
new bool
@@ -80,10 +108,10 @@ func (c *Client) getColumnChange(col schema.Column, sqliteColumn *columnInfo) *c
80108
columnType := c.SchemaTypeToSqlite(col.Type)
81109

82110
if sqliteColumn == nil {
83-
return &columnChange{name: columnName, oldType: columnType, newType: columnType, new: true, newPk: col.CreationOptions.PrimaryKey}
111+
return &columnChange{name: columnName, oldType: columnType, newType: columnType, new: true, oldPk: c.enabledPks() && col.CreationOptions.PrimaryKey, newPk: c.enabledPks() && col.CreationOptions.PrimaryKey}
84112
}
85113

86-
return &columnChange{name: columnName, oldType: sqliteColumn.typ, newType: columnType, oldPk: sqliteColumn.pk != 0, newPk: col.CreationOptions.PrimaryKey}
114+
return &columnChange{name: columnName, oldType: sqliteColumn.typ, newType: columnType, oldPk: c.enabledPks() && sqliteColumn.pk != 0, newPk: c.enabledPks() && col.CreationOptions.PrimaryKey}
87115
}
88116

89117
func (c *Client) getColumnChanges(table *schema.Table) ([]*columnChange, error) {
@@ -98,19 +126,28 @@ func (c *Client) getColumnChanges(table *schema.Table) ([]*columnChange, error)
98126
columnChanges[i] = c.getColumnChange(col, info.getColumn(col.Name))
99127
}
100128

101-
// Changes that require dropping the table comes first
102129
sort.SliceStable(columnChanges, func(i, j int) bool {
103130
change1 := columnChanges[i]
104131
change2 := columnChanges[j]
105132

106-
if change1.new && change1.newPk && !(change2.new && change2.newPk) {
133+
// Changes that require dropping the table comes first
134+
if change1.needsTableDrop() && !(change2.needsTableDrop()) {
107135
return true
108136
}
109137

110-
if !(change1.new && change1.newPk) && change2.new && change2.newPk {
138+
if !(change1.needsTableDrop()) && change2.needsTableDrop() {
139+
return false
140+
}
141+
142+
// Internal columns come last
143+
if change1.isInternal() && !change2.isInternal() {
111144
return false
112145
}
113146

147+
if !change1.isInternal() && change2.isInternal() {
148+
return true
149+
}
150+
114151
return change1.name < change2.name
115152
})
116153

@@ -157,15 +194,39 @@ func getMigrationMessages(changes []*tableChange) migrationsMessages {
157194
continue
158195
}
159196
for _, colChange := range tableChange.columnChanges {
160-
if colChange.new && colChange.newPk {
197+
if colChange.isNewPKColumn() {
161198
messages = append(messages, migrationMessage{
162199
err: fmt.Sprintf("can't migrate table %q since adding the new PK column %q is not supported. Try dropping this table", tableChange.table.Name, colChange.name),
163-
info: fmt.Sprintf("table %q will be dropped and recreated", tableChange.table.Name),
200+
info: fmt.Sprintf("table %q will be dropped and recreated due to adding %q as a PK", tableChange.table.Name, colChange.name),
201+
})
202+
// no need to report other errors as the user needs to drop the table altogether
203+
break
204+
}
205+
if colChange.isPKAddToExistingColumn() {
206+
messages = append(messages, migrationMessage{
207+
err: fmt.Sprintf("can't migrate table %q since making the existing column %q a PK is not supported. Try dropping this table", tableChange.table.Name, colChange.name),
208+
info: fmt.Sprintf("table %q will be dropped and recreated due to adding %q as a PK", tableChange.table.Name, colChange.name),
209+
})
210+
// no need to report other errors as the user needs to drop the table altogether
211+
break
212+
}
213+
if colChange.isPKRemoveFromExistingColumn() {
214+
messages = append(messages, migrationMessage{
215+
err: fmt.Sprintf("can't migrate table %q since removing an existing column %q as a PK is not supported. Try dropping this table", tableChange.table.Name, colChange.name),
216+
info: fmt.Sprintf("table %q will be dropped and recreated due to removing %q as a PK", tableChange.table.Name, colChange.name),
164217
})
165218
// no need to report other errors as the user needs to drop the table altogether
166219
break
167220
}
168-
if !colChange.new && colChange.oldType != colChange.newType {
221+
if colChange.isPKTypeChange() {
222+
messages = append(messages, migrationMessage{
223+
err: fmt.Sprintf("can't migrate table %q since changing the type of the PK column %q from %q to %q is not supported. Try dropping this table", tableChange.table.Name, colChange.name, colChange.oldType, colChange.newType),
224+
info: fmt.Sprintf("table %q will be dropped and recreated due to the type change of the PK column %q", tableChange.table.Name, colChange.name),
225+
})
226+
// no need to report other errors as the user needs to drop the table altogether
227+
break
228+
}
229+
if colChange.isNonPKTypeChange() {
169230
messages = append(messages, migrationMessage{
170231
err: fmt.Sprintf("can't migrate table %q since changing the type of column %q from %q to %q is not supported. Try dropping this column for this table", tableChange.table.Name, colChange.name, colChange.oldType, colChange.newType),
171232
info: fmt.Sprintf("column %q of table %q will be dropped and recreated", colChange.name, tableChange.table.Name),
@@ -209,23 +270,43 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
209270
columnName := colChange.name
210271
columnType := colChange.newType
211272
// If this is a new PK column we need to drop the table
212-
if colChange.new && colChange.newPk {
273+
if colChange.isNewPKColumn() {
213274
c.logger.Debug().Str("table", tableName).Str("column", colChange.name).Msg("New column is a primary key, dropping and adding table since in forced migrate mode")
214275
err := c.recreateTable(table)
215276
if err != nil {
216277
return err
217278
}
218279
break
219280
}
220-
if colChange.new {
221-
c.logger.Debug().Str("table", tableName).Str("column", colChange.name).Msg("Column doesn't exist, creating")
222-
err := c.addColumn(tableName, columnName, columnType)
281+
// SQLite doesn't support PK additions on tables so we need to drop and add the table
282+
if colChange.isPKAddToExistingColumn() {
283+
c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("Primary key added for existing column, dropping and adding table since in forced migrate mode")
284+
err := c.recreateTable(table)
223285
if err != nil {
224286
return err
225287
}
288+
break
289+
}
290+
// SQLite doesn't support PK removals on tables so we need to drop and add the table
291+
if colChange.isPKRemoveFromExistingColumn() {
292+
c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("Primary key removed for existing column, dropping and adding table since in forced migrate mode")
293+
err := c.recreateTable(table)
294+
if err != nil {
295+
return err
296+
}
297+
break
298+
}
299+
// Since we can't recreate a PK column in SQLite we need to drop and add the table if a type changed for an existing PK column
300+
if colChange.isPKTypeChange() {
301+
c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("Type changed for existing primary key column, dropping and adding table since in forced migrate mode")
302+
err := c.recreateTable(table)
303+
if err != nil {
304+
return err
305+
}
306+
break
226307
}
227308
// if this is an existing column with type change we need to drop and add it
228-
if !colChange.new && colChange.oldType != colChange.newType {
309+
if colChange.isNonPKTypeChange() {
229310
c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("Existing column type changed, dropping and adding column since in forced migrate mode")
230311
err := c.dropColumn(tableName, columnName)
231312
if err != nil {
@@ -235,6 +316,15 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
235316
if err != nil {
236317
return err
237318
}
319+
continue
320+
}
321+
if colChange.new {
322+
c.logger.Debug().Str("table", tableName).Str("column", colChange.name).Msg("Column doesn't exist, creating")
323+
err := c.addColumn(tableName, columnName, columnType)
324+
if err != nil {
325+
return err
326+
}
327+
continue
238328
}
239329
}
240330
}

0 commit comments

Comments
 (0)