@@ -2,6 +2,7 @@ package client
22
33import (
44 "context"
5+ "fmt"
56 "io"
67
78 "github.com/filecoin-project/go-address"
@@ -20,16 +21,59 @@ type Deals struct {
2021
2122// WatchEvent is used to send data or error values for Watch.
2223type WatchEvent struct {
23- Deal deals.DealInfo
24+ Deal deals.StorageDealInfo
2425 Err error
2526}
2627
28+ // ListDealRecordsOption updates a ListDealRecordsConfig.
29+ type ListDealRecordsOption func (* rpc.ListDealRecordsConfig )
30+
31+ // WithFromAddrs limits the results deals initated from the provided wallet addresses.
32+ // If WithDataCids is also provided, this is an AND operation.
33+ func WithFromAddrs (addrs ... string ) ListDealRecordsOption {
34+ return func (c * rpc.ListDealRecordsConfig ) {
35+ c .FromAddrs = addrs
36+ }
37+ }
38+
39+ // WithDataCids limits the results to deals for the provided data cids.
40+ // If WithFromAddrs is also provided, this is an AND operation.
41+ func WithDataCids (cids ... string ) ListDealRecordsOption {
42+ return func (c * rpc.ListDealRecordsConfig ) {
43+ c .DataCids = cids
44+ }
45+ }
46+
47+ // WithIncludePending specifies whether or not to include pending deals in the results. Default is false.
48+ // Ignored for ListRetrievalDealRecords.
49+ func WithIncludePending (includePending bool ) ListDealRecordsOption {
50+ return func (c * rpc.ListDealRecordsConfig ) {
51+ c .IncludePending = includePending
52+ }
53+ }
54+
55+ // WithIncludeFinal specifies whether or not to include final deals in the results. Default is false.
56+ // Ignored for ListRetrievalDealRecords.
57+ func WithIncludeFinal (includeFinal bool ) ListDealRecordsOption {
58+ return func (c * rpc.ListDealRecordsConfig ) {
59+ c .IncludeFinal = includeFinal
60+ }
61+ }
62+
63+ // WithAscending specifies to sort the results in ascending order. Default is descending order.
64+ // Records are sorted by timestamp.
65+ func WithAscending (ascending bool ) ListDealRecordsOption {
66+ return func (c * rpc.ListDealRecordsConfig ) {
67+ c .Ascending = ascending
68+ }
69+ }
70+
2771// Store creates a proposal deal for data using wallet addr to all miners indicated
2872// by dealConfigs for duration epochs.
2973func (d * Deals ) Store (ctx context.Context , addr string , data io.Reader , dealConfigs []deals.StorageDealConfig , minDuration uint64 ) ([]cid.Cid , []deals.StorageDealConfig , error ) {
3074 stream , err := d .client .Store (ctx )
3175 if err != nil {
32- return nil , nil , err
76+ return nil , nil , fmt . Errorf ( "calling Store: %v" , err )
3377 }
3478
3579 reqDealConfigs := make ([]* rpc.DealConfig , len (dealConfigs ))
@@ -47,33 +91,33 @@ func (d *Deals) Store(ctx context.Context, addr string, data io.Reader, dealConf
4791 innerReq := & rpc.StoreRequest_StoreParams {StoreParams : storeParams }
4892
4993 if err = stream .Send (& rpc.StoreRequest {Payload : innerReq }); err != nil {
50- return nil , nil , err
94+ return nil , nil , fmt . Errorf ( "calling Send: %v" , err )
5195 }
5296
5397 buffer := make ([]byte , 1024 * 32 ) // 32KB
5498 for {
5599 bytesRead , err := data .Read (buffer )
56100 if err != nil && err != io .EOF {
57- return nil , nil , err
101+ return nil , nil , fmt . Errorf ( "reading buffer: %v" , err )
58102 }
59103 sendErr := stream .Send (& rpc.StoreRequest {Payload : & rpc.StoreRequest_Chunk {Chunk : buffer [:bytesRead ]}})
60104 if sendErr != nil {
61- return nil , nil , sendErr
105+ return nil , nil , fmt . Errorf ( "calling Send: %v" , err )
62106 }
63107 if err == io .EOF {
64108 break
65109 }
66110 }
67111 reply , err := stream .CloseAndRecv ()
68112 if err != nil {
69- return nil , nil , err
113+ return nil , nil , fmt . Errorf ( "calling CloseAndRecv: %v" , err )
70114 }
71115
72116 cids := make ([]cid.Cid , len (reply .GetProposalCids ()))
73117 for i , replyCid := range reply .GetProposalCids () {
74118 id , err := cid .Decode (replyCid )
75119 if err != nil {
76- return nil , nil , err
120+ return nil , nil , fmt . Errorf ( "decoding cid: %v" , err )
77121 }
78122 cids [i ] = id
79123 }
@@ -82,7 +126,7 @@ func (d *Deals) Store(ctx context.Context, addr string, data io.Reader, dealConf
82126 for i , dealConfig := range reply .GetFailedDeals () {
83127 addr , err := address .NewFromString (dealConfig .GetMiner ())
84128 if err != nil {
85- return nil , nil , err
129+ return nil , nil , fmt . Errorf ( "decoding address: %v" , err )
86130 }
87131 failedDeals [i ] = deals.StorageDealConfig {
88132 Miner : addr .String (),
@@ -102,7 +146,7 @@ func (d *Deals) Watch(ctx context.Context, proposals []cid.Cid) (<-chan WatchEve
102146 }
103147 stream , err := d .client .Watch (ctx , & rpc.WatchRequest {Proposals : proposalStrings })
104148 if err != nil {
105- return nil , err
149+ return nil , fmt . Errorf ( "calling Watch: %v" , err )
106150 }
107151 go func () {
108152 defer close (channel )
@@ -111,21 +155,21 @@ func (d *Deals) Watch(ctx context.Context, proposals []cid.Cid) (<-chan WatchEve
111155 if err != nil {
112156 stat := status .Convert (err )
113157 if stat == nil || (stat .Code () != codes .Canceled ) {
114- channel <- WatchEvent {Err : err }
158+ channel <- WatchEvent {Err : fmt . Errorf ( "reveiving stream: %v" , err ) }
115159 }
116160 break
117161 }
118162 proposalCid , err := cid .Decode (event .GetDealInfo ().GetProposalCid ())
119163 if err != nil {
120- channel <- WatchEvent {Err : err }
164+ channel <- WatchEvent {Err : fmt . Errorf ( "decoding cid: %v" , err ) }
121165 break
122166 }
123167 cid , err := cid .Decode (event .GetDealInfo ().GetPieceCid ())
124168 if err != nil {
125- channel <- WatchEvent {Err : err }
169+ channel <- WatchEvent {Err : fmt . Errorf ( "decoding cid: %v" , err ) }
126170 break
127171 }
128- deal := deals.DealInfo {
172+ deal := deals.StorageDealInfo {
129173 ProposalCid : proposalCid ,
130174 StateID : event .GetDealInfo ().GetStateId (),
131175 StateName : event .GetDealInfo ().GetStateName (),
@@ -153,7 +197,7 @@ func (d *Deals) Retrieve(ctx context.Context, waddr string, cid cid.Cid) (io.Rea
153197 }
154198 stream , err := d .client .Retrieve (ctx , req )
155199 if err != nil {
156- return nil , err
200+ return nil , fmt . Errorf ( "calling Retrieve: %v" , err )
157201 }
158202
159203 reader , writer := io .Pipe ()
@@ -179,89 +223,49 @@ func (d *Deals) Retrieve(ctx context.Context, waddr string, cid cid.Cid) (io.Rea
179223 return reader , nil
180224}
181225
182- // FinalDealRecords returns a list of all finalized storage deals.
183- // Records are sorted ascending by activation epoch then timestamp .
184- func (d * Deals ) FinalDealRecords (ctx context.Context ) ([]deals.DealRecord , error ) {
185- res , err := d . client . FinalDealRecords ( ctx , & rpc.FinalDealRecordsRequest {})
186- if err != nil {
187- return nil , err
226+ // ListStorageDealRecords returns a list of storage deals
227+ // according to the provided options .
228+ func (d * Deals ) ListStorageDealRecords (ctx context.Context , opts ... ListDealRecordsOption ) ([]deals.StorageDealRecord , error ) {
229+ conf := & rpc.ListDealRecordsConfig {}
230+ for _ , opt := range opts {
231+ opt ( conf )
188232 }
189- ret , err := fromRPCDealRecords (res .Records )
190- if err != nil {
191- return nil , err
192- }
193- return ret , nil
194- }
195-
196- // PendingDealRecords returns a list of all pending storage deals.
197- // Records are sorted ascending by timestamp.
198- func (d * Deals ) PendingDealRecords (ctx context.Context ) ([]deals.DealRecord , error ) {
199- res , err := d .client .PendingDealRecords (ctx , & rpc.PendingDealRecordsRequest {})
233+ res , err := d .client .ListStorageDealRecords (ctx , & rpc.ListStorageDealRecordsRequest {Config : conf })
200234 if err != nil {
201- return nil , err
235+ return nil , fmt . Errorf ( "calling ListStorageDealRecords: %v" , err )
202236 }
203- ret , err := fromRPCDealRecords (res .Records )
237+ ret , err := fromRPCStorageDealRecords (res .Records )
204238 if err != nil {
205- return nil , err
239+ return nil , fmt . Errorf ( "processing response deal records: %v" , err )
206240 }
207241 return ret , nil
208242}
209243
210- // AllDealRecords returns a list of all finalized and pending deals.
211- // Records are sorted ascending by activation epoch, if available, then timestamp .
212- func (d * Deals ) AllDealRecords (ctx context.Context ) ([]deals.DealRecord , error ) {
213- res , err := d . client . AllDealRecords ( ctx , & rpc.AllDealRecordsRequest {})
214- if err != nil {
215- return nil , err
244+ // ListRetrievalDealRecords returns a list of retrieval deals
245+ // according to the provided options .
246+ func (d * Deals ) ListRetrievalDealRecords (ctx context.Context , opts ... ListDealRecordsOption ) ([]deals.RetrievalDealRecord , error ) {
247+ conf := & rpc.ListDealRecordsConfig {}
248+ for _ , opt := range opts {
249+ opt ( conf )
216250 }
217- ret , err := fromRPCDealRecords ( res . Records )
251+ res , err := d . client . ListRetrievalDealRecords ( ctx , & rpc. ListRetrievalDealRecordsRequest { Config : conf } )
218252 if err != nil {
219- return nil , err
253+ return nil , fmt . Errorf ( "calling ListRetrievalDealRecords: %v" , err )
220254 }
221- return ret , nil
222- }
223-
224- // RetrievalRecords returns a list of all retrievals.
225- // Records are sorted ascending by timestamp.
226- func (d * Deals ) RetrievalRecords (ctx context.Context ) ([]deals.RetrievalRecord , error ) {
227- res , err := d .client .RetrievalRecords (ctx , & rpc.RetrievalRecordsRequest {})
255+ ret , err := fromRPCRetrievalDealRecords (res .Records )
228256 if err != nil {
229- return nil , err
230- }
231- var ret []deals.RetrievalRecord
232- for _ , rpcRecord := range res .Records {
233- if rpcRecord .RetrievalInfo == nil {
234- continue
235- }
236- record := deals.RetrievalRecord {
237- Addr : rpcRecord .Addr ,
238- Time : rpcRecord .Time ,
239- }
240- pieceCid , err := cid .Decode (rpcRecord .RetrievalInfo .PieceCid )
241- if err != nil {
242- return nil , err
243- }
244- record .RetrievalInfo = deals.RetrievalInfo {
245- PieceCID : pieceCid ,
246- Size : rpcRecord .RetrievalInfo .Size ,
247- MinPrice : rpcRecord .RetrievalInfo .MinPrice ,
248- PaymentInterval : rpcRecord .RetrievalInfo .PaymentInterval ,
249- PaymentIntervalIncrease : rpcRecord .RetrievalInfo .PaymentIntervalIncrease ,
250- Miner : rpcRecord .RetrievalInfo .Miner ,
251- MinerPeerID : rpcRecord .RetrievalInfo .MinerPeerId ,
252- }
253- ret = append (ret , record )
257+ return nil , fmt .Errorf ("processing response deal records: %v" , err )
254258 }
255259 return ret , nil
256260}
257261
258- func fromRPCDealRecords (records []* rpc.DealRecord ) ([]deals.DealRecord , error ) {
259- var ret []deals.DealRecord
262+ func fromRPCStorageDealRecords (records []* rpc.StorageDealRecord ) ([]deals.StorageDealRecord , error ) {
263+ var ret []deals.StorageDealRecord
260264 for _ , rpcRecord := range records {
261265 if rpcRecord .DealInfo == nil {
262266 continue
263267 }
264- record := deals.DealRecord {
268+ record := deals.StorageDealRecord {
265269 Addr : rpcRecord .Addr ,
266270 Time : rpcRecord .Time ,
267271 Pending : rpcRecord .Pending ,
@@ -274,7 +278,7 @@ func fromRPCDealRecords(records []*rpc.DealRecord) ([]deals.DealRecord, error) {
274278 if err != nil {
275279 return nil , err
276280 }
277- record .DealInfo = deals.DealInfo {
281+ record .DealInfo = deals.StorageDealInfo {
278282 ProposalCid : proposalCid ,
279283 StateID : rpcRecord .DealInfo .StateId ,
280284 StateName : rpcRecord .DealInfo .StateName ,
@@ -292,3 +296,31 @@ func fromRPCDealRecords(records []*rpc.DealRecord) ([]deals.DealRecord, error) {
292296 }
293297 return ret , nil
294298}
299+
300+ func fromRPCRetrievalDealRecords (records []* rpc.RetrievalDealRecord ) ([]deals.RetrievalDealRecord , error ) {
301+ var ret []deals.RetrievalDealRecord
302+ for _ , rpcRecord := range records {
303+ if rpcRecord .DealInfo == nil {
304+ continue
305+ }
306+ record := deals.RetrievalDealRecord {
307+ Addr : rpcRecord .Addr ,
308+ Time : rpcRecord .Time ,
309+ }
310+ pieceCid , err := cid .Decode (rpcRecord .DealInfo .PieceCid )
311+ if err != nil {
312+ return nil , err
313+ }
314+ record .DealInfo = deals.RetrievalDealInfo {
315+ PieceCID : pieceCid ,
316+ Size : rpcRecord .DealInfo .Size ,
317+ MinPrice : rpcRecord .DealInfo .MinPrice ,
318+ PaymentInterval : rpcRecord .DealInfo .PaymentInterval ,
319+ PaymentIntervalIncrease : rpcRecord .DealInfo .PaymentIntervalIncrease ,
320+ Miner : rpcRecord .DealInfo .Miner ,
321+ MinerPeerID : rpcRecord .DealInfo .MinerPeerId ,
322+ }
323+ ret = append (ret , record )
324+ }
325+ return ret , nil
326+ }
0 commit comments