Skip to content

Commit a883bca

Browse files
authored
Push new Cid Configs & CAR flag deal and retrieval (textileio#220)
* enable race detector Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * tune badger opt to see if can handle size problem Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * create cid info refreshed from stores Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * support hot storage enabling and disabling in both directions Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * update ldevnet, update lotus-client, use IsCAR flag on api, progress cold cid config changes Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * add CAR supportm in Store and Retrieve in Deals Module Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * check for expected CidConfig in every test Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * update dependabot deps Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * delete Enable in ColdInfo for lowered complexity, use repFactor diff to consider cold config changes Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * create tests for cold config repfactor change, bugfixes Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * rename blacklist to excluded miners Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * use stored data Cid as payloadcid instead of Store() datacid. add a check to error if that isn't the case Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * more renaming about blacklist->excludedMiners Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * remove Active from Proposals in FilInfo Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * consider that deals are *deleted* from the chain when they expire Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * lint checks Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * consider RepFactor in renew deal logic Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * gofmt Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com> * add test for renweal on a decreased repfactor scenario Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>
1 parent 9500445 commit a883bca

27 files changed

Lines changed: 741 additions & 262 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ build: build-cli build-server
1414
.PHONY: build
1515

1616
test:
17-
go test -short -p 1 ./...
17+
go test -short -p 1 -race ./...
1818
.PHONY: test

api/client/deals.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,17 @@ func (d *Deals) Watch(ctx context.Context, proposals []cid.Cid) (<-chan WatchEve
120120
channel <- WatchEvent{Err: err}
121121
break
122122
}
123+
cid, err := cid.Cast(event.GetDealInfo().GetPieceCID())
124+
if err != nil {
125+
channel <- WatchEvent{Err: err}
126+
break
127+
}
123128
deal := deals.DealInfo{
124129
ProposalCid: proposalCid,
125130
StateID: event.GetDealInfo().GetStateID(),
126131
StateName: event.GetDealInfo().GetStateName(),
127132
Miner: event.GetDealInfo().GetMiner(),
128-
PieceRef: event.GetDealInfo().GetPieceRef(),
133+
PieceCID: cid,
129134
Size: event.GetDealInfo().GetSize(),
130135
PricePerEpoch: event.GetDealInfo().GetPricePerEpoch(),
131136
Duration: event.GetDealInfo().GetDuration(),

api/server/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,9 @@ func NewServer(conf Config) (*Server, error) {
134134
return nil, fmt.Errorf("creating repo folder: %s", err)
135135
}
136136

137-
ds, err := badger.NewDatastore(path, &badger.DefaultOptions)
137+
opts := &badger.DefaultOptions
138+
opts.NumVersionsToKeep = 0
139+
ds, err := badger.NewDatastore(path, opts)
138140
if err != nil {
139141
return nil, fmt.Errorf("opening datastore on repo: %s", err)
140142
}

deals/deals.go

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import (
66
"fmt"
77
"io"
88
"io/ioutil"
9+
"os"
10+
"path/filepath"
11+
"strings"
912
"time"
1013

1114
"github.com/filecoin-project/go-address"
@@ -23,7 +26,13 @@ const (
2326
)
2427

2528
var (
29+
// ErrRetrievalNotAvailableProviders indicates that the data isn't available on any provided
30+
// to be retrieved.
2631
ErrRetrievalNoAvailableProviders = errors.New("no providers to retrieve the data")
32+
// ErrDealNotFound indicates a particular ProposalCid from a deal isn't found on-chain. Currenty,
33+
// in Lotus this indicates that it may never existed on-chain, or it existed but it already expired
34+
// (currEpoch > StartEpoch+Duration).
35+
ErrDealNotFound = errors.New("deal not found on-chain")
2736

2837
log = logging.Logger("deals")
2938
)
@@ -53,7 +62,7 @@ func New(api *apistruct.FullNodeStruct, opts ...Option) (*Module, error) {
5362

5463
// Store creates a proposal deal for data using wallet addr to all miners indicated
5564
// by dealConfigs for duration epochs
56-
func (m *Module) Store(ctx context.Context, waddr string, data io.Reader, dcfgs []StorageDealConfig, dur uint64) (cid.Cid, []StoreResult, error) {
65+
func (m *Module) Store(ctx context.Context, waddr string, data io.Reader, dcfgs []StorageDealConfig, dur uint64, isCAR bool) (cid.Cid, []StoreResult, error) {
5766
f, err := ioutil.TempFile(m.cfg.ImportPath, "import-*")
5867
if err != nil {
5968
return cid.Undef, nil, fmt.Errorf("error when creating tmpfile: %s", err)
@@ -63,7 +72,8 @@ func (m *Module) Store(ctx context.Context, waddr string, data io.Reader, dcfgs
6372
return cid.Undef, nil, fmt.Errorf("error when copying data to tmpfile: %s", err)
6473
}
6574
ref := api.FileRef{
66-
Path: f.Name(),
75+
Path: f.Name(),
76+
IsCAR: isCAR,
6777
}
6878
dataCid, err := m.api.ClientImport(ctx, ref)
6979
if err != nil {
@@ -111,10 +121,10 @@ func (m *Module) Store(ctx context.Context, waddr string, data io.Reader, dcfgs
111121
}
112122

113123
// Retrieve fetches the data stored in filecoin at a particular cid
114-
func (m *Module) Retrieve(ctx context.Context, waddr string, cid cid.Cid) (io.ReadCloser, error) {
115-
f, err := ioutil.TempFile(m.cfg.ImportPath, "retrieve-*")
124+
func (m *Module) Retrieve(ctx context.Context, waddr string, cid cid.Cid, exportCAR bool) (io.ReadCloser, error) {
125+
rf, err := ioutil.TempDir(m.cfg.ImportPath, "retrieve-*")
116126
if err != nil {
117-
return nil, fmt.Errorf("creating tmpfile: %s", err)
127+
return nil, fmt.Errorf("creating temp dir for retrieval: %s", err)
118128
}
119129
addr, err := address.NewFromString(waddr)
120130
if err != nil {
@@ -127,20 +137,44 @@ func (m *Module) Retrieve(ctx context.Context, waddr string, cid cid.Cid) (io.Re
127137
if len(offers) == 0 {
128138
return nil, ErrRetrievalNoAvailableProviders
129139
}
140+
fpath := filepath.Join(rf, "ret")
130141
for _, o := range offers {
131142
log.Debugf("trying to retrieve data from %s", o.Miner)
132143
ref := api.FileRef{
133-
Path: f.Name(),
144+
Path: fpath,
145+
IsCAR: exportCAR,
134146
}
135147
if err = m.api.ClientRetrieve(ctx, o.Order(addr), ref); err != nil {
136148
log.Infof("retrieving cid %s from %s: %s", cid, o.Miner, err)
137149
continue
138150
}
151+
f, err := os.Open(fpath)
152+
if err != nil {
153+
return nil, fmt.Errorf("opening retrieved file: %s", err)
154+
}
139155
return f, nil
140156
}
141157
return nil, fmt.Errorf("couldn't retrieve data from any miners, last miner err: %s", err)
142158
}
143159

160+
// GetDealStatus returns the current status of the deal, and a flag indicating if the miner of the deal was slashed.
161+
// If the deal doesn't exist, *or has expired* it will return ErrDealNotFound. There's not actual way of distinguishing
162+
// both scenarios in Lotus.
163+
func (m *Module) GetDealStatus(ctx context.Context, pcid cid.Cid) (storagemarket.StorageDealStatus, bool, error) {
164+
di, err := m.api.ClientGetDealInfo(ctx, pcid)
165+
if err != nil {
166+
if strings.Contains(err.Error(), "datastore: key not found") {
167+
return storagemarket.StorageDealUnknown, false, ErrDealNotFound
168+
}
169+
return storagemarket.StorageDealUnknown, false, fmt.Errorf("getting deal info: %s", err)
170+
}
171+
md, err := m.api.StateMarketStorageDeal(ctx, di.DealID, types.EmptyTSK)
172+
if err != nil {
173+
return storagemarket.StorageDealUnknown, false, fmt.Errorf("get storage state: %s", err)
174+
}
175+
return di.State, md.State.SlashEpoch != -1, nil
176+
}
177+
144178
// Watch returns a channel with state changes of indicated proposals
145179
func (m *Module) Watch(ctx context.Context, proposals []cid.Cid) (<-chan DealInfo, error) {
146180
if len(proposals) == 0 {
@@ -186,7 +220,7 @@ func pushNewChanges(ctx context.Context, client *apistruct.FullNodeStruct, currS
186220
StateID: dinfo.State,
187221
StateName: storagemarket.DealStates[dinfo.State],
188222
Miner: dinfo.Provider.String(),
189-
PieceRef: dinfo.PieceRef,
223+
PieceCID: dinfo.PieceCID,
190224
Size: dinfo.Size,
191225
PricePerEpoch: dinfo.PricePerEpoch.Uint64(),
192226
Duration: dinfo.Duration,

deals/deals_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,14 @@ func TestStore(t *testing.T) {
3838
client, _, _ := tests.CreateLocalDevnet(t, nm)
3939
m, err := New(client, WithImportPath(filepath.Join(tmpDir, "imports")))
4040
checkErr(t, err)
41-
_, err = storeMultiMiner(m, client, nm, randomBytes(1600))
41+
_, err = storeMultiMiner(m, client, nm, randomBytes(600))
4242
checkErr(t, err)
4343
})
4444
}
4545
}
4646
func TestRetrieve(t *testing.T) {
4747
numMiners := []int{1} // go-fil-markets: doesn't support remembering more than 1 miner
48-
data := randomBytes(1600)
48+
data := randomBytes(600)
4949
for _, nm := range numMiners {
5050
t.Run(fmt.Sprintf("CantMiners%d", nm), func(t *testing.T) {
5151
client, addr, _ := tests.CreateLocalDevnet(t, nm)
@@ -57,7 +57,7 @@ func TestRetrieve(t *testing.T) {
5757
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
5858
defer cancel()
5959

60-
r, err := m.Retrieve(ctx, addr.String(), dcid)
60+
r, err := m.Retrieve(ctx, addr.String(), dcid, false)
6161
checkErr(t, err)
6262
defer r.Close()
6363
rdata, err := ioutil.ReadAll(r)
@@ -90,9 +90,9 @@ func storeMultiMiner(m *Module, client *apistruct.FullNodeStruct, numMiners int,
9090
EpochPrice: 1000000,
9191
}
9292
}
93-
dcid, srs, err := m.Store(ctx, addr.String(), bytes.NewReader(data), cfgs, 1000)
93+
dcid, srs, err := m.Store(ctx, addr.String(), bytes.NewReader(data), cfgs, 1000, false)
9494
if err != nil {
95-
return cid.Undef, fmt.Errorf("error when calling Store()")
95+
return cid.Undef, fmt.Errorf("error when calling Store(): %s", err)
9696
}
9797
if !dcid.Defined() {
9898
return cid.Undef, fmt.Errorf("data cid is undefined")
@@ -108,7 +108,6 @@ func storeMultiMiner(m *Module, client *apistruct.FullNodeStruct, numMiners int,
108108
return cid.Undef, fmt.Errorf("some deal cids are missing, got %d, expected %d", len(srs), len(cfgs))
109109
}
110110
if err := waitForDealComplete(client, pcids); err != nil {
111-
//time.Sleep(time.Hour)
112111
return cid.Undef, fmt.Errorf("error waiting for deal to complete: %s", err)
113112
}
114113
return dcid, nil

deals/pb/deals.pb.go

Lines changed: 47 additions & 45 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deals/pb/deals.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ message DealInfo {
1717
string stateName = 3;
1818
string miner = 4;
1919

20-
bytes pieceRef = 5;
20+
bytes pieceCID = 5;
2121
uint64 size = 6;
2222

2323
uint64 pricePerEpoch = 7;

deals/service.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func store(ctx context.Context, dealsModule *Module, storeParams *pb.StoreParams
4141
EpochPrice: dealConfig.GetEpochPrice(),
4242
}
4343
}
44-
dcid, sr, err := dealsModule.Store(ctx, storeParams.GetAddress(), r, dealConfigs, storeParams.GetDuration())
44+
dcid, sr, err := dealsModule.Store(ctx, storeParams.GetAddress(), r, dealConfigs, storeParams.GetDuration(), false)
4545
if err != nil {
4646
ch <- storeResult{Err: err}
4747
return
@@ -136,7 +136,7 @@ func (s *Service) Watch(req *pb.WatchRequest, srv pb.API_WatchServer) error {
136136
StateID: update.StateID,
137137
StateName: update.StateName,
138138
Miner: update.Miner,
139-
PieceRef: update.PieceRef,
139+
PieceCID: update.PieceCID.Bytes(),
140140
Size: update.Size,
141141
PricePerEpoch: update.PricePerEpoch,
142142
Duration: update.Duration,
@@ -153,7 +153,7 @@ func (s *Service) Retrieve(req *pb.RetrieveRequest, srv pb.API_RetrieveServer) e
153153
return err
154154
}
155155

156-
reader, err := s.Module.Retrieve(srv.Context(), req.GetAddress(), cid)
156+
reader, err := s.Module.Retrieve(srv.Context(), req.GetAddress(), cid, false)
157157
if err != nil {
158158
return err
159159
}

deals/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type DealInfo struct {
2525
StateName string
2626
Miner string
2727

28-
PieceRef []byte
28+
PieceCID cid.Cid
2929
Size uint64
3030

3131
PricePerEpoch uint64

docker/docker-compose-embedded.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ services:
3232
- 5001:5001
3333

3434
lotus:
35-
image: textile/lotus-devnet:lotus-testnet3-d41aeb
35+
image: textile/lotus-devnet:sha-9aab2c6
3636
ports:
3737
- 7777:7777
3838
environment:

0 commit comments

Comments
 (0)