diff --git a/plugins/destination/sqlite/client/client.go b/plugins/destination/sqlite/client/client.go index 575b1fef34c0dc..89b86c39410e7b 100644 --- a/plugins/destination/sqlite/client/client.go +++ b/plugins/destination/sqlite/client/client.go @@ -6,7 +6,7 @@ import ( "fmt" "github.com/cloudquery/plugin-pb-go/specs" - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" "github.com/rs/zerolog" // Import sqlite3 driver diff --git a/plugins/destination/sqlite/client/client_test.go b/plugins/destination/sqlite/client/client_test.go index b49c53627f56a2..2ea8e0ac1a0d1d 100644 --- a/plugins/destination/sqlite/client/client_test.go +++ b/plugins/destination/sqlite/client/client_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/cloudquery/plugin-pb-go/specs" - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" ) var migrateStrategy = destination.MigrateStrategy{ diff --git a/plugins/destination/sqlite/client/deletestale.go b/plugins/destination/sqlite/client/deletestale.go index c2f04b31276d1c..34f7e7e0ad332b 100644 --- a/plugins/destination/sqlite/client/deletestale.go +++ b/plugins/destination/sqlite/client/deletestale.go @@ -5,14 +5,14 @@ import ( "strings" "time" - "github.com/cloudquery/plugin-sdk/v2/schema" + "github.com/cloudquery/plugin-sdk/v3/schema" ) -func (c *Client) DeleteStale(ctx context.Context, tables schema.Schemas, source string, syncTime time.Time) error { +func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error { for _, table := range tables { var sb strings.Builder sb.WriteString("delete from ") - sb.WriteString(`"` + schema.TableName(table) + `"`) + sb.WriteString(`"` + table.Name + `"`) sb.WriteString(" where ") sb.WriteString(`"` + schema.CqSourceNameColumn.Name + `"`) sb.WriteString(" = $1 and datetime(") diff --git a/plugins/destination/sqlite/client/metrics.go b/plugins/destination/sqlite/client/metrics.go index 6e0bc82117c87a..0abe9f700b726d 100644 --- a/plugins/destination/sqlite/client/metrics.go +++ b/plugins/destination/sqlite/client/metrics.go @@ -1,7 +1,7 @@ package client import ( - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" ) func (c *Client) Metrics() destination.Metrics { diff --git a/plugins/destination/sqlite/client/migrate.go b/plugins/destination/sqlite/client/migrate.go index dcec74916e61dd..336ab99c35a0a1 100644 --- a/plugins/destination/sqlite/client/migrate.go +++ b/plugins/destination/sqlite/client/migrate.go @@ -7,7 +7,7 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-pb-go/specs" - "github.com/cloudquery/plugin-sdk/v2/schema" + "github.com/cloudquery/plugin-sdk/v3/schema" ) const ( @@ -27,15 +27,11 @@ type tableInfo struct { columns []columnInfo } -func (c *Client) sqliteTables(schemas schema.Schemas) (schema.Schemas, error) { - var schemaTables schema.Schemas - for _, sc := range schemas { - var fields []arrow.Field - tableName := schema.TableName(sc) - if tableName == "" { - return nil, fmt.Errorf("schema %s has no table name", sc.String()) - } - info, err := c.getTableInfo(tableName) +func (c *Client) sqliteTables(tables schema.Tables) (schema.Tables, error) { + var schemaTables schema.Tables + for _, table := range tables { + var columns []schema.Column + info, err := c.getTableInfo(table.Name) if info == nil { continue } @@ -43,77 +39,66 @@ func (c *Client) sqliteTables(schemas schema.Schemas) (schema.Schemas, error) { return nil, err } for _, col := range info.columns { - var fieldMetadata schema.MetadataFieldOptions - if col.pk != 0 { - fieldMetadata.PrimaryKey = true - } - fields = append(fields, arrow.Field{ - Name: col.name, - Type: c.sqliteTypeToArrowType(col.typ), - Nullable: !col.notNull, - Metadata: schema.NewFieldMetadataFromOptions(fieldMetadata), + columns = append(columns, schema.Column{ + Name: col.name, + Type: c.sqliteTypeToArrowType(col.typ), + PrimaryKey: col.pk != 0, + NotNull: col.notNull, }) } - var tableMetadata schema.MetadataSchemaOptions - tableMetadata.TableName = tableName - m := schema.NewSchemaMetadataFromOptions(tableMetadata) - schemaTables = append(schemaTables, arrow.NewSchema(fields, &m)) + schemaTables = append(schemaTables, &schema.Table{Name: table.Name, Columns: columns}) } return schemaTables, nil } -func (c *Client) normalizeSchemas(scs schema.Schemas) schema.Schemas { - var normalized schema.Schemas - for _, sc := range scs { - fields := make([]arrow.Field, 0) - for _, f := range sc.Fields() { - keys := make([]string, 0) - values := make([]string, 0) - origKeys := f.Metadata.Keys() - origValues := f.Metadata.Values() - for k, v := range origKeys { - if v != schema.MetadataUnique { - keys = append(keys, v) - values = append(values, origValues[k]) - } - } - fields = append(fields, arrow.Field{ - Name: f.Name, - Type: c.arrowTypeToSqlite(f.Type), - Nullable: f.Nullable, - Metadata: arrow.NewMetadata(keys, values), - }) - } +func (c *Client) normalizeTables(tables schema.Tables) schema.Tables { + flattened := tables.FlattenTables() + normalized := make(schema.Tables, len(flattened)) + for i, table := range flattened { + normalized[i] = c.normalizeTable(table) + } + return normalized +} - md := sc.Metadata() - normalized = append(normalized, arrow.NewSchema(fields, &md)) +func (c *Client) normalizeTable(table *schema.Table) *schema.Table { + columns := make([]schema.Column, len(table.Columns)) + for i, col := range table.Columns { + normalized := c.normalizeField(col.ToArrowField()) + columns[i] = schema.NewColumnFromArrowField(*normalized) } + return &schema.Table{Name: table.Name, Columns: columns} +} - return normalized +func (c *Client) normalizeField(field arrow.Field) *arrow.Field { + return &arrow.Field{ + Name: field.Name, + Type: c.arrowTypeToSqlite(field.Type), + Nullable: field.Nullable, + Metadata: field.Metadata, + } } -func (c *Client) nonAutoMigrableTables(tables schema.Schemas, sqliteTables schema.Schemas) ([]string, [][]schema.FieldChange) { +func (c *Client) nonAutoMigratableTables(tables schema.Tables, sqliteTables schema.Tables) ([]string, [][]schema.TableColumnChange) { var result []string - var tableChanges [][]schema.FieldChange + var tableChanges [][]schema.TableColumnChange for _, t := range tables { - tableName := schema.TableName(t) - sqliteTable := sqliteTables.SchemaByName(tableName) + sqliteTable := sqliteTables.Get(t.Name) if sqliteTable == nil { continue } - changes := schema.GetSchemaChanges(t, sqliteTable) + changes := sqliteTable.GetChanges(t) if !c.canAutoMigrate(changes) { - result = append(result, tableName) + result = append(result, t.Name) tableChanges = append(tableChanges, changes) } } return result, tableChanges } -func (c *Client) autoMigrateTable(table *arrow.Schema, changes []schema.FieldChange) error { +func (c *Client) autoMigrateTable(table *schema.Table, changes []schema.TableColumnChange) error { for _, change := range changes { if change.Type == schema.TableColumnChangeTypeAdd { - if err := c.addColumn(schema.TableName(table), change.Current.Name, c.arrowTypeToSqliteStr(change.Current.Type)); err != nil { + if err := c.addColumn(table.Name, change.Current.Name, c.arrowTypeToSqliteStr(change.Current.Type)); err != nil { return err } } @@ -121,64 +106,63 @@ func (c *Client) autoMigrateTable(table *arrow.Schema, changes []schema.FieldCha return nil } -func (*Client) canAutoMigrate(changes []schema.FieldChange) bool { +func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool { for _, change := range changes { - if change.Type == schema.TableColumnChangeTypeAdd && (schema.IsPk(change.Current) || !change.Current.Nullable) { - return false - } - - if change.Type == schema.TableColumnChangeTypeRemove && (schema.IsPk(change.Previous) || !change.Previous.Nullable) { - return false - } - - if change.Type == schema.TableColumnChangeTypeUpdate { + switch change.Type { + case schema.TableColumnChangeTypeAdd: + if change.Current.PrimaryKey || change.Current.NotNull { + return false + } + case schema.TableColumnChangeTypeRemove: + if change.Previous.PrimaryKey || change.Previous.NotNull { + return false + } + case schema.TableColumnChangeTypeUpdate: return false + default: + panic("unknown change type") } } return true } // This is the responsibility of the CLI of the client to lock before running migration -func (c *Client) Migrate(ctx context.Context, schemas schema.Schemas) error { - schemas = c.normalizeSchemas(schemas) - sqliteTables, err := c.sqliteTables(schemas) +func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error { + normalizedTables := c.normalizeTables(tables) + sqliteTables, err := c.sqliteTables(normalizedTables) if err != nil { return err } if c.spec.MigrateMode != specs.MigrateModeForced { - nonAutoMigrableTables, changes := c.nonAutoMigrableTables(schemas, sqliteTables) - if len(nonAutoMigrableTables) > 0 { - return fmt.Errorf("tables %s with changes %v require force migration. use 'migrate_mode: forced'", strings.Join(nonAutoMigrableTables, ","), changes) + nonAutoMigratableTables, changes := c.nonAutoMigratableTables(normalizedTables, sqliteTables) + if len(nonAutoMigratableTables) > 0 { + return fmt.Errorf("tables %s with changes %v require force migration. use 'migrate_mode: forced'", strings.Join(nonAutoMigratableTables, ","), changes) } } - for _, table := range schemas { - tableName := schema.TableName(table) - if tableName == "" { - return fmt.Errorf("schema %s has no table name", table.String()) - } - c.logger.Info().Str("table", tableName).Msg("Migrating table") - if len(table.Fields()) == 0 { - c.logger.Info().Str("table", tableName).Msg("Table with no columns, skipping") + for _, table := range normalizedTables { + c.logger.Info().Str("table", table.Name).Msg("Migrating table") + if len(table.Columns) == 0 { + c.logger.Info().Str("table", table.Name).Msg("Table with no columns, skipping") continue } - sqlite := sqliteTables.SchemaByName(tableName) + sqlite := sqliteTables.Get(table.Name) if sqlite == nil { - c.logger.Debug().Str("table", tableName).Msg("Table doesn't exist, creating") + c.logger.Debug().Str("table", table.Name).Msg("Table doesn't exist, creating") if err := c.createTableIfNotExist(table); err != nil { return err } } else { - changes := schema.GetSchemaChanges(table, sqlite) + changes := table.GetChanges(sqlite) if c.canAutoMigrate(changes) { - c.logger.Info().Str("table", tableName).Msg("Table exists, auto-migrating") + c.logger.Info().Str("table", table.Name).Msg("Table exists, auto-migrating") if err := c.autoMigrateTable(table, changes); err != nil { return err } } else { - c.logger.Info().Str("table", tableName).Msg("Table exists, force migration required") + c.logger.Info().Str("table", table.Name).Msg("Table exists, force migration required") if err := c.recreateTable(table); err != nil { return err } @@ -189,14 +173,10 @@ func (c *Client) Migrate(ctx context.Context, schemas schema.Schemas) error { return nil } -func (c *Client) recreateTable(table *arrow.Schema) error { - tableName, ok := table.Metadata().GetValue(schema.MetadataTableName) - if !ok { - return fmt.Errorf("schema %s has no table name", table.String()) - } - sql := "drop table if exists \"" + tableName + "\"" +func (c *Client) recreateTable(table *schema.Table) error { + sql := "drop table if exists \"" + table.Name + "\"" if _, err := c.db.Exec(sql); err != nil { - return fmt.Errorf("failed to drop table %s: %w", tableName, err) + return fmt.Errorf("failed to drop table %s: %w", table.Name, err) } return c.createTableIfNotExist(table) } @@ -209,28 +189,25 @@ func (c *Client) addColumn(tableName string, columnName string, columnType strin return nil } -func (c *Client) createTableIfNotExist(sc *arrow.Schema) error { +func (c *Client) createTableIfNotExist(table *schema.Table) error { var sb strings.Builder - tableName, ok := sc.Metadata().GetValue(schema.MetadataTableName) - if !ok { - return fmt.Errorf("schema %s has no table name", sc.String()) - } - // TODO sanitize tablename + + // TODO sanitize table.Name sb.WriteString("CREATE TABLE IF NOT EXISTS ") - sb.WriteString(`"` + tableName + `"`) + sb.WriteString(`"` + table.Name + `"`) sb.WriteString(" (") - totalColumns := len(sc.Fields()) + totalColumns := len(table.Columns) primaryKeys := []string{} - for i, col := range sc.Fields() { + for i, col := range table.Columns { sqlType := c.arrowTypeToSqliteStr(col.Type) if sqlType == "" { - c.logger.Warn().Str("table", tableName).Str("column", col.Name).Msg("Column type is not supported, skipping") + c.logger.Warn().Str("table", table.Name).Str("column", col.Name).Msg("Column type is not supported, skipping") continue } // TODO: sanitize column name fieldDef := `"` + col.Name + `" ` + sqlType - if !col.Nullable { + if col.NotNull { fieldDef += " NOT NULL" } sb.WriteString(fieldDef) @@ -238,7 +215,7 @@ func (c *Client) createTableIfNotExist(sc *arrow.Schema) error { sb.WriteString(",") } - if c.enabledPks() && schema.IsPk(col) { + if c.enabledPks() && col.PrimaryKey { primaryKeys = append(primaryKeys, `"`+col.Name+`"`) } } @@ -246,7 +223,7 @@ func (c *Client) createTableIfNotExist(sc *arrow.Schema) error { if len(primaryKeys) > 0 { // add composite PK constraint on primary key columns sb.WriteString(", CONSTRAINT ") - sb.WriteString(tableName) + sb.WriteString(table.Name) sb.WriteString("_cqpk PRIMARY KEY (") sb.WriteString(strings.Join(primaryKeys, ",")) sb.WriteString(")") diff --git a/plugins/destination/sqlite/client/read.go b/plugins/destination/sqlite/client/read.go index 341b176f0649d7..6c9d9568631277 100644 --- a/plugins/destination/sqlite/client/read.go +++ b/plugins/destination/sqlite/client/read.go @@ -9,7 +9,7 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" "github.com/apache/arrow/go/v13/arrow/memory" - "github.com/cloudquery/plugin-sdk/v2/schema" + "github.com/cloudquery/plugin-sdk/v3/schema" ) const ( @@ -127,7 +127,7 @@ func reverseTransform(sc *arrow.Schema, values []any) (arrow.Record, error) { } else { bldr.Field(i).(*array.StringBuilder).Append(val.(*sql.NullString).String) } - case arrow.BINARY: + case arrow.BINARY, arrow.LARGE_BINARY: if *val.(*[]byte) == nil { bldr.Field(i).AppendNull() } else { @@ -148,23 +148,23 @@ func reverseTransform(sc *arrow.Schema, values []any) (arrow.Record, error) { return rec, nil } -func (c *Client) Read(ctx context.Context, table *arrow.Schema, sourceName string, res chan<- arrow.Record) error { - colNames := make([]string, 0, len(table.Fields())) - for _, col := range table.Fields() { - colNames = append(colNames, `"`+col.Name+`"`) +func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error { + colNames := make([]string, len(table.Columns)) + for i, col := range table.Columns { + colNames[i] = `"` + col.Name + `"` } cols := strings.Join(colNames, ", ") - tableName := schema.TableName(table) - rows, err := c.db.Query(fmt.Sprintf(readSQL, cols, tableName), sourceName) + rows, err := c.db.Query(fmt.Sprintf(readSQL, cols, table.Name), sourceName) if err != nil { return err } + arrowSchema := table.ToArrowSchema() for rows.Next() { - values := c.createResultsArray(table) + values := c.createResultsArray(arrowSchema) if err := rows.Scan(values...); err != nil { - return fmt.Errorf("failed to read from table %s: %w", tableName, err) + return fmt.Errorf("failed to read from table %s: %w", table.Name, err) } - record, err := reverseTransform(table, values) + record, err := reverseTransform(arrowSchema, values) if err != nil { return err } diff --git a/plugins/destination/sqlite/client/transformer.go b/plugins/destination/sqlite/client/transformer.go index 7d938c13997c8f..180219291cef2e 100644 --- a/plugins/destination/sqlite/client/transformer.go +++ b/plugins/destination/sqlite/client/transformer.go @@ -36,6 +36,8 @@ func getValue(arr arrow.Array, i int) any { return arr.(*array.String).Value(i) case arrow.BINARY: return arr.(*array.Binary).Value(i) + case arrow.LARGE_BINARY: + return arr.(*array.LargeBinary).Value(i) case arrow.FIXED_SIZE_BINARY: return arr.(*array.FixedSizeBinary).Value(i) default: diff --git a/plugins/destination/sqlite/client/write.go b/plugins/destination/sqlite/client/write.go index 0b2ead909e4a8b..c35eaf8dccd3cc 100644 --- a/plugins/destination/sqlite/client/write.go +++ b/plugins/destination/sqlite/client/write.go @@ -7,10 +7,10 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/cloudquery/plugin-pb-go/specs" - "github.com/cloudquery/plugin-sdk/v2/schema" + "github.com/cloudquery/plugin-sdk/v3/schema" ) -func (c *Client) Write(ctx context.Context, tables schema.Schemas, res <-chan arrow.Record) error { +func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan arrow.Record) error { var sql string for r := range res { if c.spec.WriteMode == specs.WriteModeAppend { diff --git a/plugins/destination/sqlite/go.mod b/plugins/destination/sqlite/go.mod index a4f7cc9e55739b..8554d200a0cf9a 100644 --- a/plugins/destination/sqlite/go.mod +++ b/plugins/destination/sqlite/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604 github.com/cloudquery/plugin-pb-go v1.0.8 - github.com/cloudquery/plugin-sdk/v2 v2.7.0 + github.com/cloudquery/plugin-sdk/v3 v3.5.1 github.com/mattn/go-sqlite3 v1.14.16 github.com/rs/zerolog v1.29.0 ) @@ -16,6 +16,7 @@ replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13 require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/apache/thrift v0.16.0 // indirect + github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/getsentry/sentry-go v0.20.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect @@ -34,6 +35,7 @@ require ( github.com/mattn/go-isatty v0.0.18 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/cobra v1.6.1 // indirect diff --git a/plugins/destination/sqlite/go.sum b/plugins/destination/sqlite/go.sum index 34f5f41224b04f..b0956ed7611702 100644 --- a/plugins/destination/sqlite/go.sum +++ b/plugins/destination/sqlite/go.sum @@ -50,6 +50,10 @@ github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37 github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= +github.com/cloudquery/plugin-sdk/v3 v3.4.0 h1:Vy2BfZu4b283kKsT5VuzDbYYgLyiBeU+3AU0+qvywbQ= +github.com/cloudquery/plugin-sdk/v3 v3.4.0/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc= +github.com/cloudquery/plugin-sdk/v3 v3.5.1 h1:797hWUEsojwvp7xtr6LSaf5tk5iG9UDixoRACxu3xrU= +github.com/cloudquery/plugin-sdk/v3 v3.5.1/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= @@ -176,6 +180,7 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/plugins/destination/sqlite/main.go b/plugins/destination/sqlite/main.go index f2ef7ca54df647..57661d7748d98e 100644 --- a/plugins/destination/sqlite/main.go +++ b/plugins/destination/sqlite/main.go @@ -3,8 +3,8 @@ package main import ( "github.com/cloudquery/cloudquery/plugins/destination/sqlite/client" "github.com/cloudquery/cloudquery/plugins/destination/sqlite/resources/plugin" - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" - "github.com/cloudquery/plugin-sdk/v2/serve" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/serve" ) const (