From 85306ba4adf5f07cca1e287f8d54ee7dfdb69813 Mon Sep 17 00:00:00 2001 From: erezrokah Date: Tue, 24 Jun 2025 18:06:47 +0100 Subject: [PATCH 1/5] test: Add test that simulates multiple concurrent syncs --- .github/workflows/dest_clickhouse.yml | 31 ++----- .../clickhouse/client/client_test.go | 83 +++++++++++++++++++ .../clickhouse/docker-compose.yaml | 28 ++++++- 3 files changed, 117 insertions(+), 25 deletions(-) diff --git a/.github/workflows/dest_clickhouse.yml b/.github/workflows/dest_clickhouse.yml index bfda7ec7ca1b8e..cdab597ed523e8 100644 --- a/.github/workflows/dest_clickhouse.yml +++ b/.github/workflows/dest_clickhouse.yml @@ -21,31 +21,10 @@ jobs: name: 'plugins/destination/clickhouse' runs-on: ubuntu-latest timeout-minutes: 30 - env: - DB_USER: cq - DB_PASSWORD: test - DB_NAME: cloudquery defaults: run: working-directory: plugins/destination/clickhouse - services: - clickhouse: - image: clickhouse/clickhouse-server:24.8.1 - env: - CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1 - CLICKHOUSE_PASSWORD: ${{ env.DB_PASSWORD }} - CLICKHOUSE_USER: ${{ env.DB_USER }} - CLICKHOUSE_DB: ${{ env.DB_NAME }} - ports: - - 8123:8123 - - 9000:9000 - options: >- - --ulimit nofile=262144:262144 - --health-cmd "wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1" - --health-interval 60s - --health-timeout 30s - --health-start-period 20s - --health-retries 5 + steps: - uses: actions/checkout@v4 with: @@ -70,9 +49,13 @@ jobs: run: test "$(git status -s | wc -l)" -eq 0 || (git status -s; exit 1) - name: Build run: go build . + + - uses: hoverkraft-tech/compose-action@ad8e0e414a8244c238d7071359bbf04d1e50cd79 + with: + compose-file: docker-compose.yaml + up-flags: "--wait" + cwd: plugins/destination/clickhouse - name: Test ClickHouse - env: - CQ_DEST_CH_TEST_CONN: 'clickhouse://${{ env.DB_USER }}:${{ env.DB_PASSWORD }}@localhost:9000/${{ env.DB_NAME }}' run: make test deploy: diff --git a/plugins/destination/clickhouse/client/client_test.go b/plugins/destination/clickhouse/client/client_test.go index bd1c17f4a616c1..486e39e39a8273 100644 --- a/plugins/destination/clickhouse/client/client_test.go +++ b/plugins/destination/clickhouse/client/client_test.go @@ -5,6 +5,7 @@ import ( "fmt" "net/url" "os" + "strconv" "testing" "time" @@ -21,6 +22,7 @@ import ( "github.com/goccy/go-json" "github.com/google/uuid" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func getTestConnection() string { @@ -228,3 +230,84 @@ func TestMigrateNewArrayAndMapColumns(t *testing.T) { t.Fatal(fmt.Errorf("failed to insert record: %w", err)) } } + +func TestConcurrentSyncsSameTable(t *testing.T) { + const syncConcurrency = 200 + ctx := context.Background() + group, _ := errgroup.WithContext(ctx) + randomUUIDString := uuid.New().String() + tableName := "k8s_core_namespaces_" + randomUUIDString + table := &schema.Table{ + Name: tableName, + Columns: []schema.Column{ + schema.CqIDColumn, + schema.CqSourceNameColumn, + schema.CqSyncTimeColumn, + }, + } + // Create the table + p := plugin.NewPlugin("clickhouse", + internalPlugin.Version, + New, + plugin.WithJSONSchema(spec.JSONSchema), + ) + s := &spec.Spec{ConnectionString: getTestConnection()} + b, err := json.Marshal(s) + require.NoError(t, err) + err = p.Init(ctx, b, plugin.NewClientOptions{}) + require.NoError(t, err) + if err := p.WriteAll(ctx, []message.WriteMessage{&message.WriteMigrateTable{Table: table}}); err != nil { + t.Fatal(fmt.Errorf("failed to create table: %w", err)) + } + + for i := range syncConcurrency { + group.Go(func() error { + // Simulate a sync job against the same table + syncContext := context.Background() + p := plugin.NewPlugin("clickhouse", + internalPlugin.Version, + New, + plugin.WithJSONSchema(spec.JSONSchema), + ) + s := &spec.Spec{ConnectionString: getTestConnection()} + b, err := json.Marshal(s) + require.NoError(t, err) + err = p.Init(syncContext, b, plugin.NewClientOptions{}) + require.NoError(t, err) + if err := p.WriteAll(syncContext, []message.WriteMessage{&message.WriteMigrateTable{Table: table}}); err != nil { + t.Fatal(fmt.Errorf("failed to create table: %w", err)) + } + + jobIndexAsString := strconv.Itoa(i) + randomUUIDStringWithLastCharacterReplaced := randomUUIDString[:len(randomUUIDString)-len(jobIndexAsString)] + jobIndexAsString + bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) + bldr.Field(0).(*sdkTypes.UUIDBuilder).Append(uuid.MustParse(randomUUIDStringWithLastCharacterReplaced)) + bldr.Field(1).(*array.StringBuilder).Append("source") + bldr.Field(2).(*array.TimestampBuilder).Append(arrow.Timestamp(time.Now().UnixMicro())) + record := bldr.NewRecord() + + if err := p.WriteAll(syncContext, []message.WriteMessage{&message.WriteInsert{ + Record: record, + }}); err != nil { + t.Fatal(fmt.Errorf("failed to insert record: %w", err)) + } + return nil + }) + } + + require.NoError(t, group.Wait()) + + ch := make(chan arrow.Record) + go func() { + defer close(ch) + err = p.Read(ctx, table, ch) + }() + + numRows := 0 + for record := range ch { + numRows += int(record.NumRows()) + } + + require.Equal(t, syncConcurrency, numRows) + require.NoError(t, err) +} diff --git a/plugins/destination/clickhouse/docker-compose.yaml b/plugins/destination/clickhouse/docker-compose.yaml index 1896c0524f6316..ee691ec0602e1c 100644 --- a/plugins/destination/clickhouse/docker-compose.yaml +++ b/plugins/destination/clickhouse/docker-compose.yaml @@ -1,6 +1,10 @@ services: clickhouse: - image: clickhouse/clickhouse-server:22.1.2 + image: clickhouse/clickhouse-server:24.8.1 + ulimits: + nofile: + soft: 262144 + hard: 262144 environment: CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1 CLICKHOUSE_PASSWORD: test @@ -13,8 +17,30 @@ services: published: 9000 networks: - clickhouse + configs: + - source: clickhouse.xml + target: /etc/clickhouse-server/config.d/custom_settings.xml + healthcheck: + test: + [ + "CMD", + "wget", + "--no-verbose", + "--tries=1", + "--spider", + "http://localhost:8123/ping", + ] + interval: 10s + timeout: 5s + retries: 5 networks: clickhouse: name: clickhouse driver: bridge +configs: + clickhouse.xml: + content: | + + 100 + \ No newline at end of file From e74c95da76bca5b6ad9c33e737b1f7f7a54d711a Mon Sep 17 00:00:00 2001 From: erezrokah Date: Thu, 26 Jun 2025 18:32:39 +0100 Subject: [PATCH 2/5] fix: Better partition and sort key changes handling --- .../destination/clickhouse/client/migrate.go | 140 +++++++++++------- plugins/destination/clickhouse/go.mod | 1 + plugins/destination/clickhouse/go.sum | 2 + 3 files changed, 88 insertions(+), 55 deletions(-) diff --git a/plugins/destination/clickhouse/client/migrate.go b/plugins/destination/clickhouse/client/migrate.go index bf242cf8db1536..4b4af904c50146 100644 --- a/plugins/destination/clickhouse/client/migrate.go +++ b/plugins/destination/clickhouse/client/migrate.go @@ -2,7 +2,6 @@ package client import ( "context" - "errors" "fmt" "slices" "strings" @@ -10,12 +9,17 @@ import ( "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/client/spec" "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/queries" "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/typeconv" - "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/util" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/samber/lo" "golang.org/x/sync/errgroup" ) +type tableChanges struct { + forcedMigrationNeeded bool + changes []schema.TableColumnChange +} + // MigrateTables relies on the CLI/client to lock before running migration. func (c *Client) MigrateTables(ctx context.Context, messages message.WriteMigrateTables) error { have, err := c.getTableDefinitions(ctx, messages) @@ -28,68 +32,103 @@ func (c *Client) MigrateTables(ctx context.Context, messages message.WriteMigrat return err } - if err := c.checkForced(ctx, have, want, messages); err != nil { + tablesWeCanForceMigrate := map[string]bool{} + for _, msg := range messages { + // last message takes precedence; we don't actually expect the same table to be + // in the same batch twice. + tablesWeCanForceMigrate[msg.Table.Name] = msg.MigrateForce + } + + allTablesChanges, err := c.allTablesChanges(ctx, want, have) + if err != nil { return err } + nonAutoMigratableTables := lo.Filter(lo.Keys(allTablesChanges), func(table string, _ int) bool { + return allTablesChanges[table].forcedMigrationNeeded && !tablesWeCanForceMigrate[table] + }) + if len(nonAutoMigratableTables) > 0 { + changes := lo.Map(nonAutoMigratableTables, func(table string, _ int) []schema.TableColumnChange { + return allTablesChanges[table].changes + }) + return fmt.Errorf("tables %s with changes %v require migration. Migrate manually or consider using 'migrate_mode: forced'", strings.Join(nonAutoMigratableTables, ","), changes) + } + const maxConcurrentMigrate = 10 eg, ctx := errgroup.WithContext(ctx) eg.SetLimit(maxConcurrentMigrate) for _, want := range want { - want := want eg.Go(func() (err error) { c.logger.Info().Str("table", want.Name).Msg("Migrating table started") defer func() { c.logger.Err(err).Str("table", want.Name).Msg("Migrating table done") }() if len(want.Columns) == 0 { - c.logger.Warn().Str("table", want.Name).Msg("Table with no columns, skip") + c.logger.Warn().Str("table", want.Name).Msg("Table with no columns, skipping") return nil } - have := have.Get(want.Name) - if have == nil { + tableName := want.Name + tableChanges := allTablesChanges[tableName] + if tableChanges.changes == nil { + c.logger.Info().Str("table", tableName).Msg("Table doesn't exist, creating") return c.createTable(ctx, want, c.spec.Partition, c.spec.OrderBy) } - return c.autoMigrate(ctx, have, want, c.spec.Partition, c.spec.OrderBy) + if tableChanges.forcedMigrationNeeded { + c.logger.Info().Str("table", tableName).Msg("Table exists, force migration required") + if err := c.dropTable(ctx, want); err != nil { + return err + } + return c.createTable(ctx, want, c.spec.Partition, c.spec.OrderBy) + } + + return c.autoMigrate(ctx, tableName, tableChanges.changes) }) } return eg.Wait() } -func (c *Client) checkForced(ctx context.Context, have, want schema.Tables, messages message.WriteMigrateTables) error { - forcedErr := false - for _, m := range messages { - if m.MigrateForce { +func (c *Client) allTablesChanges(ctx context.Context, want schema.Tables, have schema.Tables) (map[string]tableChanges, error) { + result := make(map[string]tableChanges) + for _, t := range want { + chTable := have.Get(t.Name) + if chTable == nil { + result[t.Name] = tableChanges{ + changes: nil, + forcedMigrationNeeded: false, + } continue } - - // check that this migration can go through - have := have.Get(m.Table.Name) - if have == nil { - continue // create new is always OK - } - want := want.Get(m.Table.Name) // and it should never be nil - if unsafe := unsafeChanges(want.GetChanges(have)); len(unsafe) > 0 { - c.logger.Error(). - Str("table", m.Table.Name). - Str("changes", util.ChangesPrettified(m.Table.Name, unsafe)). - Msg("migrate manually or consider using 'migrate_mode: forced'") - forcedErr = true + changes := t.GetChanges(chTable) + forcedMigrationNeeded, err := c.forceMigrationNeeded(ctx, t, changes) + if err != nil { + return nil, err } - if err := c.checkPartitionOrOrderByChanged(ctx, m.Table, c.spec.Partition, c.spec.OrderBy); err != nil { - c.logger.Error().Str("table", m.Table.Name).Msg(err.Error()) - forcedErr = true + result[t.Name] = tableChanges{ + changes: changes, + forcedMigrationNeeded: forcedMigrationNeeded, } } + return result, nil +} - if forcedErr { - return errors.New("migrate manually or consider using 'migrate_mode: forced'") +func (c *Client) forceMigrationNeeded(ctx context.Context, table *schema.Table, changes []schema.TableColumnChange) (bool, error) { + if unsafe := unsafeChanges(changes); len(unsafe) > 0 { + return true, nil } - return nil + + partitionKeyChange, sortingKeyChange, err := c.checkPartitionOrOrderByChanged(ctx, table, c.spec.Partition, c.spec.OrderBy) + if err != nil { + return false, fmt.Errorf("failed to check partition or order by changed: %w", err) + } + if partitionKeyChange != "" || sortingKeyChange != "" { + return true, nil + } + + return false, nil } func unsafeChanges(changes []schema.TableColumnChange) []schema.TableColumnChange { @@ -103,8 +142,6 @@ func unsafeChanges(changes []schema.TableColumnChange) []schema.TableColumnChang } func (c *Client) createTable(ctx context.Context, table *schema.Table, partition []spec.PartitionStrategy, orderBy []spec.OrderByStrategy) (err error) { - c.logger.Debug().Str("table", table.Name).Msg("Table doesn't exist, creating") - query, err := queries.CreateTable(table, c.spec.Cluster, c.spec.Engine, partition, orderBy) if err != nil { return err @@ -117,7 +154,7 @@ func (c *Client) createTable(ctx context.Context, table *schema.Table, partition } func (c *Client) dropTable(ctx context.Context, table *schema.Table) error { - c.logger.Debug().Str("table", table.Name).Msg("Dropping table") + c.logger.Info().Str("table", table.Name).Msg("Dropping table") return c.conn.Exec(ctx, queries.DropTable(table, c.spec.Cluster)) } @@ -143,27 +180,16 @@ func needsTableDrop(change schema.TableColumnChange) bool { return true } -func (c *Client) autoMigrate(ctx context.Context, have, want *schema.Table, partition []spec.PartitionStrategy, orderBy []spec.OrderByStrategy) error { - changes := want.GetChanges(have) - - if unsafe := unsafeChanges(changes); len(unsafe) > 0 || c.checkPartitionOrOrderByChanged(ctx, want, c.spec.Partition, c.spec.OrderBy) != nil { - // we can get here only with migrate_mode: forced - if err := c.dropTable(ctx, have); err != nil { - return err - } - - return c.createTable(ctx, want, partition, orderBy) - } - +func (c *Client) autoMigrate(ctx context.Context, tableName string, changes []schema.TableColumnChange) error { for _, change := range changes { // we only handle new columns if change.Type != schema.TableColumnChangeTypeAdd { continue } - c.logger.Debug().Str("table", want.Name).Str("column", change.Current.Name).Msg("Adding new column") + c.logger.Info().Str("table", tableName).Str("column", change.Current.Name).Msg("Adding new column") - query, err := queries.AddColumn(want.Name, c.spec.Cluster, change.Current) + query, err := queries.AddColumn(tableName, c.spec.Cluster, change.Current) if err != nil { return err } @@ -177,15 +203,15 @@ func (c *Client) autoMigrate(ctx context.Context, have, want *schema.Table, part return nil } -func (c *Client) checkPartitionOrOrderByChanged(ctx context.Context, table *schema.Table, partition []spec.PartitionStrategy, orderBy []spec.OrderByStrategy) error { +func (c *Client) checkPartitionOrOrderByChanged(ctx context.Context, table *schema.Table, partition []spec.PartitionStrategy, orderBy []spec.OrderByStrategy) (partitionKeyChange, sortingKeyChange string, err error) { resolvedOrderBy, err := queries.ResolveOrderBy(table, orderBy) if err != nil { - return err + return "", "", err } resolvedPartitionBy, err := queries.ResolvePartitionBy(table, partition) if err != nil { - return err + return "", "", err } splitPartitionBy := []string{} @@ -205,18 +231,22 @@ func (c *Client) checkPartitionOrOrderByChanged(ctx context.Context, table *sche havePartitionKey, haveSortingKey, err := c.getPartitionKeyAndSortingKey(ctx, table) if err != nil { - return err + return "", "", err } + partitionKeyChange = "" if !slices.Equal(havePartitionKey, wantPartitionKey) { - return fmt.Errorf("partition key changed (was [%s] and would become [%s]), please drop the table manually", strings.Join(havePartitionKey, ","), strings.Join(wantPartitionKey, ",")) + partitionKeyChange = fmt.Sprintf("partition key changed (was [%s] and would become [%s])", strings.Join(havePartitionKey, ","), strings.Join(wantPartitionKey, ",")) + c.logger.Info().Str("table", table.Name).Msg(partitionKeyChange) } + sortingKeyChange = "" if !slices.Equal(haveSortingKey, wantSortingKey) { - return fmt.Errorf("sorting key changed (was [%s] and would become [%s]), please drop the table manually", strings.Join(haveSortingKey, ","), strings.Join(wantSortingKey, ",")) + sortingKeyChange = fmt.Sprintf("sorting key changed (was [%s] and would become [%s])", strings.Join(haveSortingKey, ","), strings.Join(wantSortingKey, ",")) + c.logger.Info().Str("table", table.Name).Msg(sortingKeyChange) } - return nil + return partitionKeyChange, sortingKeyChange, nil } func dequote(s string) string { diff --git a/plugins/destination/clickhouse/go.mod b/plugins/destination/clickhouse/go.mod index 847a40e1884643..4465680d36993d 100644 --- a/plugins/destination/clickhouse/go.mod +++ b/plugins/destination/clickhouse/go.mod @@ -10,6 +10,7 @@ require ( github.com/goccy/go-json v0.10.5 github.com/google/uuid v1.6.0 github.com/rs/zerolog v1.34.0 + github.com/samber/lo v1.51.0 github.com/stretchr/testify v1.10.0 golang.org/x/sync v0.15.0 ) diff --git a/plugins/destination/clickhouse/go.sum b/plugins/destination/clickhouse/go.sum index ed2e2f16bff960..a04b1d4a8798e3 100644 --- a/plugins/destination/clickhouse/go.sum +++ b/plugins/destination/clickhouse/go.sum @@ -169,6 +169,8 @@ github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samber/lo v1.49.1 h1:4BIFyVfuQSEpluc7Fua+j1NolZHiEHEpaSEKdsH0tew= github.com/samber/lo v1.49.1/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o= +github.com/samber/lo v1.51.0 h1:kysRYLbHy/MB7kQZf5DSN50JHmMsNEdeY24VzJFu7wI= +github.com/samber/lo v1.51.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 h1:KRzFb2m7YtdldCEkzs6KqmJw4nqEVZGK7IN2kJkjTuQ= github.com/santhosh-tekuri/jsonschema/v6 v6.0.2/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= From 10badf537076d43c8408862773f1357cbb04d886 Mon Sep 17 00:00:00 2001 From: erezrokah Date: Tue, 1 Jul 2025 18:54:45 +0100 Subject: [PATCH 3/5] fix: Retry ClickHouse operations --- .../clickhouse/client/client_test.go | 4 +- .../destination/clickhouse/client/delete.go | 4 +- .../destination/clickhouse/client/migrate.go | 8 +- plugins/destination/clickhouse/client/read.go | 22 +-- .../clickhouse/client/retry_helpers.go | 130 ++++++++++++++++++ .../destination/clickhouse/client/table.go | 21 +-- .../destination/clickhouse/client/write.go | 14 +- plugins/destination/clickhouse/go.mod | 1 + plugins/destination/clickhouse/go.sum | 4 +- 9 files changed, 147 insertions(+), 61 deletions(-) create mode 100644 plugins/destination/clickhouse/client/retry_helpers.go diff --git a/plugins/destination/clickhouse/client/client_test.go b/plugins/destination/clickhouse/client/client_test.go index 486e39e39a8273..30b4ef2a74b13a 100644 --- a/plugins/destination/clickhouse/client/client_test.go +++ b/plugins/destination/clickhouse/client/client_test.go @@ -21,6 +21,7 @@ import ( sdkTypes "github.com/cloudquery/plugin-sdk/v4/types" "github.com/goccy/go-json" "github.com/google/uuid" + "github.com/rs/zerolog" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -232,7 +233,7 @@ func TestMigrateNewArrayAndMapColumns(t *testing.T) { } func TestConcurrentSyncsSameTable(t *testing.T) { - const syncConcurrency = 200 + const syncConcurrency = 2000 ctx := context.Background() group, _ := errgroup.WithContext(ctx) randomUUIDString := uuid.New().String() @@ -269,6 +270,7 @@ func TestConcurrentSyncsSameTable(t *testing.T) { New, plugin.WithJSONSchema(spec.JSONSchema), ) + p.SetLogger(zerolog.New(zerolog.NewTestWriter(t)).Level(zerolog.WarnLevel)) s := &spec.Spec{ConnectionString: getTestConnection()} b, err := json.Marshal(s) require.NoError(t, err) diff --git a/plugins/destination/clickhouse/client/delete.go b/plugins/destination/clickhouse/client/delete.go index 4596723d3c09f6..d47c2fb38e40b7 100644 --- a/plugins/destination/clickhouse/client/delete.go +++ b/plugins/destination/clickhouse/client/delete.go @@ -18,7 +18,7 @@ func (c *Client) DeleteStale(ctx context.Context, messages message.WriteDeleteSt } for _, msg := range messages { - if err := c.conn.Exec(ctx, generateDeleteForDeleteStale(msg), msg.SourceName, msg.SyncTime); err != nil { + if err := retryExec(ctx, c.logger, c.conn, generateDeleteForDeleteStale(msg), msg.SourceName, msg.SyncTime); err != nil { return err } } @@ -50,7 +50,7 @@ func (c *Client) DeleteRecord(ctx context.Context, messages message.WriteDeleteR return err } - if err = c.conn.Exec(ctx, sql, params...); err != nil { + if err = retryExec(ctx, c.logger, c.conn, sql, params...); err != nil { return err } } diff --git a/plugins/destination/clickhouse/client/migrate.go b/plugins/destination/clickhouse/client/migrate.go index 4b4af904c50146..576a67972e07b7 100644 --- a/plugins/destination/clickhouse/client/migrate.go +++ b/plugins/destination/clickhouse/client/migrate.go @@ -22,7 +22,7 @@ type tableChanges struct { // MigrateTables relies on the CLI/client to lock before running migration. func (c *Client) MigrateTables(ctx context.Context, messages message.WriteMigrateTables) error { - have, err := c.getTableDefinitions(ctx, messages) + have, err := retryGetTableDefinitions(ctx, c.logger, c.database, c.conn, messages) if err != nil { return err } @@ -147,7 +147,7 @@ func (c *Client) createTable(ctx context.Context, table *schema.Table, partition return err } - if err := c.conn.Exec(ctx, query); err != nil { + if err := retryExec(ctx, c.logger, c.conn, query); err != nil { return fmt.Errorf("failed to create table, query:\n%s\nerror: %w", query, err) } return nil @@ -156,7 +156,7 @@ func (c *Client) createTable(ctx context.Context, table *schema.Table, partition func (c *Client) dropTable(ctx context.Context, table *schema.Table) error { c.logger.Info().Str("table", table.Name).Msg("Dropping table") - return c.conn.Exec(ctx, queries.DropTable(table, c.spec.Cluster)) + return retryExec(ctx, c.logger, c.conn, queries.DropTable(table, c.spec.Cluster)) } func needsTableDrop(change schema.TableColumnChange) bool { @@ -194,7 +194,7 @@ func (c *Client) autoMigrate(ctx context.Context, tableName string, changes []sc return err } - err = c.conn.Exec(ctx, query) + err = retryExec(ctx, c.logger, c.conn, query) if err != nil { return err } diff --git a/plugins/destination/clickhouse/client/read.go b/plugins/destination/clickhouse/client/read.go index 99a15f8a9a9267..06eaf7e98bf693 100644 --- a/plugins/destination/clickhouse/client/read.go +++ b/plugins/destination/clickhouse/client/read.go @@ -6,33 +6,15 @@ import ( "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/apache/arrow-go/v18/arrow" - "github.com/apache/arrow-go/v18/arrow/array" - "github.com/apache/arrow-go/v18/arrow/memory" - "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/queries" - "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/typeconv/arrow/values" "github.com/cloudquery/plugin-sdk/v4/schema" ) func (c *Client) Read(ctx context.Context, table *schema.Table, res chan<- arrow.Record) error { - rows, err := c.conn.Query(ctx, queries.Read(table)) + record, err := retryRead(ctx, c.logger, c.conn, table) if err != nil { return err } - defer rows.Close() - - row := rowArr(rows.ColumnTypes()) - builder := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) - for rows.Next() { - if err = rows.Scan(row...); err != nil { - return err - } - - if err = values.AppendToRecordBuilder(builder, row); err != nil { - return err - } - } - - res <- builder.NewRecord() + res <- record return nil } diff --git a/plugins/destination/clickhouse/client/retry_helpers.go b/plugins/destination/clickhouse/client/retry_helpers.go new file mode 100644 index 00000000000000..70f5b24d08faf1 --- /dev/null +++ b/plugins/destination/clickhouse/client/retry_helpers.go @@ -0,0 +1,130 @@ +package client + +import ( + "context" + "strings" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + retry "github.com/avast/retry-go/v4" + "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/queries" + arrowvalues "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/typeconv/arrow/values" + chvalues "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/typeconv/ch/values" + "github.com/cloudquery/plugin-sdk/v4/message" + "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/rs/zerolog" +) + +func getRetryOptions(logger zerolog.Logger, query string) []retry.Option { + onRetryFunc := func(logger zerolog.Logger, query string) func(n uint, err error) { + return func(attempt uint, err error) { + logger.Warn().Err(err).Uint("attempt", attempt).Msgf("Retrying query %s", query) + } + } + + commonRetryOptions := []retry.Option{ + retry.Attempts(10), + retry.Delay(1 * time.Second), + retry.MaxJitter(500 * time.Millisecond), + retry.LastErrorOnly(true), + retry.RetryIf(func(err error) bool { + return strings.Contains(err.Error(), "Too many simultaneous queries") + }), + } + return append(commonRetryOptions, retry.OnRetry(onRetryFunc(logger, query))) +} + +func retryQueryRowAndScan(ctx context.Context, logger zerolog.Logger, conn clickhouse.Conn, query string, args []any, dest []any) error { + err := retry.Do( + func() error { + return conn.QueryRow(ctx, query, args...).Scan(dest...) + }, + getRetryOptions(logger, query)..., + ) + + return err +} + +func retryExec(ctx context.Context, logger zerolog.Logger, conn clickhouse.Conn, query string, args ...any) error { + err := retry.Do( + func() error { + return conn.Exec(ctx, query, args...) + }, + getRetryOptions(logger, query)..., + ) + return err +} + +func retryBatchSend(ctx context.Context, logger zerolog.Logger, conn clickhouse.Conn, table *schema.Table, records []arrow.Record) error { + err := retry.Do( + func() error { + batch, err := conn.PrepareBatch(ctx, queries.Insert(table)) + if err != nil { + return err + } + + if err := chvalues.BatchAddRecords(ctx, batch, table.ToArrowSchema(), records); err != nil { + _ = batch.Abort() + return err + } + + return batch.Send() + }, + getRetryOptions(logger, "batch.Send()")..., + ) + return err +} + +func retryGetTableDefinitions(ctx context.Context, logger zerolog.Logger, database string, conn clickhouse.Conn, messages message.WriteMigrateTables) (schema.Tables, error) { + schemas, err := retry.DoWithData( + func() (schema.Tables, error) { + const flattenNested0 = "SET flatten_nested = 0" + if err := conn.Exec(ctx, flattenNested0); err != nil { + return nil, err + } + + query, params := queries.GetTablesSchema(database) + rows, err := conn.Query(ctx, query, params...) + if err != nil { + return nil, err + } + defer rows.Close() + + return queries.ScanTableSchemas(rows, messages) + }, + getRetryOptions(logger, "getTableDefinitions")..., + ) + + return schemas, err +} + +func retryRead(ctx context.Context, logger zerolog.Logger, conn clickhouse.Conn, table *schema.Table) (arrow.Record, error) { + record, err := retry.DoWithData( + func() (arrow.Record, error) { + rows, err := conn.Query(ctx, queries.Read(table)) + if err != nil { + return nil, err + } + defer rows.Close() + + row := rowArr(rows.ColumnTypes()) + builder := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) + for rows.Next() { + if err = rows.Scan(row...); err != nil { + return nil, err + } + + if err = arrowvalues.AppendToRecordBuilder(builder, row); err != nil { + return nil, err + } + } + + return builder.NewRecord(), nil + }, + getRetryOptions(logger, "read")..., + ) + return record, err +} diff --git a/plugins/destination/clickhouse/client/table.go b/plugins/destination/clickhouse/client/table.go index e3496453a2646b..4759583642fa53 100644 --- a/plugins/destination/clickhouse/client/table.go +++ b/plugins/destination/clickhouse/client/table.go @@ -5,31 +5,14 @@ import ( "strings" "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/queries" - "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/schema" ) -func (c *Client) getTableDefinitions(ctx context.Context, messages message.WriteMigrateTables) (schema.Tables, error) { - // need proper description without flattened columns - const flattenNested0 = "SET flatten_nested = 0" - if err := c.conn.Exec(ctx, flattenNested0); err != nil { - return nil, err - } - - query, params := queries.GetTablesSchema(c.database) - rows, err := c.conn.Query(ctx, query, params...) - if err != nil { - return nil, err - } - defer rows.Close() - - return queries.ScanTableSchemas(rows, messages) -} - func (c *Client) getPartitionKeyAndSortingKey(ctx context.Context, table *schema.Table) ([]string, []string, error) { sql := queries.GetPartitionKeyAndSortingKeyQuery(c.database, table.Name) var partitionKey, sortingKey string - err := c.conn.QueryRow(ctx, sql).Scan(&partitionKey, &sortingKey) + + err := retryQueryRowAndScan(ctx, c.logger, c.conn, sql, []any{}, []any{&partitionKey, &sortingKey}) if err != nil { return nil, nil, err } diff --git a/plugins/destination/clickhouse/client/write.go b/plugins/destination/clickhouse/client/write.go index 9484d5c72bd9f2..ad904633dca1de 100644 --- a/plugins/destination/clickhouse/client/write.go +++ b/plugins/destination/clickhouse/client/write.go @@ -4,8 +4,6 @@ import ( "context" "github.com/apache/arrow-go/v18/arrow" - "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/queries" - "github.com/cloudquery/cloudquery/plugins/destination/clickhouse/v7/typeconv/ch/values" "github.com/cloudquery/plugin-sdk/v4/message" ) @@ -27,15 +25,5 @@ func (c *Client) WriteTableBatch(ctx context.Context, _ string, messages message records[i] = m.Record } - batch, err := c.conn.PrepareBatch(ctx, queries.Insert(table)) - if err != nil { - return err - } - - if err := values.BatchAddRecords(ctx, batch, table.ToArrowSchema(), records); err != nil { - _ = batch.Abort() - return err - } - - return batch.Send() + return retryBatchSend(ctx, c.logger, c.conn, table, records) } diff --git a/plugins/destination/clickhouse/go.mod b/plugins/destination/clickhouse/go.mod index 4465680d36993d..3f19e546186f50 100644 --- a/plugins/destination/clickhouse/go.mod +++ b/plugins/destination/clickhouse/go.mod @@ -5,6 +5,7 @@ go 1.24.4 require ( github.com/ClickHouse/clickhouse-go/v2 v2.34.0 github.com/apache/arrow-go/v18 v18.3.1 + github.com/avast/retry-go/v4 v4.6.1 github.com/cloudquery/codegen v0.3.29 github.com/cloudquery/plugin-sdk/v4 v4.86.1 github.com/goccy/go-json v0.10.5 diff --git a/plugins/destination/clickhouse/go.sum b/plugins/destination/clickhouse/go.sum index a04b1d4a8798e3..d64f7a8863997e 100644 --- a/plugins/destination/clickhouse/go.sum +++ b/plugins/destination/clickhouse/go.sum @@ -15,6 +15,8 @@ github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= +github.com/avast/retry-go/v4 v4.6.1 h1:VkOLRubHdisGrHnTu89g08aQEWEgRU7LVEop3GbIcMk= +github.com/avast/retry-go/v4 v4.6.1/go.mod h1:V6oF8njAwxJ5gRo1Q7Cxab24xs5NCWZBeaHHBklR8mA= github.com/aws/aws-sdk-go-v2 v1.36.5 h1:0OF9RiEMEdDdZEMqF9MRjevyxAQcf6gY+E7vwBILFj0= github.com/aws/aws-sdk-go-v2 v1.36.5/go.mod h1:EYrzvCCN9CMUTa5+6lf6MM4tq3Zjp8UhSGR/cBsjai0= github.com/aws/aws-sdk-go-v2/config v1.29.17 h1:jSuiQ5jEe4SAMH6lLRMY9OVC+TqJLP5655pBGjmnjr0= @@ -167,8 +169,6 @@ github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/samber/lo v1.49.1 h1:4BIFyVfuQSEpluc7Fua+j1NolZHiEHEpaSEKdsH0tew= -github.com/samber/lo v1.49.1/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o= github.com/samber/lo v1.51.0 h1:kysRYLbHy/MB7kQZf5DSN50JHmMsNEdeY24VzJFu7wI= github.com/samber/lo v1.51.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 h1:KRzFb2m7YtdldCEkzs6KqmJw4nqEVZGK7IN2kJkjTuQ= From 4e7fe474068af950e659b44ff7ff966b506a267c Mon Sep 17 00:00:00 2001 From: erezrokah Date: Tue, 1 Jul 2025 19:25:43 +0100 Subject: [PATCH 4/5] chore: Verbose tests --- plugins/destination/clickhouse/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/destination/clickhouse/Makefile b/plugins/destination/clickhouse/Makefile index acfd32ecd70417..28339f619d8a59 100644 --- a/plugins/destination/clickhouse/Makefile +++ b/plugins/destination/clickhouse/Makefile @@ -2,7 +2,7 @@ test: # we clean the cache to avoid scenarios when we change something in the db and we want to retest without noticing nothing run go clean -testcache - go test -race -timeout 3m ./... + go test -v -race -timeout 3m ./... .PHONY: lint lint: From 98540cf29ed773f309b23d4f981d16e6e7102ec2 Mon Sep 17 00:00:00 2001 From: Erez Rokah Date: Tue, 1 Jul 2025 19:31:03 +0100 Subject: [PATCH 5/5] Tweak retries --- plugins/destination/clickhouse/client/retry_helpers.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/destination/clickhouse/client/retry_helpers.go b/plugins/destination/clickhouse/client/retry_helpers.go index 70f5b24d08faf1..8858226f2df4e3 100644 --- a/plugins/destination/clickhouse/client/retry_helpers.go +++ b/plugins/destination/clickhouse/client/retry_helpers.go @@ -26,9 +26,9 @@ func getRetryOptions(logger zerolog.Logger, query string) []retry.Option { } commonRetryOptions := []retry.Option{ - retry.Attempts(10), - retry.Delay(1 * time.Second), - retry.MaxJitter(500 * time.Millisecond), + retry.Attempts(5), + retry.Delay(3 * time.Second), + retry.MaxJitter(1 * time.Second), retry.LastErrorOnly(true), retry.RetryIf(func(err error) bool { return strings.Contains(err.Error(), "Too many simultaneous queries")