From 92b5b248a367b79a278e9757ecc756663d013d10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Fri, 17 Jun 2022 11:59:19 +0800 Subject: [PATCH] modify plugin name --- kiri-engine/Abstracts/CoordinatorManager.php | 27 +++++ kiri-engine/Coordinator.php | 55 ++++++++++ kiri-engine/Pool/Connection.php | 35 ++++++- kiri-engine/Pool/Pool.php | 56 ++++++----- kiri-engine/Pool/PoolQueue.php | 100 +++++++++++++++++++ kiri-engine/Pool/QueueInterface.php | 3 + kiri-engine/Pool/SplQueue.php | 10 ++ 7 files changed, 257 insertions(+), 29 deletions(-) create mode 100644 kiri-engine/Abstracts/CoordinatorManager.php create mode 100644 kiri-engine/Coordinator.php create mode 100644 kiri-engine/Pool/PoolQueue.php diff --git a/kiri-engine/Abstracts/CoordinatorManager.php b/kiri-engine/Abstracts/CoordinatorManager.php new file mode 100644 index 00000000..31486e37 --- /dev/null +++ b/kiri-engine/Abstracts/CoordinatorManager.php @@ -0,0 +1,27 @@ +waite; + } + + + /** + * @return void + */ + public function yield(): void + { + if ($this->waite === false) { + return; + } + $this->yield(); + } + + + /** + * @return void + */ + public function waite(): void + { + $this->waite = true; + } + + + /** + * @return void + */ + public function done(): void + { + $this->waite = false; + } + + +} diff --git a/kiri-engine/Pool/Connection.php b/kiri-engine/Pool/Connection.php index 7d5c7a25..c7d8a98c 100644 --- a/kiri-engine/Pool/Connection.php +++ b/kiri-engine/Pool/Connection.php @@ -13,6 +13,7 @@ use Kiri\Abstracts\Config; use Kiri\Context; use Swoole\Error; use Throwable; +use Kiri\Abstracts\CoordinatorManager; /** * Class Connection @@ -96,8 +97,21 @@ class Connection extends Component public function get(mixed $config): ?PDO { $minx = Config::get('databases.pool.min', 1); - return $this->pool->get($config['cds'], static function () use ($config) { - $connect = Kiri::getDi()->create(PDO::class, [$config]); + + CoordinatorManager::utility($config['cds'])->yield(); + + return $this->pool->get($config['cds'], $this->generate($config), $minx); + } + + + /** + * @param array $config + * @return Closure + */ + public function generate(array $config): Closure + { + return static function () use ($config) { + $connect = new PDO($config); if (!Db::inTransactionsActive()) { return $connect; } @@ -105,7 +119,18 @@ class Connection extends Component $connect->beginTransaction(); } return $connect; - }, $minx); + }; + } + + + /** + * @param string $name + * @return void + * @throws Kiri\Exception\ConfigException + */ + public function check(string $name): void + { + $this->pool->check($name); } @@ -117,7 +142,7 @@ class Connection extends Component public function create($coroutineName, $config): Closure { return static function () use ($coroutineName, $config) { - return Kiri::getDi()->create(PDO::class, [$config]); + return new PDO($config); }; } @@ -129,7 +154,7 @@ class Connection extends Component * @throws Kiri\Exception\ConfigException * @throws Exception */ - public function addItem(string $name, PDO $PDO) + public function addItem(string $name, PDO $PDO): void { $this->pool->push($name, $PDO); } diff --git a/kiri-engine/Pool/Pool.php b/kiri-engine/Pool/Pool.php index 73b26edf..b2ea223a 100644 --- a/kiri-engine/Pool/Pool.php +++ b/kiri-engine/Pool/Pool.php @@ -4,12 +4,13 @@ namespace Kiri\Pool; +use Database\Mysql\PDO; use Exception; use Kiri\Abstracts\Component; use Kiri\Abstracts\Config; +use Kiri\Abstracts\CoordinatorManager; use Kiri\Context; use Kiri\Exception\ConfigException; -use Swoole\Coroutine; use Swoole\Coroutine\Channel; @@ -20,7 +21,7 @@ use Swoole\Coroutine\Channel; class Pool extends Component { - /** @var Channel[] */ + /** @var array */ private static array $_connections = []; public int $max = 60; @@ -40,10 +41,10 @@ class Pool extends Component /** - * @param Channel|SplQueue $channel + * @param PoolQueue $channel * @param $retain_number */ - protected function pop(Channel|SplQueue $channel, $retain_number): void + protected function pop(PoolQueue $channel, $retain_number): void { while ($channel->length() > $retain_number) { if (Context::inCoroutine()) { @@ -56,6 +57,24 @@ class Pool extends Component } + /** + * @param $name + * @return void + * @throws ConfigException + */ + public function check($name): void + { + CoordinatorManager::utility($name)->waite(); + + $channel = $this->channel($name); + while (($pdo = $channel->pop()) instanceof PDO) { + $pdo->check(); + } + + CoordinatorManager::utility($name)->done(); + } + + /** * @param $name * @param int $max @@ -65,7 +84,7 @@ class Pool extends Component { if (isset(static::$_connections[$name])) { $value = static::$_connections[$name]; - if ($value instanceof Channel || $value instanceof SplQueue) { + if ($value instanceof PoolQueue) { return; } } @@ -76,11 +95,11 @@ class Pool extends Component /** * @param $name - * @return Channel|SplQueue + * @return PoolQueue * @throws ConfigException * @throws Exception */ - private function getChannel($name): Channel|SplQueue + public function channel($name): PoolQueue { if (!isset(static::$_connections[$name])) { $this->newChannel($name); @@ -100,11 +119,7 @@ class Pool extends Component if ($max == null) { $max = Config::get('databases.pool.max', 10); } - if (Coroutine::getCid() === -1) { - static::$_connections[$name] = new SplQueue($max); - } else { - static::$_connections[$name] = new Channel($max); - } + static::$_connections[$name] = new PoolQueue($max); } @@ -118,7 +133,7 @@ class Pool extends Component */ public function get($name, $callback, $minx): mixed { - $channel = $this->getChannel($name); + $channel = $this->channel($name); if (!$channel->isEmpty()) { return $this->maxIdleQuantity($channel, $minx); } @@ -149,7 +164,7 @@ class Pool extends Component */ public function isNull($name): bool { - return $this->getChannel($name)->isEmpty(); + return $this->channel($name)->isEmpty(); } @@ -198,7 +213,7 @@ class Pool extends Component */ public function push(string $name, mixed $client) { - $channel = $this->getChannel($name); + $channel = $this->channel($name); if (!$channel->isFull()) { $channel->push($client); } @@ -216,13 +231,6 @@ class Pool extends Component return; } while (static::$_connections[$name]->length() > 0) { - if (static::$_connections[$name] instanceof Channel) - { - if (!Context::inCoroutine()) - { - break; - } - } $client = static::$_connections[$name]->pop(); if ($client instanceof StopHeartbeatCheck) { $client->stopHeartbeatCheck(); @@ -234,9 +242,9 @@ class Pool extends Component /** - * @return Channel[] + * @return PoolQueue[] */ - protected function getChannels(): array + protected function channels(): array { return static::$_connections; } diff --git a/kiri-engine/Pool/PoolQueue.php b/kiri-engine/Pool/PoolQueue.php new file mode 100644 index 00000000..221bb1e2 --- /dev/null +++ b/kiri-engine/Pool/PoolQueue.php @@ -0,0 +1,100 @@ +queue = new Channel($this->max); + } else { + $this->queue = new SplQueue($this->max); + } + } + + + /** + * @return bool + */ + public function isEmpty(): bool + { + return $this->queue->isEmpty(); + } + + /** + * @param mixed $data + * @param float $timeout + * @return bool + */ + public function push(mixed $data, float $timeout = -1): bool + { + return $this->queue->push($data, $timeout); + } + + + /** + * @param float $timeout + * @return mixed + */ + public function pop(float $timeout = -1): mixed + { + return $this->queue->pop($timeout); + } + + + /** + * @return array + */ + public function stats(): array + { + return $this->queue->stats(); + } + + /** + * @return bool + */ + public function close(): bool + { + return $this->queue->close(); + } + + + /** + * @return int + */ + public function length(): int + { + return $this->queue->length(); + } + + + /** + * @return bool + */ + public function isFull(): bool + { + return $this->queue->isFull(); + } + + + /** + * @return bool + */ + public function isClose(): bool + { + if ($this->queue instanceof Channel) { + return $this->queue->errCode == SWOOLE_CHANNEL_CLOSED; + } + return false; + } + +} diff --git a/kiri-engine/Pool/QueueInterface.php b/kiri-engine/Pool/QueueInterface.php index 92809302..8f29b20f 100644 --- a/kiri-engine/Pool/QueueInterface.php +++ b/kiri-engine/Pool/QueueInterface.php @@ -25,4 +25,7 @@ interface QueueInterface public function isFull(): bool; + + public function isClose(): bool; + } diff --git a/kiri-engine/Pool/SplQueue.php b/kiri-engine/Pool/SplQueue.php index 9c410170..08376f6f 100644 --- a/kiri-engine/Pool/SplQueue.php +++ b/kiri-engine/Pool/SplQueue.php @@ -98,4 +98,14 @@ class SplQueue implements QueueInterface // TODO: Implement isFull() method. return $this->channel->count() >= $this->max; } + + + /** + * @return bool + */ + public function isClose(): bool + { + return false; + } + }