Skip to content

Commit 6760dac

Browse files
author
auxten
committed
route.SetNodeAddrCache and kms.SetNode at kayak 2PC commit; fix server connection leak
1 parent 8cfce94 commit 6760dac

18 files changed

Lines changed: 247 additions & 105 deletions

File tree

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ vendor/**/.gitignore
88
node_*/
99
kayak_test
1010
*.conf
11-
11+
*.db
12+
*.db-shm
13+
*.db-wal
1214

1315
# Binaries for programs and plugins
1416
bin/

cmd/miner/integration_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"context"
28+
2829
. "github.com/smartystreets/goconvey/convey"
2930
"gitlab.com/thunderdb/ThunderDB/client"
3031
"gitlab.com/thunderdb/ThunderDB/utils"

cmd/thunderdbd/adapter.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ import (
2424

2525
bp "gitlab.com/thunderdb/ThunderDB/blockproducer"
2626
"gitlab.com/thunderdb/ThunderDB/consistent"
27+
"gitlab.com/thunderdb/ThunderDB/crypto/kms"
2728
"gitlab.com/thunderdb/ThunderDB/kayak"
2829
"gitlab.com/thunderdb/ThunderDB/proto"
30+
"gitlab.com/thunderdb/ThunderDB/route"
2931
"gitlab.com/thunderdb/ThunderDB/sqlchain/storage"
3032
"gitlab.com/thunderdb/ThunderDB/twopc"
3133
"gitlab.com/thunderdb/ThunderDB/utils"
@@ -107,11 +109,18 @@ func (s *LocalStorage) Commit(ctx context.Context, wb twopc.WriteBatch) (err err
107109
log.Errorf("compile exec log failed: %s", err)
108110
return
109111
}
110-
112+
err = route.SetNodeAddrCache(nodeToSet.ID.ToRawNodeID(), nodeToSet.Addr)
113+
if err != nil {
114+
log.Errorf("set node addr cache failed: %v", err)
115+
}
116+
err = kms.SetNode(&nodeToSet)
117+
if err != nil {
118+
log.Errorf("kms set node failed: %v", err)
119+
}
111120
err = s.consistent.AddCache(nodeToSet)
112121
if err != nil {
113-
//TODO(auxten) even no error will be returned, there may be some inconsistency need sync periodically
114-
log.Errorf("compile exec log failed: %s", err)
122+
//TODO(auxten) even though no error will be returned, there may be some inconsistency that needs to be synced periodically
123+
log.Errorf("add consistent cache failed: %s", err)
115124
}
116125

117126
return s.Storage.Commit(ctx, execLog)

cmd/thunderdbd/bootstrap.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ func runNode(nodeID proto.NodeID, listenAddr string) (err error) {
146146
}
147147

148148
log.Info(conf.StartSucceedMessage)
149-
go periodicPingBlockProducer()
149+
//FIXME(auxten): temporarily comment out periodicPingBlockProducer
150+
//go periodicPingBlockProducer()
150151

151152
// start server
152153
server.Serve()

crypto/kms/localkeystore.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@ package kms
1818

1919
import (
2020
"errors"
21+
"runtime"
2122
"sync"
2223

2324
"gitlab.com/thunderdb/ThunderDB/crypto/asymmetric"
2425
"gitlab.com/thunderdb/ThunderDB/crypto/hash"
2526
mine "gitlab.com/thunderdb/ThunderDB/pow/cpuminer"
2627
"gitlab.com/thunderdb/ThunderDB/proto"
28+
"gitlab.com/thunderdb/ThunderDB/utils/log"
2729
)
2830

2931
// LocalKeyStore is the type that holds the local private & public keys
@@ -150,9 +152,9 @@ func GetLocalPrivateKey() (private *asymmetric.PrivateKey, err error) {
150152
}
151153
localKey.RUnlock()
152154

153-
//// log the call stack
154-
//buf := make([]byte, 4096)
155-
//count := runtime.Stack(buf, false)
156-
//log.Debugf("###getting private key from###\n%s\n###getting private key end###\n", buf[:count])
155+
// log the call stack
156+
buf := make([]byte, 4096)
157+
count := runtime.Stack(buf, false)
158+
log.Debugf("###getting private key from###\n%s\n###getting private key end###\n", buf[:count])
157159
return
158160
}

rpc/leak_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright 2018 The ThunderDB Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rpc
18+
19+
import (
20+
"os"
21+
"syscall"
22+
"testing"
23+
"time"
24+
25+
"gitlab.com/thunderdb/ThunderDB/conf"
26+
"gitlab.com/thunderdb/ThunderDB/crypto/kms"
27+
"gitlab.com/thunderdb/ThunderDB/proto"
28+
"gitlab.com/thunderdb/ThunderDB/route"
29+
"gitlab.com/thunderdb/ThunderDB/utils"
30+
"gitlab.com/thunderdb/ThunderDB/utils/log"
31+
)
32+
33+
func TestSessionPool_Leak(t *testing.T) {
34+
log.SetLevel(log.DebugLevel)
35+
utils.Build()
36+
var err error
37+
conf.GConf, err = conf.LoadConfig(FJ(testWorkingDir, "./leak/client.yaml"))
38+
if err != nil {
39+
t.Errorf("load config from %s failed: %s", FJ(testWorkingDir, "./leak/client.yaml"), err)
40+
}
41+
log.Debugf("GConf: %##v", conf.GConf)
42+
rootPath := conf.GConf.WorkingRoot
43+
pubKeyStorePath := FJ(rootPath, conf.GConf.PubKeyStoreFile)
44+
privateKeyPath := FJ(rootPath, conf.GConf.PrivateKeyFile)
45+
os.Remove(pubKeyStorePath)
46+
os.Remove(FJ(testWorkingDir, "./leak/leader/dht.db"))
47+
os.Remove(FJ(testWorkingDir, "./leak/leader/dht.db-shm"))
48+
os.Remove(FJ(testWorkingDir, "./leak/leader/dht.db-wal"))
49+
os.Remove(FJ(testWorkingDir, "./leak/leader/kayak.db"))
50+
51+
leader, err := utils.RunCommandNB(
52+
FJ(baseDir, "./bin/thunderdbd"),
53+
[]string{"-config", FJ(testWorkingDir, "./leak/leader.yaml")},
54+
"leak", testWorkingDir, logDir, false,
55+
)
56+
57+
defer func() {
58+
leader.Process.Signal(syscall.SIGKILL)
59+
}()
60+
61+
log.Debugf("leader pid %d", leader.Process.Pid)
62+
time.Sleep(5 * time.Second)
63+
64+
route.InitKMS(pubKeyStorePath)
65+
var masterKey []byte
66+
67+
err = kms.InitLocalKeyPair(privateKeyPath, masterKey)
68+
if err != nil {
69+
t.Errorf("init local key pair failed: %s", err)
70+
return
71+
}
72+
73+
leaderNodeID := kms.BP.NodeID
74+
thisClient, _ := kms.GetNodeInfo(conf.GConf.ThisNodeID)
75+
76+
var reqType string
77+
caller := NewCaller()
78+
79+
reqType = "Ping"
80+
reqPing := &proto.PingReq{
81+
Node: *thisClient,
82+
}
83+
respPing := new(proto.PingResp)
84+
err = caller.CallNode(leaderNodeID, "DHT."+reqType, reqPing, respPing)
85+
log.Debugf("respPing %s: %##v", reqType, respPing)
86+
if err != nil {
87+
t.Error(err)
88+
}
89+
90+
reqType = "Ping"
91+
reqPing = &proto.PingReq{
92+
Node: *thisClient,
93+
}
94+
respPing = new(proto.PingResp)
95+
err = caller.CallNode(leaderNodeID, "DHT."+reqType, reqPing, respPing)
96+
log.Debugf("respPing %s: %##v", reqType, respPing)
97+
if err != nil {
98+
t.Error(err)
99+
}
100+
101+
reqType = "FindNode"
102+
reqFN := &proto.FindNodeReq{
103+
NodeID: thisClient.ID,
104+
}
105+
respFN := new(proto.FindNodeResp)
106+
err = caller.CallNode(leaderNodeID, "DHT."+reqType, reqFN, respFN)
107+
log.Debugf("respFN %s: %##v", reqType, respFN.Node)
108+
if err != nil {
109+
t.Error(err)
110+
}
111+
112+
pool := GetSessionPoolInstance()
113+
sess, _ := pool.getSessionFromPool(leaderNodeID)
114+
log.Debugf("session for %s, %#v", leaderNodeID, sess)
115+
sess.Close()
116+
117+
reqType = "FindNode"
118+
reqFN = &proto.FindNodeReq{
119+
NodeID: thisClient.ID,
120+
}
121+
respFN = new(proto.FindNodeResp)
122+
err = caller.CallNode(leaderNodeID, "DHT."+reqType, reqFN, respFN)
123+
log.Debugf("respFN %s: %##v", reqType, respFN.Node)
124+
if err != nil {
125+
t.Error(err)
126+
}
127+
}

rpc/pool.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,20 +126,26 @@ func (p *SessionPool) Get(id proto.NodeID) (conn net.Conn, err error) {
126126
// first try to get one session from pool
127127
cachedConn, ok := p.getSessionFromPool(id)
128128
if ok {
129-
return cachedConn.Sess.Open()
129+
conn, err = cachedConn.Sess.Open()
130+
if err == nil {
131+
log.Debugf("reusing session to %s", id)
132+
return
133+
}
134+
log.Errorf("open session to %s from pool failed: %v", id, err)
135+
p.Remove(id)
130136
}
131137

132138
log.Debugf("dial new session to %s", id)
133139
// Can't find existing Session, try to dial one
134140
newConn, err := p.nodeDialer(id)
135141
if err != nil {
136-
log.Errorf("dial to new node %s failed: %s", id, err)
142+
log.Errorf("dial new session to node %s failed: %v", id, err)
137143
return
138144
}
139145
newSess, err := toSession(id, newConn)
140146
if err != nil {
141147
newConn.Close()
142-
log.Errorf("dial to new node %s failed: %s", id, err)
148+
log.Errorf("dial new session to node %s failed: %v", id, err)
143149
return
144150
}
145151
sess, loaded := p.LoadOrStore(id, newSess)

rpc/pool_test.go

Lines changed: 0 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,11 @@ import (
2020
"net"
2121
"path/filepath"
2222
"sync"
23-
"syscall"
2423
"testing"
25-
"time"
2624

2725
"github.com/hashicorp/yamux"
2826
. "github.com/smartystreets/goconvey/convey"
29-
"gitlab.com/thunderdb/ThunderDB/conf"
30-
"gitlab.com/thunderdb/ThunderDB/crypto/kms"
3127
"gitlab.com/thunderdb/ThunderDB/proto"
32-
"gitlab.com/thunderdb/ThunderDB/route"
3328
"gitlab.com/thunderdb/ThunderDB/utils"
3429
"gitlab.com/thunderdb/ThunderDB/utils/log"
3530
)
@@ -192,78 +187,3 @@ func TestNewSessionPool(t *testing.T) {
192187
So(GetSessionPoolInstance() == GetSessionPoolInstance(), ShouldBeTrue)
193188
})
194189
}
195-
196-
func TestSessionPool_Leak(t *testing.T) {
197-
log.SetLevel(log.DebugLevel)
198-
//utils.Build()
199-
leader, err := utils.RunCommandNB(
200-
FJ(baseDir, "./bin/thunderdbd"),
201-
[]string{"-config", FJ(testWorkingDir, "./pool/leader.yaml")},
202-
"leader", testWorkingDir, logDir, false,
203-
)
204-
205-
defer func() {
206-
leader.Process.Signal(syscall.SIGKILL)
207-
}()
208-
209-
log.Debugf("leader pid %d", leader.Process.Pid)
210-
time.Sleep(5 * time.Second)
211-
212-
conf.GConf, err = conf.LoadConfig(FJ(testWorkingDir, "./pool/client.yaml"))
213-
if err != nil {
214-
t.Errorf("load config from %s failed: %s", FJ(testWorkingDir, "./pool/client.yaml"), err)
215-
}
216-
rootPath := conf.GConf.WorkingRoot
217-
pubKeyStorePath := filepath.Join(rootPath, conf.GConf.PubKeyStoreFile)
218-
privateKeyPath := filepath.Join(rootPath, conf.GConf.PrivateKeyFile)
219-
220-
route.InitKMS(pubKeyStorePath)
221-
var masterKey []byte
222-
223-
err = kms.InitLocalKeyPair(privateKeyPath, masterKey)
224-
if err != nil {
225-
t.Errorf("init local key pair failed: %s", err)
226-
return
227-
}
228-
229-
leaderNodeID := kms.BP.NodeID
230-
231-
nodePayload := proto.NewNode()
232-
nodePayload.InitNodeCryptoInfo(100 * time.Millisecond)
233-
nodePayload.Addr = "nodePayloadAddr"
234-
235-
var reqType = "Ping"
236-
reqPing := &proto.PingReq{
237-
Node: *nodePayload,
238-
}
239-
respPing := new(proto.PingResp)
240-
caller := NewCaller()
241-
err = caller.CallNode(leaderNodeID, "DHT."+reqType, reqPing, respPing)
242-
log.Debugf("respPing %s: %##v", reqType, respPing)
243-
if err != nil {
244-
t.Error(err)
245-
}
246-
247-
reqType = "FindNode"
248-
reqFN := &proto.FindNodeReq{
249-
NodeID: nodePayload.ID,
250-
}
251-
respFN := new(proto.FindNodeResp)
252-
err = caller.CallNode(leaderNodeID, "DHT."+reqType, reqFN, respFN)
253-
log.Debugf("respFN %s: %##v", reqType, respFN.Node)
254-
if err != nil || respFN.Node.Addr != "nodePayloadAddr" {
255-
t.Error(err)
256-
}
257-
258-
pool := GetSessionPoolInstance()
259-
sess, _ := pool.getSessionFromPool(leaderNodeID)
260-
log.Debugf("session for %s, %#v", leaderNodeID, sess)
261-
//sess.Close()
262-
263-
err = caller.CallNode(leaderNodeID, "DHT."+reqType, reqPing, respPing)
264-
log.Debugf("respPing %s: %##v", reqType, respPing)
265-
if err != nil {
266-
t.Error(err)
267-
}
268-
269-
}

rpc/rpcutil.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (c *Caller) CallNodeWithContext(
101101
func GetNodeAddr(id *proto.RawNodeID) (addr string, err error) {
102102
addr, err = route.GetNodeAddrCache(id)
103103
if err != nil {
104-
log.Infof("get node \"%s\" addr failed: %s", addr, err)
104+
log.Infof("get node \"%s\" addr failed: %s", id, err)
105105
if err == route.ErrUnknownNodeID {
106106
BPs := route.GetBPs()
107107
if len(BPs) == 0 {

rpc/server.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ serverLoop:
119119

120120
// handleConn does all the work
121121
func (s *Server) handleConn(conn net.Conn) {
122-
//defer conn.Close()
122+
defer conn.Close()
123123

124124
// remote remoteNodeID connection awareness
125125
var remoteNodeID *proto.RawNodeID
@@ -134,6 +134,7 @@ func (s *Server) handleConn(conn net.Conn) {
134134
log.Error(err)
135135
return
136136
}
137+
defer sess.Close()
137138

138139
sessionLoop:
139140
for {
@@ -145,12 +146,10 @@ sessionLoop:
145146
muxConn, err := sess.AcceptStream()
146147
if err != nil {
147148
if err == io.EOF {
148-
log.Info("session connection closed")
149-
break sessionLoop
149+
log.Infof("session %s connection closed", remoteNodeID)
150150
}
151-
log.Errorf("session accept failed: %s", err)
152-
153-
continue
151+
log.Errorf("session %s accept failed: %s", remoteNodeID, err)
152+
break sessionLoop
154153
}
155154
log.Debugf("session accepted %d for %v", muxConn.StreamID(), remoteNodeID)
156155
msgpackCodec := codec.MsgpackSpecRpc.ServerCodec(muxConn, &codec.MsgpackHandle{})
@@ -159,7 +158,7 @@ sessionLoop:
159158
}
160159
}
161160

162-
log.Debugf("Server.handleConn finished for %s", conn.RemoteAddr())
161+
log.Debugf("Server.handleConn finished for %s %s", remoteNodeID, conn.RemoteAddr())
163162
}
164163

165164
// RegisterService with a Service name, used by Client RPC

0 commit comments

Comments
 (0)