@@ -6,20 +6,14 @@ import (
66 "time"
77
88 "cloud.google.com/go/bigquery"
9- "github.com/cloudquery/plugin-sdk/plugins/destination"
109 "github.com/cloudquery/plugin-sdk/schema"
11- "golang.org/x/sync/errgroup"
1210)
1311
1412const (
1513 batchSize = 1000
1614 writeTimeout = 5 * time .Minute
1715)
1816
19- type worker struct {
20- writeChan chan []any
21- }
22-
2317type item struct {
2418 cols map [string ]bigquery.Value
2519}
@@ -29,13 +23,12 @@ func (i *item) Save() (map[string]bigquery.Value, string, error) {
2923 return i .cols , bigquery .NoDedupeID , nil
3024}
3125
32- func (c * Client ) writeResource (ctx context.Context , table * schema.Table , client * bigquery. Client , resources <- chan []any ) error {
33- inserter := client .Dataset (c .pluginSpec .DatasetID ).Table (table .Name ).Inserter ()
26+ func (c * Client ) WriteTableBatch (ctx context.Context , table * schema.Table , resources [] []any ) error {
27+ inserter := c . client .Dataset (c .pluginSpec .DatasetID ).Table (table .Name ).Inserter ()
3428 inserter .IgnoreUnknownValues = true
3529 inserter .SkipInvalidRows = false
3630 batch := make ([]* item , 0 )
37- for cols := range resources {
38- c .logger .Debug ().Msg ("Got resource" )
31+ for _ , cols := range resources {
3932 saver := & item {
4033 cols : make (map [string ]bigquery.Value , len (table .Columns )),
4134 }
@@ -46,71 +39,15 @@ func (c *Client) writeResource(ctx context.Context, table *schema.Table, client
4639 }
4740 saver .cols [table .Columns [i ].Name ] = cols [i ]
4841 }
49- c .logger .Debug ().Interface ("cols" , saver .cols ).Msg ("got resource" )
5042 batch = append (batch , saver )
51- if len (batch ) >= c .batchSize {
52- c .logger .Debug ().Msg ("Writing batch" )
53- // we use a context with timeout here, because inserter.Put can retry indefinitely
54- // on retryable errors if not given a context timeout
55- timeoutCtx , cancel := context .WithTimeout (ctx , writeTimeout )
56- err := inserter .Put (timeoutCtx , batch )
57- if err != nil {
58- cancel ()
59- return fmt .Errorf ("failed to put item into BigQuery table %s: %w" , table .Name , err )
60- }
61- // release resources from timeout context if it finished early
62- batch = nil
63- cancel ()
64- }
6543 }
66- if len (batch ) > 0 {
67- c .logger .Debug ().Msg ("Writing final batch" )
68- // flush final rows
69- timeoutCtx , cancel := context .WithTimeout (ctx , writeTimeout )
70- defer cancel ()
71- err := inserter .Put (timeoutCtx , batch )
72- if err != nil {
73- return fmt .Errorf ("failed to put item into BigQuery table %s: %w" , table .Name , err )
74- }
75- }
76-
77- return nil
78- }
79-
80- func (c * Client ) Write (ctx context.Context , tables schema.Tables , res <- chan * destination.ClientResource ) error {
81- eg , gctx := errgroup .WithContext (ctx )
82- workers := make (map [string ]* worker , len (tables ))
83- client , err := c .bqClient (ctx )
44+ // flush final rows
45+ timeoutCtx , cancel := context .WithTimeout (ctx , writeTimeout )
46+ defer cancel ()
47+ err := inserter .Put (timeoutCtx , batch )
8448 if err != nil {
85- return fmt .Errorf ("failed to create client: %w" , err )
86- }
87- for _ , t := range tables .FlattenTables () {
88- t := t
89- writeChan := make (chan []any )
90- workers [t .Name ] = & worker {
91- writeChan : writeChan ,
92- }
93- eg .Go (func () error {
94- return c .writeResource (gctx , t , client , writeChan )
95- })
49+ return fmt .Errorf ("failed to put item into BigQuery table %s: %w" , table .Name , err )
9650 }
9751
98- done := false
99- for ! done {
100- select {
101- case r , ok := <- res :
102- if ! ok {
103- done = true
104- break
105- }
106- workers [r .TableName ].writeChan <- r .Data
107- case <- gctx .Done ():
108- done = true
109- }
110- }
111- for _ , w := range workers {
112- close (w .writeChan )
113- }
114-
115- return eg .Wait ()
52+ return nil
11653}
0 commit comments