Files
kiri-core/kiri-engine/Pool/Pool.php
T

289 lines
5.2 KiB
PHP
Raw Normal View History

2022-01-09 03:50:38 +08:00
<?php
namespace Kiri\Pool;
2022-06-17 11:59:19 +08:00
use Database\Mysql\PDO;
2022-01-09 03:50:38 +08:00
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
2022-07-11 16:09:58 +08:00
use Kiri\Annotation\Inject;
2022-01-09 03:50:38 +08:00
use Kiri\Exception\ConfigException;
2022-07-11 16:09:58 +08:00
use Kiri\Server\Abstracts\StatusEnum;
use Kiri\Server\WorkerStatus;
2022-01-09 03:50:38 +08:00
/**
* Class Pool
* @package Kiri\Pool
*/
class Pool extends Component
{
2022-06-17 11:59:19 +08:00
/** @var array<PoolQueue> */
2022-01-09 03:50:38 +08:00
private static array $_connections = [];
2022-07-11 16:09:58 +08:00
/**
* @var WorkerStatus
*/
#[Inject(WorkerStatus::class)]
public WorkerStatus $status;
2022-01-09 03:50:38 +08:00
use Alias;
/**
2022-09-19 18:39:28 +08:00
* @param $name
2022-01-09 03:50:38 +08:00
* @param $retain_number
* @throws Exception
*/
2022-09-19 18:39:28 +08:00
public function flush($name, $retain_number)
2022-01-09 03:50:38 +08:00
{
2023-02-07 17:46:41 +08:00
if ($this->hasChannel($name)) {
$this->pop($this->channel($name), $retain_number);
}
2022-01-09 03:50:38 +08:00
}
/**
2022-06-17 11:59:19 +08:00
* @param PoolQueue $channel
2022-01-09 03:50:38 +08:00
* @param $retain_number
*/
2022-06-17 11:59:19 +08:00
protected function pop(PoolQueue $channel, $retain_number): void
2022-01-09 03:50:38 +08:00
{
while ($channel->length() > $retain_number) {
2022-07-11 18:33:25 +08:00
$connection = $channel->pop(0.001);
if ($connection instanceof StopHeartbeatCheck) {
$connection->stopHeartbeatCheck();
2022-01-09 03:50:38 +08:00
}
}
}
2022-06-17 11:59:19 +08:00
/**
* @param $name
2022-06-17 14:07:16 +08:00
* @return array
2022-06-17 11:59:19 +08:00
* @throws ConfigException
*/
2022-06-17 14:07:16 +08:00
public function check($name): array
2022-06-17 11:59:19 +08:00
{
$channel = $this->channel($name);
2022-06-17 14:30:31 +08:00
if ($channel->length() < 1) {
return [0, 0];
}
2022-06-17 14:04:23 +08:00
2022-07-11 16:09:58 +08:00
if ($this->status->is(StatusEnum::EXIT)) {
$channel->close();
return [0, 0];
}
2022-07-11 14:50:33 +08:00
$success = 0;
2022-06-17 14:04:23 +08:00
$lists = [];
2022-07-11 14:50:33 +08:00
$count = $channel->length();
2022-07-11 16:21:51 +08:00
while ($this->status->is(StatusEnum::EXIT) === false) {
2022-07-11 17:51:41 +08:00
if (!(($pdo = $channel->pop(0.001)) instanceof PDO)) {
2022-07-11 17:05:32 +08:00
break;
}
2022-06-17 14:07:16 +08:00
if ($pdo->check()) {
$success += 1;
}
2022-06-17 14:04:23 +08:00
$lists[] = $pdo;
}
2022-07-11 16:21:51 +08:00
if ($this->status->is(StatusEnum::EXIT) === false) {
foreach ($lists as $list) {
$channel->push($list);
}
} else {
$channel->close();
2022-06-17 14:04:23 +08:00
}
2022-06-17 14:07:16 +08:00
return [$count, $success];
2022-06-17 11:59:19 +08:00
}
2022-01-09 03:50:38 +08:00
/**
* @param $name
* @param int $max
*/
2022-02-24 11:00:01 +08:00
public function initConnections($name, int $max = 60)
2022-01-09 03:50:38 +08:00
{
2023-02-07 16:46:13 +08:00
$channel = static::$_connections[$name] ?? null;
if (($channel instanceof PoolQueue) && !$channel->isClose()) {
return;
2022-01-09 03:50:38 +08:00
}
2023-02-07 16:46:13 +08:00
static::$_connections[$name] = new PoolQueue($max);
2022-01-09 03:50:38 +08:00
}
/**
* @param $name
2022-06-17 11:59:19 +08:00
* @return PoolQueue
2022-01-09 03:50:38 +08:00
* @throws ConfigException
* @throws Exception
*/
2022-06-17 11:59:19 +08:00
public function channel($name): PoolQueue
2022-01-09 03:50:38 +08:00
{
2023-02-07 16:46:13 +08:00
$channel = static::$_connections[$name] ?? null;
2023-02-07 17:16:10 +08:00
if (!($channel instanceof PoolQueue)) {
2023-02-07 16:46:13 +08:00
throw new Exception('Channel is not exists.');
2022-01-09 03:50:38 +08:00
}
2023-02-07 16:46:13 +08:00
if ($channel->isClose()) {
2022-01-09 03:50:38 +08:00
throw new Exception('Channel is Close.');
}
2023-02-07 16:46:13 +08:00
return $channel;
2022-01-09 03:50:38 +08:00
}
2023-02-07 17:19:31 +08:00
public function hasChannel($name): bool
2023-02-07 17:16:10 +08:00
{
$channel = static::$_connections[$name] ?? null;
if (!($channel instanceof PoolQueue)) {
return false;
}
if ($channel->isClose()) {
return false;
}
return true;
}
2022-01-09 03:50:38 +08:00
/**
* @param $name
* @param $callback
* @return array
* @throws ConfigException
* @throws Exception
*/
2023-04-02 00:34:55 +08:00
public function get($name, $callback): mixed
2022-01-09 03:50:38 +08:00
{
2023-04-02 00:34:55 +08:00
$channel = $this->channel($name);
if (!$channel->isEmpty()) {
return $channel->pop();
}
return $callback();
2022-01-09 03:50:38 +08:00
}
/**
* @param $channel
* @param $minx
2022-09-19 18:39:28 +08:00
* @return void
2022-01-09 03:50:38 +08:00
*/
2022-09-19 18:39:28 +08:00
protected function maxIdleQuantity($channel, $minx): void
2022-01-09 03:50:38 +08:00
{
if ($channel->length() > $minx) {
$this->pop($channel, $minx);
}
}
/**
* @param $name
* @return bool
* @throws ConfigException
*/
public function isNull($name): bool
{
2022-06-17 11:59:19 +08:00
return $this->channel($name)->isEmpty();
2022-01-09 03:50:38 +08:00
}
/**
* @param string $name
* @param mixed $client
* @return bool
* 检查连接可靠性
*/
public function checkCanUse(string $name, mixed $client): bool
{
return true;
}
/**
* @param string $name
* @return bool
*/
public function hasItem(string $name): bool
{
2023-02-07 16:46:13 +08:00
$channel = static::$_connections[$name] ?? null;
if (!($channel instanceof PoolQueue) || $channel->isClose()) {
return false;
2022-01-09 03:50:38 +08:00
}
2023-02-07 16:46:13 +08:00
return !$channel->isEmpty();
2022-01-09 03:50:38 +08:00
}
/**
* @param string $name
2023-01-30 11:03:58 +08:00
* @return int
2022-01-09 03:50:38 +08:00
*/
2023-01-30 11:03:58 +08:00
public function size(string $name): int
2022-01-09 03:50:38 +08:00
{
2023-02-07 16:46:13 +08:00
$channel = static::$_connections[$name] ?? null;
if (!($channel instanceof PoolQueue) || $channel->isClose()) {
2022-01-09 03:50:38 +08:00
return 0;
}
2023-02-07 16:46:13 +08:00
return $channel->length();
2022-01-09 03:50:38 +08:00
}
/**
* @param string $name
* @param mixed $client
* @throws ConfigException
*/
public function push(string $name, mixed $client)
{
2022-06-17 11:59:19 +08:00
$channel = $this->channel($name);
2022-01-09 03:50:38 +08:00
if (!$channel->isFull()) {
$channel->push($client);
}
}
2023-04-03 00:14:29 +08:00
/**
* @param $name
* @param int $time
* @return array
* @throws ConfigException
*/
public function waite($name, int $time = 30): mixed
{
return $this->channel($name)->pop($time);
}
2022-01-09 03:50:38 +08:00
/**
* @param string $name
* @throws Exception
*/
public function clean(string $name)
{
2023-02-07 16:46:13 +08:00
$channel = static::$_connections[$name] ?? null;
if (!($channel instanceof PoolQueue) || $channel->isClose()) {
2022-01-09 03:50:38 +08:00
return;
}
2023-02-07 16:46:13 +08:00
while ($channel->length() > 0) {
$client = $channel->pop();
2022-01-09 03:50:38 +08:00
if ($client instanceof StopHeartbeatCheck) {
$client->stopHeartbeatCheck();
}
}
2023-02-07 16:46:13 +08:00
$channel->close();
2022-01-09 03:50:38 +08:00
}
/**
2022-06-17 11:59:19 +08:00
* @return PoolQueue[]
2022-01-09 03:50:38 +08:00
*/
2022-06-17 11:59:19 +08:00
protected function channels(): array
2022-01-09 03:50:38 +08:00
{
return static::$_connections;
}
}