Skip to content

Commit e21a693

Browse files
authored
fix(deps): Update module github.com/aws/aws-sdk-go-v2/feature/s3/manager to v1.22.0 (#22029)
This PR contains the following updates: | Package | Type | Update | Change | |---|---|---|---| | [github.com/aws/aws-sdk-go-v2/feature/s3/manager](https://redirect.github.com/aws/aws-sdk-go-v2) | require | minor | `v1.21.1` -> `v1.22.0` | --- > [!WARNING] > Some dependencies could not be looked up. Check the Dependency Dashboard for more information. --- ### Configuration 📅 **Schedule**: Branch creation - "before 3am on Saturday" (UTC), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] If you want to rebase/retry this PR, check this box --- This PR has been generated by [Renovate Bot](https://redirect.github.com/renovatebot/renovate).
1 parent 38532b6 commit e21a693

5 files changed

Lines changed: 33 additions & 53 deletions

File tree

plugins/destination/s3/client/client.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ import (
1717
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
1818
"github.com/aws/aws-sdk-go-v2/config"
1919
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
20-
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
20+
"github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager"
21+
tmtypes "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types"
2122
"github.com/aws/aws-sdk-go-v2/service/s3"
2223
"github.com/aws/aws-sdk-go-v2/service/sts"
2324
"github.com/cloudquery/cloudquery/plugins/destination/s3/v7/client/spec"
@@ -41,7 +42,8 @@ type Client struct {
4142
*filetypes.Client
4243
writer *streamingbatchwriter.StreamingBatchWriter
4344

44-
s3Client *s3.Client
45+
s3Client *s3.Client
46+
transferManager *transfermanager.Client
4547

4648
initializedTablesLock sync.Mutex
4749
initializedTables map[string]string
@@ -145,23 +147,28 @@ func New(ctx context.Context, logger zerolog.Logger, s []byte, opts plugin.NewCl
145147
return nil, errRetrievingCredentials
146148
}
147149

150+
// Initialize the transfer manager
151+
c.transferManager = transfermanager.New(c.s3Client, func(o *transfermanager.Options) {
152+
o.PartSizeBytes = *c.spec.PartSize
153+
})
154+
148155
if *c.spec.TestWrite {
149156
// we want to run this test because we want it to fail early if the bucket is not accessible
150157
timeNow := time.Now().UTC()
151158

152-
params := &s3.PutObjectInput{
159+
params := &transfermanager.UploadObjectInput{
153160
Bucket: aws.String(c.spec.Bucket),
154161
Key: aws.String(c.spec.ReplacePathVariables("TEST_TABLE", "TEST_UUID", timeNow, c.syncID)),
155162
Body: bytes.NewReader([]byte("")),
156163
}
157164

158165
sseConfiguration := c.spec.ServerSideEncryptionConfiguration
159166
if sseConfiguration != nil {
160-
params.SSEKMSKeyId = &sseConfiguration.SSEKMSKeyId
161-
params.ServerSideEncryption = sseConfiguration.ServerSideEncryption
167+
params.SSEKMSKeyID = &sseConfiguration.SSEKMSKeyId
168+
params.ServerSideEncryption = tmtypes.ServerSideEncryption(sseConfiguration.ServerSideEncryption)
162169
}
163170

164-
if _, err := manager.NewUploader(c.s3Client).Upload(ctx, params); err != nil {
171+
if _, err := c.transferManager.UploadObject(ctx, params); err != nil {
165172
return nil, errors.Join(errTestWriteFailed, err)
166173
}
167174
}

plugins/destination/s3/client/read.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/aws/aws-sdk-go-v2/aws"
1313
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
1414
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
15-
"github.com/aws/aws-sdk-go-v2/service/s3"
15+
"github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager"
1616
"github.com/aws/smithy-go"
1717
"github.com/cloudquery/plugin-sdk/v4/schema"
1818
"github.com/google/uuid"
@@ -30,11 +30,11 @@ func (c *Client) Read(ctx context.Context, table *schema.Table, res chan<- arrow
3030

3131
name := c.spec.ReplacePathVariables(table.Name, uuid.NewString(), time.Time{}, c.syncID)
3232
writerAtBuffer := manager.NewWriteAtBuffer(make([]byte, 0, maxFileSize))
33-
_, err := manager.NewDownloader(c.s3Client).Download(ctx,
34-
writerAtBuffer,
35-
&s3.GetObjectInput{
36-
Bucket: aws.String(c.spec.Bucket),
37-
Key: aws.String(name),
33+
_, err := c.transferManager.DownloadObject(ctx,
34+
&transfermanager.DownloadObjectInput{
35+
Bucket: aws.String(c.spec.Bucket),
36+
Key: aws.String(name),
37+
WriterAt: writerAtBuffer,
3838
})
3939

4040
if err != nil {

plugins/destination/s3/client/write.go

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package client
22

33
import (
4-
"bytes"
54
"context"
65
"encoding/json"
76
"errors"
@@ -13,9 +12,8 @@ import (
1312
"github.com/apache/arrow-go/v18/arrow/array"
1413
"github.com/apache/arrow-go/v18/arrow/memory"
1514
"github.com/aws/aws-sdk-go-v2/aws"
16-
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
17-
"github.com/aws/aws-sdk-go-v2/service/s3"
18-
awstypes "github.com/aws/aws-sdk-go-v2/service/s3/types"
15+
"github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager"
16+
tmtypes "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types"
1917
"github.com/cloudquery/filetypes/v4"
2018
"github.com/cloudquery/plugin-sdk/v4/message"
2119
"github.com/cloudquery/plugin-sdk/v4/schema"
@@ -25,54 +23,26 @@ import (
2523

2624
var reInvalidJSONKey = regexp.MustCompile(`\W`)
2725

28-
// If we don't use a reader that supports seeking, the S3 SDK will allocate a 5MB buffer each time it reads a chunk
29-
// While this can work for large files, it's not optimal for small files, based on our tests we're mostly uploading small files
30-
// And depending on source concurrency and destination batch settings, we can upload quite a bit of small files at the same time
31-
// For example we've seen 220 concurrent uploads where most files are only a few dozen KB, this means we're allocating ~1GB of memory, where we could be allocating only a few MBs
32-
// The memory is only cleared after the upload is finished, which makes it even worse
33-
// Please note that for large files the memory we read is capped by the batch size setting which defaults to 50MB
34-
func (c *Client) adjustReaderIfNeeded(r io.Reader) (io.Reader, error) {
35-
// We don't use a reader that supports seeking if NoRotate is set since that means we'll be holding the entire file in memory until the table has finished resolving and the file is uploaded.
36-
// This has the downside of potentially using a lot of memory for tables with small files.
37-
// But it's a tradeoff we're willing to make for now until https://github.com/aws/aws-sdk-go-v2/issues/2694 is resolved, as it has a workaround that we can use (reducing source concurrency).
38-
if c.spec.NoRotate {
39-
return r, nil
40-
}
41-
42-
allData, err := io.ReadAll(r)
43-
if err != nil {
44-
return nil, err
45-
}
46-
readerSeeker := bytes.NewReader(allData)
47-
return readerSeeker, nil
48-
}
49-
5026
func (c *Client) createObject(ctx context.Context, table *schema.Table, objKey string) (*filetypes.Stream, error) {
5127
s, err := c.Client.StartStream(table, func(r io.Reader) error {
52-
adjustedReader, err := c.adjustReaderIfNeeded(r)
53-
if err != nil {
54-
return err
55-
}
56-
params := &s3.PutObjectInput{
28+
params := &transfermanager.UploadObjectInput{
5729
Bucket: aws.String(c.spec.Bucket),
5830
Key: aws.String(objKey),
59-
Body: adjustedReader,
31+
Body: r,
6032
ContentType: aws.String(c.spec.GetContentType()),
6133
}
6234

6335
if c.spec.ACL != "" {
64-
params.ACL = awstypes.ObjectCannedACL(c.spec.ACL)
36+
params.ACL = tmtypes.ObjectCannedACL(c.spec.ACL)
6537
}
6638

6739
sseConfiguration := c.spec.ServerSideEncryptionConfiguration
6840
if sseConfiguration != nil {
69-
params.SSEKMSKeyId = &sseConfiguration.SSEKMSKeyId
70-
params.ServerSideEncryption = sseConfiguration.ServerSideEncryption
41+
params.SSEKMSKeyID = &sseConfiguration.SSEKMSKeyId
42+
params.ServerSideEncryption = tmtypes.ServerSideEncryption(sseConfiguration.ServerSideEncryption)
7143
}
7244

73-
_, err = manager.NewUploader(c.s3Client, func(uploader *manager.Uploader) {
74-
uploader.PartSize = *c.spec.PartSize
75-
}).Upload(ctx, params)
45+
_, err := c.transferManager.UploadObject(ctx, params)
7646
return err
7747
})
7848
return s, err

plugins/destination/s3/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ require (
77
github.com/aws/aws-sdk-go-v2 v1.41.1
88
github.com/aws/aws-sdk-go-v2/config v1.32.7
99
github.com/aws/aws-sdk-go-v2/credentials v1.19.7
10-
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.21.1
10+
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.0
11+
github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager v0.1.2
1112
github.com/aws/aws-sdk-go-v2/service/s3 v1.96.0
1213
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6
1314
github.com/aws/smithy-go v1.24.0

plugins/destination/s3/go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ github.com/aws/aws-sdk-go-v2/credentials v1.19.7 h1:tHK47VqqtJxOymRrNtUXN5SP/zUT
2121
github.com/aws/aws-sdk-go-v2/credentials v1.19.7/go.mod h1:qOZk8sPDrxhf+4Wf4oT2urYJrYt3RejHSzgAquYeppw=
2222
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17 h1:I0GyV8wiYrP8XpA70g1HBcQO1JlQxCMTW9npl5UbDHY=
2323
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17/go.mod h1:tyw7BOl5bBe/oqvoIeECFJjMdzXoa/dfVz3QQ5lgHGA=
24-
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.21.1 h1:1hWFp+52Vq8Fevy/KUhbW/1MEApMz7uitCF/PQXRJpk=
25-
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.21.1/go.mod h1:sIec8j802/rCkCKgZV678HFR0s7lhQUYXT77tIvlaa4=
24+
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.0 h1:MpkX8EjkwuvyuX9B7+Zgk5M4URb2WQ84Y6jM81n5imw=
25+
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.0/go.mod h1:4V9Pv5sFfMPWQF0Q0zYN6BlV/504dFGaTeogallRqQw=
26+
github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager v0.1.2 h1:1q8/WwEqZnM/vO4q1gx2g7lHYmyN+o4P7G6EW4zKbRQ=
27+
github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager v0.1.2/go.mod h1:owKRexW+Ir5ACD2UTesmjkQ+w7mcmknLNfwOiKfVLTg=
2628
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 h1:xOLELNKGp2vsiteLsvLPwxC+mYmO6OZ8PYgiuPJzF8U=
2729
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17/go.mod h1:5M5CI3D12dNOtH3/mk6minaRwI2/37ifCURZISxA/IQ=
2830
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 h1:WWLqlh79iO48yLkj1v3ISRNiv+3KdQoZ6JWyfcsyQik=

0 commit comments

Comments
 (0)