Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2445380bd9 | |||
| 577bd478c3 | |||
| 1ec2bbec2a | |||
| d40e7db483 |
+163
-147
@@ -1,147 +1,163 @@
|
|||||||
<?php
|
<?php
|
||||||
declare(strict_types=1);
|
declare(strict_types=1);
|
||||||
|
|
||||||
namespace Kiri\Pool;
|
namespace Kiri\Pool;
|
||||||
|
|
||||||
use Closure;
|
use Closure;
|
||||||
use Swoole\Coroutine\Channel;
|
use Swoole\Coroutine\Channel;
|
||||||
use Swoole\Coroutine;
|
use Swoole\Coroutine;
|
||||||
|
|
||||||
class PoolItem
|
class PoolItem
|
||||||
{
|
{
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @var Channel|SplQueue
|
* @var Channel|SplQueue
|
||||||
*/
|
*/
|
||||||
private Channel|SplQueue $_items;
|
private Channel|SplQueue $_items;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @var int
|
* @var int 当前已创建的连接数
|
||||||
*/
|
*/
|
||||||
private int $created = 0;
|
private int $created = 0;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param int $maxCreated
|
* @param int $maxCreated 最大允许创建的连接数
|
||||||
* @param Closure|array $callback
|
* @param Closure|array $callback 创建新连接的回调函数
|
||||||
*/
|
*/
|
||||||
public function __construct(readonly public int $maxCreated, readonly public Closure|array $callback)
|
public function __construct(readonly public int $maxCreated, readonly public Closure|array $callback)
|
||||||
{
|
{
|
||||||
$this->reconnect();
|
$this->reconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return bool
|
* @return bool 判断 Channel 是否已关闭
|
||||||
*/
|
*/
|
||||||
public function isClose(): bool
|
public function isClose(): bool
|
||||||
{
|
{
|
||||||
if ($this->_items instanceof Channel) {
|
if ($this->_items instanceof Channel) {
|
||||||
return $this->_items->errCode == SWOOLE_CHANNEL_CLOSED;
|
return $this->_items->errCode == SWOOLE_CHANNEL_CLOSED;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return void
|
* 重新创建内部存储队列 Channel 或 SplQueue
|
||||||
*/
|
* @return void
|
||||||
public function reconnect(): void
|
*/
|
||||||
{
|
public function reconnect(): void
|
||||||
if (Coroutine::getCid() > -1) {
|
{
|
||||||
$this->_items = new Channel($this->maxCreated);
|
if (Coroutine::getCid() > -1) {
|
||||||
} else {
|
$this->_items = new Channel($this->maxCreated);
|
||||||
$this->_items = new SplQueue($this->maxCreated);
|
} else {
|
||||||
}
|
$this->_items = new SplQueue($this->maxCreated);
|
||||||
}
|
}
|
||||||
|
$this->created = 0;
|
||||||
|
}
|
||||||
/**
|
|
||||||
* @param Channel|SplQueue $items
|
|
||||||
*/
|
/**
|
||||||
public function setItems(Channel|SplQueue $items): void
|
* @param Channel|SplQueue $items
|
||||||
{
|
*/
|
||||||
$this->_items = $items;
|
public function setItems(Channel|SplQueue $items): void
|
||||||
}
|
{
|
||||||
|
$this->_items = $items;
|
||||||
|
}
|
||||||
/**
|
|
||||||
* @param mixed $item
|
|
||||||
* @return void
|
/**
|
||||||
*/
|
* 将连接推回池中
|
||||||
public function push(mixed $item): void
|
* @param mixed $item 连接对象
|
||||||
{
|
* @return void
|
||||||
if (is_null($item)) {
|
*/
|
||||||
return;
|
public function push(mixed $item): void
|
||||||
}
|
{
|
||||||
$this->_items->push($item);
|
if (is_null($item)) {
|
||||||
}
|
return;
|
||||||
|
}
|
||||||
|
$this->_items->push($item);
|
||||||
/**
|
}
|
||||||
* @return bool
|
|
||||||
*/
|
|
||||||
public function isEmpty(): bool
|
/**
|
||||||
{
|
* @return bool 判断池是否为空
|
||||||
return $this->_items->isEmpty();
|
*/
|
||||||
}
|
public function isEmpty(): bool
|
||||||
|
{
|
||||||
|
return $this->_items->isEmpty();
|
||||||
/**
|
}
|
||||||
* @return int
|
|
||||||
*/
|
|
||||||
public function size(): int
|
/**
|
||||||
{
|
* @return int 当前池中可用连接数
|
||||||
return $this->_items->length();
|
*/
|
||||||
}
|
public function size(): int
|
||||||
|
{
|
||||||
|
return $this->_items->length();
|
||||||
/**
|
}
|
||||||
* @return bool
|
|
||||||
*/
|
|
||||||
public function close(): bool
|
/**
|
||||||
{
|
* @return bool 关闭连接池
|
||||||
return $this->_items->close();
|
*/
|
||||||
}
|
public function close(): bool
|
||||||
|
{
|
||||||
|
return $this->_items->close();
|
||||||
/**
|
}
|
||||||
* @param int $min
|
|
||||||
* @return void
|
|
||||||
*/
|
/**
|
||||||
public function tailor(int $min = 0): void
|
* 缩容连接池,将连接数缩减至 min 个
|
||||||
{
|
* @param int $min 保留的最小连接数
|
||||||
while ($this->_items->length() > $min) {
|
* @return void
|
||||||
$connection = $this->_items->pop(0.000001);
|
*/
|
||||||
if ($connection instanceof StopHeartbeatCheck) {
|
public function tailor(int $min = 0): void
|
||||||
$connection->stopHeartbeatCheck();
|
{
|
||||||
}
|
while ($this->_items->length() > $min) {
|
||||||
$connection = null;
|
$connection = $this->_items->pop(0.000001);
|
||||||
}
|
if ($connection instanceof StopHeartbeatCheck) {
|
||||||
}
|
$connection->stopHeartbeatCheck();
|
||||||
|
}
|
||||||
|
// 关闭连接释放底层资源
|
||||||
/**
|
if (is_object($connection) && method_exists($connection, 'close')) {
|
||||||
* @return void
|
$connection->close();
|
||||||
*/
|
}
|
||||||
public function abandon(): void
|
$this->created--;
|
||||||
{
|
$connection = null;
|
||||||
$this->created -= 1;
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param int $waite
|
* 创建连接失败时回退计数器
|
||||||
* @return mixed
|
* @return void
|
||||||
*/
|
*/
|
||||||
public function pop(int $waite = 10): mixed
|
public function abandon(): void
|
||||||
{
|
{
|
||||||
if ($this->_items->isEmpty()) {
|
if ($this->created > 0) {
|
||||||
return call_user_func($this->callback);
|
$this->created -= 1;
|
||||||
} else {
|
}
|
||||||
return $this->_items->pop($waite);
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
/**
|
||||||
|
* 从连接池中获取一个连接
|
||||||
|
* 当未达到最大创建数时直接创建新连接,达到上限后阻塞等待归还
|
||||||
|
* @param int $waite 等待超时时间(秒)
|
||||||
|
* @return mixed 连接对象,超时返回 false
|
||||||
|
*/
|
||||||
|
public function pop(int $waite = 10): mixed
|
||||||
|
{
|
||||||
|
// 未达到最大创建数,直接创建新连接
|
||||||
|
if ($this->created < $this->maxCreated) {
|
||||||
|
$this->created++;
|
||||||
|
return call_user_func($this->callback);
|
||||||
|
}
|
||||||
|
// 已达到最大创建数,阻塞等待连接归还
|
||||||
|
return $this->_items->pop($waite);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
+1
-1
@@ -9,7 +9,7 @@
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
"require": {
|
"require": {
|
||||||
"php": ">=8.0",
|
"php": ">=8.5",
|
||||||
"composer-runtime-api": "^2.0"
|
"composer-runtime-api": "^2.0"
|
||||||
},
|
},
|
||||||
"autoload": {
|
"autoload": {
|
||||||
|
|||||||
Reference in New Issue
Block a user