From e80367dd4d9f3c25881d282ad2a6733324a15558 Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Tue, 30 Jul 2024 16:14:53 +0100 Subject: [PATCH 1/2] feat: Implement TransformSchema support. --- examples/simple_plugin/go.mod | 2 +- examples/simple_plugin/go.sum | 4 ++-- examples/simple_plugin/plugin/client.go | 5 +++++ go.mod | 2 +- go.sum | 2 ++ internal/memdb/memdb.go | 4 ++++ internal/reversertransformer/reversertransformer.go | 4 ++++ plugin/plugin.go | 3 +++ plugin/plugin_test.go | 3 +++ plugin/plugin_transformer.go | 4 ++++ 10 files changed, 29 insertions(+), 4 deletions(-) diff --git a/examples/simple_plugin/go.mod b/examples/simple_plugin/go.mod index 753e0c5d4e..643e275070 100644 --- a/examples/simple_plugin/go.mod +++ b/examples/simple_plugin/go.mod @@ -16,7 +16,7 @@ require ( github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cloudquery/cloudquery-api-go v1.12.4 // indirect - github.com/cloudquery/plugin-pb-go v1.21.1 // indirect + github.com/cloudquery/plugin-pb-go v1.21.2 // indirect github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/ghodss/yaml v1.0.0 // indirect diff --git a/examples/simple_plugin/go.sum b/examples/simple_plugin/go.sum index fd5ee8a13e..63163001ec 100644 --- a/examples/simple_plugin/go.sum +++ b/examples/simple_plugin/go.sum @@ -18,8 +18,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cloudquery/cloudquery-api-go v1.12.4 h1:48zJRUONRb0AJD/l4u5QZtGsVBq1QUU3M9+/+sCU3xo= github.com/cloudquery/cloudquery-api-go v1.12.4/go.mod h1:5oo8HHnv2Y7NgcVvZn59xFlYKJUyeP0tcN8JH3IP2Aw= -github.com/cloudquery/plugin-pb-go v1.21.1 h1:Fm4FyGrgLUTcllaEI6jdk1Q1NQ6VEbaSl8UCN3AxFOg= -github.com/cloudquery/plugin-pb-go v1.21.1/go.mod h1:Gv336j2QCqGlUABUA6gBcrqWNjLpzv9NOmAJAHC3xg0= +github.com/cloudquery/plugin-pb-go v1.21.2 h1:WfuiZbunZBhcVBNW/FwJyv7RrSen9XNYH4ZbVl+4vzk= +github.com/cloudquery/plugin-pb-go v1.21.2/go.mod h1:Gv336j2QCqGlUABUA6gBcrqWNjLpzv9NOmAJAHC3xg0= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= diff --git a/examples/simple_plugin/plugin/client.go b/examples/simple_plugin/plugin/client.go index 5ef3bf773d..72bb5131dc 100644 --- a/examples/simple_plugin/plugin/client.go +++ b/examples/simple_plugin/plugin/client.go @@ -69,6 +69,11 @@ func (*Client) Transform(_ context.Context, _ <-chan arrow.Record, _ chan<- arro return nil } +func (*Client) TransformSchema(_ context.Context, _ *arrow.Schema) (*arrow.Schema, error) { + // Not implemented, just used for testing destination packaging + return nil, nil +} + func Configure(_ context.Context, logger zerolog.Logger, spec []byte, opts plugin.NewClientOptions) (plugin.Client, error) { if opts.NoConnection { return &Client{ diff --git a/go.mod b/go.mod index ceb0e41528..c301a7e7fe 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/apache/arrow/go/v17 v17.0.0 github.com/bradleyjkemp/cupaloy/v2 v2.8.0 github.com/cloudquery/cloudquery-api-go v1.12.4 - github.com/cloudquery/plugin-pb-go v1.21.1 + github.com/cloudquery/plugin-pb-go v1.21.2 github.com/cloudquery/plugin-sdk/v2 v2.7.0 github.com/goccy/go-json v0.10.3 github.com/google/go-cmp v0.6.0 diff --git a/go.sum b/go.sum index 8e3880ea81..659bbe80af 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,8 @@ github.com/cloudquery/cloudquery-api-go v1.12.4 h1:48zJRUONRb0AJD/l4u5QZtGsVBq1Q github.com/cloudquery/cloudquery-api-go v1.12.4/go.mod h1:5oo8HHnv2Y7NgcVvZn59xFlYKJUyeP0tcN8JH3IP2Aw= github.com/cloudquery/plugin-pb-go v1.21.1 h1:Fm4FyGrgLUTcllaEI6jdk1Q1NQ6VEbaSl8UCN3AxFOg= github.com/cloudquery/plugin-pb-go v1.21.1/go.mod h1:Gv336j2QCqGlUABUA6gBcrqWNjLpzv9NOmAJAHC3xg0= +github.com/cloudquery/plugin-pb-go v1.21.2 h1:WfuiZbunZBhcVBNW/FwJyv7RrSen9XNYH4ZbVl+4vzk= +github.com/cloudquery/plugin-pb-go v1.21.2/go.mod h1:Gv336j2QCqGlUABUA6gBcrqWNjLpzv9NOmAJAHC3xg0= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index e0c7fa410c..5874e1c9b2 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -311,6 +311,10 @@ func (*client) Transform(_ context.Context, _ <-chan arrow.Record, _ chan<- arro return nil } +func (*client) TransformSchema(_ context.Context, _ *arrow.Schema) (*arrow.Schema, error) { + return nil, nil +} + func evaluatePredicate(pred message.Predicate, record arrow.Record) bool { sc := record.Schema() indices := sc.FieldIndices(pred.Column) diff --git a/internal/reversertransformer/reversertransformer.go b/internal/reversertransformer/reversertransformer.go index 831443fec5..0932900796 100644 --- a/internal/reversertransformer/reversertransformer.go +++ b/internal/reversertransformer/reversertransformer.go @@ -58,6 +58,10 @@ func (c *client) Transform(ctx context.Context, recvRecords <-chan arrow.Record, } } +func (*client) TransformSchema(_ context.Context, old *arrow.Schema) (*arrow.Schema, error) { + return old, nil +} + func (*client) reverseStrings(record arrow.Record) (arrow.Record, error) { for i, column := range record.Columns() { if column.DataType().ID() != arrow.STRING { diff --git a/plugin/plugin.go b/plugin/plugin.go index b7a00671dc..7ad2824cfd 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -58,6 +58,9 @@ type UnimplementedTransformer struct{} func (UnimplementedTransformer) Transform(context.Context, <-chan arrow.Record, chan<- arrow.Record) error { return ErrNotImplemented } +func (UnimplementedTransformer) TransformSchema(context.Context, *arrow.Schema) (*arrow.Schema, error) { + return nil, ErrNotImplemented +} // Plugin is the base structure required to pass to sdk.serve // We take a declarative approach to API here similar to Cobra diff --git a/plugin/plugin_test.go b/plugin/plugin_test.go index be7af4ff81..634adafed6 100644 --- a/plugin/plugin_test.go +++ b/plugin/plugin_test.go @@ -59,6 +59,9 @@ func (*testPluginClient) Close(context.Context) error { func (*testPluginClient) Transform(context.Context, <-chan arrow.Record, chan<- arrow.Record) error { return nil } +func (*testPluginClient) TransformSchema(ctx context.Context, old *arrow.Schema) (*arrow.Schema, error) { + return nil, nil +} func TestPluginSuccess(t *testing.T) { ctx := context.Background() diff --git a/plugin/plugin_transformer.go b/plugin/plugin_transformer.go index a273021078..225b3e8129 100644 --- a/plugin/plugin_transformer.go +++ b/plugin/plugin_transformer.go @@ -8,8 +8,12 @@ import ( type TransformerClient interface { Transform(context.Context, <-chan arrow.Record, chan<- arrow.Record) error + TransformSchema(context.Context, *arrow.Schema) (*arrow.Schema, error) } func (p *Plugin) Transform(ctx context.Context, recvRecords <-chan arrow.Record, sendRecords chan<- arrow.Record) error { return p.client.Transform(ctx, recvRecords, sendRecords) } +func (p *Plugin) TransformSchema(ctx context.Context, old *arrow.Schema) (*arrow.Schema, error) { + return p.client.TransformSchema(ctx, old) +} From a5ca6a9f10a495a4d8a50a1c38694f318e04fff4 Mon Sep 17 00:00:00 2001 From: Mariano Gappa Date: Wed, 31 Jul 2024 10:55:25 +0100 Subject: [PATCH 2/2] feat: Implement TransformSchema method. --- examples/simple_plugin/go.mod | 2 +- examples/simple_plugin/go.sum | 2 + go.mod | 2 +- go.sum | 2 + .../reversertransformer_test.go | 4 +- internal/servers/plugin/v3/plugin.go | 16 +++++ internal/servers/plugin/v3/plugin_test.go | 65 +++++++++++++++++++ plugin/plugin_test.go | 2 +- 8 files changed, 89 insertions(+), 6 deletions(-) diff --git a/examples/simple_plugin/go.mod b/examples/simple_plugin/go.mod index 643e275070..1a74eaaee6 100644 --- a/examples/simple_plugin/go.mod +++ b/examples/simple_plugin/go.mod @@ -16,7 +16,7 @@ require ( github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cloudquery/cloudquery-api-go v1.12.4 // indirect - github.com/cloudquery/plugin-pb-go v1.21.2 // indirect + github.com/cloudquery/plugin-pb-go v1.21.3 // indirect github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/ghodss/yaml v1.0.0 // indirect diff --git a/examples/simple_plugin/go.sum b/examples/simple_plugin/go.sum index 63163001ec..5ddcabf866 100644 --- a/examples/simple_plugin/go.sum +++ b/examples/simple_plugin/go.sum @@ -20,6 +20,8 @@ github.com/cloudquery/cloudquery-api-go v1.12.4 h1:48zJRUONRb0AJD/l4u5QZtGsVBq1Q github.com/cloudquery/cloudquery-api-go v1.12.4/go.mod h1:5oo8HHnv2Y7NgcVvZn59xFlYKJUyeP0tcN8JH3IP2Aw= github.com/cloudquery/plugin-pb-go v1.21.2 h1:WfuiZbunZBhcVBNW/FwJyv7RrSen9XNYH4ZbVl+4vzk= github.com/cloudquery/plugin-pb-go v1.21.2/go.mod h1:Gv336j2QCqGlUABUA6gBcrqWNjLpzv9NOmAJAHC3xg0= +github.com/cloudquery/plugin-pb-go v1.21.3 h1:IlhLN6LbZeAzOjIm0VMELmj7PpFkDroJ41QCrAEcWwg= +github.com/cloudquery/plugin-pb-go v1.21.3/go.mod h1:Gv336j2QCqGlUABUA6gBcrqWNjLpzv9NOmAJAHC3xg0= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= diff --git a/go.mod b/go.mod index c301a7e7fe..57b7644dc9 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/apache/arrow/go/v17 v17.0.0 github.com/bradleyjkemp/cupaloy/v2 v2.8.0 github.com/cloudquery/cloudquery-api-go v1.12.4 - github.com/cloudquery/plugin-pb-go v1.21.2 + github.com/cloudquery/plugin-pb-go v1.21.3 github.com/cloudquery/plugin-sdk/v2 v2.7.0 github.com/goccy/go-json v0.10.3 github.com/google/go-cmp v0.6.0 diff --git a/go.sum b/go.sum index 659bbe80af..69aa0ca352 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/cloudquery/plugin-pb-go v1.21.1 h1:Fm4FyGrgLUTcllaEI6jdk1Q1NQ6VEbaSl8 github.com/cloudquery/plugin-pb-go v1.21.1/go.mod h1:Gv336j2QCqGlUABUA6gBcrqWNjLpzv9NOmAJAHC3xg0= github.com/cloudquery/plugin-pb-go v1.21.2 h1:WfuiZbunZBhcVBNW/FwJyv7RrSen9XNYH4ZbVl+4vzk= github.com/cloudquery/plugin-pb-go v1.21.2/go.mod h1:Gv336j2QCqGlUABUA6gBcrqWNjLpzv9NOmAJAHC3xg0= +github.com/cloudquery/plugin-pb-go v1.21.3 h1:IlhLN6LbZeAzOjIm0VMELmj7PpFkDroJ41QCrAEcWwg= +github.com/cloudquery/plugin-pb-go v1.21.3/go.mod h1:Gv336j2QCqGlUABUA6gBcrqWNjLpzv9NOmAJAHC3xg0= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= diff --git a/internal/reversertransformer/reversertransformer_test.go b/internal/reversertransformer/reversertransformer_test.go index 1f7c19225a..9750ed729c 100644 --- a/internal/reversertransformer/reversertransformer_test.go +++ b/internal/reversertransformer/reversertransformer_test.go @@ -16,8 +16,6 @@ import ( "google.golang.org/grpc/metadata" ) -var mem = memory.NewGoAllocator() - func TestReverserTransformer(t *testing.T) { p := plugin.NewPlugin("test", "development", GetNewClient()) s := internalPlugin.Server{ @@ -58,7 +56,7 @@ func makeRequestFromString(s string) *pb.Transform_Request { } func makeRecordFromString(s string) arrow.Record { - str := array.NewStringBuilder(mem) + str := array.NewStringBuilder(memory.DefaultAllocator) str.AppendString(s) arr := str.NewStringArray() schema := arrow.NewSchema([]arrow.Field{{Name: "col1", Type: arrow.BinaryTypes.String}}, nil) diff --git a/internal/servers/plugin/v3/plugin.go b/internal/servers/plugin/v3/plugin.go index 699fde1477..917fe3c58b 100644 --- a/internal/servers/plugin/v3/plugin.go +++ b/internal/servers/plugin/v3/plugin.go @@ -475,6 +475,22 @@ func (s *Server) Transform(stream pb.Plugin_TransformServer) error { return eg.Wait() } +func (s *Server) TransformSchema(ctx context.Context, req *pb.TransformSchema_Request) (*pb.TransformSchema_Response, error) { + sc, err := pb.NewSchemaFromBytes(req.Schema) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "failed to create schema from bytes: %v", err) + } + newSchema, err := s.Plugin.TransformSchema(ctx, sc) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to transform schema: %v", err) + } + encoded, err := pb.SchemaToBytes(newSchema) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to encode schema: %v", err) + } + return &pb.TransformSchema_Response{Schema: encoded}, nil +} + func (s *Server) Close(ctx context.Context, _ *pb.Close_Request) (*pb.Close_Response, error) { return &pb.Close_Response{}, s.Plugin.Close(ctx) } diff --git a/internal/servers/plugin/v3/plugin_test.go b/internal/servers/plugin/v3/plugin_test.go index dc5610994c..ac32af2091 100644 --- a/internal/servers/plugin/v3/plugin_test.go +++ b/internal/servers/plugin/v3/plugin_test.go @@ -12,6 +12,8 @@ import ( "github.com/cloudquery/plugin-sdk/v4/internal/memdb" "github.com/cloudquery/plugin-sdk/v4/plugin" "github.com/cloudquery/plugin-sdk/v4/schema" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -183,3 +185,66 @@ func TestPluginSync(t *testing.T) { t.Fatal(err) } } + +func TestTransformSchema(t *testing.T) { + ctx := context.Background() + s := Server{ + Plugin: plugin.NewPlugin("test", "development", getColumnAdderPlugin()), + } + + _, err := s.Init(ctx, &pb.Init_Request{}) + if err != nil { + t.Fatal(err) + } + + table := &schema.Table{ + Name: "test", + Columns: []schema.Column{ + { + Name: "test", + Type: arrow.BinaryTypes.String, + }, + }, + } + sc := table.ToArrowSchema() + + schemaBytes, err := pb.SchemaToBytes(sc) + require.NoError(t, err) + + resp, err := s.TransformSchema(ctx, &pb.TransformSchema_Request{Schema: schemaBytes}) + if err != nil { + t.Fatal(err) + } + + newSchema, err := pb.NewSchemaFromBytes(resp.Schema) + require.NoError(t, err) + + require.Len(t, newSchema.Fields(), 2) + require.Equal(t, "test", newSchema.Fields()[0].Name) + require.Equal(t, "source", newSchema.Fields()[1].Name) + require.Equal(t, "utf8", newSchema.Fields()[1].Type.(*arrow.StringType).Name()) + + if _, err := s.Close(ctx, &pb.Close_Request{}); err != nil { + t.Fatal(err) + } +} + +type mockSourceColumnAdderPluginClient struct { + plugin.UnimplementedDestination + plugin.UnimplementedSource +} + +func getColumnAdderPlugin(...plugin.Option) plugin.NewClientFunc { + c := &mockSourceColumnAdderPluginClient{} + return func(context.Context, zerolog.Logger, []byte, plugin.NewClientOptions) (plugin.Client, error) { + return c, nil + } +} + +func (*mockSourceColumnAdderPluginClient) Transform(context.Context, <-chan arrow.Record, chan<- arrow.Record) error { + return nil +} +func (*mockSourceColumnAdderPluginClient) TransformSchema(_ context.Context, old *arrow.Schema) (*arrow.Schema, error) { + return old.AddField(1, arrow.Field{Name: "source", Type: arrow.BinaryTypes.String}) +} +func (*mockSourceColumnAdderPluginClient) Close(context.Context) error { return nil } diff --git a/plugin/plugin_test.go b/plugin/plugin_test.go index 634adafed6..22d20d5a25 100644 --- a/plugin/plugin_test.go +++ b/plugin/plugin_test.go @@ -59,7 +59,7 @@ func (*testPluginClient) Close(context.Context) error { func (*testPluginClient) Transform(context.Context, <-chan arrow.Record, chan<- arrow.Record) error { return nil } -func (*testPluginClient) TransformSchema(ctx context.Context, old *arrow.Schema) (*arrow.Schema, error) { +func (*testPluginClient) TransformSchema(context.Context, *arrow.Schema) (*arrow.Schema, error) { return nil, nil }