diff --git a/Connection.php b/Connection.php index 1dc52d3..4099899 100644 --- a/Connection.php +++ b/Connection.php @@ -26,11 +26,13 @@ use Kiri\Events\EventProvider; use Kiri\Exception\NotFindClassException; use PDO; use Kiri\Error\StdoutLogger; -use PDOStatement; use Psr\Log\LoggerInterface; use ReflectionException; +use Kiri\Server\Events\OnWorkerStart; +use Kiri\Server\Events\OnTaskerStart; use Kiri\Server\Events\OnAfterRequest; use Kiri\Di\Inject\Container; +use Swoole\Timer; /** * Class Connection @@ -39,11 +41,11 @@ use Kiri\Di\Inject\Container; class Connection extends Component { - public string $id = 'db'; - public string $cds = ''; + public string $id = 'db'; + public string $cds = ''; public string $password = ''; public string $username = ''; - public string $charset = 'utf-8'; + public string $charset = 'utf-8'; public string $tablePrefix = ''; @@ -115,6 +117,48 @@ class Connection extends Component $eventProvider->on(Commit::class, [$this, 'commit'], 0); $eventProvider->on(OnAfterRequest::class, [$this, 'clear']); $eventProvider->on(OnWorkerExit::class, [$this, 'disconnect']); + $eventProvider->on(OnWorkerStart::class, [$this, 'tick']); + $eventProvider->on(OnTaskerStart::class, [$this, 'tick']); + } + + + /** + * @return void + */ + public function tick(): void + { + Timer::tick(10000, fn() => $this->checkClientHealth($this->pool())); + } + + + /** + * @param Pool $pool + * @return void + * @throws Exception + */ + protected function checkClientHealth(Pool $pool): void + { + $length = $pool->size($this->cds); + for ($i = 0; $i < $length; $i++) { + try { + $bool = $pool->get($this->cds); + if ($bool === false) { + break; + } + /** @var PDO $client */ + [$client, $time] = $bool; + if ((time() - $time) > $this->idle_time) { + throw new Exception('Client timeout.'); + } + if ($client->query('select 1') === false) { + throw new Exception($client->errorInfo()[1]); + } + $pool->push($this->cds, [$client, time()]); + } catch (\Throwable $exception) { + $this->logger->error(throwable($exception)); + $pool->abandon($this->cds); + } + } } @@ -129,7 +173,7 @@ class Connection extends Component if ($this->_schema === null) { $this->_schema = Kiri::createObject([ 'class' => Schema::class, - 'db' => $this + 'db' => $this ]); } return $this->_schema; @@ -162,14 +206,13 @@ class Connection extends Component } [$client, $time] = $data; - if ((time() - $time) < $this->idle_time && $this->canUse($client)) { + if ((time() - $time) < $this->idle_time) { return $client; } $this->logger->error('PDO连接已失效, 空闲超时或已不可用,重新获取.', [$this->cds]); $this->pool()->abandon($this->cds); - Waite::sleep(10); return $this->getNormalClientHealth(); } @@ -256,7 +299,7 @@ class Connection extends Component if ($pdo === null) { throw new Exception('Failed to rollback transaction: connection was exists.'); } - if ($this->inTransaction()) { + if ($pdo->inTransaction()) { $pdo->rollback(); } $this->pool()->push($this->cds, [$pdo, time()]); @@ -350,28 +393,27 @@ class Connection extends Component /** - * @return array + * @return array */ public function newConnect(): array { - $options = array_merge($this->attributes, [ - PDO::ATTR_CASE => PDO::CASE_NATURAL, - PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, - PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL, - PDO::ATTR_STRINGIFY_FETCHES => false, - PDO::ATTR_EMULATE_PREPARES => true, - PDO::ATTR_TIMEOUT => $this->connect_timeout, - PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . $this->charset - ]); - $link = new PDO('mysql:dbname=' . $this->database . ';host=' . $this->cds, $this->username, $this->password, $options); - return [$link, time()]; + return [new PDO('mysql:dbname=' . $this->database . ';host=' . $this->cds, + $this->username, $this->password, array_merge($this->attributes, [ + PDO::ATTR_CASE => PDO::CASE_NATURAL, + PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, + PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL, + PDO::ATTR_STRINGIFY_FETCHES => false, + PDO::ATTR_EMULATE_PREPARES => true, + PDO::ATTR_TIMEOUT => $this->connect_timeout, + PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . $this->charset + ])), time()]; } /** * @return Pool */ - private function pool(): Pool + protected function pool(): Pool { if (!$this->connections->hasChannel($this->cds)) { $this->connections->created($this->cds, $this->pool['max'] ?? 1, [$this, 'newConnect']);