Skip to content

Commit 38db6c5

Browse files
authored
feat(sqlite): Migrate to github.com/cloudquery/plugin-sdk/v3 (#10818)
Closes #10730
1 parent e18131a commit 38db6c5

11 files changed

Lines changed: 113 additions & 127 deletions

File tree

plugins/destination/sqlite/client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"fmt"
77

88
"github.com/cloudquery/plugin-pb-go/specs"
9-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
9+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
1010
"github.com/rs/zerolog"
1111

1212
// Import sqlite3 driver

plugins/destination/sqlite/client/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"testing"
55

66
"github.com/cloudquery/plugin-pb-go/specs"
7-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
7+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
88
)
99

1010
var migrateStrategy = destination.MigrateStrategy{

plugins/destination/sqlite/client/deletestale.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ import (
55
"strings"
66
"time"
77

8-
"github.com/cloudquery/plugin-sdk/v2/schema"
8+
"github.com/cloudquery/plugin-sdk/v3/schema"
99
)
1010

11-
func (c *Client) DeleteStale(ctx context.Context, tables schema.Schemas, source string, syncTime time.Time) error {
11+
func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error {
1212
for _, table := range tables {
1313
var sb strings.Builder
1414
sb.WriteString("delete from ")
15-
sb.WriteString(`"` + schema.TableName(table) + `"`)
15+
sb.WriteString(`"` + table.Name + `"`)
1616
sb.WriteString(" where ")
1717
sb.WriteString(`"` + schema.CqSourceNameColumn.Name + `"`)
1818
sb.WriteString(" = $1 and datetime(")

plugins/destination/sqlite/client/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package client
22

33
import (
4-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
4+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
55
)
66

77
func (c *Client) Metrics() destination.Metrics {

plugins/destination/sqlite/client/migrate.go

Lines changed: 82 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77

88
"github.com/apache/arrow/go/v13/arrow"
99
"github.com/cloudquery/plugin-pb-go/specs"
10-
"github.com/cloudquery/plugin-sdk/v2/schema"
10+
"github.com/cloudquery/plugin-sdk/v3/schema"
1111
)
1212

1313
const (
@@ -27,158 +27,142 @@ type tableInfo struct {
2727
columns []columnInfo
2828
}
2929

30-
func (c *Client) sqliteTables(schemas schema.Schemas) (schema.Schemas, error) {
31-
var schemaTables schema.Schemas
32-
for _, sc := range schemas {
33-
var fields []arrow.Field
34-
tableName := schema.TableName(sc)
35-
if tableName == "" {
36-
return nil, fmt.Errorf("schema %s has no table name", sc.String())
37-
}
38-
info, err := c.getTableInfo(tableName)
30+
func (c *Client) sqliteTables(tables schema.Tables) (schema.Tables, error) {
31+
var schemaTables schema.Tables
32+
for _, table := range tables {
33+
var columns []schema.Column
34+
info, err := c.getTableInfo(table.Name)
3935
if info == nil {
4036
continue
4137
}
4238
if err != nil {
4339
return nil, err
4440
}
4541
for _, col := range info.columns {
46-
var fieldMetadata schema.MetadataFieldOptions
47-
if col.pk != 0 {
48-
fieldMetadata.PrimaryKey = true
49-
}
50-
fields = append(fields, arrow.Field{
51-
Name: col.name,
52-
Type: c.sqliteTypeToArrowType(col.typ),
53-
Nullable: !col.notNull,
54-
Metadata: schema.NewFieldMetadataFromOptions(fieldMetadata),
42+
columns = append(columns, schema.Column{
43+
Name: col.name,
44+
Type: c.sqliteTypeToArrowType(col.typ),
45+
PrimaryKey: col.pk != 0,
46+
NotNull: col.notNull,
5547
})
5648
}
57-
var tableMetadata schema.MetadataSchemaOptions
58-
tableMetadata.TableName = tableName
59-
m := schema.NewSchemaMetadataFromOptions(tableMetadata)
60-
schemaTables = append(schemaTables, arrow.NewSchema(fields, &m))
49+
schemaTables = append(schemaTables, &schema.Table{Name: table.Name, Columns: columns})
6150
}
6251
return schemaTables, nil
6352
}
6453

65-
func (c *Client) normalizeSchemas(scs schema.Schemas) schema.Schemas {
66-
var normalized schema.Schemas
67-
for _, sc := range scs {
68-
fields := make([]arrow.Field, 0)
69-
for _, f := range sc.Fields() {
70-
keys := make([]string, 0)
71-
values := make([]string, 0)
72-
origKeys := f.Metadata.Keys()
73-
origValues := f.Metadata.Values()
74-
for k, v := range origKeys {
75-
if v != schema.MetadataUnique {
76-
keys = append(keys, v)
77-
values = append(values, origValues[k])
78-
}
79-
}
80-
fields = append(fields, arrow.Field{
81-
Name: f.Name,
82-
Type: c.arrowTypeToSqlite(f.Type),
83-
Nullable: f.Nullable,
84-
Metadata: arrow.NewMetadata(keys, values),
85-
})
86-
}
54+
func (c *Client) normalizeTables(tables schema.Tables) schema.Tables {
55+
flattened := tables.FlattenTables()
56+
normalized := make(schema.Tables, len(flattened))
57+
for i, table := range flattened {
58+
normalized[i] = c.normalizeTable(table)
59+
}
60+
return normalized
61+
}
8762

88-
md := sc.Metadata()
89-
normalized = append(normalized, arrow.NewSchema(fields, &md))
63+
func (c *Client) normalizeTable(table *schema.Table) *schema.Table {
64+
columns := make([]schema.Column, len(table.Columns))
65+
for i, col := range table.Columns {
66+
normalized := c.normalizeField(col.ToArrowField())
67+
columns[i] = schema.NewColumnFromArrowField(*normalized)
9068
}
69+
return &schema.Table{Name: table.Name, Columns: columns}
70+
}
9171

92-
return normalized
72+
func (c *Client) normalizeField(field arrow.Field) *arrow.Field {
73+
return &arrow.Field{
74+
Name: field.Name,
75+
Type: c.arrowTypeToSqlite(field.Type),
76+
Nullable: field.Nullable,
77+
Metadata: field.Metadata,
78+
}
9379
}
9480

95-
func (c *Client) nonAutoMigrableTables(tables schema.Schemas, sqliteTables schema.Schemas) ([]string, [][]schema.FieldChange) {
81+
func (c *Client) nonAutoMigratableTables(tables schema.Tables, sqliteTables schema.Tables) ([]string, [][]schema.TableColumnChange) {
9682
var result []string
97-
var tableChanges [][]schema.FieldChange
83+
var tableChanges [][]schema.TableColumnChange
9884
for _, t := range tables {
99-
tableName := schema.TableName(t)
100-
sqliteTable := sqliteTables.SchemaByName(tableName)
85+
sqliteTable := sqliteTables.Get(t.Name)
10186
if sqliteTable == nil {
10287
continue
10388
}
104-
changes := schema.GetSchemaChanges(t, sqliteTable)
89+
changes := sqliteTable.GetChanges(t)
10590
if !c.canAutoMigrate(changes) {
106-
result = append(result, tableName)
91+
result = append(result, t.Name)
10792
tableChanges = append(tableChanges, changes)
10893
}
10994
}
11095
return result, tableChanges
11196
}
11297

113-
func (c *Client) autoMigrateTable(table *arrow.Schema, changes []schema.FieldChange) error {
98+
func (c *Client) autoMigrateTable(table *schema.Table, changes []schema.TableColumnChange) error {
11499
for _, change := range changes {
115100
if change.Type == schema.TableColumnChangeTypeAdd {
116-
if err := c.addColumn(schema.TableName(table), change.Current.Name, c.arrowTypeToSqliteStr(change.Current.Type)); err != nil {
101+
if err := c.addColumn(table.Name, change.Current.Name, c.arrowTypeToSqliteStr(change.Current.Type)); err != nil {
117102
return err
118103
}
119104
}
120105
}
121106
return nil
122107
}
123108

124-
func (*Client) canAutoMigrate(changes []schema.FieldChange) bool {
109+
func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool {
125110
for _, change := range changes {
126-
if change.Type == schema.TableColumnChangeTypeAdd && (schema.IsPk(change.Current) || !change.Current.Nullable) {
127-
return false
128-
}
129-
130-
if change.Type == schema.TableColumnChangeTypeRemove && (schema.IsPk(change.Previous) || !change.Previous.Nullable) {
131-
return false
132-
}
133-
134-
if change.Type == schema.TableColumnChangeTypeUpdate {
111+
switch change.Type {
112+
case schema.TableColumnChangeTypeAdd:
113+
if change.Current.PrimaryKey || change.Current.NotNull {
114+
return false
115+
}
116+
case schema.TableColumnChangeTypeRemove:
117+
if change.Previous.PrimaryKey || change.Previous.NotNull {
118+
return false
119+
}
120+
case schema.TableColumnChangeTypeUpdate:
135121
return false
122+
default:
123+
panic("unknown change type")
136124
}
137125
}
138126
return true
139127
}
140128

141129
// This is the responsibility of the CLI of the client to lock before running migration
142-
func (c *Client) Migrate(ctx context.Context, schemas schema.Schemas) error {
143-
schemas = c.normalizeSchemas(schemas)
144-
sqliteTables, err := c.sqliteTables(schemas)
130+
func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
131+
normalizedTables := c.normalizeTables(tables)
132+
sqliteTables, err := c.sqliteTables(normalizedTables)
145133
if err != nil {
146134
return err
147135
}
148136

149137
if c.spec.MigrateMode != specs.MigrateModeForced {
150-
nonAutoMigrableTables, changes := c.nonAutoMigrableTables(schemas, sqliteTables)
151-
if len(nonAutoMigrableTables) > 0 {
152-
return fmt.Errorf("tables %s with changes %v require force migration. use 'migrate_mode: forced'", strings.Join(nonAutoMigrableTables, ","), changes)
138+
nonAutoMigratableTables, changes := c.nonAutoMigratableTables(normalizedTables, sqliteTables)
139+
if len(nonAutoMigratableTables) > 0 {
140+
return fmt.Errorf("tables %s with changes %v require force migration. use 'migrate_mode: forced'", strings.Join(nonAutoMigratableTables, ","), changes)
153141
}
154142
}
155143

156-
for _, table := range schemas {
157-
tableName := schema.TableName(table)
158-
if tableName == "" {
159-
return fmt.Errorf("schema %s has no table name", table.String())
160-
}
161-
c.logger.Info().Str("table", tableName).Msg("Migrating table")
162-
if len(table.Fields()) == 0 {
163-
c.logger.Info().Str("table", tableName).Msg("Table with no columns, skipping")
144+
for _, table := range normalizedTables {
145+
c.logger.Info().Str("table", table.Name).Msg("Migrating table")
146+
if len(table.Columns) == 0 {
147+
c.logger.Info().Str("table", table.Name).Msg("Table with no columns, skipping")
164148
continue
165149
}
166150

167-
sqlite := sqliteTables.SchemaByName(tableName)
151+
sqlite := sqliteTables.Get(table.Name)
168152
if sqlite == nil {
169-
c.logger.Debug().Str("table", tableName).Msg("Table doesn't exist, creating")
153+
c.logger.Debug().Str("table", table.Name).Msg("Table doesn't exist, creating")
170154
if err := c.createTableIfNotExist(table); err != nil {
171155
return err
172156
}
173157
} else {
174-
changes := schema.GetSchemaChanges(table, sqlite)
158+
changes := table.GetChanges(sqlite)
175159
if c.canAutoMigrate(changes) {
176-
c.logger.Info().Str("table", tableName).Msg("Table exists, auto-migrating")
160+
c.logger.Info().Str("table", table.Name).Msg("Table exists, auto-migrating")
177161
if err := c.autoMigrateTable(table, changes); err != nil {
178162
return err
179163
}
180164
} else {
181-
c.logger.Info().Str("table", tableName).Msg("Table exists, force migration required")
165+
c.logger.Info().Str("table", table.Name).Msg("Table exists, force migration required")
182166
if err := c.recreateTable(table); err != nil {
183167
return err
184168
}
@@ -189,14 +173,10 @@ func (c *Client) Migrate(ctx context.Context, schemas schema.Schemas) error {
189173
return nil
190174
}
191175

192-
func (c *Client) recreateTable(table *arrow.Schema) error {
193-
tableName, ok := table.Metadata().GetValue(schema.MetadataTableName)
194-
if !ok {
195-
return fmt.Errorf("schema %s has no table name", table.String())
196-
}
197-
sql := "drop table if exists \"" + tableName + "\""
176+
func (c *Client) recreateTable(table *schema.Table) error {
177+
sql := "drop table if exists \"" + table.Name + "\""
198178
if _, err := c.db.Exec(sql); err != nil {
199-
return fmt.Errorf("failed to drop table %s: %w", tableName, err)
179+
return fmt.Errorf("failed to drop table %s: %w", table.Name, err)
200180
}
201181
return c.createTableIfNotExist(table)
202182
}
@@ -209,44 +189,41 @@ func (c *Client) addColumn(tableName string, columnName string, columnType strin
209189
return nil
210190
}
211191

212-
func (c *Client) createTableIfNotExist(sc *arrow.Schema) error {
192+
func (c *Client) createTableIfNotExist(table *schema.Table) error {
213193
var sb strings.Builder
214-
tableName, ok := sc.Metadata().GetValue(schema.MetadataTableName)
215-
if !ok {
216-
return fmt.Errorf("schema %s has no table name", sc.String())
217-
}
218-
// TODO sanitize tablename
194+
195+
// TODO sanitize table.Name
219196
sb.WriteString("CREATE TABLE IF NOT EXISTS ")
220-
sb.WriteString(`"` + tableName + `"`)
197+
sb.WriteString(`"` + table.Name + `"`)
221198
sb.WriteString(" (")
222-
totalColumns := len(sc.Fields())
199+
totalColumns := len(table.Columns)
223200

224201
primaryKeys := []string{}
225-
for i, col := range sc.Fields() {
202+
for i, col := range table.Columns {
226203
sqlType := c.arrowTypeToSqliteStr(col.Type)
227204
if sqlType == "" {
228-
c.logger.Warn().Str("table", tableName).Str("column", col.Name).Msg("Column type is not supported, skipping")
205+
c.logger.Warn().Str("table", table.Name).Str("column", col.Name).Msg("Column type is not supported, skipping")
229206
continue
230207
}
231208
// TODO: sanitize column name
232209
fieldDef := `"` + col.Name + `" ` + sqlType
233-
if !col.Nullable {
210+
if col.NotNull {
234211
fieldDef += " NOT NULL"
235212
}
236213
sb.WriteString(fieldDef)
237214
if i != totalColumns-1 {
238215
sb.WriteString(",")
239216
}
240217

241-
if c.enabledPks() && schema.IsPk(col) {
218+
if c.enabledPks() && col.PrimaryKey {
242219
primaryKeys = append(primaryKeys, `"`+col.Name+`"`)
243220
}
244221
}
245222

246223
if len(primaryKeys) > 0 {
247224
// add composite PK constraint on primary key columns
248225
sb.WriteString(", CONSTRAINT ")
249-
sb.WriteString(tableName)
226+
sb.WriteString(table.Name)
250227
sb.WriteString("_cqpk PRIMARY KEY (")
251228
sb.WriteString(strings.Join(primaryKeys, ","))
252229
sb.WriteString(")")

0 commit comments

Comments
 (0)