Skip to content

Commit 2fdbd1f

Browse files
authored
Decrease indexing load & fail on unknown jobs (textileio#589)
- Decrease Lotus load generated by indices.
- Return an error if `WatchJobs` receives an unknown job.
1 parent 7fa2b36 commit 2fdbd1f

6 files changed

Lines changed: 22 additions & 16 deletions

File tree

ffs/api/api_jobs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func (i *API) WatchJobs(ctx context.Context, c chan<- ffs.Job, jids ...ffs.JobID
1616
for _, jid := range jids {
1717
j, err := i.sched.GetJob(jid)
1818
if err == scheduler.ErrNotFound {
19-
continue
19+
return fmt.Errorf("job not found: %s", jid)
2020
}
2121
if err != nil {
2222
return fmt.Errorf("getting current job state: %s", err)

index/ask/runner/runner.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ import (
2222
)
2323

2424
var (
25-
qaRatelim = 20
25+
qaRatelim = 5
2626
qaTimeout = time.Second * 20
27-
qaRefreshInterval = 15 * time.Minute
27+
qaRefreshInterval = 30 * time.Minute
2828

2929
log = logging.Logger("index-ask")
3030
)

index/faults/module/faults.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,6 @@ func (s *Index) Close() error {
123123
// start is a long running job that keeps the index up to date with chain updates.
124124
func (s *Index) start() {
125125
defer close(s.finished)
126-
if err := s.updateIndex(); err != nil {
127-
log.Errorf("initial updating faults index: %s", err)
128-
}
129126
for {
130127
select {
131128
case <-s.ctx.Done():

index/miner/module/meta.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ import (
1818
)
1919

2020
var (
21-
metadataRefreshInterval = time.Minute * 15
21+
metadataRefreshInterval = time.Hour
2222
pingTimeout = time.Second * 5
23-
pingRateLim = 10
23+
pingRateLim = 1
2424
)
2525

2626
var (
@@ -54,7 +54,7 @@ func (mi *Index) metaWorker() {
5454
mi.lock.Unlock()
5555
newIndex := updateMetaIndex(mi.ctx, mi.clientBuilder, mi.h, mi.lr, addrs)
5656
if err := mi.persistMetaIndex(newIndex); err != nil {
57-
log.Errorf("error when persisting meta index: %s", err)
57+
log.Errorf("persisting meta index: %s", err)
5858
}
5959
mi.lock.Lock()
6060
mi.index.Meta = newIndex
@@ -80,6 +80,10 @@ func updateMetaIndex(ctx context.Context, clientBuilder lotus.ClientBuilder, h P
8080
rl := make(chan struct{}, pingRateLim)
8181
var lock sync.Mutex
8282
for i, a := range addrs {
83+
if ctx.Err() != nil {
84+
log.Infof("update meta index canceled")
85+
return miner.MetaIndex{}
86+
}
8387
rl <- struct{}{}
8488
go func(a string) {
8589
defer func() { <-rl }()

index/miner/module/miner.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ const (
2525

2626
var (
2727
minersRefreshInterval = time.Minute * 30
28-
maxParallelism = 10
28+
maxParallelism = 1
2929
dsBase = datastore.NewKey("index")
3030

3131
log = logging.Logger("index-miner")
@@ -159,7 +159,7 @@ func (mi *Index) start() {
159159
defer func() { mi.finished <- struct{}{} }()
160160

161161
if err := mi.updateOnChainIndex(); err != nil {
162-
log.Errorf("error on initial updating miner index: %s", err)
162+
log.Errorf("initial updating miner index: %s", err)
163163
}
164164
mi.chMeta <- struct{}{}
165165
for {
@@ -175,7 +175,7 @@ func (mi *Index) start() {
175175
}
176176
case <-mi.minerTicker.C:
177177
if err := mi.updateOnChainIndex(); err != nil {
178-
log.Errorf("error when updating miner index: %s", err)
178+
log.Errorf("updating miner index: %s", err)
179179
continue
180180
}
181181
}

index/miner/module/onchain.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ func (mi *Index) updateOnChainIndex() error {
3434
}
3535
defer cls()
3636
log.Info("updating on-chain index...")
37+
defer log.Info("on-chain index updated")
38+
3739
heaviest, err := client.ChainHead(mi.ctx)
3840
if err != nil {
3941
return err
@@ -50,7 +52,7 @@ func (mi *Index) updateOnChainIndex() error {
5052
newtsk := new.Key()
5153
ts, err := mi.store.LoadAndPrune(mi.ctx, newtsk, &chainIndex)
5254
if err != nil {
53-
return fmt.Errorf("error getting last saved from store: %s", err)
55+
return fmt.Errorf("getting last saved from store: %s", err)
5456
}
5557
if chainIndex.Miners == nil {
5658
chainIndex.Miners = make(map[string]miner.OnChainData)
@@ -66,12 +68,12 @@ func (mi *Index) updateOnChainIndex() error {
6668
if hdiff > fullThreshold || chainIndex.LastUpdated == 0 {
6769
mctx, _ = tag.New(mctx, tag.Insert(metricRefreshType, "full"))
6870
if err := fullRefresh(mi.ctx, client, &chainIndex); err != nil {
69-
return fmt.Errorf("error doing full refresh: %s", err)
71+
return fmt.Errorf("doing full refresh: %s", err)
7072
}
7173
} else {
7274
mctx, _ = tag.New(mctx, tag.Insert(metricRefreshType, "delta"))
7375
if err := deltaRefresh(mi.ctx, client, &chainIndex, *ts, new); err != nil {
74-
return fmt.Errorf("error doing delta refresh: %s", err)
76+
return fmt.Errorf("doing delta refresh: %s", err)
7577
}
7678
}
7779
chainIndex.LastUpdated = int64(new.Height())
@@ -137,12 +139,15 @@ func updateForAddrs(ctx context.Context, api *apistruct.FullNodeStruct, chainInd
137139
var l sync.Mutex
138140
rl := make(chan struct{}, maxParallelism)
139141
for i, a := range addrs {
142+
if ctx.Err() != nil {
143+
return fmt.Errorf("update on-chain index canceled")
144+
}
140145
rl <- struct{}{}
141146
go func(addr address.Address) {
142147
defer func() { <-rl }()
143148
ocd, err := getOnChainData(ctx, api, addr)
144149
if err != nil {
145-
log.Debug("getting power: %s", err)
150+
log.Debugf("getting onchain data: %s", err)
146151
return
147152
}
148153
l.Lock()

0 commit comments

Comments
 (0)