From 96f04a179d3b44c8195e4b614dc3c2c582419c8a Mon Sep 17 00:00:00 2001 From: Jerome Wirsztel Date: Wed, 23 Jul 2025 10:13:56 +0200 Subject: [PATCH 1/4] Add support for DeleteRecord for SQLite --- plugins/destination/sqlite/client/client.go | 1 - .../destination/sqlite/client/client_test.go | 1 - plugins/destination/sqlite/client/delete.go | 95 ++++++++++ .../destination/sqlite/client/delete_test.go | 170 ++++++++++++++++++ 4 files changed, 265 insertions(+), 2 deletions(-) create mode 100644 plugins/destination/sqlite/client/delete.go create mode 100644 plugins/destination/sqlite/client/delete_test.go diff --git a/plugins/destination/sqlite/client/client.go b/plugins/destination/sqlite/client/client.go index 1c1846b63d5e01..4b15ed1e2a1855 100644 --- a/plugins/destination/sqlite/client/client.go +++ b/plugins/destination/sqlite/client/client.go @@ -17,7 +17,6 @@ import ( type Client struct { plugin.UnimplementedSource - batchwriter.UnimplementedDeleteRecord writer *batchwriter.BatchWriter db *sql.DB diff --git a/plugins/destination/sqlite/client/client_test.go b/plugins/destination/sqlite/client/client_test.go index 09b96b3138a85a..af9b37cd2cec69 100644 --- a/plugins/destination/sqlite/client/client_test.go +++ b/plugins/destination/sqlite/client/client_test.go @@ -24,7 +24,6 @@ func TestPlugin(t *testing.T) { plugin.TestWriterSuiteRunner(t, p, plugin.WriterTestSuiteTests{ - SkipDeleteRecord: true, SafeMigrations: plugin.SafeMigrations{ AddColumn: true, RemoveColumn: true, diff --git a/plugins/destination/sqlite/client/delete.go b/plugins/destination/sqlite/client/delete.go new file mode 100644 index 00000000000000..7966ed4164f746 --- /dev/null +++ b/plugins/destination/sqlite/client/delete.go @@ -0,0 +1,95 @@ +package client + +import ( + "context" + "fmt" + "reflect" + "strings" + + "github.com/cloudquery/cloudquery/plugins/destination/sqlite/v2/typeconv" + "github.com/cloudquery/plugin-sdk/v4/message" +) + +func (c *Client) DeleteRecord(ctx context.Context, 
messages message.WriteDeleteRecords) error { + for _, msg := range messages { + sql := generateDelete(msg.DeleteRecord) + + params, err := extractPredicateValues(msg.DeleteRecord.WhereClause) + if err != nil { + return err + } + + fmt.Println("Delete SQL:", sql) + fmt.Println("Params:", params) + + if _, err = c.db.ExecContext(ctx, sql, params...); err != nil { + return err + } + } + return nil +} + +func generateDelete(msg message.DeleteRecord) string { + var sb strings.Builder + + sb.WriteString("DELETE FROM ") + sb.WriteString("\"" + msg.TableName + "\"") + sb.WriteString(" WHERE ") + if len(msg.WhereClause) == 0 { + sb.WriteString("1") + } else { + for i, predicateGroup := range msg.WhereClause { + if len(predicateGroup.Predicates) == 0 { + continue + } + sb.WriteString("(") + for i, predicate := range predicateGroup.Predicates { + if i > 0 { + sb.WriteString(" ") + sb.WriteString(predicateGroup.GroupingType) + sb.WriteString(" ") + } + sb.WriteString("\"" + predicate.Column + "\"") + sb.WriteString(" = ?") + } + sb.WriteString(")") + if i < len(msg.WhereClause)-1 { + sb.WriteString(" AND ") + } + } + } + + return sb.String() +} + +func extractPredicateValues(where message.PredicateGroups) ([]any, error) { + predicateCount := 0 + for _, predicateGroup := range where { + predicateCount += len(predicateGroup.Predicates) + } + results := make([]any, predicateCount) + counter := 0 + for _, predicateGroup := range where { + for _, predicate := range predicateGroup.Predicates { + col := predicate.Record.Column(0) + primitiveValues, err := typeconv.FromArray(col) + if err != nil { + return nil, err + } + unpacked := unpackArray(primitiveValues) + results[counter] = unpacked[0] + counter++ + } + } + return results, nil +} + +func unpackArray(s any) []any { + v := reflect.ValueOf(s) + fmt.Println(v.Kind(), v.Len()) + r := make([]any, v.Len()) + for i := range v.Len() { + r[i] = v.Index(i).Interface() + } + return r +} diff --git 
a/plugins/destination/sqlite/client/delete_test.go b/plugins/destination/sqlite/client/delete_test.go new file mode 100644 index 00000000000000..2abfbd7fabe0cf --- /dev/null +++ b/plugins/destination/sqlite/client/delete_test.go @@ -0,0 +1,170 @@ +package client + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/cloudquery/plugin-sdk/v4/message" + "github.com/cloudquery/plugin-sdk/v4/plugin" + "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestDelete(t *testing.T) { + testCases := []struct { + name string + insertValues []string + deleteValues []string + deleteAll bool + expectedCount int + }{ + { + name: "delete single record", + insertValues: []string{"foo", "bar"}, + deleteValues: []string{"foo"}, + expectedCount: 1, + }, + { + name: "delete both records", + insertValues: []string{"foo", "bar"}, + deleteValues: []string{"foo", "bar"}, + expectedCount: 0, + }, + { + name: "delete none", + insertValues: []string{"foo"}, + deleteValues: []string{"bar"}, + expectedCount: 1, + }, + { + name: "delete all records", + insertValues: []string{"foo", "bar"}, + deleteAll: true, + expectedCount: 0, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + r := require.New(t) + ctx := context.Background() + client := withPluginClient(ctx, r) + + table := createTestTable() + r.NoError(client.MigrateTables(ctx, message.WriteMigrateTables{{Table: table}})) + + writeInserts := createInsertMessages(tc.insertValues, table) + r.NoError(client.WriteTableBatch(ctx, "", writeInserts)) + + writeDeletes := createDeleteMessages(tc.deleteAll, table, tc.deleteValues) + r.NoError(client.DeleteRecord(ctx, writeDeletes)) + + count, err := countAllRows(ctx, client, table) + r.NoError(err) + 
r.EqualValues(tc.expectedCount, count, "unexpected amount of items after delete with match") + }) + } +} + +func countAllRows(ctx context.Context, client *Client, table *schema.Table) (int64, error) { + var err error + ch := make(chan arrow.Record) + go func() { + defer close(ch) + err = client.Read(ctx, table, ch) + }() + count := int64(0) + for record := range ch { + count += record.NumRows() + } + return count, err +} + +func withPluginClient(ctx context.Context, r *require.Assertions) *Client { + s := &Spec{ConnectionString: ":memory:"} + b, err := json.Marshal(s) + r.NoError(err) + c, err := New(ctx, zerolog.Nop(), b, plugin.NewClientOptions{}) + r.NoError(err) + return c.(*Client) +} + +func valueToArrowRecord(tableName string, value string) arrow.Record { + bldrDeleteMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{ + Name: tableName, + Columns: schema.ColumnList{ + schema.Column{Name: "id", Type: arrow.BinaryTypes.String}, + }, + }).ToArrowSchema()) + bldrDeleteMatch.Field(0).(*array.StringBuilder).Append(value) + deleteValue := bldrDeleteMatch.NewRecord() + return deleteValue +} + +func createDeleteMessages(deleteAll bool, table *schema.Table, deleteValues []string) message.WriteDeleteRecords { + writeDeletes := message.WriteDeleteRecords{} + + if deleteAll { + msg := message.WriteDeleteRecord{ + DeleteRecord: message.DeleteRecord{ + TableName: table.Name, + }, + } + return append(writeDeletes, &msg) + } + for _, deleteValue := range deleteValues { + msg := message.WriteDeleteRecord{ + DeleteRecord: message.DeleteRecord{ + TableName: table.Name, + WhereClause: message.PredicateGroups{ + { + GroupingType: "AND", + Predicates: []message.Predicate{ + { + Operator: "eq", + Column: "id", + Record: valueToArrowRecord(table.Name, deleteValue), + }, + }, + }, + }, + }, + } + writeDeletes = append(writeDeletes, &msg) + } + return writeDeletes +} + +func createInsertMessages(values []string, table *schema.Table) message.WriteInserts { + const 
sourceName = "source-test" + writeInserts := message.WriteInserts{} + for _, insertValue := range values { + bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) + bldr.Field(0).(*array.StringBuilder).Append(insertValue) + bldr.Field(1).(*array.StringBuilder).Append(sourceName) + bldr.Field(2).(*array.TimestampBuilder).AppendTime(time.Now()) + record := bldr.NewRecord() + writeInserts = append(writeInserts, &message.WriteInsert{Record: record}) + } + return writeInserts +} + +func createTestTable() *schema.Table { + tableName := fmt.Sprintf("cq_delete_test_%d_%04d", time.Now().UnixNano(), rand.Intn(1000)) + table := &schema.Table{ + Name: tableName, + Columns: schema.ColumnList{ + schema.Column{Name: "id", Type: arrow.BinaryTypes.String, PrimaryKey: true, NotNull: true}, + schema.CqSourceNameColumn, + schema.CqSyncTimeColumn, + }, + } + return table +} From e38f81a3e68606a566b3dd2494dfb194454fabca Mon Sep 17 00:00:00 2001 From: Jerome Wirsztel Date: Wed, 23 Jul 2025 10:24:37 +0200 Subject: [PATCH 2/4] Removed debug logs --- plugins/destination/sqlite/client/delete.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/plugins/destination/sqlite/client/delete.go b/plugins/destination/sqlite/client/delete.go index 7966ed4164f746..47561ea37d3ae4 100644 --- a/plugins/destination/sqlite/client/delete.go +++ b/plugins/destination/sqlite/client/delete.go @@ -19,9 +19,6 @@ func (c *Client) DeleteRecord(ctx context.Context, messages message.WriteDeleteR return err } - fmt.Println("Delete SQL:", sql) - fmt.Println("Params:", params) - if _, err = c.db.ExecContext(ctx, sql, params...); err != nil { return err } From 5b4502253c18ddce86ec7d795a51bf688eebb4b3 Mon Sep 17 00:00:00 2001 From: Jerome Wirsztel Date: Wed, 23 Jul 2025 10:51:54 +0200 Subject: [PATCH 3/4] Added missing folder and fixed variable shadow --- plugins/destination/sqlite/client/delete.go | 4 +- .../destination/sqlite/typeconv/primitive.go | 21 ++++++++ 
.../destination/sqlite/typeconv/special.go | 36 +++++++++++++ plugins/destination/sqlite/typeconv/values.go | 51 ++++++++++++++++++ .../sqlite/typeconv/values_test.go | 53 +++++++++++++++++++ 5 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 plugins/destination/sqlite/typeconv/primitive.go create mode 100644 plugins/destination/sqlite/typeconv/special.go create mode 100644 plugins/destination/sqlite/typeconv/values.go create mode 100644 plugins/destination/sqlite/typeconv/values_test.go diff --git a/plugins/destination/sqlite/client/delete.go b/plugins/destination/sqlite/client/delete.go index 47561ea37d3ae4..ae091e2ce69e8a 100644 --- a/plugins/destination/sqlite/client/delete.go +++ b/plugins/destination/sqlite/client/delete.go @@ -40,8 +40,8 @@ func generateDelete(msg message.DeleteRecord) string { continue } sb.WriteString("(") - for i, predicate := range predicateGroup.Predicates { - if i > 0 { + for j, predicate := range predicateGroup.Predicates { + if j > 0 { sb.WriteString(" ") sb.WriteString(predicateGroup.GroupingType) sb.WriteString(" ") diff --git a/plugins/destination/sqlite/typeconv/primitive.go b/plugins/destination/sqlite/typeconv/primitive.go new file mode 100644 index 00000000000000..b9459a6f516930 --- /dev/null +++ b/plugins/destination/sqlite/typeconv/primitive.go @@ -0,0 +1,21 @@ +package typeconv + +import ( + "github.com/apache/arrow-go/v18/arrow" +) + +type primitive[A any] interface { + arrow.Array + Value(int) A +} + +func primitiveValue[A any, ARR primitive[A]](arr ARR) []A { + res := make([]A, arr.Len()) + for i := 0; i < arr.Len(); i++ { + if arr.IsValid(i) { + val := arr.Value(i) + res[i] = val + } + } + return res +} diff --git a/plugins/destination/sqlite/typeconv/special.go b/plugins/destination/sqlite/typeconv/special.go new file mode 100644 index 00000000000000..dd976a99da5a67 --- /dev/null +++ b/plugins/destination/sqlite/typeconv/special.go @@ -0,0 +1,36 @@ +package typeconv + +import ( + 
"github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" +) + +func valueStrData(arr arrow.Array) []string { + res := make([]string, arr.Len()) + for i := 0; i < arr.Len(); i++ { + if arr.IsValid(i) { + res[i] = arr.ValueStr(i) + } + } + return res +} + +func float16Value(arr *array.Float16) []float32 { + res := make([]float32, arr.Len()) + for i := 0; i < arr.Len(); i++ { + if arr.IsValid(i) { + res[i] = arr.Value(i).Float32() + } + } + return res +} + +func byteArrValue(arr primitive[[]byte]) []string { + res := make([]string, arr.Len()) + for i := 0; i < arr.Len(); i++ { + if arr.IsValid(i) { + res[i] = string(arr.Value(i)) + } + } + return res +} diff --git a/plugins/destination/sqlite/typeconv/values.go b/plugins/destination/sqlite/typeconv/values.go new file mode 100644 index 00000000000000..02de26b9e2a9bd --- /dev/null +++ b/plugins/destination/sqlite/typeconv/values.go @@ -0,0 +1,51 @@ +package typeconv + +import ( + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" +) + +func FromArray(arr arrow.Array) (any, error) { + switch arr := arr.(type) { + case *array.Boolean: + return primitiveValue(arr), nil + + case *array.Uint8: + return primitiveValue(arr), nil + case *array.Uint16: + return primitiveValue(arr), nil + case *array.Uint32: + return primitiveValue(arr), nil + case *array.Uint64: + return primitiveValue(arr), nil + + case *array.Int8: + return primitiveValue(arr), nil + case *array.Int16: + return primitiveValue(arr), nil + case *array.Int32: + return primitiveValue(arr), nil + case *array.Int64: + return primitiveValue(arr), nil + + case *array.Float16: + return float16Value(arr), nil + case *array.Float32: + return primitiveValue(arr), nil + case *array.Float64: + return primitiveValue(arr), nil + + case *array.String: + return primitiveValue(arr), nil + + case *array.Binary: + return byteArrValue(arr), nil + case *array.FixedSizeBinary: + return byteArrValue(arr), nil + case 
*array.LargeBinary: + return byteArrValue(arr), nil + + default: + return valueStrData(arr), nil + } +} diff --git a/plugins/destination/sqlite/typeconv/values_test.go b/plugins/destination/sqlite/typeconv/values_test.go new file mode 100644 index 00000000000000..24eb4cab1f7800 --- /dev/null +++ b/plugins/destination/sqlite/typeconv/values_test.go @@ -0,0 +1,53 @@ +package typeconv + +import ( + "math" + "math/rand" + "testing" + "time" + + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/stretchr/testify/require" +) + +func TestFromArray(t *testing.T) { + const amount = 100 + + values := make([]float64, amount) + for i := range values { + values[i] = rand.Float64()*(math.MaxFloat64-1) + rand.Float64() + } + + builder := array.NewFloat64Builder(memory.DefaultAllocator) + for _, f := range values { + builder.Append(f) + } + + data, err := FromArray(builder.NewArray()) + require.NoError(t, err) + + elems := data.([]float64) + require.Equal(t, amount, len(elems)) + for i, elem := range elems { + require.NotNil(t, elem) + require.Exactly(t, values[i], elem) + } +} + +func BenchmarkFromArray(b *testing.B) { + table := schema.TestTable("test", schema.TestSourceOptions{}) + sourceName := "test-source" + syncTime := time.Now().UTC().Round(time.Second) + opts := schema.GenTestDataOptions{ + SourceName: sourceName, + SyncTime: syncTime, + MaxRows: b.N, + } + record := schema.NewTestDataGenerator(0).Generate(table, opts) + b.ResetTimer() + for _, col := range record.Columns() { + _, _ = FromArray(col) + } +} From e108e49a4df182bb46cce81a6d9ea53f6cc01263 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Wed, 23 Jul 2025 14:34:13 +0100 Subject: [PATCH 4/4] Add ability to delete records in test plugin --- plugins/source/test/client/spec.go | 3 ++ .../source/test/resources/plugin/client.go | 43 ++++++++++++++++++- 2 files changed, 45 insertions(+), 1 deletion(-) diff --git 
a/plugins/source/test/client/spec.go b/plugins/source/test/client/spec.go index 0eae5333fc7726..578ca3e9cd8b43 100644 --- a/plugins/source/test/client/spec.go +++ b/plugins/source/test/client/spec.go @@ -29,6 +29,9 @@ type Spec struct { // If true, the plugin will os.Exit(1) immediately at the table resolver level, before any resources are synced ExitImmediately bool `json:"exit_immediately" jsonschema:"default=false"` + + // If non-empty, the plugin will send a request to delete the records with the given resource_ids in test_some_table (after syncing them). + DeleteRecords []int64 `json:"delete_records" jsonschema:"default=false"` } //go:embed schema.json diff --git a/plugins/source/test/resources/plugin/client.go b/plugins/source/test/resources/plugin/client.go index d8bca557c20e48..95d968c89d3efe 100644 --- a/plugins/source/test/resources/plugin/client.go +++ b/plugins/source/test/resources/plugin/client.go @@ -7,6 +7,9 @@ import ( "os" "strings" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" "github.com/cloudquery/cloudquery/plugins/source/test/v4/client" "github.com/cloudquery/cloudquery/plugins/source/test/v4/resources/services" "github.com/cloudquery/plugin-sdk/v4/message" @@ -70,7 +73,45 @@ func (c *Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan< schedulerOptions = append(schedulerOptions, scheduler.WithShard(options.Shard.Num, options.Shard.Total)) } - return c.scheduler.Sync(ctx, schedulerClient, tt, res, schedulerOptions...) + err = c.scheduler.Sync(ctx, schedulerClient, tt, res, schedulerOptions...) 
+ if err != nil { + return fmt.Errorf("failed to run sync: %w", err) + } + + if len(c.config.DeleteRecords) > 0 { + c.deleteRecords(res) + } + return nil +} + +func (c *Client) deleteRecords(res chan<- message.SyncMessage) { + predicates := make([]message.Predicate, 0, len(c.config.DeleteRecords)) + for _, deleteID := range c.config.DeleteRecords { + deleteRecord := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{ + Name: "test_some_table", + Columns: schema.ColumnList{ + schema.Column{Name: "resource_id", Type: arrow.PrimitiveTypes.Int64}, + }, + }).ToArrowSchema()) + deleteRecord.Field(0).(*array.Int64Builder).Append(deleteID) + deleteValue := deleteRecord.NewRecord() + predicates = append(predicates, message.Predicate{ + Operator: "eq", + Column: "resource_id", + Record: deleteValue, + }) + } + res <- &message.SyncDeleteRecord{ + DeleteRecord: message.DeleteRecord{ + TableName: "test_some_table", + WhereClause: message.PredicateGroups{ + { + GroupingType: "OR", + Predicates: predicates, + }, + }, + }, + } } func (c *Client) Tables(_ context.Context, options plugin.TableOptions) (schema.Tables, error) {