1+ package onlinestore
2+
3+ import (
4+ "context"
5+ "fmt"
6+ "net/url"
7+ "strings"
8+ "time"
9+
10+ "github.com/feast-dev/feast/go/internal/feast/registry"
11+ "github.com/feast-dev/feast/go/protos/feast/serving"
12+ "github.com/feast-dev/feast/go/protos/feast/types"
13+ "github.com/jackc/pgx/v5/pgxpool"
14+ "google.golang.org/protobuf/proto"
15+ "google.golang.org/protobuf/types/known/timestamppb"
16+ )
17+
18+ type PostgresOnlineStore struct {
19+ project string
20+ pool * pgxpool.Pool
21+ config * registry.RepoConfig
22+ }
23+
24+ func NewPostgresOnlineStore (project string , config * registry.RepoConfig , onlineStoreConfig map [string ]interface {}) (* PostgresOnlineStore , error ) {
25+ connStr := buildPostgresConnString (onlineStoreConfig )
26+ poolConfig , err := pgxpool .ParseConfig (connStr )
27+ if err != nil {
28+ return nil , fmt .Errorf ("failed to parse postgres config: %w" , err )
29+ }
30+
31+ if schema , ok := onlineStoreConfig ["db_schema" ].(string ); ok && schema != "" {
32+ poolConfig .ConnConfig .RuntimeParams ["search_path" ] = schema
33+ }
34+
35+ pool , err := pgxpool .NewWithConfig (context .Background (), poolConfig )
36+ if err != nil {
37+ return nil , fmt .Errorf ("failed to create postgres pool: %w" , err )
38+ }
39+
40+ return & PostgresOnlineStore {
41+ project : project ,
42+ pool : pool ,
43+ config : config ,
44+ }, nil
45+ }
46+
47+ func (p * PostgresOnlineStore ) Destruct () {
48+ if p .pool != nil {
49+ p .pool .Close ()
50+ }
51+ }
52+
53+ func (p * PostgresOnlineStore ) OnlineRead (ctx context.Context , entityKeys []* types.EntityKey , featureViewNames []string , featureNames []string ) ([][]FeatureData , error ) {
54+ featureCount := len (featureNames )
55+ results := make ([][]FeatureData , len (entityKeys ))
56+
57+ serializedKeys := make ([][]byte , len (entityKeys ))
58+ entityKeyMap := make (map [string ]int , len (entityKeys ))
59+
60+ for i , entityKey := range entityKeys {
61+ serKey , err := serializeEntityKey (entityKey , p .config .EntityKeySerializationVersion )
62+ if err != nil {
63+ return nil , err
64+ }
65+ serializedKeys [i ] = * serKey
66+ entityKeyMap [string (* serKey )] = i
67+ }
68+
69+ type featureRef struct {
70+ name string
71+ index int
72+ }
73+ featuresByView := make (map [string ][]featureRef )
74+ for i , viewName := range featureViewNames {
75+ featuresByView [viewName ] = append (featuresByView [viewName ], featureRef {
76+ name : featureNames [i ],
77+ index : i ,
78+ })
79+ }
80+
81+ for viewName , features := range featuresByView {
82+ featureNamesToIdx := make (map [string ]int , len (features ))
83+ for _ , f := range features {
84+ featureNamesToIdx [f .name ] = f .index
85+ }
86+
87+ tableName := fmt .Sprintf (`"%s"` , strings .ReplaceAll (tableId (p .project , viewName ), `"` , `""` ))
88+ query := fmt .Sprintf (
89+ `SELECT entity_key, feature_name, value, event_ts FROM %s WHERE entity_key = ANY($1)` ,
90+ tableName ,
91+ )
92+
93+ rows , err := p .pool .Query (ctx , query , serializedKeys )
94+ if err != nil {
95+ return nil , fmt .Errorf ("failed to query postgres: %w" , err )
96+ }
97+
98+ for rows .Next () {
99+ var entityKeyBytes []byte
100+ var featureName string
101+ var valueBytes []byte
102+ var eventTs time.Time
103+
104+ if err := rows .Scan (& entityKeyBytes , & featureName , & valueBytes , & eventTs ); err != nil {
105+ rows .Close ()
106+ return nil , fmt .Errorf ("failed to scan postgres row: %w" , err )
107+ }
108+
109+ rowIdx , ok := entityKeyMap [string (entityKeyBytes )]
110+ if ! ok {
111+ continue
112+ }
113+
114+ if results [rowIdx ] == nil {
115+ results [rowIdx ] = make ([]FeatureData , featureCount )
116+ }
117+
118+ if featureIdx , ok := featureNamesToIdx [featureName ]; ok {
119+ var value types.Value
120+ if err := proto .Unmarshal (valueBytes , & value ); err != nil {
121+ rows .Close ()
122+ return nil , fmt .Errorf ("failed to unmarshal feature value: %w" , err )
123+ }
124+
125+ results [rowIdx ][featureIdx ] = FeatureData {
126+ Reference : serving.FeatureReferenceV2 {FeatureViewName : viewName , FeatureName : featureName },
127+ Timestamp : * timestamppb .New (eventTs ),
128+ Value : types.Value {Val : value .Val },
129+ }
130+ }
131+ }
132+ rows .Close ()
133+ if err := rows .Err (); err != nil {
134+ return nil , fmt .Errorf ("error iterating postgres rows: %w" , err )
135+ }
136+ }
137+
138+ return results , nil
139+ }
140+
141+ func buildPostgresConnString (config map [string ]interface {}) string {
142+ host , _ := config ["host" ].(string )
143+ if host == "" {
144+ host = "localhost"
145+ }
146+
147+ port := 5432
148+ if p , ok := config ["port" ].(float64 ); ok {
149+ port = int (p )
150+ }
151+
152+ database , _ := config ["database" ].(string )
153+ user , _ := config ["user" ].(string )
154+ password , _ := config ["password" ].(string )
155+
156+ var userInfo * url.Userinfo
157+ if user != "" {
158+ if password != "" {
159+ userInfo = url .UserPassword (user , password )
160+ } else {
161+ userInfo = url .User (user )
162+ }
163+ }
164+
165+ query := url.Values {}
166+ if sslMode , ok := config ["sslmode" ].(string ); ok && sslMode != "" {
167+ query .Set ("sslmode" , sslMode )
168+ } else {
169+ query .Set ("sslmode" , "disable" )
170+ }
171+
172+ if v , ok := config ["sslcert_path" ].(string ); ok && v != "" {
173+ query .Set ("sslcert" , v )
174+ }
175+ if v , ok := config ["sslkey_path" ].(string ); ok && v != "" {
176+ query .Set ("sslkey" , v )
177+ }
178+ if v , ok := config ["sslrootcert_path" ].(string ); ok && v != "" {
179+ query .Set ("sslrootcert" , v )
180+ }
181+ if v , ok := config ["min_conn" ].(float64 ); ok {
182+ query .Set ("pool_min_conns" , fmt .Sprintf ("%d" , int (v )))
183+ }
184+ if v , ok := config ["max_conn" ].(float64 ); ok {
185+ query .Set ("pool_max_conns" , fmt .Sprintf ("%d" , int (v )))
186+ }
187+
188+ connURL := url.URL {
189+ Scheme : "postgres" ,
190+ User : userInfo ,
191+ Host : fmt .Sprintf ("%s:%d" , host , port ),
192+ Path : database ,
193+ RawQuery : query .Encode (),
194+ }
195+
196+ return connURL .String ()
197+ }
0 commit comments