Skip to content

Commit e1bf276

Browse files
authored
feat: Add DeleteRecord handling to SQLite destination (#21102)
#### Summary Following up on https://community.cloudquery.io/t/incremental-tables-how-to-handle-deletion/1205/6 This PR adds support for `DeleteRecord` messages so sources can specify which items can be deleted specifically. I used a lot of the existing code present in the ClickHouse PR: https://github.com/cloudquery/cloudquery/pull/20772/files
1 parent 129b6ee commit e1bf276

8 files changed

Lines changed: 423 additions & 2 deletions

File tree

plugins/destination/sqlite/client/client.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717

1818
type Client struct {
1919
plugin.UnimplementedSource
20-
batchwriter.UnimplementedDeleteRecord
2120

2221
writer *batchwriter.BatchWriter
2322
db *sql.DB

plugins/destination/sqlite/client/client_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ func TestPlugin(t *testing.T) {
2424
plugin.TestWriterSuiteRunner(t,
2525
p,
2626
plugin.WriterTestSuiteTests{
27-
SkipDeleteRecord: true,
2827
SafeMigrations: plugin.SafeMigrations{
2928
AddColumn: true,
3029
RemoveColumn: true,
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"reflect"
7+
"strings"
8+
9+
"github.com/cloudquery/cloudquery/plugins/destination/sqlite/v2/typeconv"
10+
"github.com/cloudquery/plugin-sdk/v4/message"
11+
)
12+
13+
func (c *Client) DeleteRecord(ctx context.Context, messages message.WriteDeleteRecords) error {
14+
for _, msg := range messages {
15+
sql := generateDelete(msg.DeleteRecord)
16+
17+
params, err := extractPredicateValues(msg.DeleteRecord.WhereClause)
18+
if err != nil {
19+
return err
20+
}
21+
22+
if _, err = c.db.ExecContext(ctx, sql, params...); err != nil {
23+
return err
24+
}
25+
}
26+
return nil
27+
}
28+
29+
func generateDelete(msg message.DeleteRecord) string {
30+
var sb strings.Builder
31+
32+
sb.WriteString("DELETE FROM ")
33+
sb.WriteString("\"" + msg.TableName + "\"")
34+
sb.WriteString(" WHERE ")
35+
if len(msg.WhereClause) == 0 {
36+
sb.WriteString("1")
37+
} else {
38+
for i, predicateGroup := range msg.WhereClause {
39+
if len(predicateGroup.Predicates) == 0 {
40+
continue
41+
}
42+
sb.WriteString("(")
43+
for j, predicate := range predicateGroup.Predicates {
44+
if j > 0 {
45+
sb.WriteString(" ")
46+
sb.WriteString(predicateGroup.GroupingType)
47+
sb.WriteString(" ")
48+
}
49+
sb.WriteString("\"" + predicate.Column + "\"")
50+
sb.WriteString(" = ?")
51+
}
52+
sb.WriteString(")")
53+
if i < len(msg.WhereClause)-1 {
54+
sb.WriteString(" AND ")
55+
}
56+
}
57+
}
58+
59+
return sb.String()
60+
}
61+
62+
func extractPredicateValues(where message.PredicateGroups) ([]any, error) {
63+
predicateCount := 0
64+
for _, predicateGroup := range where {
65+
predicateCount += len(predicateGroup.Predicates)
66+
}
67+
results := make([]any, predicateCount)
68+
counter := 0
69+
for _, predicateGroup := range where {
70+
for _, predicate := range predicateGroup.Predicates {
71+
col := predicate.Record.Column(0)
72+
primitiveValues, err := typeconv.FromArray(col)
73+
if err != nil {
74+
return nil, err
75+
}
76+
unpacked := unpackArray(primitiveValues)
77+
results[counter] = unpacked[0]
78+
counter++
79+
}
80+
}
81+
return results, nil
82+
}
83+
84+
func unpackArray(s any) []any {
85+
v := reflect.ValueOf(s)
86+
fmt.Println(v.Kind(), v.Len())
87+
r := make([]any, v.Len())
88+
for i := range v.Len() {
89+
r[i] = v.Index(i).Interface()
90+
}
91+
return r
92+
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"math/rand"
8+
"testing"
9+
"time"
10+
11+
"github.com/apache/arrow-go/v18/arrow"
12+
"github.com/apache/arrow-go/v18/arrow/array"
13+
"github.com/apache/arrow-go/v18/arrow/memory"
14+
"github.com/cloudquery/plugin-sdk/v4/message"
15+
"github.com/cloudquery/plugin-sdk/v4/plugin"
16+
"github.com/cloudquery/plugin-sdk/v4/schema"
17+
"github.com/rs/zerolog"
18+
"github.com/stretchr/testify/require"
19+
)
20+
21+
func TestDelete(t *testing.T) {
22+
testCases := []struct {
23+
name string
24+
insertValues []string
25+
deleteValues []string
26+
deleteAll bool
27+
expectedCount int
28+
}{
29+
{
30+
name: "delete single record",
31+
insertValues: []string{"foo", "bar"},
32+
deleteValues: []string{"foo"},
33+
expectedCount: 1,
34+
},
35+
{
36+
name: "delete both records",
37+
insertValues: []string{"foo", "bar"},
38+
deleteValues: []string{"foo", "bar"},
39+
expectedCount: 0,
40+
},
41+
{
42+
name: "delete none",
43+
insertValues: []string{"foo"},
44+
deleteValues: []string{"bar"},
45+
expectedCount: 1,
46+
},
47+
{
48+
name: "delete all records",
49+
insertValues: []string{"foo", "bar"},
50+
deleteAll: true,
51+
expectedCount: 0,
52+
},
53+
}
54+
for _, tc := range testCases {
55+
t.Run(tc.name, func(t *testing.T) {
56+
r := require.New(t)
57+
ctx := context.Background()
58+
client := withPluginClient(ctx, r)
59+
60+
table := createTestTable()
61+
r.NoError(client.MigrateTables(ctx, message.WriteMigrateTables{{Table: table}}))
62+
63+
writeInserts := createInsertMessages(tc.insertValues, table)
64+
r.NoError(client.WriteTableBatch(ctx, "", writeInserts))
65+
66+
writeDeletes := createDeleteMessages(tc.deleteAll, table, tc.deleteValues)
67+
r.NoError(client.DeleteRecord(ctx, writeDeletes))
68+
69+
count, err := countAllRows(ctx, client, table)
70+
r.NoError(err)
71+
r.EqualValues(tc.expectedCount, count, "unexpected amount of items after delete with match")
72+
})
73+
}
74+
}
75+
76+
func countAllRows(ctx context.Context, client *Client, table *schema.Table) (int64, error) {
77+
var err error
78+
ch := make(chan arrow.Record)
79+
go func() {
80+
defer close(ch)
81+
err = client.Read(ctx, table, ch)
82+
}()
83+
count := int64(0)
84+
for record := range ch {
85+
count += record.NumRows()
86+
}
87+
return count, err
88+
}
89+
90+
func withPluginClient(ctx context.Context, r *require.Assertions) *Client {
91+
s := &Spec{ConnectionString: ":memory:"}
92+
b, err := json.Marshal(s)
93+
r.NoError(err)
94+
c, err := New(ctx, zerolog.Nop(), b, plugin.NewClientOptions{})
95+
r.NoError(err)
96+
return c.(*Client)
97+
}
98+
99+
func valueToArrowRecord(tableName string, value string) arrow.Record {
100+
bldrDeleteMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{
101+
Name: tableName,
102+
Columns: schema.ColumnList{
103+
schema.Column{Name: "id", Type: arrow.BinaryTypes.String},
104+
},
105+
}).ToArrowSchema())
106+
bldrDeleteMatch.Field(0).(*array.StringBuilder).Append(value)
107+
deleteValue := bldrDeleteMatch.NewRecord()
108+
return deleteValue
109+
}
110+
111+
func createDeleteMessages(deleteAll bool, table *schema.Table, deleteValues []string) message.WriteDeleteRecords {
112+
writeDeletes := message.WriteDeleteRecords{}
113+
114+
if deleteAll {
115+
msg := message.WriteDeleteRecord{
116+
DeleteRecord: message.DeleteRecord{
117+
TableName: table.Name,
118+
},
119+
}
120+
return append(writeDeletes, &msg)
121+
}
122+
for _, deleteValue := range deleteValues {
123+
msg := message.WriteDeleteRecord{
124+
DeleteRecord: message.DeleteRecord{
125+
TableName: table.Name,
126+
WhereClause: message.PredicateGroups{
127+
{
128+
GroupingType: "AND",
129+
Predicates: []message.Predicate{
130+
{
131+
Operator: "eq",
132+
Column: "id",
133+
Record: valueToArrowRecord(table.Name, deleteValue),
134+
},
135+
},
136+
},
137+
},
138+
},
139+
}
140+
writeDeletes = append(writeDeletes, &msg)
141+
}
142+
return writeDeletes
143+
}
144+
145+
func createInsertMessages(values []string, table *schema.Table) message.WriteInserts {
146+
const sourceName = "source-test"
147+
writeInserts := message.WriteInserts{}
148+
for _, insertValue := range values {
149+
bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
150+
bldr.Field(0).(*array.StringBuilder).Append(insertValue)
151+
bldr.Field(1).(*array.StringBuilder).Append(sourceName)
152+
bldr.Field(2).(*array.TimestampBuilder).AppendTime(time.Now())
153+
record := bldr.NewRecord()
154+
writeInserts = append(writeInserts, &message.WriteInsert{Record: record})
155+
}
156+
return writeInserts
157+
}
158+
159+
func createTestTable() *schema.Table {
160+
tableName := fmt.Sprintf("cq_delete_test_%d_%04d", time.Now().UnixNano(), rand.Intn(1000))
161+
table := &schema.Table{
162+
Name: tableName,
163+
Columns: schema.ColumnList{
164+
schema.Column{Name: "id", Type: arrow.BinaryTypes.String, PrimaryKey: true, NotNull: true},
165+
schema.CqSourceNameColumn,
166+
schema.CqSyncTimeColumn,
167+
},
168+
}
169+
return table
170+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package typeconv
2+
3+
import (
4+
"github.com/apache/arrow-go/v18/arrow"
5+
)
6+
7+
type primitive[A any] interface {
8+
arrow.Array
9+
Value(int) A
10+
}
11+
12+
func primitiveValue[A any, ARR primitive[A]](arr ARR) []A {
13+
res := make([]A, arr.Len())
14+
for i := 0; i < arr.Len(); i++ {
15+
if arr.IsValid(i) {
16+
val := arr.Value(i)
17+
res[i] = val
18+
}
19+
}
20+
return res
21+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package typeconv
2+
3+
import (
4+
"github.com/apache/arrow-go/v18/arrow"
5+
"github.com/apache/arrow-go/v18/arrow/array"
6+
)
7+
8+
func valueStrData(arr arrow.Array) []string {
9+
res := make([]string, arr.Len())
10+
for i := 0; i < arr.Len(); i++ {
11+
if arr.IsValid(i) {
12+
res[i] = arr.ValueStr(i)
13+
}
14+
}
15+
return res
16+
}
17+
18+
func float16Value(arr *array.Float16) []float32 {
19+
res := make([]float32, arr.Len())
20+
for i := 0; i < arr.Len(); i++ {
21+
if arr.IsValid(i) {
22+
res[i] = arr.Value(i).Float32()
23+
}
24+
}
25+
return res
26+
}
27+
28+
func byteArrValue(arr primitive[[]byte]) []string {
29+
res := make([]string, arr.Len())
30+
for i := 0; i < arr.Len(); i++ {
31+
if arr.IsValid(i) {
32+
res[i] = string(arr.Value(i))
33+
}
34+
}
35+
return res
36+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package typeconv
2+
3+
import (
4+
"github.com/apache/arrow-go/v18/arrow"
5+
"github.com/apache/arrow-go/v18/arrow/array"
6+
)
7+
8+
func FromArray(arr arrow.Array) (any, error) {
9+
switch arr := arr.(type) {
10+
case *array.Boolean:
11+
return primitiveValue(arr), nil
12+
13+
case *array.Uint8:
14+
return primitiveValue(arr), nil
15+
case *array.Uint16:
16+
return primitiveValue(arr), nil
17+
case *array.Uint32:
18+
return primitiveValue(arr), nil
19+
case *array.Uint64:
20+
return primitiveValue(arr), nil
21+
22+
case *array.Int8:
23+
return primitiveValue(arr), nil
24+
case *array.Int16:
25+
return primitiveValue(arr), nil
26+
case *array.Int32:
27+
return primitiveValue(arr), nil
28+
case *array.Int64:
29+
return primitiveValue(arr), nil
30+
31+
case *array.Float16:
32+
return float16Value(arr), nil
33+
case *array.Float32:
34+
return primitiveValue(arr), nil
35+
case *array.Float64:
36+
return primitiveValue(arr), nil
37+
38+
case *array.String:
39+
return primitiveValue(arr), nil
40+
41+
case *array.Binary:
42+
return byteArrValue(arr), nil
43+
case *array.FixedSizeBinary:
44+
return byteArrValue(arr), nil
45+
case *array.LargeBinary:
46+
return byteArrValue(arr), nil
47+
48+
default:
49+
return valueStrData(arr), nil
50+
}
51+
}

0 commit comments

Comments
 (0)