@@ -15,12 +15,11 @@ import (
1515 "github.com/facebookincubator/contest/pkg/job"
1616 "github.com/facebookincubator/contest/pkg/test"
1717 "github.com/facebookincubator/contest/pkg/types"
18+ "github.com/facebookincubator/contest/pkg/xcontext"
1819 "github.com/facebookincubator/contest/tools/migration/rdbms/migrate"
1920
2021 "github.com/facebookincubator/contest/cmds/plugins"
2122 "github.com/facebookincubator/contest/pkg/pluginregistry"
22-
23- "github.com/sirupsen/logrus"
2423)
2524
2625const shardSize = uint64 (5000 )
@@ -81,7 +80,7 @@ type Request struct {
8180// so that all the test information can be re-fetched by reading extended_descriptor field in
8281// jobs table, without any dependency after submission time on the test fetcher.
8382type DescriptorMigration struct {
84- log * logrus. Entry
83+ Context xcontext. Context
8584}
8685
8786type dbConn interface {
@@ -99,7 +98,8 @@ func ms(d time.Duration) float64 {
9998}
10099
101100// fetchJobs fetches job requests based on limit and offset
102- func (m * DescriptorMigration ) fetchJobs (db dbConn , limit , offset uint64 , log * logrus.Entry ) ([]Request , error ) {
101+ func (m * DescriptorMigration ) fetchJobs (db dbConn , limit , offset uint64 ) ([]Request , error ) {
102+ log := m .Context .Logger ()
103103
104104 log .Debugf ("fetching shard limit: %d, offset: %d" , limit , offset )
105105 selectStatement := "select job_id, name, requestor, server_id, request_time, descriptor, teststeps from jobs limit ? offset ?"
@@ -158,7 +158,8 @@ func (m *DescriptorMigration) fetchJobs(db dbConn, limit, offset uint64, log *lo
158158 return jobs , nil
159159}
160160
161- func (m * DescriptorMigration ) migrateJobs (db dbConn , requests []Request , registry * pluginregistry.PluginRegistry , log * logrus.Entry ) error {
161+ func (m * DescriptorMigration ) migrateJobs (db dbConn , requests []Request , registry * pluginregistry.PluginRegistry ) error {
162+ log := m .Context .Logger ()
162163
163164 log .Debugf ("migrating %d jobs" , len (requests ))
164165 start := time .Now ()
@@ -228,12 +229,12 @@ func (m *DescriptorMigration) migrateJobs(db dbConn, requests []Request, registr
228229 // TestFetcher accordingly and retrieve the name of the test
229230 td := jobDesc .TestDescriptors [index ]
230231
231- tfb , err := registry .NewTestFetcherBundle (td )
232+ tfb , err := registry .NewTestFetcherBundle (m . Context , td )
232233 if err != nil {
233234 return fmt .Errorf ("could not instantiate test fetcher for jobID %d based on descriptor %+v: %w" , request .JobID , td , err )
234235 }
235236
236- name , stepDescFetched , err := tfb .TestFetcher .Fetch (tfb .FetchParameters )
237+ name , stepDescFetched , err := tfb .TestFetcher .Fetch (m . Context , tfb .FetchParameters )
237238 if err != nil {
238239 return fmt .Errorf ("could not retrieve test description from fetcher for jobID %d: %w" , request .JobID , err )
239240 }
@@ -243,16 +244,16 @@ func (m *DescriptorMigration) migrateJobs(db dbConn, requests []Request, registr
243244 /// might have changed.We go ahead anyway assuming assume the test name is still relevant.
244245 stepDescFetchedJSON , err := json .Marshal (stepDescFetched )
245246 if err != nil {
246- log .Warningf ("steps description (`%v`) fetched by test fetcher for job %d cannot be serialized: %v" , stepDescFetched , request .JobID , err )
247+ log .Warnf ("steps description (`%v`) fetched by test fetcher for job %d cannot be serialized: %v" , stepDescFetched , request .JobID , err )
247248 }
248249
249250 stepDescDBJSON , err := json .Marshal (stepDesc )
250251 if err != nil {
251- log .Warningf ("steps description (`%v`) fetched from db for job %d cannot be serialized: %v" , stepDesc , request .JobID , err )
252+ log .Warnf ("steps description (`%v`) fetched from db for job %d cannot be serialized: %v" , stepDesc , request .JobID , err )
252253 }
253254
254255 if string (stepDescDBJSON ) != string (stepDescFetchedJSON ) {
255- log .Warningf ("steps retrieved by test fetcher and from database do not match (`%v` != `%v`), test description might have changed" , string (stepDescDBJSON ), string (stepDescFetchedJSON ))
256+ log .Warnf ("steps retrieved by test fetcher and from database do not match (`%v` != `%v`), test description might have changed" , string (stepDescDBJSON ), string (stepDescFetchedJSON ))
256257 }
257258
258259 newStepsDesc .TestName = name
@@ -343,7 +344,8 @@ func (m *DescriptorMigration) up(db dbConn) error {
343344 // (see https://github.com/lib/pq/issues/81)
344345
345346 count := uint64 (0 )
346- m .log .Debugf ("counting the number of jobs to migrate" )
347+ ctx := m .Context
348+ ctx .Logger ().Debugf ("counting the number of jobs to migrate" )
347349 start := time .Now ()
348350 rows , err := db .Query ("select count(*) from jobs" )
349351 if err != nil {
@@ -360,34 +362,36 @@ func (m *DescriptorMigration) up(db dbConn) error {
360362 return fmt .Errorf ("could not fetch number of records to migrate: %w" , err )
361363 }
362364 if err := rows .Close (); err != nil {
363- m . log . Warningf ("could not close rows after count(*) query" )
365+ ctx . Logger (). Warnf ("could not close rows after count(*) query" )
364366 }
365367
366368 // Create a new plugin registry. This is necessary because some information that need to be
367369 // associated with the extended_descriptor is not available in the db and can only be looked
368370 // up via the TestFetcher.
369- registry := pluginregistry .NewPluginRegistry ()
370- plugins .Init (registry , m . log )
371+ registry := pluginregistry .NewPluginRegistry (ctx )
372+ plugins .Init (registry , ctx . Logger () )
371373
372374 elapsed := time .Since (start )
373- m . log .Debugf ("total number of jobs to migrate: %d, fetched in %.3f ms" , count , ms (elapsed ))
375+ ctx . Logger () .Debugf ("total number of jobs to migrate: %d, fetched in %.3f ms" , count , ms (elapsed ))
374376 for offset := uint64 (0 ); offset < count ; offset += shardSize {
375- jobs , err := m .fetchJobs (db , shardSize , offset , m . log )
377+ jobs , err := m .fetchJobs (db , shardSize , offset )
376378 if err != nil {
377379 return fmt .Errorf ("could not fetch events in range offset %d limit %d: %w" , offset , shardSize , err )
378380 }
379- err = m .migrateJobs (db , jobs , registry , m . log )
381+ err = m .migrateJobs (db , jobs , registry )
380382 if err != nil {
381383 return fmt .Errorf ("could not migrate events in range offset %d limit %d: %w" , offset , shardSize , err )
382384 }
383- m . log .Infof ("migrated %d/%d" , offset , count )
385+ ctx . Logger () .Infof ("migrated %d/%d" , offset , count )
384386 }
385387 return nil
386388}
387389
388390// NewDescriptorMigration is the factory for DescriptorMigration
389- func NewDescriptorMigration (log * logrus.Entry ) migrate.Migrate {
390- return & DescriptorMigration {log : log }
391+ func NewDescriptorMigration (ctx xcontext.Context ) migrate.Migrate {
392+ return & DescriptorMigration {
393+ Context : ctx ,
394+ }
391395}
392396
393397// register NewDescriptorMigration at initialization time
0 commit comments