Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions sqlchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,12 +792,11 @@ func (c *Chain) processBlocks(ctx context.Context) {
}

// Start starts the main process of the sql-chain.
func (c *Chain) Start() (err error) {
func (c *Chain) Start() {
c.rt.goFunc(c.processBlocks)
c.sync()
c.rt.goFunc(c.mainCycle)
c.rt.startService(c)
return
}

// Stop stops the main process of the sql-chain.
Expand Down
5 changes: 1 addition & 4 deletions sqlchain/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,7 @@ func TestMultiChain(t *testing.T) {

// Start all chain instances
for _, v := range chains {
if err = v.chain.Start(); err != nil {
t.Fatalf("error occurred: %v", err)
}

v.chain.Start()
defer func(c *Chain) {
// Stop chain main process before exit
_ = c.Stop()
Expand Down
3 changes: 1 addition & 2 deletions worker/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,8 @@ func NewDatabase(cfg *DBConfig, peers *proto.Peers,
}
if db.chain, err = sqlchain.NewChain(chainCfg); err != nil {
return
} else if err = db.chain.Start(); err != nil {
return
}
db.chain.Start()

// init kayak config
kayakWalPath := filepath.Join(cfg.DataDir, KayakWalFileName)
Expand Down
20 changes: 17 additions & 3 deletions worker/dbms.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,16 +367,30 @@ func (dbms *DBMS) initDatabases(
meta *DBMSMeta, profiles map[proto.DatabaseID]*types.SQLChainProfile) (err error,
) {
currentInstance := make(map[proto.DatabaseID]bool)
wg := &sync.WaitGroup{}
errCh := make(chan error, len(profiles))

for id, profile := range profiles {
currentInstance[id] = true
var instance *types.ServiceInstance
if instance, err = dbms.buildSQLChainServiceInstance(profile); err != nil {
return
}
if err = dbms.Create(instance, false); err != nil {
return
}
wg.Add(1)
go func() {
defer wg.Done()
if err := dbms.Create(instance, false); err != nil {
log.WithFields(log.Fields{
"id": instance.DatabaseID,
}).WithError(err).Error("failed to create database instance")
errCh <- errors.Wrapf(err, "failed to create database %s", instance.DatabaseID)
}
}()
}
wg.Wait()
close(errCh)
for err := range errCh {
return err // omit any other error after this instance
}

// calculate to drop databases
Expand Down