diff --git a/PoolItem.php b/PoolItem.php index 7f511bf..5f63df1 100644 --- a/PoolItem.php +++ b/PoolItem.php @@ -1,147 +1,163 @@ -reconnect(); - } - - - /** - * @return bool - */ - public function isClose(): bool - { - if ($this->_items instanceof Channel) { - return $this->_items->errCode == SWOOLE_CHANNEL_CLOSED; - } - return false; - } - - - /** - * @return void - */ - public function reconnect(): void - { - if (Coroutine::getCid() > -1) { - $this->_items = new Channel($this->maxCreated); - } else { - $this->_items = new SplQueue($this->maxCreated); - } - } - - - /** - * @param Channel|SplQueue $items - */ - public function setItems(Channel|SplQueue $items): void - { - $this->_items = $items; - } - - - /** - * @param mixed $item - * @return void - */ - public function push(mixed $item): void - { - if (is_null($item)) { - return; - } - $this->_items->push($item); - } - - - /** - * @return bool - */ - public function isEmpty(): bool - { - return $this->_items->isEmpty(); - } - - - /** - * @return int - */ - public function size(): int - { - return $this->_items->length(); - } - - - /** - * @return bool - */ - public function close(): bool - { - return $this->_items->close(); - } - - - /** - * @param int $min - * @return void - */ - public function tailor(int $min = 0): void - { - while ($this->_items->length() > $min) { - $connection = $this->_items->pop(0.000001); - if ($connection instanceof StopHeartbeatCheck) { - $connection->stopHeartbeatCheck(); - } - $connection = null; - } - } - - - /** - * @return void - */ - public function abandon(): void - { - $this->created -= 1; - } - - - /** - * @param int $waite - * @return mixed - */ - public function pop(int $waite = 10): mixed - { - if ($this->_items->isEmpty()) { - return call_user_func($this->callback); - } else { - return $this->_items->pop($waite); - } - } -} +reconnect(); + } + + + /** + * @return bool 判断 Channel 是否已关闭 + */ + public function isClose(): bool + { + if ($this->_items instanceof Channel) { + return $this->_items->errCode == SWOOLE_CHANNEL_CLOSED; + } + return false; + } + + + /** + * 重新创建内部存储队列(Channel 或 SplQueue) + * @return void + */ + public function reconnect(): void + { + if (Coroutine::getCid() > -1) { + $this->_items = new Channel($this->maxCreated); + } else { + $this->_items = new SplQueue($this->maxCreated); + } + $this->created = 0; + } + + + /** + * @param Channel|SplQueue $items + */ + public function setItems(Channel|SplQueue $items): void + { + $this->_items = $items; + } + + + /** + * 将连接推回池中 + * @param mixed $item 连接对象 + * @return void + */ + public function push(mixed $item): void + { + if (is_null($item)) { + return; + } + $this->_items->push($item); + } + + + /** + * @return bool 判断池是否为空 + */ + public function isEmpty(): bool + { + return $this->_items->isEmpty(); + } + + + /** + * @return int 当前池中可用连接数 + */ + public function size(): int + { + return $this->_items->length(); + } + + + /** + * @return bool 关闭连接池 + */ + public function close(): bool + { + return $this->_items->close(); + } + + + /** + * 缩容连接池,将连接数缩减至 min 个 + * @param int $min 保留的最小连接数 + * @return void + */ + public function tailor(int $min = 0): void + { + while ($this->_items->length() > $min) { + $connection = $this->_items->pop(0.000001); + if ($connection instanceof StopHeartbeatCheck) { + $connection->stopHeartbeatCheck(); + } + // 关闭连接释放底层资源 + if (is_object($connection) && method_exists($connection, 'close')) { + $connection->close(); + } + $this->created--; + $connection = null; + } + } + + + /** + * 创建连接失败时回退计数器 + * @return void + */ + public function abandon(): void + { + if ($this->created > 0) { + $this->created -= 1; + } + } + + + /** + * 从连接池中获取一个连接 + * 当未达到最大创建数时直接创建新连接,达到上限后阻塞等待归还 + * @param int $waite 等待超时时间(秒) + * @return mixed 连接对象,超时返回 false + */ + public function pop(int $waite = 10): mixed + { + // 未达到最大创建数,直接创建新连接 + if ($this->created < $this->maxCreated) { + $this->created++; + return call_user_func($this->callback); + } + // 已达到最大创建数,阻塞等待连接归还 + return $this->_items->pop($waite); + } +}