77
88 "github.com/apache/arrow/go/v13/arrow"
99 "github.com/cloudquery/plugin-pb-go/specs"
10- "github.com/cloudquery/plugin-sdk/v2 /schema"
10+ "github.com/cloudquery/plugin-sdk/v3 /schema"
1111)
1212
1313const (
@@ -27,158 +27,142 @@ type tableInfo struct {
2727 columns []columnInfo
2828}
2929
30- func (c * Client ) sqliteTables (schemas schema.Schemas ) (schema.Schemas , error ) {
31- var schemaTables schema.Schemas
32- for _ , sc := range schemas {
33- var fields []arrow.Field
34- tableName := schema .TableName (sc )
35- if tableName == "" {
36- return nil , fmt .Errorf ("schema %s has no table name" , sc .String ())
37- }
38- info , err := c .getTableInfo (tableName )
30+ func (c * Client ) sqliteTables (tables schema.Tables ) (schema.Tables , error ) {
31+ var schemaTables schema.Tables
32+ for _ , table := range tables {
33+ var columns []schema.Column
34+ info , err := c .getTableInfo (table .Name )
3935 if info == nil {
4036 continue
4137 }
4238 if err != nil {
4339 return nil , err
4440 }
4541 for _ , col := range info .columns {
46- var fieldMetadata schema.MetadataFieldOptions
47- if col .pk != 0 {
48- fieldMetadata .PrimaryKey = true
49- }
50- fields = append (fields , arrow.Field {
51- Name : col .name ,
52- Type : c .sqliteTypeToArrowType (col .typ ),
53- Nullable : ! col .notNull ,
54- Metadata : schema .NewFieldMetadataFromOptions (fieldMetadata ),
42+ columns = append (columns , schema.Column {
43+ Name : col .name ,
44+ Type : c .sqliteTypeToArrowType (col .typ ),
45+ PrimaryKey : col .pk != 0 ,
46+ NotNull : col .notNull ,
5547 })
5648 }
57- var tableMetadata schema.MetadataSchemaOptions
58- tableMetadata .TableName = tableName
59- m := schema .NewSchemaMetadataFromOptions (tableMetadata )
60- schemaTables = append (schemaTables , arrow .NewSchema (fields , & m ))
49+ schemaTables = append (schemaTables , & schema.Table {Name : table .Name , Columns : columns })
6150 }
6251 return schemaTables , nil
6352}
6453
65- func (c * Client ) normalizeSchemas (scs schema.Schemas ) schema.Schemas {
66- var normalized schema.Schemas
67- for _ , sc := range scs {
68- fields := make ([]arrow.Field , 0 )
69- for _ , f := range sc .Fields () {
70- keys := make ([]string , 0 )
71- values := make ([]string , 0 )
72- origKeys := f .Metadata .Keys ()
73- origValues := f .Metadata .Values ()
74- for k , v := range origKeys {
75- if v != schema .MetadataUnique {
76- keys = append (keys , v )
77- values = append (values , origValues [k ])
78- }
79- }
80- fields = append (fields , arrow.Field {
81- Name : f .Name ,
82- Type : c .arrowTypeToSqlite (f .Type ),
83- Nullable : f .Nullable ,
84- Metadata : arrow .NewMetadata (keys , values ),
85- })
86- }
54+ func (c * Client ) normalizeTables (tables schema.Tables ) schema.Tables {
55+ flattened := tables .FlattenTables ()
56+ normalized := make (schema.Tables , len (flattened ))
57+ for i , table := range flattened {
58+ normalized [i ] = c .normalizeTable (table )
59+ }
60+ return normalized
61+ }
8762
88- md := sc .Metadata ()
89- normalized = append (normalized , arrow .NewSchema (fields , & md ))
63+ func (c * Client ) normalizeTable (table * schema.Table ) * schema.Table {
64+ columns := make ([]schema.Column , len (table .Columns ))
65+ for i , col := range table .Columns {
66+ normalized := c .normalizeField (col .ToArrowField ())
67+ columns [i ] = schema .NewColumnFromArrowField (* normalized )
9068 }
69+ return & schema.Table {Name : table .Name , Columns : columns }
70+ }
9171
92- return normalized
72+ func (c * Client ) normalizeField (field arrow.Field ) * arrow.Field {
73+ return & arrow.Field {
74+ Name : field .Name ,
75+ Type : c .arrowTypeToSqlite (field .Type ),
76+ Nullable : field .Nullable ,
77+ Metadata : field .Metadata ,
78+ }
9379}
9480
95- func (c * Client ) nonAutoMigrableTables (tables schema.Schemas , sqliteTables schema.Schemas ) ([]string , [][]schema.FieldChange ) {
81+ func (c * Client ) nonAutoMigratableTables (tables schema.Tables , sqliteTables schema.Tables ) ([]string , [][]schema.TableColumnChange ) {
9682 var result []string
97- var tableChanges [][]schema.FieldChange
83+ var tableChanges [][]schema.TableColumnChange
9884 for _ , t := range tables {
99- tableName := schema .TableName (t )
100- sqliteTable := sqliteTables .SchemaByName (tableName )
85+ sqliteTable := sqliteTables .Get (t .Name )
10186 if sqliteTable == nil {
10287 continue
10388 }
104- changes := schema . GetSchemaChanges ( t , sqliteTable )
89+ changes := sqliteTable . GetChanges ( t )
10590 if ! c .canAutoMigrate (changes ) {
106- result = append (result , tableName )
91+ result = append (result , t . Name )
10792 tableChanges = append (tableChanges , changes )
10893 }
10994 }
11095 return result , tableChanges
11196}
11297
113- func (c * Client ) autoMigrateTable (table * arrow. Schema , changes []schema.FieldChange ) error {
98+ func (c * Client ) autoMigrateTable (table * schema. Table , changes []schema.TableColumnChange ) error {
11499 for _ , change := range changes {
115100 if change .Type == schema .TableColumnChangeTypeAdd {
116- if err := c .addColumn (schema . TableName ( table ) , change .Current .Name , c .arrowTypeToSqliteStr (change .Current .Type )); err != nil {
101+ if err := c .addColumn (table . Name , change .Current .Name , c .arrowTypeToSqliteStr (change .Current .Type )); err != nil {
117102 return err
118103 }
119104 }
120105 }
121106 return nil
122107}
123108
124- func (* Client ) canAutoMigrate (changes []schema.FieldChange ) bool {
109+ func (* Client ) canAutoMigrate (changes []schema.TableColumnChange ) bool {
125110 for _ , change := range changes {
126- if change .Type == schema .TableColumnChangeTypeAdd && (schema .IsPk (change .Current ) || ! change .Current .Nullable ) {
127- return false
128- }
129-
130- if change .Type == schema .TableColumnChangeTypeRemove && (schema .IsPk (change .Previous ) || ! change .Previous .Nullable ) {
131- return false
132- }
133-
134- if change .Type == schema .TableColumnChangeTypeUpdate {
111+ switch change .Type {
112+ case schema .TableColumnChangeTypeAdd :
113+ if change .Current .PrimaryKey || change .Current .NotNull {
114+ return false
115+ }
116+ case schema .TableColumnChangeTypeRemove :
117+ if change .Previous .PrimaryKey || change .Previous .NotNull {
118+ return false
119+ }
120+ case schema .TableColumnChangeTypeUpdate :
135121 return false
122+ default :
123+ panic ("unknown change type" )
136124 }
137125 }
138126 return true
139127}
140128
141129// This is the responsibility of the CLI of the client to lock before running migration
142- func (c * Client ) Migrate (ctx context.Context , schemas schema.Schemas ) error {
143- schemas = c .normalizeSchemas ( schemas )
144- sqliteTables , err := c .sqliteTables (schemas )
130+ func (c * Client ) Migrate (ctx context.Context , tables schema.Tables ) error {
131+ normalizedTables : = c .normalizeTables ( tables )
132+ sqliteTables , err := c .sqliteTables (normalizedTables )
145133 if err != nil {
146134 return err
147135 }
148136
149137 if c .spec .MigrateMode != specs .MigrateModeForced {
150- nonAutoMigrableTables , changes := c .nonAutoMigrableTables ( schemas , sqliteTables )
151- if len (nonAutoMigrableTables ) > 0 {
152- return fmt .Errorf ("tables %s with changes %v require force migration. use 'migrate_mode: forced'" , strings .Join (nonAutoMigrableTables , "," ), changes )
138+ nonAutoMigratableTables , changes := c .nonAutoMigratableTables ( normalizedTables , sqliteTables )
139+ if len (nonAutoMigratableTables ) > 0 {
140+ return fmt .Errorf ("tables %s with changes %v require force migration. use 'migrate_mode: forced'" , strings .Join (nonAutoMigratableTables , "," ), changes )
153141 }
154142 }
155143
156- for _ , table := range schemas {
157- tableName := schema .TableName (table )
158- if tableName == "" {
159- return fmt .Errorf ("schema %s has no table name" , table .String ())
160- }
161- c .logger .Info ().Str ("table" , tableName ).Msg ("Migrating table" )
162- if len (table .Fields ()) == 0 {
163- c .logger .Info ().Str ("table" , tableName ).Msg ("Table with no columns, skipping" )
144+ for _ , table := range normalizedTables {
145+ c .logger .Info ().Str ("table" , table .Name ).Msg ("Migrating table" )
146+ if len (table .Columns ) == 0 {
147+ c .logger .Info ().Str ("table" , table .Name ).Msg ("Table with no columns, skipping" )
164148 continue
165149 }
166150
167- sqlite := sqliteTables .SchemaByName ( tableName )
151+ sqlite := sqliteTables .Get ( table . Name )
168152 if sqlite == nil {
169- c .logger .Debug ().Str ("table" , tableName ).Msg ("Table doesn't exist, creating" )
153+ c .logger .Debug ().Str ("table" , table . Name ).Msg ("Table doesn't exist, creating" )
170154 if err := c .createTableIfNotExist (table ); err != nil {
171155 return err
172156 }
173157 } else {
174- changes := schema . GetSchemaChanges ( table , sqlite )
158+ changes := table . GetChanges ( sqlite )
175159 if c .canAutoMigrate (changes ) {
176- c .logger .Info ().Str ("table" , tableName ).Msg ("Table exists, auto-migrating" )
160+ c .logger .Info ().Str ("table" , table . Name ).Msg ("Table exists, auto-migrating" )
177161 if err := c .autoMigrateTable (table , changes ); err != nil {
178162 return err
179163 }
180164 } else {
181- c .logger .Info ().Str ("table" , tableName ).Msg ("Table exists, force migration required" )
165+ c .logger .Info ().Str ("table" , table . Name ).Msg ("Table exists, force migration required" )
182166 if err := c .recreateTable (table ); err != nil {
183167 return err
184168 }
@@ -189,14 +173,10 @@ func (c *Client) Migrate(ctx context.Context, schemas schema.Schemas) error {
189173 return nil
190174}
191175
192- func (c * Client ) recreateTable (table * arrow.Schema ) error {
193- tableName , ok := table .Metadata ().GetValue (schema .MetadataTableName )
194- if ! ok {
195- return fmt .Errorf ("schema %s has no table name" , table .String ())
196- }
197- sql := "drop table if exists \" " + tableName + "\" "
176+ func (c * Client ) recreateTable (table * schema.Table ) error {
177+ sql := "drop table if exists \" " + table .Name + "\" "
198178 if _ , err := c .db .Exec (sql ); err != nil {
199- return fmt .Errorf ("failed to drop table %s: %w" , tableName , err )
179+ return fmt .Errorf ("failed to drop table %s: %w" , table . Name , err )
200180 }
201181 return c .createTableIfNotExist (table )
202182}
@@ -209,44 +189,41 @@ func (c *Client) addColumn(tableName string, columnName string, columnType strin
209189 return nil
210190}
211191
212- func (c * Client ) createTableIfNotExist (sc * arrow. Schema ) error {
192+ func (c * Client ) createTableIfNotExist (table * schema. Table ) error {
213193 var sb strings.Builder
214- tableName , ok := sc .Metadata ().GetValue (schema .MetadataTableName )
215- if ! ok {
216- return fmt .Errorf ("schema %s has no table name" , sc .String ())
217- }
218- // TODO sanitize tablename
194+
195+ // TODO sanitize table.Name
219196 sb .WriteString ("CREATE TABLE IF NOT EXISTS " )
220- sb .WriteString (`"` + tableName + `"` )
197+ sb .WriteString (`"` + table . Name + `"` )
221198 sb .WriteString (" (" )
222- totalColumns := len (sc . Fields () )
199+ totalColumns := len (table . Columns )
223200
224201 primaryKeys := []string {}
225- for i , col := range sc . Fields () {
202+ for i , col := range table . Columns {
226203 sqlType := c .arrowTypeToSqliteStr (col .Type )
227204 if sqlType == "" {
228- c .logger .Warn ().Str ("table" , tableName ).Str ("column" , col .Name ).Msg ("Column type is not supported, skipping" )
205+ c .logger .Warn ().Str ("table" , table . Name ).Str ("column" , col .Name ).Msg ("Column type is not supported, skipping" )
229206 continue
230207 }
231208 // TODO: sanitize column name
232209 fieldDef := `"` + col .Name + `" ` + sqlType
233- if ! col .Nullable {
210+ if col .NotNull {
234211 fieldDef += " NOT NULL"
235212 }
236213 sb .WriteString (fieldDef )
237214 if i != totalColumns - 1 {
238215 sb .WriteString ("," )
239216 }
240217
241- if c .enabledPks () && schema . IsPk ( col ) {
218+ if c .enabledPks () && col . PrimaryKey {
242219 primaryKeys = append (primaryKeys , `"` + col .Name + `"` )
243220 }
244221 }
245222
246223 if len (primaryKeys ) > 0 {
247224 // add composite PK constraint on primary key columns
248225 sb .WriteString (", CONSTRAINT " )
249- sb .WriteString (tableName )
226+ sb .WriteString (table . Name )
250227 sb .WriteString ("_cqpk PRIMARY KEY (" )
251228 sb .WriteString (strings .Join (primaryKeys , "," ))
252229 sb .WriteString (")" )
0 commit comments