Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.7 // indirect
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.8.0 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls=
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
Expand Down Expand Up @@ -137,6 +138,14 @@ github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81
github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnVTyacbefKhmbLhIhU=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo=
github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
Expand Down Expand Up @@ -165,6 +174,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
Expand All @@ -180,8 +190,11 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=
github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
Expand Down Expand Up @@ -265,5 +278,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
5 changes: 4 additions & 1 deletion go/internal/feast/onlinestore/onlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) {
} else if onlineStoreType == "dynamodb" {
onlineStore, err := NewDynamodbOnlineStore(config.Project, config, config.OnlineStore)
return onlineStore, err
} else if onlineStoreType == "postgres" {
onlineStore, err := NewPostgresOnlineStore(config.Project, config, config.OnlineStore)
return onlineStore, err
} else {
return nil, fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", onlineStoreType)
return nil, fmt.Errorf("%s online store type is currently not supported; only redis, sqlite, dynamodb, and postgres are supported", onlineStoreType)
}
}

Expand Down
186 changes: 186 additions & 0 deletions go/internal/feast/onlinestore/postgresonlinestore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package onlinestore

import (
"context"
"fmt"
"net/url"
"time"

"github.com/feast-dev/feast/go/internal/feast/registry"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/jackc/pgx/v5/pgxpool"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

type PostgresOnlineStore struct {
project string
pool *pgxpool.Pool
config *registry.RepoConfig
}

func NewPostgresOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*PostgresOnlineStore, error) {
connStr := buildPostgresConnString(onlineStoreConfig)
poolConfig, err := pgxpool.ParseConfig(connStr)
if err != nil {
return nil, fmt.Errorf("failed to parse postgres config: %w", err)
}

if schema, ok := onlineStoreConfig["db_schema"].(string); ok && schema != "" {
poolConfig.ConnConfig.RuntimeParams["search_path"] = schema
}

pool, err := pgxpool.NewWithConfig(context.Background(), poolConfig)
if err != nil {
return nil, fmt.Errorf("failed to create postgres pool: %w", err)
}

return &PostgresOnlineStore{
project: project,
pool: pool,
config: config,
}, nil
}

func (p *PostgresOnlineStore) Destruct() {
if p.pool != nil {
p.pool.Close()
}
}

func (p *PostgresOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
featureCount := len(featureNames)
results := make([][]FeatureData, len(entityKeys))

serializedKeys := make([][]byte, len(entityKeys))
entityKeyMap := make(map[string]int, len(entityKeys))

for i, entityKey := range entityKeys {
serKey, err := serializeEntityKey(entityKey, p.config.EntityKeySerializationVersion)
if err != nil {
return nil, err
}
serializedKeys[i] = *serKey
entityKeyMap[string(*serKey)] = i
}

featureNamesToIdx := make(map[string]int, len(featureNames))
for idx, name := range featureNames {
featureNamesToIdx[name] = idx
}

for _, featureViewName := range featureViewNames {
tableName := fmt.Sprintf(`"%s"`, strings.ReplaceAll(tableId(p.project, featureViewName), `"`, `""`))

query := fmt.Sprintf(
`SELECT entity_key, feature_name, value, event_ts FROM %s WHERE entity_key = ANY($1)`,
tableName,
)

rows, err := p.pool.Query(ctx, query, serializedKeys)
if err != nil {
return nil, fmt.Errorf("failed to query postgres: %w", err)
}

for rows.Next() {
var entityKeyBytes []byte
var featureName string
var valueBytes []byte
var eventTs time.Time

if err := rows.Scan(&entityKeyBytes, &featureName, &valueBytes, &eventTs); err != nil {
rows.Close()
return nil, fmt.Errorf("failed to scan postgres row: %w", err)
}

rowIdx, ok := entityKeyMap[string(entityKeyBytes)]
if !ok {
continue
}

if results[rowIdx] == nil {
results[rowIdx] = make([]FeatureData, featureCount)
}

if featureIdx, ok := featureNamesToIdx[featureName]; ok {
var value types.Value
if err := proto.Unmarshal(valueBytes, &value); err != nil {
rows.Close()
return nil, fmt.Errorf("failed to unmarshal feature value: %w", err)
}

results[rowIdx][featureIdx] = FeatureData{
Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: featureName},
Timestamp: *timestamppb.New(eventTs),
Value: types.Value{Val: value.Val},
}
}
}
rows.Close()
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating postgres rows: %w", err)
}

}
Comment thread
samuelkim7 marked this conversation as resolved.
Outdated

return results, nil
}

func buildPostgresConnString(config map[string]interface{}) string {
host, _ := config["host"].(string)
if host == "" {
host = "localhost"
}

port := 5432
if p, ok := config["port"].(float64); ok {
port = int(p)
}

database, _ := config["database"].(string)
user, _ := config["user"].(string)
password, _ := config["password"].(string)

var userInfo *url.Userinfo
if user != "" {
if password != "" {
userInfo = url.UserPassword(user, password)
} else {
userInfo = url.User(user)
}
}

query := url.Values{}
if sslMode, ok := config["sslmode"].(string); ok && sslMode != "" {
query.Set("sslmode", sslMode)
} else {
query.Set("sslmode", "disable")
}

if v, ok := config["sslcert_path"].(string); ok && v != "" {
query.Set("sslcert", v)
}
if v, ok := config["sslkey_path"].(string); ok && v != "" {
query.Set("sslkey", v)
}
if v, ok := config["sslrootcert_path"].(string); ok && v != "" {
query.Set("sslrootcert", v)
}
if v, ok := config["min_conn"].(float64); ok {
query.Set("pool_min_conns", fmt.Sprintf("%d", int(v)))
}
if v, ok := config["max_conn"].(float64); ok {
query.Set("pool_max_conns", fmt.Sprintf("%d", int(v)))
}

connURL := url.URL{
Scheme: "postgres",
User: userInfo,
Host: fmt.Sprintf("%s:%d", host, port),
Path: database,
RawQuery: query.Encode(),
}

return connURL.String()
}
Loading
Loading