diff --git a/plugins/destination/postgresql/client/insert.go b/plugins/destination/postgresql/client/insert.go index 7b00128a1a5ec9..e69d641f5e9104 100644 --- a/plugins/destination/postgresql/client/insert.go +++ b/plugins/destination/postgresql/client/insert.go @@ -20,16 +20,7 @@ func (c *Client) InsertBatch(ctx context.Context, messages message.WriteInserts) return err } - include := make([]string, len(tables)) - for i, table := range tables { - include[i] = table.Name - } - var exclude []string - pgTables, err := c.listTables(ctx, include, exclude) - if err != nil { - return err - } - tables = c.normalizeTables(tables, pgTables) + tables = c.normalizeTables(tables, tables) if err != nil { return err } diff --git a/plugins/destination/postgresql/client/migrate.go b/plugins/destination/postgresql/client/migrate.go index 8dc95ae2dd9c10..85112a30993db7 100644 --- a/plugins/destination/postgresql/client/migrate.go +++ b/plugins/destination/postgresql/client/migrate.go @@ -94,6 +94,9 @@ func (c *Client) normalizeTable(table *schema.Table, pgTable *schema.Table) *sch if pgTable != nil && pgTable.PkConstraintName != "" { normalizedTable.PkConstraintName = pgTable.PkConstraintName + } else { + // TODO: Check if this is OK + normalizedTable.PkConstraintName = table.Name + "_cqpk" } return &normalizedTable diff --git a/plugins/destination/postgresql/client/spec.go b/plugins/destination/postgresql/client/spec.go index 932a2d10dd6c4e..d7ed9e56b68880 100644 --- a/plugins/destination/postgresql/client/spec.go +++ b/plugins/destination/postgresql/client/spec.go @@ -8,8 +8,8 @@ import ( const ( defaultBatchSize = 10000 - defaultBatchSizeBytes = 1000000 - defaultBatchTimeout = 10 * time.Second + defaultBatchSizeBytes = 100000000 + defaultBatchTimeout = 60 * time.Second ) type Spec struct {