Skip to content

Commit 3a9586e

Browse files
authored
Get storage job API (textileio#598)
* Add GetStorageJob throughout stack Signed-off-by: Aaron Sutula <hi@asutula.com> * update docs Signed-off-by: Aaron Sutula <hi@asutula.com>
1 parent da4bd1b commit 3a9586e

19 files changed

Lines changed: 887 additions & 521 deletions

File tree

api/client/ffs.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,15 @@ func (f *FFS) CancelJob(ctx context.Context, jid ffs.JobID) error {
279279
return err
280280
}
281281

282+
// GetStorageJob returns the current state of the specified job.
283+
func (f *FFS) GetStorageJob(ctx context.Context, jid ffs.JobID) (ffs.StorageJob, error) {
284+
res, err := f.client.GetStorageJob(ctx, &rpc.GetStorageJobRequest{Jid: jid.String()})
285+
if err != nil {
286+
return ffs.StorageJob{}, err
287+
}
288+
return fromRPCJob(res.Job)
289+
}
290+
282291
// WatchJobs pushes JobEvents to the provided channel. The provided channel will be owned
283292
// by the client after the call, so it shouldn't be closed by the client. To stop receiving
284293
// events, the provided ctx should be canceled. If an error occurs, it will be returned
@@ -802,6 +811,42 @@ func fromRPCRetrievalDealRecords(records []*rpc.RetrievalDealRecord) ([]deals.Re
802811
return ret, nil
803812
}
804813

814+
func fromRPCJob(job *rpc.Job) (ffs.StorageJob, error) {
815+
c, err := util.CidFromString(job.Cid)
816+
if err != nil {
817+
return ffs.StorageJob{}, err
818+
}
819+
dealErrors, err := fromRPCDealErrors(job.DealErrors)
820+
if err != nil {
821+
return ffs.StorageJob{}, err
822+
}
823+
var status ffs.JobStatus
824+
switch job.Status {
825+
case rpc.JobStatus_JOB_STATUS_UNSPECIFIED:
826+
status = ffs.Unspecified
827+
case rpc.JobStatus_JOB_STATUS_QUEUED:
828+
status = ffs.Queued
829+
case rpc.JobStatus_JOB_STATUS_EXECUTING:
830+
status = ffs.Executing
831+
case rpc.JobStatus_JOB_STATUS_FAILED:
832+
status = ffs.Failed
833+
case rpc.JobStatus_JOB_STATUS_CANCELED:
834+
status = ffs.Canceled
835+
case rpc.JobStatus_JOB_STATUS_SUCCESS:
836+
status = ffs.Success
837+
default:
838+
return ffs.StorageJob{}, fmt.Errorf("unknown job status: %v", job.Status.String())
839+
}
840+
return ffs.StorageJob{
841+
ID: ffs.JobID(job.Id),
842+
APIID: ffs.APIID(job.ApiId),
843+
Cid: c,
844+
Status: status,
845+
ErrCause: job.ErrCause,
846+
DealErrors: dealErrors,
847+
}, nil
848+
}
849+
805850
func newDecoratedIPFSAPI(proxyAddr, ffsToken string) (*httpapi.HttpApi, error) {
806851
ipport := strings.Split(proxyAddr, ":")
807852
if len(ipport) != 2 {

cli-docs/pow/pow_ffs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,6 @@ Provides commands to manage ffs
3737
* [pow ffs show](pow_ffs_show.md) - Show pinned cid data
3838
* [pow ffs stage](pow_ffs_stage.md) - Temporarily stage data in the Hot layer in preparation for pushing a cid storage config
3939
* [pow ffs storage](pow_ffs_storage.md) - List storage deal records for an FFS instance
40+
* [pow ffs storage-job](pow_ffs_storage-job.md) - Get a storage job's current status
4041
* [pow ffs watch](pow_ffs_watch.md) - Watch for job status updates
4142

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
## pow ffs storage-job
2+
3+
Get a storage job's current status
4+
5+
### Synopsis
6+
7+
Get a storage job's current status
8+
9+
```
10+
pow ffs storage-job [jobid] [flags]
11+
```
12+
13+
### Options
14+
15+
```
16+
-h, --help help for storage-job
17+
-t, --token string FFS auth token
18+
```
19+
20+
### Options inherited from parent commands
21+
22+
```
23+
--serverAddress string address of the powergate service api (default "127.0.0.1:5002")
24+
```
25+
26+
### SEE ALSO
27+
28+
* [pow ffs](pow_ffs.md) - Provides commands to manage ffs
29+

cmd/pow/cmd/ffs_storage_job.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"os"
6+
"strings"
7+
8+
"github.com/spf13/cobra"
9+
"github.com/spf13/viper"
10+
"github.com/textileio/powergate/ffs"
11+
)
12+
13+
func init() {
14+
ffsStorageJobCmd.Flags().StringP("token", "t", "", "FFS auth token")
15+
16+
ffsCmd.AddCommand(ffsStorageJobCmd)
17+
}
18+
19+
var ffsStorageJobCmd = &cobra.Command{
20+
Use: "storage-job [jobid]",
21+
Short: "Get a storage job's current status",
22+
Long: `Get a storage job's current status`,
23+
Args: cobra.ExactArgs(1),
24+
PreRun: func(cmd *cobra.Command, args []string) {
25+
err := viper.BindPFlags(cmd.Flags())
26+
checkErr(err)
27+
},
28+
Run: func(cmd *cobra.Command, args []string) {
29+
idStrings := strings.Split(args[0], ",")
30+
jobIds := make([]ffs.JobID, len(idStrings))
31+
for i, s := range idStrings {
32+
jobIds[i] = ffs.JobID(s)
33+
}
34+
35+
jid := ffs.JobID(args[0])
36+
37+
ctx, cancel := context.WithTimeout(context.Background(), cmdTimeout)
38+
defer cancel()
39+
40+
job, err := fcClient.FFS.GetStorageJob(authCtx(ctx), jid)
41+
checkErr(err)
42+
dealErrorStrings := make([]string, len(job.DealErrors))
43+
for i, dealError := range job.DealErrors {
44+
dealErrorStrings[i] = dealError.Error()
45+
}
46+
rows := [][]string{
47+
{"API ID", job.APIID.String()},
48+
{"Job ID", job.ID.String()},
49+
{"CID", job.Cid.String()},
50+
{"Status", ffs.JobStatusStr[job.Status]},
51+
{"Error Cause", job.ErrCause},
52+
{"Deal Errors", strings.Join(dealErrorStrings, "\n")},
53+
}
54+
RenderTable(os.Stdout, []string{}, rows)
55+
},
56+
}

ffs/api/api_jobs.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,18 @@ import (
88
"github.com/textileio/powergate/ffs/scheduler"
99
)
1010

11+
// GetStorageJob returns the current state of the specified job.
12+
func (i *API) GetStorageJob(ctx context.Context, jid ffs.JobID) (ffs.StorageJob, error) {
13+
job, err := i.sched.GetJob(jid)
14+
if err == scheduler.ErrNotFound {
15+
return ffs.StorageJob{}, fmt.Errorf("job id %s not found", jid)
16+
}
17+
if err != nil {
18+
return ffs.StorageJob{}, fmt.Errorf("getting current job state: %s", err)
19+
}
20+
return job, nil
21+
}
22+
1123
// WatchJobs subscribes to Job status changes. If jids is empty, it subscribes to
1224
// all Job status changes corresonding to the instance. If jids is not empty,
1325
// it immediately sends current state of those Jobs. If empty, it doesn't.

ffs/integrationtest/config/config_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,14 @@ func TestEnabledConfigChange(t *testing.T) {
6666

6767
jid, err := fapi.PushStorageConfig(cid, api.WithStorageConfig(config))
6868
require.NoError(t, err)
69-
it.RequireJobState(t, fapi, jid, ffs.Success)
69+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
7070
it.RequireStorageConfig(t, fapi, cid, &config)
7171
it.RequireIpfsPinnedCid(ctx, t, cid, ipfsAPI)
7272

7373
config = fapi.DefaultStorageConfig().WithHotEnabled(false)
7474
jid, err = fapi.PushStorageConfig(cid, api.WithStorageConfig(config), api.WithOverride(true))
7575
require.NoError(t, err)
76-
it.RequireJobState(t, fapi, jid, ffs.Success)
76+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
7777
it.RequireStorageConfig(t, fapi, cid, &config)
7878
it.RequireIpfsUnpinnedCid(ctx, t, cid, ipfsAPI)
7979
})
@@ -91,14 +91,14 @@ func TestEnabledConfigChange(t *testing.T) {
9191

9292
jid, err := fapi.PushStorageConfig(cid, api.WithStorageConfig(config))
9393
require.NoError(t, err)
94-
it.RequireJobState(t, fapi, jid, ffs.Success)
94+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
9595
it.RequireStorageConfig(t, fapi, cid, &config)
9696
it.RequireIpfsUnpinnedCid(ctx, t, cid, ipfsAPI)
9797

9898
config = fapi.DefaultStorageConfig().WithHotEnabled(true)
9999
jid, err = fapi.PushStorageConfig(cid, api.WithStorageConfig(config), api.WithOverride(true))
100100
require.NoError(t, err)
101-
it.RequireJobState(t, fapi, jid, ffs.Success)
101+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
102102
it.RequireStorageConfig(t, fapi, cid, &config)
103103
it.RequireIpfsPinnedCid(ctx, t, cid, ipfsAPI)
104104
})
@@ -117,14 +117,14 @@ func TestEnabledConfigChange(t *testing.T) {
117117

118118
jid, err := fapi.PushStorageConfig(cid, api.WithStorageConfig(config))
119119
require.NoError(t, err)
120-
it.RequireJobState(t, fapi, jid, ffs.Success)
120+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
121121
it.RequireStorageConfig(t, fapi, cid, &config)
122122
it.RequireFilUnstored(ctx, t, client, cid)
123123

124124
config = fapi.DefaultStorageConfig().WithHotEnabled(true)
125125
jid, err = fapi.PushStorageConfig(cid, api.WithStorageConfig(config), api.WithOverride(true))
126126
require.NoError(t, err)
127-
it.RequireJobState(t, fapi, jid, ffs.Success)
127+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
128128
it.RequireStorageConfig(t, fapi, cid, &config)
129129
it.RequireFilStored(ctx, t, client, cid)
130130
})
@@ -143,14 +143,14 @@ func TestEnabledConfigChange(t *testing.T) {
143143

144144
jid, err := fapi.PushStorageConfig(cid, api.WithStorageConfig(config))
145145
require.NoError(t, err)
146-
it.RequireJobState(t, fapi, jid, ffs.Success)
146+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
147147
it.RequireStorageConfig(t, fapi, cid, &config)
148148
it.RequireFilUnstored(ctx, t, client, cid)
149149

150150
config = fapi.DefaultStorageConfig().WithHotEnabled(true)
151151
jid, err = fapi.PushStorageConfig(cid, api.WithStorageConfig(config), api.WithOverride(true))
152152
require.NoError(t, err)
153-
it.RequireJobState(t, fapi, jid, ffs.Success)
153+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
154154

155155
// Yes, still stored in filecoin since deals can't be
156156
// undone.
@@ -194,7 +194,7 @@ func TestFilecoinEnableConfig(t *testing.T) {
194194
require.NoError(t, err)
195195

196196
expectedJobState := ffs.Success
197-
it.RequireJobState(t, fapi, jid, expectedJobState)
197+
it.RequireEventualJobState(t, fapi, jid, expectedJobState)
198198

199199
if expectedJobState == ffs.Success {
200200
it.RequireStorageConfig(t, fapi, cid, &config)
@@ -241,7 +241,7 @@ func TestHotTimeoutConfig(t *testing.T) {
241241
config := fapi.DefaultStorageConfig().WithHotIpfsAddTimeout(1)
242242
jid, err := fapi.PushStorageConfig(cid, api.WithStorageConfig(config))
243243
require.NoError(t, err)
244-
it.RequireJobState(t, fapi, jid, ffs.Failed)
244+
it.RequireEventualJobState(t, fapi, jid, ffs.Failed)
245245
})
246246
})
247247
}
@@ -259,7 +259,7 @@ func TestDurationConfig(t *testing.T) {
259259
config := fapi.DefaultStorageConfig().WithColdFilDealDuration(duration)
260260
jid, err := fapi.PushStorageConfig(cid, api.WithStorageConfig(config))
261261
require.NoError(t, err)
262-
it.RequireJobState(t, fapi, jid, ffs.Success)
262+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
263263
it.RequireStorageConfig(t, fapi, cid, &config)
264264
cinfo, err := fapi.Show(cid)
265265
require.NoError(t, err)

ffs/integrationtest/filters/filters_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestFilecoinExcludedMiners(t *testing.T) {
3838

3939
jid, err := fapi.PushStorageConfig(cid, api.WithStorageConfig(config))
4040
require.NoError(t, err)
41-
it.RequireJobState(t, fapi, jid, ffs.Success)
41+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
4242
it.RequireStorageConfig(t, fapi, cid, &config)
4343
cinfo, err := fapi.Show(cid)
4444
require.NoError(t, err)
@@ -61,7 +61,7 @@ func TestFilecoinTrustedMiner(t *testing.T) {
6161

6262
jid, err := fapi.PushStorageConfig(cid, api.WithStorageConfig(config))
6363
require.NoError(t, err)
64-
it.RequireJobState(t, fapi, jid, ffs.Success)
64+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
6565
it.RequireStorageConfig(t, fapi, cid, &config)
6666
cinfo, err := fapi.Show(cid)
6767
require.NoError(t, err)
@@ -103,7 +103,7 @@ func TestFilecoinCountryFilter(t *testing.T) {
103103

104104
jid, err := fapi.PushStorageConfig(cid, api.WithStorageConfig(config))
105105
require.NoError(t, err)
106-
it.RequireJobState(t, fapi, jid, ffs.Success)
106+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
107107
it.RequireStorageConfig(t, fapi, cid, &config)
108108
cinfo, err := fapi.Show(cid)
109109
require.NoError(t, err)
@@ -135,12 +135,12 @@ func TestFilecoinMaxPriceFilter(t *testing.T) {
135135
config := fapi.DefaultStorageConfig().WithColdMaxPrice(400000000)
136136
jid, err := fapi.PushStorageConfig(cid, api.WithStorageConfig(config))
137137
require.NoError(t, err)
138-
it.RequireJobState(t, fapi, jid, ffs.Failed)
138+
it.RequireEventualJobState(t, fapi, jid, ffs.Failed)
139139

140140
config = fapi.DefaultStorageConfig().WithColdMaxPrice(600000000)
141141
jid, err = fapi.PushStorageConfig(cid, api.WithStorageConfig(config), api.WithOverride(true))
142142
require.NoError(t, err)
143-
it.RequireJobState(t, fapi, jid, ffs.Success)
143+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
144144
it.RequireStorageConfig(t, fapi, cid, &config)
145145
cinfo, err := fapi.Show(cid)
146146
require.NoError(t, err)

ffs/integrationtest/general/general_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ func TestAdd(t *testing.T) {
3939
cid, _ := it.AddRandomFile(t, r, ipfsAPI)
4040
jid, err := fapi.PushStorageConfig(cid)
4141
require.NoError(t, err)
42-
it.RequireJobState(t, fapi, jid, ffs.Success)
42+
it.RequireStorageJobState(t, fapi, jid, ffs.Queued, ffs.Executing)
43+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
4344
it.RequireStorageConfig(t, fapi, cid, nil)
4445
it.RequireFilStored(ctx, t, client, cid)
4546
it.RequireIpfsPinnedCid(ctx, t, cid, ipfsAPI)
@@ -59,7 +60,8 @@ func TestAdd(t *testing.T) {
5960
config := fapi.DefaultStorageConfig().WithHotEnabled(false).WithColdFilDealDuration(util.MinDealDuration + 1234)
6061
jid, err := fapi.PushStorageConfig(cid, api.WithStorageConfig(config))
6162
require.NoError(t, err)
62-
it.RequireJobState(t, fapi, jid, ffs.Success)
63+
it.RequireStorageJobState(t, fapi, jid, ffs.Queued, ffs.Executing)
64+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
6365
it.RequireStorageConfig(t, fapi, cid, &config)
6466
it.RequireStorageDealRecord(t, fapi, cid)
6567
})
@@ -78,7 +80,7 @@ func TestGet(t *testing.T) {
7880
cid, data := it.AddRandomFile(t, r, ipfs)
7981
jid, err := fapi.PushStorageConfig(cid)
8082
require.NoError(t, err)
81-
it.RequireJobState(t, fapi, jid, ffs.Success)
83+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
8284
it.RequireStorageConfig(t, fapi, cid, nil)
8385

8486
rr, err := fapi.Get(ctx, cid)
@@ -177,7 +179,7 @@ func TestShow(t *testing.T) {
177179
cid, _ := it.AddRandomFile(t, r, ipfs)
178180
jid, err := fapi.PushStorageConfig(cid)
179181
require.NoError(t, err)
180-
it.RequireJobState(t, fapi, jid, ffs.Success)
182+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
181183
it.RequireStorageConfig(t, fapi, cid, nil)
182184

183185
inf, err := fapi.Info(ctx)
@@ -224,7 +226,7 @@ func TestColdInstanceLoad(t *testing.T) {
224226
cid, data := it.AddRandomFile(t, ra, ipfs)
225227
jid, err := fapi.PushStorageConfig(cid)
226228
require.NoError(t, err)
227-
it.RequireJobState(t, fapi, jid, ffs.Success)
229+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
228230
it.RequireStorageConfig(t, fapi, cid, nil)
229231

230232
info, err := fapi.Info(ctx)
@@ -272,15 +274,15 @@ func TestRemove(t *testing.T) {
272274
config := fapi.DefaultStorageConfig().WithColdEnabled(false)
273275
jid, err := fapi.PushStorageConfig(c1, api.WithStorageConfig(config))
274276
require.NoError(t, err)
275-
it.RequireJobState(t, fapi, jid, ffs.Success)
277+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
276278
it.RequireStorageConfig(t, fapi, c1, &config)
277279

278280
err = fapi.Remove(c1)
279281
require.Equal(t, api.ErrActiveInStorage, err)
280282

281283
config = config.WithHotEnabled(false)
282284
jid, err = fapi.PushStorageConfig(c1, api.WithStorageConfig(config), api.WithOverride(true))
283-
it.RequireJobState(t, fapi, jid, ffs.Success)
285+
it.RequireEventualJobState(t, fapi, jid, ffs.Success)
284286
require.NoError(t, err)
285287

286288
err = fapi.Remove(c1)

ffs/integrationtest/integrationtest.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,18 @@ func NewFFSManager(t require.TestingT, ds datastore.TxnDatastore, clientBuilder
182182
}
183183
}
184184

185-
// RequireJobState watches a Job for a desired status.
186-
func RequireJobState(t require.TestingT, fapi *api.API, jid ffs.JobID, status ffs.JobStatus) ffs.StorageJob {
185+
// RequireStorageJobState checks if the current status of a job matches one of the specified statuses.
186+
func RequireStorageJobState(t require.TestingT, fapi *api.API, jid ffs.JobID, statuses ...ffs.JobStatus) ffs.StorageJob {
187+
ctx, cancel := context.WithCancel(context.Background())
188+
defer cancel()
189+
job, err := fapi.GetStorageJob(ctx, jid)
190+
require.NoError(t, err)
191+
require.Contains(t, statuses, job.Status)
192+
return job
193+
}
194+
195+
// RequireEventualJobState watches a Job for a desired status.
196+
func RequireEventualJobState(t require.TestingT, fapi *api.API, jid ffs.JobID, status ffs.JobStatus) ffs.StorageJob {
187197
ch := make(chan ffs.StorageJob, 10)
188198
ctx, cancel := context.WithCancel(context.Background())
189199
defer cancel()

0 commit comments

Comments
 (0)