Skip to content

Commit 4c65730

Browse files
authored
fix: Skip relations of incremental tables from delete stale (#18239)
#### Summary Fixes cloudquery/cloudquery-issues#1844 (internal issue). Relations of incremental tables gets cleared due to delete stale, we should exclude them I think. The fix works since we always sync parents from before children, so parents get excluded first. ~~Still needs to test this.~~ Blocked by cloudquery/plugin-sdk#1742 (I mean the code works, but the bug won't be fixed until source plugins upgrade to latest SDK)
1 parent 200480a commit 4c65730

1 file changed

Lines changed: 11 additions & 14 deletions

File tree

cli/cmd/sync_v3.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"sync/atomic"
1212
"time"
1313

14-
"github.com/apache/arrow/go/v16/arrow"
1514
"github.com/cloudquery/cloudquery-api-go/auth"
1615
"github.com/cloudquery/cloudquery/cli/internal/analytics"
1716
"github.com/cloudquery/cloudquery/cli/internal/api"
@@ -20,6 +19,7 @@ import (
2019
"github.com/cloudquery/plugin-pb-go/managedplugin"
2120
"github.com/cloudquery/plugin-pb-go/metrics"
2221
"github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
22+
"github.com/cloudquery/plugin-sdk/v4/schema"
2323
"github.com/rs/zerolog/log"
2424
"github.com/schollz/progressbar/v3"
2525
"github.com/vnteamopen/godebouncer"
@@ -56,6 +56,7 @@ func getProgressAPIClient() (*cloudquery_api.ClientWithResponses, error) {
5656
func syncConnectionV3(ctx context.Context, source v3source, destinations []v3destination, backend *v3destination, uid string, noMigrate bool, summaryLocation string) (syncErr error) {
5757
var mt metrics.Metrics
5858
var exitReason = ExitReasonStopped
59+
skippedFromDeleteStale := make(map[string]bool, 0)
5960
tablesForDeleteStale := make(map[string]bool, 0)
6061

6162
sourceSpec := source.spec
@@ -340,10 +341,16 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
340341
if err != nil {
341342
return err
342343
}
343-
tableName := tableNameFromSchema(sc)
344+
table, err := schema.NewTableFromArrowSchema(sc)
345+
if err != nil {
346+
return err
347+
}
344348

345-
if !isStateBackendEnabled || !tableIsIncremental(sc) {
346-
tablesForDeleteStale[tableName] = true
349+
// This works since we sync and send migrate messages for parents before children
350+
if isStateBackendEnabled && (table.IsIncremental || (table.Parent != nil && skippedFromDeleteStale[table.Parent.Name])) {
351+
skippedFromDeleteStale[table.Name] = true
352+
} else {
353+
tablesForDeleteStale[table.Name] = true
347354
}
348355
if noMigrate {
349356
continue
@@ -455,16 +462,6 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
455462
return nil
456463
}
457464

458-
func tableNameFromSchema(sc *arrow.Schema) string {
459-
tableName, _ := sc.Metadata().GetValue("cq:table_name")
460-
return tableName
461-
}
462-
463-
func tableIsIncremental(sc *arrow.Schema) bool {
464-
inc, _ := sc.Metadata().GetValue("cq:extension:incremental")
465-
return inc == "true"
466-
}
467-
468465
func deleteStale(client plugin.Plugin_WriteClient, tables map[string]bool, sourceName string, syncTime time.Time) error {
469466
for tableName := range tables {
470467
if err := client.Send(&plugin.Write_Request{

0 commit comments

Comments
 (0)