loop = $loop; $this->config = $this->createConfig($config); $this->serverInfo = []; $this->state = self::STATE_INIT; $this->transBox = $this->createTransactionBox(); } /** * @override * @inheritDoc */ public function isPaused() { // TODO return false; } /** * @override * @inheritDoc */ public function pause() { // TODO } /** * @override * @inheritDoc */ public function resume() { // TODO } /** * @override * @inheritDoc */ public function isStarted() { return $this->state >= self::STATE_CONNECT_PENDING; } /** * @override * @inheritDoc */ public function start() { if ($this->isStarted()) { return Promise::doResolve($this); } // TODO return Promise::doReject(new ExecutionException('Not yet implemented.')); } /** * @override * @inheritDoc */ public function stop() { if (!$this->isStarted()) { return Promise::doResolve($this); } // TODO return Promise::doReject(new ExecutionException('Not yet implemented.')); } /** * @override * @inheritDoc */ public function getState() { return $this->state; } /** * @override * @inheritDoc */ public function getInfo() { return $this->serverInfo; } /** * @override * @inheritDoc */ public function setDatabase($dbname) { // TODO return $this->query(sprintf('USE `%s`', $dbname)); } /** * @override * @inheritDoc */ public function getDatabase() { // TODO } /** * @override * @inheritDoc */ public function query($sql, $sqlParams = []) { // TODO return Promise::doReject(new ExecutionException('Not yet implemented.')); } /** * @override * @inheritDoc */ public function execute($sql, $sqlParams = []) { // TODO return $this->query($sql, $sqlParams)->then(function($command) { return $command->affectedRows; }); } /** * @override * @inheritDoc */ public function ping() { // TODO return Promise::doReject(new ExecutionException('Not yet implemented.')); } /** * @override * @inheritDoc */ public function beginTransaction() { $trans = new Transaction($this); $trans->on('commit', function(TransactionInterface $trans, array $queue) { $this->commitTransaction($queue)->then( function() use($trans) { return $trans->emit('success', [ $trans ]); }, function($ex) use($trans) { return $trans->emit('error', [ $trans, $ex ]); } ); $this->transBox->remove($trans); }); $trans->on('rollback', function(TransactionInterface $trans) { $this->transBox->remove($trans); }); return $this->transBox->add($trans); } /** * @override * @inheritDoc */ public function endTransaction(TransactionInterface $trans) { return $trans->rollback(); } /** * Try to commit a transaction. * * @param mixed[] $queue * @return PromiseInterface */ protected function commitTransaction($queue) { // TODO return Promise::doReject(new ExecutionException('Not yet implemented.')); } /** * @override * @inheritDoc */ public function inTransaction() { return !$this->transBox->isEmpty(); } /** * Create transaction box. * * @return TransactionBoxInterface */ protected function createTransactionBox() { return new TransactionBox(); } /** * Create configuration file. * * @param mixed[] $config * @return mixed[] */ protected function createConfig($config = []) { $default = [ 'endpoint' => 'tcp://127.0.0.1:5432', 'user' => 'root', 'pass' => '', 'dbname' => '', ]; return array_merge($default, $config); } }