Skip to content

Commit b46eada

Browse files
authored
ffs/scheduler: resume executing re-attached Jobs concurrently & update deps (textileio#493)
* update deps and gitignore Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * lotus/client: fix logger name Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * concurrent resuming of jobs Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>
1 parent 236f59a commit b46eada

6 files changed

Lines changed: 83 additions & 37 deletions

File tree

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020
buildtools/protoc
2121
buildtools/protoc-gen-go
2222

23+
# File names that can be used for testing.
2324
myfile
2425
myfile2
26+
27+
# CidConfig name that can be used for testing.
28+
pconfig.pow
29+
2530
tags

cmd/powd/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,9 @@ func setupLogging() error {
178178
"fchost",
179179
"maxmind",
180180

181+
// Lotus client
182+
"lotus-client",
183+
181184
// Deals Module
182185
"deals",
183186

ffs/scheduler/scheduler.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,6 @@ func (s *Scheduler) push(iid ffs.APIID, cfg ffs.CidConfig, oldCid cid.Cid) (ffs.
131131
return ffs.EmptyJobID, fmt.Errorf("saving new config in store: %s", err)
132132
}
133133

134-
if oldCid.Defined() {
135-
if err := s.Untrack(oldCid); err != nil {
136-
return ffs.EmptyJobID, fmt.Errorf("untracking replaced cid: %s", err)
137-
}
138-
}
139134
select {
140135
case s.evaluateQueuedWork <- struct{}{}:
141136
default:
@@ -262,8 +257,8 @@ func (s *Scheduler) run() {
262257

263258
func (s *Scheduler) resumeStartedDeals() error {
264259
ejids := s.js.GetExecutingJobs()
265-
// No need for rate limit since "Executing" # of jobs are already rate limited on creation.
266-
var wg sync.WaitGroup
260+
// No need for rate limit since the number of "Executing"
261+
// jobs is always rate-limited on creation.
267262
for _, jid := range ejids {
268263
if s.ctx.Err() != nil {
269264
break
@@ -272,15 +267,13 @@ func (s *Scheduler) resumeStartedDeals() error {
272267
if err != nil {
273268
return fmt.Errorf("getting resumed queued job: %s", err)
274269
}
275-
wg.Add(1)
276270
go func(j ffs.Job) {
277-
defer wg.Done()
271+
log.Infof("resuming job %s with cid %s", j.ID, j.Cid)
278272
// We re-execute the pipeline as if was dequeued.
279273
// Both hot and cold storage can detect resumed job execution.
280274
s.executeQueuedJob(j)
281275
}(j)
282276
}
283-
wg.Wait()
284277
return nil
285278
}
286279

@@ -406,13 +399,6 @@ func (s *Scheduler) executeQueuedJob(j ffs.Job) {
406399
s.cancelLock.Unlock()
407400
}()
408401

409-
// Get
410-
a, err := s.as.Get(j.ID)
411-
if err != nil {
412-
log.Errorf("getting push config action data from store: %s", err)
413-
return
414-
}
415-
416402
ctx, cancel := context.WithCancel(context.WithValue(context.Background(), ffs.CtxKeyJid, j.ID))
417403
defer cancel()
418404
go func() {
@@ -422,6 +408,17 @@ func (s *Scheduler) executeQueuedJob(j ffs.Job) {
422408
cancel()
423409
}()
424410

411+
// Get
412+
a, err := s.as.Get(j.ID)
413+
if err != nil {
414+
log.Errorf("getting push config action data from store: %s", err)
415+
if err := s.js.Finalize(j.ID, ffs.Failed, err, nil); err != nil {
416+
log.Errorf("changing job to failed: %s", err)
417+
}
418+
s.l.Log(ctx, a.Cfg.Cid, "Job %s couldn't start: %s.", j.ID, err)
419+
return
420+
}
421+
425422
// Execute
426423
s.l.Log(ctx, a.Cfg.Cid, "Executing job %s...", j.ID)
427424
info, dealErrors, err := s.execute(ctx, a, j)
@@ -433,7 +430,7 @@ func (s *Scheduler) executeQueuedJob(j ffs.Job) {
433430
if err := s.js.Finalize(j.ID, ffs.Failed, err, dealErrors); err != nil {
434431
log.Errorf("changing job to failed: %s", err)
435432
}
436-
s.l.Log(ctx, a.Cfg.Cid, "Job %s execution failed.", j.ID)
433+
s.l.Log(ctx, a.Cfg.Cid, "Job %s execution failed: %s", j.ID, err)
437434
return
438435
}
439436
// Save whatever stored information was completely/partially
@@ -466,6 +463,12 @@ func (s *Scheduler) execute(ctx context.Context, a astore.Action, job ffs.Job) (
466463
return ffs.CidInfo{}, nil, fmt.Errorf("getting current cid info from store: %s", err)
467464
}
468465

466+
if a.ReplacedCid.Defined() {
467+
if err := s.Untrack(a.ReplacedCid); err != nil && err != astore.ErrNotFound {
468+
return ffs.CidInfo{}, nil, fmt.Errorf("untracking replaced cid: %s", err)
469+
}
470+
}
471+
469472
s.l.Log(ctx, a.Cfg.Cid, "Ensuring Hot-Storage satisfies the configuration...")
470473
hot, err := s.executeHotStorage(ctx, ci, a.Cfg.Hot, a.Cfg.Cold.Filecoin.Addr, a.ReplacedCid)
471474
if err != nil {

go.mod

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/textileio/powergate
33
go 1.14
44

55
require (
6-
contrib.go.opencensus.io/exporter/prometheus v0.1.0
6+
contrib.go.opencensus.io/exporter/prometheus v0.2.0
77
github.com/AlecAivazis/survey/v2 v2.0.7
88
github.com/caarlos0/spin v1.1.0
99
github.com/containerd/continuity v0.0.0-20200228182428-0f16d7a0959c // indirect
@@ -14,17 +14,17 @@ require (
1414
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
1515
github.com/filecoin-project/lotus v0.4.0
1616
github.com/filecoin-project/specs-actors v0.6.2-0.20200617175406-de392ca14121
17-
github.com/gin-contrib/location v0.0.1
17+
github.com/gin-contrib/location v0.0.2
1818
github.com/gin-contrib/static v0.0.0-20191128031702-f81c604d8ac2
1919
github.com/gin-gonic/gin v1.6.3
2020
github.com/golang/protobuf v1.4.2
21-
github.com/google/go-cmp v0.4.1
21+
github.com/google/go-cmp v0.5.0
2222
github.com/google/uuid v1.1.1
2323
github.com/gosuri/uilive v0.0.4
2424
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
2525
github.com/improbable-eng/grpc-web v0.12.0
2626
github.com/ipfs/go-block-format v0.0.2
27-
github.com/ipfs/go-cid v0.0.6-0.20200501230655-7c82f3b81c00
27+
github.com/ipfs/go-cid v0.0.6
2828
github.com/ipfs/go-datastore v0.4.4
2929
github.com/ipfs/go-ds-badger2 v0.1.0
3030
github.com/ipfs/go-ipfs-files v0.0.8
@@ -35,9 +35,9 @@ require (
3535
github.com/ipfs/interface-go-ipfs-core v0.2.6
3636
github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae
3737
github.com/jessevdk/go-assets v0.0.0-20160921144138-4f4301a06e15
38-
github.com/libp2p/go-libp2p v0.9.4
39-
github.com/libp2p/go-libp2p-core v0.5.7
40-
github.com/libp2p/go-libp2p-kad-dht v0.8.1
38+
github.com/libp2p/go-libp2p v0.10.0
39+
github.com/libp2p/go-libp2p-core v0.6.0
40+
github.com/libp2p/go-libp2p-kad-dht v0.8.2
4141
github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381
4242
github.com/manifoldco/promptui v0.7.0
4343
github.com/mattn/go-runewidth v0.0.8 // indirect
@@ -53,8 +53,8 @@ require (
5353
github.com/spf13/cobra v1.0.0
5454
github.com/spf13/pflag v1.0.5
5555
github.com/spf13/viper v1.7.0
56-
github.com/stretchr/testify v1.6.0
57-
go.opencensus.io v0.22.3
58-
google.golang.org/grpc v1.29.1
56+
github.com/stretchr/testify v1.6.1
57+
go.opencensus.io v0.22.4
58+
google.golang.org/grpc v1.30.0
5959
google.golang.org/protobuf v1.24.0
6060
)

0 commit comments

Comments
 (0)