-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathplugin_test.go
More file actions
102 lines (91 loc) · 2.4 KB
/
plugin_test.go
File metadata and controls
102 lines (91 loc) · 2.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package plugin
import (
"context"
"testing"
"github.com/apache/arrow-go/v18/arrow"
"github.com/cloudquery/plugin-sdk/v4/message"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/rs/zerolog"
)
type testPluginClient struct {
messages message.SyncMessages
}
func newTestPluginClient(context.Context, zerolog.Logger, []byte, NewClientOptions) (Client, error) {
return &testPluginClient{}, nil
}
func (*testPluginClient) GetSpec() any {
return &struct{}{}
}
func (*testPluginClient) Tables(context.Context, TableOptions) (schema.Tables, error) {
return schema.Tables{}, nil
}
func (*testPluginClient) Read(context.Context, *schema.Table, chan<- arrow.RecordBatch) error {
return nil
}
func (c *testPluginClient) Sync(_ context.Context, _ SyncOptions, res chan<- message.SyncMessage) error {
for _, msg := range c.messages {
res <- msg
}
return nil
}
func (c *testPluginClient) Write(_ context.Context, res <-chan message.WriteMessage) error {
for msg := range res {
switch m := msg.(type) {
case *message.WriteMigrateTable:
c.messages = append(c.messages, &message.SyncMigrateTable{
Table: m.Table,
})
case *message.WriteInsert:
c.messages = append(c.messages, &message.SyncInsert{
Record: m.Record,
})
default:
panic("unknown message")
}
}
return nil
}
func (*testPluginClient) Close(context.Context) error {
return nil
}
func (*testPluginClient) Transform(context.Context, <-chan arrow.RecordBatch, chan<- arrow.RecordBatch) error {
return nil
}
func (*testPluginClient) TransformSchema(context.Context, *arrow.Schema) (*arrow.Schema, error) {
return nil, nil
}
func TestPluginSuccess(t *testing.T) {
ctx := context.Background()
p := NewPlugin("test", "v1.0.0", newTestPluginClient)
if err := p.Init(ctx, []byte(""), NewClientOptions{}); err != nil {
t.Fatal(err)
}
tables, err := p.Tables(ctx, TableOptions{})
if err != nil {
t.Fatal(err)
}
if len(tables) != 0 {
t.Fatal("expected 0 tables")
}
if err := p.WriteAll(ctx, nil); err != nil {
t.Fatal(err)
}
if err := p.WriteAll(ctx, []message.WriteMessage{
&message.WriteMigrateTable{},
}); err != nil {
t.Fatal(err)
}
if len(p.client.(*testPluginClient).messages) != 1 {
t.Fatal("expected 1 message")
}
messages, err := p.SyncAll(ctx, SyncOptions{})
if err != nil {
t.Fatal(err)
}
if len(messages) != 1 {
t.Fatal("expected 1 message")
}
if err := p.Close(ctx); err != nil {
t.Fatal(err)
}
}