reconnect(); } /** * @return bool 判断 Channel 是否已关闭 */ public function isClose(): bool { if ($this->_items instanceof Channel) { return $this->_items->errCode == SWOOLE_CHANNEL_CLOSED; } return false; } /** * 重新创建内部存储队列 Channel 或 SplQueue * @return void */ public function reconnect(): void { if (Coroutine::getCid() > -1) { $this->_items = new Channel($this->maxCreated); } else { $this->_items = new SplQueue($this->maxCreated); } $this->created = 0; } /** * @param Channel|SplQueue $items */ public function setItems(Channel|SplQueue $items): void { $this->_items = $items; } /** * 将连接推回池中 * @param mixed $item 连接对象 * @return void */ public function push(mixed $item): void { if (is_null($item)) { return; } $this->_items->push($item); } /** * @return bool 判断池是否为空 */ public function isEmpty(): bool { return $this->_items->isEmpty(); } /** * @return int 当前池中可用连接数 */ public function size(): int { return $this->_items->length(); } /** * @return bool 关闭连接池 */ public function close(): bool { return $this->_items->close(); } /** * 缩容连接池,将连接数缩减至 min 个 * @param int $min 保留的最小连接数 * @return void */ public function tailor(int $min = 0): void { while ($this->_items->length() > $min) { $connection = $this->_items->pop(0.000001); if ($connection instanceof StopHeartbeatCheck) { $connection->stopHeartbeatCheck(); } // 关闭连接释放底层资源 if (is_object($connection) && method_exists($connection, 'close')) { $connection->close(); } $this->created--; $connection = null; } } /** * 创建连接失败时回退计数器 * @return void */ public function abandon(): void { if ($this->created > 0) { $this->created -= 1; } } /** * 从连接池中获取一个连接 * 当未达到最大创建数时直接创建新连接,达到上限后阻塞等待归还 * @param int $waite 等待超时时间(秒) * @return mixed 连接对象,超时返回 false */ public function pop(int $waite = 10): mixed { // 未达到最大创建数,直接创建新连接 if ($this->created < $this->maxCreated) { $this->created++; return call_user_func($this->callback); } // 已达到最大创建数,阻塞等待连接归还 return $this->_items->pop($waite); } }