Skip to content

Commit 82aca5b

Browse files
feat(go): implement metrics and tracing for http and grpc servers
1 parent d6c0b2d commit 82aca5b

File tree

9 files changed

+270
-7
lines changed

9 files changed

+270
-7
lines changed

go.mod

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ require (
1111
github.com/aws/aws-sdk-go-v2/config v1.29.14
1212
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.43.3
1313
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3
14+
github.com/cabify/gotoprom v1.1.0
1415
github.com/ghodss/yaml v1.0.0
1516
github.com/golang/protobuf v1.5.4
1617
github.com/google/uuid v1.6.0
1718
github.com/mattn/go-sqlite3 v1.14.23
1819
github.com/pkg/errors v0.9.1
20+
github.com/prometheus/client_golang v1.23.2
1921
github.com/redis/go-redis/v9 v9.6.1
2022
github.com/roberson-io/mmh3 v0.0.0-20190729202758-fdfce3ba6225
2123
github.com/rs/zerolog v1.33.0
@@ -62,6 +64,7 @@ require (
6264
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect
6365
github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect
6466
github.com/aws/smithy-go v1.22.2 // indirect
67+
github.com/beorn7/perks v1.0.1 // indirect
6568
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
6669
github.com/cespare/xxhash/v2 v2.3.0 // indirect
6770
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect
@@ -81,15 +84,19 @@ require (
8184
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
8285
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect
8386
github.com/klauspost/asmfmt v1.3.2 // indirect
84-
github.com/klauspost/compress v1.17.9 // indirect
87+
github.com/klauspost/compress v1.18.0 // indirect
8588
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
8689
github.com/mattn/go-colorable v0.1.13 // indirect
8790
github.com/mattn/go-isatty v0.0.20 // indirect
8891
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
8992
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
93+
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
9094
github.com/pierrec/lz4/v4 v4.1.21 // indirect
9195
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
9296
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
97+
github.com/prometheus/client_model v0.6.2 // indirect
98+
github.com/prometheus/common v0.66.1 // indirect
99+
github.com/prometheus/procfs v0.16.1 // indirect
93100
github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect
94101
github.com/stretchr/objx v0.5.2 // indirect
95102
github.com/zeebo/errs v1.4.0 // indirect
@@ -101,6 +108,7 @@ require (
101108
go.opentelemetry.io/otel/metric v1.38.0 // indirect
102109
go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect
103110
go.opentelemetry.io/proto/otlp v1.7.1 // indirect
111+
go.yaml.in/yaml/v2 v2.4.2 // indirect
104112
golang.org/x/crypto v0.45.0 // indirect
105113
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
106114
golang.org/x/mod v0.29.0 // indirect

go.sum

Lines changed: 63 additions & 2 deletions
Large diffs are not rendered by default.

go/feast-server

94.6 MB
Binary file not shown.
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package metrics
2+
3+
import (
4+
"reflect"
5+
"time"
6+
7+
"github.com/cabify/gotoprom"
8+
"github.com/cabify/gotoprom/prometheusvanilla"
9+
"github.com/prometheus/client_golang/prometheus"
10+
)
11+
12+
var Metrics struct {
13+
HttpDuration func(HttpLabels) TimeHistogram `name:"feast_http_request_duration_seconds" help:"Time taken to serve HTTP requests" buckets:".005,.01,.025,.05,.1,.25,.5,1,2.5,5,10"`
14+
15+
GrpcDuration func(GrpcLabels) TimeHistogram `name:"feast_grpc_request_duration_seconds" help:"Time taken to serve gRPC requests" buckets:".005,.01,.025,.05,.1,.25,.5,1,2.5,5,10"`
16+
17+
HttpRequestsTotal func(HttpLabels) prometheus.Counter `name:"feast_http_requests_total" help:"Total number of HTTP requests"`
18+
19+
GrpcRequestsTotal func(GrpcLabels) prometheus.Counter `name:"feast_grpc_requests_total" help:"Total number of gRPC requests"`
20+
}
21+
22+
type HttpLabels struct {
23+
Method string `label:"method"`
24+
Status int `label:"status"`
25+
Path string `label:"path"`
26+
}
27+
28+
type GrpcLabels struct {
29+
Service string `label:"service"`
30+
Method string `label:"method"`
31+
Code string `label:"code"`
32+
}
33+
34+
func InitMetrics() {
35+
gotoprom.MustInit(&Metrics, "feast")
36+
}
37+
38+
// TimeHistogram boilerplate from gotoprom README
39+
40+
var (
41+
// TimeHistogramType is the reflect.Type of the TimeHistogram interface
42+
TimeHistogramType = reflect.TypeOf((*TimeHistogram)(nil)).Elem()
43+
)
44+
45+
func init() {
46+
gotoprom.MustAddBuilder(TimeHistogramType, RegisterTimeHistogram)
47+
}
48+
49+
// RegisterTimeHistogram registers a TimeHistogram after registering the underlying prometheus.Histogram in the prometheus.Registerer provided
50+
// The function it returns returns a TimeHistogram type as an interface{}
51+
func RegisterTimeHistogram(name, help, namespace string, labelNames []string, tag reflect.StructTag) (func(prometheus.Labels) interface{}, prometheus.Collector, error) {
52+
f, collector, err := prometheusvanilla.BuildHistogram(name, help, namespace, labelNames, tag)
53+
if err != nil {
54+
return nil, nil, err
55+
}
56+
57+
return func(labels prometheus.Labels) interface{} {
58+
return timeHistogramAdapter{Histogram: f(labels).(prometheus.Histogram)}
59+
}, collector, nil
60+
}
61+
62+
// TimeHistogram offers the basic prometheus.Histogram functionality
63+
// with additional time-observing functions
64+
type TimeHistogram interface {
65+
prometheus.Histogram
66+
// Duration observes the duration in seconds
67+
Duration(duration time.Duration)
68+
// Since observes the duration in seconds since the time point provided
69+
Since(time.Time)
70+
}
71+
72+
type timeHistogramAdapter struct {
73+
prometheus.Histogram
74+
}
75+
76+
// Duration observes the duration in seconds
77+
func (to timeHistogramAdapter) Duration(duration time.Duration) {
78+
to.Observe(duration.Seconds())
79+
}
80+
81+
// Since observes the duration in seconds since the time point provided
82+
func (to timeHistogramAdapter) Since(duration time.Time) {
83+
to.Duration(time.Since(duration))
84+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package metrics
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestMetricsInit(t *testing.T) {
10+
InitMetrics()
11+
12+
// We can't easily check the global registry without possibly conflicting with other tests or init order,
13+
// but we can check if the struct fields are populated (not nil)
14+
15+
assert.NotNil(t, Metrics.HttpDuration)
16+
assert.NotNil(t, Metrics.GrpcDuration)
17+
assert.NotNil(t, Metrics.HttpRequestsTotal)
18+
19+
// Call them to ensure no panic
20+
Metrics.HttpRequestsTotal(HttpLabels{Method: "GET", Status: 200, Path: "/"}).Inc()
21+
}

go/internal/feast/server/grpc_server.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
1010
"github.com/feast-dev/feast/go/types"
1111
"github.com/google/uuid"
12-
1312
)
1413

1514
const feastServerVersion = "0.0.1"

go/internal/feast/server/http_server.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ import (
1717
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
1818
"github.com/feast-dev/feast/go/types"
1919
"github.com/rs/zerolog/log"
20+
21+
"github.com/feast-dev/feast/go/internal/feast/metrics"
22+
"github.com/prometheus/client_golang/prometheus/promhttp"
2023
)
2124

2225
type httpServer struct {
@@ -335,9 +338,51 @@ func recoverMiddleware(next http.Handler) http.Handler {
335338
})
336339
}
337340

341+
type statusWriter struct {
342+
http.ResponseWriter
343+
status int
344+
length int
345+
}
346+
347+
func (w *statusWriter) WriteHeader(status int) {
348+
w.status = status
349+
w.ResponseWriter.WriteHeader(status)
350+
}
351+
352+
func (w *statusWriter) Write(b []byte) (int, error) {
353+
if w.status == 0 {
354+
w.status = 200
355+
}
356+
n, err := w.ResponseWriter.Write(b)
357+
w.length += n
358+
return n, err
359+
}
360+
361+
func metricsMiddleware(next http.Handler) http.Handler {
362+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
363+
t0 := time.Now()
364+
sw := &statusWriter{ResponseWriter: w}
365+
next.ServeHTTP(sw, r)
366+
duration := time.Since(t0)
367+
368+
metrics.Metrics.HttpDuration(metrics.HttpLabels{
369+
Method: r.Method,
370+
Status: sw.status,
371+
Path: r.URL.Path,
372+
}).Duration(duration)
373+
374+
metrics.Metrics.HttpRequestsTotal(metrics.HttpLabels{
375+
Method: r.Method,
376+
Status: sw.status,
377+
Path: r.URL.Path,
378+
}).Inc()
379+
})
380+
}
381+
338382
func (s *httpServer) Serve(host string, port int) error {
339383
mux := http.NewServeMux()
340-
mux.Handle("/get-online-features", recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures)))
384+
mux.Handle("/get-online-features", metricsMiddleware(recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures))))
385+
mux.Handle("/metrics", promhttp.Handler())
341386
mux.HandleFunc("/health", healthCheckHandler)
342387
s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: mux, ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 15 * time.Second}
343388
err := s.server.ListenAndServe()

go/internal/feast/server/server_commons.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"os"
55

66
"github.com/rs/zerolog"
7-
"go.opentelemetry.io/otel/trace"
87
"go.opentelemetry.io/otel"
8+
"go.opentelemetry.io/otel/trace"
99
)
1010

1111
var tracer = otel.Tracer("github.com/feast-dev/feast/go/server")

go/main.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ import (
55
"flag"
66
"fmt"
77
"net"
8+
"net/http"
89
"os"
910
"os/signal"
1011
"strings"
1112
"syscall"
13+
"time"
1214

1315
"github.com/feast-dev/feast/go/internal/feast"
1416
"github.com/feast-dev/feast/go/internal/feast/registry"
@@ -19,7 +21,10 @@ import (
1921
"google.golang.org/grpc"
2022
"google.golang.org/grpc/health"
2123
"google.golang.org/grpc/health/grpc_health_v1"
24+
"google.golang.org/grpc/status"
2225

26+
"github.com/feast-dev/feast/go/internal/feast/metrics"
27+
"github.com/prometheus/client_golang/prometheus/promhttp"
2328
"go.opentelemetry.io/otel"
2429
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
2530
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
@@ -65,6 +70,8 @@ func main() {
6570
flag.IntVar(&port, "port", port, "Specify a port for the server")
6671
flag.Parse()
6772

73+
metrics.InitMetrics()
74+
6875
// Initialize tracer
6976
if OTELTracingEnabled() {
7077
ctx := context.Background()
@@ -156,11 +163,49 @@ func StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedF
156163
return err
157164
}
158165

159-
grpcServer := grpc.NewServer()
166+
grpcServer := grpc.NewServer(
167+
grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
168+
t0 := time.Now()
169+
resp, err := handler(ctx, req)
170+
duration := time.Since(t0)
171+
code := status.Code(err).String()
172+
173+
parts := strings.Split(info.FullMethod, "/")
174+
service := "unknown"
175+
method := "unknown"
176+
if len(parts) >= 3 {
177+
service = parts[1]
178+
method = parts[2]
179+
}
180+
181+
metrics.Metrics.GrpcDuration(metrics.GrpcLabels{
182+
Service: service,
183+
Method: method,
184+
Code: code,
185+
}).Duration(duration)
186+
187+
metrics.Metrics.GrpcRequestsTotal(metrics.GrpcLabels{
188+
Service: service,
189+
Method: method,
190+
Code: code,
191+
}).Inc()
192+
193+
return resp, err
194+
}),
195+
)
160196
serving.RegisterServingServiceServer(grpcServer, ser)
161197
healthService := health.NewServer()
162198
grpc_health_v1.RegisterHealthServer(grpcServer, healthService)
163199

200+
go func() {
201+
metricsPort := 9090
202+
log.Info().Msgf("Starting metrics server on port %d", metricsPort)
203+
http.Handle("/metrics", promhttp.Handler())
204+
if err := http.ListenAndServe(fmt.Sprintf(":%d", metricsPort), nil); err != nil {
205+
log.Error().Err(err).Msg("Failed to start metrics server")
206+
}
207+
}()
208+
164209
stop := make(chan os.Signal, 1)
165210
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
166211

0 commit comments

Comments
 (0)