Files
kiri-pool/PoolItem.php
T
2026-06-24 20:11:12 +08:00

164 lines
3.4 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace Kiri\Pool;
use Closure;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine;
class PoolItem
{
/**
* @var Channel|SplQueue
*/
private Channel|SplQueue $_items;
/**
* @var int 当前已创建的连接数
*/
private int $created = 0;
/**
* @param int $maxCreated 最大允许创建的连接数
* @param Closure|array $callback 创建新连接的回调函数
*/
public function __construct(readonly public int $maxCreated, readonly public Closure|array $callback)
{
$this->reconnect();
}
/**
* @return bool 判断 Channel 是否已关闭
*/
public function isClose(): bool
{
if ($this->_items instanceof Channel) {
return $this->_items->errCode == SWOOLE_CHANNEL_CLOSED;
}
return false;
}
/**
* 重新创建内部存储队列(Channel 或 SplQueue
* @return void
*/
public function reconnect(): void
{
if (Coroutine::getCid() > -1) {
$this->_items = new Channel($this->maxCreated);
} else {
$this->_items = new SplQueue($this->maxCreated);
}
$this->created = 0;
}
/**
* @param Channel|SplQueue $items
*/
public function setItems(Channel|SplQueue $items): void
{
$this->_items = $items;
}
/**
* 将连接推回池中
* @param mixed $item 连接对象
* @return void
*/
public function push(mixed $item): void
{
if (is_null($item)) {
return;
}
$this->_items->push($item);
}
/**
* @return bool 判断池是否为空
*/
public function isEmpty(): bool
{
return $this->_items->isEmpty();
}
/**
* @return int 当前池中可用连接数
*/
public function size(): int
{
return $this->_items->length();
}
/**
* @return bool 关闭连接池
*/
public function close(): bool
{
return $this->_items->close();
}
/**
* 缩容连接池,将连接数缩减至 min 个
* @param int $min 保留的最小连接数
* @return void
*/
public function tailor(int $min = 0): void
{
while ($this->_items->length() > $min) {
$connection = $this->_items->pop(0.000001);
if ($connection instanceof StopHeartbeatCheck) {
$connection->stopHeartbeatCheck();
}
// 关闭连接释放底层资源
if (is_object($connection) && method_exists($connection, 'close')) {
$connection->close();
}
$this->created--;
$connection = null;
}
}
/**
* 创建连接失败时回退计数器
* @return void
*/
public function abandon(): void
{
if ($this->created > 0) {
$this->created -= 1;
}
}
/**
* 从连接池中获取一个连接
* 当未达到最大创建数时直接创建新连接,达到上限后阻塞等待归还
* @param int $waite 等待超时时间(秒)
* @return mixed 连接对象,超时返回 false
*/
public function pop(int $waite = 10): mixed
{
// 未达到最大创建数,直接创建新连接
if ($this->created < $this->maxCreated) {
$this->created++;
return call_user_func($this->callback);
}
// 已达到最大创建数,阻塞等待连接归还
return $this->_items->pop($waite);
}
}