From e851bbe2673c5a403f6ca6cc11c1829a44d9692b Mon Sep 17 00:00:00 2001 From: samuelkim7 Date: Wed, 11 Feb 2026 23:07:50 +0100 Subject: [PATCH 1/4] feat: Add PostgreSQL online store support for Go feature server Signed-off-by: samuelkim7 --- go.mod | 4 + go.sum | 14 ++ go/internal/feast/onlinestore/onlinestore.go | 5 +- .../feast/onlinestore/postgresonlinestore.go | 181 ++++++++++++++++++ .../onlinestore/postgresonlinestore_test.go | 145 ++++++++++++++ 5 files changed, 348 insertions(+), 1 deletion(-) create mode 100644 go/internal/feast/onlinestore/postgresonlinestore.go create mode 100644 go/internal/feast/onlinestore/postgresonlinestore_test.go diff --git a/go.mod b/go.mod index f0dc02d116d..46332e3c807 100644 --- a/go.mod +++ b/go.mod @@ -82,6 +82,10 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.7 // indirect github.com/googleapis/gax-go/v2 v2.15.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgx/v5 v5.8.0 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect diff --git a/go.sum b/go.sum index 3080dbdd50a..51d24f57197 100644 --- a/go.sum +++ b/go.sum @@ -89,6 +89,7 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls= github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -137,6 +138,14 @@ github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81 github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnVTyacbefKhmbLhIhU= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= +github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= @@ -165,6 +174,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= @@ -180,8 +190,11 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE= github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= @@ -265,5 +278,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/go/internal/feast/onlinestore/onlinestore.go b/go/internal/feast/onlinestore/onlinestore.go index a2652af2e1b..837a3f31d82 100644 --- a/go/internal/feast/onlinestore/onlinestore.go +++ b/go/internal/feast/onlinestore/onlinestore.go @@ -66,8 +66,11 @@ func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) { } else if onlineStoreType == "dynamodb" { onlineStore, err := NewDynamodbOnlineStore(config.Project, config, config.OnlineStore) return onlineStore, err + } else if onlineStoreType == "postgres" { + onlineStore, err := NewPostgresOnlineStore(config.Project, config, config.OnlineStore) + return onlineStore, err } else { - return nil, fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", onlineStoreType) + return nil, fmt.Errorf("%s online store type is currently not supported; only redis, sqlite, dynamodb, and postgres are supported", onlineStoreType) } } diff --git a/go/internal/feast/onlinestore/postgresonlinestore.go b/go/internal/feast/onlinestore/postgresonlinestore.go new file mode 100644 index 00000000000..73c80a216c6 --- /dev/null +++ b/go/internal/feast/onlinestore/postgresonlinestore.go @@ -0,0 +1,181 @@ +package onlinestore + +import ( + "context" + "fmt" + "net/url" + "time" + + "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/feast-dev/feast/go/protos/feast/types" + "github.com/jackc/pgx/v5/pgxpool" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type PostgresOnlineStore struct { + project string + pool *pgxpool.Pool + config *registry.RepoConfig +} + +func NewPostgresOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*PostgresOnlineStore, error) { + connStr := buildPostgresConnString(onlineStoreConfig) + poolConfig, err := pgxpool.ParseConfig(connStr) + if err != nil { + return nil, fmt.Errorf("failed to parse postgres config: %w", err) + } + + if schema, ok := onlineStoreConfig["db_schema"].(string); ok && schema != "" { + poolConfig.ConnConfig.RuntimeParams["search_path"] = schema + } + + pool, err := pgxpool.NewWithConfig(context.Background(), poolConfig) + if err != nil { + return nil, fmt.Errorf("failed to create postgres pool: %w", err) + } + + return &PostgresOnlineStore{ + project: project, + pool: pool, + config: config, + }, nil +} + +func (p *PostgresOnlineStore) Destruct() { + if p.pool != nil { + p.pool.Close() + } +} + +func (p *PostgresOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) { + featureCount := len(featureNames) + results := make([][]FeatureData, len(entityKeys)) + + serializedKeys := make([][]byte, len(entityKeys)) + entityKeyMap := make(map[string]int, len(entityKeys)) + + for i, entityKey := range entityKeys { + serKey, err := serializeEntityKey(entityKey, p.config.EntityKeySerializationVersion) + if err != nil { + return nil, err + } + serializedKeys[i] = *serKey + entityKeyMap[string(*serKey)] = i + } + + featureNamesToIdx := make(map[string]int, len(featureNames)) + for idx, name := range featureNames { + featureNamesToIdx[name] = idx + } + + for _, featureViewName := range featureViewNames { + tableName := fmt.Sprintf(`"%s"`, tableId(p.project, featureViewName)) + query := fmt.Sprintf( + `SELECT entity_key, feature_name, value, event_ts FROM %s WHERE entity_key = ANY($1)`, + tableName, + ) + + rows, err := p.pool.Query(ctx, query, serializedKeys) + if err != nil { + return nil, fmt.Errorf("failed to query postgres: %w", err) + } + + for rows.Next() { + var entityKeyBytes []byte + var featureName string + var valueBytes []byte + var eventTs time.Time + + if err := rows.Scan(&entityKeyBytes, &featureName, &valueBytes, &eventTs); err != nil { + rows.Close() + return nil, fmt.Errorf("failed to scan postgres row: %w", err) + } + + rowIdx, ok := entityKeyMap[string(entityKeyBytes)] + if !ok { + continue + } + + if results[rowIdx] == nil { + results[rowIdx] = make([]FeatureData, featureCount) + } + + if featureIdx, ok := featureNamesToIdx[featureName]; ok { + var value types.Value + if err := proto.Unmarshal(valueBytes, &value); err != nil { + rows.Close() + return nil, fmt.Errorf("failed to unmarshal feature value: %w", err) + } + + results[rowIdx][featureIdx] = FeatureData{ + Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: featureName}, + Timestamp: *timestamppb.New(eventTs), + Value: types.Value{Val: value.Val}, + } + } + } + rows.Close() + } + + return results, nil +} + +func buildPostgresConnString(config map[string]interface{}) string { + host, _ := config["host"].(string) + if host == "" { + host = "localhost" + } + + port := 5432 + if p, ok := config["port"].(float64); ok { + port = int(p) + } + + database, _ := config["database"].(string) + user, _ := config["user"].(string) + password, _ := config["password"].(string) + + var userInfo *url.Userinfo + if user != "" { + if password != "" { + userInfo = url.UserPassword(user, password) + } else { + userInfo = url.User(user) + } + } + + query := url.Values{} + if sslMode, ok := config["sslmode"].(string); ok && sslMode != "" { + query.Set("sslmode", sslMode) + } else { + query.Set("sslmode", "disable") + } + + if v, ok := config["sslcert_path"].(string); ok && v != "" { + query.Set("sslcert", v) + } + if v, ok := config["sslkey_path"].(string); ok && v != "" { + query.Set("sslkey", v) + } + if v, ok := config["sslrootcert_path"].(string); ok && v != "" { + query.Set("sslrootcert", v) + } + if v, ok := config["min_conn"].(float64); ok { + query.Set("pool_min_conns", fmt.Sprintf("%d", int(v))) + } + if v, ok := config["max_conn"].(float64); ok { + query.Set("pool_max_conns", fmt.Sprintf("%d", int(v))) + } + + connURL := url.URL{ + Scheme: "postgres", + User: userInfo, + Host: fmt.Sprintf("%s:%d", host, port), + Path: database, + RawQuery: query.Encode(), + } + + return connURL.String() +} \ No newline at end of file diff --git a/go/internal/feast/onlinestore/postgresonlinestore_test.go b/go/internal/feast/onlinestore/postgresonlinestore_test.go new file mode 100644 index 00000000000..2d81ba2cddb --- /dev/null +++ b/go/internal/feast/onlinestore/postgresonlinestore_test.go @@ -0,0 +1,145 @@ +package onlinestore + +import ( + "testing" + + "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/assert" +) + +func TestBuildPostgresConnString(t *testing.T) { + config := map[string]interface{}{ + "type": "postgres", + "host": "db.example.com", + "port": float64(5432), + "database": "feast", + "user": "feast_user", + "password": "feast_pass", + "sslmode": "require", + } + connStr := buildPostgresConnString(config) + assert.Contains(t, connStr, "db.example.com:5432") + assert.Contains(t, connStr, "feast_user:") + assert.Contains(t, connStr, "feast_pass@") + assert.Contains(t, connStr, "/feast") + assert.Contains(t, connStr, "sslmode=require") +} + +func TestBuildPostgresConnStringDefaults(t *testing.T) { + config := map[string]interface{}{ + "database": "feast", + "user": "feast_user", + "password": "feast_pass", + } + connStr := buildPostgresConnString(config) + assert.Contains(t, connStr, "localhost:5432") + assert.Contains(t, connStr, "sslmode=disable") +} + +func TestBuildPostgresConnStringWithSSL(t *testing.T) { + config := map[string]interface{}{ + "host": "db.example.com", + "port": float64(5433), + "database": "feast", + "user": "feast_user", + "password": "feast_pass", + "sslmode": "verify-full", + "sslcert_path": "/path/to/cert", + "sslkey_path": "/path/to/key", + "sslrootcert_path": "/path/to/rootcert", + } + connStr := buildPostgresConnString(config) + assert.Contains(t, connStr, "db.example.com:5433") + assert.Contains(t, connStr, "sslmode=verify-full") + assert.Contains(t, connStr, "sslcert=%2Fpath%2Fto%2Fcert") + assert.Contains(t, connStr, "sslkey=%2Fpath%2Fto%2Fkey") + assert.Contains(t, connStr, "sslrootcert=%2Fpath%2Fto%2Frootcert") +} + +func TestBuildPostgresConnStringWithPooling(t *testing.T) { + config := map[string]interface{}{ + "host": "localhost", + "database": "feast", + "user": "feast_user", + "password": "feast_pass", + "min_conn": float64(2), + "max_conn": float64(10), + } + connStr := buildPostgresConnString(config) + assert.Contains(t, connStr, "pool_min_conns=2") + assert.Contains(t, connStr, "pool_max_conns=10") +} + +func TestBuildPostgresConnStringSpecialChars(t *testing.T) { + config := map[string]interface{}{ + "host": "localhost", + "database": "feast", + "user": "user@domain", + "password": "p@ss=word&special", + } + connStr := buildPostgresConnString(config) + poolConfig, err := pgxpool.ParseConfig(connStr) + assert.Nil(t, err) + assert.Equal(t, "user@domain", poolConfig.ConnConfig.User) + assert.Equal(t, "p@ss=word&special", poolConfig.ConnConfig.Password) +} + +func TestBuildPostgresConnStringParseable(t *testing.T) { + config := map[string]interface{}{ + "host": "localhost", + "port": float64(5432), + "database": "feast", + "user": "feast_user", + "password": "feast_pass", + } + connStr := buildPostgresConnString(config) + poolConfig, err := pgxpool.ParseConfig(connStr) + assert.Nil(t, err) + assert.Equal(t, "localhost", poolConfig.ConnConfig.Host) + assert.Equal(t, uint16(5432), poolConfig.ConnConfig.Port) + assert.Equal(t, "feast", poolConfig.ConnConfig.Database) + assert.Equal(t, "feast_user", poolConfig.ConnConfig.User) + assert.Equal(t, "feast_pass", poolConfig.ConnConfig.Password) +} + +func TestNewPostgresOnlineStore(t *testing.T) { + config := map[string]interface{}{ + "type": "postgres", + "host": "localhost", + "port": float64(5432), + "database": "feast", + "user": "feast_user", + "password": "feast_pass", + } + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 2, + } + store, err := NewPostgresOnlineStore("test", rc, config) + assert.Nil(t, err) + assert.NotNil(t, store) + assert.Equal(t, "feast", store.pool.Config().ConnConfig.Database) + store.Destruct() +} + +func TestNewPostgresOnlineStoreWithSchema(t *testing.T) { + config := map[string]interface{}{ + "type": "postgres", + "host": "localhost", + "port": float64(5432), + "database": "feast", + "user": "feast_user", + "password": "feast_pass", + "db_schema": "custom_schema", + } + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 2, + } + store, err := NewPostgresOnlineStore("test", rc, config) + assert.Nil(t, err) + assert.NotNil(t, store) + assert.Equal(t, "custom_schema", store.pool.Config().ConnConfig.RuntimeParams["search_path"]) + store.Destruct() +} From fe222204f079279e6b3051b64313c63f367506b6 Mon Sep 17 00:00:00 2001 From: Myeongwon Kim <65876994+samuelkim7@users.noreply.github.com> Date: Wed, 11 Feb 2026 23:36:33 +0100 Subject: [PATCH 2/4] Update go/internal/feast/onlinestore/postgresonlinestore.go Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> Signed-off-by: samuelkim7 --- go/internal/feast/onlinestore/postgresonlinestore.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/go/internal/feast/onlinestore/postgresonlinestore.go b/go/internal/feast/onlinestore/postgresonlinestore.go index 73c80a216c6..5e26b68ff89 100644 --- a/go/internal/feast/onlinestore/postgresonlinestore.go +++ b/go/internal/feast/onlinestore/postgresonlinestore.go @@ -115,8 +115,12 @@ func (p *PostgresOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type Value: types.Value{Val: value.Val}, } } - } - rows.Close() + } + rows.Close() + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating postgres rows: %w", err) + } + } return results, nil From b25c9c6976b825b6a42346ac7c180a4989c9ff85 Mon Sep 17 00:00:00 2001 From: Myeongwon Kim <65876994+samuelkim7@users.noreply.github.com> Date: Wed, 11 Feb 2026 23:40:31 +0100 Subject: [PATCH 3/4] Update go/internal/feast/onlinestore/postgresonlinestore.go Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> Signed-off-by: samuelkim7 --- go/internal/feast/onlinestore/postgresonlinestore.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/internal/feast/onlinestore/postgresonlinestore.go b/go/internal/feast/onlinestore/postgresonlinestore.go index 5e26b68ff89..779000e0b66 100644 --- a/go/internal/feast/onlinestore/postgresonlinestore.go +++ b/go/internal/feast/onlinestore/postgresonlinestore.go @@ -71,7 +71,8 @@ func (p *PostgresOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type } for _, featureViewName := range featureViewNames { - tableName := fmt.Sprintf(`"%s"`, tableId(p.project, featureViewName)) + tableName := fmt.Sprintf(`"%s"`, strings.ReplaceAll(tableId(p.project, featureViewName), `"`, `""`)) + query := fmt.Sprintf( `SELECT entity_key, feature_name, value, event_ts FROM %s WHERE entity_key = ANY($1)`, tableName, From a9b803d3ca699539b99c7c640058e618e62282dc Mon Sep 17 00:00:00 2001 From: samuelkim7 Date: Thu, 12 Feb 2026 00:05:47 +0100 Subject: [PATCH 4/4] fix: Deduplicate feature view queries and fix index mapping Signed-off-by: samuelkim7 --- .../feast/onlinestore/postgresonlinestore.go | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/go/internal/feast/onlinestore/postgresonlinestore.go b/go/internal/feast/onlinestore/postgresonlinestore.go index 779000e0b66..a05e21df775 100644 --- a/go/internal/feast/onlinestore/postgresonlinestore.go +++ b/go/internal/feast/onlinestore/postgresonlinestore.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/url" + "strings" "time" "github.com/feast-dev/feast/go/internal/feast/registry" @@ -65,14 +66,25 @@ func (p *PostgresOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type entityKeyMap[string(*serKey)] = i } - featureNamesToIdx := make(map[string]int, len(featureNames)) - for idx, name := range featureNames { - featureNamesToIdx[name] = idx + type featureRef struct { + name string + index int + } + featuresByView := make(map[string][]featureRef) + for i, viewName := range featureViewNames { + featuresByView[viewName] = append(featuresByView[viewName], featureRef{ + name: featureNames[i], + index: i, + }) } - for _, featureViewName := range featureViewNames { - tableName := fmt.Sprintf(`"%s"`, strings.ReplaceAll(tableId(p.project, featureViewName), `"`, `""`)) + for viewName, features := range featuresByView { + featureNamesToIdx := make(map[string]int, len(features)) + for _, f := range features { + featureNamesToIdx[f.name] = f.index + } + tableName := fmt.Sprintf(`"%s"`, strings.ReplaceAll(tableId(p.project, viewName), `"`, `""`)) query := fmt.Sprintf( `SELECT entity_key, feature_name, value, event_ts FROM %s WHERE entity_key = ANY($1)`, tableName, @@ -111,17 +123,16 @@ func (p *PostgresOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type } results[rowIdx][featureIdx] = FeatureData{ - Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: featureName}, + Reference: serving.FeatureReferenceV2{FeatureViewName: viewName, FeatureName: featureName}, Timestamp: *timestamppb.New(eventTs), Value: types.Value{Val: value.Val}, } } - } - rows.Close() - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("error iterating postgres rows: %w", err) - } - + } + rows.Close() + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating postgres rows: %w", err) + } } return results, nil