From 22a253bd0ea1b4ebf5b14fc01e459a109deba362 Mon Sep 17 00:00:00 2001 From: Luis Azofra Begara Date: Fri, 30 Jan 2026 12:43:30 +0100 Subject: [PATCH 1/5] feat(go): implement metrics and tracing for http and grpc servers Signed-off-by: Luis Azofra Begara --- go.mod | 10 ++++ go.sum | 68 +++++++++++++++++++++++++ go/internal/feast/metrics/metrics.go | 66 ++++++++++++++++++++++++ go/internal/feast/server/http_server.go | 49 +++++++++++++++++- go/main.go | 39 ++++++++++++-- 5 files changed, 228 insertions(+), 4 deletions(-) create mode 100644 go/internal/feast/metrics/metrics.go diff --git a/go.mod b/go.mod index f0dc02d116d..ecab4f10196 100644 --- a/go.mod +++ b/go.mod @@ -11,12 +11,15 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.29.14 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.43.3 github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 + github.com/cabify/gotoprom v1.1.0 github.com/ghodss/yaml v1.0.0 github.com/go-sql-driver/mysql v1.8.1 github.com/golang/protobuf v1.5.4 github.com/google/uuid v1.6.0 + github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 github.com/mattn/go-sqlite3 v1.14.23 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.23.2 github.com/redis/go-redis/v9 v9.6.1 github.com/roberson-io/mmh3 v0.0.0-20190729202758-fdfce3ba6225 github.com/rs/zerolog v1.33.0 @@ -64,6 +67,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect github.com/aws/smithy-go v1.22.2 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect @@ -81,6 +85,7 @@ require ( github.com/google/s2a-go v0.1.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.7 // indirect github.com/googleapis/gax-go/v2 v2.15.0 // indirect + github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.18.0 // indirect @@ -89,9 +94,13 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/zeebo/errs v1.4.0 // indirect @@ -103,6 +112,7 @@ require ( go.opentelemetry.io/otel/metric v1.38.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect go.opentelemetry.io/proto/otlp v1.7.1 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/crypto v0.45.0 // indirect golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect golang.org/x/mod v0.29.0 // indirect diff --git a/go.sum b/go.sum index 3080dbdd50a..ca11803e1c6 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0/go.mod h1:Mf6O40IAyB9zR/1J8nGDDPirZQQPbYJni8Yisy7NTMc= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN5+F54= @@ -78,10 +80,16 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 h1:1XuUZ8mYJw9B6lzAkXhqHlJd/Xv github.com/aws/aws-sdk-go-v2/service/sts v1.33.19/go.mod h1:cQnB8CUnxbMU82JvlqjKR2HBOm3fe9pWorWBza6MBJ4= github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cabify/gotoprom v1.1.0 h1:IyM06IuVDPpEhBdXqSIfQK1KGrerkjGkDamrQqu8dWo= +github.com/cabify/gotoprom v1.1.0/go.mod h1:8H4gdB+iJqM8QrNneQxxbYsx4xA7m3h1BP8K7h16R4w= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -89,6 +97,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls= github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -107,16 +117,25 @@ github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-jose/go-jose/v4 v4.1.2 h1:TK/7NqRQZfgAh+Td8AlsrvtPoUyiHh0LqVvokh+1vHI= github.com/go-jose/go-jose/v4 v4.1.2/go.mod h1:22cg9HWM1pOlnRiY+9cQYJ9XHmya1bYW8OeDM6Ku6Oo= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +<<<<<<< HEAD github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +======= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +>>>>>>> b9f99a60e (feat(go): implement metrics and tracing for http and grpc servers) github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -135,18 +154,28 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.7 h1:zrn2Ee/nWmHulBx5sAV github.com/googleapis/enterprise-certificate-proxy v0.3.7/go.mod h1:MkHOF77EYAE7qfSuSS9PU6g4Nt4e11cnsDUowfwewLA= github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81vgd/bo= github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc= +github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 h1:QGLs/O40yoNK9vmy4rhUGBVyMf1lISBGtXRpsu/Qu/o= +github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0/go.mod h1:hM2alZsMUni80N33RBe6J0e423LB+odMj7d3EMP9l20= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnVTyacbefKhmbLhIhU= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -155,18 +184,41 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.23 h1:gbShiuAP1W5j9UOksQ06aiiqPMxYecovVGwmTxWtuw0= github.com/mattn/go-sqlite3 v1.14.23/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/roberson-io/mmh3 v0.0.0-20190729202758-fdfce3ba6225 h1:ZMsPCp7oYgjoIFt1c+sM2qojxZXotSYcMF8Ur9/LJlM= @@ -176,12 +228,18 @@ github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7 github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE= github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= @@ -218,18 +276,26 @@ go.opentelemetry.io/proto/otlp v1.7.1 h1:gTOMpGDb0WTBOP8JaO72iL3auEZhVmAQg4ipjOV go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/oauth2 v0.33.0 h1:4Q+qn+E5z8gPRJfmRy7C2gGG3T4jIprK6aSYgTXGRpo= golang.org/x/oauth2 v0.33.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -260,9 +326,11 @@ google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c= google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/go/internal/feast/metrics/metrics.go b/go/internal/feast/metrics/metrics.go new file mode 100644 index 00000000000..804eef6fa1b --- /dev/null +++ b/go/internal/feast/metrics/metrics.go @@ -0,0 +1,66 @@ +package metrics + +import ( + "reflect" + "time" + + "github.com/cabify/gotoprom" + "github.com/cabify/gotoprom/prometheusvanilla" + "github.com/prometheus/client_golang/prometheus" +) + +var HttpMetrics struct { + Duration func(HttpLabels) TimeHistogram `name:"http_request_duration_seconds" help:"Time taken to serve HTTP requests" buckets:".005,.01,.025,.05,.1,.25,.5,1,2.5,5,10"` + + RequestsTotal func(HttpLabels) prometheus.Counter `name:"http_requests_total" help:"Total number of HTTP requests"` +} + +type HttpLabels struct { + Method string `label:"method"` + Status int `label:"status"` + Path string `label:"path"` +} + +func init() { + gotoprom.MustAddBuilder(TimeHistogramType, RegisterTimeHistogram) + gotoprom.MustInit(&HttpMetrics, "feast") +} + +var ( + TimeHistogramType = reflect.TypeOf((*TimeHistogram)(nil)).Elem() +) + + +func RegisterTimeHistogram(name, help, namespace string, labelNames []string, tag reflect.StructTag) (func(prometheus.Labels) interface{}, prometheus.Collector, error) { + f, collector, err := prometheusvanilla.BuildHistogram(name, help, namespace, labelNames, tag) + if err != nil { + return nil, nil, err + } + + return func(labels prometheus.Labels) interface{} { + return timeHistogramAdapter{Histogram: f(labels).(prometheus.Histogram)} + }, collector, nil +} + +// TimeHistogram offers the basic prometheus.Histogram functionality +type TimeHistogram interface { + prometheus.Histogram + // Duration observes the duration in seconds + Duration(duration time.Duration) + // Since observes the duration in seconds since the time point provided + Since(time.Time) +} + +type timeHistogramAdapter struct { + prometheus.Histogram +} + +// Duration observes the duration in seconds +func (to timeHistogramAdapter) Duration(duration time.Duration) { + to.Observe(duration.Seconds()) +} + +// Since observes the duration in seconds since the time point provided +func (to timeHistogramAdapter) Since(duration time.Time) { + to.Duration(time.Since(duration)) +} diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index 312a0a6352e..7acfdae407c 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -17,6 +17,9 @@ import ( prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" "github.com/rs/zerolog/log" + + "github.com/feast-dev/feast/go/internal/feast/metrics" + "github.com/prometheus/client_golang/prometheus/promhttp" ) type httpServer struct { @@ -335,9 +338,53 @@ func recoverMiddleware(next http.Handler) http.Handler { }) } +type statusWriter struct { + http.ResponseWriter + status int + length int +} + +func (w *statusWriter) WriteHeader(status int) { + if w.status == 0 { + w.status = status + } + w.ResponseWriter.WriteHeader(status) +} + +func (w *statusWriter) Write(b []byte) (int, error) { + if w.status == 0 { + w.status = 200 + } + n, err := w.ResponseWriter.Write(b) + w.length += n + return n, err +} + +func metricsMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t0 := time.Now() + sw := &statusWriter{ResponseWriter: w} + next.ServeHTTP(sw, r) + duration := time.Since(t0) + + metrics.HttpMetrics.Duration(metrics.HttpLabels{ + Method: r.Method, + Status: sw.status, + Path: r.URL.Path, + }).Duration(duration) + + metrics.HttpMetrics.RequestsTotal(metrics.HttpLabels{ + Method: r.Method, + Status: sw.status, + Path: r.URL.Path, + }).Inc() + }) +} + func (s *httpServer) Serve(host string, port int) error { mux := http.NewServeMux() - mux.Handle("/get-online-features", recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures))) + mux.Handle("/get-online-features", metricsMiddleware(recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures)))) + mux.Handle("/metrics", promhttp.Handler()) mux.HandleFunc("/health", healthCheckHandler) s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: mux, ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 15 * time.Second} err := s.server.ListenAndServe() diff --git a/go/main.go b/go/main.go index 77999671e07..8673f368e60 100644 --- a/go/main.go +++ b/go/main.go @@ -5,9 +5,11 @@ import ( "flag" "fmt" "net" + "net/http" "os" "os/signal" "strings" + "sync" "syscall" "github.com/feast-dev/feast/go/internal/feast" @@ -20,6 +22,9 @@ import ( "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" @@ -155,16 +160,38 @@ func StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedF if err != nil { return err } - - grpcServer := grpc.NewServer() + srvMetrics := grpc_prometheus.NewServerMetrics( + grpc_prometheus.WithServerHandlingTimeHistogram( + grpc_prometheus.WithHistogramBuckets([]float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}), + ), + ) + prometheus.MustRegister(srvMetrics) + grpcServer := grpc.NewServer( + grpc.UnaryInterceptor(srvMetrics.UnaryServerInterceptor()), + ) serving.RegisterServingServiceServer(grpcServer, ser) healthService := health.NewServer() grpc_health_v1.RegisterHealthServer(grpcServer, healthService) + srvMetrics.InitializeMetrics(grpcServer) + + metricsServer := &http.Server{Addr: fmt.Sprintf(":%d", 9090)} + go func() { + log.Info().Msgf("Starting metrics server on port %d", 9090) + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + metricsServer.Handler = mux + if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Error().Err(err).Msg("Failed to start metrics server") + } + }() stop := make(chan os.Signal, 1) signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() // As soon as these signals are received from OS, try to gracefully stop the gRPC server <-stop log.Info().Msg("Stopping the gRPC server...") @@ -172,10 +199,16 @@ func StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedF if loggingService != nil { loggingService.Stop() } + log.Info().Msg("Stopping metrics server...") + if err := metricsServer.Shutdown(context.Background()); err != nil { + log.Error().Err(err).Msg("Error stopping metrics server") + } log.Info().Msg("gRPC server terminated") }() - return grpcServer.Serve(lis) + err = grpcServer.Serve(lis) + wg.Wait() + return err } // StartHttpServerWithLogging starts HTTP server with enabled feature logging From 1664ae5888658b283c5cfb56fc9d133db2e68470 Mon Sep 17 00:00:00 2001 From: Luis Azofra Begara Date: Wed, 11 Feb 2026 13:08:08 +0100 Subject: [PATCH 2/5] fix(server): improve metrics, config, and shutdown logic Signed-off-by: Luis Azofra Begara --- go/internal/feast/server/http_server.go | 10 ++--- go/main.go | 55 ++++++++++++++++++------- go/main_test.go | 18 ++++---- 3 files changed, 54 insertions(+), 29 deletions(-) diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index 7acfdae407c..adfd40110e7 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -19,7 +19,6 @@ import ( "github.com/rs/zerolog/log" "github.com/feast-dev/feast/go/internal/feast/metrics" - "github.com/prometheus/client_golang/prometheus/promhttp" ) type httpServer struct { @@ -341,7 +340,6 @@ func recoverMiddleware(next http.Handler) http.Handler { type statusWriter struct { http.ResponseWriter status int - length int } func (w *statusWriter) WriteHeader(status int) { @@ -356,7 +354,6 @@ func (w *statusWriter) Write(b []byte) (int, error) { w.status = 200 } n, err := w.ResponseWriter.Write(b) - w.length += n return n, err } @@ -367,6 +364,10 @@ func metricsMiddleware(next http.Handler) http.Handler { next.ServeHTTP(sw, r) duration := time.Since(t0) + if sw.status == 0 { + sw.status = 200 + } + metrics.HttpMetrics.Duration(metrics.HttpLabels{ Method: r.Method, Status: sw.status, @@ -384,8 +385,7 @@ func metricsMiddleware(next http.Handler) http.Handler { func (s *httpServer) Serve(host string, port int) error { mux := http.NewServeMux() mux.Handle("/get-online-features", metricsMiddleware(recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures)))) - mux.Handle("/metrics", promhttp.Handler()) - mux.HandleFunc("/health", healthCheckHandler) + mux.Handle("/health", metricsMiddleware(http.HandlerFunc(healthCheckHandler))) s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: mux, ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 15 * time.Second} err := s.server.ListenAndServe() // Don't return the error if it's caused by graceful shutdown using Stop() diff --git a/go/main.go b/go/main.go index 8673f368e60..46e5ccfc3e8 100644 --- a/go/main.go +++ b/go/main.go @@ -37,18 +37,18 @@ import ( var tracer trace.Tracer type ServerStarter interface { - StartHttpServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error - StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error + StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error + StartGrpcServer(fs *feast.FeatureStore, host string, port int, metricsPort int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error } type RealServerStarter struct{} -func (s *RealServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { - return StartHttpServer(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) +func (s *RealServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { + return StartHttpServer(fs, host, port, metricsPort, writeLoggedFeaturesCallback, loggingOpts) } -func (s *RealServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { - return StartGrpcServer(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) +func (s *RealServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, metricsPort int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { + return StartGrpcServer(fs, host, port, metricsPort, writeLoggedFeaturesCallback, loggingOpts) } func main() { @@ -56,6 +56,7 @@ func main() { serverType := "http" host := "" port := 8080 + metricsPort := 9090 server := RealServerStarter{} // Current Directory repoPath, err := os.Getwd() @@ -68,6 +69,7 @@ func main() { flag.StringVar(&host, "host", host, "Specify a host for the server") flag.IntVar(&port, "port", port, "Specify a port for the server") + flag.IntVar(&metricsPort, "metrics-port", metricsPort, "Specify a port for the metrics server") flag.Parse() // Initialize tracer @@ -114,9 +116,9 @@ func main() { // TODO: writeLoggedFeaturesCallback is defaulted to nil. write_logged_features functionality needs to be // implemented in Golang specific to OfflineStoreSink. Python Feature Server doesn't support this. if serverType == "http" { - err = server.StartHttpServer(fs, host, port, nil, loggingOptions) + err = server.StartHttpServer(fs, host, port, metricsPort, nil, loggingOptions) } else if serverType == "grpc" { - err = server.StartGrpcServer(fs, host, port, nil, loggingOptions) + err = server.StartGrpcServer(fs, host, port, metricsPort, nil, loggingOptions) } else { fmt.Println("Unknown server type. Please specify 'http' or 'grpc'.") } @@ -149,7 +151,7 @@ func constructLoggingService(fs *feast.FeatureStore, writeLoggedFeaturesCallback } // StartGprcServerWithLogging starts gRPC server with enabled feature logging -func StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { +func StartGrpcServer(fs *feast.FeatureStore, host string, port int, metricsPort int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { loggingService, err := constructLoggingService(fs, writeLoggedFeaturesCallback, loggingOpts) if err != nil { return err @@ -174,9 +176,10 @@ func StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedF grpc_health_v1.RegisterHealthServer(grpcServer, healthService) srvMetrics.InitializeMetrics(grpcServer) - metricsServer := &http.Server{Addr: fmt.Sprintf(":%d", 9090)} + // Start metrics server + metricsServer := &http.Server{Addr: fmt.Sprintf(":%d", metricsPort)} go func() { - log.Info().Msgf("Starting metrics server on port %d", 9090) + log.Info().Msgf("Starting metrics server on port %d", metricsPort) mux := http.NewServeMux() mux.Handle("/metrics", promhttp.Handler()) metricsServer.Handler = mux @@ -214,18 +217,32 @@ func StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedF // StartHttpServerWithLogging starts HTTP server with enabled feature logging // Go does not allow direct assignment to package-level functions as a way to // mock them for tests -func StartHttpServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { +func StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { loggingService, err := constructLoggingService(fs, writeLoggedFeaturesCallback, loggingOpts) if err != nil { return err } ser := server.NewHttpServer(fs, loggingService) log.Info().Msgf("Starting a HTTP server on host %s, port %d", host, port) + // Start metrics server + metricsServer := &http.Server{Addr: fmt.Sprintf(":%d", metricsPort)} + go func() { + log.Info().Msgf("Starting metrics server on port %d", metricsPort) + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + metricsServer.Handler = mux + if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Error().Err(err).Msg("Failed to start metrics server") + } + }() stop := make(chan os.Signal, 1) signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() // As soon as these signals are received from OS, try to gracefully stop the gRPC server <-stop log.Info().Msg("Stopping the HTTP server...") @@ -233,13 +250,19 @@ func StartHttpServer(fs *feast.FeatureStore, host string, port int, writeLoggedF if err != nil { log.Error().Err(err).Msg("Error when stopping the HTTP server") } + log.Info().Msg("Stopping metrics server...") + if err := metricsServer.Shutdown(context.Background()); err != nil { + log.Error().Err(err).Msg("Error stopping metrics server") + } if loggingService != nil { loggingService.Stop() } log.Info().Msg("HTTP server terminated") }() - return ser.Serve(host, port) + err = ser.Serve(host, port) + wg.Wait() + return err } func OTELTracingEnabled() bool { @@ -256,11 +279,15 @@ func newExporter(ctx context.Context) (*otlptrace.Exporter, error) { } func newTracerProvider(exp sdktrace.SpanExporter) (*sdktrace.TracerProvider, error) { + serviceName := os.Getenv("OTEL_SERVICE_NAME") + if serviceName == "" { + serviceName = "FeastGoFeatureServer" + } r, err := resource.Merge( resource.Default(), resource.NewWithAttributes( semconv.SchemaURL, - semconv.ServiceName("FeastGoFeatureServer"), + semconv.ServiceName(serviceName), ), ) diff --git a/go/main_test.go b/go/main_test.go index 567a6cf5af4..f1f2ae98698 100644 --- a/go/main_test.go +++ b/go/main_test.go @@ -14,13 +14,13 @@ type MockServerStarter struct { mock.Mock } -func (m *MockServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { - args := m.Called(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) +func (m *MockServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { + args := m.Called(fs, host, port, metricsPort, writeLoggedFeaturesCallback, loggingOpts) return args.Error(0) } -func (m *MockServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { - args := m.Called(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) +func (m *MockServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, metricsPort int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { + args := m.Called(fs, host, port, metricsPort, writeLoggedFeaturesCallback, loggingOpts) return args.Error(0) } @@ -34,9 +34,9 @@ func TestStartHttpServer(t *testing.T) { loggingOpts := &logging.LoggingOptions{} - mockServerStarter.On("StartHttpServer", fs, host, port, mock.AnythingOfType("logging.OfflineStoreWriteCallback"), loggingOpts).Return(nil) + mockServerStarter.On("StartHttpServer", fs, host, port, 9090, mock.AnythingOfType("logging.OfflineStoreWriteCallback"), loggingOpts).Return(nil) - err := mockServerStarter.StartHttpServer(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) + err := mockServerStarter.StartHttpServer(fs, host, port, 9090, writeLoggedFeaturesCallback, loggingOpts) assert.NoError(t, err) mockServerStarter.AssertExpectations(t) } @@ -50,9 +50,9 @@ func TestStartGrpcServer(t *testing.T) { var writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback loggingOpts := &logging.LoggingOptions{} - mockServerStarter.On("StartGrpcServer", fs, host, port, mock.AnythingOfType("logging.OfflineStoreWriteCallback"), loggingOpts).Return(nil) + mockServerStarter.On("StartGrpcServer", fs, host, port, 9090, mock.AnythingOfType("logging.OfflineStoreWriteCallback"), loggingOpts).Return(nil) - err := mockServerStarter.StartGrpcServer(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) + err := mockServerStarter.StartGrpcServer(fs, host, port, 9090, writeLoggedFeaturesCallback, loggingOpts) assert.NoError(t, err) mockServerStarter.AssertExpectations(t) } @@ -67,5 +67,3 @@ func TestConstructLoggingService(t *testing.T) { assert.NoError(t, err) // Further assertions can be added here based on the expected behavior of constructLoggingService } - -// Note: Additional tests can be written for other functions and error scenarios. From 1bf7277fb730eca31558c93fa53dc2fde7baaf5c Mon Sep 17 00:00:00 2001 From: Luis Azofra Begara Date: Wed, 11 Feb 2026 13:15:28 +0100 Subject: [PATCH 3/5] chore: update go.sum after rebase Signed-off-by: Luis Azofra Begara --- go.sum | 3 --- 1 file changed, 3 deletions(-) diff --git a/go.sum b/go.sum index ca11803e1c6..8e586aaa5ab 100644 --- a/go.sum +++ b/go.sum @@ -124,12 +124,9 @@ github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -<<<<<<< HEAD github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= -======= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= ->>>>>>> b9f99a60e (feat(go): implement metrics and tracing for http and grpc servers) github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= From 816423b7b16286df5f6cd981bc3e667950732557 Mon Sep 17 00:00:00 2001 From: Luis Azofra Begara Date: Wed, 11 Feb 2026 14:26:23 +0100 Subject: [PATCH 4/5] docs: improve README instructions for metrics and tracing Signed-off-by: Luis Azofra Begara --- go/README.md | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/go/README.md b/go/README.md index a8e381519a4..308787cac9f 100644 --- a/go/README.md +++ b/go/README.md @@ -6,14 +6,27 @@ To build and run the Go Feature Server locally, create a feature_store.yaml file ```bash go build -o feast-go ./go/main.go - # start the http server - ./feast-go --type=http --port=8080 + # start the http server (metrics on port 9090 by default) + ./feast-go --type=http --port=8080 --metrics-port=9090 # or start the gRPC server - #./feast-go --type=grpc --port=[your-choice] + #./feast-go --type=grpc --port=[your-choice] --metrics-port=9091 ``` +## Prometheus Metrics +The server exposes Prometheus metrics at the `/metrics` endpoint on a dedicated port (default `:9090`). +- **HTTP Mode**: Metrics server runs on port `9090` (configurable via `-metrics-port`). +- **gRPC Mode**: Metrics server runs on port `9090` (configurable via `-metrics-port`). + +Key metrics include: +- `http_request_duration_seconds`: Histogram of response latency. +- `http_requests_total`: Counter of HTTP requests by status, method, and path. +- Standard Go and Process metrics. + +A `/health` endpoint is available on the main application port (default `:8080`) for readiness probes. + ## OTEL based observability The OS level env variable `ENABLE_OTEL_TRACING=="true"/"false"` (string type) is used to enable/disable this service (with Tracing only). +You can also configure the service name using `OTEL_SERVICE_NAME` env variable (defaults to "FeastGoFeatureServer"). The default exporter URL is "http://localhost:4318". The default schema of sending data to collector is **HTTP**. Please refer the following two docs about the configuration of the OTEL exporter: 1. https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/ From 41d58e282866d175cc3482cf5ea135be6c29835f Mon Sep 17 00:00:00 2001 From: Luis Azofra Begara Date: Wed, 11 Feb 2026 16:09:44 +0100 Subject: [PATCH 5/5] fix(server): resolve potential deadlock during shutdown Signed-off-by: Luis Azofra Begara --- go/main.go | 70 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/go/main.go b/go/main.go index 46e5ccfc3e8..f49a27efa46 100644 --- a/go/main.go +++ b/go/main.go @@ -193,23 +193,33 @@ func StartGrpcServer(fs *feast.FeatureStore, host string, port int, metricsPort var wg sync.WaitGroup wg.Add(1) + serverExited := make(chan struct{}) go func() { defer wg.Done() - // As soon as these signals are received from OS, try to gracefully stop the gRPC server - <-stop - log.Info().Msg("Stopping the gRPC server...") - grpcServer.GracefulStop() - if loggingService != nil { - loggingService.Stop() + select { + case <-stop: + // Received SIGINT/SIGTERM. Perform graceful shutdown. + log.Info().Msg("Stopping the gRPC server...") + grpcServer.GracefulStop() + if loggingService != nil { + loggingService.Stop() + } + log.Info().Msg("Stopping metrics server...") + if err := metricsServer.Shutdown(context.Background()); err != nil { + log.Error().Err(err).Msg("Error stopping metrics server") + } + log.Info().Msg("gRPC server terminated") + case <-serverExited: + // Server exited (e.g. startup error), ensure metrics server is stopped + metricsServer.Shutdown(context.Background()) + if loggingService != nil { + loggingService.Stop() + } } - log.Info().Msg("Stopping metrics server...") - if err := metricsServer.Shutdown(context.Background()); err != nil { - log.Error().Err(err).Msg("Error stopping metrics server") - } - log.Info().Msg("gRPC server terminated") }() err = grpcServer.Serve(lis) + close(serverExited) wg.Wait() return err } @@ -241,26 +251,36 @@ func StartHttpServer(fs *feast.FeatureStore, host string, port int, metricsPort var wg sync.WaitGroup wg.Add(1) + serverExited := make(chan struct{}) go func() { defer wg.Done() - // As soon as these signals are received from OS, try to gracefully stop the gRPC server - <-stop - log.Info().Msg("Stopping the HTTP server...") - err := ser.Stop() - if err != nil { - log.Error().Err(err).Msg("Error when stopping the HTTP server") - } - log.Info().Msg("Stopping metrics server...") - if err := metricsServer.Shutdown(context.Background()); err != nil { - log.Error().Err(err).Msg("Error stopping metrics server") - } - if loggingService != nil { - loggingService.Stop() + select { + case <-stop: + // Received SIGINT/SIGTERM. Perform graceful shutdown. + log.Info().Msg("Stopping the HTTP server...") + err := ser.Stop() + if err != nil { + log.Error().Err(err).Msg("Error when stopping the HTTP server") + } + log.Info().Msg("Stopping metrics server...") + if err := metricsServer.Shutdown(context.Background()); err != nil { + log.Error().Err(err).Msg("Error stopping metrics server") + } + if loggingService != nil { + loggingService.Stop() + } + log.Info().Msg("HTTP server terminated") + case <-serverExited: + // Server exited (e.g. startup error), ensure metrics server is stopped + metricsServer.Shutdown(context.Background()) + if loggingService != nil { + loggingService.Stop() + } } - log.Info().Msg("HTTP server terminated") }() err = ser.Serve(host, port) + close(serverExited) wg.Wait() return err }