From 575ae1620a93634f39a297e2c5872aca40063d4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 13:05:54 +0300 Subject: [PATCH 01/30] feat(scheduler): add Storage interface for pluggable queue backends --- scheduler/storage/storage.go | 48 ++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 scheduler/storage/storage.go diff --git a/scheduler/storage/storage.go b/scheduler/storage/storage.go new file mode 100644 index 0000000000..260dba9460 --- /dev/null +++ b/scheduler/storage/storage.go @@ -0,0 +1,48 @@ +// Package storage defines the contract for backends that hold scheduler +// work state (work units and parent resources) during a sync. +// +// Backends store opaque bytes — the scheduler owns all serialization. +// See docs/superpowers/specs/2026-04-17-external-queue-scheduler-design.md +// for the full design. +package storage + +import ( + "context" + "errors" +) + +// ErrResourceNotFound is returned by GetResource / DecResourceRefcount when +// the ID is absent. Callers should treat this as a programming error +// (indicates a leaked reference or double-free) and fail the sync. +var ErrResourceNotFound = errors.New("resource not found") + +// SerializedWorkUnit is the on-the-wire representation of a scheduled unit of +// work. It holds references only — the concrete Table/Client/Parent are +// reconstituted in-process by the scheduler. +type SerializedWorkUnit struct { + TableName string // lookup key into the plugin's registered tables + ClientID string // lookup key into the plugin's initialized clients + ParentID string // empty if top-level; else ID in resource KV +} + +// Storage is the pluggable backend for scheduler work state. +type Storage interface { + PushWork(ctx context.Context, w SerializedWorkUnit) error + PushWorkBatch(ctx context.Context, ws []SerializedWorkUnit) error + // PopWork removes and returns a work unit. Returns (nil, nil) when empty; + // returns an error only on backend failure. Pop semantics are defined by + // the backend (random for in-memory, FIFO-ish for badger — callers must + // not assume an ordering beyond "work eventually drains"). + PopWork(ctx context.Context) (*SerializedWorkUnit, error) + WorkLen(ctx context.Context) (int, error) + + // PutResource inserts a resource blob with an initial refcount. + // refcount must be >= 1 (a resource with zero pins should never exist). + PutResource(ctx context.Context, id string, data []byte, refcount int) error + GetResource(ctx context.Context, id string) ([]byte, error) + // DecResourceRefcount decrements refcount by 1 and deletes when it + // reaches zero, atomically within a single backend operation. + DecResourceRefcount(ctx context.Context, id string) error + + Close(ctx context.Context) error +} From aa5a0bad5bca966682823f95f46e04eea4ad87af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 13:10:46 +0300 Subject: [PATCH 02/30] feat(scheduler): add Storage contract test suite --- scheduler/storage/storagetest/contract.go | 176 ++++++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 scheduler/storage/storagetest/contract.go diff --git a/scheduler/storage/storagetest/contract.go b/scheduler/storage/storagetest/contract.go new file mode 100644 index 0000000000..6a318f6d50 --- /dev/null +++ b/scheduler/storage/storagetest/contract.go @@ -0,0 +1,176 @@ +// Package storagetest provides a contract test suite that every Storage +// backend must pass. Each backend's test file calls TestContract with a +// factory for a fresh instance. +package storagetest + +import ( + "context" + "errors" + "sync" + "testing" + + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" + "github.com/stretchr/testify/require" +) + +// TestContract runs every contract assertion against the Storage returned by +// newStorage. The factory must return an empty, independent instance on each +// call (contract tests mutate state). +func TestContract(t *testing.T, newStorage func(t *testing.T) storage.Storage) { + t.Helper() + t.Run("push_pop_roundtrip", func(t *testing.T) { testPushPopRoundtrip(t, newStorage(t)) }) + t.Run("pop_empty_returns_nil", func(t *testing.T) { testPopEmptyReturnsNil(t, newStorage(t)) }) + t.Run("push_batch", func(t *testing.T) { testPushBatch(t, newStorage(t)) }) + t.Run("work_len", func(t *testing.T) { testWorkLen(t, newStorage(t)) }) + t.Run("resource_put_get", func(t *testing.T) { testResourcePutGet(t, newStorage(t)) }) + t.Run("resource_refcount_delete_on_zero", func(t *testing.T) { testRefcountDeleteOnZero(t, newStorage(t)) }) + t.Run("resource_get_missing_errors", func(t *testing.T) { testGetMissingErrors(t, newStorage(t)) }) + t.Run("resource_dec_missing_errors", func(t *testing.T) { testDecMissingErrors(t, newStorage(t)) }) + t.Run("concurrent_push_pop_no_loss", func(t *testing.T) { testConcurrentPushPopNoLoss(t, newStorage(t)) }) + t.Run("concurrent_refcount_no_double_delete", func(t *testing.T) { testConcurrentRefcountNoDoubleDelete(t, newStorage(t)) }) + t.Run("close_is_idempotent", func(t *testing.T) { testCloseIsIdempotent(t, newStorage(t)) }) +} + +func testPushPopRoundtrip(t *testing.T, s storage.Storage) { + ctx := context.Background() + wu := storage.SerializedWorkUnit{TableName: "t1", ClientID: "c1", ParentID: "p1"} + require.NoError(t, s.PushWork(ctx, wu)) + + got, err := s.PopWork(ctx) + require.NoError(t, err) + require.NotNil(t, got) + require.Equal(t, wu, *got) + + // Second pop drains the queue. + got, err = s.PopWork(ctx) + require.NoError(t, err) + require.Nil(t, got) +} + +func testPopEmptyReturnsNil(t *testing.T, s storage.Storage) { + ctx := context.Background() + got, err := s.PopWork(ctx) + require.NoError(t, err) + require.Nil(t, got) +} + +func testPushBatch(t *testing.T, s storage.Storage) { + ctx := context.Background() + batch := []storage.SerializedWorkUnit{ + {TableName: "a"}, {TableName: "b"}, {TableName: "c"}, + } + require.NoError(t, s.PushWorkBatch(ctx, batch)) + + seen := map[string]bool{} + for i := 0; i < 3; i++ { + got, err := s.PopWork(ctx) + require.NoError(t, err) + require.NotNil(t, got) + seen[got.TableName] = true + } + require.Equal(t, map[string]bool{"a": true, "b": true, "c": true}, seen) +} + +func testWorkLen(t *testing.T, s storage.Storage) { + ctx := context.Background() + n, err := s.WorkLen(ctx) + require.NoError(t, err) + require.Equal(t, 0, n) + + require.NoError(t, s.PushWork(ctx, storage.SerializedWorkUnit{TableName: "t"})) + require.NoError(t, s.PushWork(ctx, storage.SerializedWorkUnit{TableName: "t"})) + + n, err = s.WorkLen(ctx) + require.NoError(t, err) + require.Equal(t, 2, n) +} + +func testResourcePutGet(t *testing.T, s storage.Storage) { + ctx := context.Background() + data := []byte("hello") + require.NoError(t, s.PutResource(ctx, "id-1", data, 1)) + + got, err := s.GetResource(ctx, "id-1") + require.NoError(t, err) + require.Equal(t, data, got) +} + +func testRefcountDeleteOnZero(t *testing.T, s storage.Storage) { + ctx := context.Background() + require.NoError(t, s.PutResource(ctx, "id-1", []byte("x"), 2)) + + // First dec: resource still exists. + require.NoError(t, s.DecResourceRefcount(ctx, "id-1")) + got, err := s.GetResource(ctx, "id-1") + require.NoError(t, err) + require.Equal(t, []byte("x"), got) + + // Second dec: resource deleted. + require.NoError(t, s.DecResourceRefcount(ctx, "id-1")) + _, err = s.GetResource(ctx, "id-1") + require.ErrorIs(t, err, storage.ErrResourceNotFound) +} + +func testGetMissingErrors(t *testing.T, s storage.Storage) { + ctx := context.Background() + _, err := s.GetResource(ctx, "missing") + require.ErrorIs(t, err, storage.ErrResourceNotFound) +} + +func testDecMissingErrors(t *testing.T, s storage.Storage) { + ctx := context.Background() + err := s.DecResourceRefcount(ctx, "missing") + require.ErrorIs(t, err, storage.ErrResourceNotFound) +} + +func testConcurrentPushPopNoLoss(t *testing.T, s storage.Storage) { + ctx := context.Background() + const n = 500 + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < n; i++ { + _ = s.PushWork(ctx, storage.SerializedWorkUnit{TableName: "t"}) + } + }() + + popped := 0 + for popped < n { + got, err := s.PopWork(ctx) + require.NoError(t, err) + if got != nil { + popped++ + } + } + wg.Wait() + + n2, err := s.WorkLen(ctx) + require.NoError(t, err) + require.Equal(t, 0, n2) +} + +func testConcurrentRefcountNoDoubleDelete(t *testing.T, s storage.Storage) { + ctx := context.Background() + const n = 100 + require.NoError(t, s.PutResource(ctx, "shared", []byte("x"), n)) + + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + _ = s.DecResourceRefcount(ctx, "shared") + }() + } + wg.Wait() + + _, err := s.GetResource(ctx, "shared") + require.True(t, errors.Is(err, storage.ErrResourceNotFound), "resource should be deleted after all refs drained, got err=%v", err) +} + +func testCloseIsIdempotent(t *testing.T, s storage.Storage) { + ctx := context.Background() + require.NoError(t, s.Close(ctx)) + require.NoError(t, s.Close(ctx)) +} From 7b808f05e5ec14536f344714dfa558a24ffc7d3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 13:15:24 +0300 Subject: [PATCH 03/30] =?UTF-8?q?test(scheduler/storage):=20harden=20contr?= =?UTF-8?q?act=20suite=20=E2=80=94=20timeouts,=20error=20capture,=20edge?= =?UTF-8?q?=20cases?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scheduler/storage/storagetest/contract.go | 62 +++++++++++++++++++++-- 1 file changed, 58 insertions(+), 4 deletions(-) diff --git a/scheduler/storage/storagetest/contract.go b/scheduler/storage/storagetest/contract.go index 6a318f6d50..076dc2a98c 100644 --- a/scheduler/storage/storagetest/contract.go +++ b/scheduler/storage/storagetest/contract.go @@ -5,9 +5,9 @@ package storagetest import ( "context" - "errors" "sync" "testing" + "time" "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" "github.com/stretchr/testify/require" @@ -21,8 +21,10 @@ func TestContract(t *testing.T, newStorage func(t *testing.T) storage.Storage) { t.Run("push_pop_roundtrip", func(t *testing.T) { testPushPopRoundtrip(t, newStorage(t)) }) t.Run("pop_empty_returns_nil", func(t *testing.T) { testPopEmptyReturnsNil(t, newStorage(t)) }) t.Run("push_batch", func(t *testing.T) { testPushBatch(t, newStorage(t)) }) + t.Run("push_batch_empty", func(t *testing.T) { testPushBatchEmpty(t, newStorage(t)) }) t.Run("work_len", func(t *testing.T) { testWorkLen(t, newStorage(t)) }) t.Run("resource_put_get", func(t *testing.T) { testResourcePutGet(t, newStorage(t)) }) + t.Run("resource_put_rejects_zero_refcount", func(t *testing.T) { testPutResourceRejectsZeroRefcount(t, newStorage(t)) }) t.Run("resource_refcount_delete_on_zero", func(t *testing.T) { testRefcountDeleteOnZero(t, newStorage(t)) }) t.Run("resource_get_missing_errors", func(t *testing.T) { testGetMissingErrors(t, newStorage(t)) }) t.Run("resource_dec_missing_errors", func(t *testing.T) { testDecMissingErrors(t, newStorage(t)) }) @@ -127,16 +129,30 @@ func testConcurrentPushPopNoLoss(t *testing.T, s storage.Storage) { ctx := context.Background() const n = 500 var wg sync.WaitGroup + var pushErr error + var pushErrMu sync.Mutex + wg.Add(1) go func() { defer wg.Done() for i := 0; i < n; i++ { - _ = s.PushWork(ctx, storage.SerializedWorkUnit{TableName: "t"}) + if err := s.PushWork(ctx, storage.SerializedWorkUnit{TableName: "t"}); err != nil { + pushErrMu.Lock() + if pushErr == nil { + pushErr = err + } + pushErrMu.Unlock() + return + } } }() + deadline := time.Now().Add(10 * time.Second) popped := 0 for popped < n { + if time.Now().After(deadline) { + t.Fatalf("lost work: popped %d of %d after deadline", popped, n) + } got, err := s.PopWork(ctx) require.NoError(t, err) if got != nil { @@ -145,6 +161,10 @@ func testConcurrentPushPopNoLoss(t *testing.T, s storage.Storage) { } wg.Wait() + pushErrMu.Lock() + require.NoError(t, pushErr, "push error during concurrent test") + pushErrMu.Unlock() + n2, err := s.WorkLen(ctx) require.NoError(t, err) require.Equal(t, 0, n2) @@ -156,17 +176,29 @@ func testConcurrentRefcountNoDoubleDelete(t *testing.T, s storage.Storage) { require.NoError(t, s.PutResource(ctx, "shared", []byte("x"), n)) var wg sync.WaitGroup + errs := make(chan error, n) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() - _ = s.DecResourceRefcount(ctx, "shared") + if err := s.DecResourceRefcount(ctx, "shared"); err != nil { + errs <- err + } }() } wg.Wait() + close(errs) + + // A correct backend gives us exactly n successful decs. Any error here + // indicates a double-delete bug (over-decrement) or worse. + var got []error + for e := range errs { + got = append(got, e) + } + require.Empty(t, got, "expected no errors from %d concurrent decs, got: %v", n, got) _, err := s.GetResource(ctx, "shared") - require.True(t, errors.Is(err, storage.ErrResourceNotFound), "resource should be deleted after all refs drained, got err=%v", err) + require.ErrorIs(t, err, storage.ErrResourceNotFound, "resource should be deleted after all refs drained") } func testCloseIsIdempotent(t *testing.T, s storage.Storage) { @@ -174,3 +206,25 @@ func testCloseIsIdempotent(t *testing.T, s storage.Storage) { require.NoError(t, s.Close(ctx)) require.NoError(t, s.Close(ctx)) } + +func testPutResourceRejectsZeroRefcount(t *testing.T, s storage.Storage) { + ctx := context.Background() + err := s.PutResource(ctx, "id", []byte("x"), 0) + require.Error(t, err, "PutResource with refcount=0 must return an error") + + err = s.PutResource(ctx, "id", []byte("x"), -1) + require.Error(t, err, "PutResource with refcount<0 must return an error") + + _, err = s.GetResource(ctx, "id") + require.ErrorIs(t, err, storage.ErrResourceNotFound, "failed Put must not create the resource") +} + +func testPushBatchEmpty(t *testing.T, s storage.Storage) { + ctx := context.Background() + require.NoError(t, s.PushWorkBatch(ctx, nil), "empty batch should be a no-op") + require.NoError(t, s.PushWorkBatch(ctx, []storage.SerializedWorkUnit{}), "empty batch should be a no-op") + + n, err := s.WorkLen(ctx) + require.NoError(t, err) + require.Equal(t, 0, n) +} From 419739c095f573097cf1f0b23ff8a6fcaab5f475 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 13:20:04 +0300 Subject: [PATCH 04/30] feat(scheduler): add in-memory Storage backend passing contract --- scheduler/storage/inmemory/inmemory.go | 114 ++++++++++++++++++++ scheduler/storage/inmemory/inmemory_test.go | 15 +++ 2 files changed, 129 insertions(+) create mode 100644 scheduler/storage/inmemory/inmemory.go create mode 100644 scheduler/storage/inmemory/inmemory_test.go diff --git a/scheduler/storage/inmemory/inmemory.go b/scheduler/storage/inmemory/inmemory.go new file mode 100644 index 0000000000..f8833671bc --- /dev/null +++ b/scheduler/storage/inmemory/inmemory.go @@ -0,0 +1,114 @@ +// Package inmemory is the default Storage backend — holds all scheduler +// state in process memory. Matches the behavior of the pre-existing +// ConcurrentRandomQueue: random-pop work semantics, atomic refcounts. +package inmemory + +import ( + "context" + "errors" + "math/rand" + "sync" + + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" +) + +type Storage struct { + mu sync.Mutex + queue []storage.SerializedWorkUnit + resources map[string]*resourceEntry + random *rand.Rand +} + +type resourceEntry struct { + data []byte + refcount int +} + +// New returns a Storage seeded for deterministic random-pop ordering. +func New(seed int64) *Storage { + return &Storage{ + queue: make([]storage.SerializedWorkUnit, 0), + resources: make(map[string]*resourceEntry), + random: rand.New(rand.NewSource(seed)), + } +} + +func (s *Storage) PushWork(_ context.Context, w storage.SerializedWorkUnit) error { + s.mu.Lock() + defer s.mu.Unlock() + s.queue = append(s.queue, w) + return nil +} + +func (s *Storage) PushWorkBatch(_ context.Context, ws []storage.SerializedWorkUnit) error { + s.mu.Lock() + defer s.mu.Unlock() + s.queue = append(s.queue, ws...) + return nil +} + +func (s *Storage) PopWork(_ context.Context) (*storage.SerializedWorkUnit, error) { + s.mu.Lock() + defer s.mu.Unlock() + if len(s.queue) == 0 { + return nil, nil + } + idx := s.random.Intn(len(s.queue)) + last := len(s.queue) - 1 + s.queue[idx], s.queue[last] = s.queue[last], s.queue[idx] + item := s.queue[last] + s.queue = s.queue[:last] + return &item, nil +} + +func (s *Storage) WorkLen(_ context.Context) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.queue), nil +} + +func (s *Storage) PutResource(_ context.Context, id string, data []byte, refcount int) error { + if refcount < 1 { + return errors.New("storage/inmemory: refcount must be >= 1") + } + s.mu.Lock() + defer s.mu.Unlock() + cp := make([]byte, len(data)) + copy(cp, data) + s.resources[id] = &resourceEntry{data: cp, refcount: refcount} + return nil +} + +func (s *Storage) GetResource(_ context.Context, id string) ([]byte, error) { + s.mu.Lock() + defer s.mu.Unlock() + entry, ok := s.resources[id] + if !ok { + return nil, storage.ErrResourceNotFound + } + out := make([]byte, len(entry.data)) + copy(out, entry.data) + return out, nil +} + +func (s *Storage) DecResourceRefcount(_ context.Context, id string) error { + s.mu.Lock() + defer s.mu.Unlock() + entry, ok := s.resources[id] + if !ok { + return storage.ErrResourceNotFound + } + entry.refcount-- + if entry.refcount <= 0 { + delete(s.resources, id) + } + return nil +} + +func (s *Storage) Close(_ context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + s.queue = nil + s.resources = nil + return nil +} diff --git a/scheduler/storage/inmemory/inmemory_test.go b/scheduler/storage/inmemory/inmemory_test.go new file mode 100644 index 0000000000..f338313d84 --- /dev/null +++ b/scheduler/storage/inmemory/inmemory_test.go @@ -0,0 +1,15 @@ +package inmemory_test + +import ( + "testing" + + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage/inmemory" + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage/storagetest" +) + +func TestInMemory_Contract(t *testing.T) { + storagetest.TestContract(t, func(t *testing.T) storage.Storage { + return inmemory.New(1) + }) +} From 53134350aa7cdb79d8b11d22d850e85b755bf575 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 13:26:14 +0300 Subject: [PATCH 05/30] feat(schema): add itemSample field + accessors to Table --- schema/table.go | 34 ++++++++++++++++++++++++++++++ schema/table_itemsample_test.go | 37 +++++++++++++++++++++++++++++++++ schema/table_test.go | 3 ++- 3 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 schema/table_itemsample_test.go diff --git a/schema/table.go b/schema/table.go index 48712ac35b..360278e8ee 100644 --- a/schema/table.go +++ b/schema/table.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "reflect" "regexp" "slices" "strings" @@ -118,6 +119,13 @@ type Table struct { // IgnorePKComponentsMismatchValidation is a flag that indicates if the table should skip validating usage of both primary key components and primary keys IgnorePKComponentsMismatchValidation bool `json:"ignore_pk_components_mismatch_validation"` + + // itemSample holds a zero-value sample of the type returned by this + // table's Resolver (the runtime type of Resource.Item). Populated + // automatically by transformers.TransformWithStruct; exposed via + // SetItemSample for plugins that use a custom Transform. + // Consulted only when the scheduler uses an external queue backend. + itemSample reflect.Type } var ( @@ -766,3 +774,29 @@ func (t *Table) Copy(parent *Table) *Table { func ValidColumnName(name string) bool { return reValidColumnName.MatchString(name) } + +// SetItemSample records a sample of the Item type. First-write-wins — if +// already set, subsequent calls are ignored. Safe to call multiple times +// from plugin registration code. +// +// Typically called automatically by transformers.TransformWithStruct; +// plugin authors using a custom Transform should call this directly with +// a zero-value of their Item type. +func (t *Table) SetItemSample(sample any) { + if t.itemSample != nil { + return + } + if sample == nil { + return + } + rt := reflect.TypeOf(sample) + if rt.Kind() == reflect.Pointer { + rt = rt.Elem() + } + t.itemSample = rt +} + +// ItemSampleType returns the recorded Item type, or nil if unset. +func (t *Table) ItemSampleType() reflect.Type { + return t.itemSample +} diff --git a/schema/table_itemsample_test.go b/schema/table_itemsample_test.go new file mode 100644 index 0000000000..e8e6bd7767 --- /dev/null +++ b/schema/table_itemsample_test.go @@ -0,0 +1,37 @@ +package schema + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/require" +) + +type sampleItem struct{ Name string } + +func TestTable_SetItemSample(t *testing.T) { + tbl := &Table{Name: "t1"} + require.Nil(t, tbl.ItemSampleType()) + + tbl.SetItemSample(sampleItem{}) + got := tbl.ItemSampleType() + require.NotNil(t, got) + require.Equal(t, reflect.TypeOf(sampleItem{}), got) +} + +func TestTable_SetItemSample_PointerUnwrapped(t *testing.T) { + tbl := &Table{Name: "t1"} + tbl.SetItemSample(&sampleItem{}) + got := tbl.ItemSampleType() + require.NotNil(t, got) + require.Equal(t, reflect.TypeOf(sampleItem{}), got, "pointer should be unwrapped to the element type") +} + +func TestTable_SetItemSample_Idempotent(t *testing.T) { + tbl := &Table{Name: "t1"} + tbl.SetItemSample(sampleItem{}) + // Second call with a different type is a no-op — first-write-wins. + tbl.SetItemSample(42) + got := tbl.ItemSampleType() + require.Equal(t, reflect.TypeOf(sampleItem{}), got) +} diff --git a/schema/table_test.go b/schema/table_test.go index 2d491a245a..7e5baf0599 100644 --- a/schema/table_test.go +++ b/schema/table_test.go @@ -7,6 +7,7 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/cloudquery/plugin-sdk/v4/types" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/require" ) @@ -787,7 +788,7 @@ func TestTablesToAndFromArrow(t *testing.T) { if err != nil { t.Fatal(err) } - if diff := cmp.Diff(table, tableFromArrow); diff != "" { + if diff := cmp.Diff(table, tableFromArrow, cmpopts.IgnoreUnexported(Table{})); diff != "" { t.Errorf("diff (+got, -want): %v", diff) } } From c409c24593251b10b23b47e51a7157ee1583ddbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 13:57:52 +0300 Subject: [PATCH 06/30] feat(schema): panic on itemSample type conflict + support **T unwrap --- schema/table.go | 20 +++++++++++++------- schema/table_itemsample_test.go | 26 +++++++++++++++++++++++--- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/schema/table.go b/schema/table.go index 360278e8ee..7f09e0f79d 100644 --- a/schema/table.go +++ b/schema/table.go @@ -775,24 +775,30 @@ func ValidColumnName(name string) bool { return reValidColumnName.MatchString(name) } -// SetItemSample records a sample of the Item type. First-write-wins — if -// already set, subsequent calls are ignored. Safe to call multiple times -// from plugin registration code. +// SetItemSample records a sample of the Item type. Idempotent for identical +// types (multiple calls with the same type are no-ops); panics on conflict +// because a Table cannot legitimately resolve two different Item types. // // Typically called automatically by transformers.TransformWithStruct; // plugin authors using a custom Transform should call this directly with // a zero-value of their Item type. +// +// Not safe for concurrent writes. Expected to be called only during plugin +// table registration, never during a sync. func (t *Table) SetItemSample(sample any) { - if t.itemSample != nil { - return - } if sample == nil { return } rt := reflect.TypeOf(sample) - if rt.Kind() == reflect.Pointer { + for rt.Kind() == reflect.Pointer { rt = rt.Elem() } + if t.itemSample != nil { + if t.itemSample != rt { + panic(fmt.Sprintf("schema.Table %q: itemSample already set to %v, got conflicting %v", t.Name, t.itemSample, rt)) + } + return + } t.itemSample = rt } diff --git a/schema/table_itemsample_test.go b/schema/table_itemsample_test.go index e8e6bd7767..908a92e78e 100644 --- a/schema/table_itemsample_test.go +++ b/schema/table_itemsample_test.go @@ -27,11 +27,31 @@ func TestTable_SetItemSample_PointerUnwrapped(t *testing.T) { require.Equal(t, reflect.TypeOf(sampleItem{}), got, "pointer should be unwrapped to the element type") } -func TestTable_SetItemSample_Idempotent(t *testing.T) { +func TestTable_SetItemSample_IdempotentSameType(t *testing.T) { tbl := &Table{Name: "t1"} tbl.SetItemSample(sampleItem{}) - // Second call with a different type is a no-op — first-write-wins. - tbl.SetItemSample(42) + // Second call with the SAME type is a no-op. + tbl.SetItemSample(sampleItem{}) + got := tbl.ItemSampleType() + require.Equal(t, reflect.TypeOf(sampleItem{}), got) +} + +func TestTable_SetItemSample_PanicsOnConflict(t *testing.T) { + tbl := &Table{Name: "t1"} + tbl.SetItemSample(sampleItem{}) + require.PanicsWithValue(t, + `schema.Table "t1": itemSample already set to schema.sampleItem, got conflicting int`, + func() { + tbl.SetItemSample(42) + }, + ) +} + +func TestTable_SetItemSample_IdempotentValueVsPointer(t *testing.T) { + tbl := &Table{Name: "t1"} + tbl.SetItemSample(sampleItem{}) + // Pointer-to-same-type is idempotent since pointer is unwrapped. + tbl.SetItemSample(&sampleItem{}) got := tbl.ItemSampleType() require.Equal(t, reflect.TypeOf(sampleItem{}), got) } From bf3c4071fd81f71f9eac75470125252e672a6354 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:00:00 +0300 Subject: [PATCH 07/30] feat(transformers): TransformWithStruct populates Table.itemSample --- transformers/struct.go | 1 + transformers/struct_itemsample_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 transformers/struct_itemsample_test.go diff --git a/transformers/struct.go b/transformers/struct.go index 9cfc2cef76..dc564ed64e 100644 --- a/transformers/struct.go +++ b/transformers/struct.go @@ -220,6 +220,7 @@ func TransformWithStruct(st any, opts ...StructTransformerOption) schema.Transfo return func(table *schema.Table) error { t.table = table + table.SetItemSample(st) e := reflect.ValueOf(st) if e.Kind() == reflect.Pointer { e = e.Elem() diff --git a/transformers/struct_itemsample_test.go b/transformers/struct_itemsample_test.go new file mode 100644 index 0000000000..3001d11145 --- /dev/null +++ b/transformers/struct_itemsample_test.go @@ -0,0 +1,24 @@ +package transformers_test + +import ( + "reflect" + "testing" + + "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/cloudquery/plugin-sdk/v4/transformers" + "github.com/stretchr/testify/require" +) + +type myItem struct { + ID string + Name string +} + +func TestTransformWithStruct_PopulatesItemSample(t *testing.T) { + tbl := &schema.Table{Name: "t1", Transform: transformers.TransformWithStruct(&myItem{})} + require.NoError(t, tbl.Transform(tbl)) + + got := tbl.ItemSampleType() + require.NotNil(t, got, "TransformWithStruct should populate itemSample") + require.Equal(t, reflect.TypeOf(myItem{}), got) +} From 14a5973b6a26b2d5b9c57c8347635e6176ee58c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:03:23 +0300 Subject: [PATCH 08/30] feat(scheduler/queue): add Resource codec using itemSample + JSON --- scheduler/queue/codec.go | 84 ++++++++++++++++++++++++++++++++++ scheduler/queue/codec_test.go | 86 +++++++++++++++++++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 scheduler/queue/codec.go create mode 100644 scheduler/queue/codec_test.go diff --git a/scheduler/queue/codec.go b/scheduler/queue/codec.go new file mode 100644 index 0000000000..c721187205 --- /dev/null +++ b/scheduler/queue/codec.go @@ -0,0 +1,84 @@ +package queue + +import ( + "encoding/json" + "fmt" + "reflect" + + "github.com/cloudquery/plugin-sdk/v4/schema" +) + +// Codec serializes *schema.Resource values so the queue scheduler can spill +// them to an external Storage backend and reconstruct them on the consumer +// side. Uses JSON for Item payloads (exploiting existing API-response JSON +// tags) and reflects on Table.ItemSampleType() to round-trip to the concrete +// Go type. +type Codec struct { + tablesByName map[string]*schema.Table +} + +// NewCodec builds a codec with a table lookup. tables may be the flattened +// list of all tables a plugin handles — only table names are used as keys. +func NewCodec(tables schema.Tables) *Codec { + m := make(map[string]*schema.Table, len(tables)) + walk(tables, func(t *schema.Table) { m[t.Name] = t }) + return &Codec{tablesByName: m} +} + +func walk(tables schema.Tables, f func(*schema.Table)) { + for _, t := range tables { + f(t) + walk(t.Relations, f) + } +} + +type serializedResource struct { + TableName string `json:"table_name"` + Item json.RawMessage `json:"item"` + ParentID string `json:"parent_id,omitempty"` +} + +// EncodeResource serializes r with an explicit parentID (caller-chosen UUID +// of the resource's parent in the Storage, or "" for root resources). +func (c *Codec) EncodeResource(r *schema.Resource, parentID string) ([]byte, error) { + if r == nil { + return nil, fmt.Errorf("codec: nil resource") + } + if r.Table == nil { + return nil, fmt.Errorf("codec: resource has nil table") + } + itemBytes, err := json.Marshal(r.Item) + if err != nil { + return nil, fmt.Errorf("codec: marshal item for table %q: %w", r.Table.Name, err) + } + return json.Marshal(serializedResource{ + TableName: r.Table.Name, + Item: itemBytes, + ParentID: parentID, + }) +} + +// DecodeResource reconstructs a *schema.Resource from bytes. Returns the +// resource, the parentID it references (for callers that want to chain-load +// ancestors), and any error. The returned Resource has Parent=nil; callers +// wanting the ancestor chain attached should use a higher-level helper. +func (c *Codec) DecodeResource(data []byte) (*schema.Resource, string, error) { + var sr serializedResource + if err := json.Unmarshal(data, &sr); err != nil { + return nil, "", fmt.Errorf("codec: unmarshal envelope: %w", err) + } + tbl, ok := c.tablesByName[sr.TableName] + if !ok { + return nil, "", fmt.Errorf("codec: unknown table %q", sr.TableName) + } + sampleType := tbl.ItemSampleType() + if sampleType == nil { + return nil, "", fmt.Errorf("codec: table %q has no itemSample; configure TransformWithStruct or SetItemSample", sr.TableName) + } + ptr := reflect.New(sampleType).Interface() + if err := json.Unmarshal(sr.Item, ptr); err != nil { + return nil, "", fmt.Errorf("codec: unmarshal item for table %q: %w", sr.TableName, err) + } + item := reflect.ValueOf(ptr).Elem().Interface() + return schema.NewResourceData(tbl, nil, item), sr.ParentID, nil +} diff --git a/scheduler/queue/codec_test.go b/scheduler/queue/codec_test.go new file mode 100644 index 0000000000..5612f17d2a --- /dev/null +++ b/scheduler/queue/codec_test.go @@ -0,0 +1,86 @@ +package queue + +import ( + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/cloudquery/plugin-sdk/v4/transformers" + "github.com/stretchr/testify/require" +) + +type codecTestItem struct { + ID string + Name string + Age int +} + +func TestCodec_RoundTripRoot(t *testing.T) { + tbl := &schema.Table{ + Name: "codec_test", + Transform: transformers.TransformWithStruct(&codecTestItem{}), + Columns: schema.ColumnList{{Name: "id", Type: arrow.BinaryTypes.String}}, + } + require.NoError(t, tbl.Transform(tbl)) + + item := codecTestItem{ID: "a", Name: "alice", Age: 30} + res := schema.NewResourceData(tbl, nil, item) + require.NoError(t, res.Set("id", "a")) + + tables := schema.Tables{tbl} + c := NewCodec(tables) + + data, err := c.EncodeResource(res, "") // empty parentID = root + require.NoError(t, err) + require.NotEmpty(t, data) + + decoded, parentID, err := c.DecodeResource(data) + require.NoError(t, err) + require.Equal(t, "", parentID) + require.Equal(t, "codec_test", decoded.Table.Name) + require.Nil(t, decoded.Parent) + + typed, ok := decoded.Item.(codecTestItem) + require.True(t, ok, "Item should round-trip to concrete type, got %T", decoded.Item) + require.Equal(t, item, typed) +} + +func TestCodec_RoundTripWithParentRef(t *testing.T) { + parentTbl := &schema.Table{ + Name: "parent_tbl", + Transform: transformers.TransformWithStruct(&codecTestItem{}), + Columns: schema.ColumnList{{Name: "id", Type: arrow.BinaryTypes.String}}, + } + childTbl := &schema.Table{ + Name: "child_tbl", + Transform: transformers.TransformWithStruct(&codecTestItem{}), + Columns: schema.ColumnList{{Name: "id", Type: arrow.BinaryTypes.String}}, + } + require.NoError(t, parentTbl.Transform(parentTbl)) + require.NoError(t, childTbl.Transform(childTbl)) + + child := schema.NewResourceData(childTbl, nil, codecTestItem{ID: "c"}) + c := NewCodec(schema.Tables{parentTbl, childTbl}) + + data, err := c.EncodeResource(child, "parent-id-123") + require.NoError(t, err) + + decoded, parentID, err := c.DecodeResource(data) + require.NoError(t, err) + require.Equal(t, "parent-id-123", parentID) + require.Equal(t, "child_tbl", decoded.Table.Name) +} + +func TestCodec_DecodeUnknownTableErrors(t *testing.T) { + other := &schema.Table{Name: "unknown_table", Transform: transformers.TransformWithStruct(&codecTestItem{})} + require.NoError(t, other.Transform(other)) + + // Use a codec that does NOT have other/unknown_table registered. + c := NewCodec(schema.Tables{}) + r := schema.NewResourceData(other, nil, codecTestItem{}) + blob, err := c.EncodeResource(r, "") + require.NoError(t, err) + _, _, err = c.DecodeResource(blob) + require.Error(t, err) + require.Contains(t, err.Error(), "unknown_table") +} From 3694beccae8a7542232d7a112357f2a20af968c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:06:58 +0300 Subject: [PATCH 09/30] feat(scheduler/storage/badger): add package skeleton + dependency --- go.mod | 3 ++ go.sum | 8 ++++ scheduler/storage/badger/badger.go | 71 ++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+) create mode 100644 scheduler/storage/badger/badger.go diff --git a/go.mod b/go.mod index 82c35b8e1e..b4fa617ceb 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/cloudquery/codegen v0.4.1 github.com/cloudquery/plugin-pb-go v1.27.14 github.com/cloudquery/plugin-sdk/v2 v2.7.0 + github.com/dgraph-io/badger/v4 v4.9.1 github.com/getsentry/sentry-go v0.44.1 github.com/goccy/go-json v0.10.6 github.com/golang/mock v1.6.0 @@ -68,6 +69,8 @@ require ( github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index 8f94134f84..05be45d73a 100644 --- a/go.sum +++ b/go.sum @@ -70,8 +70,16 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger/v4 v4.9.1 h1:DocZXZkg5JJHJPtUErA0ibyHxOVUDVoXLSCV6t8NC8w= +github.com/dgraph-io/badger/v4 v4.9.1/go.mod h1:5/MEx97uzdPUHR4KtkNt8asfI2T4JiEiQlV7kWUo8c0= +github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM= +github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI= +github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38= +github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI= github.com/dlclark/regexp2 v1.11.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/getsentry/sentry-go v0.44.1 h1:/cPtrA5qB7uMRrhgSn9TYtcEF36auGP3Y6+ThvD/yaI= diff --git a/scheduler/storage/badger/badger.go b/scheduler/storage/badger/badger.go new file mode 100644 index 0000000000..9248e9debe --- /dev/null +++ b/scheduler/storage/badger/badger.go @@ -0,0 +1,71 @@ +// Package badger implements the Storage interface using a local embedded +// BadgerDB. Primary v1 backend for memory offload. +package badger + +import ( + "context" + "errors" + "fmt" + + badgerdb "github.com/dgraph-io/badger/v4" + + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" +) + +// Options configure the Badger backend. +type Options struct { + // Path is the on-disk directory for the Badger database. Must be writable. + // The caller is responsible for ensuring isolation (e.g. appending an + // invocation ID) across concurrent syncs of the same plugin. + Path string +} + +// Storage is a BadgerDB-backed Storage. +type Storage struct { + db *badgerdb.DB +} + +// Open creates or opens a Badger database at opts.Path. +func Open(opts Options) (*Storage, error) { + if opts.Path == "" { + return nil, errors.New("badger: Options.Path is required") + } + bopts := badgerdb.DefaultOptions(opts.Path).WithLogger(nil) + db, err := badgerdb.Open(bopts) + if err != nil { + return nil, fmt.Errorf("badger: open %q: %w", opts.Path, err) + } + return &Storage{db: db}, nil +} + +// --- Storage methods below this line will be filled in subsequent tasks. --- + +func (s *Storage) PushWork(ctx context.Context, w storage.SerializedWorkUnit) error { + return errors.New("not implemented") +} +func (s *Storage) PushWorkBatch(ctx context.Context, ws []storage.SerializedWorkUnit) error { + return errors.New("not implemented") +} +func (s *Storage) PopWork(ctx context.Context) (*storage.SerializedWorkUnit, error) { + return nil, errors.New("not implemented") +} +func (s *Storage) WorkLen(ctx context.Context) (int, error) { + return 0, errors.New("not implemented") +} +func (s *Storage) PutResource(ctx context.Context, id string, data []byte, refcount int) error { + return errors.New("not implemented") +} +func (s *Storage) GetResource(ctx context.Context, id string) ([]byte, error) { + return nil, errors.New("not implemented") +} +func (s *Storage) DecResourceRefcount(ctx context.Context, id string) error { + return errors.New("not implemented") +} +func (s *Storage) Close(ctx context.Context) error { + if s.db == nil { + return nil + } + err := s.db.Close() + s.db = nil + return err +} From 75b8189581d36b89601427ddcdb783e756c877fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:10:09 +0300 Subject: [PATCH 10/30] feat(scheduler/storage/badger): implement work-queue ops --- scheduler/storage/badger/badger.go | 82 +++++++++++++++++++- scheduler/storage/badger/badger_work_test.go | 50 ++++++++++++ 2 files changed, 128 insertions(+), 4 deletions(-) create mode 100644 scheduler/storage/badger/badger_work_test.go diff --git a/scheduler/storage/badger/badger.go b/scheduler/storage/badger/badger.go index 9248e9debe..35fe2da8cc 100644 --- a/scheduler/storage/badger/badger.go +++ b/scheduler/storage/badger/badger.go @@ -4,10 +4,12 @@ package badger import ( "context" + "encoding/json" "errors" "fmt" badgerdb "github.com/dgraph-io/badger/v4" + "github.com/google/uuid" "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" ) @@ -40,18 +42,90 @@ func Open(opts Options) (*Storage, error) { // --- Storage methods below this line will be filled in subsequent tasks. --- +const ( + workPrefix = "w/" +) + +// workKey generates a unique key for a queued work unit. UUID gives random +// scan order when iterating the prefix, satisfying the contract's "no +// particular pop order" requirement. +func workKey() []byte { + return []byte(workPrefix + uuid.NewString()) +} + func (s *Storage) PushWork(ctx context.Context, w storage.SerializedWorkUnit) error { - return errors.New("not implemented") + data, err := json.Marshal(w) + if err != nil { + return fmt.Errorf("badger: marshal work: %w", err) + } + return s.db.Update(func(txn *badgerdb.Txn) error { + return txn.Set(workKey(), data) + }) } + func (s *Storage) PushWorkBatch(ctx context.Context, ws []storage.SerializedWorkUnit) error { - return errors.New("not implemented") + if len(ws) == 0 { + return nil + } + wb := s.db.NewWriteBatch() + defer wb.Cancel() + for _, w := range ws { + data, err := json.Marshal(w) + if err != nil { + return fmt.Errorf("badger: marshal work: %w", err) + } + if err := wb.Set(workKey(), data); err != nil { + return fmt.Errorf("badger: write batch: %w", err) + } + } + return wb.Flush() } + func (s *Storage) PopWork(ctx context.Context) (*storage.SerializedWorkUnit, error) { - return nil, errors.New("not implemented") + var out *storage.SerializedWorkUnit + err := s.db.Update(func(txn *badgerdb.Txn) error { + it := txn.NewIterator(badgerdb.DefaultIteratorOptions) + defer it.Close() + prefix := []byte(workPrefix) + it.Seek(prefix) + if !it.ValidForPrefix(prefix) { + return nil // empty queue + } + item := it.Item() + key := item.KeyCopy(nil) + value, err := item.ValueCopy(nil) + if err != nil { + return fmt.Errorf("badger: read work value: %w", err) + } + var w storage.SerializedWorkUnit + if err := json.Unmarshal(value, &w); err != nil { + return fmt.Errorf("badger: unmarshal work: %w", err) + } + out = &w + return txn.Delete(key) + }) + if err != nil { + return nil, err + } + return out, nil } + func (s *Storage) WorkLen(ctx context.Context) (int, error) { - return 0, errors.New("not implemented") + count := 0 + err := s.db.View(func(txn *badgerdb.Txn) error { + opts := badgerdb.DefaultIteratorOptions + opts.PrefetchValues = false + it := txn.NewIterator(opts) + defer it.Close() + prefix := []byte(workPrefix) + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + count++ + } + return nil + }) + return count, err } + func (s *Storage) PutResource(ctx context.Context, id string, data []byte, refcount int) error { return errors.New("not implemented") } diff --git a/scheduler/storage/badger/badger_work_test.go b/scheduler/storage/badger/badger_work_test.go new file mode 100644 index 0000000000..2a63ddc6ad --- /dev/null +++ b/scheduler/storage/badger/badger_work_test.go @@ -0,0 +1,50 @@ +package badger_test + +import ( + "context" + "testing" + + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" + bstore "github.com/cloudquery/plugin-sdk/v4/scheduler/storage/badger" + "github.com/stretchr/testify/require" +) + +func newBadger(t *testing.T) *bstore.Storage { + t.Helper() + dir := t.TempDir() + s, err := bstore.Open(bstore.Options{Path: dir}) + require.NoError(t, err) + t.Cleanup(func() { _ = s.Close(context.Background()) }) + return s +} + +func TestBadger_PushPopWorkRoundtrip(t *testing.T) { + s := newBadger(t) + ctx := context.Background() + want := storage.SerializedWorkUnit{TableName: "t1", ClientID: "c1", ParentID: "p1"} + require.NoError(t, s.PushWork(ctx, want)) + + got, err := s.PopWork(ctx) + require.NoError(t, err) + require.NotNil(t, got) + require.Equal(t, want, *got) + + got, err = s.PopWork(ctx) + require.NoError(t, err) + require.Nil(t, got) +} + +func TestBadger_WorkLen(t *testing.T) { + s := newBadger(t) + ctx := context.Background() + n, err := s.WorkLen(ctx) + require.NoError(t, err) + require.Equal(t, 0, n) + + require.NoError(t, s.PushWork(ctx, storage.SerializedWorkUnit{TableName: "t"})) + require.NoError(t, s.PushWorkBatch(ctx, []storage.SerializedWorkUnit{{TableName: "t"}, {TableName: "t"}})) + + n, err = s.WorkLen(ctx) + require.NoError(t, err) + require.Equal(t, 3, n) +} From 31f0dc88bffe6d41b28c6062a77e1d7002667215 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:12:53 +0300 Subject: [PATCH 11/30] feat(scheduler/storage/badger): implement resource KV + refcount --- scheduler/storage/badger/badger.go | 76 ++++++++++++++++++- .../storage/badger/badger_resource_test.go | 47 ++++++++++++ 2 files changed, 119 insertions(+), 4 deletions(-) create mode 100644 scheduler/storage/badger/badger_resource_test.go diff --git a/scheduler/storage/badger/badger.go b/scheduler/storage/badger/badger.go index 35fe2da8cc..d72723cc2e 100644 --- a/scheduler/storage/badger/badger.go +++ b/scheduler/storage/badger/badger.go @@ -43,7 +43,8 @@ func Open(opts Options) (*Storage, error) { // --- Storage methods below this line will be filled in subsequent tasks. --- const ( - workPrefix = "w/" + workPrefix = "w/" + resourcePrefix = "r/" ) // workKey generates a unique key for a queued work unit. UUID gives random @@ -53,6 +54,18 @@ func workKey() []byte { return []byte(workPrefix + uuid.NewString()) } +// resourceEntry is the on-disk representation of a parent resource. Stored +// as a single JSON blob; refcount is a field so DecResourceRefcount can +// update it atomically within one transaction. +type resourceEntry struct { + Data []byte `json:"data"` + Refcount int `json:"refcount"` +} + +func resourceKey(id string) []byte { + return []byte(resourcePrefix + id) +} + func (s *Storage) PushWork(ctx context.Context, w storage.SerializedWorkUnit) error { data, err := json.Marshal(w) if err != nil { @@ -127,14 +140,69 @@ func (s *Storage) WorkLen(ctx context.Context) (int, error) { } func (s *Storage) PutResource(ctx context.Context, id string, data []byte, refcount int) error { - return errors.New("not implemented") + if refcount < 1 { + return errors.New("badger: refcount must be >= 1") + } + entry := resourceEntry{Data: data, Refcount: refcount} + blob, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("badger: marshal resource: %w", err) + } + return s.db.Update(func(txn *badgerdb.Txn) error { + return txn.Set(resourceKey(id), blob) + }) } + func (s *Storage) GetResource(ctx context.Context, id string) ([]byte, error) { - return nil, errors.New("not implemented") + var out []byte + err := s.db.View(func(txn *badgerdb.Txn) error { + item, err := txn.Get(resourceKey(id)) + if errors.Is(err, badgerdb.ErrKeyNotFound) { + return storage.ErrResourceNotFound + } + if err != nil { + return err + } + return item.Value(func(val []byte) error { + var entry resourceEntry + if err := json.Unmarshal(val, &entry); err != nil { + return fmt.Errorf("badger: unmarshal resource: %w", err) + } + out = append(out[:0], entry.Data...) + return nil + }) + }) + return out, err } + func (s *Storage) DecResourceRefcount(ctx context.Context, id string) error { - return errors.New("not implemented") + return s.db.Update(func(txn *badgerdb.Txn) error { + key := resourceKey(id) + item, err := txn.Get(key) + if errors.Is(err, badgerdb.ErrKeyNotFound) { + return storage.ErrResourceNotFound + } + if err != nil { + return err + } + var entry resourceEntry + if err := item.Value(func(val []byte) error { + return json.Unmarshal(val, &entry) + }); err != nil { + return fmt.Errorf("badger: unmarshal resource: %w", err) + } + entry.Refcount-- + if entry.Refcount <= 0 { + return txn.Delete(key) + } + blob, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("badger: marshal resource: %w", err) + } + return txn.Set(key, blob) + }) } + func (s *Storage) Close(ctx context.Context) error { if s.db == nil { return nil diff --git a/scheduler/storage/badger/badger_resource_test.go b/scheduler/storage/badger/badger_resource_test.go new file mode 100644 index 0000000000..b2c47960f9 --- /dev/null +++ b/scheduler/storage/badger/badger_resource_test.go @@ -0,0 +1,47 @@ +package badger_test + +import ( + "context" + "testing" + + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" + "github.com/stretchr/testify/require" +) + +func TestBadger_PutGetResource(t *testing.T) { + s := newBadger(t) + ctx := context.Background() + require.NoError(t, s.PutResource(ctx, "id-1", []byte("hello"), 1)) + + got, err := s.GetResource(ctx, "id-1") + require.NoError(t, err) + require.Equal(t, []byte("hello"), got) +} + +func TestBadger_RefcountDeleteOnZero(t *testing.T) { + s := newBadger(t) + ctx := context.Background() + require.NoError(t, s.PutResource(ctx, "id-1", []byte("x"), 2)) + + require.NoError(t, s.DecResourceRefcount(ctx, "id-1")) + _, err := s.GetResource(ctx, "id-1") + require.NoError(t, err) + + require.NoError(t, s.DecResourceRefcount(ctx, "id-1")) + _, err = s.GetResource(ctx, "id-1") + require.ErrorIs(t, err, storage.ErrResourceNotFound) +} + +func TestBadger_GetMissing(t *testing.T) { + s := newBadger(t) + ctx := context.Background() + _, err := s.GetResource(ctx, "missing") + require.ErrorIs(t, err, storage.ErrResourceNotFound) +} + +func TestBadger_DecMissing(t *testing.T) { + s := newBadger(t) + ctx := context.Background() + err := s.DecResourceRefcount(ctx, "missing") + require.ErrorIs(t, err, storage.ErrResourceNotFound) +} From 7e0f8e042f6d3410b0b07dd9502815b4e6fe52f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:17:30 +0300 Subject: [PATCH 12/30] feat(scheduler/storage/badger): retry Update on conflict + verify contract suite --- scheduler/storage/badger/badger.go | 23 +++++++++++++++++-- .../storage/badger/badger_contract_test.go | 22 ++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) create mode 100644 scheduler/storage/badger/badger_contract_test.go diff --git a/scheduler/storage/badger/badger.go b/scheduler/storage/badger/badger.go index d72723cc2e..ec5287c9c7 100644 --- a/scheduler/storage/badger/badger.go +++ b/scheduler/storage/badger/badger.go @@ -27,6 +27,25 @@ type Storage struct { db *badgerdb.DB } +// maxUpdateRetries bounds the retry loop for Badger Update calls that hit +// SSI transaction conflicts. Under contention from many concurrent workers, +// a handful of retries is typical; exceeding this limit indicates pathology +// (e.g., every worker racing on the same key at once). +const maxUpdateRetries = 100 + +// updateWithRetry runs fn inside s.db.Update, retrying on ErrConflict up to +// maxUpdateRetries times. Necessary because Badger's SSI Update does not +// auto-retry; callers must when they intend read-modify-write semantics. +func (s *Storage) updateWithRetry(fn func(txn *badgerdb.Txn) error) error { + for i := 0; i < maxUpdateRetries; i++ { + err := s.db.Update(fn) + if !errors.Is(err, badgerdb.ErrConflict) { + return err + } + } + return fmt.Errorf("badger: transaction conflict after %d retries", maxUpdateRetries) +} + // Open creates or opens a Badger database at opts.Path. func Open(opts Options) (*Storage, error) { if opts.Path == "" { @@ -96,7 +115,7 @@ func (s *Storage) PushWorkBatch(ctx context.Context, ws []storage.SerializedWork func (s *Storage) PopWork(ctx context.Context) (*storage.SerializedWorkUnit, error) { var out *storage.SerializedWorkUnit - err := s.db.Update(func(txn *badgerdb.Txn) error { + err := s.updateWithRetry(func(txn *badgerdb.Txn) error { it := txn.NewIterator(badgerdb.DefaultIteratorOptions) defer it.Close() prefix := []byte(workPrefix) @@ -176,7 +195,7 @@ func (s *Storage) GetResource(ctx context.Context, id string) ([]byte, error) { } func (s *Storage) DecResourceRefcount(ctx context.Context, id string) error { - return s.db.Update(func(txn *badgerdb.Txn) error { + return s.updateWithRetry(func(txn *badgerdb.Txn) error { key := resourceKey(id) item, err := txn.Get(key) if errors.Is(err, badgerdb.ErrKeyNotFound) { diff --git a/scheduler/storage/badger/badger_contract_test.go b/scheduler/storage/badger/badger_contract_test.go new file mode 100644 index 0000000000..aa2b105d01 --- /dev/null +++ b/scheduler/storage/badger/badger_contract_test.go @@ -0,0 +1,22 @@ +package badger_test + +import ( + "context" + "testing" + + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" + bstore "github.com/cloudquery/plugin-sdk/v4/scheduler/storage/badger" + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage/storagetest" +) + +func TestBadger_Contract(t *testing.T) { + storagetest.TestContract(t, func(t *testing.T) storage.Storage { + dir := t.TempDir() + s, err := bstore.Open(bstore.Options{Path: dir}) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = s.Close(context.Background()) }) + return s + }) +} From 92d2dbcd83fd24263feb9f67a3a70407bab747b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:20:48 +0300 Subject: [PATCH 13/30] feat(scheduler): add QueueConfig with validation --- scheduler/queue_config.go | 59 ++++++++++++++++++++++++++++++++++ scheduler/queue_config_test.go | 47 +++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 scheduler/queue_config.go create mode 100644 scheduler/queue_config_test.go diff --git a/scheduler/queue_config.go b/scheduler/queue_config.go new file mode 100644 index 0000000000..5a7bd9777a --- /dev/null +++ b/scheduler/queue_config.go @@ -0,0 +1,59 @@ +package scheduler + +import "fmt" + +// QueueType identifies a Storage backend. +type QueueType string + +const ( + QueueTypeInMemory QueueType = "in-memory" + QueueTypeBadger QueueType = "badger" +) + +// AllQueueTypes is used for error messages listing valid options. +var AllQueueTypes = []QueueType{QueueTypeInMemory, QueueTypeBadger} + +// QueueConfig is the user-facing spec.queue configuration. Populated from +// source plugin spec. Validated during plugin spec.Validate(). +type QueueConfig struct { + // Type of backend. Defaults to QueueTypeInMemory when unset. + Type QueueType `json:"type,omitempty"` + // Path is the directory for the Badger backend. Required when Type=badger. + Path string `json:"path,omitempty"` +} + +// Validate checks backend-specific required fields. +func (c *QueueConfig) Validate() error { + if c == nil { + return nil + } + switch c.Type { + case "", QueueTypeInMemory: + return nil + case QueueTypeBadger: + if c.Path == "" { + return fmt.Errorf("queue: type=%q requires path", QueueTypeBadger) + } + return nil + default: + return fmt.Errorf("queue: unknown type %q; supported: %v", c.Type, AllQueueTypes) + } +} + +// ValidateWithStrategy enforces that non-in-memory backends may only be used +// with the shuffle-queue scheduler strategy. +func (c *QueueConfig) ValidateWithStrategy(s Strategy) error { + if c == nil { + return nil + } + if err := c.Validate(); err != nil { + return err + } + if c.Type == "" || c.Type == QueueTypeInMemory { + return nil + } + if s != StrategyShuffleQueue { + return fmt.Errorf("queue: type=%q requires scheduler=shuffle-queue (got %s)", c.Type, s.String()) + } + return nil +} diff --git a/scheduler/queue_config_test.go b/scheduler/queue_config_test.go new file mode 100644 index 0000000000..aabfb97b72 --- /dev/null +++ b/scheduler/queue_config_test.go @@ -0,0 +1,47 @@ +package scheduler + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestQueueConfig_ValidateInMemoryNoPath(t *testing.T) { + cfg := &QueueConfig{Type: QueueTypeInMemory} + require.NoError(t, cfg.Validate()) +} + +func TestQueueConfig_ValidateBadgerRequiresPath(t *testing.T) { + cfg := &QueueConfig{Type: QueueTypeBadger} + err := cfg.Validate() + require.Error(t, err) + require.Contains(t, err.Error(), "path") +} + +func TestQueueConfig_ValidateUnknownType(t *testing.T) { + cfg := &QueueConfig{Type: "redis"} + err := cfg.Validate() + require.Error(t, err) + require.Contains(t, err.Error(), "redis") + require.Contains(t, err.Error(), "in-memory") + require.Contains(t, err.Error(), "badger") +} + +func TestQueueConfig_ValidateBadgerOK(t *testing.T) { + cfg := &QueueConfig{Type: QueueTypeBadger, Path: "/tmp/q"} + require.NoError(t, cfg.Validate()) +} + +func TestQueueConfig_RequiresShuffleQueueStrategy(t *testing.T) { + cfg := &QueueConfig{Type: QueueTypeBadger, Path: "/tmp/q"} + require.NoError(t, cfg.ValidateWithStrategy(StrategyShuffleQueue)) + + err := cfg.ValidateWithStrategy(StrategyDFS) + require.Error(t, err) + require.Contains(t, err.Error(), "shuffle-queue") +} + +func TestQueueConfig_InMemoryAllowsAnyStrategy(t *testing.T) { + cfg := &QueueConfig{Type: QueueTypeInMemory} + require.NoError(t, cfg.ValidateWithStrategy(StrategyDFS)) +} From 9295066c58c1c009aeda479621332377146b1d5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:21:14 +0300 Subject: [PATCH 14/30] feat(scheduler): add NewStorageFromConfig factory --- scheduler/queue_factory.go | 34 ++++++++++++++++++++++++++++++++ scheduler/queue_factory_test.go | 35 +++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 scheduler/queue_factory.go create mode 100644 scheduler/queue_factory_test.go diff --git a/scheduler/queue_factory.go b/scheduler/queue_factory.go new file mode 100644 index 0000000000..4bb16251df --- /dev/null +++ b/scheduler/queue_factory.go @@ -0,0 +1,34 @@ +package scheduler + +import ( + "path/filepath" + + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" + badgerstore "github.com/cloudquery/plugin-sdk/v4/scheduler/storage/badger" + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage/inmemory" +) + +// NewStorageFromConfig constructs a Storage backend from the user-facing +// QueueConfig. The seed is used by the in-memory backend for deterministic +// random-pop ordering. invocationID is appended to disk-backed paths so +// concurrent syncs of the same plugin don't collide on the same directory. +// +// A nil cfg is treated as in-memory (the default). +func NewStorageFromConfig(cfg *QueueConfig, seed int64, invocationID string) (storage.Storage, error) { + if cfg == nil { + return inmemory.New(seed), nil + } + if err := cfg.Validate(); err != nil { + return nil, err + } + switch cfg.Type { + case "", QueueTypeInMemory: + return inmemory.New(seed), nil + case QueueTypeBadger: + path := filepath.Join(cfg.Path, invocationID) + return badgerstore.Open(badgerstore.Options{Path: path}) + default: + // unreachable given Validate above, but keeps the switch exhaustive + return inmemory.New(seed), nil + } +} diff --git a/scheduler/queue_factory_test.go b/scheduler/queue_factory_test.go new file mode 100644 index 0000000000..9591a535a5 --- /dev/null +++ b/scheduler/queue_factory_test.go @@ -0,0 +1,35 @@ +package scheduler + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewStorageFromConfig_InMemoryDefault(t *testing.T) { + s, err := NewStorageFromConfig(nil, 42, "inv-1") + require.NoError(t, err) + require.NotNil(t, s) + defer s.Close(context.Background()) +} + +func TestNewStorageFromConfig_InMemoryExplicit(t *testing.T) { + s, err := NewStorageFromConfig(&QueueConfig{Type: QueueTypeInMemory}, 42, "inv-1") + require.NoError(t, err) + require.NotNil(t, s) + defer s.Close(context.Background()) +} + +func TestNewStorageFromConfig_BadgerOpens(t *testing.T) { + dir := t.TempDir() + s, err := NewStorageFromConfig(&QueueConfig{Type: QueueTypeBadger, Path: dir}, 42, "inv-1") + require.NoError(t, err) + require.NotNil(t, s) + defer s.Close(context.Background()) +} + +func TestNewStorageFromConfig_Invalid(t *testing.T) { + _, err := NewStorageFromConfig(&QueueConfig{Type: "nope"}, 42, "inv-1") + require.Error(t, err) +} From 030d241ea5f454b73a52fc05309d71861332b594 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:22:00 +0300 Subject: [PATCH 15/30] feat(scheduler): add WithStorage option --- scheduler/scheduler.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 8d17e08ffe..5b8cd798b6 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -10,6 +10,7 @@ import ( "github.com/cloudquery/plugin-sdk/v4/caser" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/scheduler/metrics" + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/rs/zerolog" "github.com/samber/lo" @@ -95,6 +96,15 @@ func WithShard(num int32, total int32) SyncOption { } } +// WithStorage configures the queue backend used by the shuffle-queue +// scheduler strategy. When nil, the scheduler constructs an in-memory +// backend at Sync time. Used only when Strategy == StrategyShuffleQueue. +func WithStorage(s storage.Storage) Option { + return func(sc *Scheduler) { + sc.storage = s + } +} + type Client interface { ID() string } @@ -122,6 +132,7 @@ type Scheduler struct { batchSettings *BatchSettings invocationID string + storage storage.Storage } type shard struct { From 68f0064b0638fa96ccb524183778a57e373ceb9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:28:50 +0300 Subject: [PATCH 16/30] feat(scheduler/queue): route work + resources through Storage with pin-transfer refcounts --- scheduler/queue/scheduler.go | 109 ++++++++++++-- scheduler/queue/scheduler_test.go | 33 ++++- scheduler/queue/worker.go | 232 ++++++++++++++++++++---------- 3 files changed, 278 insertions(+), 96 deletions(-) diff --git a/scheduler/queue/scheduler.go b/scheduler/queue/scheduler.go index a3f2a97dc5..f613e0d59b 100644 --- a/scheduler/queue/scheduler.go +++ b/scheduler/queue/scheduler.go @@ -6,6 +6,7 @@ import ( "github.com/cloudquery/plugin-sdk/v4/caser" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/scheduler/metrics" + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/google/uuid" "github.com/rs/zerolog" @@ -33,6 +34,11 @@ type Scheduler struct { metrics *metrics.Metrics invocationID string seed int64 + + // storage holds the pluggable queue+resource backend. Required. + storage storage.Storage + // codec serializes *schema.Resource blobs for the backend. Required. + codec *Codec } type Option func(*Scheduler) @@ -61,6 +67,21 @@ func WithInvocationID(invocationID string) Option { } } +// WithStorage sets the Storage backend that holds work and parent resources. +// Required — the scheduler will no-op (with a logged error) if unset. +func WithStorage(s storage.Storage) Option { + return func(d *Scheduler) { + d.storage = s + } +} + +// WithCodec sets the Codec used to serialize resources into Storage. Required. +func WithCodec(c *Codec) Option { + return func(d *Scheduler) { + d.codec = c + } +} + func NewShuffleQueueScheduler(logger zerolog.Logger, m *metrics.Metrics, seed int64, opts ...Option) *Scheduler { scheduler := &Scheduler{ logger: logger, @@ -78,25 +99,87 @@ func NewShuffleQueueScheduler(logger zerolog.Logger, m *metrics.Metrics, seed in return scheduler } +// workerLookups holds in-process-only references to Tables and Clients so +// workers can reconstitute a (*schema.Table, schema.ClientMeta, *schema.Resource) +// from a SerializedWorkUnit. +type workerLookups struct { + tables map[string]*schema.Table + clients map[string]schema.ClientMeta +} + func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedResources chan<- *schema.Resource, msgChan chan<- message.SyncMessage) { if len(tableClients) == 0 { return } - queue := NewConcurrentRandomQueue[WorkUnit](d.seed, len(tableClients)) - for _, tc := range tableClients { - queue.Push(tc) + if d.storage == nil { + d.logger.Error().Msg("queue scheduler started with nil Storage") + return + } + if d.codec == nil { + d.logger.Error().Msg("queue scheduler started with nil Codec") + return + } + + // Maintain in-memory lookup tables so workers can rehydrate. + lookups := &workerLookups{ + tables: make(map[string]*schema.Table), + clients: make(map[string]schema.ClientMeta), + } + // Walk the full table tree so relation tables (which can appear as + // ParentID on future WorkUnits) are resolvable by name. + walkTables := func(t *schema.Table) { + lookups.tables[t.Name] = t + } + for _, wu := range tableClients { + lookups.clients[wu.Client.ID()] = wu.Client + } + walk := func(tables []*schema.Table) { + var do func([]*schema.Table) + do = func(ts []*schema.Table) { + for _, t := range ts { + walkTables(t) + do(t.Relations) + } + } + do(tables) + } + // Collect root tables (dedup) and walk them. + rootTables := make([]*schema.Table, 0, len(tableClients)) + seenRoot := make(map[string]bool) + for _, wu := range tableClients { + if !seenRoot[wu.Table.Name] { + seenRoot[wu.Table.Name] = true + rootTables = append(rootTables, wu.Table) + } + } + walk(rootTables) + + // Seed: push the initial (root-level) WorkUnits. ParentID is empty — + // these have no parent resource in the KV. + seed := make([]storage.SerializedWorkUnit, 0, len(tableClients)) + for _, wu := range tableClients { + seed = append(seed, storage.SerializedWorkUnit{ + TableName: wu.Table.Name, + ClientID: wu.Client.ID(), + // ParentID: "" — top-level + }) + } + if err := d.storage.PushWorkBatch(ctx, seed); err != nil { + d.logger.Error().Err(err).Msg("failed to seed work queue") + return } - jobs := make(chan *WorkUnit) + jobs := make(chan *storage.SerializedWorkUnit) activeWorkSignal := newActiveWorkSignal() - // Worker pool workerPool, _ := errgroup.WithContext(ctx) for w := 0; w < d.workerCount; w++ { workerPool.Go(func() error { newWorker( jobs, - queue, + d.storage, + d.codec, + lookups, resolvedResources, d.logger, d.caser, @@ -109,7 +192,7 @@ func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedR }) } - // Work distribution + // Work distribution — pulls from Storage, signals idle. go func() { defer close(jobs) for { @@ -117,20 +200,18 @@ func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedR case <-ctx.Done(): return default: - item := queue.Pop() - - // There is work to do + item, err := d.storage.PopWork(ctx) + if err != nil { + d.logger.Error().Err(err).Msg("queue backend pop error; aborting sync") + return + } if item != nil { jobs <- item continue } - - // Queue is empty and no active work, done! if activeWorkSignal.IsIdle() { return } - - // Queue is empty and there is active work, wait for changes activeWorkSignal.Wait() } } diff --git a/scheduler/queue/scheduler_test.go b/scheduler/queue/scheduler_test.go index 8d9b71e4ec..7558e8b8a0 100644 --- a/scheduler/queue/scheduler_test.go +++ b/scheduler/queue/scheduler_test.go @@ -7,6 +7,7 @@ import ( "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/scheduler/metrics" + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage/inmemory" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/cloudquery/plugin-sdk/v4/transformers" "github.com/google/uuid" @@ -35,8 +36,18 @@ func testResolver(_ context.Context, _ schema.ClientMeta, parent *schema.Resourc if parent == nil { resources = append(resources, &Data{Name: fmt.Sprintf("test-%d", i)}) } else { - item := parent.Item.(*Data) - resources = append(resources, &Data{Name: fmt.Sprintf("%s-test-%d", item.Name, i)}) + // The codec round-trip normalizes Item to the value type, + // so support both *Data (direct in-memory) and Data (post-codec). + var name string + switch it := parent.Item.(type) { + case *Data: + name = it.Name + case Data: + name = it.Name + default: + return fmt.Errorf("unexpected parent item type %T", parent.Item) + } + resources = append(resources, &Data{Name: fmt.Sprintf("%s-test-%d", name, i)}) } } res <- resources @@ -46,7 +57,6 @@ func testResolver(_ context.Context, _ schema.ClientMeta, parent *schema.Resourc func TestScheduler(t *testing.T) { nopLogger := zerolog.Nop() m := metrics.NewMetrics() - scheduler := NewShuffleQueueScheduler(nopLogger, m, int64(0), WithWorkerCount(1000), WithInvocationID(uuid.New().String())) tableClients := []WorkUnit{ { Table: &schema.Table{ @@ -80,10 +90,27 @@ func TestScheduler(t *testing.T) { }, } + // Apply Transform on all tables so they have ItemSampleType available for the codec. + allTables := schema.Tables{} + for _, tc := range tableClients { + require.NoError(t, tc.Table.Transform(tc.Table)) + for _, rel := range tc.Table.Relations { + require.NoError(t, rel.Transform(rel)) + } + allTables = append(allTables, tc.Table) + } + for _, tc := range tableClients { m.InitWithClients(tc.Table, []schema.ClientMeta{tc.Client}) } + scheduler := NewShuffleQueueScheduler(nopLogger, m, int64(0), + WithWorkerCount(1000), + WithInvocationID(uuid.New().String()), + WithStorage(inmemory.New(0)), + WithCodec(NewCodec(allTables)), + ) + resolvedResources := make(chan *schema.Resource) msgs := make(chan message.SyncMessage, 10) go func() { diff --git a/scheduler/queue/worker.go b/scheduler/queue/worker.go index 451421c443..32a3c52516 100644 --- a/scheduler/queue/worker.go +++ b/scheduler/queue/worker.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "runtime/debug" - "sync" "time" "github.com/cloudquery/plugin-sdk/v4/caser" @@ -12,8 +11,10 @@ import ( "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/scheduler/metrics" "github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers" + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/getsentry/sentry-go" + "github.com/google/uuid" "github.com/rs/zerolog" "github.com/samber/lo" "go.opentelemetry.io/otel" @@ -22,8 +23,10 @@ import ( ) type worker struct { - jobs <-chan *WorkUnit - queue *ConcurrentRandomQueue[WorkUnit] + jobs <-chan *storage.SerializedWorkUnit + store storage.Storage + codec *Codec + lookups *workerLookups resolvedResources chan<- *schema.Resource logger zerolog.Logger @@ -31,23 +34,14 @@ type worker struct { invocationID string deterministicCQID bool metrics *metrics.Metrics - // message channel for sending SyncError messages - msgChan chan<- message.SyncMessage -} - -func (w *worker) work(ctx context.Context, activeWorkSignal *activeWorkSignal) { - for j := range w.jobs { - activeWorkSignal.Add() - - w.resolveTable(ctx, j.Table, j.Client, j.Parent) - - activeWorkSignal.Done() - } + msgChan chan<- message.SyncMessage } func newWorker( - jobs <-chan *WorkUnit, - queue *ConcurrentRandomQueue[WorkUnit], + jobs <-chan *storage.SerializedWorkUnit, + store storage.Storage, + codec *Codec, + lookups *workerLookups, resolvedResources chan<- *schema.Resource, logger zerolog.Logger, @@ -59,7 +53,9 @@ func newWorker( ) *worker { return &worker{ jobs: jobs, - queue: queue, + store: store, + codec: codec, + lookups: lookups, resolvedResources: resolvedResources, logger: logger, caser: c, @@ -70,7 +66,64 @@ func newWorker( } } -func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource) { +func (w *worker) work(ctx context.Context, activeWorkSignal *activeWorkSignal) { + for j := range w.jobs { + activeWorkSignal.Add() + w.runJob(ctx, j) + activeWorkSignal.Done() + } +} + +// runJob processes a single SerializedWorkUnit. Guarantees: on any return +// path (success, error, panic), if j.ParentID != "" AND the unit did not +// transfer its pin to a stored intermediate, exactly one +// DecResourceRefcount call is made. +func (w *worker) runJob(ctx context.Context, j *storage.SerializedWorkUnit) { + pinTransferred := false + defer func() { + if r := recover(); r != nil { + w.logger.Error().Interface("panic", r).Str("table", j.TableName).Msg("worker panic") + } + if j.ParentID != "" && !pinTransferred { + if err := w.store.DecResourceRefcount(ctx, j.ParentID); err != nil { + w.logger.Error().Err(err).Str("parent_id", j.ParentID).Msg("failed to dec parent refcount") + } + } + }() + + table, ok := w.lookups.tables[j.TableName] + if !ok { + w.logger.Error().Str("table", j.TableName).Msg("unknown table in work unit") + return + } + client, ok := w.lookups.clients[j.ClientID] + if !ok { + w.logger.Error().Str("client", j.ClientID).Msg("unknown client in work unit") + return + } + + var parent *schema.Resource + if j.ParentID != "" { + blob, err := w.store.GetResource(ctx, j.ParentID) + if err != nil { + w.logger.Error().Err(err).Str("parent_id", j.ParentID).Msg("failed to load parent resource") + return + } + parent, _, err = w.codec.DecodeResource(blob) + if err != nil { + w.logger.Error().Err(err).Str("parent_id", j.ParentID).Msg("failed to decode parent resource") + return + } + } + + transferred := w.resolveTable(ctx, table, client, parent, j.ParentID) + pinTransferred = transferred +} + +// resolveTable resolves a single table+client+parent unit. Returns true if +// the WorkUnit's pin on j.ParentID was transferred to one or more stored +// intermediate resources (so the caller must NOT decrement). +func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, parentID string) (pinTransferred bool) { clientName := client.ID() ctx, span := otel.Tracer(metrics.ResourceName).Start(ctx, "sync.table."+table.Name, @@ -83,7 +136,7 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s logger := w.logger.With().Str("table", table.Name).Str("client", clientName).Logger() ctx = logger.WithContext(ctx) startTime := time.Now() - if parent == nil { // Log only for root tables, otherwise we spam too much. + if parent == nil { logger.Info().Msg("top level table resolver started") } @@ -114,18 +167,15 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s if err := table.Resolver(ctx, client, parent, res); err != nil { logger.Error().Err(err).Msg("table resolver finished with error") w.metrics.AddErrors(ctx, 1, selector) - // Send SyncError message - syncErrorMsg := &message.SyncError{ - TableName: table.Name, - Error: err.Error(), - } - w.msgChan <- syncErrorMsg + w.msgChan <- &message.SyncError{TableName: table.Name, Error: err.Error()} return } }() for r := range res { - w.resolveResource(ctx, table, client, parent, r) + if w.resolveResource(ctx, table, client, parent, parentID, r) { + pinTransferred = true + } } endTime := time.Now() @@ -133,67 +183,91 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s if parent == nil { logger.Info().Uint64("resources", w.metrics.GetResources(selector)).Uint64("errors", w.metrics.GetErrors(selector)).Dur("duration_ms", w.metrics.GetDuration(selector)).Msg("table sync finished") } + return pinTransferred } -func (w *worker) resolveResource(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resources any) { +// resolveResource processes one chunk of items returned by a resolver. +// Returns true if at least one stored intermediate resource was created +// with ParentID == parentID, which means the caller's pin must NOT be +// released (it's been transferred to the new intermediate). +func (w *worker) resolveResource(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, parentID string, resources any) (pinTransferred bool) { resourcesSlice := helpers.InterfaceSlice(resources) if len(resourcesSlice) == 0 { - return + return false } selector := w.metrics.NewSelector(client.ID(), table.Name) - resourcesChan := make(chan *schema.Resource, len(resourcesSlice)) - go func() { - defer close(resourcesChan) - var wg sync.WaitGroup - chunks := [][]any{resourcesSlice} - if table.PreResourceChunkResolver != nil { - chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize) - } - for i := range chunks { - wg.Add(1) - go func() { - defer wg.Done() - resolvedResources := resolvers.ResolveResourcesChunk(ctx, w.logger, w.metrics, table, client, parent, chunks[i], w.caser) - for _, resolvedResource := range resolvedResources { - if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil { - w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error") - w.metrics.AddErrors(ctx, 1, selector) - return - } - if err := resolvedResource.StoreCQClientID(client.ID()); err != nil { - w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id") - } - if err := resolvedResource.Validate(); err != nil { - switch err.(type) { - case *schema.PKError: - w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error") - w.metrics.AddErrors(ctx, 1, selector) - return - case *schema.PKComponentError: - w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning") - } - } - select { - case resourcesChan <- resolvedResource: - case <-ctx.Done(): - } + chunks := [][]any{resourcesSlice} + if table.PreResourceChunkResolver != nil { + chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize) + } + + for i := range chunks { + resolved := resolvers.ResolveResourcesChunk(ctx, w.logger, w.metrics, table, client, parent, chunks[i], w.caser) + for _, r := range resolved { + if err := r.CalculateCQID(w.deterministicCQID); err != nil { + w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error") + w.metrics.AddErrors(ctx, 1, selector) + continue + } + if err := r.StoreCQClientID(client.ID()); err != nil { + w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id") + } + if err := r.Validate(); err != nil { + switch err.(type) { + case *schema.PKError: + w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error") + w.metrics.AddErrors(ctx, 1, selector) + continue + case *schema.PKComponentError: + w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning") } - }() - } - wg.Wait() - }() + } + + // Emit to destination pipeline. + select { + case w.resolvedResources <- r: + case <-ctx.Done(): + return pinTransferred + } - for resource := range resourcesChan { - resource := resource - w.resolvedResources <- resource - for _, r := range resource.Table.Relations { - relation := r - w.queue.Push(WorkUnit{ - Table: relation, - Client: client, - Parent: resource, - }) + // If this resource has children, store it and push WorkUnits. + if len(r.Table.Relations) > 0 { + newID := uuid.NewString() + blob, err := w.codec.EncodeResource(r, parentID) + if err != nil { + w.logger.Error().Err(err).Str("table", r.Table.Name).Msg("failed to encode resource") + w.metrics.AddErrors(ctx, 1, selector) + continue + } + if err := w.store.PutResource(ctx, newID, blob, len(r.Table.Relations)); err != nil { + w.logger.Error().Err(err).Str("table", r.Table.Name).Msg("failed to persist resource") + w.metrics.AddErrors(ctx, 1, selector) + continue + } + wus := make([]storage.SerializedWorkUnit, 0, len(r.Table.Relations)) + for _, rel := range r.Table.Relations { + wus = append(wus, storage.SerializedWorkUnit{ + TableName: rel.Name, + ClientID: client.ID(), + ParentID: newID, + }) + } + if err := w.store.PushWorkBatch(ctx, wus); err != nil { + w.logger.Error().Err(err).Msg("failed to push child work units") + w.metrics.AddErrors(ctx, 1, selector) + continue + } + + // Pin transfer: since this intermediate references parentID, + // the WorkUnit's pin must NOT be decremented on completion — + // the intermediate now owns that pin and will release it when + // its own refcount drains. + if parentID != "" && !pinTransferred { + pinTransferred = true + } + } } } + return pinTransferred } From 487fd9844dc63f10eafc4c7de4385e5f88d048f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:42:34 +0300 Subject: [PATCH 17/30] fix(scheduler): cascade refcount dec through stored parent chain Co-Authored-By: Claude Opus 4.7 (1M context) --- scheduler/queue/worker.go | 51 ++++------ scheduler/storage/badger/badger.go | 93 +++++++++++++------ .../storage/badger/badger_resource_test.go | 4 +- scheduler/storage/inmemory/inmemory.go | 25 ++++- scheduler/storage/storage.go | 17 +++- scheduler/storage/storagetest/contract.go | 52 ++++++++++- 6 files changed, 172 insertions(+), 70 deletions(-) diff --git a/scheduler/queue/worker.go b/scheduler/queue/worker.go index 32a3c52516..e721c577c3 100644 --- a/scheduler/queue/worker.go +++ b/scheduler/queue/worker.go @@ -75,16 +75,16 @@ func (w *worker) work(ctx context.Context, activeWorkSignal *activeWorkSignal) { } // runJob processes a single SerializedWorkUnit. Guarantees: on any return -// path (success, error, panic), if j.ParentID != "" AND the unit did not -// transfer its pin to a stored intermediate, exactly one -// DecResourceRefcount call is made. +// path (success, error, panic), if j.ParentID != "", exactly one +// DecResourceRefcount call is made against j.ParentID. Each stored +// descendant acquires its own pin on parentID via PutResource's atomic Inc, +// and cascade-on-delete releases those pins as the descendants are freed. func (w *worker) runJob(ctx context.Context, j *storage.SerializedWorkUnit) { - pinTransferred := false defer func() { if r := recover(); r != nil { w.logger.Error().Interface("panic", r).Str("table", j.TableName).Msg("worker panic") } - if j.ParentID != "" && !pinTransferred { + if j.ParentID != "" { if err := w.store.DecResourceRefcount(ctx, j.ParentID); err != nil { w.logger.Error().Err(err).Str("parent_id", j.ParentID).Msg("failed to dec parent refcount") } @@ -116,14 +116,14 @@ func (w *worker) runJob(ctx context.Context, j *storage.SerializedWorkUnit) { } } - transferred := w.resolveTable(ctx, table, client, parent, j.ParentID) - pinTransferred = transferred + w.resolveTable(ctx, table, client, parent, j.ParentID) } -// resolveTable resolves a single table+client+parent unit. Returns true if -// the WorkUnit's pin on j.ParentID was transferred to one or more stored -// intermediate resources (so the caller must NOT decrement). -func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, parentID string) (pinTransferred bool) { +// resolveTable resolves a single table+client+parent unit. Each stored +// intermediate resource created while iterating results acquires its own +// fresh pin on parentID via PutResource's atomic Inc, so the caller (runJob) +// can always Dec exactly once on completion. +func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, parentID string) { clientName := client.ID() ctx, span := otel.Tracer(metrics.ResourceName).Start(ctx, "sync.table."+table.Name, @@ -173,9 +173,7 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s }() for r := range res { - if w.resolveResource(ctx, table, client, parent, parentID, r) { - pinTransferred = true - } + w.resolveResource(ctx, table, client, parent, parentID, r) } endTime := time.Now() @@ -183,17 +181,17 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s if parent == nil { logger.Info().Uint64("resources", w.metrics.GetResources(selector)).Uint64("errors", w.metrics.GetErrors(selector)).Dur("duration_ms", w.metrics.GetDuration(selector)).Msg("table sync finished") } - return pinTransferred } // resolveResource processes one chunk of items returned by a resolver. -// Returns true if at least one stored intermediate resource was created -// with ParentID == parentID, which means the caller's pin must NOT be -// released (it's been transferred to the new intermediate). -func (w *worker) resolveResource(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, parentID string, resources any) (pinTransferred bool) { +// For each resolved resource that has child relations, it stores the +// resource (acquiring a fresh pin on parentID via PutResource's atomic Inc) +// and pushes WorkUnits for the relations. Cascade-on-delete in the storage +// backend releases the parent pin when the intermediate is eventually freed. +func (w *worker) resolveResource(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, parentID string, resources any) { resourcesSlice := helpers.InterfaceSlice(resources) if len(resourcesSlice) == 0 { - return false + return } selector := w.metrics.NewSelector(client.ID(), table.Name) @@ -228,7 +226,7 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien select { case w.resolvedResources <- r: case <-ctx.Done(): - return pinTransferred + return } // If this resource has children, store it and push WorkUnits. @@ -240,7 +238,7 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien w.metrics.AddErrors(ctx, 1, selector) continue } - if err := w.store.PutResource(ctx, newID, blob, len(r.Table.Relations)); err != nil { + if err := w.store.PutResource(ctx, newID, blob, len(r.Table.Relations), parentID); err != nil { w.logger.Error().Err(err).Str("table", r.Table.Name).Msg("failed to persist resource") w.metrics.AddErrors(ctx, 1, selector) continue @@ -258,16 +256,7 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien w.metrics.AddErrors(ctx, 1, selector) continue } - - // Pin transfer: since this intermediate references parentID, - // the WorkUnit's pin must NOT be decremented on completion — - // the intermediate now owns that pin and will release it when - // its own refcount drains. - if parentID != "" && !pinTransferred { - pinTransferred = true - } } } } - return pinTransferred } diff --git a/scheduler/storage/badger/badger.go b/scheduler/storage/badger/badger.go index ec5287c9c7..2e7ab265a8 100644 --- a/scheduler/storage/badger/badger.go +++ b/scheduler/storage/badger/badger.go @@ -79,6 +79,7 @@ func workKey() []byte { type resourceEntry struct { Data []byte `json:"data"` Refcount int `json:"refcount"` + ParentID string `json:"parent_id,omitempty"` } func resourceKey(id string) []byte { @@ -158,16 +159,40 @@ func (s *Storage) WorkLen(ctx context.Context) (int, error) { return count, err } -func (s *Storage) PutResource(ctx context.Context, id string, data []byte, refcount int) error { +func (s *Storage) PutResource(ctx context.Context, id string, data []byte, refcount int, parentID string) error { if refcount < 1 { return errors.New("badger: refcount must be >= 1") } - entry := resourceEntry{Data: data, Refcount: refcount} - blob, err := json.Marshal(entry) - if err != nil { - return fmt.Errorf("badger: marshal resource: %w", err) - } - return s.db.Update(func(txn *badgerdb.Txn) error { + return s.updateWithRetry(func(txn *badgerdb.Txn) error { + if parentID != "" { + parentKey := resourceKey(parentID) + parentItem, err := txn.Get(parentKey) + if errors.Is(err, badgerdb.ErrKeyNotFound) { + return storage.ErrResourceNotFound + } + if err != nil { + return err + } + var parentEntry resourceEntry + if err := parentItem.Value(func(val []byte) error { + return json.Unmarshal(val, &parentEntry) + }); err != nil { + return fmt.Errorf("badger: unmarshal parent: %w", err) + } + parentEntry.Refcount++ + parentBlob, err := json.Marshal(parentEntry) + if err != nil { + return fmt.Errorf("badger: marshal parent: %w", err) + } + if err := txn.Set(parentKey, parentBlob); err != nil { + return err + } + } + entry := resourceEntry{Data: data, Refcount: refcount, ParentID: parentID} + blob, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("badger: marshal resource: %w", err) + } return txn.Set(resourceKey(id), blob) }) } @@ -196,30 +221,42 @@ func (s *Storage) GetResource(ctx context.Context, id string) ([]byte, error) { func (s *Storage) DecResourceRefcount(ctx context.Context, id string) error { return s.updateWithRetry(func(txn *badgerdb.Txn) error { - key := resourceKey(id) - item, err := txn.Get(key) - if errors.Is(err, badgerdb.ErrKeyNotFound) { - return storage.ErrResourceNotFound - } - if err != nil { + return decInTxn(txn, id) + }) +} + +func decInTxn(txn *badgerdb.Txn, id string) error { + key := resourceKey(id) + item, err := txn.Get(key) + if errors.Is(err, badgerdb.ErrKeyNotFound) { + return storage.ErrResourceNotFound + } + if err != nil { + return err + } + var entry resourceEntry + if err := item.Value(func(val []byte) error { + return json.Unmarshal(val, &entry) + }); err != nil { + return fmt.Errorf("badger: unmarshal resource: %w", err) + } + entry.Refcount-- + if entry.Refcount <= 0 { + if err := txn.Delete(key); err != nil { return err } - var entry resourceEntry - if err := item.Value(func(val []byte) error { - return json.Unmarshal(val, &entry) - }); err != nil { - return fmt.Errorf("badger: unmarshal resource: %w", err) - } - entry.Refcount-- - if entry.Refcount <= 0 { - return txn.Delete(key) - } - blob, err := json.Marshal(entry) - if err != nil { - return fmt.Errorf("badger: marshal resource: %w", err) + if entry.ParentID != "" { + if err := decInTxn(txn, entry.ParentID); err != nil && !errors.Is(err, storage.ErrResourceNotFound) { + return err + } } - return txn.Set(key, blob) - }) + return nil + } + blob, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("badger: marshal resource: %w", err) + } + return txn.Set(key, blob) } func (s *Storage) Close(ctx context.Context) error { diff --git a/scheduler/storage/badger/badger_resource_test.go b/scheduler/storage/badger/badger_resource_test.go index b2c47960f9..f1bf0d58bb 100644 --- a/scheduler/storage/badger/badger_resource_test.go +++ b/scheduler/storage/badger/badger_resource_test.go @@ -11,7 +11,7 @@ import ( func TestBadger_PutGetResource(t *testing.T) { s := newBadger(t) ctx := context.Background() - require.NoError(t, s.PutResource(ctx, "id-1", []byte("hello"), 1)) + require.NoError(t, s.PutResource(ctx, "id-1", []byte("hello"), 1, "")) got, err := s.GetResource(ctx, "id-1") require.NoError(t, err) @@ -21,7 +21,7 @@ func TestBadger_PutGetResource(t *testing.T) { func TestBadger_RefcountDeleteOnZero(t *testing.T) { s := newBadger(t) ctx := context.Background() - require.NoError(t, s.PutResource(ctx, "id-1", []byte("x"), 2)) + require.NoError(t, s.PutResource(ctx, "id-1", []byte("x"), 2, "")) require.NoError(t, s.DecResourceRefcount(ctx, "id-1")) _, err := s.GetResource(ctx, "id-1") diff --git a/scheduler/storage/inmemory/inmemory.go b/scheduler/storage/inmemory/inmemory.go index f8833671bc..af32a31ee4 100644 --- a/scheduler/storage/inmemory/inmemory.go +++ b/scheduler/storage/inmemory/inmemory.go @@ -22,6 +22,7 @@ type Storage struct { type resourceEntry struct { data []byte refcount int + parentID string } // New returns a Storage seeded for deterministic random-pop ordering. @@ -67,15 +68,22 @@ func (s *Storage) WorkLen(_ context.Context) (int, error) { return len(s.queue), nil } -func (s *Storage) PutResource(_ context.Context, id string, data []byte, refcount int) error { +func (s *Storage) PutResource(_ context.Context, id string, data []byte, refcount int, parentID string) error { if refcount < 1 { return errors.New("storage/inmemory: refcount must be >= 1") } s.mu.Lock() defer s.mu.Unlock() + if parentID != "" { + parent, ok := s.resources[parentID] + if !ok { + return storage.ErrResourceNotFound + } + parent.refcount++ + } cp := make([]byte, len(data)) copy(cp, data) - s.resources[id] = &resourceEntry{data: cp, refcount: refcount} + s.resources[id] = &resourceEntry{data: cp, refcount: refcount, parentID: parentID} return nil } @@ -94,13 +102,26 @@ func (s *Storage) GetResource(_ context.Context, id string) ([]byte, error) { func (s *Storage) DecResourceRefcount(_ context.Context, id string) error { s.mu.Lock() defer s.mu.Unlock() + return s.decLocked(id) +} + +func (s *Storage) decLocked(id string) error { entry, ok := s.resources[id] if !ok { return storage.ErrResourceNotFound } entry.refcount-- if entry.refcount <= 0 { + parentID := entry.parentID delete(s.resources, id) + if parentID != "" { + // Cascade: releases the stored-descendant pin on the parent. + // Swallow ErrResourceNotFound — it indicates a caller bug but + // shouldn't prevent the delete from succeeding. + if err := s.decLocked(parentID); err != nil && !errors.Is(err, storage.ErrResourceNotFound) { + return err + } + } } return nil } diff --git a/scheduler/storage/storage.go b/scheduler/storage/storage.go index 260dba9460..2da540f3f0 100644 --- a/scheduler/storage/storage.go +++ b/scheduler/storage/storage.go @@ -38,10 +38,23 @@ type Storage interface { // PutResource inserts a resource blob with an initial refcount. // refcount must be >= 1 (a resource with zero pins should never exist). - PutResource(ctx context.Context, id string, data []byte, refcount int) error + // + // parentID is "" for root resources. When parentID != "", the backend + // atomically increments parentID's refcount by 1 as part of the same + // operation that stores the new resource — this gives every stored + // descendant its own pin on its parent. If parentID doesn't exist, the + // call fails with ErrResourceNotFound and the new resource is not stored. + // + // Cascade-on-delete: when DecResourceRefcount causes deletion (refcount + // hits zero), a Dec is propagated to the stored parentID (if any), + // recursively up the chain. This releases each stored-descendant pin as + // the descendant itself is freed. + PutResource(ctx context.Context, id string, data []byte, refcount int, parentID string) error GetResource(ctx context.Context, id string) ([]byte, error) // DecResourceRefcount decrements refcount by 1 and deletes when it - // reaches zero, atomically within a single backend operation. + // reaches zero, atomically within a single backend operation. On + // deletion, a Dec is cascaded to the stored parentID (if any), + // recursively. DecResourceRefcount(ctx context.Context, id string) error Close(ctx context.Context) error diff --git a/scheduler/storage/storagetest/contract.go b/scheduler/storage/storagetest/contract.go index 076dc2a98c..25f9259153 100644 --- a/scheduler/storage/storagetest/contract.go +++ b/scheduler/storage/storagetest/contract.go @@ -31,6 +31,8 @@ func TestContract(t *testing.T, newStorage func(t *testing.T) storage.Storage) { t.Run("concurrent_push_pop_no_loss", func(t *testing.T) { testConcurrentPushPopNoLoss(t, newStorage(t)) }) t.Run("concurrent_refcount_no_double_delete", func(t *testing.T) { testConcurrentRefcountNoDoubleDelete(t, newStorage(t)) }) t.Run("close_is_idempotent", func(t *testing.T) { testCloseIsIdempotent(t, newStorage(t)) }) + t.Run("cascade_dec_deletes_ancestors", func(t *testing.T) { testCascadeDecDeletesAncestors(t, newStorage(t)) }) + t.Run("put_resource_rejects_unknown_parent", func(t *testing.T) { testPutResourceRejectsUnknownParent(t, newStorage(t)) }) } func testPushPopRoundtrip(t *testing.T, s storage.Storage) { @@ -90,7 +92,7 @@ func testWorkLen(t *testing.T, s storage.Storage) { func testResourcePutGet(t *testing.T, s storage.Storage) { ctx := context.Background() data := []byte("hello") - require.NoError(t, s.PutResource(ctx, "id-1", data, 1)) + require.NoError(t, s.PutResource(ctx, "id-1", data, 1, "")) got, err := s.GetResource(ctx, "id-1") require.NoError(t, err) @@ -99,7 +101,7 @@ func testResourcePutGet(t *testing.T, s storage.Storage) { func testRefcountDeleteOnZero(t *testing.T, s storage.Storage) { ctx := context.Background() - require.NoError(t, s.PutResource(ctx, "id-1", []byte("x"), 2)) + require.NoError(t, s.PutResource(ctx, "id-1", []byte("x"), 2, "")) // First dec: resource still exists. require.NoError(t, s.DecResourceRefcount(ctx, "id-1")) @@ -173,7 +175,7 @@ func testConcurrentPushPopNoLoss(t *testing.T, s storage.Storage) { func testConcurrentRefcountNoDoubleDelete(t *testing.T, s storage.Storage) { ctx := context.Background() const n = 100 - require.NoError(t, s.PutResource(ctx, "shared", []byte("x"), n)) + require.NoError(t, s.PutResource(ctx, "shared", []byte("x"), n, "")) var wg sync.WaitGroup errs := make(chan error, n) @@ -209,10 +211,10 @@ func testCloseIsIdempotent(t *testing.T, s storage.Storage) { func testPutResourceRejectsZeroRefcount(t *testing.T, s storage.Storage) { ctx := context.Background() - err := s.PutResource(ctx, "id", []byte("x"), 0) + err := s.PutResource(ctx, "id", []byte("x"), 0, "") require.Error(t, err, "PutResource with refcount=0 must return an error") - err = s.PutResource(ctx, "id", []byte("x"), -1) + err = s.PutResource(ctx, "id", []byte("x"), -1, "") require.Error(t, err, "PutResource with refcount<0 must return an error") _, err = s.GetResource(ctx, "id") @@ -228,3 +230,43 @@ func testPushBatchEmpty(t *testing.T, s storage.Storage) { require.NoError(t, err) require.Equal(t, 0, n) } + +func testCascadeDecDeletesAncestors(t *testing.T, s storage.Storage) { + ctx := context.Background() + // Chain: grandparent (refcount 1) ← parent (refcount 1) ← child (refcount 2) + require.NoError(t, s.PutResource(ctx, "gp", []byte("g"), 1, "")) + require.NoError(t, s.PutResource(ctx, "p", []byte("p"), 1, "gp")) + // After the above, gp.refcount should be 2 (its own 1 + 1 for p). + require.NoError(t, s.PutResource(ctx, "c", []byte("c"), 2, "p")) + // After above, p.refcount should be 2 (its own 1 + 1 for c). + + // Drain child refcount → delete c → cascade dec p → now p.refcount = 1 (still there). + require.NoError(t, s.DecResourceRefcount(ctx, "c")) + require.NoError(t, s.DecResourceRefcount(ctx, "c")) + _, err := s.GetResource(ctx, "c") + require.ErrorIs(t, err, storage.ErrResourceNotFound) + _, err = s.GetResource(ctx, "p") + require.NoError(t, err, "p should still exist with refcount 1 after c cascade") + _, err = s.GetResource(ctx, "gp") + require.NoError(t, err, "gp should still exist with refcount 2 after p cascade") + + // Drain parent → delete p → cascade dec gp → now gp.refcount = 1 (still there because of its own initial 1). + require.NoError(t, s.DecResourceRefcount(ctx, "p")) + _, err = s.GetResource(ctx, "p") + require.ErrorIs(t, err, storage.ErrResourceNotFound) + _, err = s.GetResource(ctx, "gp") + require.NoError(t, err, "gp should still exist after p cascade: its own refcount of 1 keeps it alive") + + // Drain gp's own refcount → delete gp, no further cascade (parentID==""). + require.NoError(t, s.DecResourceRefcount(ctx, "gp")) + _, err = s.GetResource(ctx, "gp") + require.ErrorIs(t, err, storage.ErrResourceNotFound) +} + +func testPutResourceRejectsUnknownParent(t *testing.T, s storage.Storage) { + ctx := context.Background() + err := s.PutResource(ctx, "child", []byte("x"), 1, "does-not-exist") + require.ErrorIs(t, err, storage.ErrResourceNotFound) + _, err = s.GetResource(ctx, "child") + require.ErrorIs(t, err, storage.ErrResourceNotFound, "child must not exist after failed Put") +} From 8d5352bf706b0cebf2ce992ad33dfb694ad51a1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:49:27 +0300 Subject: [PATCH 18/30] test(scheduler/storage): add multi-intermediate fanout regression test --- scheduler/storage/storagetest/contract.go | 39 +++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/scheduler/storage/storagetest/contract.go b/scheduler/storage/storagetest/contract.go index 25f9259153..ec7d738627 100644 --- a/scheduler/storage/storagetest/contract.go +++ b/scheduler/storage/storagetest/contract.go @@ -32,6 +32,7 @@ func TestContract(t *testing.T, newStorage func(t *testing.T) storage.Storage) { t.Run("concurrent_refcount_no_double_delete", func(t *testing.T) { testConcurrentRefcountNoDoubleDelete(t, newStorage(t)) }) t.Run("close_is_idempotent", func(t *testing.T) { testCloseIsIdempotent(t, newStorage(t)) }) t.Run("cascade_dec_deletes_ancestors", func(t *testing.T) { testCascadeDecDeletesAncestors(t, newStorage(t)) }) + t.Run("cascade_dec_fanout", func(t *testing.T) { testCascadeDecFanout(t, newStorage(t)) }) t.Run("put_resource_rejects_unknown_parent", func(t *testing.T) { testPutResourceRejectsUnknownParent(t, newStorage(t)) }) } @@ -263,6 +264,44 @@ func testCascadeDecDeletesAncestors(t *testing.T, s storage.Storage) { require.ErrorIs(t, err, storage.ErrResourceNotFound) } +// testCascadeDecFanout verifies the multi-intermediate fanout case: a single +// parent has N stored children each referencing it. Each child drains +// independently, each cascade-Decs the parent exactly once. Parent is +// deleted only when its own initial refcount plus all N fanout pins drain. +// This is the exact pattern that regressed the original pin-transfer design. +func testCascadeDecFanout(t *testing.T, s storage.Storage) { + ctx := context.Background() + require.NoError(t, s.PutResource(ctx, "p", []byte("p"), 1, "")) + // p.refcount = 1 (its own) + + require.NoError(t, s.PutResource(ctx, "c1", []byte("c1"), 1, "p")) + require.NoError(t, s.PutResource(ctx, "c2", []byte("c2"), 1, "p")) + require.NoError(t, s.PutResource(ctx, "c3", []byte("c3"), 1, "p")) + // p.refcount = 4 (1 own + 1 each for c1/c2/c3) + + // Drain c1: c1 deleted → cascade Dec(p) → p.refcount = 3 (still alive). + require.NoError(t, s.DecResourceRefcount(ctx, "c1")) + _, err := s.GetResource(ctx, "c1") + require.ErrorIs(t, err, storage.ErrResourceNotFound) + _, err = s.GetResource(ctx, "p") + require.NoError(t, err, "p must still exist after c1's cascade (refcount 3)") + + // Drain c2: same, p.refcount = 2. + require.NoError(t, s.DecResourceRefcount(ctx, "c2")) + _, err = s.GetResource(ctx, "p") + require.NoError(t, err, "p must still exist after c2's cascade (refcount 2)") + + // Drain c3: c3 deleted, cascade → p.refcount = 1 (its own initial pin). + require.NoError(t, s.DecResourceRefcount(ctx, "c3")) + _, err = s.GetResource(ctx, "p") + require.NoError(t, err, "p must still exist after c3's cascade (refcount 1 from its own initial pin)") + + // Drain p's own pin: p deleted. + require.NoError(t, s.DecResourceRefcount(ctx, "p")) + _, err = s.GetResource(ctx, "p") + require.ErrorIs(t, err, storage.ErrResourceNotFound) +} + func testPutResourceRejectsUnknownParent(t *testing.T, s storage.Storage) { ctx := context.Background() err := s.PutResource(ctx, "child", []byte("x"), 1, "does-not-exist") From aad6634f6d4d71e3d48df783cec891d8d52a8982 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:51:53 +0300 Subject: [PATCH 19/30] refactor(scheduler/queue): remove ConcurrentRandomQueue (superseded by Storage) --- scheduler/queue/concurrent_random_queue.go | 41 ---------------------- scheduler/storage/inmemory/inmemory.go | 3 +- 2 files changed, 1 insertion(+), 43 deletions(-) delete mode 100644 scheduler/queue/concurrent_random_queue.go diff --git a/scheduler/queue/concurrent_random_queue.go b/scheduler/queue/concurrent_random_queue.go deleted file mode 100644 index 0df4331010..0000000000 --- a/scheduler/queue/concurrent_random_queue.go +++ /dev/null @@ -1,41 +0,0 @@ -package queue - -import ( - "math/rand" - "sync" -) - -// ConcurrentRandomQueue is a generic, thread-safe queue -// that pops random elements in O(1) time. -type ConcurrentRandomQueue[T any] struct { - mu sync.Mutex - queue []T - random *rand.Rand -} - -func NewConcurrentRandomQueue[T any](seed int64, capacityHint int) *ConcurrentRandomQueue[T] { - return &ConcurrentRandomQueue[T]{queue: make([]T, 0, capacityHint), random: rand.New(rand.NewSource(seed))} -} - -func (q *ConcurrentRandomQueue[T]) Push(item T) { - q.mu.Lock() - defer q.mu.Unlock() - - q.queue = append(q.queue, item) -} - -func (q *ConcurrentRandomQueue[T]) Pop() *T { - q.mu.Lock() - defer q.mu.Unlock() - - if len(q.queue) == 0 { - return nil - } - idx := q.random.Intn(len(q.queue)) - lastIdx := len(q.queue) - 1 - q.queue[idx], q.queue[lastIdx] = q.queue[lastIdx], q.queue[idx] - item := q.queue[lastIdx] - q.queue = q.queue[:lastIdx] - - return &item -} diff --git a/scheduler/storage/inmemory/inmemory.go b/scheduler/storage/inmemory/inmemory.go index af32a31ee4..84e0ec4454 100644 --- a/scheduler/storage/inmemory/inmemory.go +++ b/scheduler/storage/inmemory/inmemory.go @@ -1,6 +1,5 @@ // Package inmemory is the default Storage backend — holds all scheduler -// state in process memory. Matches the behavior of the pre-existing -// ConcurrentRandomQueue: random-pop work semantics, atomic refcounts. +// state in process memory with random-pop work semantics and atomic refcounts. package inmemory import ( From e518dfb4c7312a6fb6da6af7de2e2282b77c564a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:53:10 +0300 Subject: [PATCH 20/30] feat(scheduler): wire syncShuffleQueue to configured Storage --- scheduler/scheduler_shuffle_queue.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/scheduler/scheduler_shuffle_queue.go b/scheduler/scheduler_shuffle_queue.go index 0bc5867cd7..215cb146fc 100644 --- a/scheduler/scheduler_shuffle_queue.go +++ b/scheduler/scheduler_shuffle_queue.go @@ -3,13 +3,12 @@ package scheduler import ( "context" - "github.com/cloudquery/plugin-sdk/v4/scheduler/queue" "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/cloudquery/plugin-sdk/v4/scheduler/queue" + "github.com/cloudquery/plugin-sdk/v4/scheduler/storage/inmemory" ) func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources chan<- *schema.Resource) { - // we have this because plugins can return sometimes clients in a random way which will cause - // differences between this run and the next one. preInitialisedClients := make([][]schema.ClientMeta, len(s.tables)) tableNames := make([]string, len(s.tables)) for i, table := range s.tables { @@ -19,8 +18,6 @@ func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources cha clients = table.Multiplex(s.client) } preInitialisedClients[i] = clients - // we do this here to avoid locks so we initial the metrics structure once in the main goroutines - // and then we can just read from it in the other goroutines concurrently given we are not writing to it. s.metrics.InitWithClients(table, clients) } @@ -29,6 +26,22 @@ func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources cha seed := hashTableNames(tableNames) shuffle(tableClients, seed) + // Storage: use the scheduler-provided storage, or construct an in-memory + // default if none was configured. This preserves backward compatibility — + // users not setting spec.queue get the same random-pop in-memory queue as + // before. + store := s.scheduler.storage + if store == nil { + store = inmemory.New(seed) + defer func() { + if err := store.Close(ctx); err != nil { + s.logger.Warn().Err(err).Msg("failed to close in-memory storage") + } + }() + } + + codec := queue.NewCodec(s.tables.FlattenTables()) + scheduler := queue.NewShuffleQueueScheduler( s.logger, s.metrics, @@ -37,6 +50,8 @@ func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources cha queue.WithCaser(s.scheduler.caser), queue.WithDeterministicCQID(s.deterministicCQID), queue.WithInvocationID(s.invocationID), + queue.WithStorage(store), + queue.WithCodec(codec), ) queueClients := make([]queue.WorkUnit, 0, len(tableClients)) for _, tc := range tableClients { From 9e5e5759ade7ddc35a137a905f80f7695f5a393b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 14:56:51 +0300 Subject: [PATCH 21/30] test(scheduler): fix test_table_relation_success fixture for itemSample contract --- scheduler/scheduler_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index cc9f325b67..f71eb81d84 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -172,7 +172,7 @@ func testTableColumnResolverPanic() *schema.Table { } func testTableRelationSuccess() *schema.Table { - return &schema.Table{ + t := &schema.Table{ Name: "test_table_relation_success", Resolver: testResolverSuccess, Columns: []schema.Column{ @@ -185,6 +185,13 @@ func testTableRelationSuccess() *schema.Table { testTableSuccess(), }, } + // The shuffle-queue strategy spills parent resources to external storage + // and reconstructs them via Codec (JSON round-trip). The codec requires + // the parent table to declare its Item type so it can decode back to the + // concrete Go type. testResolverSuccess emits map[string]any, so we set + // the sample to that type. + t.SetItemSample(map[string]any{}) + return t } const chunkSize = 200 From b3de801449e772a327e1f034fe5e84a8d4381278 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 15:00:24 +0300 Subject: [PATCH 22/30] test(scheduler/queue): in-memory vs badger equivalence test --- scheduler/queue/e2e_test.go | 105 ++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 scheduler/queue/e2e_test.go diff --git a/scheduler/queue/e2e_test.go b/scheduler/queue/e2e_test.go new file mode 100644 index 0000000000..668918071f --- /dev/null +++ b/scheduler/queue/e2e_test.go @@ -0,0 +1,105 @@ +package queue_test + +import ( + "context" + "testing" + + "github.com/cloudquery/plugin-sdk/v4/message" + "github.com/cloudquery/plugin-sdk/v4/scheduler" + "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/cloudquery/plugin-sdk/v4/transformers" + "github.com/stretchr/testify/require" +) + +// e2eClient is a minimal ClientMeta for the equivalence test. +type e2eClient struct{} + +func (e2eClient) ID() string { return "client-1" } + +type rootItem struct{ ID string } +type childItem struct { + ID string + ParentID string +} + +func buildE2ETables() schema.Tables { + childTbl := &schema.Table{ + Name: "children", + Resolver: func(ctx context.Context, _ schema.ClientMeta, parent *schema.Resource, res chan<- any) error { + p, ok := parent.Item.(rootItem) + if !ok { + if pp, ok2 := parent.Item.(*rootItem); ok2 { + p = *pp + } else { + return nil + } + } + res <- []any{childItem{ID: "c1-" + p.ID, ParentID: p.ID}, childItem{ID: "c2-" + p.ID, ParentID: p.ID}} + return nil + }, + Transform: transformers.TransformWithStruct(&childItem{}), + } + rootTbl := &schema.Table{ + Name: "roots", + Resolver: func(ctx context.Context, _ schema.ClientMeta, _ *schema.Resource, res chan<- any) error { + res <- []any{rootItem{ID: "r1"}, rootItem{ID: "r2"}} + return nil + }, + Transform: transformers.TransformWithStruct(&rootItem{}), + Relations: []*schema.Table{childTbl}, + } + return schema.Tables{rootTbl} +} + +func runSync(t *testing.T, cfg *scheduler.QueueConfig) []message.SyncMessage { + t.Helper() + tables := buildE2ETables() + for _, tbl := range tables.FlattenTables() { + if tbl.Transform != nil { + require.NoError(t, tbl.Transform(tbl)) + } + } + + opts := []scheduler.Option{ + scheduler.WithStrategy(scheduler.StrategyShuffleQueue), + scheduler.WithConcurrency(100), + } + if cfg != nil { + store, err := scheduler.NewStorageFromConfig(cfg, 1, "inv-1") + require.NoError(t, err) + opts = append(opts, scheduler.WithStorage(store)) + } + + s := scheduler.NewScheduler(opts...) + msgs, err := s.SyncAll(context.Background(), e2eClient{}, tables) + require.NoError(t, err) + return msgs +} + +func TestE2E_InMemoryVsBadger_Equivalent(t *testing.T) { + inMemMsgs := runSync(t, nil) // default = in-memory + badgerDir := t.TempDir() + badgerMsgs := runSync(t, &scheduler.QueueConfig{ + Type: scheduler.QueueTypeBadger, + Path: badgerDir, + }) + + // Count inserts per table. Exact ordering may differ, but totals should match. + countInserts := func(ms []message.SyncMessage) int { + n := 0 + for _, m := range ms { + if _, ok := m.(*message.SyncInsert); ok { + n++ + } + } + return n + } + inMemN := countInserts(inMemMsgs) + badgerN := countInserts(badgerMsgs) + require.Equal(t, inMemN, badgerN, "in-memory vs badger insert count should match: in-memory=%d badger=%d", inMemN, badgerN) + + // Expected: 2 roots + (2 * 2) = 6 resources → some number of SyncInsert batches. + // Minimum 1 message for each table's resources (ensure nonzero). + require.Greater(t, inMemN, 0, "in-memory produced zero inserts") + require.Greater(t, badgerN, 0, "badger produced zero inserts") +} From 24f3d0d859a911a88e93dd93d1c9be2e3e64379a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 15:02:12 +0300 Subject: [PATCH 23/30] test(scheduler/queue): transform tables in-place + assert row equivalence --- scheduler/queue/e2e_test.go | 39 +++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/scheduler/queue/e2e_test.go b/scheduler/queue/e2e_test.go index 668918071f..ba660ba5d8 100644 --- a/scheduler/queue/e2e_test.go +++ b/scheduler/queue/e2e_test.go @@ -54,11 +54,20 @@ func buildE2ETables() schema.Tables { func runSync(t *testing.T, cfg *scheduler.QueueConfig) []message.SyncMessage { t.Helper() tables := buildE2ETables() - for _, tbl := range tables.FlattenTables() { - if tbl.Transform != nil { - require.NoError(t, tbl.Transform(tbl)) + + // Apply transforms IN PLACE on the actual table tree (not copies from + // FlattenTables). Transforms populate columns; if we transform copies + // the originals stay column-less and inserts produce zero rows. + var applyTransforms func([]*schema.Table) + applyTransforms = func(ts []*schema.Table) { + for _, tbl := range ts { + if tbl.Transform != nil { + require.NoError(t, tbl.Transform(tbl)) + } + applyTransforms(tbl.Relations) } } + applyTransforms(tables) opts := []scheduler.Option{ scheduler.WithStrategy(scheduler.StrategyShuffleQueue), @@ -84,22 +93,22 @@ func TestE2E_InMemoryVsBadger_Equivalent(t *testing.T) { Path: badgerDir, }) - // Count inserts per table. Exact ordering may differ, but totals should match. - countInserts := func(ms []message.SyncMessage) int { - n := 0 + // Count ROWS across all SyncInsert messages, not just message count. + // Batching differences between backends may produce different message + // counts for the same logical data — row count is the true equivalence. + countRows := func(ms []message.SyncMessage) int64 { + var n int64 for _, m := range ms { - if _, ok := m.(*message.SyncInsert); ok { - n++ + if im, ok := m.(*message.SyncInsert); ok { + n += im.Record.NumRows() } } return n } - inMemN := countInserts(inMemMsgs) - badgerN := countInserts(badgerMsgs) - require.Equal(t, inMemN, badgerN, "in-memory vs badger insert count should match: in-memory=%d badger=%d", inMemN, badgerN) + inMemRows := countRows(inMemMsgs) + badgerRows := countRows(badgerMsgs) - // Expected: 2 roots + (2 * 2) = 6 resources → some number of SyncInsert batches. - // Minimum 1 message for each table's resources (ensure nonzero). - require.Greater(t, inMemN, 0, "in-memory produced zero inserts") - require.Greater(t, badgerN, 0, "badger produced zero inserts") + // Expected: 2 roots + (2 roots * 2 children each) = 6 rows total. + require.Equal(t, int64(6), inMemRows, "in-memory row count should match expected fixture output") + require.Equal(t, inMemRows, badgerRows, "badger row count should match in-memory: in-memory=%d badger=%d", inMemRows, badgerRows) } From 7301f5f261d49ccdc03b39453cf8f81864890110 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 15:04:33 +0300 Subject: [PATCH 24/30] bench(scheduler): memory regression comparison in-memory vs badger --- scheduler/queue_memory_bench_test.go | 109 +++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 scheduler/queue_memory_bench_test.go diff --git a/scheduler/queue_memory_bench_test.go b/scheduler/queue_memory_bench_test.go new file mode 100644 index 0000000000..4e7bbb826a --- /dev/null +++ b/scheduler/queue_memory_bench_test.go @@ -0,0 +1,109 @@ +package scheduler + +import ( + "context" + "runtime" + "testing" + + "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/cloudquery/plugin-sdk/v4/transformers" +) + +type benchClient struct{} + +func (benchClient) ID() string { return "c" } + +type benchItem struct { + ID string + Payload [1024]byte // make Items sizeable so memory differences are visible +} + +func buildDeepTables(rootFanout int) schema.Tables { + deep := &schema.Table{ + Name: "deep", + Resolver: func(ctx context.Context, _ schema.ClientMeta, p *schema.Resource, res chan<- any) error { + items := make([]any, 10) + for i := range items { + items[i] = benchItem{ID: "d"} + } + res <- items + return nil + }, + Transform: transformers.TransformWithStruct(&benchItem{}), + } + mid := &schema.Table{ + Name: "mid", + Resolver: func(ctx context.Context, _ schema.ClientMeta, p *schema.Resource, res chan<- any) error { + items := make([]any, 10) + for i := range items { + items[i] = benchItem{ID: "m"} + } + res <- items + return nil + }, + Transform: transformers.TransformWithStruct(&benchItem{}), + Relations: []*schema.Table{deep}, + } + root := &schema.Table{ + Name: "root", + Resolver: func(ctx context.Context, _ schema.ClientMeta, p *schema.Resource, res chan<- any) error { + items := make([]any, rootFanout) + for i := range items { + items[i] = benchItem{ID: "r"} + } + res <- items + return nil + }, + Transform: transformers.TransformWithStruct(&benchItem{}), + Relations: []*schema.Table{mid}, + } + tables := schema.Tables{root} + var apply func([]*schema.Table) + apply = func(ts []*schema.Table) { + for _, t := range ts { + if t.Transform != nil { + _ = t.Transform(t) + } + apply(t.Relations) + } + } + apply(tables) + return tables +} + +func peakHeapMB() uint64 { + runtime.GC() + var m runtime.MemStats + runtime.ReadMemStats(&m) + return m.HeapInuse / (1024 * 1024) +} + +func BenchmarkQueue_InMemory(b *testing.B) { + tables := buildDeepTables(50) // 50 × 10 × 10 = 5000 deep items + for i := 0; i < b.N; i++ { + s := NewScheduler(WithStrategy(StrategyShuffleQueue)) + _, err := s.SyncAll(context.Background(), benchClient{}, tables) + if err != nil { + b.Fatal(err) + } + b.ReportMetric(float64(peakHeapMB()), "peakheap_mb") + } +} + +func BenchmarkQueue_Badger(b *testing.B) { + tables := buildDeepTables(50) + for i := 0; i < b.N; i++ { + dir := b.TempDir() + store, err := NewStorageFromConfig(&QueueConfig{Type: QueueTypeBadger, Path: dir}, 1, "inv-1") + if err != nil { + b.Fatal(err) + } + s := NewScheduler(WithStrategy(StrategyShuffleQueue), WithStorage(store)) + _, err = s.SyncAll(context.Background(), benchClient{}, tables) + if err != nil { + b.Fatal(err) + } + _ = store.Close(context.Background()) + b.ReportMetric(float64(peakHeapMB()), "peakheap_mb") + } +} From 7634626eb2e5d434d95f868a1166616cd408f890 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 15:05:35 +0300 Subject: [PATCH 25/30] feat(scheduler): startup validation of itemSample for queue backends --- scheduler/queue_startup_validate.go | 32 +++++++++++++++ scheduler/queue_startup_validate_test.go | 50 ++++++++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 scheduler/queue_startup_validate.go create mode 100644 scheduler/queue_startup_validate_test.go diff --git a/scheduler/queue_startup_validate.go b/scheduler/queue_startup_validate.go new file mode 100644 index 0000000000..7f1ad20412 --- /dev/null +++ b/scheduler/queue_startup_validate.go @@ -0,0 +1,32 @@ +package scheduler + +import ( + "fmt" + + "github.com/cloudquery/plugin-sdk/v4/schema" +) + +// ValidateTablesForQueue verifies every relation-having table has an +// itemSample registered (via TransformWithStruct or SetItemSample). Only +// checked when cfg selects a non-in-memory backend. +func ValidateTablesForQueue(tables schema.Tables, cfg *QueueConfig) error { + if cfg == nil || cfg.Type == "" || cfg.Type == QueueTypeInMemory { + return nil + } + var walk func(ts []*schema.Table) error + walk = func(ts []*schema.Table) error { + for _, t := range ts { + if len(t.Relations) == 0 { + continue + } + if t.ItemSampleType() == nil { + return fmt.Errorf("queue: table %q has relations but no itemSample; ensure it calls transformers.TransformWithStruct or Table.SetItemSample", t.Name) + } + if err := walk(t.Relations); err != nil { + return err + } + } + return nil + } + return walk(tables) +} diff --git a/scheduler/queue_startup_validate_test.go b/scheduler/queue_startup_validate_test.go new file mode 100644 index 0000000000..bef2661fae --- /dev/null +++ b/scheduler/queue_startup_validate_test.go @@ -0,0 +1,50 @@ +package scheduler + +import ( + "testing" + + "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/cloudquery/plugin-sdk/v4/transformers" + "github.com/stretchr/testify/require" +) + +type goodItem struct{ ID string } +type childOfGood struct{ ID string } + +func TestValidateTablesForQueue_InMemorySkipsCheck(t *testing.T) { + tables := schema.Tables{{ + Name: "no-sample", + Relations: []*schema.Table{{Name: "child"}}, + }} + require.NoError(t, ValidateTablesForQueue(tables, nil)) +} + +func TestValidateTablesForQueue_BadgerRequiresItemSample(t *testing.T) { + tbl := &schema.Table{ + Name: "no-sample", + Relations: []*schema.Table{{Name: "child"}}, + } + tables := schema.Tables{tbl} + err := ValidateTablesForQueue(tables, &QueueConfig{Type: QueueTypeBadger, Path: "/tmp"}) + require.Error(t, err) + require.Contains(t, err.Error(), "no-sample") + require.Contains(t, err.Error(), "itemSample") +} + +func TestValidateTablesForQueue_BadgerWithItemSamplePasses(t *testing.T) { + child := &schema.Table{Name: "child", Transform: transformers.TransformWithStruct(&childOfGood{})} + root := &schema.Table{ + Name: "root", + Transform: transformers.TransformWithStruct(&goodItem{}), + Relations: []*schema.Table{child}, + } + require.NoError(t, root.Transform(root)) + require.NoError(t, child.Transform(child)) + require.NoError(t, ValidateTablesForQueue(schema.Tables{root}, &QueueConfig{Type: QueueTypeBadger, Path: "/tmp"})) +} + +func TestValidateTablesForQueue_LeafWithoutItemSampleOK(t *testing.T) { + // A leaf table (no Relations) doesn't need itemSample. + tables := schema.Tables{{Name: "leaf"}} + require.NoError(t, ValidateTablesForQueue(tables, &QueueConfig{Type: QueueTypeBadger, Path: "/tmp"})) +} From 8d834c37f70748b3c3fe2111e76f820cd308c0b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 17 Apr 2026 15:07:12 +0300 Subject: [PATCH 26/30] docs: external queue storage guide --- docs/external-queue.md | 49 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 docs/external-queue.md diff --git a/docs/external-queue.md b/docs/external-queue.md new file mode 100644 index 0000000000..bb4b41f57a --- /dev/null +++ b/docs/external-queue.md @@ -0,0 +1,49 @@ +# External Queue Storage (Preview) + +The SDK supports offloading the scheduler's work state to an external queue backend. This is useful for large syncs that would otherwise exhaust RAM — queued work units and parent resources can be spilled to local disk. + +## When to use it + +Use an external queue when: +- Your sync OOMs or runs near memory limits on its host. +- You're using the `shuffle-queue` scheduler strategy. + +Do NOT use it when: +- You're using `dfs`, `round-robin`, or `shuffle` strategies — those cannot use an external queue (they'd need execution-model rewrites). +- Your sync completes well within memory budgets — the backend adds disk I/O overhead. + +## Configuration + +Add a `queue` block alongside `scheduler` in your source plugin spec: + +```yaml +spec: + scheduler: shuffle-queue + queue: + type: badger + path: /var/lib/cq/queue +``` + +### Backends + +| `type` | Description | +|---|---| +| `in-memory` (default) | Current behavior. Everything held in process memory. | +| `badger` | Embedded BadgerDB on local disk. Required: `path`. | + +## Requirements for plugin authors + +Plugins must use `transformers.TransformWithStruct` for tables that have relations. This is already universal — no changes needed. If a plugin has a custom `Transform`, call `table.SetItemSample(yourItemType{})` explicitly; otherwise the sync will fail fast at startup with a clear error. + +## Caveats + +- **No crash recovery.** A crashed sync's queue state is discarded; restart from scratch. +- **No encryption at rest.** Items are stored as plain JSON. Do not configure a disk backend on shared filesystems without filesystem-level encryption if you sync sensitive data. +- **Per-invocation path isolation.** The actual Badger directory is `{queue.path}/{invocation_id}`, so multiple concurrent syncs of the same plugin don't collide. Stale directories from crashed syncs are left for the user to clean up. +- **No remote backends in v1.** Redis/Postgres/SQS are deferred. The contract-test architecture makes adding them a small project. + +## Troubleshooting + +- `queue backend already locked by another process` → a prior sync or an orphaned Badger instance holds the directory lock. Remove the stale directory or pass a fresh `queue.path`. +- `queue: table "X" has relations but no itemSample` → the plugin has a table with relations that doesn't use `TransformWithStruct`. Add `table.SetItemSample(XItemType{})` in the table definition. +- Disk fills up mid-sync → v1 doesn't bound disk usage. Point `queue.path` at a volume with generous headroom; capacity ≈ number-of-resources × relation-depth × average-resource-size. From d55b90f912d5a5c55abd1478309806a40da2bc7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Tue, 28 Apr 2026 14:18:28 +0300 Subject: [PATCH 27/30] fix(scheduler/queue): walk ancestor chain when decoding parent resources --- scheduler/queue/codec.go | 49 ++++++++++++++++++++++++++++++++--- scheduler/queue/codec_test.go | 39 ++++++++++++++++++++++++++++ scheduler/queue/worker.go | 8 +++++- 3 files changed, 91 insertions(+), 5 deletions(-) diff --git a/scheduler/queue/codec.go b/scheduler/queue/codec.go index c721187205..0470cd58c4 100644 --- a/scheduler/queue/codec.go +++ b/scheduler/queue/codec.go @@ -58,11 +58,52 @@ func (c *Codec) EncodeResource(r *schema.Resource, parentID string) ([]byte, err }) } -// DecodeResource reconstructs a *schema.Resource from bytes. Returns the -// resource, the parentID it references (for callers that want to chain-load -// ancestors), and any error. The returned Resource has Parent=nil; callers -// wanting the ancestor chain attached should use a higher-level helper. +// DecodeResource reconstructs a *schema.Resource from bytes with Parent=nil. +// Prefer DecodeResourceWithChain when the caller has access to Storage so +// the ancestor chain can be rebuilt (needed for plugins doing parent.Parent +// access). func (c *Codec) DecodeResource(data []byte) (*schema.Resource, string, error) { + return c.decodeOne(data) +} + +// Fetcher loads a serialized resource blob by ID. Typically backed by +// Storage.GetResource. Pass nil to DecodeResourceWithChain to skip chain +// walking (equivalent to DecodeResource). +type Fetcher func(id string) ([]byte, error) + +// DecodeResourceWithChain reconstructs a *schema.Resource AND rebuilds the +// ancestor chain via the fetcher. Walks up to maxDepth levels; returns an +// error if the chain exceeds that depth (misconfigured plugin or cycle). +func (c *Codec) DecodeResourceWithChain(data []byte, fetch Fetcher, maxDepth int) (*schema.Resource, string, error) { + res, parentID, err := c.decodeOne(data) + if err != nil { + return nil, "", err + } + if fetch == nil || parentID == "" || maxDepth <= 0 { + return res, parentID, nil + } + current := res + currentParentID := parentID + for depth := 0; currentParentID != "" && depth < maxDepth; depth++ { + blob, err := fetch(currentParentID) + if err != nil { + return nil, "", fmt.Errorf("codec: fetch ancestor %q at depth %d: %w", currentParentID, depth, err) + } + ancestor, nextParentID, err := c.decodeOne(blob) + if err != nil { + return nil, "", fmt.Errorf("codec: decode ancestor %q at depth %d: %w", currentParentID, depth, err) + } + current.Parent = ancestor + current = ancestor + currentParentID = nextParentID + } + if currentParentID != "" { + return nil, "", fmt.Errorf("codec: ancestor chain exceeded maxDepth=%d", maxDepth) + } + return res, parentID, nil +} + +func (c *Codec) decodeOne(data []byte) (*schema.Resource, string, error) { var sr serializedResource if err := json.Unmarshal(data, &sr); err != nil { return nil, "", fmt.Errorf("codec: unmarshal envelope: %w", err) diff --git a/scheduler/queue/codec_test.go b/scheduler/queue/codec_test.go index 5612f17d2a..10cccdfe6e 100644 --- a/scheduler/queue/codec_test.go +++ b/scheduler/queue/codec_test.go @@ -1,6 +1,7 @@ package queue import ( + "fmt" "testing" "github.com/apache/arrow-go/v18/arrow" @@ -84,3 +85,41 @@ func TestCodec_DecodeUnknownTableErrors(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "unknown_table") } + +func TestCodec_DecodeResourceWithChain(t *testing.T) { + parentTbl := &schema.Table{ + Name: "parent_tbl", + Transform: transformers.TransformWithStruct(&codecTestItem{}), + } + childTbl := &schema.Table{ + Name: "child_tbl", + Transform: transformers.TransformWithStruct(&codecTestItem{}), + } + require.NoError(t, parentTbl.Transform(parentTbl)) + require.NoError(t, childTbl.Transform(childTbl)) + + c := NewCodec(schema.Tables{parentTbl, childTbl}) + + parentBytes, err := c.EncodeResource(schema.NewResourceData(parentTbl, nil, codecTestItem{ID: "p"}), "") + require.NoError(t, err) + childBytes, err := c.EncodeResource(schema.NewResourceData(childTbl, nil, codecTestItem{ID: "c"}), "parent-id") + require.NoError(t, err) + + blobs := map[string][]byte{"parent-id": parentBytes} + fetch := func(id string) ([]byte, error) { + b, ok := blobs[id] + if !ok { + return nil, fmt.Errorf("not found: %s", id) + } + return b, nil + } + + decoded, parentID, err := c.DecodeResourceWithChain(childBytes, fetch, 4) + require.NoError(t, err) + require.Equal(t, "parent-id", parentID) + require.Equal(t, "child_tbl", decoded.Table.Name) + require.NotNil(t, decoded.Parent, "chain walk should attach parent") + require.Equal(t, "parent_tbl", decoded.Parent.Table.Name) + require.Equal(t, codecTestItem{ID: "p"}, decoded.Parent.Item) + require.Nil(t, decoded.Parent.Parent, "no grandparent in this test") +} diff --git a/scheduler/queue/worker.go b/scheduler/queue/worker.go index e721c577c3..43f9c2eabb 100644 --- a/scheduler/queue/worker.go +++ b/scheduler/queue/worker.go @@ -109,7 +109,13 @@ func (w *worker) runJob(ctx context.Context, j *storage.SerializedWorkUnit) { w.logger.Error().Err(err).Str("parent_id", j.ParentID).Msg("failed to load parent resource") return } - parent, _, err = w.codec.DecodeResource(blob) + fetch := func(id string) ([]byte, error) { + return w.store.GetResource(ctx, id) + } + // maxAncestorDepth matches scheduler.DefaultMaxDepth (4); hard-coded + // here to avoid importing the parent scheduler package. + const maxAncestorDepth = 4 + parent, _, err = w.codec.DecodeResourceWithChain(blob, fetch, maxAncestorDepth) if err != nil { w.logger.Error().Err(err).Str("parent_id", j.ParentID).Msg("failed to decode parent resource") return From fef8814da2a7f03821dcb63d4c30af46dfba4fcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Tue, 28 Apr 2026 14:18:57 +0300 Subject: [PATCH 28/30] fix(scheduler/queue): drain intermediate refcount if PushWorkBatch fails --- scheduler/queue/worker.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/scheduler/queue/worker.go b/scheduler/queue/worker.go index 43f9c2eabb..014bc1f4a0 100644 --- a/scheduler/queue/worker.go +++ b/scheduler/queue/worker.go @@ -260,6 +260,15 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien if err := w.store.PushWorkBatch(ctx, wus); err != nil { w.logger.Error().Err(err).Msg("failed to push child work units") w.metrics.AddErrors(ctx, 1, selector) + // PutResource succeeded; drain the stored intermediate's + // refcount so it doesn't leak. This triggers cascade-Dec + // to the parent chain (including our parentID pin). + for k := 0; k < len(r.Table.Relations); k++ { + if decErr := w.store.DecResourceRefcount(ctx, newID); decErr != nil { + w.logger.Error().Err(decErr).Str("id", newID).Msg("failed to drain orphaned intermediate refcount") + break + } + } continue } } From 9c19ca26ff66e8f6e2179197b75fef4866db0861 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Tue, 28 Apr 2026 14:19:48 +0300 Subject: [PATCH 29/30] fix(scheduler/queue): restore chunk-parallel resource resolution --- scheduler/queue/worker.go | 120 ++++++++++++++++++++------------------ 1 file changed, 64 insertions(+), 56 deletions(-) diff --git a/scheduler/queue/worker.go b/scheduler/queue/worker.go index 014bc1f4a0..092c5de750 100644 --- a/scheduler/queue/worker.go +++ b/scheduler/queue/worker.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "runtime/debug" + "sync" "time" "github.com/cloudquery/plugin-sdk/v4/caser" @@ -206,72 +207,79 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize) } + var wg sync.WaitGroup for i := range chunks { - resolved := resolvers.ResolveResourcesChunk(ctx, w.logger, w.metrics, table, client, parent, chunks[i], w.caser) - for _, r := range resolved { - if err := r.CalculateCQID(w.deterministicCQID); err != nil { - w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error") - w.metrics.AddErrors(ctx, 1, selector) - continue - } - if err := r.StoreCQClientID(client.ID()); err != nil { - w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id") - } - if err := r.Validate(); err != nil { - switch err.(type) { - case *schema.PKError: - w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error") + wg.Add(1) + i := i + go func() { + defer wg.Done() + resolved := resolvers.ResolveResourcesChunk(ctx, w.logger, w.metrics, table, client, parent, chunks[i], w.caser) + for _, r := range resolved { + if err := r.CalculateCQID(w.deterministicCQID); err != nil { + w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error") w.metrics.AddErrors(ctx, 1, selector) continue - case *schema.PKComponentError: - w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning") } - } - - // Emit to destination pipeline. - select { - case w.resolvedResources <- r: - case <-ctx.Done(): - return - } - - // If this resource has children, store it and push WorkUnits. - if len(r.Table.Relations) > 0 { - newID := uuid.NewString() - blob, err := w.codec.EncodeResource(r, parentID) - if err != nil { - w.logger.Error().Err(err).Str("table", r.Table.Name).Msg("failed to encode resource") - w.metrics.AddErrors(ctx, 1, selector) - continue + if err := r.StoreCQClientID(client.ID()); err != nil { + w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id") } - if err := w.store.PutResource(ctx, newID, blob, len(r.Table.Relations), parentID); err != nil { - w.logger.Error().Err(err).Str("table", r.Table.Name).Msg("failed to persist resource") - w.metrics.AddErrors(ctx, 1, selector) - continue + if err := r.Validate(); err != nil { + switch err.(type) { + case *schema.PKError: + w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error") + w.metrics.AddErrors(ctx, 1, selector) + continue + case *schema.PKComponentError: + w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning") + } } - wus := make([]storage.SerializedWorkUnit, 0, len(r.Table.Relations)) - for _, rel := range r.Table.Relations { - wus = append(wus, storage.SerializedWorkUnit{ - TableName: rel.Name, - ClientID: client.ID(), - ParentID: newID, - }) + + // Emit to destination pipeline. + select { + case w.resolvedResources <- r: + case <-ctx.Done(): + return } - if err := w.store.PushWorkBatch(ctx, wus); err != nil { - w.logger.Error().Err(err).Msg("failed to push child work units") - w.metrics.AddErrors(ctx, 1, selector) - // PutResource succeeded; drain the stored intermediate's - // refcount so it doesn't leak. This triggers cascade-Dec - // to the parent chain (including our parentID pin). - for k := 0; k < len(r.Table.Relations); k++ { - if decErr := w.store.DecResourceRefcount(ctx, newID); decErr != nil { - w.logger.Error().Err(decErr).Str("id", newID).Msg("failed to drain orphaned intermediate refcount") - break + + // If this resource has children, store it and push WorkUnits. + if len(r.Table.Relations) > 0 { + newID := uuid.NewString() + blob, err := w.codec.EncodeResource(r, parentID) + if err != nil { + w.logger.Error().Err(err).Str("table", r.Table.Name).Msg("failed to encode resource") + w.metrics.AddErrors(ctx, 1, selector) + continue + } + if err := w.store.PutResource(ctx, newID, blob, len(r.Table.Relations), parentID); err != nil { + w.logger.Error().Err(err).Str("table", r.Table.Name).Msg("failed to persist resource") + w.metrics.AddErrors(ctx, 1, selector) + continue + } + wus := make([]storage.SerializedWorkUnit, 0, len(r.Table.Relations)) + for _, rel := range r.Table.Relations { + wus = append(wus, storage.SerializedWorkUnit{ + TableName: rel.Name, + ClientID: client.ID(), + ParentID: newID, + }) + } + if err := w.store.PushWorkBatch(ctx, wus); err != nil { + w.logger.Error().Err(err).Msg("failed to push child work units") + w.metrics.AddErrors(ctx, 1, selector) + // PutResource succeeded; drain the stored intermediate's + // refcount so it doesn't leak. This triggers cascade-Dec + // to the parent chain (including our parentID pin). + for k := 0; k < len(r.Table.Relations); k++ { + if decErr := w.store.DecResourceRefcount(ctx, newID); decErr != nil { + w.logger.Error().Err(decErr).Str("id", newID).Msg("failed to drain orphaned intermediate refcount") + break + } } + continue } - continue } } - } + }() } + wg.Wait() } From 6cd72f59710470fc29ad00606609ebae51e3ba25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Tue, 28 Apr 2026 14:20:31 +0300 Subject: [PATCH 30/30] fix(scheduler): validate tables at syncShuffleQueue startup when Storage is set --- scheduler/scheduler_shuffle_queue.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/scheduler/scheduler_shuffle_queue.go b/scheduler/scheduler_shuffle_queue.go index 215cb146fc..36d0385fa7 100644 --- a/scheduler/scheduler_shuffle_queue.go +++ b/scheduler/scheduler_shuffle_queue.go @@ -9,6 +9,16 @@ import ( ) func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources chan<- *schema.Resource) { + // If a non-default Storage was configured (via WithStorage), validate + // that all tables with relations have an itemSample. Skipping this check + // when storage is nil (default in-memory path) preserves backward compat. + if s.scheduler.storage != nil { + if err := ValidateTablesForQueue(s.tables, &QueueConfig{Type: QueueTypeBadger}); err != nil { + s.logger.Error().Err(err).Msg("external queue startup validation failed") + return + } + } + preInitialisedClients := make([][]schema.ClientMeta, len(s.tables)) tableNames := make([]string, len(s.tables)) for i, table := range s.tables {