diff --git a/Connection.php b/Connection.php index 0487661..fcf6897 100644 --- a/Connection.php +++ b/Connection.php @@ -23,7 +23,11 @@ use Kiri\Abstracts\Config; use Kiri\Events\EventProvider; use Kiri\Exception\NotFindClassException; use Kiri\Server\Events\OnWorkerExit; +use Psr\Container\ContainerExceptionInterface; +use Psr\Container\NotFoundExceptionInterface; use ReflectionException; +use Kiri\Pool\Connection as PoolConnection; +use Kiri\Di\ContainerInterface; /** * Class Connection @@ -48,6 +52,9 @@ class Connection extends Component public array $pool; + + private PoolConnection $connection; + /** * @var bool * enable database cache @@ -75,12 +82,17 @@ class Connection extends Component /** * @param EventProvider $eventProvider + * @param Kiri\Di\ContainerInterface $container * @param array $config + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface * @throws Exception */ - public function __construct(public EventProvider $eventProvider, array $config = []) + public function __construct(public EventProvider $eventProvider, public ContainerInterface $container, array $config = []) { parent::__construct($config); + + $this->connection = $this->container->get(PoolConnection::class); } @@ -88,7 +100,7 @@ class Connection extends Component * @return void * @throws Exception */ - public function init() + public function init(): void { $this->eventProvider->on(OnWorkerExit::class, [$this, 'clear_connection'], 0); $this->eventProvider->on(BeginTransaction::class, [$this, 'beginTransaction'], 0); @@ -99,28 +111,16 @@ class Connection extends Component } - /** - * @param $isSearch - * @return PDO - * @throws Exception - */ - public function getConnect($isSearch): PDO - { - return !$isSearch ? $this->getPdo() : $this->getSlaveClient(); - } - - /** * @throws Exception */ public function connectPoolInstance() { - $connections = $this->connections(); $pool = Config::get('databases.pool.max', 10); if (!empty($this->slaveConfig) && $this->cds != $this->slaveConfig['cds']) { - $connections->initConnections('Mysql:' . $this->slaveConfig['cds'], $pool); + $this->connection->initConnections('Mysql:' . $this->slaveConfig['cds'], $pool); } else { - $connections->initConnections('Mysql:' . $this->cds, $pool); + $this->connection->initConnections('Mysql:' . $this->cds, $pool); } } @@ -149,7 +149,7 @@ class Connection extends Component */ public function getMasterClient(): PDO { - return $this->connections()->get([ + return $this->connection->get([ 'cds' => $this->cds, 'username' => $this->username, 'password' => $this->password, @@ -170,20 +170,9 @@ class Connection extends Component if (empty($this->slaveConfig) || $this->slaveConfig['cds'] == $this->cds) { return $this->getPdo(); } - return $this->connections()->get($this->slaveConfig); + return $this->connection->get($this->slaveConfig); } - - /** - * @return \Kiri\Pool\Connection - * @throws Exception - */ - private function connections(): \Kiri\Pool\Connection - { - return Kiri::getDi()->get(\Kiri\Pool\Connection::class); - } - - /** * @return $this * @throws Exception @@ -264,7 +253,7 @@ class Connection extends Component */ public function release(PDO $pdo) { - $connections = $this->connections(); + $connections = $this->connection; if (!$pdo->inTransaction()) { $cds = $this->cds; if (isset($this->slaveConfig['cds'])) { @@ -282,15 +271,11 @@ class Connection extends Component */ public function clear_connection() { - $connections = $this->connections(); - - $connections->connection_clear($this->cds); - - if (!isset($this->slaveConfig['cds'])) { - $this->slaveConfig['cds'] = $this->cds; + $this->connection->connection_clear($this->cds); + if (!isset($this->slaveConfig['cds']) || $this->cds == $this->slaveConfig['cds']) { + return; } - - $connections->connection_clear($this->slaveConfig['cds']); + $this->connection->connection_clear($this->slaveConfig['cds']); } @@ -299,14 +284,11 @@ class Connection extends Component */ public function disconnect() { - $connections = $this->connections(); - $connections->disconnect($this->cds); - - if (!isset($this->slaveConfig['cds'])) { - $this->slaveConfig['cds'] = $this->cds; + $this->connection->disconnect($this->cds); + if (!isset($this->slaveConfig['cds']) || $this->cds == $this->slaveConfig['cds']) { + return; } - - $connections->disconnect($this->slaveConfig['cds']); + $this->connection->disconnect($this->slaveConfig['cds']); } } diff --git a/DatabasesProviders.php b/DatabasesProviders.php index 4bbf8bf..4b43c5d 100644 --- a/DatabasesProviders.php +++ b/DatabasesProviders.php @@ -9,7 +9,12 @@ use Kiri; use Kiri\Abstracts\Config; use Kiri\Abstracts\Providers; use Kiri\Application; +use Kiri\Pool\Connection as PoolConnection; use Kiri\Exception\ConfigException; +use Kiri\Events\EventProvider; +use Kiri\Annotation\Inject; +use Kiri\Server\Events\OnWorkerStart; +use Kiri\Server\Events\OnTaskerStart; /** * Class DatabasesProviders @@ -19,21 +24,29 @@ class DatabasesProviders extends Providers { + /** + * @var EventProvider + */ + #[Inject(EventProvider::class)] + public EventProvider $provider; + + /** * @param Application $application * @return void * @throws ConfigException + * @throws Exception */ - public function onImport(Application $application) + public function onImport(Application $application): void { $databases = Config::get('databases.connections', []); if (empty($databases)) { return; } - - $app = Kiri::app(); + $this->provider->on(OnWorkerStart::class, [$this, 'check']); + $this->provider->on(OnTaskerStart::class, [$this, 'check']); foreach ($databases as $key => $database) { - $app->set($key, $this->_settings($database)); + $application->set($key, $this->_settings($database)); } } @@ -48,6 +61,32 @@ class DatabasesProviders extends Providers return Kiri::app()->get($name); } + + /** + * @param OnTaskerStart|OnWorkerStart $start + * @return void + */ + public function check(OnTaskerStart|OnWorkerStart $start): void + { + $start->server->tick(60000, function () { + $databases = Config::get('databases.connections', []); + if (empty($databases)) { + return; + } + + $connection = Kiri::getDi()->get(PoolConnection::class); + foreach ($databases as $database) { + $connection->check($database['cds']); + if (isset($database['slaveConfig']) && isset($database['slaveConfig']['cds'])) { + if ($database['slaveConfig']['cds'] != $database['cds']) { + $connection->check($database['cds']); + } + } + } + }); + } + + /** * @param $database * @return array diff --git a/Mysql/PDO.php b/Mysql/PDO.php index ae00edd..c399b00 100644 --- a/Mysql/PDO.php +++ b/Mysql/PDO.php @@ -28,8 +28,6 @@ class PDO implements StopHeartbeatCheck private int $_timer = -1; - private int $_last = 0; - public string $dbname; public string $cds; public string $username; @@ -60,10 +58,10 @@ class PDO implements StopHeartbeatCheck /** * @return void + * @throws Exception */ public function init(): void { - $this->heartbeat_check(); $eventProvider = Kiri::getDi()->get(EventProvider::class); $eventProvider->on(OnWorkerExit::class, [$this, 'onWorkerExit']); } @@ -82,50 +80,18 @@ class PDO implements StopHeartbeatCheck * @param Kiri\Server\Events\OnWorkerExit $exit * @return void */ - public function onWorkerExit(OnWorkerExit $exit) + public function onWorkerExit(OnWorkerExit $exit): void { $this->stopHeartbeatCheck(); } - /** - * - */ - public function heartbeat_check(): void - { - if ($this->_timer === -1) { - $this->_timer = Timer::tick(1000, [$this, 'waite']); - } - } - - - /** - * @throws Exception - */ - private function waite(): void - { - try { - $tick = (int)Config::get('databases.pool.tick', 60); - if ($this->_timer == -1 || time() - $this->_last > $tick) { - $this->stopHeartbeatCheck(); - - $this->pdo = null; - } - } catch (\Throwable $throwable) { - error($throwable); - } - } - - /** * */ public function stopHeartbeatCheck(): void { - if ($this->_timer > -1) { - Timer::clear($this->_timer); - } - $this->_timer = -1; + $this->pdo = null; } @@ -254,6 +220,27 @@ class PDO implements StopHeartbeatCheck } + /** + * @return bool + */ + public function check(): bool + { + try { + if (!($this->pdo instanceof \PDO)) { + return $result = false; + } + $this->pdo->getAttribute(\PDO::ATTR_SERVER_INFO); + + $result = true; + } catch (\Throwable $throwable) { + $this->pdo = null; + $result = false; + } finally { + return $result; + } + } + + /** * @param PDOStatement $statement * @param array $params