From 2cf42e0d1d032dea5da14a797c695e47a82477bb Mon Sep 17 00:00:00 2001 From: leventeliu Date: Mon, 13 May 2019 21:29:32 +0800 Subject: [PATCH 1/4] Parallelize chain synchronization in startup --- sqlchain/chain.go | 2 +- sqlchain/chain_test.go | 5 +---- worker/db.go | 3 +-- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/sqlchain/chain.go b/sqlchain/chain.go index 336f61557..369dcb1c1 100644 --- a/sqlchain/chain.go +++ b/sqlchain/chain.go @@ -792,7 +792,7 @@ func (c *Chain) processBlocks(ctx context.Context) { } // Start starts the main process of the sql-chain. -func (c *Chain) Start() (err error) { +func (c *Chain) Start() { c.rt.goFunc(c.processBlocks) c.sync() c.rt.goFunc(c.mainCycle) diff --git a/sqlchain/chain_test.go b/sqlchain/chain_test.go index 34cda8515..0b217efb4 100644 --- a/sqlchain/chain_test.go +++ b/sqlchain/chain_test.go @@ -275,10 +275,7 @@ func TestMultiChain(t *testing.T) { // Start all chain instances for _, v := range chains { - if err = v.chain.Start(); err != nil { - t.Fatalf("error occurred: %v", err) - } - + v.chain.Start() defer func(c *Chain) { // Stop chain main process before exit _ = c.Stop() diff --git a/worker/db.go b/worker/db.go index 69bff12a2..1d5759802 100644 --- a/worker/db.go +++ b/worker/db.go @@ -175,9 +175,8 @@ func NewDatabase(cfg *DBConfig, peers *proto.Peers, } if db.chain, err = sqlchain.NewChain(chainCfg); err != nil { return - } else if err = db.chain.Start(); err != nil { - return } + go db.chain.Start() // init kayak config kayakWalPath := filepath.Join(cfg.DataDir, KayakWalFileName) From ae04dc5a7adabd9625d150241aed22730d4fe38d Mon Sep 17 00:00:00 2001 From: leventeliu Date: Mon, 13 May 2019 21:43:38 +0800 Subject: [PATCH 2/4] Minor fix --- sqlchain/chain.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sqlchain/chain.go b/sqlchain/chain.go index 369dcb1c1..c6b52a63e 100644 --- a/sqlchain/chain.go +++ b/sqlchain/chain.go @@ -797,7 +797,6 @@ func (c *Chain) Start() { c.sync() c.rt.goFunc(c.mainCycle) c.rt.startService(c) - return } // Stop stops the main process of the sql-chain. From 5c223974e8295ef34599ddfee86520b58189e08c Mon Sep 17 00:00:00 2001 From: leventeliu Date: Mon, 13 May 2019 22:04:10 +0800 Subject: [PATCH 3/4] Parallelize database initialization instead of chain --- worker/db.go | 2 +- worker/dbms.go | 14 +++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/worker/db.go b/worker/db.go index 1d5759802..ed89098cf 100644 --- a/worker/db.go +++ b/worker/db.go @@ -176,7 +176,7 @@ func NewDatabase(cfg *DBConfig, peers *proto.Peers, if db.chain, err = sqlchain.NewChain(chainCfg); err != nil { return } - go db.chain.Start() + db.chain.Start() // init kayak config kayakWalPath := filepath.Join(cfg.DataDir, KayakWalFileName) diff --git a/worker/dbms.go b/worker/dbms.go index 7e9c297ee..f488d2ac3 100644 --- a/worker/dbms.go +++ b/worker/dbms.go @@ -367,6 +367,7 @@ func (dbms *DBMS) initDatabases( meta *DBMSMeta, profiles map[proto.DatabaseID]*types.SQLChainProfile) (err error, ) { currentInstance := make(map[proto.DatabaseID]bool) + wg := &sync.WaitGroup{} for id, profile := range profiles { currentInstance[id] = true @@ -374,10 +375,17 @@ func (dbms *DBMS) initDatabases( if instance, err = dbms.buildSQLChainServiceInstance(profile); err != nil { return } - if err = dbms.Create(instance, false); err != nil { - return - } + wg.Add(1) + go func() { + defer wg.Done() + if err = dbms.Create(instance, false); err != nil { + log.WithFields(log.Fields{ + "id": instance.DatabaseID, + }).WithError(err).Error("failed to create database instance") + } + }() } + wg.Wait() // calculate to drop databases toDropInstance := make(map[proto.DatabaseID]bool) From 89b19a06c66b3469f2329633251613da4e4965e0 Mon Sep 17 00:00:00 2001 From: leventeliu Date: Mon, 13 May 2019 22:15:51 +0800 Subject: [PATCH 4/4] Fix race and return error via channel --- worker/dbms.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/worker/dbms.go b/worker/dbms.go index f488d2ac3..b67df988e 100644 --- a/worker/dbms.go +++ b/worker/dbms.go @@ -368,6 +368,7 @@ func (dbms *DBMS) initDatabases( ) { currentInstance := make(map[proto.DatabaseID]bool) wg := &sync.WaitGroup{} + errCh := make(chan error, len(profiles)) for id, profile := range profiles { currentInstance[id] = true @@ -378,14 +379,19 @@ func (dbms *DBMS) initDatabases( wg.Add(1) go func() { defer wg.Done() - if err = dbms.Create(instance, false); err != nil { + if err := dbms.Create(instance, false); err != nil { log.WithFields(log.Fields{ "id": instance.DatabaseID, }).WithError(err).Error("failed to create database instance") + errCh <- errors.Wrapf(err, "failed to create database %s", instance.DatabaseID) } }() } wg.Wait() + close(errCh) + for err := range errCh { + return err // omit any other error after this instance + } // calculate to drop databases toDropInstance := make(map[proto.DatabaseID]bool)