Skip to content

Commit d295825

Browse files
refact: separate s3 registry file from local.go
1 parent 2ee26db commit d295825

File tree

2 files changed

+104
-96
lines changed

2 files changed

+104
-96
lines changed
Lines changed: 1 addition & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,10 @@
11
package registry
22

33
import (
4-
"context"
5-
"errors"
6-
"github.com/aws/aws-sdk-go-v2/aws"
7-
awsConfig "github.com/aws/aws-sdk-go-v2/config"
4+
"github.com/google/uuid"
85
"io/ioutil"
96
"os"
107
"path/filepath"
11-
"strings"
12-
"time"
13-
14-
"github.com/aws/aws-sdk-go-v2/service/s3"
15-
"github.com/google/uuid"
168

179
"google.golang.org/protobuf/proto"
1810
"google.golang.org/protobuf/types/known/timestamppb"
@@ -25,12 +17,6 @@ type FileRegistryStore struct {
2517
filePath string
2618
}
2719

28-
// A S3RegistryStore is a S3 object storage-based implementation of the RegistryStore interface
29-
type S3RegistryStore struct {
30-
filePath string
31-
s3Client *s3.Client
32-
}
33-
3420
// NewFileRegistryStore creates a FileRegistryStore with the given configuration and infers
3521
// the file path from the repo path and registry path.
3622
func NewFileRegistryStore(config *RegistryConfig, repoPath string) *FileRegistryStore {
@@ -44,26 +30,6 @@ func NewFileRegistryStore(config *RegistryConfig, repoPath string) *FileRegistry
4430
return &lr
4531
}
4632

47-
// NewS3RegistryStore creates a S3RegistryStore with the given configuration
48-
func NewS3RegistryStore(config *RegistryConfig, repoPath string) *S3RegistryStore {
49-
var lr S3RegistryStore
50-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
51-
defer cancel()
52-
53-
cfg, err := awsConfig.LoadDefaultConfig(ctx)
54-
if err != nil {
55-
lr = S3RegistryStore{
56-
filePath: config.Path,
57-
}
58-
} else {
59-
lr = S3RegistryStore{
60-
filePath: config.Path,
61-
s3Client: s3.NewFromConfig(cfg),
62-
}
63-
}
64-
return &lr
65-
}
66-
6733
// GetRegistryProto reads and parses the registry proto from the file path.
6834
func (r *FileRegistryStore) GetRegistryProto() (*core.Registry, error) {
6935
registry := &core.Registry{}
@@ -98,64 +64,3 @@ func (r *FileRegistryStore) writeRegistry(rp *core.Registry) error {
9864
}
9965
return nil
10066
}
101-
102-
func (r *S3RegistryStore) GetRegistryProto() (*core.Registry, error) {
103-
bucket, key, err := r.parseS3Path()
104-
if err != nil {
105-
return nil, err
106-
}
107-
108-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
109-
defer cancel()
110-
output, err := r.s3Client.GetObject(ctx,
111-
&s3.GetObjectInput{
112-
Bucket: aws.String(bucket),
113-
Key: aws.String(key),
114-
})
115-
if err != nil {
116-
return nil, err
117-
}
118-
defer output.Body.Close()
119-
120-
data, err := ioutil.ReadAll(output.Body)
121-
if err != nil {
122-
return nil, err
123-
}
124-
125-
registry := &core.Registry{}
126-
if err := proto.Unmarshal(data, registry); err != nil {
127-
return nil, err
128-
}
129-
return registry, nil
130-
}
131-
132-
func (r *S3RegistryStore) UpdateRegistryProto(rp *core.Registry) error {
133-
return errors.New("not implemented in S3RegistryStore")
134-
}
135-
136-
func (r *S3RegistryStore) Teardown() error {
137-
bucket, key, err := r.parseS3Path()
138-
if err != nil {
139-
return err
140-
}
141-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
142-
defer cancel()
143-
_, err = r.s3Client.DeleteObject(ctx,
144-
&s3.DeleteObjectInput{
145-
Bucket: aws.String(bucket),
146-
Key: aws.String(key),
147-
})
148-
if err != nil {
149-
return err
150-
}
151-
return nil
152-
}
153-
154-
func (r *S3RegistryStore) parseS3Path() (string, string, error) {
155-
path := strings.TrimPrefix(r.filePath, "s3://")
156-
parts := strings.SplitN(path, "/", 2)
157-
if len(parts) != 2 {
158-
return "", "", errors.New("invalid S3 file path format")
159-
}
160-
return parts[0], parts[1], nil
161-
}

go/internal/feast/registry/s3.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package registry
2+
3+
import (
4+
"context"
5+
"errors"
6+
"io/ioutil"
7+
"strings"
8+
"time"
9+
10+
"github.com/aws/aws-sdk-go-v2/aws"
11+
awsConfig "github.com/aws/aws-sdk-go-v2/config"
12+
"github.com/aws/aws-sdk-go-v2/service/s3"
13+
"github.com/feast-dev/feast/go/protos/feast/core"
14+
15+
"google.golang.org/protobuf/proto"
16+
)
17+
18+
// A S3RegistryStore is a S3 object storage-based implementation of the RegistryStore interface
19+
type S3RegistryStore struct {
20+
filePath string
21+
s3Client *s3.Client
22+
}
23+
24+
// NewS3RegistryStore creates a S3RegistryStore with the given configuration
25+
func NewS3RegistryStore(config *RegistryConfig, repoPath string) *S3RegistryStore {
26+
var lr S3RegistryStore
27+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
28+
defer cancel()
29+
30+
cfg, err := awsConfig.LoadDefaultConfig(ctx)
31+
if err != nil {
32+
lr = S3RegistryStore{
33+
filePath: config.Path,
34+
}
35+
} else {
36+
lr = S3RegistryStore{
37+
filePath: config.Path,
38+
s3Client: s3.NewFromConfig(cfg),
39+
}
40+
}
41+
return &lr
42+
}
43+
44+
func (r *S3RegistryStore) GetRegistryProto() (*core.Registry, error) {
45+
bucket, key, err := r.parseS3Path()
46+
if err != nil {
47+
return nil, err
48+
}
49+
50+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
51+
defer cancel()
52+
output, err := r.s3Client.GetObject(ctx,
53+
&s3.GetObjectInput{
54+
Bucket: aws.String(bucket),
55+
Key: aws.String(key),
56+
})
57+
if err != nil {
58+
return nil, err
59+
}
60+
defer output.Body.Close()
61+
62+
data, err := ioutil.ReadAll(output.Body)
63+
if err != nil {
64+
return nil, err
65+
}
66+
67+
registry := &core.Registry{}
68+
if err := proto.Unmarshal(data, registry); err != nil {
69+
return nil, err
70+
}
71+
return registry, nil
72+
}
73+
74+
func (r *S3RegistryStore) UpdateRegistryProto(rp *core.Registry) error {
75+
return errors.New("not implemented in S3RegistryStore")
76+
}
77+
78+
func (r *S3RegistryStore) Teardown() error {
79+
bucket, key, err := r.parseS3Path()
80+
if err != nil {
81+
return err
82+
}
83+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
84+
defer cancel()
85+
_, err = r.s3Client.DeleteObject(ctx,
86+
&s3.DeleteObjectInput{
87+
Bucket: aws.String(bucket),
88+
Key: aws.String(key),
89+
})
90+
if err != nil {
91+
return err
92+
}
93+
return nil
94+
}
95+
96+
func (r *S3RegistryStore) parseS3Path() (string, string, error) {
97+
path := strings.TrimPrefix(r.filePath, "s3://")
98+
parts := strings.SplitN(path, "/", 2)
99+
if len(parts) != 2 {
100+
return "", "", errors.New("invalid S3 file path format")
101+
}
102+
return parts[0], parts[1], nil
103+
}

0 commit comments

Comments
 (0)