From b21e9878f1cc27746decea80d985bb4ec1f2c95f Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 25 Jan 2024 15:52:50 -0600 Subject: [PATCH 1/8] Update scheduler.go --- scheduler/scheduler.go | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index c6911edddb..3fbe8861a4 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -192,24 +192,8 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s // var table *schema.Table table := tableOriginal.Copy(nil) if syncClient.deterministicCQID { - // No PK adjustment should occur if `_cq_id` is not present in the table - cqIDCol := table.Columns.Get(schema.CqIDColumn.Name) - if cqIDCol == nil { - continue - } - for i, c := range table.Columns { - if c.Name == schema.CqIDColumn.Name { - // Ensure that the cq_id column is the primary key - table.Columns[i].PrimaryKey = true - continue - } - if !c.PrimaryKey { - continue - } - table.Columns[i].PrimaryKey = false - } + table = schema.CqIDAsPK(table) } - res <- &message.SyncMigrateTable{ Table: table, } From 4042daecc0c16ed35406301ed6ad483ce93fd44c Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 25 Jan 2024 15:52:52 -0600 Subject: [PATCH 2/8] Update table.go --- schema/table.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/schema/table.go b/schema/table.go index 6338868379..feeed2ec40 100644 --- a/schema/table.go +++ b/schema/table.go @@ -577,6 +577,26 @@ func (t *Table) TableNames() []string { return ret } +func (t *Table) CqIDAsPK() *Table { + table := t.Copy(nil) + cqIDCol := table.Columns.Get(CqIDColumn.Name) + if cqIDCol == nil { + return table + } + for i, c := range table.Columns { + if c.Name == CqIDColumn.Name { + // Ensure that the cq_id column is the primary key + table.Columns[i].PrimaryKey = true + continue + } + if !c.PrimaryKey { + continue + } + table.Columns[i].PrimaryKey = false + } + return table +} + func (t *Table) Copy(parent *Table) *Table { c := *t c.Parent = parent From a6380fdfcac5468543004977bda9cd4a7356f63b Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 25 Jan 2024 15:57:11 -0600 Subject: [PATCH 3/8] Update table.go --- schema/table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schema/table.go b/schema/table.go index feeed2ec40..95ac743b2b 100644 --- a/schema/table.go +++ b/schema/table.go @@ -577,7 +577,7 @@ func (t *Table) TableNames() []string { return ret } -func (t *Table) CqIDAsPK() *Table { +func CqIDAsPK(t *Table) *Table { table := t.Copy(nil) cqIDCol := table.Columns.Get(CqIDColumn.Name) if cqIDCol == nil { From 20e5c6ca8725ab362f0574949929a61a262c2f1c Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 25 Jan 2024 16:07:08 -0600 Subject: [PATCH 4/8] refactor --- scheduler/scheduler.go | 11 +++++------ schema/table.go | 42 ++++++++++++++++++++++-------------------- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 3fbe8861a4..aed205b433 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -189,14 +189,13 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s // send migrate messages first for _, tableOriginal := range tables.FlattenTables() { - // var table *schema.Table - table := tableOriginal.Copy(nil) - if syncClient.deterministicCQID { - table = schema.CqIDAsPK(table) + migrateMessage := &message.SyncMigrateTable{ + Table: tableOriginal, } - res <- &message.SyncMigrateTable{ - Table: table, + if syncClient.deterministicCQID { + migrateMessage.Table = schema.CqIDAsPK(tableOriginal.Copy(nil)) } + res <- migrateMessage } resources := make(chan *schema.Resource) diff --git a/schema/table.go b/schema/table.go index 95ac743b2b..f4dbcecdb3 100644 --- a/schema/table.go +++ b/schema/table.go @@ -124,6 +124,28 @@ func AddCqIDs(table *Table) { } } +// CqIDAsPK sets the cq_id column as primary key if it exists +// and removes the primary key from all other columns +func CqIDAsPK(t *Table) *Table { + table := t.Copy(nil) + cqIDCol := table.Columns.Get(CqIDColumn.Name) + if cqIDCol == nil { + return table + } + for i, c := range table.Columns { + if c.Name == CqIDColumn.Name { + // Ensure that the cq_id column is the primary key + table.Columns[i].PrimaryKey = true + continue + } + if !c.PrimaryKey { + continue + } + table.Columns[i].PrimaryKey = false + } + return table +} + func NewTablesFromArrowSchemas(schemas []*arrow.Schema) (Tables, error) { tables := make(Tables, len(schemas)) for i, schema := range schemas { @@ -577,26 +599,6 @@ func (t *Table) TableNames() []string { return ret } -func CqIDAsPK(t *Table) *Table { - table := t.Copy(nil) - cqIDCol := table.Columns.Get(CqIDColumn.Name) - if cqIDCol == nil { - return table - } - for i, c := range table.Columns { - if c.Name == CqIDColumn.Name { - // Ensure that the cq_id column is the primary key - table.Columns[i].PrimaryKey = true - continue - } - if !c.PrimaryKey { - continue - } - table.Columns[i].PrimaryKey = false - } - return table -} - func (t *Table) Copy(parent *Table) *Table { c := *t c.Parent = parent From 268efc8781977c90f5f9de40bdea33f1f5df8fce Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 25 Jan 2024 16:25:41 -0600 Subject: [PATCH 5/8] Update scheduler.go --- scheduler/scheduler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index aed205b433..30c496be2c 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -190,10 +190,10 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s // send migrate messages first for _, tableOriginal := range tables.FlattenTables() { migrateMessage := &message.SyncMigrateTable{ - Table: tableOriginal, + Table: tableOriginal.Copy(nil), } if syncClient.deterministicCQID { - migrateMessage.Table = schema.CqIDAsPK(tableOriginal.Copy(nil)) + schema.CqIDAsPK(migrateMessage.Table) } res <- migrateMessage } From 30f6befd18144948bdccf4349639287ac68e6962 Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 25 Jan 2024 16:25:45 -0600 Subject: [PATCH 6/8] Update scheduler_test.go --- scheduler/scheduler_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 31b3e1d8db..7d3d79c19a 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -45,7 +45,7 @@ func testColumnResolverPanic(context.Context, schema.ClientMeta, *schema.Resourc func testTableSuccessWithData(data []any) *schema.Table { return &schema.Table{ - Name: "test_table_success", + Name: "test_table_success_with_data", Resolver: func(_ context.Context, _ schema.ClientMeta, _ *schema.Resource, res chan<- any) error { res <- data return nil @@ -74,7 +74,7 @@ func testTableSuccess() *schema.Table { func testTableSuccessWithPK() *schema.Table { return &schema.Table{ - Name: "test_table_success", + Name: "test_table_success_pk", Resolver: testResolverSuccess, Columns: []schema.Column{ { @@ -88,7 +88,7 @@ func testTableSuccessWithPK() *schema.Table { func testTableSuccessWithCQIDPK() *schema.Table { return &schema.Table{ - Name: "test_table_success", + Name: "test_table_success_cq_id", Resolver: testResolverSuccess, Columns: []schema.Column{ schema.CqIDColumn, From f0660de7cb3293d0ac3ef3d40b3d160b447f520e Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 25 Jan 2024 16:25:49 -0600 Subject: [PATCH 7/8] Update table.go --- schema/table.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/schema/table.go b/schema/table.go index f4dbcecdb3..ab058d7219 100644 --- a/schema/table.go +++ b/schema/table.go @@ -126,24 +126,22 @@ func AddCqIDs(table *Table) { // CqIDAsPK sets the cq_id column as primary key if it exists // and removes the primary key from all other columns -func CqIDAsPK(t *Table) *Table { - table := t.Copy(nil) - cqIDCol := table.Columns.Get(CqIDColumn.Name) +func CqIDAsPK(t *Table) { + cqIDCol := t.Columns.Get(CqIDColumn.Name) if cqIDCol == nil { - return table + return } - for i, c := range table.Columns { + for i, c := range t.Columns { if c.Name == CqIDColumn.Name { // Ensure that the cq_id column is the primary key - table.Columns[i].PrimaryKey = true + t.Columns[i].PrimaryKey = true continue } if !c.PrimaryKey { continue } - table.Columns[i].PrimaryKey = false + t.Columns[i].PrimaryKey = false } - return table } func NewTablesFromArrowSchemas(schemas []*arrow.Schema) (Tables, error) { From 595e4105e203587cda455f46e46547a6cae197d6 Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 25 Jan 2024 16:32:42 -0600 Subject: [PATCH 8/8] Update scheduler_test.go --- scheduler/scheduler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 7d3d79c19a..254c6b7d6f 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -411,7 +411,7 @@ func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, determinist initialTable := tables.Get(v.Table.Name) pks := migratedTable.PrimaryKeys() - if deterministicCQID { + if deterministicCQID && initialTable.Columns.Get(schema.CqIDColumn.Name) != nil { if len(pks) != 1 { t.Fatalf("expected 1 pk. got %d", len(pks)) }