diff --git a/core/Pool/Pool.php b/core/Pool/Pool.php index 9b1d4409..8f7b594b 100644 --- a/core/Pool/Pool.php +++ b/core/Pool/Pool.php @@ -20,214 +20,218 @@ use Swoole\Coroutine\Channel; class Pool extends Component { - /** @var Channel[] */ - private static array $_connections = []; + /** @var Channel[] */ + private static array $_connections = []; - public int $max = 60; + public int $max = 60; - use Alias; + use Alias; - /** - * @param $channel - * @param $retain_number - * @throws Exception - */ - public function flush($channel, $retain_number) - { - $this->pop($channel, $retain_number); - } + /** + * @param $channel + * @param $retain_number + * @throws Exception + */ + public function flush($channel, $retain_number) + { + $this->pop($channel, $retain_number); + } - /** - * @param Channel $channel - * @param $retain_number - * @throws Exception - */ - protected function pop(Channel $channel, $retain_number): void - { - while ($channel->length() > $retain_number) { - $connection = $channel->pop(); - if ($connection instanceof StopHeartbeatCheck) { - $connection->stopHeartbeatCheck(); - } - } - } + /** + * @param Channel $channel + * @param $retain_number + * @throws Exception + */ + protected function pop(Channel $channel, $retain_number): void + { + if ($channel instanceof Channel && Coroutine::getCid() === -1) { + unset($channel); + return; + } + while ($channel->length() > $retain_number) { + $connection = $channel->pop(); + if ($connection instanceof StopHeartbeatCheck) { + $connection->stopHeartbeatCheck(); + } + } + } - /** - * @param $name - * @param false $isMaster - * @param int $max - * @throws ConfigException - */ - public function initConnections($name, bool $isMaster = false, int $max = 60) - { - $name = $this->name($name, $isMaster); - if (isset(static::$_connections[$name])) { - $value = static::$_connections[$name]; - if ($value instanceof Channel || $value instanceof SplQueue) { - return; - } - } - $this->newChannel($name, $max); - $this->max = $max; - } + /** + * @param $name + * @param false $isMaster + * @param int $max + * @throws ConfigException + */ + public function initConnections($name, bool $isMaster = false, int $max = 60) + { + $name = $this->name($name, $isMaster); + if (isset(static::$_connections[$name])) { + $value = static::$_connections[$name]; + if ($value instanceof Channel || $value instanceof SplQueue) { + return; + } + } + $this->newChannel($name, $max); + $this->max = $max; + } - /** - * @param $name - * @return Channel|SplQueue - * @throws ConfigException - * @throws Exception - */ - private function getChannel($name): Channel|SplQueue - { - if (!isset(static::$_connections[$name])) { - $this->newChannel($name); - } - if (static::$_connections[$name]->errCode == SWOOLE_CHANNEL_CLOSED) { - throw new Exception('Channel is Close.'); - } - return static::$_connections[$name]; - } + /** + * @param $name + * @return Channel|SplQueue + * @throws ConfigException + * @throws Exception + */ + private function getChannel($name): Channel|SplQueue + { + if (!isset(static::$_connections[$name])) { + $this->newChannel($name); + } + if (static::$_connections[$name]->errCode == SWOOLE_CHANNEL_CLOSED) { + throw new Exception('Channel is Close.'); + } + return static::$_connections[$name]; + } - /** - * @throws ConfigException - */ - private function newChannel($name, $max = null) - { - if ($max == null) { - $max = Config::get('databases.pool.max', 10); - } - if (Coroutine::getCid() === -1) { - static::$_connections[$name] = new SplQueue($max); - } else { - static::$_connections[$name] = new Channel($max); - } - } + /** + * @throws ConfigException + */ + private function newChannel($name, $max = null) + { + if ($max == null) { + $max = Config::get('databases.pool.max', 10); + } + if (Coroutine::getCid() === -1) { + static::$_connections[$name] = new SplQueue($max); + } else { + static::$_connections[$name] = new Channel($max); + } + } - /** - * @param $name - * @param $callback - * @return array - * @throws ConfigException - */ - public function get($name, $callback): mixed - { - $channel = $this->getChannel($name); - if (!$channel->isEmpty()) { - $connection = $channel->pop(); - defer(fn() => $this->maxIdleQuantity($channel)); - if ($this->checkCanUse($name, $connection)) { - return $connection; - } - } - return $callback(); - } + /** + * @param $name + * @param $callback + * @return array + * @throws ConfigException + */ + public function get($name, $callback): mixed + { + $channel = $this->getChannel($name); + if (!$channel->isEmpty()) { + $connection = $channel->pop(); + defer(fn() => $this->maxIdleQuantity($channel)); + if ($this->checkCanUse($name, $connection)) { + return $connection; + } + } + return $callback(); + } - /** - * @param $channel - * @throws ConfigException - * @throws Exception - */ - private function maxIdleQuantity($channel): void - { - $minx = Config::get('databases.pool.min', 1); - if ($channel->length() > $minx) { - $this->pop($channel, $minx); - } - } + /** + * @param $channel + * @throws ConfigException + * @throws Exception + */ + private function maxIdleQuantity($channel): void + { + $minx = Config::get('databases.pool.min', 1); + if ($channel->length() > $minx) { + $this->pop($channel, $minx); + } + } - /** - * @param $name - * @return bool - * @throws ConfigException - */ - public function isNull($name): bool - { - return $this->getChannel($name)->isEmpty(); - } + /** + * @param $name + * @return bool + * @throws ConfigException + */ + public function isNull($name): bool + { + return $this->getChannel($name)->isEmpty(); + } - /** - * @param string $name - * @param mixed $client - * @return bool - * 检查连接可靠性 - */ - public function checkCanUse(string $name, mixed $client): bool - { - return true; - } + /** + * @param string $name + * @param mixed $client + * @return bool + * 检查连接可靠性 + */ + public function checkCanUse(string $name, mixed $client): bool + { + return true; + } - /** - * @param string $name - * @return bool - */ - public function hasItem(string $name): bool - { - if (isset(static::$_connections[$name])) { - return !static::$_connections[$name]->isEmpty(); - } - return false; - } + /** + * @param string $name + * @return bool + */ + public function hasItem(string $name): bool + { + if (isset(static::$_connections[$name])) { + return !static::$_connections[$name]->isEmpty(); + } + return false; + } - /** - * @param string $name - * @return mixed - */ - public function size(string $name): mixed - { - if (!isset(static::$_connections[$name])) { - return 0; - } - return static::$_connections[$name]->length(); - } + /** + * @param string $name + * @return mixed + */ + public function size(string $name): mixed + { + if (!isset(static::$_connections[$name])) { + return 0; + } + return static::$_connections[$name]->length(); + } - /** - * @param string $name - * @param mixed $client - * @throws ConfigException - */ - public function push(string $name, mixed $client) - { - $channel = $this->getChannel($name); - if (!$channel->isFull()) { - $channel->push($client); - } - unset($client); - } + /** + * @param string $name + * @param mixed $client + * @throws ConfigException + */ + public function push(string $name, mixed $client) + { + $channel = $this->getChannel($name); + if (!$channel->isFull()) { + $channel->push($client); + } + unset($client); + } - /** - * @param string $name - * @throws Exception - */ - public function clean(string $name) - { - if (!isset(static::$_connections[$name])) { - return; - } - $channel = static::$_connections[$name]; - $this->pop($channel, 0); - } + /** + * @param string $name + * @throws Exception + */ + public function clean(string $name) + { + if (!isset(static::$_connections[$name])) { + return; + } + $channel = static::$_connections[$name]; + $this->pop($channel, 0); + } - /** - * @return Channel[] - */ - protected function getChannels(): array - { - return static::$_connections; - } + /** + * @return Channel[] + */ + protected function getChannels(): array + { + return static::$_connections; + } }