-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathapi.go
More file actions
297 lines (276 loc) · 9.28 KB
/
api.go
File metadata and controls
297 lines (276 loc) · 9.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
package api
import (
"context"
"errors"
"fmt"
"os"
"time"
"github.com/facebookincubator/go-belt/beltctx"
"github.com/linuxboot/contest/pkg/signaling"
"github.com/linuxboot/contest/pkg/signals"
"github.com/linuxboot/contest/pkg/storage"
"github.com/linuxboot/contest/pkg/storage/limits"
"github.com/linuxboot/contest/pkg/types"
)
// CurrentAPIVersion is the API version that clients have to speak in order to
// talk to this server. Valid versions start at 1; the value 0 is reserved as
// an error indicator.
const CurrentAPIVersion = uint32(5)
// DefaultEventTimeout is the default amount of time to wait when sending or
// receiving an event on the events channel.
// NOTE(review): the methods in this file read Config.EventTimeout; this value
// is presumably what the configuration falls back to - confirm in getConfig.
var DefaultEventTimeout time.Duration = 3 * time.Second
// ServerIDFunc is a callback that returns a custom server ID to be reported
// in API responses. It takes no arguments and returns the ID as a string.
type ServerIDFunc func() string
// API implements the communication between clients and the JobManager. It
// enables several operations such as starting, stopping and retrying a job,
// listing jobs, and getting a job status. Use New to obtain a usable
// instance: New is what allocates the Events channel and resolves the
// server ID.
type API struct {
// Config is a set of knobs to change the behavior of API processing.
// The send/receive helpers read Config.EventTimeout as their default
// timeout.
Config Config
// Events is the channel used to route API events between clients and the
// JobManager. New creates it unbuffered. It is not necessary to close it
// explicitly, as it will be garbage-collected when the API structure in
// the client goes out of scope.
Events chan *Event
// serverID is the value returned by ServerID() and copied into API
// responses. It is set once by New.
serverID string
}
// New builds an API instance from the given options. The options are folded
// into a Config, and the server ID is resolved from the configured
// ServerIDFunc (see obtainServerID for the fallback rules). The returned API
// owns a freshly created, unbuffered Events channel.
//
// An error is returned when the resolved server ID does not pass validation.
func New(opts ...Option) (*API, error) {
	cfg := getConfig(opts...)

	id, err := obtainServerID(cfg.ServerIDFunc)
	if err != nil {
		return nil, fmt.Errorf("Cannot create API instance: %w", err)
	}

	instance := &API{
		Config:   cfg,
		serverID: id,
	}
	instance.Events = make(chan *Event)
	return instance, nil
}
// obtainServerID resolves and validates the server ID.
//
// When serverIDFunc is non-nil its return value is used as is. Otherwise the
// OS hostname is used, falling back to the literal "<unknown>" if the
// hostname cannot be determined. The resulting ID is then checked with the
// limits validator; a validation failure is returned as the error together
// with an empty string.
func obtainServerID(serverIDFunc func() string) (string, error) {
	var id string
	switch {
	case serverIDFunc != nil:
		id = serverIDFunc()
	default:
		hostname, hostErr := os.Hostname()
		if hostErr != nil {
			// Best effort only: a missing hostname is not fatal.
			hostname = "<unknown>"
		}
		id = hostname
	}

	validationErr := limits.NewValidator().ValidateServerID(id)
	if validationErr != nil {
		return "", validationErr
	}
	return id, nil
}
// ServerID returns the server ID to be used in responses. The value is the
// one stored in the API structure when it was created; a custom server ID
// generation function can be supplied to New() through its options.
func (a API) ServerID() string {
return a.serverID
}
// newResponse creates a Response whose Type is rtype and whose ServerID is
// this API's server ID. Every other field, Data included, is left at its
// zero value and must be filled in by the caller.
func (a API) newResponse(rtype ResponseType) Response {
	var resp Response
	resp.Type = rtype
	resp.ServerID = a.ServerID()
	return resp
}
// Version reports the API version spoken by this server, wrapped in a
// Response of type ResponseTypeVersion. Checking compatibility is up to the
// client; if a client speaks a version the server does not understand, the
// server is expected to answer its API calls with an error.
func (a API) Version() Response {
	// NOTE: backward compatibility is meant to be handled by a proxy
	// endpoint speaking this same API, which detects the version and
	// redirects to the appropriate backend. That keeps migrations simple.
	versionResp := a.newResponse(ResponseTypeVersion)
	versionResp.Data = ResponseDataVersion{Version: CurrentAPIVersion}
	return versionResp
}
// SendEvent sends an Event object on the event channel, without waiting for a
// reply.
//
// The event must be non-nil, carry a non-nil message, and that message must
// name a non-empty requestor that passes the limits validator; otherwise an
// error is returned and nothing is sent.
//
// timeout bounds how long the send may block. When it is nil,
// Config.EventTimeout is used instead. If the send does not complete within
// the timeout, an error is returned.
func (a *API) SendEvent(ev *Event, timeout *time.Duration) error {
	// Guard against a nil event or message: dereferencing either would
	// panic instead of reporting a usable error to the caller.
	if ev == nil || ev.Msg == nil {
		return errors.New("event and event message cannot be nil")
	}
	requestor := ev.Msg.Requestor()
	if requestor == "" {
		return errors.New("requestor cannot be empty")
	}
	if err := limits.NewValidator().ValidateRequestorName(string(requestor)); err != nil {
		return err
	}

	to := a.Config.EventTimeout
	if timeout != nil {
		to = *timeout
	}

	// Use an explicit timer rather than time.After so that it is released
	// as soon as the send succeeds, instead of lingering until it fires.
	timer := time.NewTimer(to)
	defer timer.Stop()

	select {
	case a.Events <- ev:
		return nil
	case <-timer.C:
		return fmt.Errorf("sending event timed out after %v", to)
	}
}
// SendReceiveEvent sends an Event object on the event channel via SendEvent,
// then waits for a reply from the consumer on the event's RespCh.
//
// timeout applies once to the send and once more to the receive; it is not a
// cumulative budget. When timeout is nil, Config.EventTimeout is used.
//
// It returns the received response, or an error if the send fails (including
// any validation error from SendEvent) or if no response arrives in time.
func (a *API) SendReceiveEvent(ev *Event, timeout *time.Duration) (*EventResponse, error) {
	to := a.Config.EventTimeout
	if timeout != nil {
		to = *timeout
	}

	// send
	if err := a.SendEvent(ev, &to); err != nil {
		return nil, err
	}

	// receive
	// Use an explicit timer rather than time.After so that it is released
	// as soon as the response arrives, instead of lingering until it fires.
	timer := time.NewTimer(to)
	defer timer.Stop()

	select {
	case resp := <-ev.RespCh:
		return resp, nil
	case <-timer.C:
		return nil, fmt.Errorf("time out waiting for response after %v", to)
	}
}
// Start asks the JobManager to create a new test job from the given job
// descriptor.
//
// A descriptor may hold several tests; they are run one after another, never
// in parallel. Submit multiple independent jobs to get parallelism. The
// framework deliberately offers no inter-job synchronization, to avoid
// overcomplicating the orchestration for a few edge cases: if jobs must
// coordinate, write synchronization plugins that rely on external means
// (e.g. the events API).
//
// jobDescriptor must be JSON-encoded; the JobManager deserializes it into a
// `contest.JobDescriptor` object.
//
// On success the response data carries the unique ID of the new job, which
// can be used for further operations through the API, such as querying the
// job status or stopping the job. A malformed or invalid descriptor, or an
// incompatible API version, is expected to be reported as an error. The
// returned error is non-nil only when the event exchange itself fails;
// errors reported by the consumer are placed in the response's Err field.
func (a *API) Start(ctx context.Context, requestor EventRequestor, jobDescriptor string) (Response, error) {
	resp := a.newResponse(ResponseTypeStart)

	// Jobs must be allowed to run to completion, so neither cancellation
	// nor pause signals from the caller's context may reach the job. The
	// job therefore gets a fresh context that only carries over the values
	// (and with them the observability belt) of the original one.
	//
	// The proxy context drops cancel and deadline signals; re-registering
	// the Paused signal on top of it shadows the caller's pause signal. The
	// returned trigger is discarded on purpose so that nothing can fire it.
	ctx, _ = signaling.WithSignal(newValuesProxyContext(ctx), signals.Paused)

	msg := EventStartMsg{
		requestor:     requestor,
		JobDescriptor: jobDescriptor,
	}
	startEvent := &Event{
		Context:  beltctx.WithField(ctx, "api_method", "start"),
		Type:     EventTypeStart,
		ServerID: resp.ServerID,
		Msg:      msg,
		RespCh:   make(chan *EventResponse, 1),
	}

	reply, err := a.SendReceiveEvent(startEvent, nil)
	if err != nil {
		return resp, err
	}

	resp.Data = ResponseDataStart{JobID: reply.JobID}
	resp.Err = reply.Err
	return resp, nil
}
// Stop requests the cancellation of the job identified by jobID.
//
// The returned error is non-nil only when the event exchange itself fails
// (validation, send timeout or receive timeout). An error reported by the
// consumer is placed in the response's Err field instead.
func (a *API) Stop(ctx context.Context, requestor EventRequestor, jobID types.JobID) (Response, error) {
	resp := a.newResponse(ResponseTypeStop)

	msg := EventStopMsg{
		requestor: requestor,
		JobID:     jobID,
	}
	stopEvent := &Event{
		Context:  beltctx.WithField(ctx, "api_method", "stop"),
		Type:     EventTypeStop,
		ServerID: resp.ServerID,
		Msg:      msg,
		RespCh:   make(chan *EventResponse, 1),
	}

	reply, err := a.SendReceiveEvent(stopEvent, nil)
	if err != nil {
		return resp, err
	}

	resp.Data = ResponseDataStop{}
	resp.Err = reply.Err
	return resp, nil
}
// Status polls the status of the job identified by jobID and returns it in
// the response data.
//
// The returned error is non-nil only when the event exchange itself fails
// (validation, send timeout or receive timeout). An error reported by the
// consumer is placed in the response's Err field instead.
func (a *API) Status(ctx context.Context, requestor EventRequestor, jobID types.JobID) (Response, error) {
	resp := a.newResponse(ResponseTypeStatus)

	msg := EventStatusMsg{
		requestor: requestor,
		JobID:     jobID,
	}
	statusEvent := &Event{
		Context:  beltctx.WithField(ctx, "api_method", "status"),
		Type:     EventTypeStatus,
		ServerID: resp.ServerID,
		Msg:      msg,
		RespCh:   make(chan *EventResponse, 1),
	}

	reply, err := a.SendReceiveEvent(statusEvent, nil)
	if err != nil {
		return resp, err
	}

	resp.Data = ResponseDataStatus{Status: reply.Status}
	resp.Err = reply.Err
	return resp, nil
}
// Retry re-runs the job identified by jobID, using the same job description.
// If the job is still running, an error is returned.
//
// The returned error is non-nil only when the event exchange itself fails
// (validation, send timeout or receive timeout). An error reported by the
// consumer is placed in the response's Err field instead.
func (a *API) Retry(ctx context.Context, requestor EventRequestor, jobID types.JobID) (Response, error) {
	resp := a.newResponse(ResponseTypeRetry)

	msg := EventRetryMsg{
		requestor: requestor,
		JobID:     jobID,
	}
	retryEvent := &Event{
		Context:  beltctx.WithField(ctx, "api_method", "retry"),
		Type:     EventTypeRetry,
		ServerID: resp.ServerID,
		Msg:      msg,
		RespCh:   make(chan *EventResponse, 1),
	}

	reply, err := a.SendReceiveEvent(retryEvent, nil)
	if err != nil {
		return resp, err
	}

	resp.Data = ResponseDataRetry{
		// This is the ID of the job being retried, not the ID of the new job.
		JobID: jobID,
		// TODO this should set the new Job ID
		// NewJobID: ...
	}
	resp.Err = reply.Err
	return resp, nil
}
// List returns the IDs of the jobs matching the criteria in query.
//
// The returned error is non-nil only when the event exchange itself fails
// (validation, send timeout or receive timeout). An error reported by the
// consumer is placed in the response's Err field instead.
func (a *API) List(ctx context.Context, requestor EventRequestor, query *storage.JobQuery) (Response, error) {
	resp := a.newResponse(ResponseTypeList)

	msg := EventListMsg{
		requestor: requestor,
		Query:     query,
	}
	listEvent := &Event{
		Context:  beltctx.WithField(ctx, "api_method", "list"),
		Type:     EventTypeList,
		ServerID: resp.ServerID,
		Msg:      msg,
		RespCh:   make(chan *EventResponse, 1),
	}

	reply, err := a.SendReceiveEvent(listEvent, nil)
	if err != nil {
		return resp, err
	}

	resp.Data = ResponseDataList{JobIDs: reply.JobIDs}
	resp.Err = reply.Err
	return resp, nil
}