This commit is contained in:
2023-08-16 16:33:57 +08:00
parent c8f7a7dcb2
commit 04db55e79a
3 changed files with 142 additions and 114 deletions
+123 -112
View File
@@ -14,31 +14,31 @@ use Exception;
class Pool implements PoolInterface class Pool implements PoolInterface
{ {
/** @var array<PoolItem> */ /** @var array<PoolItem> */
protected array $_connections = []; protected array $_connections = [];
/** /**
* @param $name * @param $name
* @param $retain_number * @param $retain_number
* @throws Exception * @throws Exception
*/ */
public function flush($name, $retain_number): void public function flush($name, $retain_number): void
{ {
if ($this->hasChannel($name)) { if ($this->hasChannel($name)) {
$this->channel($name)->tailor($retain_number); $this->channel($name)->tailor($retain_number);
} }
} }
/** /**
* @param PoolItem $channel * @param PoolItem $channel
* @param $retain_number * @param $retain_number
*/ */
protected function pop(PoolItem $channel, $retain_number): void protected function pop(PoolItem $channel, $retain_number): void
{ {
$channel->tailor($retain_number); $channel->tailor($retain_number);
} }
/** /**
@@ -46,40 +46,51 @@ class Pool implements PoolInterface
* @param int $max * @param int $max
* @param callable $closure * @param callable $closure
*/ */
public function created($name, int $max, callable $closure): void public function created($name, int $max, callable $closure): void
{ {
if (!isset($this->_connections[$name])) { if (!isset($this->_connections[$name])) {
$this->_connections[$name] = new PoolItem($max, $closure); $this->_connections[$name] = new PoolItem($max, $closure);
} }
} }
/** /**
* @param $name * @param $name
* @return PoolItem * @return PoolItem
* @throws Exception * @throws Exception
*/ */
public function channel($name): PoolItem public function channel($name): PoolItem
{ {
if (!isset($this->_connections[$name])) { if (!isset($this->_connections[$name])) {
throw new Exception('Channel is not exists.'); throw new Exception('Channel is not exists.');
} }
$channel = $this->_connections[$name]; $channel = $this->_connections[$name];
if ($channel->isClose()) { if ($channel->isClose()) {
$channel->reconnect(); $channel->reconnect();
} }
return $channel; return $channel;
} }
/** /**
* @param $name * @param $name
* @return bool * @return bool
*/ */
public function hasChannel($name): bool public function hasChannel($name): bool
{ {
return isset($this->_connections[$name]) && $this->_connections[$name] instanceof PoolItem; return isset($this->_connections[$name]) && $this->_connections[$name] instanceof PoolItem;
} }
/**
* @param string $name
* @return void
* @throws Exception
*/
public function abandon(string $name): void
{
$this->channel($name)->abandon();
}
/** /**
@@ -87,10 +98,10 @@ class Pool implements PoolInterface
* @return array * @return array
* @throws Exception * @throws Exception
*/ */
public function get(string $name): mixed public function get(string $name): mixed
{ {
return $this->channel($name)->pop(); return $this->channel($name)->pop();
} }
/** /**
@@ -98,50 +109,50 @@ class Pool implements PoolInterface
* @return bool * @return bool
* @throws Exception * @throws Exception
*/ */
public function isNull($name): bool public function isNull($name): bool
{ {
return $this->channel($name)->isEmpty(); return $this->channel($name)->isEmpty();
} }
/** /**
* @param string $name * @param string $name
* @param mixed $client * @param mixed $client
* @return bool * @return bool
* 检查连接可靠性 * 检查连接可靠性
*/ */
public function checkCanUse(string $name, mixed $client): bool public function checkCanUse(string $name, mixed $client): bool
{ {
return true; return true;
} }
/** /**
* @param string $name * @param string $name
* @return bool * @return bool
*/ */
public function hasItem(string $name): bool public function hasItem(string $name): bool
{ {
$channel = $this->_connections[$name] ?? null; $channel = $this->_connections[$name] ?? null;
if ($channel === null) { if ($channel === null) {
return false; return false;
} }
return !$channel->isEmpty(); return !$channel->isEmpty();
} }
/** /**
* @param string $name * @param string $name
* @return int * @return int
*/ */
public function size(string $name): int public function size(string $name): int
{ {
$channel = $this->_connections[$name] ?? null; $channel = $this->_connections[$name] ?? null;
if ($channel === null) { if ($channel === null) {
return 0; return 0;
} }
return $channel->size(); return $channel->size();
} }
/** /**
@@ -149,10 +160,10 @@ class Pool implements PoolInterface
* @param mixed $data * @param mixed $data
* @throws Exception * @throws Exception
*/ */
public function push(string $name, mixed $data): void public function push(string $name, mixed $data): void
{ {
$this->channel($name)->push($data); $this->channel($name)->push($data);
} }
/** /**
@@ -161,35 +172,35 @@ class Pool implements PoolInterface
* @return array * @return array
* @throws Exception * @throws Exception
*/ */
public function waite($name, int $time = 30): mixed public function waite($name, int $time = 30): mixed
{ {
return $this->channel($name)->pop($time); return $this->channel($name)->pop($time);
} }
/** /**
* @param string $name * @param string $name
* @throws Exception * @throws Exception
*/ */
public function close(string $name): void public function close(string $name): void
{ {
$channel = $this->_connections[$name] ?? null; $channel = $this->_connections[$name] ?? null;
if ($channel === null) { if ($channel === null) {
return; return;
} }
$channel->tailor(0); $channel->tailor(0);
$channel->close(); $channel->close();
} }
/** /**
* return pool queue lists * return pool queue lists
* @return PoolItem[] * @return PoolItem[]
*/ */
protected function channels(): array protected function channels(): array
{ {
return $this->_connections; return $this->_connections;
} }
/** /**
+7
View File
@@ -22,6 +22,13 @@ interface PoolInterface
public function get(string $name): mixed; public function get(string $name): mixed;
/**
* @param string $name
* @return void
*/
public function abandon(string $name): void;
/** /**
* @param string $name * @param string $name
* @param mixed $data * @param mixed $data
+12 -2
View File
@@ -76,7 +76,7 @@ class PoolItem
public function push(mixed $item): void public function push(mixed $item): void
{ {
if (is_null($item)) { if (is_null($item)) {
$item = call_user_func($this->callback); return;
} }
$this->_items->push($item); $this->_items->push($item);
} }
@@ -126,13 +126,23 @@ class PoolItem
} }
/**
* @return void
*/
public function abandon(): void
{
$this->created -= 1;
}
/** /**
* @param int $waite * @param int $waite
* @return mixed * @return mixed
*/ */
public function pop(int $waite = 10): mixed public function pop(int $waite = 10): mixed
{ {
if ($this->_items->isEmpty()) { if ($this->_items->isEmpty() && $this->created >= $this->maxCreated) {
$this->created += 1;
return call_user_func($this->callback); return call_user_func($this->callback);
} else { } else {
return $this->_items->pop($waite); return $this->_items->pop($waite);