Skip to content

Commit 133e2f7

Browse files
committed
Merge remote-tracking branch 'origin/develop' into feature/sqlchain
2 parents 3f8c8f6 + 48267b8 commit 133e2f7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+1292
-347
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ vendor/**/.gitignore
88
node_*/
99
kayak_test
1010
*.conf
11-
11+
*.db
12+
*.db-shm
13+
*.db-wal
1214

1315
# Binaries for programs and plugins
1416
bin/

blockproducer/db_service.go

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,11 @@ func (s *DBService) CreateDatabase(req *CreateDatabaseRequest, resp *CreateDatab
118118
DatabaseID: dbID,
119119
Peers: peers,
120120
ResourceMeta: req.ResourceMeta,
121+
GenesisBlock: genesisBlock,
121122
}
122123

124+
log.Debugf("generated instance meta: %v", instanceMeta)
125+
123126
if err = s.ServiceMap.Set(instanceMeta); err != nil {
124127
// critical error
125128
// TODO(xq262144): critical error recover
@@ -192,32 +195,42 @@ func (s *DBService) GetDatabase(req *GetDatabaseRequest, resp *GetDatabaseRespon
192195
func (s *DBService) GetNodeDatabases(req *wt.InitService, resp *wt.InitServiceResponse) (err error) {
193196
// fetch from meta
194197
var instances []wt.ServiceInstance
195-
if instances, err = s.ServiceMap.GetDatabases(proto.NodeID(req.GetNodeID().String())); err != nil {
198+
if instances, err = s.ServiceMap.GetDatabases(req.GetNodeID().ToNodeID()); err != nil {
196199
return
197200
}
198201

202+
log.Debugf("current instance for node %v: %v", req.GetNodeID().ToNodeID(), instances)
203+
199204
// send response to client
200205
resp.Instances = instances
201206

202207
return
203208
}
204209

205210
func (s *DBService) generateDatabaseID(reqNodeID *proto.RawNodeID) (dbID proto.DatabaseID, err error) {
206-
nonceCh := make(chan cpuminer.NonceInfo)
207-
quitCh := make(chan struct{})
208-
miner := cpuminer.NewCPUMiner(quitCh)
209-
go miner.ComputeBlockNonce(cpuminer.MiningBlock{
210-
Data: reqNodeID.CloneBytes(),
211-
NonceChan: nonceCh,
212-
Stop: nil,
213-
}, cpuminer.Uint256{}, 4)
214-
215-
defer close(nonceCh)
216-
defer close(quitCh)
217-
218-
for nonce := range nonceCh {
211+
var startNonce cpuminer.Uint256
212+
213+
for {
214+
nonceCh := make(chan cpuminer.NonceInfo)
215+
quitCh := make(chan struct{})
216+
miner := cpuminer.NewCPUMiner(quitCh)
217+
go miner.ComputeBlockNonce(cpuminer.MiningBlock{
218+
Data: reqNodeID.CloneBytes(),
219+
NonceChan: nonceCh,
220+
Stop: nil,
221+
}, startNonce, 4)
222+
223+
nonce := <-nonceCh
224+
close(quitCh)
225+
close(nonceCh)
226+
227+
// set start nonceCh
228+
startNonce = nonce.Nonce
229+
startNonce.Inc()
219230
dbID = proto.DatabaseID(nonce.Hash.String())
220231

232+
log.Debugf("try generated database id %v", dbID)
233+
221234
// check existence
222235
if _, err = s.ServiceMap.Get(dbID); err == ErrNoSuchDatabase {
223236
err = nil
@@ -231,7 +244,7 @@ func (s *DBService) generateDatabaseID(reqNodeID *proto.RawNodeID) (dbID proto.D
231244
func (s *DBService) allocateNodes(lastTerm uint64, dbID proto.DatabaseID, resourceMeta wt.ResourceMeta) (peers *kayak.Peers, err error) {
232245
curRange := int(resourceMeta.Node)
233246
excludeNodes := make(map[proto.NodeID]bool)
234-
allocated := make([]proto.NodeID, 0)
247+
var allocated []proto.NodeID
235248

236249
if resourceMeta.Node <= 0 {
237250
err = ErrDatabaseAllocation
@@ -252,10 +265,17 @@ func (s *DBService) allocateNodes(lastTerm uint64, dbID proto.DatabaseID, resour
252265

253266
// clear previous allocated
254267
allocated = allocated[:0]
268+
rolesFilter := []proto.ServerRole{
269+
proto.Miner,
270+
}
255271

256-
nodes, err = s.Consistent.GetNeighbors(string(dbID), curRange)
272+
if s.includeBPNodesForAllocation {
273+
rolesFilter = append(rolesFilter, proto.Leader, proto.Follower)
274+
}
275+
276+
nodes, err = s.Consistent.GetNeighborsEx(string(dbID), curRange, proto.ServerRoles(rolesFilter))
257277

258-
log.Debugf("found %d neighbour nodes", len(nodes))
278+
log.Debugf("found %d neighbor nodes", len(nodes))
259279

260280
// TODO(xq262144): brute force implementation to be optimized
261281
var nodeIDs []proto.NodeID
@@ -266,7 +286,7 @@ func (s *DBService) allocateNodes(lastTerm uint64, dbID proto.DatabaseID, resour
266286
}
267287
}
268288

269-
log.Debugf("found %d suitable nodes", len(nodeIDs))
289+
log.Debugf("found %d suitable nodes: %v", len(nodeIDs), nodeIDs)
270290

271291
if len(nodeIDs) < int(resourceMeta.Node) {
272292
continue
@@ -341,6 +361,7 @@ func (s *DBService) getMetric(metric metric.MetricMap, keys []string) (value uin
341361
}
342362

343363
func (s *DBService) buildPeers(term uint64, nodes []proto.Node, allocated []proto.NodeID) (peers *kayak.Peers, err error) {
364+
log.Debugf("build peers for allocated nodes with term: %v, allocated nodes: %v", term, allocated)
344365
// get local private key
345366
var pubKey *asymmetric.PublicKey
346367
if pubKey, err = kms.GetLocalPublicKey(); err != nil {
@@ -362,7 +383,9 @@ func (s *DBService) buildPeers(term uint64, nodes []proto.Node, allocated []prot
362383
allocatedNodes := make([]proto.Node, 0, len(allocated))
363384

364385
for _, node := range nodes {
365-
allocatedNodes = append(allocatedNodes, node)
386+
if allocatedMap[node.ID] {
387+
allocatedNodes = append(allocatedNodes, node)
388+
}
366389
}
367390

368391
peers = &kayak.Peers{

blockproducer/db_service_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func TestService(t *testing.T) {
9191
So(err, ShouldNotBeNil)
9292

9393
// trigger metrics, but does not allow block producer to service as miner
94-
metric.NewCollectClient().UploadMetrics(nodeID, nil)
94+
metric.NewCollectClient().UploadMetrics(nodeID)
9595
createDBRes = new(CreateDatabaseResponse)
9696
err = rpc.NewCaller().CallNode(nodeID, DBServiceName+".CreateDatabase", createDBReq, createDBRes)
9797
So(err, ShouldNotBeNil)

blockproducer/helper_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ func initNode() (cleanupFunc func(), dht *route.DHTService, metricService *metri
192192
_, testFile, _, _ := runtime.Caller(0)
193193
pubKeyStoreFile := filepath.Join(d, PubKeyStorePath)
194194
os.Remove(pubKeyStoreFile)
195+
clientPubKeyStoreFile := filepath.Join(d, PubKeyStorePath+"_c")
196+
os.Remove(clientPubKeyStoreFile)
195197
dupConfFile := filepath.Join(d, "config.yaml")
196198
confFile := filepath.Join(filepath.Dir(testFile), "../test/node_standalone/config.yaml")
197199
if err = dupConf(confFile, dupConfFile); err != nil {
@@ -203,7 +205,7 @@ func initNode() (cleanupFunc func(), dht *route.DHTService, metricService *metri
203205
log.Debugf("GConf: %#v", conf.GConf)
204206
// reset the once
205207
route.Once = sync.Once{}
206-
route.InitKMS(pubKeyStoreFile + "_c")
208+
route.InitKMS(clientPubKeyStoreFile)
207209

208210
// init dht
209211
dht, err = route.NewDHTService(pubKeyStoreFile, new(consistent.KMSStorage), true)

client/config.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ package client
1919
import (
2020
"net/url"
2121
"time"
22-
23-
"gitlab.com/thunderdb/ThunderDB/proto"
2422
)
2523

2624
const (
@@ -35,7 +33,7 @@ var (
3533

3634
// Config is a configuration parsed from a DSN string.
3735
type Config struct {
38-
DatabaseID proto.DatabaseID
36+
DatabaseID string
3937

4038
Debug bool
4139
PeersUpdateInterval time.Duration
@@ -59,7 +57,7 @@ func (cfg *Config) FormatDSN() string {
5957

6058
u := &url.URL{
6159
Scheme: "thunderdb",
62-
Host: string(cfg.DatabaseID),
60+
Host: cfg.DatabaseID,
6361
}
6462

6563
newQuery := u.Query()
@@ -85,7 +83,7 @@ func ParseDSN(dsn string) (cfg *Config, err error) {
8583
}
8684

8785
cfg = NewConfig()
88-
cfg.DatabaseID = proto.DatabaseID(u.Host)
86+
cfg.DatabaseID = u.Host
8987

9088
urlQuery := u.Query()
9189

client/conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func newConn(cfg *Config) (c *conn, err error) {
8484
}
8585

8686
c = &conn{
87-
dbID: cfg.DatabaseID,
87+
dbID: proto.DatabaseID(cfg.DatabaseID),
8888
connectionID: uint64(connID),
8989
nodeID: nodeID,
9090
privKey: privKey,

client/driver.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
bp "gitlab.com/thunderdb/ThunderDB/blockproducer"
2525
"gitlab.com/thunderdb/ThunderDB/conf"
2626
"gitlab.com/thunderdb/ThunderDB/crypto/kms"
27-
"gitlab.com/thunderdb/ThunderDB/pow/cpuminer"
2827
"gitlab.com/thunderdb/ThunderDB/proto"
2928
"gitlab.com/thunderdb/ThunderDB/route"
3029
"gitlab.com/thunderdb/ThunderDB/rpc"
@@ -82,7 +81,7 @@ func Create(meta ResourceMeta) (dsn string, err error) {
8281
}
8382

8483
cfg := NewConfig()
85-
cfg.DatabaseID = res.InstanceMeta.DatabaseID
84+
cfg.DatabaseID = string(res.InstanceMeta.DatabaseID)
8685
dsn = cfg.FormatDSN()
8786

8887
return
@@ -96,7 +95,7 @@ func Drop(dsn string) (err error) {
9695
}
9796

9897
req := &bp.DropDatabaseRequest{
99-
DatabaseID: cfg.DatabaseID,
98+
DatabaseID: proto.DatabaseID(cfg.DatabaseID),
10099
}
101100
res := new(bp.DropDatabaseResponse)
102101
err = requestBP(bp.DBServiceName+".DropDatabase", req, res)
@@ -105,15 +104,10 @@ func Drop(dsn string) (err error) {
105104
}
106105

107106
func requestBP(method string, request interface{}, response interface{}) (err error) {
108-
// TODO(xq262144): unify block producer selection and calls
109-
// get bp node
110-
var nonce *cpuminer.Uint256
111-
nonce, err = kms.GetLocalNonce()
112-
var bps []proto.NodeID
113-
bps = route.GetBPs()
114-
115-
// choose bp node by nonce
116-
bpIdx := int(nonce.A % uint64(len(bps)))
107+
var bpNodeID proto.NodeID
108+
if bpNodeID, err = rpc.GetCurrentBP(); err != nil {
109+
return
110+
}
117111

118-
return rpc.NewCaller().CallNode(bps[bpIdx], method, request, response)
112+
return rpc.NewCaller().CallNode(bpNodeID, method, request, response)
119113
}

client/helper_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ func initNode() (cleanupFunc func(), server *rpc.Server, err error) {
196196
_, testFile, _, _ := runtime.Caller(0)
197197
pubKeyStoreFile := filepath.Join(d, PubKeyStorePath)
198198
os.Remove(pubKeyStoreFile)
199+
clientPubKeyStoreFile := filepath.Join(d, PubKeyStorePath+"_c")
200+
os.Remove(clientPubKeyStoreFile)
199201
dupConfFile := filepath.Join(d, "config.yaml")
200202
confFile := filepath.Join(filepath.Dir(testFile), "../test/node_standalone/config.yaml")
201203
if err = dupConf(confFile, dupConfFile); err != nil {
@@ -206,7 +208,7 @@ func initNode() (cleanupFunc func(), server *rpc.Server, err error) {
206208
log.Debugf("GConf: %#v", conf.GConf)
207209
// reset the once
208210
route.Once = sync.Once{}
209-
route.InitKMS(pubKeyStoreFile + "_c")
211+
route.InitKMS(clientPubKeyStoreFile)
210212

211213
var dht *route.DHTService
212214

cmd/idminer/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"gitlab.com/thunderdb/ThunderDB/crypto/asymmetric"
3131
"gitlab.com/thunderdb/ThunderDB/crypto/kms"
3232
mine "gitlab.com/thunderdb/ThunderDB/pow/cpuminer"
33+
"gitlab.com/thunderdb/ThunderDB/proto"
3334
"gitlab.com/thunderdb/ThunderDB/utils/log"
3435
)
3536

@@ -147,6 +148,9 @@ func runMiner() {
147148
max = newNonce
148149
}
149150
}
151+
152+
// verify result
153+
log.Infof("verify result: %v", kms.IsIDPubNonceValid(&proto.RawNodeID{Hash: max.Hash}, &max.Nonce, publicKey))
150154
log.Infof("nonce: %v", max)
151155
}
152156

cmd/miner/dbms.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func createRandomBlock(parent hash.Hash, isGenesis bool) (b *ct.Block, err error
201201
Data: pub.Serialize(),
202202
NonceChan: nonceCh,
203203
Stop: nil,
204-
}, cpuminer.Uint256{A: 0, B: 0, C: 0, D: 0}, 4)
204+
}, cpuminer.Uint256{}, 4)
205205
nonce := <-nonceCh
206206
close(quitCh)
207207
close(nonceCh)

0 commit comments

Comments
 (0)