Skip to content

Commit 26a971c

Browse files
committed
Merge remote-tracking branch 'upstream/main' into gcp-enabled-services
2 parents e3cc278 + 2f130c1 commit 26a971c

34 files changed

Lines changed: 172 additions & 306 deletions

.github/workflows/dest_postgresql.yml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,13 @@ jobs:
6060
run: make test
6161
# cockroachdb doesn't support services right now
6262
# https://stackoverflow.com/questions/73648218/github-action-setup-secure-cockroachdb
63-
- name: Spin CockroachDB
64-
run: |
65-
docker run -d -p 26257:26257 cockroachdb/cockroach:v22.1.8 start-single-node --insecure
66-
sudo apt update && sudo apt install wait-for-it -y
67-
wait-for-it -h localhost -p 26257
68-
- name: Test CockroachDB
69-
run: CQ_DEST_PG_TEST_CONN="postgresql://root@localhost:26257/postgres?sslmode=disable" make test
63+
# - name: Spin CockroachDB
64+
# run: |
65+
# docker run -d -p 26257:26257 cockroachdb/cockroach:v22.1.8 start-single-node --insecure
66+
# sudo apt update && sudo apt install wait-for-it -y
67+
# wait-for-it -h localhost -p 26257
68+
# - name: Test CockroachDB
69+
# run: CQ_DEST_PG_TEST_CONN="postgresql://root@localhost:26257/postgres?sslmode=disable" make test
7070
validate-release:
7171
timeout-minutes: 30
7272
runs-on: ubuntu-latest

plugins/destination/bigquery/client/client.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,21 @@ import (
1212
)
1313

1414
type Client struct {
15+
destination.UnimplementedUnmanagedWriter
1516
destination.DefaultReverseTransformer
1617
logger zerolog.Logger
1718
spec specs.Destination
1819
metrics destination.Metrics
1920
batchSize int
2021
pluginSpec Spec
22+
client *bigquery.Client
2123
}
2224

2325
func New(ctx context.Context, logger zerolog.Logger, destSpec specs.Destination) (destination.Client, error) {
2426
if destSpec.WriteMode != specs.WriteModeAppend {
2527
return nil, fmt.Errorf("bigquery destination only supports append mode")
2628
}
29+
var err error
2730
c := &Client{
2831
logger: logger.With().Str("module", "bq-dest").Logger(),
2932
spec: destSpec,
@@ -39,13 +42,14 @@ func New(ctx context.Context, logger zerolog.Logger, destSpec specs.Destination)
3942

4043
c.pluginSpec = spec
4144
c.batchSize = spec.BatchSize
42-
// create a client to test that we can do it, but new clients will also be instantiated
43-
// for queries so that we can use a new context there.
44-
client, err := c.bqClient(ctx)
45+
46+
// the context here is used for token refresh, so this is a workaround as suggested in:
47+
// https://github.com/googleapis/google-cloud-go/issues/946
48+
// https://github.com/googleapis/google-cloud-go/commit/2d59af0cb37fb29e5b7980a15088938778f117c7
49+
c.client, err = c.bqClient(context.Background())
4550
if err != nil {
46-
return nil, fmt.Errorf("failed to create new BigQuery client: %w", err)
51+
return nil, err
4752
}
48-
defer client.Close()
4953

5054
return c, nil
5155
}
@@ -65,6 +69,6 @@ func (c *Client) bqClient(ctx context.Context) (*bigquery.Client, error) {
6569
return client, nil
6670
}
6771

68-
func (*Client) Close(_ context.Context) error {
69-
return nil
72+
func (c *Client) Close(_ context.Context) error {
73+
return c.client.Close()
7074
}

plugins/destination/bigquery/client/client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88
)
99

1010
func TestPlugin(t *testing.T) {
11-
p := destination.NewPlugin("bigquery", "development", New)
11+
p := destination.NewPlugin("bigquery", "development", New, destination.WithManagedWriter())
1212
destination.PluginTestSuiteRunner(t, p,
1313
Spec{
1414
ProjectID: os.Getenv("BIGQUERY_PROJECT_ID"),
1515
DatasetID: os.Getenv("BIGQUERY_DATASET_ID"),
1616
TimePartitioning: "none",
1717
},
18-
destination.TestSuiteTests{
18+
destination.PluginTestSuiteTests{
1919
SkipOverwrite: true,
2020
})
2121
}

plugins/destination/bigquery/client/migrate.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,35 +20,31 @@ const (
2020

2121
// Migrate tables. It is the responsibility of the CLI of the client to lock before running migrations.
2222
func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
23-
client, err := c.bqClient(ctx)
24-
if err != nil {
25-
return fmt.Errorf("failed to create client: %w", err)
26-
}
2723
eg, gctx := errgroup.WithContext(ctx)
2824
eg.SetLimit(concurrentMigrations)
2925
for _, table := range tables.FlattenTables() {
3026
table := table
3127
eg.Go(func() error {
3228
c.logger.Debug().Str("table", table.Name).Msg("Migrating table")
33-
tableExists, err := c.doesTableExist(gctx, client, table.Name)
29+
tableExists, err := c.doesTableExist(gctx, c.client, table.Name)
3430
if err != nil {
3531
return fmt.Errorf("failed to check if table %s exists: %w", table.Name, err)
3632
}
3733
if tableExists {
3834
c.logger.Debug().Str("table", table.Name).Msg("Table exists, auto-migrating")
39-
if err := c.autoMigrateTable(gctx, client, table); err != nil {
35+
if err := c.autoMigrateTable(gctx, c.client, table); err != nil {
4036
return err
4137
}
42-
err = c.waitForSchemaToMatch(gctx, client, table)
38+
err = c.waitForSchemaToMatch(gctx, c.client, table)
4339
if err != nil {
4440
return err
4541
}
4642
} else {
4743
c.logger.Debug().Str("table", table.Name).Msg("Table doesn't exist, creating")
48-
if err := c.createTable(gctx, client, table); err != nil {
44+
if err := c.createTable(gctx, c.client, table); err != nil {
4945
return err
5046
}
51-
err = c.waitForTableToExist(gctx, client, table)
47+
err = c.waitForTableToExist(gctx, c.client, table)
5248
if err != nil {
5349
return err
5450
}
@@ -68,7 +64,7 @@ func (c *Client) doesTableExist(ctx context.Context, client *bigquery.Client, ta
6864
return false, nil
6965
}
7066
}
71-
c.logger.Error().Err(err).Msg("Got unexpected error while checking table metadata")
67+
c.logger.Error().Str("dataset", c.pluginSpec.DatasetID).Str("table", table).Err(err).Msg("Got unexpected error while checking table metadata")
7268
return false, err
7369
}
7470

plugins/destination/bigquery/client/read.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
const (
15-
readSQL = "SELECT * FROM `%s.%s.%s` WHERE `_cq_source_name` = @cq_source_name"
15+
readSQL = "SELECT * FROM `%s.%s.%s` WHERE `_cq_source_name` = @cq_source_name order by _cq_sync_time asc"
1616
)
1717

1818
func (*Client) createResultsArray(table *schema.Table) []bigquery.Value {
@@ -79,11 +79,7 @@ func (*Client) createResultsArray(table *schema.Table) []bigquery.Value {
7979

8080
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []any) error {
8181
stmt := fmt.Sprintf(readSQL, c.pluginSpec.ProjectID, c.pluginSpec.DatasetID, table.Name)
82-
client, err := c.bqClient(ctx)
83-
if err != nil {
84-
return fmt.Errorf("failed to create client: %w", err)
85-
}
86-
q := client.Query(stmt)
82+
q := c.client.Query(stmt)
8783
q.Parameters = []bigquery.QueryParameter{
8884
{
8985
Name: "cq_source_name",
@@ -94,9 +90,9 @@ func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName strin
9490
if err != nil {
9591
return fmt.Errorf("failed to read table %s: %w", table.Name, err)
9692
}
97-
values := c.createResultsArray(table)
98-
v := make([]any, len(values))
9993
for {
94+
values := c.createResultsArray(table)
95+
v := make([]any, len(values))
10096
err := it.Next(&values)
10197
if err == iterator.Done {
10298
break

plugins/destination/bigquery/client/transformer.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package client
22

33
import (
4-
"encoding/hex"
5-
64
"github.com/cloudquery/plugin-sdk/schema"
75
)
86

@@ -11,7 +9,7 @@ func (*Client) TransformBool(v *schema.Bool) any {
119
}
1210

1311
func (*Client) TransformBytea(v *schema.Bytea) any {
14-
return hex.EncodeToString(v.Bytes)
12+
return v.Bytes
1513
}
1614

1715
func (*Client) TransformFloat8(v *schema.Float8) any {

plugins/destination/bigquery/client/write.go

Lines changed: 9 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,14 @@ import (
66
"time"
77

88
"cloud.google.com/go/bigquery"
9-
"github.com/cloudquery/plugin-sdk/plugins/destination"
109
"github.com/cloudquery/plugin-sdk/schema"
11-
"golang.org/x/sync/errgroup"
1210
)
1311

1412
const (
1513
batchSize = 1000
1614
writeTimeout = 5 * time.Minute
1715
)
1816

19-
type worker struct {
20-
writeChan chan []any
21-
}
22-
2317
type item struct {
2418
cols map[string]bigquery.Value
2519
}
@@ -29,13 +23,12 @@ func (i *item) Save() (map[string]bigquery.Value, string, error) {
2923
return i.cols, bigquery.NoDedupeID, nil
3024
}
3125

32-
func (c *Client) writeResource(ctx context.Context, table *schema.Table, client *bigquery.Client, resources <-chan []any) error {
33-
inserter := client.Dataset(c.pluginSpec.DatasetID).Table(table.Name).Inserter()
26+
func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, resources [][]any) error {
27+
inserter := c.client.Dataset(c.pluginSpec.DatasetID).Table(table.Name).Inserter()
3428
inserter.IgnoreUnknownValues = true
3529
inserter.SkipInvalidRows = false
3630
batch := make([]*item, 0)
37-
for cols := range resources {
38-
c.logger.Debug().Msg("Got resource")
31+
for _, cols := range resources {
3932
saver := &item{
4033
cols: make(map[string]bigquery.Value, len(table.Columns)),
4134
}
@@ -46,71 +39,15 @@ func (c *Client) writeResource(ctx context.Context, table *schema.Table, client
4639
}
4740
saver.cols[table.Columns[i].Name] = cols[i]
4841
}
49-
c.logger.Debug().Interface("cols", saver.cols).Msg("got resource")
5042
batch = append(batch, saver)
51-
if len(batch) >= c.batchSize {
52-
c.logger.Debug().Msg("Writing batch")
53-
// we use a context with timeout here, because inserter.Put can retry indefinitely
54-
// on retryable errors if not given a context timeout
55-
timeoutCtx, cancel := context.WithTimeout(ctx, writeTimeout)
56-
err := inserter.Put(timeoutCtx, batch)
57-
if err != nil {
58-
cancel()
59-
return fmt.Errorf("failed to put item into BigQuery table %s: %w", table.Name, err)
60-
}
61-
// release resources from timeout context if it finished early
62-
batch = nil
63-
cancel()
64-
}
6543
}
66-
if len(batch) > 0 {
67-
c.logger.Debug().Msg("Writing final batch")
68-
// flush final rows
69-
timeoutCtx, cancel := context.WithTimeout(ctx, writeTimeout)
70-
defer cancel()
71-
err := inserter.Put(timeoutCtx, batch)
72-
if err != nil {
73-
return fmt.Errorf("failed to put item into BigQuery table %s: %w", table.Name, err)
74-
}
75-
}
76-
77-
return nil
78-
}
79-
80-
func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *destination.ClientResource) error {
81-
eg, gctx := errgroup.WithContext(ctx)
82-
workers := make(map[string]*worker, len(tables))
83-
client, err := c.bqClient(ctx)
44+
// flush final rows
45+
timeoutCtx, cancel := context.WithTimeout(ctx, writeTimeout)
46+
defer cancel()
47+
err := inserter.Put(timeoutCtx, batch)
8448
if err != nil {
85-
return fmt.Errorf("failed to create client: %w", err)
86-
}
87-
for _, t := range tables.FlattenTables() {
88-
t := t
89-
writeChan := make(chan []any)
90-
workers[t.Name] = &worker{
91-
writeChan: writeChan,
92-
}
93-
eg.Go(func() error {
94-
return c.writeResource(gctx, t, client, writeChan)
95-
})
49+
return fmt.Errorf("failed to put item into BigQuery table %s: %w", table.Name, err)
9650
}
9751

98-
done := false
99-
for !done {
100-
select {
101-
case r, ok := <-res:
102-
if !ok {
103-
done = true
104-
break
105-
}
106-
workers[r.TableName].writeChan <- r.Data
107-
case <-gctx.Done():
108-
done = true
109-
}
110-
}
111-
for _, w := range workers {
112-
close(w.writeChan)
113-
}
114-
115-
return eg.Wait()
52+
return nil
11653
}

plugins/destination/bigquery/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.19
44

55
require (
66
cloud.google.com/go/bigquery v1.43.0
7-
github.com/cloudquery/plugin-sdk v1.12.7
7+
github.com/cloudquery/plugin-sdk v1.13.1
88
github.com/rs/zerolog v1.28.0
99
golang.org/x/sync v0.1.0
1010
google.golang.org/api v0.103.0

plugins/destination/bigquery/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
5353
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
5454
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
5555
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
56-
github.com/cloudquery/plugin-sdk v1.12.7 h1:hg84meEkASUPLnAuBxvgFpyjWUP2iWjxZulkxVcnxTY=
57-
github.com/cloudquery/plugin-sdk v1.12.7/go.mod h1:PKne4lmvDFCEbTAS8EQzPohkXchwi/7NSvu77l07hCg=
56+
github.com/cloudquery/plugin-sdk v1.13.1 h1:ny/9L4C/pp77wrkLjNtGSZp70XrVkXxOAV/EI0kVVo0=
57+
github.com/cloudquery/plugin-sdk v1.13.1/go.mod h1:PKne4lmvDFCEbTAS8EQzPohkXchwi/7NSvu77l07hCg=
5858
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
5959
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
6060
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=

plugins/destination/bigquery/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@ const (
1212
)
1313

1414
func main() {
15-
p := destination.NewPlugin("bigquery", plugin.Version, client.New)
15+
p := destination.NewPlugin("bigquery", plugin.Version, client.New, destination.WithManagedWriter())
1616
serve.Destination(p, serve.WithDestinationSentryDSN(sentryDSN))
1717
}

0 commit comments

Comments
 (0)