From d2bf5a6e2abde6e3c18df6c4b0c68a3cd25e806d Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Sun, 14 May 2023 19:43:35 +0300 Subject: [PATCH 01/18] feat(postgresql): Update PG to SDK V3 native arrow support --- .../destination/postgresql/client/client.go | 2 +- .../postgresql/client/client_test.go | 2 +- .../postgresql/client/deletestale.go | 6 +- .../destination/postgresql/client/metrics.go | 2 +- .../destination/postgresql/client/migrate.go | 138 ++++++++---------- plugins/destination/postgresql/client/read.go | 15 +- .../postgresql/client/transformer.go | 2 +- .../postgresql/client/types_cockroach.go | 2 +- .../destination/postgresql/client/types_pg.go | 2 +- .../destination/postgresql/client/write.go | 31 ++-- plugins/destination/postgresql/go.mod | 6 +- plugins/destination/postgresql/go.sum | 6 + plugins/destination/postgresql/main.go | 4 +- 13 files changed, 109 insertions(+), 109 deletions(-) diff --git a/plugins/destination/postgresql/client/client.go b/plugins/destination/postgresql/client/client.go index 7599433673d822..1ac954b4a0c8e7 100644 --- a/plugins/destination/postgresql/client/client.go +++ b/plugins/destination/postgresql/client/client.go @@ -6,7 +6,7 @@ import ( "strings" "github.com/cloudquery/plugin-pb-go/specs" - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" pgx_zero_log "github.com/jackc/pgx-zerolog" "github.com/jackc/pgx/v5" diff --git a/plugins/destination/postgresql/client/client_test.go b/plugins/destination/postgresql/client/client_test.go index 19a872dad5dff8..5f0a4cdc1f9f9d 100644 --- a/plugins/destination/postgresql/client/client_test.go +++ b/plugins/destination/postgresql/client/client_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/cloudquery/plugin-pb-go/specs" - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" ) func getTestConnection() string { diff --git a/plugins/destination/postgresql/client/deletestale.go b/plugins/destination/postgresql/client/deletestale.go index 320d6ec52bb974..edad26fead5a9d 100644 --- a/plugins/destination/postgresql/client/deletestale.go +++ b/plugins/destination/postgresql/client/deletestale.go @@ -6,16 +6,16 @@ import ( "strings" "time" - "github.com/cloudquery/plugin-sdk/v2/schema" + "github.com/cloudquery/plugin-sdk/v3/schema" "github.com/jackc/pgx/v5" ) -func (c *Client) DeleteStale(ctx context.Context, tables schema.Schemas, source string, syncTime time.Time) error { +func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error { batch := &pgx.Batch{} for _, table := range tables { var sb strings.Builder sb.WriteString("delete from ") - sb.WriteString(pgx.Identifier{schema.TableName(table)}.Sanitize()) + sb.WriteString(pgx.Identifier{table.Name}.Sanitize()) sb.WriteString(" where ") sb.WriteString(schema.CqSourceNameColumn.Name) sb.WriteString(" = $1 and ") diff --git a/plugins/destination/postgresql/client/metrics.go b/plugins/destination/postgresql/client/metrics.go index 6e0bc82117c87a..0abe9f700b726d 100644 --- a/plugins/destination/postgresql/client/metrics.go +++ b/plugins/destination/postgresql/client/metrics.go @@ -1,7 +1,7 @@ package client import ( - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" ) func (c *Client) Metrics() destination.Metrics { diff --git a/plugins/destination/postgresql/client/migrate.go b/plugins/destination/postgresql/client/migrate.go index 2fe0654047cdab..3c111812758e5c 100644 --- a/plugins/destination/postgresql/client/migrate.go +++ b/plugins/destination/postgresql/client/migrate.go @@ -3,12 +3,10 @@ package client import ( "context" "fmt" - "strconv" "strings" - "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-pb-go/specs" - "github.com/cloudquery/plugin-sdk/v2/schema" + "github.com/cloudquery/plugin-sdk/v3/schema" "github.com/jackc/pgx/v5" ) @@ -94,10 +92,8 @@ ORDER BY ` ) -func (c *Client) listPgTables(ctx context.Context, pluginTables schema.Schemas) (schema.Schemas, error) { - var tables schema.Schemas - var fields []arrow.Field - tableMetaData := make(map[string]string) +func (c *Client) listPgTables(ctx context.Context, pluginTables schema.Tables) (schema.Tables, error) { + var tables schema.Tables sql := selectAllTables if c.pgType == pgTypeCockroachDB { sql = selectAllTablesCockroach @@ -115,67 +111,55 @@ func (c *Client) listPgTables(ctx context.Context, pluginTables schema.Schemas) return nil, err } // We don't want to migrate tables that are not a part of the spec, or non CloudQuery tables - if pluginTables.SchemaByName(tableName) == nil { + if pluginTables.Get(tableName) == nil { continue } if ordinalPosition == 1 { - if fields != nil { - md := arrow.MetadataFrom(tableMetaData) - tables = append(tables, arrow.NewSchema(fields, &md)) - fields = nil - tableMetaData = make(map[string]string, 0) - } - tableMetaData[schema.MetadataTableName] = tableName + tables = append(tables, &schema.Table{ + Name: tableName, + }) } + table := tables[len(tables)-1] if pkName != "" { - tableMetaData[schema.MetadataConstraintName] = pkName + table.PkConstraintName = pkName } schemaType := c.PgToSchemaType(columnType) - fields = append(fields, arrow.Field{ - Name: columnName, - Type: schemaType, - Nullable: !notNull, - Metadata: arrow.MetadataFrom(map[string]string{ - schema.MetadataPrimaryKey: strconv.FormatBool(isPrimaryKey), - }), + table.Columns = append(table.Columns, schema.Column{ + Name: columnName, + Type: schemaType, + CreationOptions: schema.ColumnCreationOptions{ + PrimaryKey: isPrimaryKey, + NotNull: notNull, + }, }) } - if fields != nil { - md := arrow.MetadataFrom(tableMetaData) - tables = append(tables, arrow.NewSchema(fields, &md)) - } return tables, nil } -func (c *Client) normalizeTable(table *arrow.Schema, pgTable *arrow.Schema) *arrow.Schema { - fields := make([]arrow.Field, len(table.Fields())) - for i, f := range table.Fields() { - metadata := make(map[string]string, 0) - if c.enabledPks() && schema.IsPk(f) { - metadata[schema.MetadataPrimaryKey] = schema.MetadataTrue - f.Nullable = false +func (c *Client) normalizeTable(table *schema.Table, pgTable *schema.Table) *schema.Table { + normalizedTable := schema.Table{ + Name: table.Name, + } + for _, f := range table.Columns { + col := f + if c.enabledPks() && f.CreationOptions.PrimaryKey { + col.CreationOptions.NotNull = true } else { - metadata[schema.MetadataPrimaryKey] = schema.MetadataFalse + col.CreationOptions.PrimaryKey = false } - - f.Metadata = arrow.MetadataFrom(metadata) - f.Type = c.PgToSchemaType(c.SchemaTypeToPg(f.Type)) - fields[i] = f + col.Type = c.PgToSchemaType(c.SchemaTypeToPg(col.Type)) + normalizedTable.Columns = append(normalizedTable.Columns, col) } - mdMap := make(map[string]string) - if pgTable != nil { - mdMap[schema.MetadataTableName] = schema.TableName(pgTable) - if constraintName, ok := pgTable.Metadata().GetValue(schema.MetadataConstraintName); ok { - mdMap[schema.MetadataConstraintName] = constraintName - } + + if pgTable != nil && pgTable.PkConstraintName != "" { + normalizedTable.PkConstraintName = pgTable.PkConstraintName } - mdMap[schema.MetadataTableName] = schema.TableName(table) - md := arrow.MetadataFrom(mdMap) - return arrow.NewSchema(fields, &md) + + return &normalizedTable } -func (c *Client) autoMigrateTable(ctx context.Context, table *arrow.Schema, changes []schema.FieldChange) error { - tableName := schema.TableName(table) +func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table, changes []schema.TableColumnChange) error { + tableName := table.Name for _, change := range changes { switch change.Type { case schema.TableColumnChangeTypeAdd: @@ -191,15 +175,15 @@ func (c *Client) autoMigrateTable(ctx context.Context, table *arrow.Schema, chan return nil } -func (*Client) canAutoMigrate(changes []schema.FieldChange) bool { +func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool { for _, change := range changes { switch change.Type { case schema.TableColumnChangeTypeAdd: - if schema.IsPk(change.Current) || !change.Current.Nullable { + if change.Current.CreationOptions.PrimaryKey || change.Current.CreationOptions.NotNull { return false } case schema.TableColumnChangeTypeRemove: - if schema.IsPk(change.Previous) || !change.Previous.Nullable { + if change.Previous.CreationOptions.PrimaryKey || change.Previous.CreationOptions.NotNull { return false } case schema.TableColumnChangeTypeUpdate: @@ -212,26 +196,30 @@ func (*Client) canAutoMigrate(changes []schema.FieldChange) bool { } // normalize the requested schema to be compatible with what Postgres supports -func (c *Client) normalizeTables(tables schema.Schemas, pgTables schema.Schemas) schema.Schemas { - var result schema.Schemas +func (c *Client) normalizeTables(tables schema.Tables, pgTables schema.Tables) schema.Tables { + var result schema.Tables for _, table := range tables { - pgTabe := pgTables.SchemaByName(schema.TableName(table)) - result = append(result, c.normalizeTable(table, pgTabe)) + pgTabe := pgTables.Get(table.Name) + if pgTabe == nil { + result = append(result, table) + } else { + result = append(result, c.normalizeTable(table, pgTabe)) + } } return result } -func (c *Client) nonAutoMigrableTables(tables schema.Schemas, pgTables schema.Schemas) ([]string, [][]schema.FieldChange) { +func (c *Client) nonAutoMigrableTables(tables schema.Tables, pgTables schema.Tables) ([]string, [][]schema.TableColumnChange) { var result []string - var tableChanges [][]schema.FieldChange + var tableChanges [][]schema.TableColumnChange for _, t := range tables { - pgTable := pgTables.SchemaByName(schema.TableName(t)) + pgTable := pgTables.Get(t.Name) if pgTable == nil { continue } - changes := schema.GetSchemaChanges(t, pgTable) + changes := t.GetChanges(pgTable) if !c.canAutoMigrate(changes) { - result = append(result, schema.TableName(t)) + result = append(result, t.Name) tableChanges = append(tableChanges, changes) } } @@ -239,7 +227,7 @@ func (c *Client) nonAutoMigrableTables(tables schema.Schemas, pgTables schema.Sc } // This is the responsibility of the CLI of the client to lock before running migration -func (c *Client) Migrate(ctx context.Context, tables schema.Schemas) error { +func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { pgTables, err := c.listPgTables(ctx, tables) if err != nil { return fmt.Errorf("failed listing postgres tables: %w", err) @@ -253,20 +241,20 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Schemas) error { } for _, table := range tables { - tableName := schema.TableName(table) + tableName := table.Name c.logger.Info().Str("table", tableName).Msg("Migrating table") - if len(table.Fields()) == 0 { + if len(table.Columns) == 0 { c.logger.Info().Str("table", tableName).Msg("Table with no columns, skipping") continue } - pgTable := pgTables.SchemaByName(tableName) + pgTable := pgTables.Get(tableName) if pgTable == nil { c.logger.Debug().Str("table", tableName).Msg("Table doesn't exist, creating") if err := c.createTableIfNotExist(ctx, table); err != nil { return err } } else { - changes := schema.GetSchemaChanges(table, pgTable) + changes := table.GetChanges(pgTable) if c.canAutoMigrate(changes) { c.logger.Info().Str("table", tableName).Msg("Table exists, auto-migrating") if err := c.autoMigrateTable(ctx, table, changes); err != nil { @@ -303,7 +291,7 @@ func (c *Client) dropTable(ctx context.Context, tableName string) error { return nil } -func (c *Client) addColumn(ctx context.Context, tableName string, column arrow.Field) error { +func (c *Client) addColumn(ctx context.Context, tableName string, column schema.Column) error { c.logger.Info().Str("table", tableName).Str("column", column.Name).Msg("Column doesn't exist, creating") columnName := pgx.Identifier{column.Name}.Sanitize() columnType := c.SchemaTypeToPg(column.Type) @@ -314,31 +302,31 @@ func (c *Client) addColumn(ctx context.Context, tableName string, column arrow.F return nil } -func (c *Client) createTableIfNotExist(ctx context.Context, table *arrow.Schema) error { +func (c *Client) createTableIfNotExist(ctx context.Context, table *schema.Table) error { var sb strings.Builder - tName := schema.TableName(table) + tName := table.Name tableName := pgx.Identifier{tName}.Sanitize() sb.WriteString("CREATE TABLE IF NOT EXISTS ") sb.WriteString(tableName) sb.WriteString(" (") - totalColumns := len(table.Fields()) + totalColumns := len(table.Columns) primaryKeys := []string{} - for i, col := range table.Fields() { + for i, col := range table.Columns { pgType := c.SchemaTypeToPg(col.Type) columnName := pgx.Identifier{col.Name}.Sanitize() fieldDef := columnName + " " + pgType - if schema.IsUnique(col) { + if col.CreationOptions.Unique { fieldDef += " UNIQUE" } - if !col.Nullable { + if col.CreationOptions.NotNull { fieldDef += " NOT NULL" } sb.WriteString(fieldDef) if i != totalColumns-1 { sb.WriteString(",") } - if c.enabledPks() && schema.IsPk(col) { + if c.enabledPks() && col.CreationOptions.PrimaryKey { primaryKeys = append(primaryKeys, pgx.Identifier{col.Name}.Sanitize()) } } diff --git a/plugins/destination/postgresql/client/read.go b/plugins/destination/postgresql/client/read.go index f35e8cd93826e9..f3e87a2c5c14e4 100644 --- a/plugins/destination/postgresql/client/read.go +++ b/plugins/destination/postgresql/client/read.go @@ -11,8 +11,8 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" "github.com/apache/arrow/go/v13/arrow/memory" - "github.com/cloudquery/plugin-sdk/v2/schema" - "github.com/cloudquery/plugin-sdk/v2/types" + "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/plugin-sdk/v3/types" "github.com/google/uuid" "github.com/jackc/pgx/v5" ) @@ -113,7 +113,8 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er return nil } -func (c *Client) reverseTransformer(sc *arrow.Schema, values []any) (arrow.Record, error) { +func (c *Client) reverseTransformer(table *schema.Table, values []any) (arrow.Record, error) { + sc := table.ToArrowSchema() bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) for i, f := range sc.Fields() { if err := c.reverseTransform(f, bldr.Field(i), values[i]); err != nil { @@ -124,13 +125,13 @@ func (c *Client) reverseTransformer(sc *arrow.Schema, values []any) (arrow.Recor return rec, nil } -func (c *Client) Read(ctx context.Context, table *arrow.Schema, sourceName string, res chan<- arrow.Record) error { - colNames := make([]string, 0, len(table.Fields())) - for _, col := range table.Fields() { +func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error { + colNames := make([]string, 0, len(table.Columns)) + for _, col := range table.Columns { colNames = append(colNames, pgx.Identifier{col.Name}.Sanitize()) } cols := strings.Join(colNames, ",") - tableName := schema.TableName(table) + tableName := table.Name sql := fmt.Sprintf(readSQL, cols, pgx.Identifier{tableName}.Sanitize()) rows, err := c.conn.Query(ctx, sql, sourceName) if err != nil { diff --git a/plugins/destination/postgresql/client/transformer.go b/plugins/destination/postgresql/client/transformer.go index 3744d8ad953e7d..ca7f5d51cd5b47 100644 --- a/plugins/destination/postgresql/client/transformer.go +++ b/plugins/destination/postgresql/client/transformer.go @@ -6,7 +6,7 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" - "github.com/cloudquery/plugin-sdk/v2/types" + "github.com/cloudquery/plugin-sdk/v3/types" "github.com/jackc/pgx/v5/pgtype" ) diff --git a/plugins/destination/postgresql/client/types_cockroach.go b/plugins/destination/postgresql/client/types_cockroach.go index a2adf2dfe5e596..6c3a225af11d52 100644 --- a/plugins/destination/postgresql/client/types_cockroach.go +++ b/plugins/destination/postgresql/client/types_cockroach.go @@ -5,7 +5,7 @@ import ( "strings" "github.com/apache/arrow/go/v13/arrow" - "github.com/cloudquery/plugin-sdk/v2/types" + "github.com/cloudquery/plugin-sdk/v3/types" ) func (c *Client) SchemaTypeToCockroach(t arrow.DataType) string { diff --git a/plugins/destination/postgresql/client/types_pg.go b/plugins/destination/postgresql/client/types_pg.go index 3edd52dfd14382..35be6b1020ed87 100644 --- a/plugins/destination/postgresql/client/types_pg.go +++ b/plugins/destination/postgresql/client/types_pg.go @@ -5,7 +5,7 @@ import ( "strings" "github.com/apache/arrow/go/v13/arrow" - "github.com/cloudquery/plugin-sdk/v2/types" + "github.com/cloudquery/plugin-sdk/v3/types" ) func (c *Client) SchemaTypeToPg(t arrow.DataType) string { diff --git a/plugins/destination/postgresql/client/write.go b/plugins/destination/postgresql/client/write.go index 9ad5a9e0916fbe..471362c90dbba3 100644 --- a/plugins/destination/postgresql/client/write.go +++ b/plugins/destination/postgresql/client/write.go @@ -10,7 +10,7 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-pb-go/specs" - "github.com/cloudquery/plugin-sdk/v2/schema" + "github.com/cloudquery/plugin-sdk/v3/schema" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" ) @@ -54,7 +54,7 @@ func pgErrToStr(err *pgconn.PgError) string { return sb.String() } -func (c *Client) Write(ctx context.Context, tables schema.Schemas, res <-chan arrow.Record) error { +func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan arrow.Record) error { var sql string batch := &pgx.Batch{} pgTables, err := c.listPgTables(ctx, tables) @@ -66,15 +66,19 @@ func (c *Client) Write(ctx context.Context, tables schema.Schemas, res <-chan ar return err } for r := range res { - tableName := schema.TableName(r.Schema()) - table := tables.SchemaByName(tableName) + md := r.Schema().Metadata() + tableName, ok := md.GetValue(schema.MetadataTableName) + if !ok { + return fmt.Errorf("table name not found in metadata") + } + table := tables.Get(tableName) if table == nil { - panic(fmt.Errorf("table %s not found", tableName)) + return fmt.Errorf("table %s not found", tableName) } if c.spec.WriteMode == specs.WriteModeAppend { sql = c.insert(table) } else { - if len(schema.PrimaryKeyIndices(table)) > 0 { + if len(table.PrimaryKeysIndexes()) > 0 { sql = c.upsert(table) } else { sql = c.insert(table) @@ -117,13 +121,13 @@ func (c *Client) Write(ctx context.Context, tables schema.Schemas, res <-chan ar return nil } -func (*Client) insert(table *arrow.Schema) string { +func (*Client) insert(table *schema.Table) string { var sb strings.Builder - tableName := schema.TableName(table) + tableName := table.Name sb.WriteString("insert into ") sb.WriteString(pgx.Identifier{tableName}.Sanitize()) sb.WriteString(" (") - columns := table.Fields() + columns := table.Columns columnsLen := len(columns) for i, c := range columns { sb.WriteString(pgx.Identifier{c.Name}.Sanitize()) @@ -144,17 +148,14 @@ func (*Client) insert(table *arrow.Schema) string { return sb.String() } -func (c *Client) upsert(table *arrow.Schema) string { +func (c *Client) upsert(table *schema.Table) string { var sb strings.Builder sb.WriteString(c.insert(table)) - columns := table.Fields() + columns := table.Columns columnsLen := len(columns) - constraintName, ok := table.Metadata().GetValue(schema.MetadataConstraintName) - if !ok { - panic(fmt.Errorf("constraint_name not found in table metadata")) - } + constraintName := table.PkConstraintName sb.WriteString(" on conflict on constraint ") sb.WriteString(pgx.Identifier{constraintName}.Sanitize()) sb.WriteString(" do update set ") diff --git a/plugins/destination/postgresql/go.mod b/plugins/destination/postgresql/go.mod index 3c253d2cb5441e..590de675fb4e75 100644 --- a/plugins/destination/postgresql/go.mod +++ b/plugins/destination/postgresql/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604 github.com/cloudquery/plugin-pb-go v1.0.8 - github.com/cloudquery/plugin-sdk/v2 v2.7.0 + github.com/cloudquery/plugin-sdk/v3 v3.0.0 github.com/google/uuid v1.3.0 github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb github.com/jackc/pgx/v5 v5.3.1 @@ -16,9 +16,12 @@ require ( // TODO: remove once all updates are merged replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8 +replace github.com/cloudquery/plugin-sdk/v3 => ../../../../plugin-sdk + require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/apache/thrift v0.16.0 // indirect + github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/getsentry/sentry-go v0.20.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect @@ -39,6 +42,7 @@ require ( github.com/mattn/go-isatty v0.0.18 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/cobra v1.6.1 // indirect diff --git a/plugins/destination/postgresql/go.sum b/plugins/destination/postgresql/go.sum index 844c893478c784..186ef2cfe1a675 100644 --- a/plugins/destination/postgresql/go.sum +++ b/plugins/destination/postgresql/go.sum @@ -46,8 +46,13 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8 h1:CmgLSEGQNLHpUQ5cU4L4aF7cuJZRnc1toIIWqC1gmPg= github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po= +<<<<<<< HEAD github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37/izMi+FQ= github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= +======= +github.com/cloudquery/plugin-pb-go v1.0.6 h1:jWLXFUGgobO28rBERuipSUbr6Rqoth4nzGZ/XEQD86w= +github.com/cloudquery/plugin-pb-go v1.0.6/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= +>>>>>>> af340cc1d (feat(postgresql): Update PG to SDK V3 native arrow support) github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -184,6 +189,7 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/plugins/destination/postgresql/main.go b/plugins/destination/postgresql/main.go index 1425dfbd2bb00f..a62404dea843d7 100644 --- a/plugins/destination/postgresql/main.go +++ b/plugins/destination/postgresql/main.go @@ -3,8 +3,8 @@ package main import ( "github.com/cloudquery/cloudquery/plugins/destination/postgresql/client" "github.com/cloudquery/cloudquery/plugins/destination/postgresql/resources/plugin" - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" - "github.com/cloudquery/plugin-sdk/v2/serve" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/serve" ) const ( From 5955662fab9d27f8d1be34936f868d2c7ef0adab Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Mon, 15 May 2023 17:13:14 +0300 Subject: [PATCH 02/18] fix pg --- .../destination/postgresql/client/migrate.go | 22 +++++++++---------- plugins/destination/postgresql/client/read.go | 2 +- .../destination/postgresql/client/types_pg.go | 4 ++-- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/plugins/destination/postgresql/client/migrate.go b/plugins/destination/postgresql/client/migrate.go index 3c111812758e5c..79ea1b822d9a57 100644 --- a/plugins/destination/postgresql/client/migrate.go +++ b/plugins/destination/postgresql/client/migrate.go @@ -127,10 +127,8 @@ func (c *Client) listPgTables(ctx context.Context, pluginTables schema.Tables) ( table.Columns = append(table.Columns, schema.Column{ Name: columnName, Type: schemaType, - CreationOptions: schema.ColumnCreationOptions{ - PrimaryKey: isPrimaryKey, - NotNull: notNull, - }, + PrimaryKey: isPrimaryKey, + NotNull: notNull, }) } return tables, nil @@ -142,10 +140,10 @@ func (c *Client) normalizeTable(table *schema.Table, pgTable *schema.Table) *sch } for _, f := range table.Columns { col := f - if c.enabledPks() && f.CreationOptions.PrimaryKey { - col.CreationOptions.NotNull = true + if c.enabledPks() && f.PrimaryKey { + col.NotNull = true } else { - col.CreationOptions.PrimaryKey = false + col.PrimaryKey = false } col.Type = c.PgToSchemaType(c.SchemaTypeToPg(col.Type)) normalizedTable.Columns = append(normalizedTable.Columns, col) @@ -179,11 +177,11 @@ func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool { for _, change := range changes { switch change.Type { case schema.TableColumnChangeTypeAdd: - if change.Current.CreationOptions.PrimaryKey || change.Current.CreationOptions.NotNull { + if change.Current.PrimaryKey || change.Current.NotNull { return false } case schema.TableColumnChangeTypeRemove: - if change.Previous.CreationOptions.PrimaryKey || change.Previous.CreationOptions.NotNull { + if change.Previous.PrimaryKey || change.Previous.NotNull { return false } case schema.TableColumnChangeTypeUpdate: @@ -316,17 +314,17 @@ func (c *Client) createTableIfNotExist(ctx context.Context, table *schema.Table) pgType := c.SchemaTypeToPg(col.Type) columnName := pgx.Identifier{col.Name}.Sanitize() fieldDef := columnName + " " + pgType - if col.CreationOptions.Unique { + if col.Unique { fieldDef += " UNIQUE" } - if col.CreationOptions.NotNull { + if col.NotNull { fieldDef += " NOT NULL" } sb.WriteString(fieldDef) if i != totalColumns-1 { sb.WriteString(",") } - if c.enabledPks() && col.CreationOptions.PrimaryKey { + if c.enabledPks() && col.PrimaryKey { primaryKeys = append(primaryKeys, pgx.Identifier{col.Name}.Sanitize()) } } diff --git a/plugins/destination/postgresql/client/read.go b/plugins/destination/postgresql/client/read.go index f3e87a2c5c14e4..d376e6294615c9 100644 --- a/plugins/destination/postgresql/client/read.go +++ b/plugins/destination/postgresql/client/read.go @@ -83,7 +83,7 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er return nil } b.Append(val.(*net.IPNet)) - case *types.MacBuilder: + case *types.MACBuilder: if c.pgType == pgTypePostgreSQL { b.Append(val.(net.HardwareAddr)) } else { diff --git a/plugins/destination/postgresql/client/types_pg.go b/plugins/destination/postgresql/client/types_pg.go index 35be6b1020ed87..5dc6951f822f8c 100644 --- a/plugins/destination/postgresql/client/types_pg.go +++ b/plugins/destination/postgresql/client/types_pg.go @@ -60,7 +60,7 @@ func (c *Client) SchemaTypeToPg10(t arrow.DataType) string { return "jsonb" case *types.InetType: return "inet" - case *types.MacType: + case *types.MACType: return "macaddr" default: return "text" @@ -99,7 +99,7 @@ func (c *Client) Pg10ToSchemaType(t string) arrow.DataType { case "cidr": return types.ExtensionTypes.Inet case "macaddr", "macaddr8": - return types.ExtensionTypes.Mac + return types.ExtensionTypes.MAC case "inet": return types.ExtensionTypes.Inet default: From 11cdc55f0b5b236c2d2c99234231abfd732d5ef1 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Mon, 15 May 2023 18:50:59 +0300 Subject: [PATCH 03/18] fix postgresql --- plugins/destination/postgresql/client/transformer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plugins/destination/postgresql/client/transformer.go b/plugins/destination/postgresql/client/transformer.go index ca7f5d51cd5b47..ad415292ad347b 100644 --- a/plugins/destination/postgresql/client/transformer.go +++ b/plugins/destination/postgresql/client/transformer.go @@ -106,7 +106,7 @@ func transformArr(arr arrow.Array) []any { } case *array.Timestamp: pgArr[i] = pgtype.Timestamptz{ - Time: a.Value(i).ToTime(arrow.Microsecond), + Time: a.Value(i).ToTime(arrow.Microsecond).UTC(), Valid: a.IsValid(i), } case *types.UUIDArray: @@ -123,6 +123,8 @@ func transformArr(arr arrow.Array) []any { start, end := a.ValueOffsets(i) nested := array.NewSlice(a.ListValues(), start, end) pgArr[i] = transformArr(nested) + case *types.JSONArray: + pgArr[i] = a.Storage().(*array.Binary).Value(i) default: pgArr[i] = stripNulls(arr.ValueStr(i)) } From 78c1bd6b08106ae2d525c425b1ce6a75284d3938 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 16 May 2023 00:58:43 +0300 Subject: [PATCH 04/18] fix --- plugins/destination/postgresql/client/read.go | 35 +++++++++++++++---- .../postgresql/client/transformer.go | 27 +++++++++++++- .../postgresql/client/types_cockroach.go | 6 ++-- plugins/destination/postgresql/go.mod | 6 ++-- plugins/destination/postgresql/go.sum | 7 ++++ 5 files changed, 66 insertions(+), 15 deletions(-) diff --git a/plugins/destination/postgresql/client/read.go b/plugins/destination/postgresql/client/read.go index d376e6294615c9..c8ce2ba65a46c3 100644 --- a/plugins/destination/postgresql/client/read.go +++ b/plugins/destination/postgresql/client/read.go @@ -1,6 +1,7 @@ package client import ( + "bytes" "context" "fmt" "net" @@ -8,6 +9,8 @@ import ( "strings" "time" + "github.com/goccy/go-json" + "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" "github.com/apache/arrow/go/v13/arrow/memory" @@ -26,11 +29,13 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er bldr.AppendNull() return nil } + switch b := bldr.(type) { case *array.BooleanBuilder: b.Append(val.(bool)) case *array.Int8Builder: - b.Append(val.(int8)) + // pgx always return int16 for int8 + b.Append(int8(val.(int16))) case *array.Int16Builder: b.Append(val.(int16)) case *array.Int32Builder: @@ -38,13 +43,13 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er case *array.Int64Builder: b.Append(val.(int64)) case *array.Uint8Builder: - b.Append(val.(uint8)) + b.Append(uint8(val.(int16))) case *array.Uint16Builder: - b.Append(val.(uint16)) + b.Append(uint16(val.(int16))) case *array.Uint32Builder: - b.Append(val.(uint32)) + b.Append(uint32(val.(int32))) case *array.Uint64Builder: - b.Append(val.(uint64)) + b.Append(uint64(val.(int64))) case *array.Float32Builder: b.Append(val.(float32)) case *array.Float64Builder: @@ -60,7 +65,18 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er case *array.BinaryBuilder: b.Append(val.([]byte)) case *array.TimestampBuilder: - b.Append(arrow.Timestamp(val.(time.Time).UnixMicro())) + switch b.Type().(*arrow.TimestampType).Unit { + case arrow.Second: + b.Append(arrow.Timestamp(val.(time.Time).Unix())) + case arrow.Millisecond: + b.Append(arrow.Timestamp(val.(time.Time).UnixMilli())) + case arrow.Microsecond: + b.Append(arrow.Timestamp(val.(time.Time).UnixMicro())) + case arrow.Nanosecond: + b.Append(arrow.Timestamp(val.(time.Time).UnixNano())) + default: + return fmt.Errorf("unsupported timestamp unit %s", f.Type.(*arrow.TimestampType).Unit) + } case *types.UUIDBuilder: va, ok := val.([16]byte) if !ok { @@ -73,6 +89,13 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er b.Append(u) case *types.JSONBuilder: b.Append(val) + case *array.StructBuilder: + structBytes, err := json.Marshal(val) + if err != nil { + return fmt.Errorf("failed to marshal struct: %w", err) + } + dec := json.NewDecoder(bytes.NewReader(structBytes)) + b.UnmarshalOne(dec) case *types.InetBuilder: if v, ok := val.(netip.Prefix); ok { _, ipnet, err := net.ParseCIDR(v.String()) diff --git a/plugins/destination/postgresql/client/transformer.go b/plugins/destination/postgresql/client/transformer.go index ad415292ad347b..cfb68fc41d01dd 100644 --- a/plugins/destination/postgresql/client/transformer.go +++ b/plugins/destination/postgresql/client/transformer.go @@ -65,6 +65,11 @@ func transformArr(arr arrow.Array) []any { Bool: a.Value(i), Valid: a.IsValid(i), } + case *array.Int8: + pgArr[i] = pgtype.Int2{ + Int16: int16(a.Value(i)), + Valid: a.IsValid(i), + } case *array.Int16: pgArr[i] = pgtype.Int2{ Int16: a.Value(i), @@ -80,6 +85,26 @@ func transformArr(arr arrow.Array) []any { Int64: a.Value(i), Valid: a.IsValid(i), } + case *array.Uint8: + pgArr[i] = pgtype.Int2{ + Int16: int16(a.Value(i)), + Valid: a.IsValid(i), + } + case *array.Uint16: + pgArr[i] = pgtype.Int2{ + Int16: int16(a.Value(i)), + Valid: a.IsValid(i), + } + case *array.Uint32: + pgArr[i] = pgtype.Int4{ + Int32: int32(a.Value(i)), + Valid: a.IsValid(i), + } + case *array.Uint64: + pgArr[i] = pgtype.Int8{ + Int64: int64(a.Value(i)), + Valid: a.IsValid(i), + } case *array.Float32: pgArr[i] = pgtype.Float4{ Float32: a.Value(i), @@ -106,7 +131,7 @@ func transformArr(arr arrow.Array) []any { } case *array.Timestamp: pgArr[i] = pgtype.Timestamptz{ - Time: a.Value(i).ToTime(arrow.Microsecond).UTC(), + Time: a.Value(i).ToTime(a.DataType().(*arrow.TimestampType).Unit).UTC(), Valid: a.IsValid(i), } case *types.UUIDArray: diff --git a/plugins/destination/postgresql/client/types_cockroach.go b/plugins/destination/postgresql/client/types_cockroach.go index 6c3a225af11d52..a6b416a4fcc7cc 100644 --- a/plugins/destination/postgresql/client/types_cockroach.go +++ b/plugins/destination/postgresql/client/types_cockroach.go @@ -16,13 +16,11 @@ func (c *Client) SchemaTypeToCockroach(t arrow.DataType) string { return c.SchemaTypeToCockroach(v.Elem()) + fmt.Sprintf("[%d]", v.Len()) case *arrow.BooleanType: return "boolean" - case *arrow.Int8Type, *arrow.Uint8Type: - return "smallint" - case *arrow.Int16Type, *arrow.Uint16Type: + case *arrow.Int8Type, *arrow.Uint8Type, *arrow.Int16Type, *arrow.Uint16Type: return "smallint" case *arrow.Int32Type, *arrow.Uint32Type: return "integer" - case *arrow.Int64Type, *arrow.Uint64Type: + case *arrow.Int64Type, *arrow.Uint64Type: return "bigint" case *arrow.Float32Type: return "real" diff --git a/plugins/destination/postgresql/go.mod b/plugins/destination/postgresql/go.mod index 590de675fb4e75..4bca5210181440 100644 --- a/plugins/destination/postgresql/go.mod +++ b/plugins/destination/postgresql/go.mod @@ -5,7 +5,8 @@ go 1.19 require ( github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604 github.com/cloudquery/plugin-pb-go v1.0.8 - github.com/cloudquery/plugin-sdk/v3 v3.0.0 + github.com/cloudquery/plugin-sdk/v3 v3.3.0 + github.com/goccy/go-json v0.9.11 github.com/google/uuid v1.3.0 github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb github.com/jackc/pgx/v5 v5.3.1 @@ -16,8 +17,6 @@ require ( // TODO: remove once all updates are merged replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8 -replace github.com/cloudquery/plugin-sdk/v3 => ../../../../plugin-sdk - require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/apache/thrift v0.16.0 // indirect @@ -25,7 +24,6 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/getsentry/sentry-go v0.20.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect - github.com/goccy/go-json v0.9.11 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v2.0.8+incompatible // indirect diff --git a/plugins/destination/postgresql/go.sum b/plugins/destination/postgresql/go.sum index 186ef2cfe1a675..c95ea9df7fe1a2 100644 --- a/plugins/destination/postgresql/go.sum +++ b/plugins/destination/postgresql/go.sum @@ -47,14 +47,21 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8 h1:CmgLSEGQNLHpUQ5cU4L4aF7cuJZRnc1toIIWqC1gmPg= github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po= <<<<<<< HEAD +<<<<<<< HEAD github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37/izMi+FQ= github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= ======= github.com/cloudquery/plugin-pb-go v1.0.6 h1:jWLXFUGgobO28rBERuipSUbr6Rqoth4nzGZ/XEQD86w= github.com/cloudquery/plugin-pb-go v1.0.6/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= >>>>>>> af340cc1d (feat(postgresql): Update PG to SDK V3 native arrow support) +======= +github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37/izMi+FQ= +github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= +>>>>>>> 363aad2b1 (fix) github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= +github.com/cloudquery/plugin-sdk/v3 v3.3.0 h1:jTj9bJqMJZUg6WcohIwA/XQic6FgUn+q179gXQraWts= +github.com/cloudquery/plugin-sdk/v3 v3.3.0/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= From 7d07366648bdbfd245fa79dd628f7ee17a349f1d Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 16 May 2023 01:05:31 +0300 Subject: [PATCH 05/18] gomod --- plugins/destination/postgresql/go.sum | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/plugins/destination/postgresql/go.sum b/plugins/destination/postgresql/go.sum index c95ea9df7fe1a2..82919f43f58750 100644 --- a/plugins/destination/postgresql/go.sum +++ b/plugins/destination/postgresql/go.sum @@ -46,18 +46,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8 h1:CmgLSEGQNLHpUQ5cU4L4aF7cuJZRnc1toIIWqC1gmPg= github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po= -<<<<<<< HEAD -<<<<<<< HEAD github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37/izMi+FQ= github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= -======= -github.com/cloudquery/plugin-pb-go v1.0.6 h1:jWLXFUGgobO28rBERuipSUbr6Rqoth4nzGZ/XEQD86w= -github.com/cloudquery/plugin-pb-go v1.0.6/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= ->>>>>>> af340cc1d (feat(postgresql): Update PG to SDK V3 native arrow support) -======= -github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37/izMi+FQ= -github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= ->>>>>>> 363aad2b1 (fix) github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/cloudquery/plugin-sdk/v3 v3.3.0 h1:jTj9bJqMJZUg6WcohIwA/XQic6FgUn+q179gXQraWts= From 4f72cf766c3f622b9cec8e7136b90ffa85bc3148 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 16 May 2023 01:15:38 +0300 Subject: [PATCH 06/18] fix fmt --- plugins/destination/postgresql/client/migrate.go | 4 ++-- plugins/destination/postgresql/client/read.go | 6 ++++-- plugins/destination/postgresql/client/types_cockroach.go | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/plugins/destination/postgresql/client/migrate.go b/plugins/destination/postgresql/client/migrate.go index 79ea1b822d9a57..2a72b2f07f952a 100644 --- a/plugins/destination/postgresql/client/migrate.go +++ b/plugins/destination/postgresql/client/migrate.go @@ -125,8 +125,8 @@ func (c *Client) listPgTables(ctx context.Context, pluginTables schema.Tables) ( } schemaType := c.PgToSchemaType(columnType) table.Columns = append(table.Columns, schema.Column{ - Name: columnName, - Type: schemaType, + Name: columnName, + Type: schemaType, PrimaryKey: isPrimaryKey, NotNull: notNull, }) diff --git a/plugins/destination/postgresql/client/read.go b/plugins/destination/postgresql/client/read.go index c8ce2ba65a46c3..ed15d5cda5cf4a 100644 --- a/plugins/destination/postgresql/client/read.go +++ b/plugins/destination/postgresql/client/read.go @@ -65,7 +65,7 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er case *array.BinaryBuilder: b.Append(val.([]byte)) case *array.TimestampBuilder: - switch b.Type().(*arrow.TimestampType).Unit { + switch b.Type().(*arrow.TimestampType).Unit { case arrow.Second: b.Append(arrow.Timestamp(val.(time.Time).Unix())) case arrow.Millisecond: @@ -95,7 +95,9 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er return fmt.Errorf("failed to marshal struct: %w", err) } dec := json.NewDecoder(bytes.NewReader(structBytes)) - b.UnmarshalOne(dec) + if err := b.UnmarshalOne(dec); err != nil { + return fmt.Errorf("failed to unmarshal struct: %w", err) + } case *types.InetBuilder: if v, ok := val.(netip.Prefix); ok { _, ipnet, err := net.ParseCIDR(v.String()) diff --git a/plugins/destination/postgresql/client/types_cockroach.go b/plugins/destination/postgresql/client/types_cockroach.go index a6b416a4fcc7cc..6b019d3faee992 100644 --- a/plugins/destination/postgresql/client/types_cockroach.go +++ b/plugins/destination/postgresql/client/types_cockroach.go @@ -20,7 +20,7 @@ func (c *Client) SchemaTypeToCockroach(t arrow.DataType) string { return "smallint" case *arrow.Int32Type, *arrow.Uint32Type: return "integer" - case *arrow.Int64Type, *arrow.Uint64Type: + case *arrow.Int64Type, *arrow.Uint64Type: return "bigint" case *arrow.Float32Type: return "real" From 48a3cb4d26d30aeabf68dce38234dcea87d6b7c9 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 16 May 2023 01:42:49 +0300 Subject: [PATCH 07/18] cockroach fixes --- plugins/destination/postgresql/client/read.go | 14 +- .../postgresql/client/read_cockroach.go | 131 ++++++++++++++++++ 2 files changed, 141 insertions(+), 4 deletions(-) create mode 100644 plugins/destination/postgresql/client/read_cockroach.go diff --git a/plugins/destination/postgresql/client/read.go b/plugins/destination/postgresql/client/read.go index ed15d5cda5cf4a..e664724d7311c6 100644 --- a/plugins/destination/postgresql/client/read.go +++ b/plugins/destination/postgresql/client/read.go @@ -39,7 +39,7 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er case *array.Int16Builder: b.Append(val.(int16)) case *array.Int32Builder: - b.Append(val.(int32)) + b.Append(int32(val.(int64))) case *array.Int64Builder: b.Append(val.(int64)) case *array.Uint8Builder: @@ -47,7 +47,7 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er case *array.Uint16Builder: b.Append(uint16(val.(int16))) case *array.Uint32Builder: - b.Append(uint32(val.(int32))) + b.Append(uint32(val.(int64))) case *array.Uint64Builder: b.Append(uint64(val.(int64))) case *array.Float32Builder: @@ -142,8 +142,14 @@ func (c *Client) reverseTransformer(table *schema.Table, values []any) (arrow.Re sc := table.ToArrowSchema() bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) for i, f := range sc.Fields() { - if err := c.reverseTransform(f, bldr.Field(i), values[i]); err != nil { - return nil, err + if c.pgType == pgTypePostgreSQL { + if err := c.reverseTransform(f, bldr.Field(i), values[i]); err != nil { + return nil, err + } + } else { + if err := c.reverseTransformCockroach(f, bldr.Field(i), values[i]); err != nil { + return nil, err + } } } rec := bldr.NewRecord() diff --git a/plugins/destination/postgresql/client/read_cockroach.go b/plugins/destination/postgresql/client/read_cockroach.go new file mode 100644 index 00000000000000..11e2329acbea91 --- /dev/null +++ b/plugins/destination/postgresql/client/read_cockroach.go @@ -0,0 +1,131 @@ +package client + +import ( + "bytes" + "fmt" + "net" + "net/netip" + "time" + + "github.com/goccy/go-json" + + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/cloudquery/plugin-sdk/v3/types" + "github.com/google/uuid" +) + + +func (c *Client) reverseTransformCockroach(f arrow.Field, bldr array.Builder, val any) error { + if val == nil { + bldr.AppendNull() + return nil + } + + switch b := bldr.(type) { + case *array.BooleanBuilder: + b.Append(val.(bool)) + case *array.Int8Builder: + // pgx always return int16 for int8 + b.Append(int8(val.(int16))) + case *array.Int16Builder: + b.Append(val.(int16)) + case *array.Int32Builder: + b.Append(int32(val.(int64))) + case *array.Int64Builder: + b.Append(val.(int64)) + case *array.Uint8Builder: + b.Append(uint8(val.(int16))) + case *array.Uint16Builder: + b.Append(uint16(val.(int16))) + case *array.Uint32Builder: + b.Append(uint32(val.(int64))) + case *array.Uint64Builder: + b.Append(uint64(val.(int64))) + case *array.Float32Builder: + b.Append(val.(float32)) + case *array.Float64Builder: + b.Append(val.(float64)) + case *array.StringBuilder: + va, ok := val.(string) + if !ok { + return fmt.Errorf("unsupported type %T with builder %T and column %s", val, bldr, f.Name) + } + b.Append(va) + case *array.LargeStringBuilder: + b.Append(val.(string)) + case *array.BinaryBuilder: + b.Append(val.([]byte)) + case *array.TimestampBuilder: + switch b.Type().(*arrow.TimestampType).Unit { + case arrow.Second: + b.Append(arrow.Timestamp(val.(time.Time).Unix())) + case arrow.Millisecond: + b.Append(arrow.Timestamp(val.(time.Time).UnixMilli())) + case arrow.Microsecond: + b.Append(arrow.Timestamp(val.(time.Time).UnixMicro())) + case arrow.Nanosecond: + b.Append(arrow.Timestamp(val.(time.Time).UnixNano())) + default: + return fmt.Errorf("unsupported timestamp unit %s", f.Type.(*arrow.TimestampType).Unit) + } + case *types.UUIDBuilder: + va, ok := val.([16]byte) + if !ok { + return fmt.Errorf("unsupported type %T with builder %T", val, bldr) + } + u, err := uuid.FromBytes(va[:]) + if err != nil { + return err + } + b.Append(u) + case *types.JSONBuilder: + b.Append(val) + case *array.StructBuilder: + structBytes, err := json.Marshal(val) + if err != nil { + return fmt.Errorf("failed to marshal struct: %w", err) + } + dec := json.NewDecoder(bytes.NewReader(structBytes)) + if err := b.UnmarshalOne(dec); err != nil { + return fmt.Errorf("failed to unmarshal struct: %w", err) + } + case *types.InetBuilder: + if v, ok := val.(netip.Prefix); ok { + _, ipnet, err := net.ParseCIDR(v.String()) + if err != nil { + return err + } + b.Append(ipnet) + return nil + } + b.Append(val.(*net.IPNet)) + case *types.MACBuilder: + if c.pgType == pgTypePostgreSQL { + b.Append(val.(net.HardwareAddr)) + } else { + hardwareAddr, err := net.ParseMAC(val.(string)) + if err != nil { + return err + } + b.Append(hardwareAddr) + } + case array.ListLikeBuilder: + b.Append(true) + valBuilder := b.ValueBuilder() + for _, v := range val.([]any) { + if err := c.reverseTransform(f, valBuilder, v); err != nil { + return err + } + } + default: + v, ok := val.(string) + if !ok { + return fmt.Errorf("unsupported type %T with builder %T", val, bldr) + } + if err := bldr.AppendValueFromString(v); err != nil { + return err + } + } + return nil +} \ No newline at end of file From 5d4aaeeb5e1aff52b4b1ae0a0e0d2e80037c3074 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 16 May 2023 15:59:08 +0300 Subject: [PATCH 08/18] fix pg --- plugins/destination/postgresql/client/read.go | 4 ++-- plugins/destination/postgresql/client/read_cockroach.go | 2 +- plugins/destination/postgresql/client/types_cockroach.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/destination/postgresql/client/read.go b/plugins/destination/postgresql/client/read.go index e664724d7311c6..c27e01ed8bdc61 100644 --- a/plugins/destination/postgresql/client/read.go +++ b/plugins/destination/postgresql/client/read.go @@ -39,7 +39,7 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er case *array.Int16Builder: b.Append(val.(int16)) case *array.Int32Builder: - b.Append(int32(val.(int64))) + b.Append(int32(val.(int32))) case *array.Int64Builder: b.Append(val.(int64)) case *array.Uint8Builder: @@ -47,7 +47,7 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er case *array.Uint16Builder: b.Append(uint16(val.(int16))) case *array.Uint32Builder: - b.Append(uint32(val.(int64))) + b.Append(uint32(val.(int32))) case *array.Uint64Builder: b.Append(uint64(val.(int64))) case *array.Float32Builder: diff --git a/plugins/destination/postgresql/client/read_cockroach.go b/plugins/destination/postgresql/client/read_cockroach.go index 11e2329acbea91..6196e81dbe5ba2 100644 --- a/plugins/destination/postgresql/client/read_cockroach.go +++ b/plugins/destination/postgresql/client/read_cockroach.go @@ -114,7 +114,7 @@ func (c *Client) reverseTransformCockroach(f arrow.Field, bldr array.Builder, va b.Append(true) valBuilder := b.ValueBuilder() for _, v := range val.([]any) { - if err := c.reverseTransform(f, valBuilder, v); err != nil { + if err := c.reverseTransformCockroach(f, valBuilder, v); err != nil { return err } } diff --git a/plugins/destination/postgresql/client/types_cockroach.go b/plugins/destination/postgresql/client/types_cockroach.go index 6b019d3faee992..9f8876e0b089df 100644 --- a/plugins/destination/postgresql/client/types_cockroach.go +++ b/plugins/destination/postgresql/client/types_cockroach.go @@ -56,7 +56,7 @@ func (c *Client) CockroachToSchemaType(t string) arrow.DataType { switch t { case "boolean": return arrow.FixedWidthTypes.Boolean - case "bigint", "int", "oid", "serial": + case "bigint", "int", "oid", "serial", "integer": return arrow.PrimitiveTypes.Int64 case "decimal", "float", "real", "double precision": return arrow.PrimitiveTypes.Float64 From 05acb71744bf20e1633ad0f38c0dcaac525f7427 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 16 May 2023 16:03:49 +0300 Subject: [PATCH 09/18] fix lint --- plugins/destination/postgresql/client/read.go | 2 +- plugins/destination/postgresql/client/read_cockroach.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/plugins/destination/postgresql/client/read.go b/plugins/destination/postgresql/client/read.go index c27e01ed8bdc61..5e20eb123f3c7b 100644 --- a/plugins/destination/postgresql/client/read.go +++ b/plugins/destination/postgresql/client/read.go @@ -149,7 +149,7 @@ func (c *Client) reverseTransformer(table *schema.Table, values []any) (arrow.Re } else { if err := c.reverseTransformCockroach(f, bldr.Field(i), values[i]); err != nil { return nil, err - } + } } } rec := bldr.NewRecord() diff --git a/plugins/destination/postgresql/client/read_cockroach.go b/plugins/destination/postgresql/client/read_cockroach.go index 6196e81dbe5ba2..cbe55828b742d0 100644 --- a/plugins/destination/postgresql/client/read_cockroach.go +++ b/plugins/destination/postgresql/client/read_cockroach.go @@ -15,7 +15,6 @@ import ( "github.com/google/uuid" ) - func (c *Client) reverseTransformCockroach(f arrow.Field, bldr array.Builder, val any) error { if val == nil { bldr.AppendNull() @@ -128,4 +127,4 @@ func (c *Client) reverseTransformCockroach(f arrow.Field, bldr array.Builder, va } } return nil -} \ No newline at end of file +} From 12bad7478d6a8d9f6a42231bd82155dc6baa2294 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 16 May 2023 16:07:07 +0300 Subject: [PATCH 10/18] commit --- plugins/destination/postgresql/client/read.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/destination/postgresql/client/read.go b/plugins/destination/postgresql/client/read.go index 5e20eb123f3c7b..ff31c374856f55 100644 --- a/plugins/destination/postgresql/client/read.go +++ b/plugins/destination/postgresql/client/read.go @@ -39,7 +39,7 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er case *array.Int16Builder: b.Append(val.(int16)) case *array.Int32Builder: - b.Append(int32(val.(int32))) + b.Append(val.(int32)) case *array.Int64Builder: b.Append(val.(int64)) case *array.Uint8Builder: From 5da147ae7b9e2624534a05713b4f9c9d08684731 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 16 May 2023 16:41:34 +0300 Subject: [PATCH 11/18] fix tests --- plugins/destination/postgresql/go.mod | 2 +- plugins/destination/postgresql/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/destination/postgresql/go.mod b/plugins/destination/postgresql/go.mod index 4bca5210181440..f7fdba1d5e43da 100644 --- a/plugins/destination/postgresql/go.mod +++ b/plugins/destination/postgresql/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604 github.com/cloudquery/plugin-pb-go v1.0.8 - github.com/cloudquery/plugin-sdk/v3 v3.3.0 + github.com/cloudquery/plugin-sdk/v3 v3.5.0 github.com/goccy/go-json v0.9.11 github.com/google/uuid v1.3.0 github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb diff --git a/plugins/destination/postgresql/go.sum b/plugins/destination/postgresql/go.sum index 82919f43f58750..185f9579f2b111 100644 --- a/plugins/destination/postgresql/go.sum +++ b/plugins/destination/postgresql/go.sum @@ -50,8 +50,8 @@ github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37 github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= -github.com/cloudquery/plugin-sdk/v3 v3.3.0 h1:jTj9bJqMJZUg6WcohIwA/XQic6FgUn+q179gXQraWts= -github.com/cloudquery/plugin-sdk/v3 v3.3.0/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc= +github.com/cloudquery/plugin-sdk/v3 v3.5.0 h1:hE+VCyCkzDXBVdZ2UB/2/tHxy9swK3/GphjIpwZuU4s= +github.com/cloudquery/plugin-sdk/v3 v3.5.0/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= From c8088eb3c2626294e615ce4771a02fd8c2791793 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Tue, 16 May 2023 17:55:17 +0300 Subject: [PATCH 12/18] update sdk --- plugins/destination/postgresql/go.mod | 2 +- plugins/destination/postgresql/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/destination/postgresql/go.mod b/plugins/destination/postgresql/go.mod index f7fdba1d5e43da..f2bb9747205116 100644 --- a/plugins/destination/postgresql/go.mod +++ b/plugins/destination/postgresql/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604 github.com/cloudquery/plugin-pb-go v1.0.8 - github.com/cloudquery/plugin-sdk/v3 v3.5.0 + github.com/cloudquery/plugin-sdk/v3 v3.5.1 github.com/goccy/go-json v0.9.11 github.com/google/uuid v1.3.0 github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb diff --git a/plugins/destination/postgresql/go.sum b/plugins/destination/postgresql/go.sum index 185f9579f2b111..2daea4ff00526f 100644 --- a/plugins/destination/postgresql/go.sum +++ b/plugins/destination/postgresql/go.sum @@ -50,8 +50,8 @@ github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37 github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= -github.com/cloudquery/plugin-sdk/v3 v3.5.0 h1:hE+VCyCkzDXBVdZ2UB/2/tHxy9swK3/GphjIpwZuU4s= -github.com/cloudquery/plugin-sdk/v3 v3.5.0/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc= +github.com/cloudquery/plugin-sdk/v3 v3.5.1 h1:797hWUEsojwvp7xtr6LSaf5tk5iG9UDixoRACxu3xrU= +github.com/cloudquery/plugin-sdk/v3 v3.5.1/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= From b0e9b559e58200e6b33a12b5c6695e2d2b87a0b0 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 17 May 2023 11:43:40 +0300 Subject: [PATCH 13/18] fix uints --- plugins/destination/postgresql/client/read.go | 8 +++++--- .../postgresql/client/read_cockroach.go | 8 +++++--- .../postgresql/client/types_cockroach.go | 18 +++++++++++++----- .../destination/postgresql/client/types_pg.go | 10 +++++++--- 4 files changed, 30 insertions(+), 14 deletions(-) diff --git a/plugins/destination/postgresql/client/read.go b/plugins/destination/postgresql/client/read.go index ff31c374856f55..60c68531e3d66d 100644 --- a/plugins/destination/postgresql/client/read.go +++ b/plugins/destination/postgresql/client/read.go @@ -18,6 +18,7 @@ import ( "github.com/cloudquery/plugin-sdk/v3/types" "github.com/google/uuid" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" ) const ( @@ -45,11 +46,12 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er case *array.Uint8Builder: b.Append(uint8(val.(int16))) case *array.Uint16Builder: - b.Append(uint16(val.(int16))) + b.Append(uint16(val.(int32))) case *array.Uint32Builder: - b.Append(uint32(val.(int32))) + b.Append(uint32(val.(int64))) case *array.Uint64Builder: - b.Append(uint64(val.(int64))) + v := val.(pgtype.Numeric) + b.Append(v.Int.Uint64()) case *array.Float32Builder: b.Append(val.(float32)) case *array.Float64Builder: diff --git a/plugins/destination/postgresql/client/read_cockroach.go b/plugins/destination/postgresql/client/read_cockroach.go index cbe55828b742d0..8799e307361b76 100644 --- a/plugins/destination/postgresql/client/read_cockroach.go +++ b/plugins/destination/postgresql/client/read_cockroach.go @@ -8,6 +8,7 @@ import ( "time" "github.com/goccy/go-json" + "github.com/jackc/pgx/v5/pgtype" "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" @@ -30,17 +31,18 @@ func (c *Client) reverseTransformCockroach(f arrow.Field, bldr array.Builder, va case *array.Int16Builder: b.Append(val.(int16)) case *array.Int32Builder: - b.Append(int32(val.(int64))) + b.Append(val.(int32)) case *array.Int64Builder: b.Append(val.(int64)) case *array.Uint8Builder: b.Append(uint8(val.(int16))) case *array.Uint16Builder: - b.Append(uint16(val.(int16))) + b.Append(uint16(val.(int32))) case *array.Uint32Builder: b.Append(uint32(val.(int64))) case *array.Uint64Builder: - b.Append(uint64(val.(int64))) + v := val.(pgtype.Numeric) + b.Append(v.Int.Uint64()) case *array.Float32Builder: b.Append(val.(float32)) case *array.Float64Builder: diff --git a/plugins/destination/postgresql/client/types_cockroach.go b/plugins/destination/postgresql/client/types_cockroach.go index 9f8876e0b089df..de20756bd6feb6 100644 --- a/plugins/destination/postgresql/client/types_cockroach.go +++ b/plugins/destination/postgresql/client/types_cockroach.go @@ -16,12 +16,14 @@ func (c *Client) SchemaTypeToCockroach(t arrow.DataType) string { return c.SchemaTypeToCockroach(v.Elem()) + fmt.Sprintf("[%d]", v.Len()) case *arrow.BooleanType: return "boolean" - case *arrow.Int8Type, *arrow.Uint8Type, *arrow.Int16Type, *arrow.Uint16Type: + case *arrow.Int8Type, *arrow.Uint8Type, *arrow.Int16Type: return "smallint" - case *arrow.Int32Type, *arrow.Uint32Type: - return "integer" - case *arrow.Int64Type, *arrow.Uint64Type: + case *arrow.Uint16Type, *arrow.Int32Type: + return "int4" + case *arrow.Uint32Type, *arrow.Int64Type: return "bigint" + case *arrow.Uint64Type: + return "numeric" case *arrow.Float32Type: return "real" case *arrow.Float64Type: @@ -56,10 +58,16 @@ func (c *Client) CockroachToSchemaType(t string) arrow.DataType { switch t { case "boolean": return arrow.FixedWidthTypes.Boolean - case "bigint", "int", "oid", "serial", "integer": + case "smallint": + return arrow.PrimitiveTypes.Int16 + case "int4": + return arrow.PrimitiveTypes.Int32 + case "bigint", "int", "oid", "serial", "integer", "int8": return arrow.PrimitiveTypes.Int64 case "decimal", "float", "real", "double precision": return arrow.PrimitiveTypes.Float64 + case "numeric": + return arrow.PrimitiveTypes.Uint64 case "uuid": return types.ExtensionTypes.UUID case "bytea": diff --git a/plugins/destination/postgresql/client/types_pg.go b/plugins/destination/postgresql/client/types_pg.go index 5dc6951f822f8c..2e8069134c1851 100644 --- a/plugins/destination/postgresql/client/types_pg.go +++ b/plugins/destination/postgresql/client/types_pg.go @@ -36,12 +36,14 @@ func (c *Client) SchemaTypeToPg10(t arrow.DataType) string { return "boolean" case *arrow.Int8Type, *arrow.Uint8Type: return "smallint" - case *arrow.Int16Type, *arrow.Uint16Type: + case *arrow.Int16Type: return "smallint" - case *arrow.Int32Type, *arrow.Uint32Type: + case *arrow.Uint16Type, *arrow.Int32Type: return "integer" - case *arrow.Int64Type, *arrow.Uint64Type: + case *arrow.Uint32Type, *arrow.Int64Type: return "bigint" + case *arrow.Uint64Type: + return "numeric" case *arrow.Float32Type: return "real" case *arrow.Float64Type: @@ -86,6 +88,8 @@ func (c *Client) Pg10ToSchemaType(t string) arrow.DataType { return arrow.PrimitiveTypes.Int32 case "bigint": return arrow.PrimitiveTypes.Int64 + case "numeric": + return arrow.PrimitiveTypes.Uint64 case "real": return arrow.PrimitiveTypes.Float32 case "double precision": From 0dc5fb12d707c1fcb9f6143ba65639991dd70798 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 17 May 2023 11:44:40 +0300 Subject: [PATCH 14/18] fix lint --- plugins/destination/postgresql/client/read.go | 1 + plugins/destination/postgresql/client/read_cockroach.go | 1 + 2 files changed, 2 insertions(+) diff --git a/plugins/destination/postgresql/client/read.go b/plugins/destination/postgresql/client/read.go index 60c68531e3d66d..0f9e195494aa3f 100644 --- a/plugins/destination/postgresql/client/read.go +++ b/plugins/destination/postgresql/client/read.go @@ -25,6 +25,7 @@ const ( readSQL = "SELECT %s FROM %s WHERE _cq_source_name = $1 order by _cq_sync_time asc" ) +// nolint: dupl func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) error { if val == nil { bldr.AppendNull() diff --git a/plugins/destination/postgresql/client/read_cockroach.go b/plugins/destination/postgresql/client/read_cockroach.go index 8799e307361b76..9f1bed6af4385c 100644 --- a/plugins/destination/postgresql/client/read_cockroach.go +++ b/plugins/destination/postgresql/client/read_cockroach.go @@ -16,6 +16,7 @@ import ( "github.com/google/uuid" ) +// nolint:dupl func (c *Client) reverseTransformCockroach(f arrow.Field, bldr array.Builder, val any) error { if val == nil { bldr.AppendNull() From c6dd3058aae4d5b7d0363741ed03e41dd2e9f7af Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 17 May 2023 12:17:41 +0300 Subject: [PATCH 15/18] fix cockroach --- plugins/destination/postgresql/client/read_cockroach.go | 4 ++-- plugins/destination/postgresql/client/types_cockroach.go | 8 ++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/plugins/destination/postgresql/client/read_cockroach.go b/plugins/destination/postgresql/client/read_cockroach.go index 9f1bed6af4385c..2af4feea58235d 100644 --- a/plugins/destination/postgresql/client/read_cockroach.go +++ b/plugins/destination/postgresql/client/read_cockroach.go @@ -32,13 +32,13 @@ func (c *Client) reverseTransformCockroach(f arrow.Field, bldr array.Builder, va case *array.Int16Builder: b.Append(val.(int16)) case *array.Int32Builder: - b.Append(val.(int32)) + b.Append(int32(val.(int64))) case *array.Int64Builder: b.Append(val.(int64)) case *array.Uint8Builder: b.Append(uint8(val.(int16))) case *array.Uint16Builder: - b.Append(uint16(val.(int32))) + b.Append(uint16(val.(int64))) case *array.Uint32Builder: b.Append(uint32(val.(int64))) case *array.Uint64Builder: diff --git a/plugins/destination/postgresql/client/types_cockroach.go b/plugins/destination/postgresql/client/types_cockroach.go index de20756bd6feb6..ccbeecb44b81ab 100644 --- a/plugins/destination/postgresql/client/types_cockroach.go +++ b/plugins/destination/postgresql/client/types_cockroach.go @@ -18,9 +18,7 @@ func (c *Client) SchemaTypeToCockroach(t arrow.DataType) string { return "boolean" case *arrow.Int8Type, *arrow.Uint8Type, *arrow.Int16Type: return "smallint" - case *arrow.Uint16Type, *arrow.Int32Type: - return "int4" - case *arrow.Uint32Type, *arrow.Int64Type: + case *arrow.Uint16Type, *arrow.Int32Type, *arrow.Uint32Type, *arrow.Int64Type: return "bigint" case *arrow.Uint64Type: return "numeric" @@ -60,9 +58,7 @@ func (c *Client) CockroachToSchemaType(t string) arrow.DataType { return arrow.FixedWidthTypes.Boolean case "smallint": return arrow.PrimitiveTypes.Int16 - case "int4": - return arrow.PrimitiveTypes.Int32 - case "bigint", "int", "oid", "serial", "integer", "int8": + case "int4", "bigint", "int", "oid", "serial", "integer", "int8", "int64": return arrow.PrimitiveTypes.Int64 case "decimal", "float", "real", "double precision": return arrow.PrimitiveTypes.Float64 From cc407875915cb54426209682e8b197d6a893231a Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Wed, 17 May 2023 12:23:28 +0300 Subject: [PATCH 16/18] fix uint --- plugins/destination/postgresql/client/transformer.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/plugins/destination/postgresql/client/transformer.go b/plugins/destination/postgresql/client/transformer.go index cfb68fc41d01dd..d19bdced827f72 100644 --- a/plugins/destination/postgresql/client/transformer.go +++ b/plugins/destination/postgresql/client/transformer.go @@ -91,20 +91,17 @@ func transformArr(arr arrow.Array) []any { Valid: a.IsValid(i), } case *array.Uint16: - pgArr[i] = pgtype.Int2{ - Int16: int16(a.Value(i)), - Valid: a.IsValid(i), - } - case *array.Uint32: pgArr[i] = pgtype.Int4{ Int32: int32(a.Value(i)), Valid: a.IsValid(i), } - case *array.Uint64: + case *array.Uint32: pgArr[i] = pgtype.Int8{ Int64: int64(a.Value(i)), Valid: a.IsValid(i), } + case *array.Uint64: + pgArr[i] = a.Value(i) case *array.Float32: pgArr[i] = pgtype.Float4{ Float32: a.Value(i), From 6271e02f46d3bcb92f407c6b025793c4a2ecadda Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Wed, 17 May 2023 12:27:16 +0300 Subject: [PATCH 17/18] Update plugins/destination/postgresql/client/migrate.go Co-authored-by: Herman Schaaf --- plugins/destination/postgresql/client/migrate.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/plugins/destination/postgresql/client/migrate.go b/plugins/destination/postgresql/client/migrate.go index 2a72b2f07f952a..035be71fee3f01 100644 --- a/plugins/destination/postgresql/client/migrate.go +++ b/plugins/destination/postgresql/client/migrate.go @@ -138,9 +138,8 @@ func (c *Client) normalizeTable(table *schema.Table, pgTable *schema.Table) *sch normalizedTable := schema.Table{ Name: table.Name, } - for _, f := range table.Columns { - col := f - if c.enabledPks() && f.PrimaryKey { + for _, col := range table.Columns { + if c.enabledPks() && col.PrimaryKey { col.NotNull = true } else { col.PrimaryKey = false From 48d0e5916837a60a6a9a25302221c2227a25896e Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Wed, 17 May 2023 13:01:12 +0300 Subject: [PATCH 18/18] Update plugins/destination/postgresql/client/migrate.go Co-authored-by: Alex Shcherbakov --- plugins/destination/postgresql/client/migrate.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/destination/postgresql/client/migrate.go b/plugins/destination/postgresql/client/migrate.go index 035be71fee3f01..a13713d8a0437f 100644 --- a/plugins/destination/postgresql/client/migrate.go +++ b/plugins/destination/postgresql/client/migrate.go @@ -196,11 +196,11 @@ func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool { func (c *Client) normalizeTables(tables schema.Tables, pgTables schema.Tables) schema.Tables { var result schema.Tables for _, table := range tables { - pgTabe := pgTables.Get(table.Name) - if pgTabe == nil { + pgTable := pgTables.Get(table.Name) + if pgTable == nil { result = append(result, table) } else { - result = append(result, c.normalizeTable(table, pgTabe)) + result = append(result, c.normalizeTable(table, pgTable)) } } return result