-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy paths3.go
More file actions
109 lines (95 loc) · 2.71 KB
/
s3.go
File metadata and controls
109 lines (95 loc) · 2.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package registry
import (
"context"
"errors"
"io"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/feast-dev/feast/go/protos/feast/core"
"google.golang.org/protobuf/proto"
)
// S3ClientInterface is the subset of s3.Client methods that this package calls.
// It exists so that tests can substitute a mock implementation for the real
// S3 client.
type S3ClientInterface interface {
// GetObject has the same signature as s3.Client.GetObject; GetRegistryProto
// uses it to download the registry object.
GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
// DeleteObject has the same signature as s3.Client.DeleteObject; Teardown
// uses it to delete the registry object.
DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error)
}
// S3RegistryStore is an S3 object storage-based implementation of the
// RegistryStore interface.
type S3RegistryStore struct {
// filePath is the registry location copied from RegistryConfig.Path.
// parseS3Path strips an optional "s3://" prefix and splits the remainder
// into bucket and object key at the first "/".
filePath string
// s3Client performs the S3 requests. NewS3RegistryStore leaves it nil when
// the default AWS configuration cannot be loaded.
s3Client S3ClientInterface
}
// NewS3RegistryStore builds an S3RegistryStore whose registry location is
// config.Path.
//
// The default AWS configuration is loaded with a five-second timeout. If that
// succeeds, the store is given an S3 client built from it; if it fails, the
// error is not reported and the store is returned without a client. The
// repoPath argument is accepted but not used.
func NewS3RegistryStore(config *RegistryConfig, repoPath string) *S3RegistryStore {
	loadCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
	defer stop()

	awsCfg, loadErr := awsConfig.LoadDefaultConfig(loadCtx)

	// Always record the path; attach a client only when configuration loaded.
	store := &S3RegistryStore{filePath: config.Path}
	if loadErr == nil {
		store.s3Client = s3.NewFromConfig(awsCfg)
	}
	return store
}
// GetRegistryProto downloads the object addressed by r.filePath from S3 and
// decodes it as a core.Registry protobuf message.
//
// The S3 request is bounded by a five-second timeout. An error is returned
// when the path cannot be parsed, when the store has no S3 client
// (NewS3RegistryStore leaves it unset if the AWS configuration could not be
// loaded), when the download or body read fails, or when the payload cannot
// be unmarshalled into a Registry.
func (r *S3RegistryStore) GetRegistryProto() (*core.Registry, error) {
	bucket, key, err := r.parseS3Path()
	if err != nil {
		return nil, err
	}
	// Guard against the client-less store the constructor can produce, which
	// would otherwise crash with a nil dereference on the call below.
	if r.s3Client == nil {
		return nil, errors.New("s3 client is not initialized")
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	output, err := r.s3Client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		// Surface S3 failures (missing object, access denied, timeout) to the
		// caller instead of panicking and taking the process down.
		return nil, err
	}
	defer output.Body.Close()

	data, err := io.ReadAll(output.Body)
	if err != nil {
		return nil, err
	}

	registry := &core.Registry{}
	if err := proto.Unmarshal(data, registry); err != nil {
		return nil, err
	}
	return registry, nil
}
// UpdateRegistryProto is not supported by S3RegistryStore. It ignores rp and
// always returns a "not implemented" error.
func (r *S3RegistryStore) UpdateRegistryProto(rp *core.Registry) error {
	errUnsupported := errors.New("not implemented in S3RegistryStore")
	return errUnsupported
}
// Teardown deletes the object addressed by r.filePath from S3.
//
// The S3 request is bounded by a five-second timeout. An error is returned
// when the path cannot be parsed, when the store has no S3 client
// (NewS3RegistryStore leaves it unset if the AWS configuration could not be
// loaded), or when the delete request fails.
func (r *S3RegistryStore) Teardown() error {
	bucket, key, err := r.parseS3Path()
	if err != nil {
		return err
	}
	// Guard against the client-less store the constructor can produce, which
	// would otherwise crash with a nil dereference on the call below.
	if r.s3Client == nil {
		return errors.New("s3 client is not initialized")
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	_, err = r.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	// Return the S3 failure (or nil on success) rather than panicking, so the
	// caller decides how to react.
	return err
}
// parseS3Path splits r.filePath into its S3 bucket and object key.
//
// A leading "s3://" scheme is stripped if present, and the remainder is split
// at the first "/", so the key may itself contain slashes. An error is
// returned when there is no "/" separator or when either the bucket or the
// key is empty, since such a path cannot address an S3 object.
func (r *S3RegistryStore) parseS3Path() (string, string, error) {
	path := strings.TrimPrefix(r.filePath, "s3://")
	parts := strings.SplitN(path, "/", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", errors.New("invalid S3 file path format")
	}
	return parts[0], parts[1], nil
}