From 86798409fd8e7575888fe8059879fcf37c8c0e00 Mon Sep 17 00:00:00 2001 From: as2252258 Date: Sun, 10 Jul 2022 01:19:09 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=98=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DatabasesProviders.php | 243 ++++++++++++++++++++++------------------- 1 file changed, 131 insertions(+), 112 deletions(-) diff --git a/DatabasesProviders.php b/DatabasesProviders.php index 5cd3400..600a270 100644 --- a/DatabasesProviders.php +++ b/DatabasesProviders.php @@ -27,124 +27,143 @@ class DatabasesProviders extends Providers { - /** - * @var EventProvider - */ - #[Inject(EventProvider::class)] - public EventProvider $provider; + /** + * @var EventProvider + */ + #[Inject(EventProvider::class)] + public EventProvider $provider; - /** - * @param LocalService $application - * @return void - * @throws ConfigException - * @throws Exception - */ - public function onImport(LocalService $application): void - { - $databases = Config::get('databases.connections', []); - if (empty($databases)) { - return; - } - $this->provider->on(OnWorkerStart::class, [$this, 'check']); - $this->provider->on(OnTaskerStart::class, [$this, 'check']); - $this->provider->on(OnWorkerExit::class, [$this, 'exit'], 9999); - foreach ($databases as $key => $database) { - $application->set($key, $this->_settings($database)); - } - } - - - /** - * @param OnWorkerExit $exit - * @return void - * @throws ConfigException - * @throws Exception - */ - public function exit(OnWorkerExit $exit): void - { - Timer::clearAll(); - $databases = Config::get('databases.connections', []); - if (!empty($databases)) { - $connection = Kiri::getDi()->get(PoolConnection::class); - foreach ($databases as $database) { - $connection->disconnect($database['cds']); - } - } - } - - /** - * @param $name - * @return Connection - * @throws Exception - */ - public function get($name): Connection - { - return Kiri::service()->get($name); - } + /** + * @var PoolConnection + */ + #[Inject(PoolConnection::class)] + public PoolConnection $connection; - /** - * @param OnTaskerStart|OnWorkerStart $start - * @return void - */ - public function check(OnTaskerStart|OnWorkerStart $start): void - { - Timer::tick(50000, function () use ($start) { - $databases = Config::get('databases.connections', []); - $valid = 0; - $count = 0; - $logger = Kiri::getDi()->get(LoggerInterface::class); - if (!empty($databases)) { - $connection = Kiri::getDi()->get(PoolConnection::class); - foreach ($databases as $database) { - try { - [$total, $success] = $connection->check($database['cds']); - - $count += $total; - $valid += $success; - - if (isset($database['slaveConfig']) && isset($database['slaveConfig']['cds'])) { - if ($database['slaveConfig']['cds'] != $database['cds']) { - [$total, $success] = $connection->check($database['slaveConfig']['cds']); - - $count += $total; - $valid += $success; - } - } - } catch (\Throwable $throwable) { - $logger->error($throwable->getMessage()); - } - } - } - $logger->alert(sprintf('Worker %d db client has %d, valid %d', $start->workerId, $count, $valid)); - }); - } + /** + * @param LocalService $application + * @return void + * @throws ConfigException + * @throws Exception + */ + public function onImport(LocalService $application): void + { + $databases = Config::get('databases.connections', []); + if (empty($databases)) { + return; + } + $this->provider->on(OnWorkerStart::class, [$this, 'check']); + $this->provider->on(OnTaskerStart::class, [$this, 'check']); + $this->provider->on(OnWorkerExit::class, [$this, 'exit'], 9999); + foreach ($databases as $key => $database) { + $application->set($key, $this->_settings($database)); + } + } - /** - * @param $database - * @return array - */ - private function _settings($database): array - { - $clientPool = $database['pool'] ?? ['min' => 1, 'max' => 5, 'tick' => 60]; - return [ - 'id' => $database['id'], - 'cds' => $database['cds'], - 'class' => Connection::class, - 'username' => $database['username'], - 'password' => $database['password'], - 'tablePrefix' => $database['tablePrefix'], - 'database' => $database['database'], - 'connect_timeout' => $database['connect_timeout'] ?? 30, - 'read_timeout' => $database['read_timeout'] ?? 10, - 'pool' => $clientPool, - 'attributes' => $database['attributes'] ?? [], - 'charset' => $database['charset'] ?? 'utf8mb4', - 'slaveConfig' => $database['slaveConfig'] - ]; - } + /** + * @param OnWorkerExit $exit + * @return void + * @throws ConfigException + * @throws Exception + */ + public function exit(OnWorkerExit $exit): void + { + Timer::clearAll(); + $databases = Config::get('databases.connections', []); + if (!empty($databases)) { + foreach ($databases as $database) { + $this->connection->disconnect($database['cds']); + } + } + } + + /** + * @param $name + * @return Connection + * @throws Exception + */ + public function get($name): Connection + { + return Kiri::service()->get($name); + } + + + /** + * @param OnTaskerStart|OnWorkerStart $start + * @return void + */ + public function check(OnTaskerStart|OnWorkerStart $start): void + { + Timer::tick(50000, static function () use ($start) { + $valid = $count = 0; + $logger = Kiri::getDi()->get(LoggerInterface::class); + $databases = Config::get('databases.connections', []); + if (!empty($databases)) { + DatabasesProviders::each($databases, $logger, $count, $valid); + } + $const = 'Worker %d db client has %d, valid %d'; + $logger->alert(sprintf($const, $start->workerId, $count, $valid)); + }); + } + + + /** + * @param $databases + * @param LoggerInterface $logger + * @param $count + * @param $valid + * @return void + */ + public static function each($databases, LoggerInterface $logger, &$count, &$valid): void + { + $connection = Kiri::getDi()->get(PoolConnection::class); + foreach ($databases as $database) { + try { + [$total, $success] = $connection->check($database['cds']); + + $count += $total; + $valid += $success; + + if (isset($database['slaveConfig']) && isset($database['slaveConfig']['cds'])) { + if ($database['slaveConfig']['cds'] != $database['cds']) { + [$total, $success] = $connection->check($database['slaveConfig']['cds']); + + $count += $total; + $valid += $success; + } + } + } catch (\Throwable $throwable) { + $logger->error($throwable->getMessage()); + } + } + } + + + /** + * @param $database + * @return array + */ + private function _settings($database): array + { + $clientPool = $database['pool'] ?? ['min' => 1, 'max' => 5, 'tick' => 60]; + return [ + 'id' => $database['id'], + 'cds' => $database['cds'], + 'class' => Connection::class, + 'username' => $database['username'], + 'password' => $database['password'], + 'tablePrefix' => $database['tablePrefix'], + 'database' => $database['database'], + 'connect_timeout' => $database['connect_timeout'] ?? 30, + 'read_timeout' => $database['read_timeout'] ?? 10, + 'pool' => $clientPool, + 'attributes' => $database['attributes'] ?? [], + 'charset' => $database['charset'] ?? 'utf8mb4', + 'slaveConfig' => $database['slaveConfig'] + ]; + } }