Skip to content

Commit aaf0d9e

Browse files
authored
Merge pull request #2 from textileio/jsign/dealmodule
deals: module first version
2 parents 9022873 + 83b3a17 commit aaf0d9e

43 files changed

Lines changed: 8658 additions & 1 deletion

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
# filecoin
1+
# filecoin
2+
3+
TODO

client/client.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package client
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/filecoin-project/lotus/api"
7+
"github.com/filecoin-project/lotus/api/client"
8+
)
9+
10+
// New creates a new client to Lotus API
11+
func New(addr string, authToken string) (api.FullNode, func(), error) {
12+
headers := http.Header{
13+
"Authorization": []string{"Bearer " + authToken},
14+
}
15+
client, close, err := client.NewFullNodeRPC("ws://"+addr+"/rpc/v0", headers)
16+
if err != nil {
17+
return nil, nil, err
18+
}
19+
return client, close, nil
20+
}

client/client_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"testing"
8+
9+
"github.com/textileio/filecoin/tests"
10+
)
11+
12+
var (
13+
authToken = ""
14+
)
15+
16+
func TestMain(m *testing.M) {
17+
var err error
18+
authToken, err = tests.GetLotusToken()
19+
if err != nil {
20+
fmt.Println("couldn't get/generate lotus authtoken")
21+
os.Exit(-1)
22+
}
23+
os.Exit(m.Run())
24+
}
25+
26+
func TestClientVersion(t *testing.T) {
27+
c, cls, err := New(tests.DaemonAddr, authToken)
28+
checkErr(t, err)
29+
defer cls()
30+
if _, err := c.Version(context.Background()); err != nil {
31+
t.Fatalf("error when getting client version: %s", err)
32+
}
33+
}
34+
35+
func checkErr(t *testing.T, err error) {
36+
t.Helper()
37+
if err != nil {
38+
t.Fatal(err)
39+
}
40+
}

deals/askcache.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package deals
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"sort"
7+
"sync"
8+
"time"
9+
10+
"github.com/filecoin-project/lotus/chain/types"
11+
"github.com/ipfs/go-datastore"
12+
)
13+
14+
var (
15+
queryAskRateLim = 25
16+
queryAskTimeout = time.Second * 20
17+
dsStorageAskBase = datastore.NewKey("/deals/storageask")
18+
)
19+
20+
// Query specifies filtering and paging data to retrieve active Asks
21+
type Query struct {
22+
MaxPrice uint64
23+
PieceSize uint64
24+
Limit int
25+
Offset int
26+
}
27+
28+
// StorageAsk has information about an active ask from a storage miner
29+
type StorageAsk struct {
30+
Price uint64
31+
MinPieceSize uint64
32+
Miner string
33+
Timestamp uint64
34+
Expiry uint64
35+
}
36+
37+
// AvailableAsk executes a query to retrieve active Asks
38+
func (d *DealModule) AvailableAsks(q Query) ([]StorageAsk, error) {
39+
d.askCacheLock.RLock()
40+
defer d.askCacheLock.RUnlock()
41+
var res []StorageAsk
42+
offset := q.Offset
43+
for _, sa := range d.askCache {
44+
if q.MaxPrice != 0 && types.BigCmp(sa.Price, types.NewInt(q.MaxPrice)) == 1 {
45+
break
46+
}
47+
if q.PieceSize != 0 && sa.MinPieceSize > q.PieceSize {
48+
continue
49+
}
50+
if offset > 0 {
51+
offset--
52+
continue
53+
}
54+
res = append(res, StorageAsk{
55+
Price: sa.Price.Uint64(),
56+
MinPieceSize: sa.MinPieceSize,
57+
Miner: sa.Miner.String(),
58+
Timestamp: sa.Timestamp,
59+
Expiry: sa.Expiry,
60+
})
61+
if q.Limit != 0 && len(res) == q.Limit {
62+
break
63+
}
64+
}
65+
return res, nil
66+
}
67+
68+
func (d *DealModule) runBackgroundAskCache() {
69+
defer close(d.closed)
70+
if err := d.updateMinerAsks(); err != nil {
71+
log.Errorf("error when updating miners asks: %s", err)
72+
}
73+
for {
74+
select {
75+
case <-d.close:
76+
return
77+
case <-time.After(askRefreshInterval):
78+
log.Debug("refreshing ask cache")
79+
if err := d.updateMinerAsks(); err != nil {
80+
log.Errorf("error when updating miners asks: %s", err)
81+
}
82+
}
83+
}
84+
}
85+
86+
func (d *DealModule) updateMinerAsks() error {
87+
asks, err := takeFreshAskSnapshot(d.api)
88+
if err != nil {
89+
return err
90+
}
91+
92+
sort.Slice(asks, func(i, j int) bool {
93+
return types.BigCmp(asks[i].Price, asks[j].Price) == -1
94+
})
95+
96+
var buf bytes.Buffer
97+
for _, ask := range asks {
98+
buf.Reset()
99+
if err := ask.MarshalCBOR(&buf); err != nil {
100+
log.Errorf("error when marshaling storage ask: %s", err)
101+
return err
102+
}
103+
if err := d.ds.Put(dsStorageAskBase.ChildString(ask.Miner.String()), buf.Bytes()); err != nil {
104+
log.Errorf("error when persiting storage ask: %s", err)
105+
return err
106+
}
107+
}
108+
d.askCacheLock.Lock()
109+
d.askCache = asks
110+
d.askCacheLock.Unlock()
111+
112+
return nil
113+
}
114+
115+
func takeFreshAskSnapshot(api DealerAPI) ([]*types.StorageAsk, error) {
116+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
117+
defer cancel()
118+
rateLim := make(chan struct{}, queryAskRateLim)
119+
addrs, err := api.StateListMiners(ctx, nil)
120+
if err != nil {
121+
return nil, err
122+
}
123+
124+
var wg sync.WaitGroup
125+
askCh := make(chan *types.StorageAsk)
126+
for _, a := range addrs {
127+
a := a
128+
wg.Add(1)
129+
go func() {
130+
rateLim <- struct{}{}
131+
defer wg.Done()
132+
defer func() { <-rateLim }()
133+
ctx, cancel := context.WithTimeout(context.Background(), queryAskTimeout)
134+
defer cancel()
135+
pid, err := api.StateMinerPeerID(ctx, a, nil)
136+
if err != nil {
137+
log.Info("error getting pid of %s: %s", a, err)
138+
return
139+
}
140+
141+
ask, err := api.ClientQueryAsk(ctx, pid, a)
142+
if err != nil {
143+
log.Errorf("error when query asking miner %s: %s", a, err)
144+
return
145+
}
146+
askCh <- ask.Ask
147+
}()
148+
}
149+
go func() {
150+
wg.Wait()
151+
close(askCh)
152+
}()
153+
asks := make([]*types.StorageAsk, 0, len(addrs))
154+
for sa := range askCh {
155+
asks = append(asks, sa)
156+
}
157+
158+
return asks, nil
159+
}

0 commit comments

Comments
 (0)