Skip to content

Commit b8c6f3d

Browse files
feat: Add PostgreSQL online store support for Go feature server (#5963)
* feat: Add PostgreSQL online store support for Go feature server Signed-off-by: samuelkim7 <samuel.kim@goflink.com> * Update go/internal/feast/onlinestore/postgresonlinestore.go Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> Signed-off-by: samuelkim7 <samuel.kim@goflink.com> * Update go/internal/feast/onlinestore/postgresonlinestore.go Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> Signed-off-by: samuelkim7 <samuel.kim@goflink.com> * fix: Deduplicate feature view queries and fix index mapping Signed-off-by: samuelkim7 <samuel.kim@goflink.com> --------- Signed-off-by: samuelkim7 <samuel.kim@goflink.com> Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent e2bad34 commit b8c6f3d

File tree

5 files changed

+364
-1
lines changed

5 files changed

+364
-1
lines changed

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ require (
8282
github.com/googleapis/enterprise-certificate-proxy v0.3.7 // indirect
8383
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
8484
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect
85+
github.com/jackc/pgpassfile v1.0.0 // indirect
86+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
87+
github.com/jackc/pgx/v5 v5.8.0 // indirect
88+
github.com/jackc/puddle/v2 v2.2.2 // indirect
8589
github.com/klauspost/asmfmt v1.3.2 // indirect
8690
github.com/klauspost/compress v1.18.0 // indirect
8791
github.com/klauspost/cpuid/v2 v2.2.8 // indirect

go.sum

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
8989
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls=
9090
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
9191
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
92+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
9293
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
9394
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
9495
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
@@ -137,6 +138,14 @@ github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81
137138
github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc=
138139
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnVTyacbefKhmbLhIhU=
139140
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs=
141+
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
142+
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
143+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
144+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
145+
github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo=
146+
github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw=
147+
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
148+
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
140149
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
141150
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
142151
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
@@ -165,6 +174,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
165174
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
166175
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
167176
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
177+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
168178
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
169179
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
170180
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
@@ -180,8 +190,11 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b
180190
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
181191
github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=
182192
github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
193+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
183194
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
184195
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
196+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
197+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
185198
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
186199
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
187200
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
@@ -265,5 +278,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
265278
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
266279
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
267280
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
281+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
268282
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
269283
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

go/internal/feast/onlinestore/onlinestore.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,11 @@ func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) {
6666
} else if onlineStoreType == "dynamodb" {
6767
onlineStore, err := NewDynamodbOnlineStore(config.Project, config, config.OnlineStore)
6868
return onlineStore, err
69+
} else if onlineStoreType == "postgres" {
70+
onlineStore, err := NewPostgresOnlineStore(config.Project, config, config.OnlineStore)
71+
return onlineStore, err
6972
} else {
70-
return nil, fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", onlineStoreType)
73+
return nil, fmt.Errorf("%s online store type is currently not supported; only redis, sqlite, dynamodb, and postgres are supported", onlineStoreType)
7174
}
7275
}
7376

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package onlinestore
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/url"
7+
"strings"
8+
"time"
9+
10+
"github.com/feast-dev/feast/go/internal/feast/registry"
11+
"github.com/feast-dev/feast/go/protos/feast/serving"
12+
"github.com/feast-dev/feast/go/protos/feast/types"
13+
"github.com/jackc/pgx/v5/pgxpool"
14+
"google.golang.org/protobuf/proto"
15+
"google.golang.org/protobuf/types/known/timestamppb"
16+
)
17+
18+
type PostgresOnlineStore struct {
19+
project string
20+
pool *pgxpool.Pool
21+
config *registry.RepoConfig
22+
}
23+
24+
func NewPostgresOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*PostgresOnlineStore, error) {
25+
connStr := buildPostgresConnString(onlineStoreConfig)
26+
poolConfig, err := pgxpool.ParseConfig(connStr)
27+
if err != nil {
28+
return nil, fmt.Errorf("failed to parse postgres config: %w", err)
29+
}
30+
31+
if schema, ok := onlineStoreConfig["db_schema"].(string); ok && schema != "" {
32+
poolConfig.ConnConfig.RuntimeParams["search_path"] = schema
33+
}
34+
35+
pool, err := pgxpool.NewWithConfig(context.Background(), poolConfig)
36+
if err != nil {
37+
return nil, fmt.Errorf("failed to create postgres pool: %w", err)
38+
}
39+
40+
return &PostgresOnlineStore{
41+
project: project,
42+
pool: pool,
43+
config: config,
44+
}, nil
45+
}
46+
47+
func (p *PostgresOnlineStore) Destruct() {
48+
if p.pool != nil {
49+
p.pool.Close()
50+
}
51+
}
52+
53+
func (p *PostgresOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
54+
featureCount := len(featureNames)
55+
results := make([][]FeatureData, len(entityKeys))
56+
57+
serializedKeys := make([][]byte, len(entityKeys))
58+
entityKeyMap := make(map[string]int, len(entityKeys))
59+
60+
for i, entityKey := range entityKeys {
61+
serKey, err := serializeEntityKey(entityKey, p.config.EntityKeySerializationVersion)
62+
if err != nil {
63+
return nil, err
64+
}
65+
serializedKeys[i] = *serKey
66+
entityKeyMap[string(*serKey)] = i
67+
}
68+
69+
type featureRef struct {
70+
name string
71+
index int
72+
}
73+
featuresByView := make(map[string][]featureRef)
74+
for i, viewName := range featureViewNames {
75+
featuresByView[viewName] = append(featuresByView[viewName], featureRef{
76+
name: featureNames[i],
77+
index: i,
78+
})
79+
}
80+
81+
for viewName, features := range featuresByView {
82+
featureNamesToIdx := make(map[string]int, len(features))
83+
for _, f := range features {
84+
featureNamesToIdx[f.name] = f.index
85+
}
86+
87+
tableName := fmt.Sprintf(`"%s"`, strings.ReplaceAll(tableId(p.project, viewName), `"`, `""`))
88+
query := fmt.Sprintf(
89+
`SELECT entity_key, feature_name, value, event_ts FROM %s WHERE entity_key = ANY($1)`,
90+
tableName,
91+
)
92+
93+
rows, err := p.pool.Query(ctx, query, serializedKeys)
94+
if err != nil {
95+
return nil, fmt.Errorf("failed to query postgres: %w", err)
96+
}
97+
98+
for rows.Next() {
99+
var entityKeyBytes []byte
100+
var featureName string
101+
var valueBytes []byte
102+
var eventTs time.Time
103+
104+
if err := rows.Scan(&entityKeyBytes, &featureName, &valueBytes, &eventTs); err != nil {
105+
rows.Close()
106+
return nil, fmt.Errorf("failed to scan postgres row: %w", err)
107+
}
108+
109+
rowIdx, ok := entityKeyMap[string(entityKeyBytes)]
110+
if !ok {
111+
continue
112+
}
113+
114+
if results[rowIdx] == nil {
115+
results[rowIdx] = make([]FeatureData, featureCount)
116+
}
117+
118+
if featureIdx, ok := featureNamesToIdx[featureName]; ok {
119+
var value types.Value
120+
if err := proto.Unmarshal(valueBytes, &value); err != nil {
121+
rows.Close()
122+
return nil, fmt.Errorf("failed to unmarshal feature value: %w", err)
123+
}
124+
125+
results[rowIdx][featureIdx] = FeatureData{
126+
Reference: serving.FeatureReferenceV2{FeatureViewName: viewName, FeatureName: featureName},
127+
Timestamp: *timestamppb.New(eventTs),
128+
Value: types.Value{Val: value.Val},
129+
}
130+
}
131+
}
132+
rows.Close()
133+
if err := rows.Err(); err != nil {
134+
return nil, fmt.Errorf("error iterating postgres rows: %w", err)
135+
}
136+
}
137+
138+
return results, nil
139+
}
140+
141+
func buildPostgresConnString(config map[string]interface{}) string {
142+
host, _ := config["host"].(string)
143+
if host == "" {
144+
host = "localhost"
145+
}
146+
147+
port := 5432
148+
if p, ok := config["port"].(float64); ok {
149+
port = int(p)
150+
}
151+
152+
database, _ := config["database"].(string)
153+
user, _ := config["user"].(string)
154+
password, _ := config["password"].(string)
155+
156+
var userInfo *url.Userinfo
157+
if user != "" {
158+
if password != "" {
159+
userInfo = url.UserPassword(user, password)
160+
} else {
161+
userInfo = url.User(user)
162+
}
163+
}
164+
165+
query := url.Values{}
166+
if sslMode, ok := config["sslmode"].(string); ok && sslMode != "" {
167+
query.Set("sslmode", sslMode)
168+
} else {
169+
query.Set("sslmode", "disable")
170+
}
171+
172+
if v, ok := config["sslcert_path"].(string); ok && v != "" {
173+
query.Set("sslcert", v)
174+
}
175+
if v, ok := config["sslkey_path"].(string); ok && v != "" {
176+
query.Set("sslkey", v)
177+
}
178+
if v, ok := config["sslrootcert_path"].(string); ok && v != "" {
179+
query.Set("sslrootcert", v)
180+
}
181+
if v, ok := config["min_conn"].(float64); ok {
182+
query.Set("pool_min_conns", fmt.Sprintf("%d", int(v)))
183+
}
184+
if v, ok := config["max_conn"].(float64); ok {
185+
query.Set("pool_max_conns", fmt.Sprintf("%d", int(v)))
186+
}
187+
188+
connURL := url.URL{
189+
Scheme: "postgres",
190+
User: userInfo,
191+
Host: fmt.Sprintf("%s:%d", host, port),
192+
Path: database,
193+
RawQuery: query.Encode(),
194+
}
195+
196+
return connURL.String()
197+
}

0 commit comments

Comments
 (0)