diff --git a/DatabasesProviders.php b/DatabasesProviders.php index 376193b..cfd20d5 100644 --- a/DatabasesProviders.php +++ b/DatabasesProviders.php @@ -47,8 +47,6 @@ class DatabasesProviders extends Providers if (empty($databases)) { return; } - $this->provider->on(OnWorkerStart::class, [$this, 'check']); - $this->provider->on(OnTaskerStart::class, [$this, 'check']); $this->provider->on(OnWorkerExit::class, [$this, 'exit'], 9999); foreach ($databases as $key => $database) { $application->set($key, $this->_settings($database)); @@ -85,61 +83,6 @@ class DatabasesProviders extends Providers } - /** - * @param OnTaskerStart|OnWorkerStart $start - * @return void - */ - public function check(OnTaskerStart|OnWorkerStart $start): void - { - Timer::tick(10000, function ($timerId) { - $valid = $count = 0; - $logger = Kiri::getDi()->get(LoggerInterface::class); - $databases = Config::get('databases.connections', []); - if (!empty($databases)) { - [$valid, $count] = $this->each($databases, $logger); - } - $const = 'Worker %d db client has %d, valid %d'; - $logger->alert(sprintf($const, env('environmental_workerId'), $count, $valid)); - if ($this->container->get(WorkerStatus::class)->is(StatusEnum::EXIT)) { - Timer::clear($timerId); - } - }); - } - - - /** - * @param $databases - * @param LoggerInterface $logger - * @return array - */ - public function each($databases, LoggerInterface $logger): array - { - $connection = Kiri::getDi()->get(PoolConnection::class); - $valid = $count = 0; - foreach ($databases as $database) { - try { - [$total, $success] = $connection->check($database['cds']); - - $count += $total; - $valid += $success; - - if (isset($database['slaveConfig']) && isset($database['slaveConfig']['cds'])) { - if ($database['slaveConfig']['cds'] != $database['cds']) { - [$total, $success] = $connection->check($database['slaveConfig']['cds']); - - $count += $total; - $valid += $success; - } - } - } catch (\Throwable $throwable) { - $logger->error($throwable->getMessage()); - } - } - - return [$valid, $count]; - } - - /** * @param $database * @return array diff --git a/Mysql/PDO.php b/Mysql/PDO.php index fd3e9ad..43a98b1 100644 --- a/Mysql/PDO.php +++ b/Mysql/PDO.php @@ -8,6 +8,9 @@ use Kiri\Events\EventProvider; use Kiri\Pool\StopHeartbeatCheck; use Kiri\Server\Events\OnWorkerExit; use PDOStatement; +use Kiri\Server\WorkerStatus; +use Kiri\Server\Abstracts\StatusEnum; +use Swoole\Timer; /** * @@ -15,308 +18,334 @@ use PDOStatement; class PDO implements StopHeartbeatCheck { - const DB_ERROR_MESSAGE = 'The system is busy, please try again later.'; + const DB_ERROR_MESSAGE = 'The system is busy, please try again later.'; - private ?\PDO $pdo = null; + private ?\PDO $pdo = null; - private int $_transaction = 0; + private int $_transaction = 0; - private int $_last = 0; + private int $_last = 0; - public string $dbname; - public string $cds; - public string $username; - public string $password; - public string $charset; - public int $connect_timeout; - public int $read_timeout; + public string $dbname; + public string $cds; + public string $username; + public string $password; + public string $charset; + public int $connect_timeout; + public int $read_timeout; + + private int $_timerId = -1; - public array $attributes = []; + public array $attributes = []; - /** - * @param array $config - */ - public function __construct(array $config) - { - $this->dbname = $config['dbname']; - $this->cds = $config['cds']; - $this->username = $config['username']; - $this->password = $config['password']; - $this->connect_timeout = $config['connect_timeout'] ?? 30; - $this->read_timeout = $config['read_timeout'] ?? 10; - $this->charset = $config['charset'] ?? 'utf8mb4'; - $this->attributes = $config['attributes'] ?? []; - } + /** + * @param array $config + */ + public function __construct(array $config) + { + $this->dbname = $config['dbname']; + $this->cds = $config['cds']; + $this->username = $config['username']; + $this->password = $config['password']; + $this->connect_timeout = $config['connect_timeout'] ?? 30; + $this->read_timeout = $config['read_timeout'] ?? 10; + $this->charset = $config['charset'] ?? 'utf8mb4'; + $this->attributes = $config['attributes'] ?? []; + } - /** - * @return void - * @throws Exception - */ - public function init(): void - { - $eventProvider = Kiri::getDi()->get(EventProvider::class); - $eventProvider->on(OnWorkerExit::class, [$this, 'onWorkerExit']); - } + /** + * @return void + * @throws Exception + */ + public function init(): void + { + $eventProvider = Kiri::getDi()->get(EventProvider::class); + $eventProvider->on(OnWorkerExit::class, [$this, 'onWorkerExit']); + $this->_timerId = Timer::tick(60000, [$this, 'check']); + } - /** - * @return bool - */ - public function inTransaction(): bool - { - return $this->_transaction > 0; - } + /** + * @return bool + */ + public function inTransaction(): bool + { + return $this->_transaction > 0; + } - /** - * @param Kiri\Server\Events\OnWorkerExit $exit - * @return void - */ - public function onWorkerExit(OnWorkerExit $exit): void - { - $this->stopHeartbeatCheck(); - } + /** + * @param Kiri\Server\Events\OnWorkerExit $exit + * @return void + */ + public function onWorkerExit(OnWorkerExit $exit): void + { + $this->stopHeartbeatCheck(); + } - /** - * - */ - public function stopHeartbeatCheck(): void - { - $this->pdo = null; - } + /** + * + */ + public function stopHeartbeatCheck(): void + { + $this->pdo = null; + if ($this->_timerId > -1) { + Timer::clear($this->_timerId); + $this->_timerId = -1; + } + } - /** - * - */ - public function beginTransaction() - { - if ($this->_transaction == 0) { - $this->_pdo()->beginTransaction(); - } - $this->_transaction++; - } + /** + * + */ + public function beginTransaction() + { + if ($this->_transaction == 0) { + $this->_pdo()->beginTransaction(); + } + $this->_transaction++; + } - /** - * - */ - public function commit() - { - $this->_transaction--; - if ($this->_transaction == 0) { - $this->_pdo()->commit(); - } - } + /** + * + */ + public function commit() + { + $this->_transaction--; + if ($this->_transaction == 0) { + $this->_pdo()->commit(); + } + } - /** - * - */ - public function rollback() - { - $this->_transaction--; - if ($this->_transaction == 0) { - $this->_pdo()->rollBack(); - } - } + /** + * + */ + public function rollback() + { + $this->_transaction--; + if ($this->_transaction == 0) { + $this->_pdo()->rollBack(); + } + } - /** - * @param string $sql - * @param array $params - * @return bool|array|null - * @throws Exception - */ - public function fetchAll(string $sql, array $params = []): bool|null|array - { - $pdo = $this->queryPrev($sql, $params); + /** + * @param string $sql + * @param array $params + * @return bool|array|null + * @throws Exception + */ + public function fetchAll(string $sql, array $params = []): bool|null|array + { + $pdo = $this->queryPrev($sql, $params); - $result = $pdo->fetchAll(\PDO::FETCH_ASSOC); - $pdo->closeCursor(); - return $result; - } + $result = $pdo->fetchAll(\PDO::FETCH_ASSOC); + $pdo->closeCursor(); + return $result; + } - /** - * @param string $sql - * @param array $params - * @return bool|array|null - * @throws Exception - */ - public function fetch(string $sql, array $params = []): bool|null|array - { - $pdo = $this->queryPrev($sql, $params); + /** + * @param string $sql + * @param array $params + * @return bool|array|null + * @throws Exception + */ + public function fetch(string $sql, array $params = []): bool|null|array + { + $pdo = $this->queryPrev($sql, $params); - $result = $pdo->fetch(\PDO::FETCH_ASSOC); - $pdo->closeCursor(); - return $result; - } + $result = $pdo->fetch(\PDO::FETCH_ASSOC); + $pdo->closeCursor(); + return $result; + } - /** - * @param string $sql - * @param array $params - * @return bool|array|null - * @throws \Exception - */ - public function fetchColumn(string $sql, array $params = []): bool|null|array - { - $pdo = $this->queryPrev($sql, $params); + /** + * @param string $sql + * @param array $params + * @return bool|array|null + * @throws \Exception + */ + public function fetchColumn(string $sql, array $params = []): bool|null|array + { + $pdo = $this->queryPrev($sql, $params); - $result = $pdo->fetchColumn(\PDO::FETCH_ASSOC); - $pdo->closeCursor(); - return $result; - } + $result = $pdo->fetchColumn(\PDO::FETCH_ASSOC); + $pdo->closeCursor(); + return $result; + } - /** - * @param string $sql - * @param array $params - * @return int - * @throws Exception - */ - public function count(string $sql, array $params = []): int - { - $pdo = $this->queryPrev($sql, $params); + /** + * @param string $sql + * @param array $params + * @return int + * @throws Exception + */ + public function count(string $sql, array $params = []): int + { + $pdo = $this->queryPrev($sql, $params); - $result = $pdo->rowCount(); - $pdo->closeCursor(); - return $result; - } + $result = $pdo->rowCount(); + $pdo->closeCursor(); + return $result; + } - /** - * @param string $sql - * @param array $params - * @return PDOStatement - * @throws Exception - */ - private function queryPrev(string $sql, array $params = []): PDOStatement - { - $this->_last = time(); - try { - if (($statement = $this->_pdo()->query($sql)) === false) { - throw new Exception($this->_pdo()->errorInfo()[1]); - } - return $this->bindValue($statement, $params); - } catch (\PDOException|\Throwable $throwable) { - if (str_contains($throwable->getMessage(), 'MySQL server has gone away')) { - $this->pdo = null; + /** + * @param string $sql + * @param array $params + * @return PDOStatement + * @throws Exception + */ + private function queryPrev(string $sql, array $params = []): PDOStatement + { + try { + if ($this->_timerId === -1) { + $this->_timerId = Timer::tick(6000, [$this, 'check']); + } + $this->_last = time(); + if (($statement = $this->_pdo()->query($sql)) === false) { + throw new Exception($this->_pdo()->errorInfo()[1]); + } + return $this->bindValue($statement, $params); + } catch (\PDOException|\Throwable $throwable) { + if (str_contains($throwable->getMessage(), 'MySQL server has gone away')) { + $this->pdo = null; - return $this->queryPrev($sql, $params); - } - throw new Exception($throwable->getMessage()); - } - } + return $this->queryPrev($sql, $params); + } + throw new Exception($throwable->getMessage()); + } + } - /** - * @return bool - */ - public function check(): bool - { - try { - if ($this->_last == 0) $this->_last = time(); - if (time() - $this->_last >= 600) { - throw new Exception('Idle dis.'); - } - if (!($this->pdo instanceof \PDO)) { - return $result = false; - } - $this->pdo->getAttribute(\PDO::ATTR_SERVER_INFO); - $result = true; - } catch (\Throwable $throwable) { - if (!str_contains($throwable->getMessage(), 'Idle dis')) { - Kiri::getLogger()->error($throwable->getMessage()); - } - $this->pdo = null; - $result = false; - } finally { - return $result; - } - } + /** + * @return bool + */ + public function check(): bool + { + try { + if ($this->_last == 0) $this->_last = time(); + if (time() - $this->_last >= 600) { + return $result = false; + } else if (!($this->pdo instanceof \PDO)) { + return $result = false; + } + $this->pdo->getAttribute(\PDO::ATTR_SERVER_INFO); + $result = true; + } catch (\Throwable $throwable) { + if (!str_contains($throwable->getMessage(), 'Idle dis')) { + Kiri::getLogger()->error($throwable->getMessage()); + } + $result = false; + } finally { + return $this->afterCheck($result); + } + } - /** - * @param PDOStatement $statement - * @param array $params - * @return PDOStatement - */ - private function bindValue(PDOStatement $statement, array $params = []): PDOStatement - { - if (empty($params)) return $statement; - foreach ($params as $key => $param) { - $statement->bindValue($key, $param); - } - return $statement; - } + /** + * @param bool $result + * @return bool + */ + private function afterCheck(bool $result): bool + { + $container = Kiri::getDi()->get(WorkerStatus::class); + if (!$result || $container->is(StatusEnum::EXIT)) { + $this->pdo = null; + $result = Timer::clear($this->_timerId); + $this->_timerId = -1; + } + return $result; + } - /** - * @param string $sql - * @param array $params - * @return int - * @throws Exception - */ - public function execute(string $sql, array $params = []): int - { - $this->_last = time(); - $pdo = $this->_pdo(); - if (!(($prepare = $pdo->prepare($sql)) instanceof PDOStatement)) { - throw new Exception($prepare->errorInfo()[2] ?? static::DB_ERROR_MESSAGE); - } - if ($prepare->execute($params) === false) { - throw new Exception($prepare->errorInfo()[2] ?? static::DB_ERROR_MESSAGE); - } - $result = (int)$pdo->lastInsertId(); - $prepare->closeCursor(); - if ($result == 0) { - return true; - } - return $result; - } + /** + * @param PDOStatement $statement + * @param array $params + * @return PDOStatement + */ + private function bindValue(PDOStatement $statement, array $params = []): PDOStatement + { + if (empty($params)) return $statement; + foreach ($params as $key => $param) { + $statement->bindValue($key, $param); + } + return $statement; + } - /** - * @return \PDO - */ - public function _pdo(): \PDO - { - if (!($this->pdo instanceof \PDO)) { - $this->pdo = $this->newClient(); - } - return $this->pdo; - } + /** + * @param string $sql + * @param array $params + * @return int + * @throws Exception + */ + public function execute(string $sql, array $params = []): int + { + $pdo = $this->_pdo(); + if ($this->_timerId === -1) { + $this->_timerId = Timer::tick(6000, [$this, 'check']); + } + $this->_last = time(); + if (!(($prepare = $pdo->prepare($sql)) instanceof PDOStatement)) { + throw new Exception($prepare->errorInfo()[2] ?? static::DB_ERROR_MESSAGE); + } + if ($prepare->execute($params) === false) { + throw new Exception($prepare->errorInfo()[2] ?? static::DB_ERROR_MESSAGE); + } + + $result = (int)$pdo->lastInsertId(); + $prepare->closeCursor(); + + return $result == 0 ? true : $result; + } - /** - * @return \PDO - */ - private function newClient(): \PDO - { - $link = new \PDO('mysql:dbname=' . $this->dbname . ';host=' . $this->cds, $this->username, $this->password, [ - \PDO::ATTR_EMULATE_PREPARES => false, - \PDO::ATTR_CASE => \PDO::CASE_NATURAL, - \PDO::ATTR_TIMEOUT => $this->connect_timeout, - \PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => true, - \PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . $this->charset - ]); - $link->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); - $link->setAttribute(\PDO::ATTR_STRINGIFY_FETCHES, false); - $link->setAttribute(\PDO::ATTR_ORACLE_NULLS, \PDO::NULL_EMPTY_STRING); - if (!empty($this->attributes)) { - foreach ($this->attributes as $key => $attribute) { - $link->setAttribute($key, $attribute); - } - } - return $link; - } + /** + * @return \PDO + */ + public function _pdo(): \PDO + { + if (!($this->pdo instanceof \PDO)) { + $this->pdo = $this->newClient(); + } + return $this->pdo; + } + + + /** + * @return \PDO + */ + private function newClient(): \PDO + { + $link = new \PDO('mysql:dbname=' . $this->dbname . ';host=' . $this->cds, $this->username, $this->password, [ + \PDO::ATTR_EMULATE_PREPARES => false, + \PDO::ATTR_CASE => \PDO::CASE_NATURAL, + \PDO::ATTR_TIMEOUT => $this->connect_timeout, + \PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => true, + \PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . $this->charset + ]); + $link->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); + $link->setAttribute(\PDO::ATTR_STRINGIFY_FETCHES, false); + $link->setAttribute(\PDO::ATTR_ORACLE_NULLS, \PDO::NULL_EMPTY_STRING); + if (!empty($this->attributes)) { + foreach ($this->attributes as $key => $attribute) { + $link->setAttribute($key, $attribute); + } + } + return $link; + } }