forked from coder/coder
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebhook.go
More file actions
130 lines (112 loc) · 4.36 KB
/
webhook.go
File metadata and controls
130 lines (112 loc) · 4.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package dispatch
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"text/template"
"github.com/google/uuid"
"golang.org/x/xerrors"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/coderd/notifications/types"
markdown "github.com/coder/coder/v2/coderd/render"
"github.com/coder/coder/v2/codersdk"
)
// WebhookHandler dispatches notification messages via an HTTP POST webhook.
type WebhookHandler struct {
cfg codersdk.NotificationsWebhookConfig
log slog.Logger
cl *http.Client
}
// WebhookPayload describes the JSON payload to be delivered to the configured webhook endpoint.
type WebhookPayload struct {
Version string `json:"_version"`
MsgID uuid.UUID `json:"msg_id"`
Payload types.MessagePayload `json:"payload"`
Title string `json:"title"`
TitleMarkdown string `json:"title_markdown"`
Body string `json:"body"`
BodyMarkdown string `json:"body_markdown"`
}
func NewWebhookHandler(cfg codersdk.NotificationsWebhookConfig, log slog.Logger) *WebhookHandler {
// Create a new transport in favor of reusing the default, since other http clients may interfere.
// http.Transport maintains its own connection pool, and we want to avoid cross-contamination.
var rt http.RoundTripper
def := http.DefaultTransport
t, ok := def.(*http.Transport)
if !ok {
// The API has changed (very unlikely), so let's use the default transport (previous behavior) and log.
log.Warn(context.Background(), "failed to clone default HTTP transport, unexpected type", slog.F("type", fmt.Sprintf("%T", def)))
rt = def
} else {
// Clone the transport's exported fields, but not its connection pool.
rt = t.Clone()
}
return &WebhookHandler{cfg: cfg, log: log, cl: &http.Client{Transport: rt}}
}
func (w *WebhookHandler) Dispatcher(payload types.MessagePayload, titleMarkdown, bodyMarkdown string, _ template.FuncMap) (DeliveryFunc, error) {
if w.cfg.Endpoint.String() == "" {
return nil, xerrors.New("webhook endpoint not defined")
}
titlePlaintext, err := markdown.PlaintextFromMarkdown(titleMarkdown)
if err != nil {
return nil, xerrors.Errorf("render title: %w", err)
}
bodyPlaintext, err := markdown.PlaintextFromMarkdown(bodyMarkdown)
if err != nil {
return nil, xerrors.Errorf("render body: %w", err)
}
return w.dispatch(payload, titlePlaintext, titleMarkdown, bodyPlaintext, bodyMarkdown, w.cfg.Endpoint.String()), nil
}
func (w *WebhookHandler) dispatch(msgPayload types.MessagePayload, titlePlaintext, titleMarkdown, bodyPlaintext, bodyMarkdown, endpoint string) DeliveryFunc {
return func(ctx context.Context, msgID uuid.UUID) (retryable bool, err error) {
// Prepare payload.
payload := WebhookPayload{
Version: "1.1",
MsgID: msgID,
Title: titlePlaintext,
TitleMarkdown: titleMarkdown,
Body: bodyPlaintext,
BodyMarkdown: bodyMarkdown,
Payload: msgPayload,
}
m, err := json.Marshal(payload)
if err != nil {
return false, xerrors.Errorf("marshal payload: %v", err)
}
// Prepare request.
// Outer context has a deadline (see CODER_NOTIFICATIONS_DISPATCH_TIMEOUT).
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewBuffer(m))
if err != nil {
return false, xerrors.Errorf("create HTTP request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Message-Id", msgID.String())
// Send request.
resp, err := w.cl.Do(req)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return true, xerrors.Errorf("request timeout: %w", err)
}
return true, xerrors.Errorf("request failed: %w", err)
}
defer resp.Body.Close()
// Handle response.
if resp.StatusCode/100 > 2 {
// Body could be quite long here, let's grab the first 512B and hope it contains useful debug info.
respBody := make([]byte, 512)
lr := io.LimitReader(resp.Body, int64(len(respBody)))
n, err := lr.Read(respBody)
if err != nil && !errors.Is(err, io.EOF) {
return true, xerrors.Errorf("non-2xx response (%d), read body: %w", resp.StatusCode, err)
}
w.log.Warn(ctx, "unsuccessful delivery", slog.F("status_code", resp.StatusCode),
slog.F("response", string(respBody[:n])), slog.F("msg_id", msgID))
return true, xerrors.Errorf("non-2xx response (%d)", resp.StatusCode)
}
return false, nil
}
}