diff --git a/System/Pool/Helper/QueueInterface.php b/System/Pool/Helper/QueueInterface.php new file mode 100644 index 00000000..30e0215e --- /dev/null +++ b/System/Pool/Helper/QueueInterface.php @@ -0,0 +1,28 @@ +channel = new \SplQueue(); + } + + + /** + * @return bool + */ + public function isEmpty(): bool + { + // TODO: Implement isEmpty() method. + return $this->channel->count() < 1; + } + + + /** + * @param mixed $data + * @param float $timeout + * @return bool + */ + public function push(mixed $data, float $timeout = -1): bool + { + // TODO: Implement push() method. + $this->channel->enqueue($data); + return true; + } + + + /** + * @param float $timeout + * @return mixed + */ + public function pop(float $timeout = -1): mixed + { + // TODO: Implement pop() method. + return $this->channel->dequeue(); + } + + + /** + * @return array + */ + public function stats(): array + { + // TODO: Implement stats() method. + return []; + } + + + /** + * @return bool + */ + public function close(): bool + { + // TODO: Implement close() method. + return false; + } + + + /** + * @return int + */ + public function length(): int + { + // TODO: Implement length() method. + return $this->channel->count(); + } + + + /** + * @return bool + */ + public function isFull(): bool + { + // TODO: Implement isFull() method. + return $this->channel->count() >= $this->max; + } +} diff --git a/System/Pool/Pool.php b/System/Pool/Pool.php index ad1caef2..8a26c84e 100644 --- a/System/Pool/Pool.php +++ b/System/Pool/Pool.php @@ -5,14 +5,12 @@ namespace Kiri\Pool; use Exception; -use JetBrains\PhpStorm\Pure; use Kiri\Abstracts\Component; use Kiri\Abstracts\Config; use Kiri\Exception\ConfigException; -use Kiri\Kiri; +use Kiri\Pool\Helper\SplQueue; use Swoole\Coroutine; use Swoole\Coroutine\Channel; -use Swoole\Timer; /** @@ -22,79 +20,94 @@ use Swoole\Timer; class Pool extends Component { - /** @var Channel[] */ - private static array $_connections = []; + /** @var Channel[] */ + private static array $_connections = []; - public int $max = 60; + public int $max = 60; - use Alias; + use Alias; - /** - * @param $channel - * @param $retain_number - * @throws Exception - */ - public function flush($channel, $retain_number) - { - $this->pop($channel, $retain_number); - } + /** + * @param $channel + * @param $retain_number + * @throws Exception + */ + public function flush($channel, $retain_number) + { + $this->pop($channel, $retain_number); + } - /** - * @param Channel $channel - * @param $retain_number - * @throws Exception - */ - protected function pop(Channel $channel, $retain_number): void - { - if (Coroutine::getCid() === -1) { - return; - } - while ($channel->length() > $retain_number) { - $connection = $channel->pop(); - if ($connection instanceof StopHeartbeatCheck) { - $connection->stopHeartbeatCheck(); - } - } - } + /** + * @param Channel $channel + * @param $retain_number + * @throws Exception + */ + protected function pop(Channel $channel, $retain_number): void + { + while ($channel->length() > $retain_number) { + $connection = $channel->pop(); + if ($connection instanceof StopHeartbeatCheck) { + $connection->stopHeartbeatCheck(); + } + } + } - /** - * @param $name - * @param false $isMaster - * @param int $max - */ - public function initConnections($name, bool $isMaster = false, int $max = 60) - { - $name = $this->name($name, $isMaster); - if (isset(static::$_connections[$name]) && static::$_connections[$name] instanceof Channel) { - return; - } - if (Coroutine::getCid() === -1) { - return; - } - static::$_connections[$name] = new Channel($max); - $this->max = $max; - } + /** + * @param $name + * @param false $isMaster + * @param int $max + * @throws ConfigException + */ + public function initConnections($name, bool $isMaster = false, int $max = 60) + { + $name = $this->name($name, $isMaster); + if (isset(static::$_connections[$name])) { + return; + } + $value = static::$_connections[$name]; + if ($value instanceof Channel || $value instanceof SplQueue) { + return; + } + $this->newChannel($name, $max); + $this->max = $max; + } - /** - * @param $name - * @return Channel - * @throws ConfigException - * @throws Exception - */ - private function getChannel($name): Channel - { - if (!isset(static::$_connections[$name])) { - static::$_connections[$name] = new Channel(Config::get('databases.pool.max', 10)); - } - if (static::$_connections[$name]->errCode == SWOOLE_CHANNEL_CLOSED) { - throw new Exception('Channel is Close.'); - } - return static::$_connections[$name]; - } + /** + * @param $name + * @return Channel + * @throws ConfigException + * @throws Exception + */ + private function getChannel($name): Channel + { + if (!isset(static::$_connections[$name])) { + $this->newChannel($name); + } + if (static::$_connections[$name]->errCode == SWOOLE_CHANNEL_CLOSED) { + throw new Exception('Channel is Close.'); + } + return static::$_connections[$name]; + } + + + /** + * @throws ConfigException + */ + private function newChannel($name, $max = null) + { + if ($max !== null) { + $max = Config::get('databases.pool.max', 10); + } + if (Coroutine::getCid() === -1) { + static::$_connections[$name] = new SplQueue($max); + } else { + static::$_connections[$name] = new Channel($max); + } + } /** @@ -103,113 +116,104 @@ class Pool extends Component * @return array * @throws ConfigException */ - public function get($name, $callback): mixed - { - if (Coroutine::getCid() === -1) { - return $callback(); - } - $channel = $this->getChannel($name); - if (!$channel->isEmpty()) { - $connection = $channel->pop(); - if ($this->checkCanUse($name, $connection)) { - return $connection; - } - } - return $callback(); - } + public function get($name, $callback): mixed + { + $channel = $this->getChannel($name); + if (!$channel->isEmpty()) { + $connection = $channel->pop(); + if ($this->checkCanUse($name, $connection)) { + return $connection; + } + } + return $callback(); + } - /** - * @param $name - * @return bool - * @throws ConfigException - */ - public function isNull($name): bool - { - return $this->getChannel($name)->isEmpty(); - } + /** + * @param $name + * @return bool + * @throws ConfigException + */ + public function isNull($name): bool + { + return $this->getChannel($name)->isEmpty(); + } - /** - * @param string $name - * @param mixed $client - * @return bool - * 检查连接可靠性 - */ - public function checkCanUse(string $name, mixed $client): bool - { - return true; - } + /** + * @param string $name + * @param mixed $client + * @return bool + * 检查连接可靠性 + */ + public function checkCanUse(string $name, mixed $client): bool + { + return true; + } - /** - * @param string $name - * @return bool - */ - public function hasItem(string $name): bool - { - if (isset(static::$_connections[$name])) { - return !static::$_connections[$name]->isEmpty(); - } - return false; - } + /** + * @param string $name + * @return bool + */ + public function hasItem(string $name): bool + { + if (isset(static::$_connections[$name])) { + return !static::$_connections[$name]->isEmpty(); + } + return false; + } - /** - * @param string $name - * @return mixed - */ - public function size(string $name): mixed - { - if (Coroutine::getCid() === -1) { - return 0; - } - if (!isset(static::$_connections[$name])) { - return 0; - } - return static::$_connections[$name]->length(); - } + /** + * @param string $name + * @return mixed + */ + public function size(string $name): mixed + { + if (!isset(static::$_connections[$name])) { + return 0; + } + return static::$_connections[$name]->length(); + } - /** - * @param string $name - * @param mixed $client - * @throws ConfigException - */ - public function push(string $name, mixed $client) - { - if (Coroutine::getCid() === -1) { - return; - } - $channel = $this->getChannel($name); - if (!$channel->isFull()) { - $channel->push($client); - } - unset($client); - } + /** + * @param string $name + * @param mixed $client + * @throws ConfigException + */ + public function push(string $name, mixed $client) + { + $channel = $this->getChannel($name); + if (!$channel->isFull()) { + $channel->push($client); + } + unset($client); + } - /** - * @param string $name - * @throws Exception - */ - public function clean(string $name) - { - if (Coroutine::getCid() === -1 || !isset(static::$_connections[$name])) { - return; - } - $channel = static::$_connections[$name]; - $this->pop($channel, 0); - } + /** + * @param string $name + * @throws Exception + */ + public function clean(string $name) + { + if (!isset(static::$_connections[$name])) { + return; + } + $channel = static::$_connections[$name]; + $this->pop($channel, 0); + } - /** - * @return Channel[] - */ - protected function getChannels(): array - { - return static::$_connections; - } + /** + * @return Channel[] + */ + protected function getChannels(): array + { + return static::$_connections; + } }