From c21b91bb8ddd194a1a4a7a378d8f97ad10b2e0ef Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Sat, 21 Jan 2023 19:18:04 +0200 Subject: [PATCH 1/4] feat: Add version discovery service --- Makefile | 1 + clients/discovery/v0/constants.go | 5 + clients/discovery/v0/destination_terminate.go | 30 ++ .../v0/destination_terminate_windows.go | 20 ++ clients/discovery/v0/discovery.go | 203 ++++++++++++++ clients/source/v1/source.go | 4 +- cloudquery/discovery/v0/discovery.proto | 16 ++ internal/pb/discovery/v0/discovery.pb.go | 261 ++++++++++++++++++ internal/pb/discovery/v0/discovery_grpc.pb.go | 107 +++++++ .../servers/destination/v0/destinations.go | 2 +- internal/servers/discovery/v0/discovery.go | 16 ++ serve/destination.go | 5 + serve/source.go | 7 + 13 files changed, 674 insertions(+), 3 deletions(-) create mode 100644 clients/discovery/v0/constants.go create mode 100644 clients/discovery/v0/destination_terminate.go create mode 100644 clients/discovery/v0/destination_terminate_windows.go create mode 100644 clients/discovery/v0/discovery.go create mode 100644 cloudquery/discovery/v0/discovery.proto create mode 100644 internal/pb/discovery/v0/discovery.pb.go create mode 100644 internal/pb/discovery/v0/discovery_grpc.pb.go create mode 100644 internal/servers/discovery/v0/discovery.go diff --git a/Makefile b/Makefile index 5605f02a40..425f19118b 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,7 @@ lint: gen-proto: protoc --proto_path=. --go_out . --go_opt=module="github.com/cloudquery/plugin-sdk" --go-grpc_out=. --go-grpc_opt=module="github.com/cloudquery/plugin-sdk" cloudquery/base/v0/base.proto cloudquery/destination/v0/destination.proto cloudquery/source/v0/source.proto protoc --proto_path=. --go_out . --go_opt=module="github.com/cloudquery/plugin-sdk" --go-grpc_out=. --go-grpc_opt=module="github.com/cloudquery/plugin-sdk" cloudquery/source/v1/source.proto + protoc --proto_path=. --go_out . --go_opt=module="github.com/cloudquery/plugin-sdk" --go-grpc_out=. --go-grpc_opt=module="github.com/cloudquery/plugin-sdk" cloudquery/discovery/v0/discovery.proto .PHONY: benchmark benchmark: diff --git a/clients/discovery/v0/constants.go b/clients/discovery/v0/constants.go new file mode 100644 index 0000000000..9651915154 --- /dev/null +++ b/clients/discovery/v0/constants.go @@ -0,0 +1,5 @@ +package discovery + +const ( + maxMsgSize = 100 * 1024 * 1024 // 100 MiB +) diff --git a/clients/discovery/v0/destination_terminate.go b/clients/discovery/v0/destination_terminate.go new file mode 100644 index 0000000000..8f17c3c3ea --- /dev/null +++ b/clients/discovery/v0/destination_terminate.go @@ -0,0 +1,30 @@ +//go:build !windows + +package discovery + +import ( + "fmt" + "os" + "time" +) + +func (c *Client) terminateProcess() error { + if err := c.cmd.Process.Signal(os.Interrupt); err != nil { + c.logger.Error().Err(err).Msg("failed to send interrupt signal to destination plugin") + } + timer := time.AfterFunc(5*time.Second, func() { + if err := c.cmd.Process.Kill(); err != nil { + c.logger.Error().Err(err).Msg("failed to kill destination plugin") + } + }) + st, err := c.cmd.Process.Wait() + timer.Stop() + if err != nil { + return err + } + if !st.Success() { + return fmt.Errorf("destination plugin process exited with status %s", st.String()) + } + + return nil +} diff --git a/clients/discovery/v0/destination_terminate_windows.go b/clients/discovery/v0/destination_terminate_windows.go new file mode 100644 index 0000000000..ff5ffed3f5 --- /dev/null +++ b/clients/discovery/v0/destination_terminate_windows.go @@ -0,0 +1,20 @@ +//go:build windows + +package discovery + +func (c *Client) terminateProcess() error { + if err := c.cmd.Process.Kill(); err != nil { + c.logger.Error().Err(err).Msg("failed to kill destination plugin") + } + st, err := c.cmd.Process.Wait() + if err != nil { + return err + } + if !st.Success() { + // on windows there is no way to shutdown gracefully via signal. Maybe we can do it via grpc api? + // though it is a bit strange to expose api to shutdown a server :thinking?: + c.logger.Info().Msgf("destination plugin process exited with %s", st.String()) + } + + return nil +} diff --git a/clients/discovery/v0/discovery.go b/clients/discovery/v0/discovery.go new file mode 100644 index 0000000000..f7727edf06 --- /dev/null +++ b/clients/discovery/v0/discovery.go @@ -0,0 +1,203 @@ +package discovery + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + + "github.com/cloudquery/plugin-sdk/internal/logging" + pb "github.com/cloudquery/plugin-sdk/internal/pb/discovery/v0" + "github.com/cloudquery/plugin-sdk/internal/random" + "github.com/cloudquery/plugin-sdk/registry" + "github.com/cloudquery/plugin-sdk/specs" + "github.com/rs/zerolog" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type Client struct { + pbClient pb.DiscoveryClient + directory string + cmd *exec.Cmd + logger zerolog.Logger + userConn *grpc.ClientConn + conn *grpc.ClientConn + grpcSocketName string + noSentry bool + wg *sync.WaitGroup +} + +type ClientOption func(*Client) + +func WithDirectory(directory string) func(*Client) { + return func(c *Client) { + c.directory = directory + } +} + +func WithGrpcConn(userConn *grpc.ClientConn) func(*Client) { + return func(c *Client) { + // we use a different variable here because we don't want to close a connection that wasn't created by us. + c.userConn = userConn + } +} + +func NewClient(ctx context.Context, registrySpec specs.Registry, pluginType registry.PluginType, path string, version string, opts ...ClientOption) (*Client, error) { + var err error + c := &Client{ + directory: registry.DefaultDownloadDir, + wg: &sync.WaitGroup{}, + } + for _, opt := range opts { + opt(c) + } + switch registrySpec { + case specs.RegistryGrpc: + if c.userConn == nil { + c.conn, err = grpc.DialContext(ctx, path, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(maxMsgSize), + grpc.MaxCallSendMsgSize(maxMsgSize), + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to dial grpc source plugin at %s: %w", path, err) + } + c.pbClient = pb.NewDiscoveryClient(c.conn) + } else { + c.pbClient = pb.NewDiscoveryClient(c.userConn) + } + return c, nil + case specs.RegistryLocal: + if err := c.newManagedClient(ctx, path); err != nil { + return nil, err + } + case specs.RegistryGithub: + pathSplit := strings.Split(path, "/") + if len(pathSplit) != 2 { + return nil, fmt.Errorf("invalid github plugin path: %s. format should be owner/repo", path) + } + org, name := pathSplit[0], pathSplit[1] + localPath := filepath.Join(c.directory, "plugins", string(pluginType), org, name, version, "plugin") + localPath = registry.WithBinarySuffix(localPath) + if err := registry.DownloadPluginFromGithub(ctx, localPath, org, name, version, pluginType); err != nil { + return nil, err + } + if err := c.newManagedClient(ctx, localPath); err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("unsupported registry %s", registrySpec) + } + + return c, nil +} + +// newManagedClient starts a new destination plugin process from local file, connects to it via gRPC server +// and returns a new Client +func (c *Client) newManagedClient(ctx context.Context, path string) error { + c.grpcSocketName = random.GenerateRandomUnixSocketName() + // spawn the plugin first and then connect + args := []string{"serve", "--network", "unix", "--address", c.grpcSocketName, + "--log-level", c.logger.GetLevel().String(), "--log-format", "json"} + if c.noSentry { + args = append(args, "--no-sentry") + } + cmd := exec.CommandContext(ctx, path, args...) + reader, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("failed to get stdout pipe: %w", err) + } + cmd.Stderr = os.Stderr + if err := cmd.Start(); err != nil { + return fmt.Errorf("failed to start destination plugin %s: %w", path, err) + } + + c.cmd = cmd + + c.wg.Add(1) + go func() { + defer c.wg.Done() + lr := logging.NewLogReader(reader) + for { + line, err := lr.NextLine() + if errors.Is(err, io.EOF) { + break + } + if errors.Is(err, logging.ErrLogLineToLong) { + c.logger.Info().Str("line", string(line)).Msg("truncated destination plugin log line") + continue + } + if err != nil { + c.logger.Err(err).Msg("failed to read log line from destination plugin") + break + } + var structuredLogLine map[string]any + if err := json.Unmarshal(line, &structuredLogLine); err != nil { + c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from destination plugin") + } else { + logging.JSONToLog(c.logger, structuredLogLine) + } + } + }() + + dialer := func(ctx context.Context, addr string) (net.Conn, error) { + d := &net.Dialer{} + return d.DialContext(ctx, "unix", addr) + } + c.conn, err = grpc.DialContext(ctx, c.grpcSocketName, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), grpc.WithContextDialer(dialer)) + if err != nil { + if err := cmd.Process.Kill(); err != nil { + c.logger.Error().Err(err).Msg("failed to kill plugin process") + } + return err + } + c.pbClient = pb.NewDiscoveryClient(c.conn) + return nil +} + +func (c *Client) GetVersions(ctx context.Context) ([]string, error){ + res, err := c.pbClient.GetVersions(ctx, &pb.GetVersions_Request{}) + if err != nil { + return nil, fmt.Errorf("failed to call GetVersions: %w", err) + } + return res.Versions, nil +} + +// Terminate is used only in conjunction with NewManagedClient. +// It closes the connection it created, kills the spawned process and removes the socket file. +func (c *Client) Terminate() error { + // wait for log streaming to complete before returning from this function + defer c.wg.Wait() + + if c.grpcSocketName != "" { + defer func() { + if err := os.RemoveAll(c.grpcSocketName); err != nil { + c.logger.Error().Err(err).Msg("failed to remove destination socket file") + } + }() + } + + if c.conn != nil { + if err := c.conn.Close(); err != nil { + c.logger.Error().Err(err).Msg("failed to close gRPC connection to destination plugin") + } + c.conn = nil + } + if c.cmd != nil && c.cmd.Process != nil { + if err := c.terminateProcess(); err != nil { + return err + } + } + + return nil +} \ No newline at end of file diff --git a/clients/source/v1/source.go b/clients/source/v1/source.go index 0ab4aa71cb..a21823e37c 100644 --- a/clients/source/v1/source.go +++ b/clients/source/v1/source.go @@ -226,12 +226,12 @@ func (c *Client) GetTables(ctx context.Context) ([]*schema.Table, error) { return tables, nil } -func (c *Client) GetDynamicTables(ctx context.Context) ([]*schema.Table, error) { +func (c *Client) GetDynamicTables(ctx context.Context) (schema.Tables, error) { res, err := c.pbClient.GetDynamicTables(ctx, &pb.GetDynamicTables_Request{}) if err != nil { return nil, fmt.Errorf("failed to call GetDynamicTables: %w", err) } - var tables []*schema.Table + var tables schema.Tables if err := json.Unmarshal(res.Tables, &tables); err != nil { return nil, fmt.Errorf("failed to unmarshal tables: %w", err) } diff --git a/cloudquery/discovery/v0/discovery.proto b/cloudquery/discovery/v0/discovery.proto new file mode 100644 index 0000000000..8894df6c2a --- /dev/null +++ b/cloudquery/discovery/v0/discovery.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; +package cloudquery.discovery.v0; + +option go_package = "github.com/cloudquery/plugin-sdk/internal/pb/discovery/v0;discovery"; + +service Discovery { + // Get the name of the plugin + rpc GetVersions(GetVersions.Request) returns (GetVersions.Response); +} + +message GetVersions { + message Request {} + message Response { + repeated string versions = 1; + } +} diff --git a/internal/pb/discovery/v0/discovery.pb.go b/internal/pb/discovery/v0/discovery.pb.go new file mode 100644 index 0000000000..16ab84a0c0 --- /dev/null +++ b/internal/pb/discovery/v0/discovery.pb.go @@ -0,0 +1,261 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: cloudquery/discovery/v0/discovery.proto + +package discovery + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type GetVersions struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetVersions) Reset() { + *x = GetVersions{} + if protoimpl.UnsafeEnabled { + mi := &file_cloudquery_discovery_v0_discovery_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetVersions) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetVersions) ProtoMessage() {} + +func (x *GetVersions) ProtoReflect() protoreflect.Message { + mi := &file_cloudquery_discovery_v0_discovery_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetVersions.ProtoReflect.Descriptor instead. +func (*GetVersions) Descriptor() ([]byte, []int) { + return file_cloudquery_discovery_v0_discovery_proto_rawDescGZIP(), []int{0} +} + +type GetVersions_Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetVersions_Request) Reset() { + *x = GetVersions_Request{} + if protoimpl.UnsafeEnabled { + mi := &file_cloudquery_discovery_v0_discovery_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetVersions_Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetVersions_Request) ProtoMessage() {} + +func (x *GetVersions_Request) ProtoReflect() protoreflect.Message { + mi := &file_cloudquery_discovery_v0_discovery_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetVersions_Request.ProtoReflect.Descriptor instead. +func (*GetVersions_Request) Descriptor() ([]byte, []int) { + return file_cloudquery_discovery_v0_discovery_proto_rawDescGZIP(), []int{0, 0} +} + +type GetVersions_Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Versions []string `protobuf:"bytes,1,rep,name=versions,proto3" json:"versions,omitempty"` +} + +func (x *GetVersions_Response) Reset() { + *x = GetVersions_Response{} + if protoimpl.UnsafeEnabled { + mi := &file_cloudquery_discovery_v0_discovery_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetVersions_Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetVersions_Response) ProtoMessage() {} + +func (x *GetVersions_Response) ProtoReflect() protoreflect.Message { + mi := &file_cloudquery_discovery_v0_discovery_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetVersions_Response.ProtoReflect.Descriptor instead. +func (*GetVersions_Response) Descriptor() ([]byte, []int) { + return file_cloudquery_discovery_v0_discovery_proto_rawDescGZIP(), []int{0, 1} +} + +func (x *GetVersions_Response) GetVersions() []string { + if x != nil { + return x.Versions + } + return nil +} + +var File_cloudquery_discovery_v0_discovery_proto protoreflect.FileDescriptor + +var file_cloudquery_discovery_v0_discovery_proto_rawDesc = []byte{ + 0x0a, 0x27, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x64, 0x69, 0x73, + 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2f, 0x76, 0x30, 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, + 0x65, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x17, 0x63, 0x6c, 0x6f, 0x75, 0x64, + 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, + 0x76, 0x30, 0x22, 0x40, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, + 0x73, 0x1a, 0x09, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x0a, 0x08, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x76, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x76, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x73, 0x32, 0x77, 0x0a, 0x09, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, + 0x79, 0x12, 0x6a, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, + 0x12, 0x2c, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x64, 0x69, + 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x30, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, + 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x64, 0x69, 0x73, 0x63, + 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x30, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x45, 0x5a, + 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6c, 0x6f, 0x75, + 0x64, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2d, 0x73, 0x64, + 0x6b, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x62, 0x2f, 0x64, 0x69, + 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2f, 0x76, 0x30, 0x3b, 0x64, 0x69, 0x73, 0x63, 0x6f, + 0x76, 0x65, 0x72, 0x79, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_cloudquery_discovery_v0_discovery_proto_rawDescOnce sync.Once + file_cloudquery_discovery_v0_discovery_proto_rawDescData = file_cloudquery_discovery_v0_discovery_proto_rawDesc +) + +func file_cloudquery_discovery_v0_discovery_proto_rawDescGZIP() []byte { + file_cloudquery_discovery_v0_discovery_proto_rawDescOnce.Do(func() { + file_cloudquery_discovery_v0_discovery_proto_rawDescData = protoimpl.X.CompressGZIP(file_cloudquery_discovery_v0_discovery_proto_rawDescData) + }) + return file_cloudquery_discovery_v0_discovery_proto_rawDescData +} + +var file_cloudquery_discovery_v0_discovery_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_cloudquery_discovery_v0_discovery_proto_goTypes = []interface{}{ + (*GetVersions)(nil), // 0: cloudquery.discovery.v0.GetVersions + (*GetVersions_Request)(nil), // 1: cloudquery.discovery.v0.GetVersions.Request + (*GetVersions_Response)(nil), // 2: cloudquery.discovery.v0.GetVersions.Response +} +var file_cloudquery_discovery_v0_discovery_proto_depIdxs = []int32{ + 1, // 0: cloudquery.discovery.v0.Discovery.GetVersions:input_type -> cloudquery.discovery.v0.GetVersions.Request + 2, // 1: cloudquery.discovery.v0.Discovery.GetVersions:output_type -> cloudquery.discovery.v0.GetVersions.Response + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_cloudquery_discovery_v0_discovery_proto_init() } +func file_cloudquery_discovery_v0_discovery_proto_init() { + if File_cloudquery_discovery_v0_discovery_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_cloudquery_discovery_v0_discovery_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetVersions); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cloudquery_discovery_v0_discovery_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetVersions_Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cloudquery_discovery_v0_discovery_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetVersions_Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_cloudquery_discovery_v0_discovery_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_cloudquery_discovery_v0_discovery_proto_goTypes, + DependencyIndexes: file_cloudquery_discovery_v0_discovery_proto_depIdxs, + MessageInfos: file_cloudquery_discovery_v0_discovery_proto_msgTypes, + }.Build() + File_cloudquery_discovery_v0_discovery_proto = out.File + file_cloudquery_discovery_v0_discovery_proto_rawDesc = nil + file_cloudquery_discovery_v0_discovery_proto_goTypes = nil + file_cloudquery_discovery_v0_discovery_proto_depIdxs = nil +} diff --git a/internal/pb/discovery/v0/discovery_grpc.pb.go b/internal/pb/discovery/v0/discovery_grpc.pb.go new file mode 100644 index 0000000000..bcdab222be --- /dev/null +++ b/internal/pb/discovery/v0/discovery_grpc.pb.go @@ -0,0 +1,107 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.12 +// source: cloudquery/discovery/v0/discovery.proto + +package discovery + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// DiscoveryClient is the client API for Discovery service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type DiscoveryClient interface { + // Get the name of the plugin + GetVersions(ctx context.Context, in *GetVersions_Request, opts ...grpc.CallOption) (*GetVersions_Response, error) +} + +type discoveryClient struct { + cc grpc.ClientConnInterface +} + +func NewDiscoveryClient(cc grpc.ClientConnInterface) DiscoveryClient { + return &discoveryClient{cc} +} + +func (c *discoveryClient) GetVersions(ctx context.Context, in *GetVersions_Request, opts ...grpc.CallOption) (*GetVersions_Response, error) { + out := new(GetVersions_Response) + err := c.cc.Invoke(ctx, "/cloudquery.discovery.v0.Discovery/GetVersions", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// DiscoveryServer is the server API for Discovery service. +// All implementations must embed UnimplementedDiscoveryServer +// for forward compatibility +type DiscoveryServer interface { + // Get the name of the plugin + GetVersions(context.Context, *GetVersions_Request) (*GetVersions_Response, error) + mustEmbedUnimplementedDiscoveryServer() +} + +// UnimplementedDiscoveryServer must be embedded to have forward compatible implementations. +type UnimplementedDiscoveryServer struct { +} + +func (UnimplementedDiscoveryServer) GetVersions(context.Context, *GetVersions_Request) (*GetVersions_Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetVersions not implemented") +} +func (UnimplementedDiscoveryServer) mustEmbedUnimplementedDiscoveryServer() {} + +// UnsafeDiscoveryServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to DiscoveryServer will +// result in compilation errors. +type UnsafeDiscoveryServer interface { + mustEmbedUnimplementedDiscoveryServer() +} + +func RegisterDiscoveryServer(s grpc.ServiceRegistrar, srv DiscoveryServer) { + s.RegisterService(&Discovery_ServiceDesc, srv) +} + +func _Discovery_GetVersions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetVersions_Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DiscoveryServer).GetVersions(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cloudquery.discovery.v0.Discovery/GetVersions", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DiscoveryServer).GetVersions(ctx, req.(*GetVersions_Request)) + } + return interceptor(ctx, in, info, handler) +} + +// Discovery_ServiceDesc is the grpc.ServiceDesc for Discovery service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Discovery_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "cloudquery.discovery.v0.Discovery", + HandlerType: (*DiscoveryServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetVersions", + Handler: _Discovery_GetVersions_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "cloudquery/discovery/v0/discovery.proto", +} diff --git a/internal/servers/destination/v0/destinations.go b/internal/servers/destination/v0/destinations.go index fa78a91dfb..411659c54a 100644 --- a/internal/servers/destination/v0/destinations.go +++ b/internal/servers/destination/v0/destinations.go @@ -1,4 +1,4 @@ -package servers +package destination import ( "context" diff --git a/internal/servers/discovery/v0/discovery.go b/internal/servers/discovery/v0/discovery.go new file mode 100644 index 0000000000..56b8f50c67 --- /dev/null +++ b/internal/servers/discovery/v0/discovery.go @@ -0,0 +1,16 @@ +package discovery + +import ( + "context" + + pb "github.com/cloudquery/plugin-sdk/internal/pb/discovery/v0" +) + +type DiscoveryServer struct { + pb.UnimplementedDiscoveryServer + Versions []string +} + +func (s *DiscoveryServer) GetVersions(context.Context, *pb.GetVersions_Request) (*pb.GetVersions_Response, error) { + return &pb.GetVersions_Response{Versions: s.Versions}, nil +} \ No newline at end of file diff --git a/serve/destination.go b/serve/destination.go index 2489ca457d..17b86d5894 100644 --- a/serve/destination.go +++ b/serve/destination.go @@ -9,7 +9,9 @@ import ( "sync" pbv0 "github.com/cloudquery/plugin-sdk/internal/pb/destination/v0" + pbdiscoveryv0 "github.com/cloudquery/plugin-sdk/internal/pb/discovery/v0" servers "github.com/cloudquery/plugin-sdk/internal/servers/destination/v0" + discoveryServerV0 "github.com/cloudquery/plugin-sdk/internal/servers/discovery/v0" "github.com/cloudquery/plugin-sdk/plugins/destination" "github.com/getsentry/sentry-go" grpczerolog "github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2" @@ -112,6 +114,9 @@ func newCmdDestinationServe(serve *destinationServe) *cobra.Command { Plugin: serve.plugin, Logger: logger, }) + pbdiscoveryv0.RegisterDiscoveryServer(s, &discoveryServerV0.DiscoveryServer{ + Versions: []string{"v0"}, + }) version := serve.plugin.Version() if serve.sentryDSN != "" && !strings.EqualFold(version, "development") && !noSentry { diff --git a/serve/source.go b/serve/source.go index 13cfca9aa4..20562ffe91 100644 --- a/serve/source.go +++ b/serve/source.go @@ -8,8 +8,10 @@ import ( "strings" "sync" + pbdiscoveryv0 "github.com/cloudquery/plugin-sdk/internal/pb/discovery/v0" pbv0 "github.com/cloudquery/plugin-sdk/internal/pb/source/v0" pbv1 "github.com/cloudquery/plugin-sdk/internal/pb/source/v1" + discoveryServerV0 "github.com/cloudquery/plugin-sdk/internal/servers/discovery/v0" serversv0 "github.com/cloudquery/plugin-sdk/internal/servers/source/v0" serversv1 "github.com/cloudquery/plugin-sdk/internal/servers/source/v1" "github.com/cloudquery/plugin-sdk/plugins/source" @@ -125,6 +127,11 @@ func newCmdSourceServe(serve *sourceServe) *cobra.Command { Plugin: serve.plugin, Logger: logger, }) + pbdiscoveryv0.RegisterDiscoveryServer(s, &discoveryServerV0.DiscoveryServer{ + Versions: []string{"v0", "v1"}, + }) + + version := serve.plugin.Version() if serve.sentryDSN != "" && !strings.EqualFold(version, "development") && !noSentry { From c1ee05a481f5a0180658f608fa44eb3076eee66c Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Sat, 21 Jan 2023 19:24:16 +0200 Subject: [PATCH 2/4] fix lint --- clients/discovery/v0/discovery.go | 4 ++-- .../servers/destination/v0/destinations.go | 22 +++++++++---------- internal/servers/discovery/v0/discovery.go | 6 ++--- serve/destination.go | 4 ++-- serve/source.go | 5 ++--- 5 files changed, 20 insertions(+), 21 deletions(-) diff --git a/clients/discovery/v0/discovery.go b/clients/discovery/v0/discovery.go index f7727edf06..4b843419af 100644 --- a/clients/discovery/v0/discovery.go +++ b/clients/discovery/v0/discovery.go @@ -165,7 +165,7 @@ func (c *Client) newManagedClient(ctx context.Context, path string) error { return nil } -func (c *Client) GetVersions(ctx context.Context) ([]string, error){ +func (c *Client) GetVersions(ctx context.Context) ([]string, error) { res, err := c.pbClient.GetVersions(ctx, &pb.GetVersions_Request{}) if err != nil { return nil, fmt.Errorf("failed to call GetVersions: %w", err) @@ -200,4 +200,4 @@ func (c *Client) Terminate() error { } return nil -} \ No newline at end of file +} diff --git a/internal/servers/destination/v0/destinations.go b/internal/servers/destination/v0/destinations.go index 411659c54a..c6a2c0ff70 100644 --- a/internal/servers/destination/v0/destinations.go +++ b/internal/servers/destination/v0/destinations.go @@ -17,19 +17,19 @@ import ( "google.golang.org/grpc/status" ) -type DestinationServer struct { +type Server struct { pb.UnimplementedDestinationServer Plugin *destination.Plugin Logger zerolog.Logger } -func (*DestinationServer) GetProtocolVersion(context.Context, *pbBase.GetProtocolVersion_Request) (*pbBase.GetProtocolVersion_Response, error) { +func (*Server) GetProtocolVersion(context.Context, *pbBase.GetProtocolVersion_Request) (*pbBase.GetProtocolVersion_Response, error) { return &pbBase.GetProtocolVersion_Response{ Version: 2, }, nil } -func (s *DestinationServer) Configure(ctx context.Context, req *pbBase.Configure_Request) (*pbBase.Configure_Response, error) { +func (s *Server) Configure(ctx context.Context, req *pbBase.Configure_Request) (*pbBase.Configure_Response, error) { var spec specs.Destination if err := json.Unmarshal(req.Config, &spec); err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal spec: %v", err) @@ -37,19 +37,19 @@ func (s *DestinationServer) Configure(ctx context.Context, req *pbBase.Configure return &pbBase.Configure_Response{}, s.Plugin.Init(ctx, s.Logger, spec) } -func (s *DestinationServer) GetName(context.Context, *pbBase.GetName_Request) (*pbBase.GetName_Response, error) { +func (s *Server) GetName(context.Context, *pbBase.GetName_Request) (*pbBase.GetName_Response, error) { return &pbBase.GetName_Response{ Name: s.Plugin.Name(), }, nil } -func (s *DestinationServer) GetVersion(context.Context, *pbBase.GetVersion_Request) (*pbBase.GetVersion_Response, error) { +func (s *Server) GetVersion(context.Context, *pbBase.GetVersion_Request) (*pbBase.GetVersion_Response, error) { return &pbBase.GetVersion_Response{ Version: s.Plugin.Version(), }, nil } -func (s *DestinationServer) Migrate(ctx context.Context, req *pb.Migrate_Request) (*pb.Migrate_Response, error) { +func (s *Server) Migrate(ctx context.Context, req *pb.Migrate_Request) (*pb.Migrate_Response, error) { var tables []*schema.Table if err := json.Unmarshal(req.Tables, &tables); err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal tables: %v", err) @@ -57,13 +57,13 @@ func (s *DestinationServer) Migrate(ctx context.Context, req *pb.Migrate_Request return &pb.Migrate_Response{}, s.Plugin.Migrate(ctx, tables) } -func (*DestinationServer) Write(pb.Destination_WriteServer) error { +func (*Server) Write(pb.Destination_WriteServer) error { return status.Errorf(codes.Unimplemented, "method Write is deprecated please upgrade client") } // Note the order of operations in this method is important! // Trying to insert into the `resources` channel before starting the reader goroutine will cause a deadlock. -func (s *DestinationServer) Write2(msg pb.Destination_Write2Server) error { +func (s *Server) Write2(msg pb.Destination_Write2Server) error { resources := make(chan schema.DestinationResource) r, err := msg.Recv() @@ -131,7 +131,7 @@ func (s *DestinationServer) Write2(msg pb.Destination_Write2Server) error { } } -func (s *DestinationServer) GetMetrics(context.Context, *pb.GetDestinationMetrics_Request) (*pb.GetDestinationMetrics_Response, error) { +func (s *Server) GetMetrics(context.Context, *pb.GetDestinationMetrics_Request) (*pb.GetDestinationMetrics_Response, error) { stats := s.Plugin.Metrics() b, err := json.Marshal(stats) if err != nil { @@ -142,7 +142,7 @@ func (s *DestinationServer) GetMetrics(context.Context, *pb.GetDestinationMetric }, nil } -func (s *DestinationServer) DeleteStale(ctx context.Context, req *pb.DeleteStale_Request) (*pb.DeleteStale_Response, error) { +func (s *Server) DeleteStale(ctx context.Context, req *pb.DeleteStale_Request) (*pb.DeleteStale_Response, error) { var tables schema.Tables if err := json.Unmarshal(req.Tables, &tables); err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal tables: %v", err) @@ -154,6 +154,6 @@ func (s *DestinationServer) DeleteStale(ctx context.Context, req *pb.DeleteStale return &pb.DeleteStale_Response{}, nil } -func (s *DestinationServer) Close(ctx context.Context, _ *pb.Close_Request) (*pb.Close_Response, error) { +func (s *Server) Close(ctx context.Context, _ *pb.Close_Request) (*pb.Close_Response, error) { return &pb.Close_Response{}, s.Plugin.Close(ctx) } diff --git a/internal/servers/discovery/v0/discovery.go b/internal/servers/discovery/v0/discovery.go index 56b8f50c67..d02a563135 100644 --- a/internal/servers/discovery/v0/discovery.go +++ b/internal/servers/discovery/v0/discovery.go @@ -6,11 +6,11 @@ import ( pb "github.com/cloudquery/plugin-sdk/internal/pb/discovery/v0" ) -type DiscoveryServer struct { +type Server struct { pb.UnimplementedDiscoveryServer Versions []string } -func (s *DiscoveryServer) GetVersions(context.Context, *pb.GetVersions_Request) (*pb.GetVersions_Response, error) { +func (s *Server) GetVersions(context.Context, *pb.GetVersions_Request) (*pb.GetVersions_Response, error) { return &pb.GetVersions_Response{Versions: s.Versions}, nil -} \ No newline at end of file +} diff --git a/serve/destination.go b/serve/destination.go index 17b86d5894..401c0c574f 100644 --- a/serve/destination.go +++ b/serve/destination.go @@ -110,11 +110,11 @@ func newCmdDestinationServe(serve *destinationServe) *cobra.Command { grpc.MaxRecvMsgSize(pbv0.MaxMsgSize), grpc.MaxSendMsgSize(pbv0.MaxMsgSize), ) - pbv0.RegisterDestinationServer(s, &servers.DestinationServer{ + pbv0.RegisterDestinationServer(s, &servers.Server{ Plugin: serve.plugin, Logger: logger, }) - pbdiscoveryv0.RegisterDiscoveryServer(s, &discoveryServerV0.DiscoveryServer{ + pbdiscoveryv0.RegisterDiscoveryServer(s, &discoveryServerV0.Server{ Versions: []string{"v0"}, }) version := serve.plugin.Version() diff --git a/serve/source.go b/serve/source.go index 20562ffe91..6c01f776a0 100644 --- a/serve/source.go +++ b/serve/source.go @@ -127,11 +127,10 @@ func newCmdSourceServe(serve *sourceServe) *cobra.Command { Plugin: serve.plugin, Logger: logger, }) - pbdiscoveryv0.RegisterDiscoveryServer(s, &discoveryServerV0.DiscoveryServer{ + pbdiscoveryv0.RegisterDiscoveryServer(s, &discoveryServerV0.Server{ Versions: []string{"v0", "v1"}, }) - - + version := serve.plugin.Version() if serve.sentryDSN != "" && !strings.EqualFold(version, "development") && !noSentry { From d84ba09dfdfd93867332d08c5947533c873113e2 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Sat, 21 Jan 2023 19:27:19 +0200 Subject: [PATCH 3/4] nits --- .../discovery/v0/{destination_terminate.go => terminate.go} | 6 +++--- ...estination_terminate_windows.go => terminate_windows.go} | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) rename clients/discovery/v0/{destination_terminate.go => terminate.go} (72%) rename clients/discovery/v0/{destination_terminate_windows.go => terminate_windows.go} (73%) diff --git a/clients/discovery/v0/destination_terminate.go b/clients/discovery/v0/terminate.go similarity index 72% rename from clients/discovery/v0/destination_terminate.go rename to clients/discovery/v0/terminate.go index 8f17c3c3ea..1b11c4e103 100644 --- a/clients/discovery/v0/destination_terminate.go +++ b/clients/discovery/v0/terminate.go @@ -10,11 +10,11 @@ import ( func (c *Client) terminateProcess() error { if err := c.cmd.Process.Signal(os.Interrupt); err != nil { - c.logger.Error().Err(err).Msg("failed to send interrupt signal to destination plugin") + c.logger.Error().Err(err).Msg("failed to send interrupt signal to plugin") } timer := time.AfterFunc(5*time.Second, func() { if err := c.cmd.Process.Kill(); err != nil { - c.logger.Error().Err(err).Msg("failed to kill destination plugin") + c.logger.Error().Err(err).Msg("failed to kill plugin") } }) st, err := c.cmd.Process.Wait() @@ -23,7 +23,7 @@ func (c *Client) terminateProcess() error { return err } if !st.Success() { - return fmt.Errorf("destination plugin process exited with status %s", st.String()) + return fmt.Errorf("plugin process exited with status %s", st.String()) } return nil diff --git a/clients/discovery/v0/destination_terminate_windows.go b/clients/discovery/v0/terminate_windows.go similarity index 73% rename from clients/discovery/v0/destination_terminate_windows.go rename to clients/discovery/v0/terminate_windows.go index ff5ffed3f5..387f6e35dc 100644 --- a/clients/discovery/v0/destination_terminate_windows.go +++ b/clients/discovery/v0/terminate_windows.go @@ -4,7 +4,7 @@ package discovery func (c *Client) terminateProcess() error { if err := c.cmd.Process.Kill(); err != nil { - c.logger.Error().Err(err).Msg("failed to kill destination plugin") + c.logger.Error().Err(err).Msg("failed to kill plugin") } st, err := c.cmd.Process.Wait() if err != nil { @@ -13,7 +13,7 @@ func (c *Client) terminateProcess() error { if !st.Success() { // on windows there is no way to shutdown gracefully via signal. Maybe we can do it via grpc api? // though it is a bit strange to expose api to shutdown a server :thinking?: - c.logger.Info().Msgf("destination plugin process exited with %s", st.String()) + c.logger.Info().Msgf("plugin process exited with %s", st.String()) } return nil From 4054a7585f1e69585b20d173b6379e356b6de2b1 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Mon, 23 Jan 2023 10:49:44 +0200 Subject: [PATCH 4/4] Update clients/discovery/v0/discovery.go Co-authored-by: Erez Rokah --- clients/discovery/v0/discovery.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/discovery/v0/discovery.go b/clients/discovery/v0/discovery.go index 4b843419af..c556b7afc0 100644 --- a/clients/discovery/v0/discovery.go +++ b/clients/discovery/v0/discovery.go @@ -102,7 +102,7 @@ func NewClient(ctx context.Context, registrySpec specs.Registry, pluginType regi return c, nil } -// newManagedClient starts a new destination plugin process from local file, connects to it via gRPC server +// newManagedClient starts a new discovery plugin process from local file, connects to it via gRPC server // and returns a new Client func (c *Client) newManagedClient(ctx context.Context, path string) error { c.grpcSocketName = random.GenerateRandomUnixSocketName()