Skip to content

Commit 4fbb2cc

Browse files
committed
feat(logger): Contextual logger, pt.3
Introduction of a contextual logger takes four commits. This is the fourth commit: Propagate xcontext
1 parent a894206 commit 4fbb2cc

81 files changed

Lines changed: 1027 additions & 959 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cmds/contest/main.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,19 @@ import (
1212
"syscall"
1313

1414
"github.com/facebookincubator/contest/cmds/plugins"
15-
1615
"github.com/facebookincubator/contest/pkg/api"
1716
"github.com/facebookincubator/contest/pkg/config"
1817
"github.com/facebookincubator/contest/pkg/jobmanager"
19-
"github.com/facebookincubator/contest/pkg/logging"
2018
"github.com/facebookincubator/contest/pkg/pluginregistry"
2119
"github.com/facebookincubator/contest/pkg/storage"
2220
"github.com/facebookincubator/contest/pkg/target"
23-
21+
"github.com/facebookincubator/contest/pkg/xcontext/bundles/logrusctx"
22+
"github.com/facebookincubator/contest/pkg/xcontext/logger"
2423
"github.com/facebookincubator/contest/plugins/listeners/httplistener"
2524
"github.com/facebookincubator/contest/plugins/storage/memory"
2625
"github.com/facebookincubator/contest/plugins/storage/rdbms"
27-
2826
"github.com/facebookincubator/contest/plugins/targetlocker/dblocker"
2927
"github.com/facebookincubator/contest/plugins/targetlocker/inmemory"
30-
31-
"github.com/sirupsen/logrus"
3228
)
3329

3430
var (
@@ -37,15 +33,20 @@ var (
3733
flagProcessTimeout = flag.Duration("processTimeout", api.DefaultEventTimeout, "API request processing timeout")
3834
flagTargetLocker = flag.String("targetLocker", inmemory.Name, "Target locker implementation to use")
3935
flagInstanceTag = flag.String("instanceTag", "", "A tag for this instance. Server will only operate on jobs with this tag and will add this tag to the jobs it creates.")
36+
flagLogLevel = flag.String("logLevel", "debug", "A log level, possible values: debug, info, warning, error, panic, fatal")
4037
)
4138

4239
func main() {
4340
flag.Parse()
44-
log := logging.GetLogger("contest")
45-
log.Level = logrus.DebugLevel
46-
logging.Debug()
41+
logLevel, err := logger.ParseLogLevel(*flagLogLevel)
42+
if err != nil {
43+
panic(err)
44+
}
45+
46+
ctx := logrusctx.NewContext(logLevel)
47+
log := ctx.Logger()
4748

48-
pluginRegistry := pluginregistry.NewPluginRegistry()
49+
pluginRegistry := pluginregistry.NewPluginRegistry(ctx)
4950

5051
// primary storage initialization
5152
if *flagDBURI != "" {
@@ -62,7 +63,7 @@ func main() {
6263

6364
dbVerPrim, err := s.Version()
6465
if err != nil {
65-
log.Warningf("Could not determine storage version: %v", err)
66+
log.Warnf("Could not determine storage version: %v", err)
6667
} else {
6768
log.Infof("Storage version: %d", dbVerPrim)
6869
}
@@ -82,7 +83,7 @@ func main() {
8283

8384
dbVerRepl, err := r.Version()
8485
if err != nil {
85-
log.Warningf("Could not determine storage version: %v", err)
86+
log.Warnf("Could not determine storage version: %v", err)
8687
} else {
8788
log.Infof("Storage version: %d", dbVerRepl)
8889
}
@@ -91,7 +92,7 @@ func main() {
9192
log.Fatalf("Primary and Replica DB Versions are different: %v and %v", dbVerPrim, dbVerRepl)
9293
}
9394
} else {
94-
log.Warningf("Using in-memory storage")
95+
log.Warnf("Using in-memory storage")
9596
if ms, err := memory.New(); err == nil {
9697
if err := storage.SetStorage(ms); err != nil {
9798
log.Fatalf("Could not set storage: %v", err)
@@ -118,10 +119,10 @@ func main() {
118119
log.Fatalf("Invalid target locker name %q", *flagTargetLocker)
119120
}
120121

121-
plugins.Init(pluginRegistry, log)
122+
plugins.Init(pluginRegistry, ctx.Logger())
122123

123124
// spawn JobManager
124-
listener := httplistener.HTTPListener{}
125+
listener := httplistener.NewHTTPListener()
125126

126127
opts := []jobmanager.Option{
127128
jobmanager.APIOption(api.OptionEventTimeout(*flagProcessTimeout)),
@@ -133,16 +134,16 @@ func main() {
133134
opts = append(opts, jobmanager.OptionInstanceTag(*flagInstanceTag))
134135
}
135136

136-
jm, err := jobmanager.New(&listener, pluginRegistry, opts...)
137+
jm, err := jobmanager.New(listener, pluginRegistry, opts...)
137138
if err != nil {
138-
log.Fatal(err)
139+
log.Fatalf("%v", err)
139140
}
140-
log.Printf("JobManager %+v", jm)
141+
log.Debugf("JobManager %+v", jm)
141142

142143
sigs := make(chan os.Signal, 1)
143144
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1)
144145

145-
if err := jm.Start(sigs); err != nil {
146-
log.Fatal(err)
146+
if err := jm.Start(ctx, sigs); err != nil {
147+
log.Fatalf("%v", err)
147148
}
148149
}

cmds/plugins/plugins.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/facebookincubator/contest/pkg/pluginregistry"
1212
"github.com/facebookincubator/contest/pkg/userfunctions/donothing"
1313
"github.com/facebookincubator/contest/pkg/userfunctions/ocp"
14+
"github.com/facebookincubator/contest/pkg/xcontext"
1415

1516
"github.com/facebookincubator/contest/pkg/job"
1617
"github.com/facebookincubator/contest/pkg/target"
@@ -27,7 +28,6 @@ import (
2728
"github.com/facebookincubator/contest/plugins/teststeps/example"
2829
"github.com/facebookincubator/contest/plugins/teststeps/randecho"
2930
"github.com/facebookincubator/contest/plugins/teststeps/sshcmd"
30-
"github.com/sirupsen/logrus"
3131
)
3232

3333
var targetManagers = []target.TargetManagerLoader{
@@ -61,33 +61,34 @@ var userFunctions = []map[string]interface{}{
6161
var testInitOnce sync.Once
6262

6363
// Init initializes the plugin registry
64-
func Init(pluginRegistry *pluginregistry.PluginRegistry, log *logrus.Entry) {
64+
func Init(pluginRegistry *pluginregistry.PluginRegistry, log xcontext.Logger) {
65+
6566
// Register TargetManager plugins
6667
for _, tmloader := range targetManagers {
6768
if err := pluginRegistry.RegisterTargetManager(tmloader()); err != nil {
68-
log.Fatal(err)
69+
log.Fatalf("%v", err)
6970
}
7071
}
7172

7273
// Register TestFetcher plugins
7374
for _, tfloader := range testFetchers {
7475
if err := pluginRegistry.RegisterTestFetcher(tfloader()); err != nil {
75-
log.Fatal(err)
76+
log.Fatalf("%v", err)
7677
}
7778
}
7879

7980
// Register TestStep plugins
8081
for _, tsloader := range testSteps {
8182
if err := pluginRegistry.RegisterTestStep(tsloader()); err != nil {
82-
log.Fatal(err)
83+
log.Fatalf("%v", err)
8384

8485
}
8586
}
8687

8788
// Register Reporter plugins
8889
for _, rfloader := range reporters {
8990
if err := pluginRegistry.RegisterReporter(rfloader()); err != nil {
90-
log.Fatal(err)
91+
log.Fatalf("%v", err)
9192
}
9293
}
9394

@@ -96,7 +97,7 @@ func Init(pluginRegistry *pluginregistry.PluginRegistry, log *logrus.Entry) {
9697
for _, userFunction := range userFunctions {
9798
for name, fn := range userFunction {
9899
if err := test.RegisterFunction(name, fn); err != nil {
99-
log.Fatal(err)
100+
log.Fatalf("%v", err)
100101
}
101102
}
102103
}

db/rdbms/migration/0002_migrate_descriptor_to_extended_descriptor.go

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@ import (
1515
"github.com/facebookincubator/contest/pkg/job"
1616
"github.com/facebookincubator/contest/pkg/test"
1717
"github.com/facebookincubator/contest/pkg/types"
18+
"github.com/facebookincubator/contest/pkg/xcontext"
1819
"github.com/facebookincubator/contest/tools/migration/rdbms/migrate"
1920

2021
"github.com/facebookincubator/contest/cmds/plugins"
2122
"github.com/facebookincubator/contest/pkg/pluginregistry"
22-
23-
"github.com/sirupsen/logrus"
2423
)
2524

2625
const shardSize = uint64(5000)
@@ -81,7 +80,7 @@ type Request struct {
8180
// so that all the test information can be re-fetched by reading extended_descriptor field in
8281
// jobs table, without any dependency after submission time on the test fetcher.
8382
type DescriptorMigration struct {
84-
log *logrus.Entry
83+
Context xcontext.Context
8584
}
8685

8786
type dbConn interface {
@@ -99,7 +98,8 @@ func ms(d time.Duration) float64 {
9998
}
10099

101100
// fetchJobs fetches job requests based on limit and offset
102-
func (m *DescriptorMigration) fetchJobs(db dbConn, limit, offset uint64, log *logrus.Entry) ([]Request, error) {
101+
func (m *DescriptorMigration) fetchJobs(db dbConn, limit, offset uint64) ([]Request, error) {
102+
log := m.Context.Logger()
103103

104104
log.Debugf("fetching shard limit: %d, offset: %d", limit, offset)
105105
selectStatement := "select job_id, name, requestor, server_id, request_time, descriptor, teststeps from jobs limit ? offset ?"
@@ -158,7 +158,8 @@ func (m *DescriptorMigration) fetchJobs(db dbConn, limit, offset uint64, log *lo
158158
return jobs, nil
159159
}
160160

161-
func (m *DescriptorMigration) migrateJobs(db dbConn, requests []Request, registry *pluginregistry.PluginRegistry, log *logrus.Entry) error {
161+
func (m *DescriptorMigration) migrateJobs(db dbConn, requests []Request, registry *pluginregistry.PluginRegistry) error {
162+
log := m.Context.Logger()
162163

163164
log.Debugf("migrating %d jobs", len(requests))
164165
start := time.Now()
@@ -228,12 +229,12 @@ func (m *DescriptorMigration) migrateJobs(db dbConn, requests []Request, registr
228229
// TestFetcher accordingly and retrieve the name of the test
229230
td := jobDesc.TestDescriptors[index]
230231

231-
tfb, err := registry.NewTestFetcherBundle(td)
232+
tfb, err := registry.NewTestFetcherBundle(m.Context, td)
232233
if err != nil {
233234
return fmt.Errorf("could not instantiate test fetcher for jobID %d based on descriptor %+v: %w", request.JobID, td, err)
234235
}
235236

236-
name, stepDescFetched, err := tfb.TestFetcher.Fetch(tfb.FetchParameters)
237+
name, stepDescFetched, err := tfb.TestFetcher.Fetch(m.Context, tfb.FetchParameters)
237238
if err != nil {
238239
return fmt.Errorf("could not retrieve test description from fetcher for jobID %d: %w", request.JobID, err)
239240
}
@@ -243,16 +244,16 @@ func (m *DescriptorMigration) migrateJobs(db dbConn, requests []Request, registr
243244
/// might have changed. We go ahead anyway, assuming the test name is still relevant.
244245
stepDescFetchedJSON, err := json.Marshal(stepDescFetched)
245246
if err != nil {
246-
log.Warningf("steps description (`%v`) fetched by test fetcher for job %d cannot be serialized: %v", stepDescFetched, request.JobID, err)
247+
log.Warnf("steps description (`%v`) fetched by test fetcher for job %d cannot be serialized: %v", stepDescFetched, request.JobID, err)
247248
}
248249

249250
stepDescDBJSON, err := json.Marshal(stepDesc)
250251
if err != nil {
251-
log.Warningf("steps description (`%v`) fetched from db for job %d cannot be serialized: %v", stepDesc, request.JobID, err)
252+
log.Warnf("steps description (`%v`) fetched from db for job %d cannot be serialized: %v", stepDesc, request.JobID, err)
252253
}
253254

254255
if string(stepDescDBJSON) != string(stepDescFetchedJSON) {
255-
log.Warningf("steps retrieved by test fetcher and from database do not match (`%v` != `%v`), test description might have changed", string(stepDescDBJSON), string(stepDescFetchedJSON))
256+
log.Warnf("steps retrieved by test fetcher and from database do not match (`%v` != `%v`), test description might have changed", string(stepDescDBJSON), string(stepDescFetchedJSON))
256257
}
257258

258259
newStepsDesc.TestName = name
@@ -343,7 +344,8 @@ func (m *DescriptorMigration) up(db dbConn) error {
343344
// (see https://github.com/lib/pq/issues/81)
344345

345346
count := uint64(0)
346-
m.log.Debugf("counting the number of jobs to migrate")
347+
ctx := m.Context
348+
ctx.Logger().Debugf("counting the number of jobs to migrate")
347349
start := time.Now()
348350
rows, err := db.Query("select count(*) from jobs")
349351
if err != nil {
@@ -360,34 +362,36 @@ func (m *DescriptorMigration) up(db dbConn) error {
360362
return fmt.Errorf("could not fetch number of records to migrate: %w", err)
361363
}
362364
if err := rows.Close(); err != nil {
363-
m.log.Warningf("could not close rows after count(*) query")
365+
ctx.Logger().Warnf("could not close rows after count(*) query")
364366
}
365367

366368
// Create a new plugin registry. This is necessary because some information that needs to be
367369
// associated with the extended_descriptor is not available in the db and can only be looked
368370
// up via the TestFetcher.
369-
registry := pluginregistry.NewPluginRegistry()
370-
plugins.Init(registry, m.log)
371+
registry := pluginregistry.NewPluginRegistry(ctx)
372+
plugins.Init(registry, ctx.Logger())
371373

372374
elapsed := time.Since(start)
373-
m.log.Debugf("total number of jobs to migrate: %d, fetched in %.3f ms", count, ms(elapsed))
375+
ctx.Logger().Debugf("total number of jobs to migrate: %d, fetched in %.3f ms", count, ms(elapsed))
374376
for offset := uint64(0); offset < count; offset += shardSize {
375-
jobs, err := m.fetchJobs(db, shardSize, offset, m.log)
377+
jobs, err := m.fetchJobs(db, shardSize, offset)
376378
if err != nil {
377379
return fmt.Errorf("could not fetch events in range offset %d limit %d: %w", offset, shardSize, err)
378380
}
379-
err = m.migrateJobs(db, jobs, registry, m.log)
381+
err = m.migrateJobs(db, jobs, registry)
380382
if err != nil {
381383
return fmt.Errorf("could not migrate events in range offset %d limit %d: %w", offset, shardSize, err)
382384
}
383-
m.log.Infof("migrated %d/%d", offset, count)
385+
ctx.Logger().Infof("migrated %d/%d", offset, count)
384386
}
385387
return nil
386388
}
387389

388390
// NewDescriptorMigration is the factory for DescriptorMigration
389-
func NewDescriptorMigration(log *logrus.Entry) migrate.Migrate {
390-
return &DescriptorMigration{log: log}
391+
func NewDescriptorMigration(ctx xcontext.Context) migrate.Migrate {
392+
return &DescriptorMigration{
393+
Context: ctx,
394+
}
391395
}
392396

393397
// register NewDescriptorMigration at initialization time

pkg/api/api.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/facebookincubator/contest/pkg/job"
1515
"github.com/facebookincubator/contest/pkg/storage/limits"
1616
"github.com/facebookincubator/contest/pkg/types"
17+
"github.com/facebookincubator/contest/pkg/xcontext"
1718
)
1819

1920
// CurrentAPIVersion is the current version of the API that the clients must be
@@ -161,9 +162,12 @@ func (a *API) SendReceiveEvent(ev *Event, timeout *time.Duration) (*EventRespons
161162
// operations via the API, e.g. getting the job status or stopping it.
162163
// This method should return an error if the job description is malformed or
163164
// invalid, and if the API version is incompatible.
164-
func (a *API) Start(requestor EventRequestor, jobDescriptor string) (Response, error) {
165+
func (a *API) Start(ctx xcontext.Context, requestor EventRequestor, jobDescriptor string) (Response, error) {
165166
resp := a.newResponse(ResponseTypeStart)
166167
ev := &Event{
168+
// To allow jobs to finish we do not allow passing cancel and pause
169+
// signals to the job's context (therefore: xcontext.WithResetSignalers).
170+
Context: xcontext.WithResetSignalers(ctx).WithTag("api_method", "start"),
167171
Type: EventTypeStart,
168172
ServerID: resp.ServerID,
169173
Msg: EventStartMsg{
@@ -184,9 +188,10 @@ func (a *API) Start(requestor EventRequestor, jobDescriptor string) (Response, e
184188
}
185189

186190
// Stop requests a job cancellation by the given job ID.
187-
func (a *API) Stop(requestor EventRequestor, jobID types.JobID) (Response, error) {
191+
func (a *API) Stop(ctx xcontext.Context, requestor EventRequestor, jobID types.JobID) (Response, error) {
188192
resp := a.newResponse(ResponseTypeStop)
189193
ev := &Event{
194+
Context: ctx.WithTag("api_method", "stop"),
190195
Type: EventTypeStop,
191196
ServerID: resp.ServerID,
192197
Msg: EventStopMsg{
@@ -206,9 +211,10 @@ func (a *API) Stop(requestor EventRequestor, jobID types.JobID) (Response, error
206211

207212
// Status polls the status of a job by its ID, and returns a contest.Status
208213
// object
209-
func (a *API) Status(requestor EventRequestor, jobID types.JobID) (Response, error) {
214+
func (a *API) Status(ctx xcontext.Context, requestor EventRequestor, jobID types.JobID) (Response, error) {
210215
resp := a.newResponse(ResponseTypeStatus)
211216
ev := &Event{
217+
Context: ctx.WithTag("api_method", "status"),
212218
Type: EventTypeStatus,
213219
ServerID: resp.ServerID,
214220
Msg: EventStatusMsg{
@@ -230,9 +236,10 @@ func (a *API) Status(requestor EventRequestor, jobID types.JobID) (Response, err
230236

231237
// Retry will retry a job identified by its ID, using the same job
232238
// description. If the job is still running, an error is returned.
233-
func (a *API) Retry(requestor EventRequestor, jobID types.JobID) (Response, error) {
239+
func (a *API) Retry(ctx xcontext.Context, requestor EventRequestor, jobID types.JobID) (Response, error) {
234240
resp := a.newResponse(ResponseTypeRetry)
235241
ev := &Event{
242+
Context: ctx.WithTag("api_method", "retry"),
236243
Type: EventTypeRetry,
237244
ServerID: resp.ServerID,
238245
Msg: EventRetryMsg{
@@ -256,9 +263,10 @@ func (a *API) Retry(requestor EventRequestor, jobID types.JobID) (Response, erro
256263
}
257264

258265
// List will list jobs matching the specified criteria.
259-
func (a *API) List(requestor EventRequestor, states []job.State, tags []string) (Response, error) {
266+
func (a *API) List(ctx xcontext.Context, requestor EventRequestor, states []job.State, tags []string) (Response, error) {
260267
resp := a.newResponse(ResponseTypeList)
261268
ev := &Event{
269+
Context: ctx.WithTag("api_method", "list"),
262270
Type: EventTypeList,
263271
ServerID: resp.ServerID,
264272
Msg: EventListMsg{

0 commit comments

Comments
 (0)