From a61a3df465ebb2c2dcefab44d064b54654dc5a47 Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 15 Dec 2022 12:36:44 -0600 Subject: [PATCH 1/2] add batch size --- plugins/destination/bigquery/client/spec.go | 6 ++++++ plugins/destination/bigquery/client/write.go | 2 +- .../pages/docs/plugins/destinations/bigquery/overview.md | 3 +++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/plugins/destination/bigquery/client/spec.go b/plugins/destination/bigquery/client/spec.go index 1bdd97419a0c87..550c0e645ee249 100644 --- a/plugins/destination/bigquery/client/spec.go +++ b/plugins/destination/bigquery/client/spec.go @@ -34,12 +34,18 @@ type Spec struct { DatasetLocation string `json:"dataset_location"` TimePartitioning TimePartitioningOption `json:"time_partitioning"` ServiceAccountKeyJSON string `json:"service_account_key_json"` + BatchSize int `json:"batch_size,omitempty"` } +const defaultBatchSize = 1000 + func (s *Spec) SetDefaults() { if s.TimePartitioning == "" { s.TimePartitioning = TimePartitioningOptionNone } + if s.BatchSize <= 0 { + s.BatchSize = defaultBatchSize + } } func (s *Spec) Validate() error { diff --git a/plugins/destination/bigquery/client/write.go b/plugins/destination/bigquery/client/write.go index b4de4fb53b6dcf..61d1dbf03ec047 100644 --- a/plugins/destination/bigquery/client/write.go +++ b/plugins/destination/bigquery/client/write.go @@ -48,7 +48,7 @@ func (c *Client) writeResource(ctx context.Context, table *schema.Table, client } c.logger.Debug().Interface("cols", saver.cols).Msg("got resource") batch = append(batch, saver) - if len(batch) >= batchSize { + if len(batch) >= c.batchSize { c.logger.Debug().Msg("Writing batch") // we use a context with timeout here, because inserter.Put can retry indefinitely // on retryable errors if not given a context timeout diff --git a/website/pages/docs/plugins/destinations/bigquery/overview.md b/website/pages/docs/plugins/destinations/bigquery/overview.md index 7d3a7845902ca1..d7d2e0eae8237f 
100644 --- a/website/pages/docs/plugins/destinations/bigquery/overview.md +++ b/website/pages/docs/plugins/destinations/bigquery/overview.md @@ -91,6 +91,9 @@ This is the top-level spec used by the BigQuery destination plugin. GCP service account key content. This allows for using different service accounts for the GCP source and BigQuery destination. If using service account keys, it is best to use [environment or file variable substitution](/docs/advanced-topics/environment-variable-substitution). +- `batch_size` (int, optional. Default: 1000) + + Number of rows to insert in a single batch. Values less than or equal to 0 fall back to the default. ## Underlying library From d48f096c4d5bd02a97179cb411bc3573a9ebe63f Mon Sep 17 00:00:00 2001 From: bbernays Date: Thu, 15 Dec 2022 15:20:20 -0600 Subject: [PATCH 2/2] Update client.go --- plugins/destination/bigquery/client/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/destination/bigquery/client/client.go b/plugins/destination/bigquery/client/client.go index c54df5763bc480..f261addd958840 100644 --- a/plugins/destination/bigquery/client/client.go +++ b/plugins/destination/bigquery/client/client.go @@ -16,6 +16,7 @@ type Client struct { logger zerolog.Logger spec specs.Destination metrics plugins.DestinationMetrics + batchSize int pluginSpec Spec } @@ -37,7 +38,7 @@ func New(ctx context.Context, logger zerolog.Logger, destSpec specs.Destination) } c.pluginSpec = spec - + c.batchSize = spec.BatchSize // create a client to test that we can do it, but new clients will also be instantiated // for queries so that we can use a new context there. client, err := c.bqClient(ctx)