-
Notifications
You must be signed in to change notification settings - Fork 544
fix: Don't list Postgres tables during insert #13323
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,7 +25,7 @@ func (c *Client) MigrateTableBatch(ctx context.Context, messages message.WriteMi | |
| if err != nil { | ||
| return fmt.Errorf("failed listing postgres tables: %w", err) | ||
| } | ||
| tables = c.normalizeTables(tables, pgTables) | ||
| tables = c.normalizeTables(tables) | ||
|
|
||
| safeTables := map[string]bool{} | ||
| for _, msg := range messages { | ||
|
|
@@ -80,7 +80,7 @@ func (c *Client) MigrateTableBatch(ctx context.Context, messages message.WriteMi | |
| return nil | ||
| } | ||
|
|
||
| func (c *Client) normalizeTable(table *schema.Table, pgTable *schema.Table) *schema.Table { | ||
| func (c *Client) normalizeTable(table *schema.Table) *schema.Table { | ||
| normalizedTable := schema.Table{ | ||
| Name: table.Name, | ||
| } | ||
|
|
@@ -90,10 +90,8 @@ func (c *Client) normalizeTable(table *schema.Table, pgTable *schema.Table) *sch | |
| } | ||
| col.Type = c.PgToSchemaType(c.SchemaTypeToPg(col.Type)) | ||
| normalizedTable.Columns = append(normalizedTable.Columns, col) | ||
| } | ||
|
|
||
| if pgTable != nil && pgTable.PkConstraintName != "" { | ||
| normalizedTable.PkConstraintName = pgTable.PkConstraintName | ||
| // pgTablesToPKConstraints is populated when handling migrate messages | ||
| normalizedTable.PkConstraintName = c.pgTablesToPKConstraints[table.Name] | ||
| } | ||
|
|
||
| return &normalizedTable | ||
|
|
@@ -142,10 +140,10 @@ func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool { | |
| } | ||
|
|
||
| // normalize the requested schema to be compatible with what Postgres supports | ||
| func (c *Client) normalizeTables(tables schema.Tables, pgTables schema.Tables) schema.Tables { | ||
| func (c *Client) normalizeTables(tables schema.Tables) schema.Tables { | ||
| result := make(schema.Tables, len(tables)) | ||
| for i, table := range tables { | ||
| result[i] = c.normalizeTable(table, pgTables.Get(table.Name)) | ||
| result[i] = c.normalizeTable(table) | ||
| } | ||
| return result | ||
| } | ||
|
|
@@ -189,10 +187,10 @@ func (c *Client) addColumn(ctx context.Context, tableName string, column schema. | |
|
|
||
| func (c *Client) createTableIfNotExist(ctx context.Context, table *schema.Table) error { | ||
| var sb strings.Builder | ||
| tName := table.Name | ||
| tableName := pgx.Identifier{tName}.Sanitize() | ||
| tableName := table.Name | ||
| sanitizedTableName := pgx.Identifier{tableName}.Sanitize() | ||
| sb.WriteString("CREATE TABLE IF NOT EXISTS ") | ||
| sb.WriteString(tableName) | ||
| sb.WriteString(sanitizedTableName) | ||
| sb.WriteString(" (") | ||
| totalColumns := len(table.Columns) | ||
|
|
||
|
|
@@ -216,19 +214,22 @@ func (c *Client) createTableIfNotExist(ctx context.Context, table *schema.Table) | |
| } | ||
| } | ||
|
|
||
| pkConstraintName := tableName + "_cqpk" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't it actually use the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is inside
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right, but what about |
||
| c.pgTablesToPKConstraints[tableName] = pkConstraintName | ||
|
|
||
| if len(primaryKeys) > 0 { | ||
| // add composite PK constraint on primary key columns | ||
| sb.WriteString(", CONSTRAINT ") | ||
| sb.WriteString(pgx.Identifier{tName + "_cqpk"}.Sanitize()) | ||
| sb.WriteString(pgx.Identifier{pkConstraintName}.Sanitize()) | ||
| sb.WriteString(" PRIMARY KEY (") | ||
| sb.WriteString(strings.Join(primaryKeys, ",")) | ||
| sb.WriteString(")") | ||
| } | ||
| sb.WriteString(")") | ||
| _, err := c.conn.Exec(ctx, sb.String()) | ||
| if err != nil { | ||
| c.logger.Error().Err(err).Str("table", tName).Str("query", sb.String()).Msg("Failed to create table") | ||
| return fmt.Errorf("failed to create table %s: %w"+sb.String(), tName, err) | ||
| c.logger.Error().Err(err).Str("table", tableName).Str("query", sb.String()).Msg("Failed to create table") | ||
| return fmt.Errorf("failed to create table %s: %w"+sb.String(), tableName, err) | ||
| } | ||
| return nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed these since I initially used
tableNamefor the dictionary key which is wrong since it had the sanitized value