From 4d0cd734a9344ed904f1fd9cd4adafb0b6b32e06 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 16 May 2023 09:08:03 -0500 Subject: [PATCH 1/8] upgrade --- plugins/destination/sqlite/client/client.go | 2 +- .../destination/sqlite/client/client_test.go | 2 +- .../destination/sqlite/client/deletestale.go | 6 +++--- plugins/destination/sqlite/client/metrics.go | 2 +- plugins/destination/sqlite/client/migrate.go | 12 ++++++------ plugins/destination/sqlite/client/read.go | 18 +++++++++--------- plugins/destination/sqlite/client/write.go | 4 ++-- plugins/destination/sqlite/go.mod | 4 +++- plugins/destination/sqlite/go.sum | 3 +++ plugins/destination/sqlite/main.go | 4 ++-- 10 files changed, 31 insertions(+), 26 deletions(-) diff --git a/plugins/destination/sqlite/client/client.go b/plugins/destination/sqlite/client/client.go index 575b1fef34c0dc..89b86c39410e7b 100644 --- a/plugins/destination/sqlite/client/client.go +++ b/plugins/destination/sqlite/client/client.go @@ -6,7 +6,7 @@ import ( "fmt" "github.com/cloudquery/plugin-pb-go/specs" - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" "github.com/rs/zerolog" // Import sqlite3 driver diff --git a/plugins/destination/sqlite/client/client_test.go b/plugins/destination/sqlite/client/client_test.go index b49c53627f56a2..2ea8e0ac1a0d1d 100644 --- a/plugins/destination/sqlite/client/client_test.go +++ b/plugins/destination/sqlite/client/client_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/cloudquery/plugin-pb-go/specs" - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" ) var migrateStrategy = destination.MigrateStrategy{ diff --git a/plugins/destination/sqlite/client/deletestale.go b/plugins/destination/sqlite/client/deletestale.go index c2f04b31276d1c..34f7e7e0ad332b 100644 --- a/plugins/destination/sqlite/client/deletestale.go +++ b/plugins/destination/sqlite/client/deletestale.go @@ -5,14 +5,14 @@ import ( "strings" "time" - "github.com/cloudquery/plugin-sdk/v2/schema" + "github.com/cloudquery/plugin-sdk/v3/schema" ) -func (c *Client) DeleteStale(ctx context.Context, tables schema.Schemas, source string, syncTime time.Time) error { +func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error { for _, table := range tables { var sb strings.Builder sb.WriteString("delete from ") - sb.WriteString(`"` + schema.TableName(table) + `"`) + sb.WriteString(`"` + table.Name + `"`) sb.WriteString(" where ") sb.WriteString(`"` + schema.CqSourceNameColumn.Name + `"`) sb.WriteString(" = $1 and datetime(") diff --git a/plugins/destination/sqlite/client/metrics.go b/plugins/destination/sqlite/client/metrics.go index 6e0bc82117c87a..0abe9f700b726d 100644 --- a/plugins/destination/sqlite/client/metrics.go +++ b/plugins/destination/sqlite/client/metrics.go @@ -1,7 +1,7 @@ package client import ( - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" ) func (c *Client) Metrics() destination.Metrics { diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index dcec74916e61dd..48b5b80f5ad699 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -7,7 +7,7 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-pb-go/specs" - "github.com/cloudquery/plugin-sdk/v2/schema" + "github.com/cloudquery/plugin-sdk/v3/schema" ) const ( @@ -62,7 +62,7 @@ func (c *Client) sqliteTables(schemas schema.Schemas) (schema.Schemas, error) { return schemaTables, nil } -func (c *Client) normalizeSchemas(scs schema.Schemas) schema.Schemas { +func (c *Client) normalizeSchemas(tables schema.Tables) schema.Tables { var normalized schema.Schemas for _, sc := range scs { fields := make([]arrow.Field, 0) @@ -110,10 +110,10 @@ func (c *Client) nonAutoMigrableTables(tables schema.Schemas, sqliteTables schem return result, tableChanges } -func (c *Client) autoMigrateTable(table *arrow.Schema, changes []schema.FieldChange) error { +func (c *Client) autoMigrateTable(table *schema.Table, changes []schema.TableColumnChange) error { for _, change := range changes { if change.Type == schema.TableColumnChangeTypeAdd { - if err := c.addColumn(schema.TableName(table), change.Current.Name, c.arrowTypeToSqliteStr(change.Current.Type)); err != nil { + if err := c.addColumn(table.Name, change.Current.Name, c.arrowTypeToSqliteStr(change.Current.Type)); err != nil { return err } } @@ -121,7 +121,7 @@ func (c *Client) autoMigrateTable(table *arrow.Schema, changes []schema.FieldCha return nil } -func (*Client) canAutoMigrate(changes []schema.FieldChange) bool { +func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool { for _, change := range changes { if change.Type == schema.TableColumnChangeTypeAdd && (schema.IsPk(change.Current) || !change.Current.Nullable) { return false @@ -139,7 +139,7 @@ func (*Client) canAutoMigrate(changes []schema.FieldChange) bool { } // This is the responsibility of the CLI of the client to lock before running migration -func (c *Client) Migrate(ctx context.Context, schemas schema.Schemas) error { +func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { schemas = c.normalizeSchemas(schemas) sqliteTables, err := c.sqliteTables(schemas) if err != nil { diff --git a/plugins/destination/sqlite/client/read.go b/plugins/destination/sqlite/client/read.go index 341b176f0649d7..7fd680ca332327 100644 --- a/plugins/destination/sqlite/client/read.go +++ b/plugins/destination/sqlite/client/read.go @@ -9,7 +9,7 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" "github.com/apache/arrow/go/v13/arrow/memory" - "github.com/cloudquery/plugin-sdk/v2/schema" + "github.com/cloudquery/plugin-sdk/v3/schema" ) const ( @@ -148,23 +148,23 @@ func reverseTransform(sc *arrow.Schema, values []any) (arrow.Record, error) { return rec, nil } -func (c *Client) Read(ctx context.Context, table *arrow.Schema, sourceName string, res chan<- arrow.Record) error { - colNames := make([]string, 0, len(table.Fields())) - for _, col := range table.Fields() { +func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error { + colNames := make([]string, 0, len(table.Columns)) + for _, col := range table.Columns { colNames = append(colNames, `"`+col.Name+`"`) } cols := strings.Join(colNames, ", ") - tableName := schema.TableName(table) - rows, err := c.db.Query(fmt.Sprintf(readSQL, cols, tableName), sourceName) + rows, err := c.db.Query(fmt.Sprintf(readSQL, cols, table.Name), sourceName) if err != nil { return err } for rows.Next() { - values := c.createResultsArray(table) + arrowSchema := table.ToArrowSchema() + values := c.createResultsArray(arrowSchema) if err := rows.Scan(values...); err != nil { - return fmt.Errorf("failed to read from table %s: %w", tableName, err) + return fmt.Errorf("failed to read from table %s: %w", table.Name, err) } - record, err := reverseTransform(table, values) + record, err := reverseTransform(arrowSchema, values) if err != nil { return err } diff --git a/plugins/destination/sqlite/client/write.go b/plugins/destination/sqlite/client/write.go index 0b2ead909e4a8b..c35eaf8dccd3cc 100644 --- a/plugins/destination/sqlite/client/write.go +++ b/plugins/destination/sqlite/client/write.go @@ -7,10 +7,10 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-pb-go/specs" - "github.com/cloudquery/plugin-sdk/v2/schema" + "github.com/cloudquery/plugin-sdk/v3/schema" ) -func (c *Client) Write(ctx context.Context, tables schema.Schemas, res <-chan arrow.Record) error { +func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan arrow.Record) error { var sql string for r := range res { if c.spec.WriteMode == specs.WriteModeAppend { diff --git a/plugins/destination/sqlite/go.mod b/plugins/destination/sqlite/go.mod index a4f7cc9e55739b..ae7f801060e338 100644 --- a/plugins/destination/sqlite/go.mod +++ b/plugins/destination/sqlite/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604 github.com/cloudquery/plugin-pb-go v1.0.8 - github.com/cloudquery/plugin-sdk/v2 v2.7.0 + github.com/cloudquery/plugin-sdk/v3 v3.4.0 github.com/mattn/go-sqlite3 v1.14.16 github.com/rs/zerolog v1.29.0 ) @@ -16,6 +16,7 @@ replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13 require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/apache/thrift v0.16.0 // indirect + github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/getsentry/sentry-go v0.20.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect @@ -34,6 +35,7 @@ require ( github.com/mattn/go-isatty v0.0.18 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/cobra v1.6.1 // indirect diff --git a/plugins/destination/sqlite/go.sum b/plugins/destination/sqlite/go.sum index 34f5f41224b04f..22300b5b552122 100644 --- a/plugins/destination/sqlite/go.sum +++ b/plugins/destination/sqlite/go.sum @@ -50,6 +50,8 @@ github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37 github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= +github.com/cloudquery/plugin-sdk/v3 v3.4.0 h1:Vy2BfZu4b283kKsT5VuzDbYYgLyiBeU+3AU0+qvywbQ= +github.com/cloudquery/plugin-sdk/v3 v3.4.0/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= @@ -176,6 +178,7 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/plugins/destination/sqlite/main.go b/plugins/destination/sqlite/main.go index f2ef7ca54df647..57661d7748d98e 100644 --- a/plugins/destination/sqlite/main.go +++ b/plugins/destination/sqlite/main.go @@ -3,8 +3,8 @@ package main import ( "github.com/cloudquery/cloudquery/plugins/destination/sqlite/client" "github.com/cloudquery/cloudquery/plugins/destination/sqlite/resources/plugin" - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" - "github.com/cloudquery/plugin-sdk/v2/serve" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/serve" ) const ( From 731557536c6c0c182a654f49555ffbf0915125e2 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 16 May 2023 11:55:42 -0500 Subject: [PATCH 2/8] compile --- plugins/destination/sqlite/client/migrate.go | 190 +++++++++---------- plugins/destination/sqlite/go.mod | 2 +- plugins/destination/sqlite/go.sum | 2 + 3 files changed, 94 insertions(+), 100 deletions(-) diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index 48b5b80f5ad699..6cac724b571d2a 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -27,15 +27,11 @@ type tableInfo struct { columns []columnInfo } -func (c *Client) sqliteTables(schemas schema.Schemas) (schema.Schemas, error) { - var schemaTables schema.Schemas - for _, sc := range schemas { - var fields []arrow.Field - tableName := schema.TableName(sc) - if tableName == "" { - return nil, fmt.Errorf("schema %s has no table name", sc.String()) - } - info, err := c.getTableInfo(tableName) +func (c *Client) sqliteTables(tables schema.Tables) (schema.Tables, error) { + var schemaTables schema.Tables + for _, table := range tables { + var columns []schema.Column + info, err := c.getTableInfo(table.Name) if info == nil { continue } @@ -43,67 +39,65 @@ func (c *Client) sqliteTables(schemas schema.Schemas) (schema.Schemas, error) { return nil, err } for _, col := range info.columns { - var fieldMetadata schema.MetadataFieldOptions - if col.pk != 0 { - fieldMetadata.PrimaryKey = true - } - fields = append(fields, arrow.Field{ - Name: col.name, - Type: c.sqliteTypeToArrowType(col.typ), - Nullable: !col.notNull, - Metadata: schema.NewFieldMetadataFromOptions(fieldMetadata), + columns = append(columns, schema.Column{ + Name: col.name, + Type: c.sqliteTypeToArrowType(col.typ), + PrimaryKey: col.pk != 0, + NotNull: col.notNull, }) } - var tableMetadata schema.MetadataSchemaOptions - tableMetadata.TableName = tableName - m := schema.NewSchemaMetadataFromOptions(tableMetadata) - schemaTables = append(schemaTables, arrow.NewSchema(fields, &m)) + schemaTables = append(schemaTables, &schema.Table{Name: table.Name, Columns: columns}) } return schemaTables, nil } -func (c *Client) normalizeSchemas(tables schema.Tables) schema.Tables { - var normalized schema.Schemas - for _, sc := range scs { - fields := make([]arrow.Field, 0) - for _, f := range sc.Fields() { - keys := make([]string, 0) - values := make([]string, 0) - origKeys := f.Metadata.Keys() - origValues := f.Metadata.Values() - for k, v := range origKeys { - if v != schema.MetadataUnique { - keys = append(keys, v) - values = append(values, origValues[k]) - } - } - fields = append(fields, arrow.Field{ - Name: f.Name, - Type: c.arrowTypeToSqlite(f.Type), - Nullable: f.Nullable, - Metadata: arrow.NewMetadata(keys, values), - }) +func (c *Client) normalizeTables(tables schema.Tables) (schema.Tables, error) { + flattened := tables.FlattenTables() + canonized := make(schema.Tables, len(flattened)) + var err error + for i, table := range flattened { + canonized[i], err = c.normalizeTable(table) + if err != nil { + return nil, err } + } + return canonized, nil +} - md := sc.Metadata() - normalized = append(normalized, arrow.NewSchema(fields, &md)) +func (c *Client) normalizeTable(table *schema.Table) (*schema.Table, error) { + columns := make([]schema.Column, len(table.Columns)) + for i, col := range table.Columns { + canonized := c.normalizeField(col.ToArrowField()) + columns[i] = schema.NewColumnFromArrowField(*canonized) } + return &schema.Table{Name: table.Name, Columns: columns}, nil +} - return normalized +func (c *Client) normalizeField(field arrow.Field) *arrow.Field { + // 1 - convert to the ClickHouse + fieldType := c.arrowTypeToSqlite(field.Type) + + // 2 - convert back to Apache Arrow + return &arrow.Field{ + Name: field.Name, + Type: fieldType, + Nullable: field.Nullable, + Metadata: field.Metadata, + } } -func (c *Client) nonAutoMigrableTables(tables schema.Schemas, sqliteTables schema.Schemas) ([]string, [][]schema.FieldChange) { +func (c *Client) nonAutoMigratableTables(tables schema.Tables, sqliteTables schema.Tables) ([]string, [][]schema.TableColumnChange) { var result []string - var tableChanges [][]schema.FieldChange + var tableChanges [][]schema.TableColumnChange for _, t := range tables { - tableName := schema.TableName(t) - sqliteTable := sqliteTables.SchemaByName(tableName) + + sqliteTable := sqliteTables.Get(t.Name) if sqliteTable == nil { continue } - changes := schema.GetSchemaChanges(t, sqliteTable) + changes := sqliteTable.GetChanges(t) if !c.canAutoMigrate(changes) { - result = append(result, tableName) + result = append(result, t.Name) tableChanges = append(tableChanges, changes) } } @@ -123,16 +117,19 @@ func (c *Client) autoMigrateTable(table *schema.Table, changes []schema.TableCol func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool { for _, change := range changes { - if change.Type == schema.TableColumnChangeTypeAdd && (schema.IsPk(change.Current) || !change.Current.Nullable) { - return false - } - - if change.Type == schema.TableColumnChangeTypeRemove && (schema.IsPk(change.Previous) || !change.Previous.Nullable) { - return false - } - - if change.Type == schema.TableColumnChangeTypeUpdate { + switch change.Type { + case schema.TableColumnChangeTypeAdd: + if change.Current.PrimaryKey || change.Current.NotNull { + return false + } + case schema.TableColumnChangeTypeRemove: + if change.Previous.PrimaryKey || change.Previous.NotNull { + return false + } + case schema.TableColumnChangeTypeUpdate: return false + default: + panic("unknown change type") } } return true @@ -140,45 +137,46 @@ func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool { // This is the responsibility of the CLI of the client to lock before running migration func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { - schemas = c.normalizeSchemas(schemas) - sqliteTables, err := c.sqliteTables(schemas) + normalizedTables, err := c.normalizeTables(tables) + if err != nil { + return err + } + + sqliteTables, err := c.sqliteTables(normalizedTables) if err != nil { return err } if c.spec.MigrateMode != specs.MigrateModeForced { - nonAutoMigrableTables, changes := c.nonAutoMigrableTables(schemas, sqliteTables) - if len(nonAutoMigrableTables) > 0 { - return fmt.Errorf("tables %s with changes %v require force migration. use 'migrate_mode: forced'", strings.Join(nonAutoMigrableTables, ","), changes) + nonAutoMigratableTables, changes := c.nonAutoMigratableTables(normalizedTables, sqliteTables) + if len(nonAutoMigratableTables) > 0 { + return fmt.Errorf("tables %s with changes %v require force migration. use 'migrate_mode: forced'", strings.Join(nonAutoMigratableTables, ","), changes) } } - for _, table := range schemas { - tableName := schema.TableName(table) - if tableName == "" { - return fmt.Errorf("schema %s has no table name", table.String()) - } - c.logger.Info().Str("table", tableName).Msg("Migrating table") - if len(table.Fields()) == 0 { - c.logger.Info().Str("table", tableName).Msg("Table with no columns, skipping") + for _, table := range normalizedTables { + c.logger.Info().Str("table", table.Name).Msg("Migrating table") + if len(table.Columns) == 0 { + c.logger.Info().Str("table", table.Name).Msg("Table with no columns, skipping") continue } - sqlite := sqliteTables.SchemaByName(tableName) + sqlite := sqliteTables.Get(table.Name) if sqlite == nil { - c.logger.Debug().Str("table", tableName).Msg("Table doesn't exist, creating") + c.logger.Debug().Str("table", table.Name).Msg("Table doesn't exist, creating") if err := c.createTableIfNotExist(table); err != nil { return err } } else { - changes := schema.GetSchemaChanges(table, sqlite) + + changes := table.GetChanges(sqlite) if c.canAutoMigrate(changes) { - c.logger.Info().Str("table", tableName).Msg("Table exists, auto-migrating") + c.logger.Info().Str("table", table.Name).Msg("Table exists, auto-migrating") if err := c.autoMigrateTable(table, changes); err != nil { return err } } else { - c.logger.Info().Str("table", tableName).Msg("Table exists, force migration required") + c.logger.Info().Str("table", table.Name).Msg("Table exists, force migration required") if err := c.recreateTable(table); err != nil { return err } @@ -189,14 +187,11 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { return nil } -func (c *Client) recreateTable(table *arrow.Schema) error { - tableName, ok := table.Metadata().GetValue(schema.MetadataTableName) - if !ok { - return fmt.Errorf("schema %s has no table name", table.String()) - } - sql := "drop table if exists \"" + tableName + "\"" +func (c *Client) recreateTable(table *schema.Table) error { + + sql := "drop table if exists \"" + table.Name + "\"" if _, err := c.db.Exec(sql); err != nil { - return fmt.Errorf("failed to drop table %s: %w", tableName, err) + return fmt.Errorf("failed to drop table %s: %w", table.Name, err) } return c.createTableIfNotExist(table) } @@ -209,28 +204,25 @@ func (c *Client) addColumn(tableName string, columnName string, columnType strin return nil } -func (c *Client) createTableIfNotExist(sc *arrow.Schema) error { +func (c *Client) createTableIfNotExist(table *schema.Table) error { var sb strings.Builder - tableName, ok := sc.Metadata().GetValue(schema.MetadataTableName) - if !ok { - return fmt.Errorf("schema %s has no table name", sc.String()) - } - // TODO sanitize tablename + + // TODO sanitize table.Name sb.WriteString("CREATE TABLE IF NOT EXISTS ") - sb.WriteString(`"` + tableName + `"`) + sb.WriteString(`"` + table.Name + `"`) sb.WriteString(" (") - totalColumns := len(sc.Fields()) + totalColumns := len(table.Columns) primaryKeys := []string{} - for i, col := range sc.Fields() { + for i, col := range table.Columns { sqlType := c.arrowTypeToSqliteStr(col.Type) if sqlType == "" { - c.logger.Warn().Str("table", tableName).Str("column", col.Name).Msg("Column type is not supported, skipping") + c.logger.Warn().Str("table", table.Name).Str("column", col.Name).Msg("Column type is not supported, skipping") continue } // TODO: sanitize column name fieldDef := `"` + col.Name + `" ` + sqlType - if !col.Nullable { + if col.NotNull { fieldDef += " NOT NULL" } sb.WriteString(fieldDef) @@ -238,7 +230,7 @@ func (c *Client) createTableIfNotExist(sc *arrow.Schema) error { sb.WriteString(",") } - if c.enabledPks() && schema.IsPk(col) { + if c.enabledPks() && col.PrimaryKey { primaryKeys = append(primaryKeys, `"`+col.Name+`"`) } } @@ -246,7 +238,7 @@ func (c *Client) createTableIfNotExist(sc *arrow.Schema) error { if len(primaryKeys) > 0 { // add composite PK constraint on primary key columns sb.WriteString(", CONSTRAINT ") - sb.WriteString(tableName) + sb.WriteString(table.Name) sb.WriteString("_cqpk PRIMARY KEY (") sb.WriteString(strings.Join(primaryKeys, ",")) sb.WriteString(")") diff --git a/plugins/destination/sqlite/go.mod b/plugins/destination/sqlite/go.mod index ae7f801060e338..8554d200a0cf9a 100644 --- a/plugins/destination/sqlite/go.mod +++ b/plugins/destination/sqlite/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604 github.com/cloudquery/plugin-pb-go v1.0.8 - github.com/cloudquery/plugin-sdk/v3 v3.4.0 + github.com/cloudquery/plugin-sdk/v3 v3.5.1 github.com/mattn/go-sqlite3 v1.14.16 github.com/rs/zerolog v1.29.0 ) diff --git a/plugins/destination/sqlite/go.sum b/plugins/destination/sqlite/go.sum index 22300b5b552122..b0956ed7611702 100644 --- a/plugins/destination/sqlite/go.sum +++ b/plugins/destination/sqlite/go.sum @@ -52,6 +52,8 @@ github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3K github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/cloudquery/plugin-sdk/v3 v3.4.0 h1:Vy2BfZu4b283kKsT5VuzDbYYgLyiBeU+3AU0+qvywbQ= github.com/cloudquery/plugin-sdk/v3 v3.4.0/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc= +github.com/cloudquery/plugin-sdk/v3 v3.5.1 h1:797hWUEsojwvp7xtr6LSaf5tk5iG9UDixoRACxu3xrU= +github.com/cloudquery/plugin-sdk/v3 v3.5.1/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= From a35efafa29c892a4b441505132c3e420672405be Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 16 May 2023 12:41:00 -0500 Subject: [PATCH 3/8] Update read.go --- plugins/destination/sqlite/client/read.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/destination/sqlite/client/read.go b/plugins/destination/sqlite/client/read.go index 7fd680ca332327..1f20660b10085b 100644 --- a/plugins/destination/sqlite/client/read.go +++ b/plugins/destination/sqlite/client/read.go @@ -127,7 +127,7 @@ func reverseTransform(sc *arrow.Schema, values []any) (arrow.Record, error) { } else { bldr.Field(i).(*array.StringBuilder).Append(val.(*sql.NullString).String) } - case arrow.BINARY: + case arrow.BINARY, arrow.LARGE_BINARY: if *val.(*[]byte) == nil { bldr.Field(i).AppendNull() } else { From c8b29d073960f11bde252e04c3f7055fed70e944 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 16 May 2023 12:41:02 -0500 Subject: [PATCH 4/8] Update transformer.go --- plugins/destination/sqlite/client/transformer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/destination/sqlite/client/transformer.go b/plugins/destination/sqlite/client/transformer.go index 7d938c13997c8f..180219291cef2e 100644 --- a/plugins/destination/sqlite/client/transformer.go +++ b/plugins/destination/sqlite/client/transformer.go @@ -36,6 +36,8 @@ func getValue(arr arrow.Array, i int) any { return arr.(*array.String).Value(i) case arrow.BINARY: return arr.(*array.Binary).Value(i) + case arrow.LARGE_BINARY: + return arr.(*array.LargeBinary).Value(i) case arrow.FIXED_SIZE_BINARY: return arr.(*array.FixedSizeBinary).Value(i) default: From f47071908b70ee8e11f108aec37284f79a6786b4 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 16 May 2023 12:48:26 -0500 Subject: [PATCH 5/8] Update migrate.go --- plugins/destination/sqlite/client/migrate.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index 6cac724b571d2a..0332a79a0284b6 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -53,22 +53,22 @@ func (c *Client) sqliteTables(tables schema.Tables) (schema.Tables, error) { func (c *Client) normalizeTables(tables schema.Tables) (schema.Tables, error) { flattened := tables.FlattenTables() - canonized := make(schema.Tables, len(flattened)) + normalized := make(schema.Tables, len(flattened)) var err error for i, table := range flattened { - canonized[i], err = c.normalizeTable(table) + normalized[i], err = c.normalizeTable(table) if err != nil { return nil, err } } - return canonized, nil + return normalized, nil } func (c *Client) normalizeTable(table *schema.Table) (*schema.Table, error) { columns := make([]schema.Column, len(table.Columns)) for i, col := range table.Columns { - canonized := c.normalizeField(col.ToArrowField()) - columns[i] = schema.NewColumnFromArrowField(*canonized) + normalized := c.normalizeField(col.ToArrowField()) + columns[i] = schema.NewColumnFromArrowField(*normalized) } return &schema.Table{Name: table.Name, Columns: columns}, nil } From 7104b9ec3f5f98c93d373a422b1354e2c88c3a88 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 16 May 2023 12:57:36 -0500 Subject: [PATCH 6/8] linting --- plugins/destination/sqlite/client/migrate.go | 23 +++++--------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index 0332a79a0284b6..b43129c6f48380 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -51,26 +51,22 @@ func (c *Client) sqliteTables(tables schema.Tables) (schema.Tables, error) { return schemaTables, nil } -func (c *Client) normalizeTables(tables schema.Tables) (schema.Tables, error) { +func (c *Client) normalizeTables(tables schema.Tables) schema.Tables { flattened := tables.FlattenTables() normalized := make(schema.Tables, len(flattened)) - var err error for i, table := range flattened { - normalized[i], err = c.normalizeTable(table) - if err != nil { - return nil, err - } + normalized[i] = c.normalizeTable(table) } - return normalized, nil + return normalized } -func (c *Client) normalizeTable(table *schema.Table) (*schema.Table, error) { +func (c *Client) normalizeTable(table *schema.Table) *schema.Table { columns := make([]schema.Column, len(table.Columns)) for i, col := range table.Columns { normalized := c.normalizeField(col.ToArrowField()) columns[i] = schema.NewColumnFromArrowField(*normalized) } - return &schema.Table{Name: table.Name, Columns: columns}, nil + return &schema.Table{Name: table.Name, Columns: columns} } func (c *Client) normalizeField(field arrow.Field) *arrow.Field { @@ -90,7 +86,6 @@ func (c *Client) nonAutoMigratableTables(tables schema.Tables, sqliteTables sche var result []string var tableChanges [][]schema.TableColumnChange for _, t := range tables { - sqliteTable := sqliteTables.Get(t.Name) if sqliteTable == nil { continue @@ -137,11 +132,7 @@ func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool { // This is the responsibility of the CLI of the client to lock before running migration func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { - normalizedTables, err := c.normalizeTables(tables) - if err != nil { - return err - } - + normalizedTables := c.normalizeTables(tables) sqliteTables, err := c.sqliteTables(normalizedTables) if err != nil { return err @@ -168,7 +159,6 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { return err } } else { - changes := table.GetChanges(sqlite) if c.canAutoMigrate(changes) { c.logger.Info().Str("table", table.Name).Msg("Table exists, auto-migrating") @@ -188,7 +178,6 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { } func (c *Client) recreateTable(table *schema.Table) error { - sql := "drop table if exists \"" + table.Name + "\"" if _, err := c.db.Exec(sql); err != nil { return fmt.Errorf("failed to drop table %s: %w", table.Name, err) From 0013d218201858762e0cf8612c5b7b95781b8e91 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 16 May 2023 14:03:54 -0500 Subject: [PATCH 7/8] PR comments --- plugins/destination/sqlite/client/migrate.go | 6 +----- plugins/destination/sqlite/client/read.go | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index b43129c6f48380..336ab99c35a0a1 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -70,13 +70,9 @@ func (c *Client) normalizeTable(table *schema.Table) *schema.Table { } func (c *Client) normalizeField(field arrow.Field) *arrow.Field { - // 1 - convert to the ClickHouse - fieldType := c.arrowTypeToSqlite(field.Type) - - // 2 - convert back to Apache Arrow return &arrow.Field{ Name: field.Name, - Type: fieldType, + Type: c.arrowTypeToSqlite(field.Type), Nullable: field.Nullable, Metadata: field.Metadata, } diff --git a/plugins/destination/sqlite/client/read.go b/plugins/destination/sqlite/client/read.go index 1f20660b10085b..87489430614f11 100644 --- a/plugins/destination/sqlite/client/read.go +++ b/plugins/destination/sqlite/client/read.go @@ -158,8 +158,8 @@ func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName strin if err != nil { return err } + arrowSchema := table.ToArrowSchema() for rows.Next() { - arrowSchema := table.ToArrowSchema() values := c.createResultsArray(arrowSchema) if err := rows.Scan(values...); err != nil { return fmt.Errorf("failed to read from table %s: %w", table.Name, err) From 308af401f226b1da5b4afd149c5fc9d562284362 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 16 May 2023 14:15:05 -0500 Subject: [PATCH 8/8] Update read.go --- plugins/destination/sqlite/client/read.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/destination/sqlite/client/read.go b/plugins/destination/sqlite/client/read.go index 87489430614f11..6c9d9568631277 100644 --- a/plugins/destination/sqlite/client/read.go +++ b/plugins/destination/sqlite/client/read.go @@ -149,9 +149,9 @@ func reverseTransform(sc *arrow.Schema, values []any) (arrow.Record, error) { } func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error { - colNames := make([]string, 0, len(table.Columns)) - for _, col := range table.Columns { - colNames = append(colNames, `"`+col.Name+`"`) + colNames := make([]string, len(table.Columns)) + for i, col := range table.Columns { + colNames[i] = `"` + col.Name + `"` } cols := strings.Join(colNames, ", ") rows, err := c.db.Query(fmt.Sprintf(readSQL, cols, table.Name), sourceName)