forked from coder/coder
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebsocket.go
More file actions
139 lines (123 loc) · 3.93 KB
/
websocket.go
File metadata and controls
139 lines (123 loc) · 3.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package httpapi
import (
"context"
"errors"
"net"
"time"
"golang.org/x/xerrors"
"cdr.dev/slog/v3"
"github.com/coder/quartz"
"github.com/coder/websocket"
)
// HeartbeatInterval is the default spacing between WebSocket liveness
// pings. The constant has type time.Duration.
const HeartbeatInterval = 15 * time.Second
// ProbeResult labels how a single WebSocket liveness probe ended, so
// that callers (typically a Prometheus recorder) can count successes
// and each failure mode separately.
type ProbeResult string

// Outcomes reported for a liveness probe.
const (
	// ProbeOK means the ping completed without error.
	ProbeOK = ProbeResult("ok")
	// ProbeTimeout means the ping ran out of time before completing.
	ProbeTimeout = ProbeResult("timeout")
	// ProbePeerClosed means the connection was already closed.
	ProbePeerClosed = ProbeResult("peer_closed")
	// ProbeCanceled means the probe's context was canceled before the
	// ping completed.
	ProbeCanceled = ProbeResult("canceled")
	// ProbeError means the ping failed for any other reason.
	ProbeError = ProbeResult("error")
)
// ProbeRecorder is a callback invoked once per liveness probe with that
// probe's outcome. A nil ProbeRecorder is allowed: probes are still run,
// but their results are not recorded.
//
// ctx is the same context the probe itself ran under, so it may already
// be canceled by the time the recorder is called (for example when
// result is ProbeCanceled).
type ProbeRecorder func(ctx context.Context, result ProbeResult)
// PingCloser is the minimal interface for WebSocket liveness probing.
// *websocket.Conn satisfies this interface.
type PingCloser interface {
	// Ping performs one liveness check bounded by ctx. A nil return is
	// treated as a healthy connection; any error is treated as a
	// failed probe.
	Ping(ctx context.Context) error
	// Close closes the connection with the given status code and
	// reason. The watcher calls it with websocket.StatusGoingAway
	// after a failed probe.
	Close(code websocket.StatusCode, reason string) error
}
// WSWatcher supervises WebSocket connections for liveness by
// periodically sending ping frames. On probe failure, the watcher
// closes the connection with StatusGoingAway and cancels the
// returned context; the caller owns closing the connection on
// normal teardown.
//
// Use NewWSWatcher to construct one; the fields are unexported.
type WSWatcher struct {
	// rec, when non-nil, is called with the outcome of every probe.
	rec ProbeRecorder
	// clk creates the ticker that schedules probes.
	clk quartz.Clock
	// interval is both the period between probes and the timeout
	// applied to each individual ping.
	interval time.Duration
}
// NewWSWatcher creates a WSWatcher that probes every HeartbeatInterval.
//
// clk supplies the ticker that schedules probes. A nil clk falls back to
// the real clock (quartz.NewReal); without this default the nil clock
// would only surface later as a nil-pointer panic inside the background
// goroutine started by Watch, where the caller cannot recover it.
//
// Pass nil for rec when no recording is needed (e.g. agent-side code
// without a Prometheus registry).
func NewWSWatcher(clk quartz.Clock, rec ProbeRecorder) *WSWatcher {
	if clk == nil {
		clk = quartz.NewReal()
	}
	return &WSWatcher{
		rec:      rec,
		clk:      clk,
		interval: HeartbeatInterval,
	}
}
// Watch begins liveness supervision of conn in a background goroutine
// and returns a context derived from parent. That context is canceled
// when parent is canceled or when the supervision loop ends because
// conn failed a probe. On probe failure the watcher closes conn with
// StatusGoingAway; closing conn on normal teardown remains the
// caller's job.
//
// Watch panics when called on a nil *WSWatcher.
func (w *WSWatcher) Watch(parent context.Context, log slog.Logger, conn PingCloser) context.Context {
	if w == nil {
		panic("developer error: WSWatcher is nil")
	}

	watchCtx, stop := context.WithCancel(parent)
	go func(ctx context.Context) {
		// Cancel the returned context as soon as supervision ends so
		// that callers waiting on it are released.
		defer stop()
		w.supervise(ctx, log, conn)
	}(watchCtx)

	return watchCtx
}
// supervise is the probe loop for a single connection. On every tick
// of the watcher's clock it pings conn, reports the outcome to the
// recorder (if one is set), and keeps going while probes succeed. It
// returns when ctx is done, or after the first non-OK probe, in which
// case it logs the outcome and closes conn with StatusGoingAway.
func (w *WSWatcher) supervise(ctx context.Context, log slog.Logger, conn PingCloser) {
	tick := w.clk.NewTicker(w.interval, "WSWatcher")
	defer tick.Stop()

	for {
		// Wait for the next tick, or stop if supervision was canceled.
		select {
		case <-tick.C:
		case <-ctx.Done():
			return
		}

		outcome, probeErr := probe(ctx, conn, w.interval)
		if w.rec != nil {
			w.rec(ctx, outcome)
		}

		switch outcome {
		case ProbeOK:
			// Healthy connection: wait for the next tick.
			continue
		case ProbeError:
			log.Error(ctx, "websocket probe failed", slog.Error(probeErr))
		default:
			// Timeout, peer close and cancellation are logged at
			// debug level rather than as errors.
			log.Debug(ctx, "websocket probe stopped",
				slog.F("result", string(outcome)), slog.Error(probeErr))
		}

		// The close error is deliberately ignored: the connection is
		// being abandoned whether or not the close succeeds.
		_ = conn.Close(websocket.StatusGoingAway, "liveness probe failed")
		return
	}
}
// probe sends one ping on conn, bounded by timeout, and classifies the
// outcome:
//
//   - ProbeOK: the ping returned nil.
//   - ProbeCanceled: the error is context.Canceled, or it is
//     context.DeadlineExceeded and the parent ctx itself is done.
//   - ProbeTimeout: the ping's own timeout expired while ctx was
//     still live.
//   - ProbePeerClosed: the error is net.ErrClosed or carries a
//     WebSocket close status.
//   - ProbeError: anything else; the error is wrapped with a "ping: "
//     prefix.
//
// The returned error is nil only for ProbeOK.
func probe(ctx context.Context, conn PingCloser, timeout time.Duration) (ProbeResult, error) {
	pingCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	err := conn.Ping(pingCtx)
	switch {
	case err == nil:
		return ProbeOK, nil
	case errors.Is(err, context.Canceled):
		return ProbeCanceled, err
	case errors.Is(err, context.DeadlineExceeded):
		// pingCtx inherits any deadline set on ctx, so the expiry may
		// belong to the parent rather than to this probe's timeout.
		// A done parent means the caller is going away, which is not
		// evidence of an unresponsive peer.
		if ctx.Err() != nil {
			return ProbeCanceled, err
		}
		return ProbeTimeout, err
	case errors.Is(err, net.ErrClosed) || websocket.CloseStatus(err) != -1:
		return ProbePeerClosed, err
	default:
		return ProbeError, xerrors.Errorf("ping: %w", err)
	}
}
// HeartbeatClose is a legacy helper that watches conn with a WSWatcher
// on the real clock and no recorder. It blocks until supervision ends,
// then calls exit if the connection failed a probe. Callers that need
// metric recording should use WSWatcher directly.
func HeartbeatClose(ctx context.Context, logger slog.Logger, exit func(), conn *websocket.Conn) {
	watcher := NewWSWatcher(quartz.NewReal(), nil)
	<-watcher.Watch(ctx, logger, conn).Done()

	// A done parent context means the caller is already shutting down,
	// so exit is only invoked when supervision ended on its own.
	if ctx.Err() != nil {
		return
	}
	exit()
}