Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fine tune fixes
  • Loading branch information
yevgenypats committed Dec 18, 2022
commit 2bc4540386a63a662829d96b96cad9e47021c21c
22 changes: 4 additions & 18 deletions plugins/destination/postgresql/client/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,16 @@ func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *pl
} else {
sql = c.upsert(table)
}

if batch.Len() == c.batchSize - 1 {
r.Data[0] = 1
}

queuedQuery := batch.Queue(sql, r.Data...)
queuedQuery.Exec(func(ct pgconn.CommandTag) error {
panic("what is this?")
return nil
})
batch.Queue(sql, r.Data...)
if batch.Len() >= c.batchSize {
br := c.conn.SendBatch(ctx, batch)
// _, err := br.Exec()
// if err != nil {
// panic(err)
// }
if err := br.Close(); err != nil {
var pgErr *pgconn.PgError
if !errors.As(err, &pgErr) {
// not recoverable error
return fmt.Errorf("failed to execute batch: %w", err)
}
atomic.AddUint64(&c.metrics.Errors, 1)
c.logger.Error().Err(pgErr).Str("table", pgErr.TableName).Msg("failed to execute batch with pgerror")
return fmt.Errorf("failed to execute batch with pgerror on table %s: %w", pgErr.TableName, err)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this stops the sync (correct me if I'm wrong), maybe add a call to action?
Try skipping this table via 'skip_tables: ["<table_name>"]' and re-running the sync command

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think no need, better we get the report and fix it asap. This is basically equivalent of panic, it shouldn't happen mostly as validation should happen in our type system.

}
atomic.AddUint64(&c.metrics.Writes, uint64(c.batchSize))
batch = &pgx.Batch{}
Expand All @@ -64,10 +50,10 @@ func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *pl
if err := br.Close(); err != nil {
var pgErr *pgconn.PgError
if !errors.As(err, &pgErr) {
// no recoverable error
// not recoverable error
return fmt.Errorf("failed to execute batch: %w", err)
}
c.logger.Error().Err(pgErr).Str("table", pgErr.TableName).Msg("failed to execute batch with pgerror")
return fmt.Errorf("failed to execute batch with pgerror on table %s: %w", pgErr.TableName, err)
}
atomic.AddUint64(&c.metrics.Writes, uint64(c.batchSize))
}
Expand Down
8 changes: 1 addition & 7 deletions plugins/destination/postgresql/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/cloudquery/plugin-sdk v1.12.5
github.com/jackc/pgconn v1.13.0
github.com/jackc/pgx-zerolog v0.0.0-20220923130014-7856b90a65ae
github.com/jackc/pgx/v5 v5.2.0
github.com/rs/zerolog v1.28.0
)
Expand All @@ -17,13 +17,8 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2 v2.0.0-rc.3 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgx-zerolog v0.0.0-20220923130014-7856b90a65ae // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/jackc/puddle/v2 v2.1.2 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
Expand All @@ -37,7 +32,6 @@ require (
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20221111202108-142d8a6fa32e // indirect
google.golang.org/grpc v1.51.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
Expand Down
Loading