diff --git a/plugins/destination/postgresql/client/write.go b/plugins/destination/postgresql/client/write.go index bd2aa027e3dc26..8bc07e7e243c7f 100644 --- a/plugins/destination/postgresql/client/write.go +++ b/plugins/destination/postgresql/client/write.go @@ -2,7 +2,6 @@ package client import ( "context" - "errors" "fmt" "strings" "sync/atomic" @@ -10,7 +9,6 @@ import ( "github.com/cloudquery/plugin-sdk/plugins" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" - "github.com/jackc/pgconn" "github.com/jackc/pgtype" "github.com/jackc/pgx/v4" ) @@ -21,9 +19,35 @@ var cqStatusToPgStatus = map[schema.Status]pgtype.Status{ schema.Present: pgtype.Present, } +// Postgres best performance for CQ per our benchmark is achieved with batch that includes different tables +// given we can't predict how many objects will be in each table. +// This is why if we encounter an error we can't know which object/table caused and related the error to the object +// we we need to find the error and exit/stop the sync. +// Also, it is not advice to split the batch into different tables (for postgresql) as +// this will mean increased number of goroutines and will mean more connections which PostgreSQL is not great with. +// Also, I've benchmarked CopyFrom and batch and seems there is no significant difference in CQ case. +// maybe in the future we can provide a flag to switch batching mechanism. +func (c *Client) findErrorInBatch(ctx context.Context, items []*batchItem) error { + for _, item := range items { + _, err := c.conn.Exec(ctx, item.sql, item.arguments...) + if err != nil { + return fmt.Errorf("failed to insert into table %s with sql %s: %w", item.table, item.sql, err) + } + } + return nil +} + +type batchItem struct { + table string + sql string + arguments []interface{} +} + func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *plugins.ClientResource) error { var sql string batch := &pgx.Batch{} + batchItems := make([]*batchItem, 0, c.batchSize) + // resources := for r := range res { table := tables.Get(r.TableName) @@ -35,35 +59,38 @@ func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *pl } else { sql = c.upsert(table) } - + batchItems = append(batchItems, &batchItem{ + table: r.TableName, + sql: sql, + arguments: r.Data, + }) batch.Queue(sql, r.Data...) if batch.Len() >= c.batchSize { br := c.conn.SendBatch(ctx, batch) - if err := br.Close(); err != nil { - var pgErr *pgconn.PgError - if !errors.As(err, &pgErr) { - // not recoverable error - return fmt.Errorf("failed to execute batch: %w", err) + if batchErr := br.Close(); batchErr != nil { + if err := c.findErrorInBatch(ctx, batchItems); err != nil { + return err } - atomic.AddUint64(&c.metrics.Errors, 1) - c.logger.Error().Err(pgErr).Str("table", pgErr.TableName).Msg("failed to execute batch with pgerror") + return fmt.Errorf("failed to execute batch and was unable to pinpoint table: %w", batchErr) } atomic.AddUint64(&c.metrics.Writes, uint64(c.batchSize)) batch = &pgx.Batch{} + batchItems = make([]*batchItem, 0, c.batchSize) } } if batch.Len() > 0 { br := c.conn.SendBatch(ctx, batch) if err := br.Close(); err != nil { - var pgErr *pgconn.PgError - if !errors.As(err, &pgErr) { - // no recoverable error - return fmt.Errorf("failed to execute batch: %w", err) + if batchErr := br.Close(); batchErr != nil { + if err := c.findErrorInBatch(ctx, batchItems); err != nil { + return err + } + // this should never happen + return fmt.Errorf("failed to execute batch and was unable to pinpoint table: %w", batchErr) } - c.logger.Error().Err(pgErr).Str("table", pgErr.TableName).Msg("failed to execute batch with pgerror") } - atomic.AddUint64(&c.metrics.Writes, uint64(c.batchSize)) + atomic.AddUint64(&c.metrics.Writes, uint64(batch.Len())) } return nil