4 Commits

Author SHA1 Message Date
as2252258 2445380bd9 更新 PoolItem.php 2026-06-28 06:34:18 +00:00
as2252258 577bd478c3 eee 2026-06-24 20:11:12 +08:00
as2252258 1ec2bbec2a eee 2026-06-12 23:57:23 +08:00
as2252258 d40e7db483 eee 2025-07-14 15:36:04 +08:00
2 changed files with 164 additions and 148 deletions
+163 -147
View File
@@ -1,147 +1,163 @@
<?php <?php
declare(strict_types=1); declare(strict_types=1);
namespace Kiri\Pool; namespace Kiri\Pool;
use Closure; use Closure;
use Swoole\Coroutine\Channel; use Swoole\Coroutine\Channel;
use Swoole\Coroutine; use Swoole\Coroutine;
class PoolItem class PoolItem
{ {
/** /**
* @var Channel|SplQueue * @var Channel|SplQueue
*/ */
private Channel|SplQueue $_items; private Channel|SplQueue $_items;
/** /**
* @var int * @var int 当前已创建的连接数
*/ */
private int $created = 0; private int $created = 0;
/** /**
* @param int $maxCreated * @param int $maxCreated 最大允许创建的连接数
* @param Closure|array $callback * @param Closure|array $callback 创建新连接的回调函数
*/ */
public function __construct(readonly public int $maxCreated, readonly public Closure|array $callback) public function __construct(readonly public int $maxCreated, readonly public Closure|array $callback)
{ {
$this->reconnect(); $this->reconnect();
} }
/** /**
* @return bool * @return bool 判断 Channel 是否已关闭
*/ */
public function isClose(): bool public function isClose(): bool
{ {
if ($this->_items instanceof Channel) { if ($this->_items instanceof Channel) {
return $this->_items->errCode == SWOOLE_CHANNEL_CLOSED; return $this->_items->errCode == SWOOLE_CHANNEL_CLOSED;
} }
return false; return false;
} }
/** /**
* @return void * 重新创建内部存储队列 Channel 或 SplQueue
*/ * @return void
public function reconnect(): void */
{ public function reconnect(): void
if (Coroutine::getCid() > -1) { {
$this->_items = new Channel($this->maxCreated); if (Coroutine::getCid() > -1) {
} else { $this->_items = new Channel($this->maxCreated);
$this->_items = new SplQueue($this->maxCreated); } else {
} $this->_items = new SplQueue($this->maxCreated);
} }
$this->created = 0;
}
/**
* @param Channel|SplQueue $items
*/ /**
public function setItems(Channel|SplQueue $items): void * @param Channel|SplQueue $items
{ */
$this->_items = $items; public function setItems(Channel|SplQueue $items): void
} {
$this->_items = $items;
}
/**
* @param mixed $item
* @return void /**
*/ * 将连接推回池中
public function push(mixed $item): void * @param mixed $item 连接对象
{ * @return void
if (is_null($item)) { */
return; public function push(mixed $item): void
} {
$this->_items->push($item); if (is_null($item)) {
} return;
}
$this->_items->push($item);
/** }
* @return bool
*/
public function isEmpty(): bool /**
{ * @return bool 判断池是否为空
return $this->_items->isEmpty(); */
} public function isEmpty(): bool
{
return $this->_items->isEmpty();
/** }
* @return int
*/
public function size(): int /**
{ * @return int 当前池中可用连接数
return $this->_items->length(); */
} public function size(): int
{
return $this->_items->length();
/** }
* @return bool
*/
public function close(): bool /**
{ * @return bool 关闭连接池
return $this->_items->close(); */
} public function close(): bool
{
return $this->_items->close();
/** }
* @param int $min
* @return void
*/ /**
public function tailor(int $min = 0): void * 缩容连接池,将连接数缩减至 min 个
{ * @param int $min 保留的最小连接数
while ($this->_items->length() > $min) { * @return void
$connection = $this->_items->pop(0.000001); */
if ($connection instanceof StopHeartbeatCheck) { public function tailor(int $min = 0): void
$connection->stopHeartbeatCheck(); {
} while ($this->_items->length() > $min) {
$connection = null; $connection = $this->_items->pop(0.000001);
} if ($connection instanceof StopHeartbeatCheck) {
} $connection->stopHeartbeatCheck();
}
// 关闭连接释放底层资源
/** if (is_object($connection) && method_exists($connection, 'close')) {
* @return void $connection->close();
*/ }
public function abandon(): void $this->created--;
{ $connection = null;
$this->created -= 1; }
} }
/** /**
* @param int $waite * 创建连接失败时回退计数器
* @return mixed * @return void
*/ */
public function pop(int $waite = 10): mixed public function abandon(): void
{ {
if ($this->_items->isEmpty()) { if ($this->created > 0) {
return call_user_func($this->callback); $this->created -= 1;
} else { }
return $this->_items->pop($waite); }
}
}
} /**
* 从连接池中获取一个连接
* 当未达到最大创建数时直接创建新连接,达到上限后阻塞等待归还
* @param int $waite 等待超时时间(秒)
* @return mixed 连接对象,超时返回 false
*/
public function pop(int $waite = 10): mixed
{
// 未达到最大创建数,直接创建新连接
if ($this->created < $this->maxCreated) {
$this->created++;
return call_user_func($this->callback);
}
// 已达到最大创建数,阻塞等待连接归还
return $this->_items->pop($waite);
}
}
+1 -1
View File
@@ -9,7 +9,7 @@
} }
], ],
"require": { "require": {
"php": ">=8.0", "php": ">=8.5",
"composer-runtime-api": "^2.0" "composer-runtime-api": "^2.0"
}, },
"autoload": { "autoload": {