Skip to content

Commit 7d1fded

Browse files
authored
Better list deals API (textileio#508)
* Add flexible list api Signed-off-by: Aaron Sutula <hi@asutula.com> * rpc, client, and cli Signed-off-by: Aaron Sutula <hi@asutula.com> * fix comment, test, and go mod tidy Signed-off-by: Aaron Sutula <hi@asutula.com> * update cli docs Signed-off-by: Aaron Sutula <hi@asutula.com> * move options to deals package Signed-off-by: Aaron Sutula <hi@asutula.com> * little cleanup Signed-off-by: Aaron Sutula <hi@asutula.com> * deals store duration fix Signed-off-by: Aaron Sutula <hi@asutula.com> * update docs Signed-off-by: Aaron Sutula <hi@asutula.com> * use includeFinal, better docs, pr feedback Signed-off-by: Aaron Sutula <hi@asutula.com> * make includeFinal default false Signed-off-by: Aaron Sutula <hi@asutula.com>
1 parent db27e80 commit 7d1fded

30 files changed

Lines changed: 1116 additions & 1386 deletions

api/client/deals.go

Lines changed: 110 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67

78
"github.com/filecoin-project/go-address"
@@ -20,16 +21,59 @@ type Deals struct {
2021

2122
// WatchEvent is used to send data or error values for Watch.
2223
type WatchEvent struct {
23-
Deal deals.DealInfo
24+
Deal deals.StorageDealInfo
2425
Err error
2526
}
2627

28+
// ListDealRecordsOption updates a ListDealRecordsConfig.
29+
type ListDealRecordsOption func(*rpc.ListDealRecordsConfig)
30+
31+
// WithFromAddrs limits the results deals initated from the provided wallet addresses.
32+
// If WithDataCids is also provided, this is an AND operation.
33+
func WithFromAddrs(addrs ...string) ListDealRecordsOption {
34+
return func(c *rpc.ListDealRecordsConfig) {
35+
c.FromAddrs = addrs
36+
}
37+
}
38+
39+
// WithDataCids limits the results to deals for the provided data cids.
40+
// If WithFromAddrs is also provided, this is an AND operation.
41+
func WithDataCids(cids ...string) ListDealRecordsOption {
42+
return func(c *rpc.ListDealRecordsConfig) {
43+
c.DataCids = cids
44+
}
45+
}
46+
47+
// WithIncludePending specifies whether or not to include pending deals in the results. Default is false.
48+
// Ignored for ListRetrievalDealRecords.
49+
func WithIncludePending(includePending bool) ListDealRecordsOption {
50+
return func(c *rpc.ListDealRecordsConfig) {
51+
c.IncludePending = includePending
52+
}
53+
}
54+
55+
// WithIncludeFinal specifies whether or not to include final deals in the results. Default is false.
56+
// Ignored for ListRetrievalDealRecords.
57+
func WithIncludeFinal(includeFinal bool) ListDealRecordsOption {
58+
return func(c *rpc.ListDealRecordsConfig) {
59+
c.IncludeFinal = includeFinal
60+
}
61+
}
62+
63+
// WithAscending specifies to sort the results in ascending order. Default is descending order.
64+
// Records are sorted by timestamp.
65+
func WithAscending(ascending bool) ListDealRecordsOption {
66+
return func(c *rpc.ListDealRecordsConfig) {
67+
c.Ascending = ascending
68+
}
69+
}
70+
2771
// Store creates a proposal deal for data using wallet addr to all miners indicated
2872
// by dealConfigs for duration epochs.
2973
func (d *Deals) Store(ctx context.Context, addr string, data io.Reader, dealConfigs []deals.StorageDealConfig, minDuration uint64) ([]cid.Cid, []deals.StorageDealConfig, error) {
3074
stream, err := d.client.Store(ctx)
3175
if err != nil {
32-
return nil, nil, err
76+
return nil, nil, fmt.Errorf("calling Store: %v", err)
3377
}
3478

3579
reqDealConfigs := make([]*rpc.DealConfig, len(dealConfigs))
@@ -47,33 +91,33 @@ func (d *Deals) Store(ctx context.Context, addr string, data io.Reader, dealConf
4791
innerReq := &rpc.StoreRequest_StoreParams{StoreParams: storeParams}
4892

4993
if err = stream.Send(&rpc.StoreRequest{Payload: innerReq}); err != nil {
50-
return nil, nil, err
94+
return nil, nil, fmt.Errorf("calling Send: %v", err)
5195
}
5296

5397
buffer := make([]byte, 1024*32) // 32KB
5498
for {
5599
bytesRead, err := data.Read(buffer)
56100
if err != nil && err != io.EOF {
57-
return nil, nil, err
101+
return nil, nil, fmt.Errorf("reading buffer: %v", err)
58102
}
59103
sendErr := stream.Send(&rpc.StoreRequest{Payload: &rpc.StoreRequest_Chunk{Chunk: buffer[:bytesRead]}})
60104
if sendErr != nil {
61-
return nil, nil, sendErr
105+
return nil, nil, fmt.Errorf("calling Send: %v", err)
62106
}
63107
if err == io.EOF {
64108
break
65109
}
66110
}
67111
reply, err := stream.CloseAndRecv()
68112
if err != nil {
69-
return nil, nil, err
113+
return nil, nil, fmt.Errorf("calling CloseAndRecv: %v", err)
70114
}
71115

72116
cids := make([]cid.Cid, len(reply.GetProposalCids()))
73117
for i, replyCid := range reply.GetProposalCids() {
74118
id, err := cid.Decode(replyCid)
75119
if err != nil {
76-
return nil, nil, err
120+
return nil, nil, fmt.Errorf("decoding cid: %v", err)
77121
}
78122
cids[i] = id
79123
}
@@ -82,7 +126,7 @@ func (d *Deals) Store(ctx context.Context, addr string, data io.Reader, dealConf
82126
for i, dealConfig := range reply.GetFailedDeals() {
83127
addr, err := address.NewFromString(dealConfig.GetMiner())
84128
if err != nil {
85-
return nil, nil, err
129+
return nil, nil, fmt.Errorf("decoding address: %v", err)
86130
}
87131
failedDeals[i] = deals.StorageDealConfig{
88132
Miner: addr.String(),
@@ -102,7 +146,7 @@ func (d *Deals) Watch(ctx context.Context, proposals []cid.Cid) (<-chan WatchEve
102146
}
103147
stream, err := d.client.Watch(ctx, &rpc.WatchRequest{Proposals: proposalStrings})
104148
if err != nil {
105-
return nil, err
149+
return nil, fmt.Errorf("calling Watch: %v", err)
106150
}
107151
go func() {
108152
defer close(channel)
@@ -111,21 +155,21 @@ func (d *Deals) Watch(ctx context.Context, proposals []cid.Cid) (<-chan WatchEve
111155
if err != nil {
112156
stat := status.Convert(err)
113157
if stat == nil || (stat.Code() != codes.Canceled) {
114-
channel <- WatchEvent{Err: err}
158+
channel <- WatchEvent{Err: fmt.Errorf("reveiving stream: %v", err)}
115159
}
116160
break
117161
}
118162
proposalCid, err := cid.Decode(event.GetDealInfo().GetProposalCid())
119163
if err != nil {
120-
channel <- WatchEvent{Err: err}
164+
channel <- WatchEvent{Err: fmt.Errorf("decoding cid: %v", err)}
121165
break
122166
}
123167
cid, err := cid.Decode(event.GetDealInfo().GetPieceCid())
124168
if err != nil {
125-
channel <- WatchEvent{Err: err}
169+
channel <- WatchEvent{Err: fmt.Errorf("decoding cid: %v", err)}
126170
break
127171
}
128-
deal := deals.DealInfo{
172+
deal := deals.StorageDealInfo{
129173
ProposalCid: proposalCid,
130174
StateID: event.GetDealInfo().GetStateId(),
131175
StateName: event.GetDealInfo().GetStateName(),
@@ -153,7 +197,7 @@ func (d *Deals) Retrieve(ctx context.Context, waddr string, cid cid.Cid) (io.Rea
153197
}
154198
stream, err := d.client.Retrieve(ctx, req)
155199
if err != nil {
156-
return nil, err
200+
return nil, fmt.Errorf("calling Retrieve: %v", err)
157201
}
158202

159203
reader, writer := io.Pipe()
@@ -179,89 +223,49 @@ func (d *Deals) Retrieve(ctx context.Context, waddr string, cid cid.Cid) (io.Rea
179223
return reader, nil
180224
}
181225

182-
// FinalDealRecords returns a list of all finalized storage deals.
183-
// Records are sorted ascending by activation epoch then timestamp.
184-
func (d *Deals) FinalDealRecords(ctx context.Context) ([]deals.DealRecord, error) {
185-
res, err := d.client.FinalDealRecords(ctx, &rpc.FinalDealRecordsRequest{})
186-
if err != nil {
187-
return nil, err
226+
// ListStorageDealRecords returns a list of storage deals
227+
// according to the provided options.
228+
func (d *Deals) ListStorageDealRecords(ctx context.Context, opts ...ListDealRecordsOption) ([]deals.StorageDealRecord, error) {
229+
conf := &rpc.ListDealRecordsConfig{}
230+
for _, opt := range opts {
231+
opt(conf)
188232
}
189-
ret, err := fromRPCDealRecords(res.Records)
190-
if err != nil {
191-
return nil, err
192-
}
193-
return ret, nil
194-
}
195-
196-
// PendingDealRecords returns a list of all pending storage deals.
197-
// Records are sorted ascending by timestamp.
198-
func (d *Deals) PendingDealRecords(ctx context.Context) ([]deals.DealRecord, error) {
199-
res, err := d.client.PendingDealRecords(ctx, &rpc.PendingDealRecordsRequest{})
233+
res, err := d.client.ListStorageDealRecords(ctx, &rpc.ListStorageDealRecordsRequest{Config: conf})
200234
if err != nil {
201-
return nil, err
235+
return nil, fmt.Errorf("calling ListStorageDealRecords: %v", err)
202236
}
203-
ret, err := fromRPCDealRecords(res.Records)
237+
ret, err := fromRPCStorageDealRecords(res.Records)
204238
if err != nil {
205-
return nil, err
239+
return nil, fmt.Errorf("processing response deal records: %v", err)
206240
}
207241
return ret, nil
208242
}
209243

210-
// AllDealRecords returns a list of all finalized and pending deals.
211-
// Records are sorted ascending by activation epoch, if available, then timestamp.
212-
func (d *Deals) AllDealRecords(ctx context.Context) ([]deals.DealRecord, error) {
213-
res, err := d.client.AllDealRecords(ctx, &rpc.AllDealRecordsRequest{})
214-
if err != nil {
215-
return nil, err
244+
// ListRetrievalDealRecords returns a list of retrieval deals
245+
// according to the provided options.
246+
func (d *Deals) ListRetrievalDealRecords(ctx context.Context, opts ...ListDealRecordsOption) ([]deals.RetrievalDealRecord, error) {
247+
conf := &rpc.ListDealRecordsConfig{}
248+
for _, opt := range opts {
249+
opt(conf)
216250
}
217-
ret, err := fromRPCDealRecords(res.Records)
251+
res, err := d.client.ListRetrievalDealRecords(ctx, &rpc.ListRetrievalDealRecordsRequest{Config: conf})
218252
if err != nil {
219-
return nil, err
253+
return nil, fmt.Errorf("calling ListRetrievalDealRecords: %v", err)
220254
}
221-
return ret, nil
222-
}
223-
224-
// RetrievalRecords returns a list of all retrievals.
225-
// Records are sorted ascending by timestamp.
226-
func (d *Deals) RetrievalRecords(ctx context.Context) ([]deals.RetrievalRecord, error) {
227-
res, err := d.client.RetrievalRecords(ctx, &rpc.RetrievalRecordsRequest{})
255+
ret, err := fromRPCRetrievalDealRecords(res.Records)
228256
if err != nil {
229-
return nil, err
230-
}
231-
var ret []deals.RetrievalRecord
232-
for _, rpcRecord := range res.Records {
233-
if rpcRecord.RetrievalInfo == nil {
234-
continue
235-
}
236-
record := deals.RetrievalRecord{
237-
Addr: rpcRecord.Addr,
238-
Time: rpcRecord.Time,
239-
}
240-
pieceCid, err := cid.Decode(rpcRecord.RetrievalInfo.PieceCid)
241-
if err != nil {
242-
return nil, err
243-
}
244-
record.RetrievalInfo = deals.RetrievalInfo{
245-
PieceCID: pieceCid,
246-
Size: rpcRecord.RetrievalInfo.Size,
247-
MinPrice: rpcRecord.RetrievalInfo.MinPrice,
248-
PaymentInterval: rpcRecord.RetrievalInfo.PaymentInterval,
249-
PaymentIntervalIncrease: rpcRecord.RetrievalInfo.PaymentIntervalIncrease,
250-
Miner: rpcRecord.RetrievalInfo.Miner,
251-
MinerPeerID: rpcRecord.RetrievalInfo.MinerPeerId,
252-
}
253-
ret = append(ret, record)
257+
return nil, fmt.Errorf("processing response deal records: %v", err)
254258
}
255259
return ret, nil
256260
}
257261

258-
func fromRPCDealRecords(records []*rpc.DealRecord) ([]deals.DealRecord, error) {
259-
var ret []deals.DealRecord
262+
func fromRPCStorageDealRecords(records []*rpc.StorageDealRecord) ([]deals.StorageDealRecord, error) {
263+
var ret []deals.StorageDealRecord
260264
for _, rpcRecord := range records {
261265
if rpcRecord.DealInfo == nil {
262266
continue
263267
}
264-
record := deals.DealRecord{
268+
record := deals.StorageDealRecord{
265269
Addr: rpcRecord.Addr,
266270
Time: rpcRecord.Time,
267271
Pending: rpcRecord.Pending,
@@ -274,7 +278,7 @@ func fromRPCDealRecords(records []*rpc.DealRecord) ([]deals.DealRecord, error) {
274278
if err != nil {
275279
return nil, err
276280
}
277-
record.DealInfo = deals.DealInfo{
281+
record.DealInfo = deals.StorageDealInfo{
278282
ProposalCid: proposalCid,
279283
StateID: rpcRecord.DealInfo.StateId,
280284
StateName: rpcRecord.DealInfo.StateName,
@@ -292,3 +296,31 @@ func fromRPCDealRecords(records []*rpc.DealRecord) ([]deals.DealRecord, error) {
292296
}
293297
return ret, nil
294298
}
299+
300+
func fromRPCRetrievalDealRecords(records []*rpc.RetrievalDealRecord) ([]deals.RetrievalDealRecord, error) {
301+
var ret []deals.RetrievalDealRecord
302+
for _, rpcRecord := range records {
303+
if rpcRecord.DealInfo == nil {
304+
continue
305+
}
306+
record := deals.RetrievalDealRecord{
307+
Addr: rpcRecord.Addr,
308+
Time: rpcRecord.Time,
309+
}
310+
pieceCid, err := cid.Decode(rpcRecord.DealInfo.PieceCid)
311+
if err != nil {
312+
return nil, err
313+
}
314+
record.DealInfo = deals.RetrievalDealInfo{
315+
PieceCID: pieceCid,
316+
Size: rpcRecord.DealInfo.Size,
317+
MinPrice: rpcRecord.DealInfo.MinPrice,
318+
PaymentInterval: rpcRecord.DealInfo.PaymentInterval,
319+
PaymentIntervalIncrease: rpcRecord.DealInfo.PaymentIntervalIncrease,
320+
Miner: rpcRecord.DealInfo.Miner,
321+
MinerPeerID: rpcRecord.DealInfo.MinerPeerId,
322+
}
323+
ret = append(ret, record)
324+
}
325+
return ret, nil
326+
}

api/stub/deals.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func driveChannel(ch chan client.WatchEvent, proposals []cid.Cid) {
6666
for _, proposal := range proposals {
6767
time.Sleep(time.Millisecond * 1000)
6868
ch <- client.WatchEvent{
69-
Deal: deals.DealInfo{
69+
Deal: deals.StorageDealInfo{
7070
ProposalCid: proposal,
7171
StateName: dealState,
7272
},

cli-docs/pow/pow_deals.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,9 @@ Provides commands to manage storage deals
2121
### SEE ALSO
2222

2323
* [pow](pow.md) - A client for storage and retreival of powergate data
24-
* [pow deals all](pow_deals_all.md) - List pending and final deal records
25-
* [pow deals final](pow_deals_final.md) - List finalized deal records
26-
* [pow deals pending](pow_deals_pending.md) - List pending deal records
27-
* [pow deals retrieval](pow_deals_retrieval.md) - List retrieval records
24+
* [pow deals retrievals](pow_deals_retrievals.md) - List retrieval records
2825
* [pow deals retrieve](pow_deals_retrieve.md) - Retrieve data from filecoin
26+
* [pow deals storage](pow_deals_storage.md) - List storage deal records
2927
* [pow deals store](pow_deals_store.md) - Store data in filecoin
3028
* [pow deals watch](pow_deals_watch.md) - Watch storage process
3129

cli-docs/pow/pow_deals_all.md

Lines changed: 0 additions & 28 deletions
This file was deleted.

cli-docs/pow/pow_deals_final.md

Lines changed: 0 additions & 28 deletions
This file was deleted.

0 commit comments

Comments
 (0)