Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 43 additions & 16 deletions plugins/destination/postgresql/client/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package client

import (
"context"
"errors"
"fmt"
"strings"
"sync/atomic"

"github.com/cloudquery/plugin-sdk/plugins"
"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/jackc/pgconn"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
)
Expand All @@ -21,9 +19,35 @@ var cqStatusToPgStatus = map[schema.Status]pgtype.Status{
schema.Present: pgtype.Present,
}

// Postgres best performance for CQ per our benchmark is achieved with batch that includes different tables
// given we can't predict how many objects will be in each table.
// This is why if we encounter an error we can't know which object/table caused and related the error to the object
// we we need to find the error and exit/stop the sync.
// Also, it is not advice to split the batch into different tables (for postgresql) as
// this will mean increased number of goroutines and will mean more connections which PostgreSQL is not great with.
// Also, I've benchmarked CopyFrom and batch and seems there is no significant difference in CQ case.
// maybe in the future we can provide a flag to switch batching mechanism.
func (c *Client) findErrorInBatch(ctx context.Context, items []*batchItem) error {
for _, item := range items {
_, err := c.conn.Exec(ctx, item.sql, item.arguments...)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are queries guaranteed to behave the same the second time we execute them? I'm concerned this can generated unrelated errors

if err != nil {
return fmt.Errorf("failed to insert into table %s with sql %s: %w", item.table, item.sql, err)
}
}
return nil
}

type batchItem struct {
table string
sql string
arguments []interface{}
}

func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *plugins.ClientResource) error {
var sql string
batch := &pgx.Batch{}
batchItems := make([]*batchItem, 0, c.batchSize)
// resources :=

for r := range res {
table := tables.Get(r.TableName)
Expand All @@ -35,35 +59,38 @@ func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *pl
} else {
sql = c.upsert(table)
}

batchItems = append(batchItems, &batchItem{
table: r.TableName,
sql: sql,
arguments: r.Data,
})
batch.Queue(sql, r.Data...)
if batch.Len() >= c.batchSize {
br := c.conn.SendBatch(ctx, batch)
if err := br.Close(); err != nil {
var pgErr *pgconn.PgError
if !errors.As(err, &pgErr) {
// not recoverable error
return fmt.Errorf("failed to execute batch: %w", err)
if batchErr := br.Close(); batchErr != nil {
if err := c.findErrorInBatch(ctx, batchItems); err != nil {
return err
}
atomic.AddUint64(&c.metrics.Errors, 1)
c.logger.Error().Err(pgErr).Str("table", pgErr.TableName).Msg("failed to execute batch with pgerror")
return fmt.Errorf("failed to execute batch and was unable to pinpoint table: %w", batchErr)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this mean everything was successfully inserted? Maybe log it and not return an error

}
atomic.AddUint64(&c.metrics.Writes, uint64(c.batchSize))
batch = &pgx.Batch{}
batchItems = make([]*batchItem, 0, c.batchSize)
}
}

if batch.Len() > 0 {
br := c.conn.SendBatch(ctx, batch)
if err := br.Close(); err != nil {
var pgErr *pgconn.PgError
if !errors.As(err, &pgErr) {
// no recoverable error
return fmt.Errorf("failed to execute batch: %w", err)
if batchErr := br.Close(); batchErr != nil {
if err := c.findErrorInBatch(ctx, batchItems); err != nil {
return err
}
// this should never happen
return fmt.Errorf("failed to execute batch and was unable to pinpoint table: %w", batchErr)
}
c.logger.Error().Err(pgErr).Str("table", pgErr.TableName).Msg("failed to execute batch with pgerror")
}
atomic.AddUint64(&c.metrics.Writes, uint64(c.batchSize))
atomic.AddUint64(&c.metrics.Writes, uint64(batch.Len()))
}

return nil
Expand Down