From a5597c93b0666420f5c922961ecf6fe550b65b9e Mon Sep 17 00:00:00 2001 From: Rafal Jeczalik Date: Sun, 5 Nov 2017 11:09:11 +0100 Subject: [PATCH 01/11] kontrol: fix etcd default --- kontrol/etcd.go | 2 +- kontrol/kontrol/main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kontrol/etcd.go b/kontrol/etcd.go index 91a1c810..e54d65a1 100644 --- a/kontrol/etcd.go +++ b/kontrol/etcd.go @@ -34,7 +34,7 @@ type Etcd struct { func NewEtcd(machines []string, log kite.Logger) *Etcd { if machines == nil || len(machines) == 0 { - machines = []string{"//127.0.0.1:4001"} + machines = []string{"http://127.0.0.1:2379"} } cfg := etcd.Config{ diff --git a/kontrol/kontrol/main.go b/kontrol/kontrol/main.go index f2e793a5..9aa40675 100644 --- a/kontrol/kontrol/main.go +++ b/kontrol/kontrol/main.go @@ -58,7 +58,7 @@ func main() { if conf.Initial { initialKey(conf, publicKey, privateKey) - os.Exit(0) + return } kiteConf := config.MustGet() From fe7a200c1f196d89813cd4fdb59524c5486de4ea Mon Sep 17 00:00:00 2001 From: Rafal Jeczalik Date: Sun, 5 Nov 2017 12:16:50 +0100 Subject: [PATCH 02/11] kontrol: fix etcd storage support Fixes #207. --- kontrol/etcd.go | 54 ++++++++++++++++------------------------- kontrol/kontrol.go | 9 +++++++ kontrol/kontrol/main.go | 4 +-- 3 files changed, 32 insertions(+), 35 deletions(-) diff --git a/kontrol/etcd.go b/kontrol/etcd.go index e54d65a1..acd35295 100644 --- a/kontrol/etcd.go +++ b/kontrol/etcd.go @@ -79,13 +79,13 @@ func (e *Etcd) Delete(k *protocol.Kite) error { etcdKey := KitesPrefix + k.String() etcdIDKey := KitesPrefix + "/" + k.ID - _, err := e.client.Delete(context.TODO(), etcdKey, &etcd.DeleteOptions{ + _, e1 := e.client.Delete(context.TODO(), etcdKey, &etcd.DeleteOptions{ Recursive: true, }) - _, err = e.client.Delete(context.TODO(), etcdIDKey, &etcd.DeleteOptions{ + _, e2 := e.client.Delete(context.TODO(), etcdIDKey, &etcd.DeleteOptions{ Recursive: true, }) - return err + return nonil(e1, e2) } func (e *Etcd) Clear() error { @@ -95,29 +95,29 @@ func (e *Etcd) Clear() error { return err } -func (e *Etcd) Upsert(k *protocol.Kite, value *kontrolprotocol.RegisterValue) error { - return e.Add(k, value) +func (e *Etcd) Upsert(k *protocol.Kite, v *kontrolprotocol.RegisterValue) error { + return e.Add(k, v) } -func (e *Etcd) Add(k *protocol.Kite, value *kontrolprotocol.RegisterValue) error { +func (e *Etcd) Add(k *protocol.Kite, v *kontrolprotocol.RegisterValue) error { etcdKey := KitesPrefix + k.String() etcdIDKey := KitesPrefix + "/" + k.ID - valueBytes, err := json.Marshal(value) + p, err := json.Marshal(v) if err != nil { return err } - valueString := string(valueBytes) + value := string(p) // Set the kite key. // Example "/koding/production/os/0.0.1/sj/kontainer1.sj.koding.com/1234asdf..." _, err = e.client.Set(context.TODO(), etcdKey, - valueString, + value, &etcd.SetOptions{ TTL: KeyTTL, - PrevExist: etcd.PrevExist, + PrevExist: etcd.PrevIgnore, }, ) if err != nil { @@ -127,52 +127,46 @@ func (e *Etcd) Add(k *protocol.Kite, value *kontrolprotocol.RegisterValue) error // Also store the the kite.Key Id for easy lookup _, err = e.client.Set(context.TODO(), etcdIDKey, - valueString, + value, &etcd.SetOptions{ TTL: KeyTTL, - PrevExist: etcd.PrevExist, + PrevExist: etcd.PrevIgnore, }, ) - if err != nil { - return err - } - return nil + return err } -func (e *Etcd) Update(k *protocol.Kite, value *kontrolprotocol.RegisterValue) error { +func (e *Etcd) Update(k *protocol.Kite, v *kontrolprotocol.RegisterValue) error { etcdKey := KitesPrefix + k.String() etcdIDKey := KitesPrefix + "/" + k.ID - valueBytes, err := json.Marshal(value) + p, err := json.Marshal(v) if err != nil { return err } - valueString := string(valueBytes) + value := string(p) // update the kite key. // Example "/koding/production/os/0.0.1/sj/kontainer1.sj.koding.com/1234asdf..." _, err = e.client.Set(context.TODO(), etcdKey, - valueString, + value, &etcd.SetOptions{ TTL: KeyTTL, PrevExist: etcd.PrevExist, }, ) if err != nil { - err = e.Add(k, value) - if err != nil { - return err - } - return nil + // TODO(rjeczalik): Add only if err == KeyNotFound? + return e.Add(k, v) } // Also update the the kite.Key Id for easy lookup _, err = e.client.Set(context.TODO(), etcdIDKey, - valueString, + value, &etcd.SetOptions{ TTL: KeyTTL, PrevExist: etcd.PrevExist, @@ -191,11 +185,7 @@ func (e *Etcd) Update(k *protocol.Kite, value *kontrolprotocol.RegisterValue) er PrevExist: etcd.PrevExist, }, ) - if err != nil { - return err - } - - return nil + return err } func (e *Etcd) Get(query *protocol.KontrolQuery) (Kites, error) { @@ -242,7 +232,6 @@ func (e *Etcd) Get(query *protocol.KontrolQuery) (Kites, error) { KitesPrefix+"/"+etcdKey, &etcd.GetOptions{ Recursive: true, - Sort: false, }, ) if err != nil { @@ -289,7 +278,6 @@ func (e *Etcd) etcdKey(query *protocol.KontrolQuery) (string, error) { KitesPrefix+"/"+query.ID, &etcd.GetOptions{ Recursive: true, - Sort: false, }, ) if err != nil { diff --git a/kontrol/kontrol.go b/kontrol/kontrol.go index 73afae29..66c8ebdf 100644 --- a/kontrol/kontrol.go +++ b/kontrol/kontrol.go @@ -588,3 +588,12 @@ func (k *Kontrol) generateToken(tok *token) (string, error) { return signed, nil } + +func nonil(err ...error) error { + for _, e := range err { + if e != nil { + return e + } + } + return nil +} diff --git a/kontrol/kontrol/main.go b/kontrol/kontrol/main.go index 9aa40675..a8c07a65 100644 --- a/kontrol/kontrol/main.go +++ b/kontrol/kontrol/main.go @@ -81,8 +81,6 @@ func main() { } switch os.Getenv("KONTROL_STORAGE") { - case "etcd": - k.SetStorage(kontrol.NewEtcd(conf.Machines, k.Kite.Log)) case "postgres": postgresConf := &kontrol.PostgresConfig{ Host: conf.Postgres.Host, @@ -95,6 +93,8 @@ func main() { p := kontrol.NewPostgres(postgresConf, k.Kite.Log) k.SetStorage(p) k.SetKeyPairStorage(p) + case "etcd": + fallthrough default: k.SetStorage(kontrol.NewEtcd(conf.Machines, k.Kite.Log)) } From 827d5a35bb296a7895838656db7fbed8dd1bcd37 Mon Sep 17 00:00:00 2001 From: Rafal Jeczalik Date: Tue, 28 Nov 2017 22:59:55 +0100 Subject: [PATCH 03/11] request: refactor Context --- method_test.go | 23 ++++++++++++----------- request.go | 8 ++++++-- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/method_test.go b/method_test.go index 2decc910..684a4ce3 100644 --- a/method_test.go +++ b/method_test.go @@ -1,6 +1,7 @@ package kite import ( + "context" "errors" "strings" "testing" @@ -172,53 +173,53 @@ func TestMethod_Base(t *testing.T) { k.Config.Port = 10000 k.PreHandleFunc(func(r *Request) (interface{}, error) { - r.Context.Set("pre1", "pre1") + r.Context = context.WithValue(r.Context, "pre1", "pre1") return nil, nil }) k.PreHandleFunc(func(r *Request) (interface{}, error) { - res, _ := r.Context.Get("pre1") + res, _ := r.Context.Value("pre1").(string) if res != "pre1" { t.Errorf("Context response from previous pre handler should be pre1, got: %v", res) } - r.Context.Set("pre2", "pre2") + r.Context = context.WithValue(r.Context, "pre2", "pre2") return nil, nil }) k.HandleFunc("foo", func(r *Request) (interface{}, error) { - res, _ := r.Context.Get("funcPre1") + res, _ := r.Context.Value("funcPre1").(string) if res != "funcPre1" { t.Errorf("Context response from previous pre handler should be funcPre1, got: %v", res) } - r.Context.Set("handle", "handle") + r.Context = context.WithValue(r.Context, "handle", "handle") return "main-response", nil }).PreHandleFunc(func(r *Request) (interface{}, error) { - r.Context.Set("funcPre1", "funcPre1") + r.Context = context.WithValue(r.Context, "funcPre1", "funcPre1") return "funcPre1", nil }).PostHandleFunc(func(r *Request) (interface{}, error) { - res, _ := r.Context.Get("handle") + res, _ := r.Context.Value("handle").(string) if res != "handle" { t.Errorf("Context response from previous pre handler should be handle, got: %v", res) } - r.Context.Set("funcPost1", "funcPost1") + r.Context = context.WithValue(r.Context, "funcPost1", "funcPost1") return "funcPost1", nil }) k.PostHandleFunc(func(r *Request) (interface{}, error) { - res, _ := r.Context.Get("funcPost1") + res, _ := r.Context.Value("funcPost1").(string) if res != "funcPost1" { t.Errorf("Context response from previous pre handler should be funcPost1, got: %v", res) } - r.Context.Set("post1", "post1") + r.Context = context.WithValue(r.Context, "post1", "post1") return "post1", nil }) k.PostHandleFunc(func(r *Request) (interface{}, error) { - res, _ := r.Context.Get("post1") + res, _ := r.Context.Value("post1").(string) if res != "post1" { t.Errorf("Context response from previous pre handler should be post1, got: %v", res) } diff --git a/request.go b/request.go index af1162ac..62428ec4 100644 --- a/request.go +++ b/request.go @@ -1,6 +1,7 @@ package kite import ( + "context" "errors" "fmt" "runtime/debug" @@ -44,7 +45,10 @@ type Request struct { // items added to the Context can be fetched from other handlers in the // chain. This is useful with PreHandle and PostHandle handlers to pass // data between handlers. - Context cache.Cache + // + // The context is canceled when client has disconnected or session + // was prematurely terminated. + Context context.Context } // Response is the type of the object that is returned from request handlers @@ -150,7 +154,7 @@ func (c *Client) newRequest(method string, args *dnode.Partial) (*Request, func( LocalKite: c.LocalKite, Client: c, Auth: options.Auth, - Context: cache.NewMemory(), + Context: context.TODO(), } // Call response callback function, send back our response From 19aa18b47b64b147f080c2ef0859ad09f906dcf4 Mon Sep 17 00:00:00 2001 From: Rafal Jeczalik Date: Wed, 29 Nov 2017 17:47:10 +0100 Subject: [PATCH 04/11] kite: cancel Context when client disconnects --- client.go | 29 ++++++++++++++++++ kite.go | 1 + kite_test.go | 84 ++++++++++++++++++++++++++++++++++++++++++++++------ request.go | 2 +- 4 files changed, 106 insertions(+), 10 deletions(-) diff --git a/client.go b/client.go index 54f439fb..ac53ee55 100644 --- a/client.go +++ b/client.go @@ -1,6 +1,7 @@ package kite import ( + "context" "encoding/json" "errors" "fmt" @@ -126,6 +127,11 @@ type Client struct { session sockjs.Session send chan *message + // ctx and cancel keeps track of session lifetime + ctxMu sync.Mutex + ctx context.Context + cancel func() + // muReconnect protects Reconnect muReconnect sync.Mutex @@ -206,8 +212,13 @@ func (k *Kite) NewClient(remoteURL string) *Client { Concurrent: true, send: make(chan *message), interrupt: make(chan error, 1), + ctx: context.TODO(), + cancel: func() {}, } + c.OnConnect(c.setContext) + c.OnDisconnect(c.closeContext) + k.OnRegister(c.updateAuth) return c @@ -262,6 +273,24 @@ func (c *Client) updateAuth(reg *protocol.RegisterResult) { } } +func (c *Client) setContext() { + c.ctxMu.Lock() + c.ctx, c.cancel = context.WithCancel(context.Background()) + c.ctxMu.Unlock() +} + +func (c *Client) closeContext() { + c.ctxMu.Lock() + c.cancel() + c.ctxMu.Unlock() +} + +func (c *Client) context() context.Context { + c.ctxMu.Lock() + defer c.ctxMu.Unlock() + return c.ctx +} + func (c *Client) authCopy() *Auth { c.authMu.Lock() defer c.authMu.Unlock() diff --git a/kite.go b/kite.go index 63ed57df..8c42eedc 100644 --- a/kite.go +++ b/kite.go @@ -283,6 +283,7 @@ func (k *Kite) sockjsHandler(session sockjs.Session) { go c.sendHub() k.callOnConnectHandlers(c) + c.callOnConnectHandlers() // Run after methods are registered and delegate is set c.readLoop() diff --git a/kite_test.go b/kite_test.go index 0fc90481..b2ffc9b5 100644 --- a/kite_test.go +++ b/kite_test.go @@ -2,10 +2,12 @@ package kite import ( "errors" + "flag" "fmt" "math" "math/rand" "os" + "reflect" "strconv" "sync" "testing" @@ -20,6 +22,8 @@ import ( "github.com/igm/sockjs-go/sockjs" ) +var timeout = flag.Duration("telltime", 4*time.Second, "Timeout for kite calls.") + func init() { rand.Seed(time.Now().Unix() + int64(os.Getpid())) } @@ -32,6 +36,76 @@ func panicRegisterHandler(*protocol.RegisterResult) { panic("this panic should be ignored") } +func transportFromEnv() config.Transport { + env := os.Getenv("KITE_TRANSPORT") + tr, ok := config.Transports[env] + if env != "" && !ok { + panic(fmt.Errorf("transport %q doesn't exists", env)) + } + return tr +} + +func TestContext(t *testing.T) { + flag.Parse() + + ch := make(chan int, 4) // checkpoints, to ensure flor of control + + k := New("server", "0.0.1") + k.Config.DisableAuthentication = true + k.Config.Port = 3333 + k.Config.Transport = transportFromEnv() + k.HandleFunc("longrunning", func(r *Request) (interface{}, error) { + ch <- 2 + + go func() { + <-r.Context.Done() + ch <- 4 + close(ch) + }() + return nil, nil + }) + go k.Run() + <-k.ServerReadyNotify() + defer k.Close() + + c := New("client", "0.0.1").NewClient("http://127.0.0.1:3333/kite") + if err := c.Dial(); err != nil { + t.Fatalf("Dial()=%s", err) + } + + ch <- 1 + + if _, err := c.TellWithTimeout("longrunning", *timeout); err != nil { + t.Fatalf("TellWithTimeout()=%s", err) + } + + ch <- 3 + + c.Close() + + var got []int + timeout := time.After(2 * time.Second) + +collect: + for { + select { + case i, ok := <-ch: + if !ok { + break collect + } + got = append(got, i) + case <-timeout: + t.Fatal("timed out collecting checkpoints") + } + } + + want := []int{1, 2, 3, 4} + + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v") + } +} + func TestMultiple(t *testing.T) { testDuration := time.Second * 10 @@ -44,15 +118,7 @@ func TestMultiple(t *testing.T) { // ports are starting from 6000 up to 6000 + kiteNumber port := 6000 - var transport config.Transport - if transportName := os.Getenv("KITE_TRANSPORT"); transportName != "" { - tr, ok := config.Transports[transportName] - if !ok { - t.Fatalf("transport '%s' doesn't exists", transportName) - } - - transport = tr - } + transport := transportFromEnv() for i := 0; i < kiteNumber; i++ { m := New("mathworker"+strconv.Itoa(i), "0.1."+strconv.Itoa(i)) diff --git a/request.go b/request.go index 62428ec4..6f602fee 100644 --- a/request.go +++ b/request.go @@ -154,7 +154,7 @@ func (c *Client) newRequest(method string, args *dnode.Partial) (*Request, func( LocalKite: c.LocalKite, Client: c, Auth: options.Auth, - Context: context.TODO(), + Context: c.context(), } // Call response callback function, send back our response From 0033393361e37ce9bae9740506fed8eb79397604 Mon Sep 17 00:00:00 2001 From: Rafal Jeczalik Date: Wed, 29 Nov 2017 17:59:13 +0100 Subject: [PATCH 05/11] example: fix math kite --- examples/math/math-kite/math.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/math/math-kite/math.go b/examples/math/math-kite/math.go index 493e3d62..66ae8b9c 100644 --- a/examples/math/math-kite/math.go +++ b/examples/math/math-kite/math.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" @@ -20,7 +21,7 @@ func main() { resp := "hello from pre handler!" // let us return an hello to base square method! - r.Context.Set("response", resp) + r.Context = context.WithValue(r.Context, "response", resp) return resp, nil }) @@ -30,7 +31,7 @@ func main() { // Pass the response from the previous square method back to the // client, this is imporant if you use post handler. - return r.Context.Get("response") + return r.Context.Value("response").(string), nil }) // Add our handler method, authentication is disabled for this example. From 6b81f9addb784c54edd874317e654944e7a4eeea Mon Sep 17 00:00:00 2001 From: Rafal Jeczalik Date: Wed, 29 Nov 2017 18:03:09 +0100 Subject: [PATCH 06/11] kite: fix race in TestContext --- kite_test.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/kite_test.go b/kite_test.go index b2ffc9b5..23d5dba7 100644 --- a/kite_test.go +++ b/kite_test.go @@ -48,7 +48,7 @@ func transportFromEnv() config.Transport { func TestContext(t *testing.T) { flag.Parse() - ch := make(chan int, 4) // checkpoints, to ensure flor of control + ch := make(chan int, 4) // checkpoints, to ensure correct control flow k := New("server", "0.0.1") k.Config.DisableAuthentication = true @@ -60,7 +60,6 @@ func TestContext(t *testing.T) { go func() { <-r.Context.Done() ch <- 4 - close(ch) }() return nil, nil }) @@ -84,23 +83,23 @@ func TestContext(t *testing.T) { c.Close() var got []int + want := []int{1, 2, 3, 4} timeout := time.After(2 * time.Second) collect: for { select { - case i, ok := <-ch: - if !ok { + case i := <-ch: + got = append(got, i) + + if len(got) == len(want) { break collect } - got = append(got, i) case <-timeout: t.Fatal("timed out collecting checkpoints") } } - want := []int{1, 2, 3, 4} - if !reflect.DeepEqual(got, want) { t.Fatalf("got %v, want %v") } From 9c1c99884999d5a20dbaa152bbcff89c71ef5aac Mon Sep 17 00:00:00 2001 From: Rafal Jeczalik Date: Thu, 30 Nov 2017 12:33:24 +0100 Subject: [PATCH 07/11] kite: refactor TestContext --- client.go | 2 +- kite_test.go | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/client.go b/client.go index ac53ee55..4e7c3629 100644 --- a/client.go +++ b/client.go @@ -212,7 +212,7 @@ func (k *Kite) NewClient(remoteURL string) *Client { Concurrent: true, send: make(chan *message), interrupt: make(chan error, 1), - ctx: context.TODO(), + ctx: context.Background(), cancel: func() {}, } diff --git a/kite_test.go b/kite_test.go index 23d5dba7..7bb07547 100644 --- a/kite_test.go +++ b/kite_test.go @@ -86,15 +86,10 @@ func TestContext(t *testing.T) { want := []int{1, 2, 3, 4} timeout := time.After(2 * time.Second) -collect: - for { + for len(got) != len(want) { select { case i := <-ch: got = append(got, i) - - if len(got) == len(want) { - break collect - } case <-timeout: t.Fatal("timed out collecting checkpoints") } From 0821624738f66cb83c91537ac60b9309571adc23 Mon Sep 17 00:00:00 2001 From: Cihangir SAVAS Date: Fri, 5 Jan 2018 17:28:18 +0300 Subject: [PATCH 08/11] dep: init deps with dep tool --- .travis.yml | 6 +- Gopkg.lock | 253 +++++++++++++++++++++++++++++++++++++++++++++++++++ Gopkg.toml | 93 +++++++++++++++++++ Makefile | 4 +- appveyor.yml | 5 +- 5 files changed, 354 insertions(+), 7 deletions(-) create mode 100644 Gopkg.lock create mode 100644 Gopkg.toml diff --git a/.travis.yml b/.travis.yml index e255d111..d8189396 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,10 @@ language: go sudo: false go: - - 1.8.3 - - 1.9 + - 1.9.1 install: - - go get -d -v -t ./... + - go get -u -v github.com/golang/dep/cmd/dep + - $GOPATH/bin/dep ensure -vendor-only -v script: - export GOMAXPROCS=$(nproc) - make test diff --git a/Gopkg.lock b/Gopkg.lock new file mode 100644 index 00000000..73871039 --- /dev/null +++ b/Gopkg.lock @@ -0,0 +1,253 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. + + +[[projects]] + name = "github.com/BurntSushi/toml" + packages = ["."] + revision = "b26d9c308763d68093482582cea63d69be07a0f0" + version = "v0.3.0" + +[[projects]] + branch = "master" + name = "github.com/armon/go-radix" + packages = ["."] + revision = "1fca145dffbcaa8fe914309b1ec0cfc67500fe61" + +[[projects]] + name = "github.com/bgentry/speakeasy" + packages = ["."] + revision = "4aabc24848ce5fd31929f7d1e4ea74d3709c14cd" + version = "v0.1.0" + +[[projects]] + name = "github.com/cenkalti/backoff" + packages = ["."] + revision = "61153c768f31ee5f130071d08fc82b85208528de" + version = "v1.1.0" + +[[projects]] + name = "github.com/coreos/etcd" + packages = [ + "client", + "pkg/pathutil", + "pkg/srv", + "pkg/types", + "version" + ] + revision = "d3c2acf09011e89d61a8b3ec0f844411e200abb5" + version = "v3.3.0-rc.1" + +[[projects]] + name = "github.com/coreos/go-semver" + packages = ["semver"] + revision = "8ab6407b697782a06568d4b7f1db25550ec2e4c6" + version = "v0.2.0" + +[[projects]] + name = "github.com/dgrijalva/jwt-go" + packages = ["."] + revision = "dbeaa9332f19a944acb5736b4456cfcc02140e29" + version = "v3.1.0" + +[[projects]] + branch = "master" + name = "github.com/fatih/camelcase" + packages = ["."] + revision = "44e46d280b43ec1531bb25252440e34f1b800b65" + +[[projects]] + name = "github.com/fatih/color" + packages = ["."] + revision = "570b54cabe6b8eb0bc2dfce68d964677d63b5260" + version = "v1.5.0" + +[[projects]] + name = "github.com/fatih/structs" + packages = ["."] + revision = "a720dfa8df582c51dee1b36feabb906bde1588bd" + version = "v1.0" + +[[projects]] + name = "github.com/gorilla/context" + packages = ["."] + revision = "1ea25387ff6f684839d82767c1733ff4d4d15d0a" + version = "v1.1" + +[[projects]] + name = "github.com/gorilla/mux" + packages = ["."] + revision = "7f08801859139f86dfafd1c296e2cba9a80d292e" + version = "v1.6.0" + +[[projects]] + name = "github.com/gorilla/websocket" + packages = ["."] + revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b" + version = "v1.2.0" + +[[projects]] + branch = "master" + name = "github.com/hashicorp/errwrap" + packages = ["."] + revision = "7554cd9344cec97297fa6649b055a8c98c2a1e55" + +[[projects]] + branch = "master" + name = "github.com/hashicorp/go-multierror" + packages = ["."] + revision = "b7773ae218740a7be65057fc60b366a49b538a44" + +[[projects]] + branch = "master" + name = "github.com/hashicorp/go-version" + packages = ["."] + revision = "4fe82ae3040f80a03d04d2cccb5606a626b8e1ee" + +[[projects]] + name = "github.com/igm/sockjs-go" + packages = ["sockjs"] + revision = "c8a8c6429d10e3b6865960ad8cb43779b8a834ef" + +[[projects]] + branch = "master" + name = "github.com/juju/ratelimit" + packages = ["."] + revision = "59fac5042749a5afb9af70e813da1dd5474f0167" + +[[projects]] + branch = "master" + name = "github.com/koding/cache" + packages = ["."] + revision = "e8a81b0b3f20f895153311abde1062894b5912d6" + +[[projects]] + branch = "master" + name = "github.com/koding/logging" + packages = ["."] + revision = "8b5a689ed69b1c7cd1e3595276fc2a352d7818e0" + +[[projects]] + branch = "master" + name = "github.com/koding/multiconfig" + packages = ["."] + revision = "69c27309b2d751c576b59ea9c3726597c2375da3" + +[[projects]] + branch = "master" + name = "github.com/koding/websocketproxy" + packages = ["."] + revision = "424c31d1c6ac4384ddd4aaa81608310a98c3fc95" + +[[projects]] + branch = "master" + name = "github.com/lann/builder" + packages = ["."] + revision = "f22ce00fd9394014049dad11c244859432bd6820" + +[[projects]] + branch = "master" + name = "github.com/lann/ps" + packages = ["."] + revision = "62de8c46ede02a7675c4c79c84883eb164cb71e3" + +[[projects]] + name = "github.com/lann/squirrel" + packages = ["."] + revision = "a6b93000bd219143c56c16e6cb1c4b91da3f224b" + version = "v1.0" + +[[projects]] + branch = "master" + name = "github.com/lib/pq" + packages = [ + ".", + "oid" + ] + revision = "83612a56d3dd153a94a629cd64925371c9adad78" + +[[projects]] + name = "github.com/mattn/go-colorable" + packages = ["."] + revision = "167de6bfdfba052fa6b2d3664c8f5272e23c9072" + version = "v0.0.9" + +[[projects]] + name = "github.com/mattn/go-isatty" + packages = ["."] + revision = "0360b2af4f38e8d38c7fce2a9f4e702702d73a39" + version = "v0.0.3" + +[[projects]] + branch = "master" + name = "github.com/mitchellh/cli" + packages = ["."] + revision = "33edc47170b5df54d2588696d590c5e20ee583fe" + +[[projects]] + name = "github.com/posener/complete" + packages = [ + ".", + "cmd", + "cmd/install", + "match" + ] + revision = "dc2bc5a81accba8782bebea28628224643a8286a" + version = "v1.1" + +[[projects]] + name = "github.com/satori/go.uuid" + packages = ["."] + revision = "879c5887cd475cd7864858769793b2ceb0d44feb" + version = "v1.1.0" + +[[projects]] + branch = "master" + name = "github.com/ugorji/go" + packages = ["codec"] + revision = "ccfe18359b55b97855cee1d3f74e5efbda4869dc" + +[[projects]] + branch = "master" + name = "golang.org/x/crypto" + packages = ["ssh/terminal"] + revision = "0fcca4842a8d74bfddc2c96a073bd2a4d2a7a2e8" + +[[projects]] + branch = "master" + name = "golang.org/x/net" + packages = ["context"] + revision = "42fe2e1c20de1054d3d30f82cc9fb5b41e2e3767" + +[[projects]] + branch = "master" + name = "golang.org/x/sys" + packages = [ + "unix", + "windows" + ] + revision = "a3f2cbd54cf5dfe3fbaccf76375fdb12f67654c8" + +[[projects]] + branch = "v2" + name = "gopkg.in/mgo.v2" + packages = [ + ".", + "bson", + "internal/json", + "internal/sasl", + "internal/scram" + ] + revision = "3f83fa5005286a7fe593b055f0d7771a7dce4655" + +[[projects]] + branch = "v2" + name = "gopkg.in/yaml.v2" + packages = ["."] + revision = "0e4404da71227dcc02fb1deee803d93e86d08f72" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + inputs-digest = "afe5e9587bf3c94201eed941f85330d5a9822fc9a4d43a672efe01cae9e7654e" + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml new file mode 100644 index 00000000..199e7f13 --- /dev/null +++ b/Gopkg.toml @@ -0,0 +1,93 @@ +# Gopkg.toml example +# +# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md +# for detailed Gopkg.toml documentation. +# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" + + +[[constraint]] + name = "github.com/cenkalti/backoff" + version = "1.1.0" + +[[constraint]] + name = "github.com/coreos/etcd" + version = "3.3.0-rc.1" + +[[constraint]] + name = "github.com/dgrijalva/jwt-go" + version = "3.1.0" + +[[constraint]] + name = "github.com/fatih/color" + version = "1.5.0" + +[[constraint]] + name = "github.com/gorilla/mux" + version = "1.6.0" + +[[constraint]] + name = "github.com/gorilla/websocket" + version = "1.2.0" + +[[constraint]] + branch = "master" + name = "github.com/hashicorp/go-version" + +[[constraint]] + name = "github.com/igm/sockjs-go" + revision = "c8a8c6429d10e3b6865960ad8cb43779b8a834ef" + +[[constraint]] + branch = "master" + name = "github.com/juju/ratelimit" + +[[constraint]] + branch = "master" + name = "github.com/koding/cache" + +[[constraint]] + branch = "master" + name = "github.com/koding/logging" + +[[constraint]] + branch = "master" + name = "github.com/koding/multiconfig" + +[[constraint]] + branch = "master" + name = "github.com/koding/websocketproxy" + +[[constraint]] + name = "github.com/lann/squirrel" + version = "1.0.0" + +[[constraint]] + branch = "master" + name = "github.com/lib/pq" + +[[constraint]] + branch = "master" + name = "github.com/mitchellh/cli" + +[[constraint]] + name = "github.com/satori/go.uuid" + version = "1.1.0" + +[[constraint]] + branch = "master" + name = "golang.org/x/crypto" diff --git a/Makefile b/Makefile index c5dfafb4..56c696b6 100644 --- a/Makefile +++ b/Makefile @@ -139,8 +139,8 @@ endif @echo "Creating test key" @`which go` run ./testutil/writekey/main.go - - @echo "$(OK_COLOR)==> Downloading dependencies$(NO_COLOR)" + + @echo "$(OK_COLOR)==> Downloading test dependencies$(NO_COLOR)" @`which go` get -d -v -t ./... @echo "$(OK_COLOR)==> Testing packages $(NO_COLOR)" diff --git a/appveyor.yml b/appveyor.yml index 0f891448..7a39799a 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -7,7 +7,7 @@ clone_folder: c:\projects\src\github.com\koding\kite environment: PATH: c:\projects\bin;%PATH% GOPATH: c:\projects - GOVERSION: 1.8 + GOVERSION: 1.9.1 install: - go version @@ -17,7 +17,8 @@ install: - appveyor DownloadFile https://storage.googleapis.com/golang/go%GOVERSION%.windows-amd64.zip - 7z x go%GOVERSION%.windows-amd64.zip -y -oC:\ > NUL - - go get -v -t ./... + - go get -u -v github.com/golang/dep/cmd/dep + - dep ensure -vendor-only -v build_script: - go build ./... From 58136b5105a74a60f2eb144848d513ed1a7912d9 Mon Sep 17 00:00:00 2001 From: Cihangir SAVAS Date: Wed, 27 Dec 2017 01:26:14 +0300 Subject: [PATCH 09/11] protocol: add webrtc support --- client.go | 10 +++ config/config.go | 7 +- handlers.go | 69 ++++++++++++++++++ kite.go | 7 ++ kitetest/kitetest.go | 4 +- kontrol/handlers_test.go | 46 ++++++++++++ kontrol/helper_test.go | 2 +- kontrol/kontrol_test.go | 2 +- kontrolclient.go | 12 ++++ protocol/webrtc.go | 93 ++++++++++++++++++++++++ protocol/webrtc_test.go | 149 +++++++++++++++++++++++++++++++++++++++ 11 files changed, 395 insertions(+), 6 deletions(-) create mode 100644 kontrol/handlers_test.go create mode 100644 protocol/webrtc.go create mode 100644 protocol/webrtc_test.go diff --git a/client.go b/client.go index 4e7c3629..1c873100 100644 --- a/client.go +++ b/client.go @@ -711,6 +711,16 @@ func (c *Client) Tell(method string, args ...interface{}) (result *dnode.Partial return c.TellWithTimeout(method, 0, args...) } +// SendWebRTCRequest sends requests to kontrol for signalling purposes. +func (c *Client) SendWebRTCRequest(req *protocol.WebRTCSignalMessage) error { + timeout := time.Duration(0) + if c.Config != nil { + timeout = c.Config.Timeout + } + _, err := c.TellWithTimeout(WebRTCHandlerName, timeout, req) + return err +} + // TellWithTimeout does the same thing with Tell() method except it takes an // extra argument that is the timeout for waiting reply from the remote Kite. // If timeout is given 0, the behavior is same as Tell(). diff --git a/config/config.go b/config/config.go index 1c9b8f51..71ad99ca 100644 --- a/config/config.go +++ b/config/config.go @@ -19,8 +19,8 @@ import ( "github.com/igm/sockjs-go/sockjs" ) -// the implementation of New() doesn't have any error to be returned yet it -// returns, so it's totally safe to neglect the error +// CookieJar ignoring err: the implementation of New() doesn't have any error to +// be returned yet it returns, so it's totally safe to neglect the error var CookieJar, _ = cookiejar.New(nil) // Options is passed to kite.New when creating new instance. @@ -110,6 +110,9 @@ type Config struct { KontrolURL string KontrolKey string KontrolUser string + + // UseWebRTC is the flag for Kite's to communicate over WebRTC if possible. + UseWebRTC bool } // DefaultConfig contains the default settings. diff --git a/handlers.go b/handlers.go index 8916eca4..262ab005 100644 --- a/handlers.go +++ b/handlers.go @@ -1,19 +1,85 @@ package kite import ( + "errors" "fmt" "net/http" "net/url" "os" "os/exec" "runtime" + "time" "github.com/gorilla/websocket" + "github.com/koding/cache" + "github.com/koding/kite/protocol" "github.com/koding/kite/sockjsclient" "github.com/koding/kite/systeminfo" "golang.org/x/crypto/ssh/terminal" ) +var ( + errDstNotSet = errors.New("dst not set") + errDstNotRegistered = errors.New("dst not registered") +) + +// WebRTCHandlerName provides the naming scheme for the handler +const WebRTCHandlerName = "kite.handleWebRTC" + +type webRTCHandler struct { + kitesColl cache.Cache +} + +// NewWebRCTHandler creates a new handler for web rtc signalling services. +func NewWebRCTHandler() *webRTCHandler { + return &webRTCHandler{ + kitesColl: cache.NewMemory(), + } +} + +func (w *webRTCHandler) registerSrc(src *Client) { + w.kitesColl.Set(src.ID, src) + src.OnDisconnect(func() { + time.Sleep(time.Second * 2) + id := src.ID + // delete from the collection + w.kitesColl.Delete(id) + }) +} + +func (w *webRTCHandler) getDst(dst string) (*Client, error) { + if dst == "" { + return nil, errDstNotSet + } + + dstKite, err := w.kitesColl.Get(dst) + if err != nil { + return nil, errDstNotRegistered + } + + return dstKite.(*Client), nil +} + +// ServeKite implements Hander interface. +func (w *webRTCHandler) ServeKite(r *Request) (interface{}, error) { + var args protocol.WebRTCSignalMessage + + if err := r.Args.One().Unmarshal(&args); err != nil { + return nil, fmt.Errorf("invalid query: %s", err) + } + + args.Src = r.Client.ID + + w.registerSrc(r.Client) + + dst, err := w.getDst(args.Dst) + if err != nil { + return nil, err + } + + return nil, dst.SendWebRTCRequest(&args) +} + func (k *Kite) addDefaultHandlers() { // Default RPC methods k.HandleFunc("kite.systemInfo", handleSystemInfo) @@ -27,6 +93,9 @@ func (k *Kite) addDefaultHandlers() { if runtime.GOOS == "darwin" { k.HandleFunc("kite.notify", handleNotifyDarwin) } + if k.WebRTCHandler != nil { + k.Handle(WebRTCHandlerName, k.WebRTCHandler) + } } // handleSystemInfo returns info about the system (CPU, memory, disk...). diff --git a/kite.go b/kite.go index 8c42eedc..bcb2e7bb 100644 --- a/kite.go +++ b/kite.go @@ -70,6 +70,9 @@ type Kite struct { // Deprecated: Set Config.XHR field instead. ClientFunc func(*sockjsclient.DialOptions) *http.Client + // WebRTCHandler handles the webrtc responses coming from a signalling server. + WebRTCHandler Handler + // Handlers added with Kite.HandleFunc(). handlers map[string]*Method // method map for exported methods preHandlers []Handler // a list of handlers that are executed before any handler @@ -197,6 +200,10 @@ func NewWithConfig(name, version string, cfg *config.Config) *Kite { muxer: mux.NewRouter(), } + if cfg != nil && cfg.UseWebRTC { + k.WebRTCHandler = NewWebRCTHandler() + } + // All sockjs communication is done through this endpoint.. k.muxer.PathPrefix("/kite").Handler(sockjs.NewHandler("/kite", *cfg.SockJS, k.sockjsHandler)) diff --git a/kitetest/kitetest.go b/kitetest/kitetest.go index 02973d4d..b0e75d94 100644 --- a/kitetest/kitetest.go +++ b/kitetest/kitetest.go @@ -9,9 +9,9 @@ import ( "os/user" "time" - "github.com/dgrijalva/jwt-go" + jwt "github.com/dgrijalva/jwt-go" "github.com/koding/kite/protocol" - "github.com/satori/go.uuid" + uuid "github.com/satori/go.uuid" ) // KeyPair represents PEM encoded RSA key pair. diff --git a/kontrol/handlers_test.go b/kontrol/handlers_test.go new file mode 100644 index 00000000..48f9c64d --- /dev/null +++ b/kontrol/handlers_test.go @@ -0,0 +1,46 @@ +package kontrol + +import ( + "fmt" + "strings" + "testing" + + "github.com/koding/kite" + "github.com/koding/kite/protocol" + "github.com/koding/kite/testkeys" +) + +// createTestKite creates a test kite, caller of this func should close the kite +func createTestKite(name string, conf *Config, t *testing.T) *HelloKite { + k, err := NewHelloKite(name, conf) + if err != nil { + t.Fatalf("error creating %s: %s", name, err) + } + + k.Kite.HandleFunc(kite.WebRTCHandlerName, func(req *kite.Request) (interface{}, error) { + return nil, fmt.Errorf("%s is called", name) + }) + + return k +} + +func TestKontrol_HandleWebRTC(t *testing.T) { + kont, conf := startKontrol(testkeys.PrivateThird, testkeys.PublicThird, 5501) + defer kont.Close() + + hk1 := createTestKite("kite1", conf, t) + defer hk1.Close() + + hk2 := createTestKite("kite2", conf, t) + defer hk2.Close() + + err := hk1.Kite.SendWebRTCRequest(&protocol.WebRTCSignalMessage{Dst: hk2.Kite.Id}) + if err == nil || !strings.Contains(err.Error(), "not registered") { + t.Fatalf("expected kite.errDstNotRegistered, got: %+v", err) + } + + err = hk2.Kite.SendWebRTCRequest(&protocol.WebRTCSignalMessage{Dst: hk1.Kite.Id}) + if !strings.Contains(err.Error(), fmt.Sprintf("%s is called", hk1.Kite.Kite().Name)) { + t.Fatalf("expected hk1 error, got: %+v", err) + } +} diff --git a/kontrol/helper_test.go b/kontrol/helper_test.go index 1b646a6e..eda57828 100644 --- a/kontrol/helper_test.go +++ b/kontrol/helper_test.go @@ -51,7 +51,7 @@ func startKontrol(pem, pub string, port int) (*Kontrol, *Config) { conf.KontrolUser = "testuser" conf.KiteKey = testutil.NewToken("testuser", pem, pub).Raw conf.ReadEnvironmentVariables() - + conf.UseWebRTC = true DefaultPort = port kon := New(conf.Copy(), "1.0.0") // kon.Kite.SetLogLevel(kite.DEBUG) diff --git a/kontrol/kontrol_test.go b/kontrol/kontrol_test.go index 09046e86..ba9cfee7 100644 --- a/kontrol/kontrol_test.go +++ b/kontrol/kontrol_test.go @@ -14,7 +14,7 @@ import ( "testing" "time" - "github.com/dgrijalva/jwt-go" + jwt "github.com/dgrijalva/jwt-go" "github.com/koding/kite" "github.com/koding/kite/kitekey" "github.com/koding/kite/protocol" diff --git a/kontrolclient.go b/kontrolclient.go index 80d49114..186cace3 100644 --- a/kontrolclient.go +++ b/kontrolclient.go @@ -205,6 +205,18 @@ func (k *Kite) GetToken(kite *protocol.Kite) (string, error) { return tkn, nil } +// SendWebRTCRequest sends requests to kontrol for signalling purposes. +func (k *Kite) SendWebRTCRequest(req *protocol.WebRTCSignalMessage) error { + if err := k.SetupKontrolClient(); err != nil { + return err + } + + <-k.kontrol.readyConnected + + _, err := k.kontrol.TellWithTimeout(WebRTCHandlerName, k.Config.Timeout, req) + return err +} + // GetTokenForce is used to obtain a new token for the given kite. // // It always returns a new token and forces a Kontrol to diff --git a/protocol/webrtc.go b/protocol/webrtc.go new file mode 100644 index 00000000..a9ee5690 --- /dev/null +++ b/protocol/webrtc.go @@ -0,0 +1,93 @@ +package protocol + +import ( + "encoding/json" + "errors" + "strings" + "sync" + "unicode/utf8" +) + +var ( + errInvalidChar = errors.New("message contains invalid chars") + errInvalidOp = errors.New("invalid start operation") +) + +// WebRTCSignalMessage represents a signalling message between peers and the singalling server +type WebRTCSignalMessage struct { + Type string `json:"type,omitempty"` + Src string `json:"src,omitempty"` + Dst string `json:"dst,omitempty"` + Payload json.RawMessage `json:"payload,omitempty"` + + parsedPayload *Payload + isParsed bool + mu sync.Mutex +} + +// Payload is the content of `payload` in the json +type Payload struct { + Msg *string `json:"msg,omitempty"` + Sdp *struct { + Type *string `json:"type,omitempty"` + Sdp *string `json:"sdp,omitempty"` + } `json:"sdp,omitempty"` + Type *string `json:"type,omitempty"` + Label *string `json:"label,omitempty"` + ConnectionID *string `json:"connectionId,omitempty"` + Reliable *bool `json:"reliable,omitempty"` + Serialization *string `json:"serialization,omitempty"` + Browser *string `json:"browser,omitempty"` + Candidate *struct { + Candidate *string `json:"candidate,omitempty"` + SdpMid *string `json:"sdpMid,omitempty"` + SdpMLineIndex *int `json:"sdpMLineIndex,omitempty"` + } `json:"candidate,omitempty"` +} + +// ParsePayload parses the payload if it is not parsed previously. This method +// can be called concurrently. +func (w *WebRTCSignalMessage) ParsePayload() (*Payload, error) { + w.mu.Lock() + defer w.mu.Unlock() + + if w.isParsed { + return w.parsedPayload, nil + } + + payload := &Payload{} + if err := json.Unmarshal(w.Payload, payload); err != nil { + return nil, err + } + + w.parsedPayload = payload + return payload, nil +} + +// ParseWebRTCSignalMessage parses the web rtc command/message +func ParseWebRTCSignalMessage(msg string) (*WebRTCSignalMessage, error) { + // All messages are text (utf-8 encoded at present) + if !utf8.Valid([]byte(msg)) { + return nil, errInvalidChar + } + + w := &WebRTCSignalMessage{} + if err := json.Unmarshal([]byte(msg), w); err != nil { + return nil, err + } + + if err := validateOperation(w.Type); err != nil { + return nil, err + } + + return w, nil +} + +func validateOperation(op string) error { + switch strings.ToUpper(op) { + case "ANSWER", "OFFER", "CANDIDATE", "LEAVE": + return nil + default: + return errInvalidOp + } +} diff --git a/protocol/webrtc_test.go b/protocol/webrtc_test.go new file mode 100644 index 00000000..c7f09dcb --- /dev/null +++ b/protocol/webrtc_test.go @@ -0,0 +1,149 @@ +package protocol + +import ( + "encoding/json" + "testing" +) + +var msgs = [...]string{ + `{"type":"ANSWER","src":"dest-peer-id","dst":"someid","payload":{"sdp":{"type":"answer","sdp":"v=0\r\no=- 8276888538055714041 2 IN IP4 127.0.0.1\r\ns=-\r\nt=0 0\r\na=group:BUNDLE data\r\na=msid-semantic: WMS\r\nm=application 9 DTLS/SCTP 5000\r\nc=IN IP4 0.0.0.0\r\nb=AS:30\r\na=ice-ufrag:g2yO\r\na=ice-pwd:7A7+wwgodBorD3KsLRQf2oNB\r\na=ice-options:trickle\r\na=fingerprint:sha-256 6D:3C:C9:74:8C:41:AC:3F:93:05:C0:98:44:26:D2:F6:15:95:F8:AC:63:14:22:FA:B7:9E:EC:10:1A:BC:76:7E\r\na=setup:active\r\na=mid:data\r\na=sctpmap:5000 webrtc-datachannel 1024\r\n"},"type":"data","connectionId":"dc_d29iaxm9wi","browser":"Chrome"}}`, + `{"type":"OFFER","payload":{"sdp":{"type":"offer","sdp":"v=0\r\no=- 596709752457229267 2 IN IP4 127.0.0.1\r\ns=-\r\nt=0 0\r\na=group:BUNDLE data\r\na=msid-semantic: WMS\r\nm=application 9 DTLS/SCTP 5000\r\nc=IN IP4 0.0.0.0\r\na=ice-ufrag:0pH9\r\na=ice-pwd:KMJGrCkj0RhH+hFZcwrmIR0l\r\na=ice-options:trickle\r\na=fingerprint:sha-256 0E:62:E9:D9:F6:48:05:5D:F4:EC:0A:8E:2C:48:0E:C3:5E:B2:9D:47:4E:F6:9D:6B:D8:B3:E1:30:58:39:35:41\r\na=setup:actpass\r\na=mid:data\r\na=sctpmap:5000 webrtc-datachannel 1024\r\n"},"type":"data","label":"dc_d29iaxm9wi","connectionId":"dc_d29iaxm9wi","reliable":false,"serialization":"binary","browser":"Chrome"},"dst":"dest-peer-id"}`, + `{"type":"CANDIDATE","payload":{"candidate":{"candidate":"candidate:842163049 1 udp 1677729535 195.174.144.24 56801 typ srflx raddr 0.0.0.0 rport 0 generation 0 ufrag 0pH9 network-cost 50","sdpMid":"data","sdpMLineIndex":0},"type":"data","connectionId":"dc_d29iaxm9wi"},"dst":"dest-peer-id"}`, + `{"type":"ID-TAKEN","payload":{"msg":"ID is taken"}}`, +} + +var payloads = [...]string{ + `{"type":"data","connectionId":"dc_d29iaxm9wi","browser":"Chrome"}`, + `{"sdp":{"type":"offer","sdp":"v=0\r\no=- 596709752457229267 2 IN IP4 127.0.0.1\r\ns=-\r\nt=0 0\r\na=group:BUNDLE data\r\na=msid-semantic: WMS\r\nm=application 9 DTLS/SCTP 5000\r\nc=IN IP4 0.0.0.0\r\na=ice-ufrag:0pH9\r\na=ice-pwd:KMJGrCkj0RhH+hFZcwrmIR0l\r\na=ice-options:trickle\r\na=fingerprint:sha-256 0E:62:E9:D9:F6:48:05:5D:F4:EC:0A:8E:2C:48:0E:C3:5E:B2:9D:47:4E:F6:9D:6B:D8:B3:E1:30:58:39:35:41\r\na=setup:actpass\r\na=mid:data\r\na=sctpmap:5000 webrtc-datachannel 1024\r\n"},"type":"data","label":"dc_d29iaxm9wi","connectionId":"dc_d29iaxm9wi","reliable":false,"serialization":"binary","browser":"Chrome"}`, + `{"candidate":{"candidate":"candidate:842163049 1 udp 1677729535 195.174.144.24 56801 typ srflx raddr 0.0.0.0 rport 0 generation 0 ufrag 0pH9 network-cost 50","sdpMid":"data","sdpMLineIndex":0},"type":"data","connectionId":"dc_d29iaxm9wi"}`, + `{"msg":"ID is taken"}`, +} + +func TestParseWebRTCMessage(t *testing.T) { + tests := []struct { + name string + message string + msg *WebRTCSignalMessage + checkPayload bool + wantErr bool + }{ + { + name: "non-utf8 encoded messages", + message: string([]byte{0x00}), + msg: nil, + checkPayload: false, + wantErr: true, + }, + { + name: "utf8 encoded messages", + message: msgs[0], + msg: &WebRTCSignalMessage{ + Type: "ANSWER", + Src: "dest-peer-id", + Dst: "someid", + }, + checkPayload: false, + wantErr: false, + }, + { + name: "invalid message", + message: "naberpamps", + msg: nil, + checkPayload: false, + wantErr: true, + }, + { + name: "parse answer message", + message: msgs[0], + msg: &WebRTCSignalMessage{ + Type: "ANSWER", + Src: "dest-peer-id", + Dst: "someid", + }, + checkPayload: false, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sm, err := ParseWebRTCSignalMessage(tt.message) + if (err != nil) != tt.wantErr { + t.Errorf("ParseWebRTCMessage() error = %v, wantErr %v", err, tt.wantErr) + return + } + + // in case they are both nil + if sm == tt.msg { + return + } + + if !tt.checkPayload { + sm.Payload = nil + tt.msg.Payload = nil + } + + smJSON, err := json.Marshal(sm) + if err != nil { + t.Errorf("json.Marshal(sm) err = %v", err) + } + + msgJSON, err := json.Marshal(tt.msg) + if err != nil { + t.Errorf("json.Marshal(tt.msg) err = %v", err) + } + + if string(smJSON) != string(msgJSON) { + t.Errorf("string(smJSON) != string(msgJSON) smJSON = %v, want %v", string(smJSON), string(msgJSON)) + } + }) + } +} + +func TestWebRTCSignalMessage_ParsePayload(t *testing.T) { + tests := []struct { + name string + payload json.RawMessage + want *Payload + wantErr bool + }{ + { + name: "", + payload: []byte(payloads[0]), + want: &Payload{ + Type: s("data"), + ConnectionID: s("dc_d29iaxm9wi"), + Browser: s("Chrome"), + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := &WebRTCSignalMessage{ + Payload: tt.payload, + } + got, err := w.ParsePayload() + if (err != nil) != tt.wantErr { + t.Errorf("WebRTCSignalMessage.ParsePayload() error = %v, wantErr %v", err, tt.wantErr) + return + } + + gotJSON, err := json.Marshal(got) + if err != nil { + t.Errorf("json.Marshal(sm) err = %v", err) + } + + wantJSON, err := json.Marshal(tt.want) + if err != nil { + t.Errorf("json.Marshal(tt.want) err = %v", err) + } + + if string(gotJSON) != string(wantJSON) { + t.Errorf("string(gotJSON) != string(wantJSON) gotJSON = %v, want %v", string(gotJSON), string(wantJSON)) + } + }) + } +} + +func s(str string) *string { return &str } From 7b99f61b8de00e86a303a0d0701d1a8f06f48c6b Mon Sep 17 00:00:00 2001 From: Rafal Jeczalik Date: Sun, 17 Jun 2018 09:51:49 +0200 Subject: [PATCH 10/11] kite: update dep files --- Gopkg.lock | 89 ++++++++++++++++++++---------------------------------- Gopkg.toml | 2 +- 2 files changed, 34 insertions(+), 57 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 73871039..bf51f805 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -27,15 +27,9 @@ [[projects]] name = "github.com/coreos/etcd" - packages = [ - "client", - "pkg/pathutil", - "pkg/srv", - "pkg/types", - "version" - ] - revision = "d3c2acf09011e89d61a8b3ec0f844411e200abb5" - version = "v3.3.0-rc.1" + packages = ["client","pkg/pathutil","pkg/srv","pkg/types","version"] + revision = "33245c6b5b49130ca99280408fadfab01aac0e48" + version = "v3.3.8" [[projects]] name = "github.com/coreos/go-semver" @@ -46,8 +40,8 @@ [[projects]] name = "github.com/dgrijalva/jwt-go" packages = ["."] - revision = "dbeaa9332f19a944acb5736b4456cfcc02140e29" - version = "v3.1.0" + revision = "06ea1031745cb8b3dab3f6a236daf2b0aa468b7e" + version = "v3.2.0" [[projects]] branch = "master" @@ -58,8 +52,8 @@ [[projects]] name = "github.com/fatih/color" packages = ["."] - revision = "570b54cabe6b8eb0bc2dfce68d964677d63b5260" - version = "v1.5.0" + revision = "5b77d2a35fb0ede96d138fc9a99f5c9b6aef11b4" + version = "v1.7.0" [[projects]] name = "github.com/fatih/structs" @@ -70,14 +64,14 @@ [[projects]] name = "github.com/gorilla/context" packages = ["."] - revision = "1ea25387ff6f684839d82767c1733ff4d4d15d0a" - version = "v1.1" + revision = "08b5f424b9271eedf6f9f0ce86cb9396ed337a42" + version = "v1.1.1" [[projects]] name = "github.com/gorilla/mux" packages = ["."] - revision = "7f08801859139f86dfafd1c296e2cba9a80d292e" - version = "v1.6.0" + revision = "e3702bed27f0d39777b0b37b664b6280e8ef8fbf" + version = "v1.6.2" [[projects]] name = "github.com/gorilla/websocket" @@ -101,7 +95,7 @@ branch = "master" name = "github.com/hashicorp/go-version" packages = ["."] - revision = "4fe82ae3040f80a03d04d2cccb5606a626b8e1ee" + revision = "23480c0665776210b5fbbac6eaaee40e3e6a96b7" [[projects]] name = "github.com/igm/sockjs-go" @@ -109,10 +103,10 @@ revision = "c8a8c6429d10e3b6865960ad8cb43779b8a834ef" [[projects]] - branch = "master" name = "github.com/juju/ratelimit" packages = ["."] revision = "59fac5042749a5afb9af70e813da1dd5474f0167" + version = "1.0.1" [[projects]] branch = "master" @@ -136,13 +130,13 @@ branch = "master" name = "github.com/koding/websocketproxy" packages = ["."] - revision = "424c31d1c6ac4384ddd4aaa81608310a98c3fc95" + revision = "944ae4ae170f3afa1e0683bcc3230d34c21e70eb" [[projects]] branch = "master" name = "github.com/lann/builder" packages = ["."] - revision = "f22ce00fd9394014049dad11c244859432bd6820" + revision = "1b87b36280d04fe7882d1512bf038ea2967ad534" [[projects]] branch = "master" @@ -159,11 +153,8 @@ [[projects]] branch = "master" name = "github.com/lib/pq" - packages = [ - ".", - "oid" - ] - revision = "83612a56d3dd153a94a629cd64925371c9adad78" + packages = [".","oid"] + revision = "90697d60dd844d5ef6ff15135d0203f65d2f53b8" [[projects]] name = "github.com/mattn/go-colorable" @@ -181,73 +172,59 @@ branch = "master" name = "github.com/mitchellh/cli" packages = ["."] - revision = "33edc47170b5df54d2588696d590c5e20ee583fe" + revision = "c48282d14eba4b0817ddef3f832ff8d13851aefd" [[projects]] name = "github.com/posener/complete" - packages = [ - ".", - "cmd", - "cmd/install", - "match" - ] - revision = "dc2bc5a81accba8782bebea28628224643a8286a" - version = "v1.1" + packages = [".","cmd","cmd/install","match"] + revision = "98eb9847f27ba2008d380a32c98be474dea55bdf" + version = "v1.1.1" [[projects]] + branch = "master" name = "github.com/satori/go.uuid" packages = ["."] - revision = "879c5887cd475cd7864858769793b2ceb0d44feb" - version = "v1.1.0" + revision = "36e9d2ebbde5e3f13ab2e25625fd453271d6522e" [[projects]] - branch = "master" name = "github.com/ugorji/go" packages = ["codec"] - revision = "ccfe18359b55b97855cee1d3f74e5efbda4869dc" + revision = "b4c50a2b199d93b13dc15e78929cfb23bfdf21ab" + version = "v1.1.1" [[projects]] branch = "master" name = "golang.org/x/crypto" packages = ["ssh/terminal"] - revision = "0fcca4842a8d74bfddc2c96a073bd2a4d2a7a2e8" + revision = "027cca12c2d63e3d62b670d901e8a2c95854feec" [[projects]] branch = "master" name = "golang.org/x/net" packages = ["context"] - revision = "42fe2e1c20de1054d3d30f82cc9fb5b41e2e3767" + revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196" [[projects]] branch = "master" name = "golang.org/x/sys" - packages = [ - "unix", - "windows" - ] - revision = "a3f2cbd54cf5dfe3fbaccf76375fdb12f67654c8" + packages = ["unix","windows"] + revision = "6c888cc515d3ed83fc103cf1d84468aad274b0a7" [[projects]] branch = "v2" name = "gopkg.in/mgo.v2" - packages = [ - ".", - "bson", - "internal/json", - "internal/sasl", - "internal/scram" - ] + packages = [".","bson","internal/json","internal/sasl","internal/scram"] revision = "3f83fa5005286a7fe593b055f0d7771a7dce4655" [[projects]] - branch = "v2" name = "gopkg.in/yaml.v2" packages = ["."] - revision = "0e4404da71227dcc02fb1deee803d93e86d08f72" + revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183" + version = "v2.2.1" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "afe5e9587bf3c94201eed941f85330d5a9822fc9a4d43a672efe01cae9e7654e" + inputs-digest = "c20f9cb7fa5f208c2f79bc997d789636f027f044dfe9f724582763673f7070b4" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 199e7f13..1f5feae6 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -85,8 +85,8 @@ name = "github.com/mitchellh/cli" [[constraint]] + branch = "master" name = "github.com/satori/go.uuid" - version = "1.1.0" [[constraint]] branch = "master" From b6117938ac1f83b657f6ec3ae3e3efa0c0c77bd8 Mon Sep 17 00:00:00 2001 From: Rafal Jeczalik Date: Sun, 17 Jun 2018 09:52:05 +0200 Subject: [PATCH 11/11] kite: adapt code to latest go.uuid version Closes #218. Closes #219. Closes #220. --- kite.go | 2 +- kite_test.go | 2 +- kitetest/kitetest.go | 2 +- kontrol/bench_test.go | 4 ++-- kontrol/kontrol.go | 27 ++++++++++++++++++++++----- kontrol/kontrol_test.go | 6 +++--- testutil/testutil.go | 2 +- 7 files changed, 31 insertions(+), 14 deletions(-) diff --git a/kite.go b/kite.go index 8c42eedc..61529abb 100644 --- a/kite.go +++ b/kite.go @@ -171,7 +171,7 @@ func NewWithConfig(name, version string, cfg *config.Config) *Kite { panic("kite: version must be 3-digits semantic version") } - kiteID := uuid.NewV4() + kiteID := uuid.Must(uuid.NewV4()) l, setlevel := newLogger(name) diff --git a/kite_test.go b/kite_test.go index 7bb07547..79e1b82c 100644 --- a/kite_test.go +++ b/kite_test.go @@ -96,7 +96,7 @@ func TestContext(t *testing.T) { } if !reflect.DeepEqual(got, want) { - t.Fatalf("got %v, want %v") + t.Fatalf("got %v, want %v", got, want) } } diff --git a/kitetest/kitetest.go b/kitetest/kitetest.go index 02973d4d..8940723e 100644 --- a/kitetest/kitetest.go +++ b/kitetest/kitetest.go @@ -36,7 +36,7 @@ func (k *KiteKey) id() string { if k.ID != "" { return k.ID } - return uuid.NewV4().String() + return uuid.Must(uuid.NewV4()).String() } func (k *KiteKey) issuer() string { diff --git a/kontrol/bench_test.go b/kontrol/bench_test.go index 6da27b6c..6f0c2034 100644 --- a/kontrol/bench_test.go +++ b/kontrol/bench_test.go @@ -12,7 +12,7 @@ func BenchmarkPostgres(b *testing.B) { kon.SetStorage(NewPostgres(nil, kon.Kite.Log)) newKite := func() *protocol.Kite { - id := uuid.NewV4() + id := uuid.Must(uuid.NewV4()) return &protocol.Kite{ Username: "bench-user", Environment: "bench-env", @@ -53,7 +53,7 @@ func BenchmarkEtcdAdd(b *testing.B) { kon.SetStorage(NewEtcd(nil, kon.Kite.Log)) newKite := func() *protocol.Kite { - id := uuid.NewV4() + id := uuid.Must(uuid.NewV4()) return &protocol.Kite{ Username: "bench-user", Environment: "bench-env", diff --git a/kontrol/kontrol.go b/kontrol/kontrol.go index 66c8ebdf..09cda105 100644 --- a/kontrol/kontrol.go +++ b/kontrol/kontrol.go @@ -283,7 +283,10 @@ func (k *Kontrol) AddKeyPair(id, public, private string) error { } if id == "" { - i := uuid.NewV4() + i, err := uuid.NewV4() + if err != nil { + return err + } id = i.String() } @@ -358,12 +361,16 @@ func (k *Kontrol) InitializeSelf() error { } func (k *Kontrol) registerUser(username, publicKey, privateKey string) (kiteKey string, err error) { + id, err := uuid.NewV4() + if err != nil { + return "", err + } claims := &kitekey.KiteClaims{ StandardClaims: jwt.StandardClaims{ Issuer: k.Kite.Kite().Username, Subject: username, IssuedAt: time.Now().Add(-k.tokenLeeway()).UTC().Unix(), - Id: uuid.NewV4().String(), + Id: id.String(), }, KontrolURL: k.Kite.Config.KontrolURL, KontrolKey: strings.TrimSpace(publicKey), @@ -400,12 +407,17 @@ func (k *Kontrol) registerSelf() { // to generate its kitekey or no kitekey is defined, // use a dummy entry in order to register the kontrol. keyPair = &KeyPair{ - ID: uuid.NewV4().String(), Public: "kontrol-self", Private: "kontrol-self", } - if err := k.keyPair.AddKey(keyPair); err != nil { + if id, err := uuid.NewV4(); err == nil { + keyPair.ID = id.String() + + if err := k.keyPair.AddKey(keyPair); err != nil { + k.log.Error("%s", err) + } + } else { k.log.Error("%s", err) } } @@ -562,6 +574,11 @@ func (k *Kontrol) generateToken(tok *token) (string, error) { return "", err } + id, err := uuid.NewV4() + if err != nil { + return "", err + } + now := time.Now().UTC() claims := &kitekey.KiteClaims{ @@ -571,7 +588,7 @@ func (k *Kontrol) generateToken(tok *token) (string, error) { Audience: tok.audience, ExpiresAt: now.Add(k.tokenTTL()).Add(k.tokenLeeway()).UTC().Unix(), IssuedAt: now.Add(-k.tokenLeeway()).UTC().Unix(), - Id: uuid.NewV4().String(), + Id: id.String(), }, } diff --git a/kontrol/kontrol_test.go b/kontrol/kontrol_test.go index 09046e86..e5c9b351 100644 --- a/kontrol/kontrol_test.go +++ b/kontrol/kontrol_test.go @@ -36,7 +36,7 @@ func init() { func TestUpdateKeys(t *testing.T) { if storage := os.Getenv("KONTROL_STORAGE"); storage != "postgres" { - t.Skip("skipping TestUpdateKeys for storage %q: not implemented", storage) + t.Skipf("skipping TestUpdateKeys for storage %q: not implemented", storage) } kon, conf := startKontrol(testkeys.PrivateThird, testkeys.PublicThird, 5501) @@ -661,10 +661,10 @@ func TestGetQueryKey(t *testing.T) { func TestKontrolMultiKey(t *testing.T) { if storage := os.Getenv("KONTROL_STORAGE"); storage != "postgres" { - t.Skip("%q storage does not currently implement soft key pair deletes", storage) + t.Skipf("%q storage does not currently implement soft key pair deletes", storage) } - i := uuid.NewV4() + i := uuid.Must(uuid.NewV4()) secondID := i.String() // add so we can use it as key diff --git a/testutil/testutil.go b/testutil/testutil.go index b4816bd1..1b8b506e 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -34,7 +34,7 @@ func NewKiteKeyWithKeyPair(private, public string) *jwt.Token { // NewToken creates new JWT token for the gien username. It embedds the given // public key as kontrolKey and signs the token with the private one. func NewToken(username, private, public string) *jwt.Token { - tknID := uuid.NewV4() + tknID := uuid.Must(uuid.NewV4()) hostname, err := os.Hostname() if err != nil {