diff --git a/kiri-engine/Pool/Alias.php b/kiri-engine/Pool/Alias.php deleted file mode 100644 index 2df8eb47..00000000 --- a/kiri-engine/Pool/Alias.php +++ /dev/null @@ -1,24 +0,0 @@ -pool->hasChannel($config['cds'])) { - $this->pool->initConnections($config['cds'], $config['pool']['max']); + if (!$this->pool->hasChannel($cds)) { + throw new Exception('Queue not exists.'); } - return $this->pool->get($config['cds'], $this->create($config)); - } - - - /** - * @param array $config - * @return Closure - */ - public function generate(array $config): Closure - { - return static function () use ($config) { - Kiri::getDi()->get(Kiri\Error\StdoutLoggerInterface::class)->alert('create database connect(' . $config['cds'] . ')'); - - $link = new \PDO('mysql:dbname=' . $config['dbname'] . ';host=' . $config['cds'], $config['username'], $config['password'], [ - \PDO::ATTR_EMULATE_PREPARES => false, - \PDO::ATTR_CASE => \PDO::CASE_NATURAL, - \PDO::ATTR_PERSISTENT => true, - \PDO::ATTR_TIMEOUT => $config['connect_timeout'], - \PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . ($config['charset'] ?? 'utf8mb4') - ]); - $link->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); - $link->setAttribute(\PDO::ATTR_STRINGIFY_FETCHES, false); - $link->setAttribute(\PDO::ATTR_ORACLE_NULLS, \PDO::NULL_EMPTY_STRING); - foreach ($config['attributes'] as $key => $attribute) { - $link->setAttribute($key, $attribute); - } - if (Db::inTransactionsActive()) { - $link->beginTransaction(); - } - return $link; - }; + return $this->pool->get($cds); } /** * @param string $name * @return array - * @throws Kiri\Exception\ConfigException */ public function check(string $name): array { @@ -178,13 +148,14 @@ class Connection extends Component /** - * @param $name - * @param $max - * @throws Exception + * @param array $config + * @param int $max */ - public function initConnections($name, $max) + public function initConnections(array $config, int $max) { - $this->pool->initConnections($name, $max); + $this->pool->initConnections($config['cds'], $max, function () use ($config) { + return new \Database\Mysql\PDO($config); + }); } diff --git a/kiri-engine/Pool/Pool.php b/kiri-engine/Pool/Pool.php index 6f7f3908..7a1bffb6 100644 --- a/kiri-engine/Pool/Pool.php +++ b/kiri-engine/Pool/Pool.php @@ -9,6 +9,7 @@ use Exception; use Kiri\Abstracts\Component; use Kiri\Abstracts\Config; use Kiri\Annotation\Inject; +use Kiri\Di\ContainerInterface; use Kiri\Exception\ConfigException; use Kiri\Server\Abstracts\StatusEnum; use Kiri\Server\WorkerStatus; @@ -21,8 +22,8 @@ use Kiri\Server\WorkerStatus; class Pool extends Component { - /** @var array */ - private static array $_connections = []; + /** @var array */ + private array $_connections = []; /** * @var WorkerStatus @@ -30,8 +31,11 @@ class Pool extends Component #[Inject(WorkerStatus::class)] public WorkerStatus $status; - - use Alias; + /** + * @var ContainerInterface + */ + #[Inject(ContainerInterface::class)] + public ContainerInterface $container; /** @@ -42,106 +46,93 @@ class Pool extends Component public function flush($name, $retain_number) { if ($this->hasChannel($name)) { - $this->pop($this->channel($name), $retain_number); + $channel = $this->channel($name); + $channel->tailor($retain_number); } } /** - * @param PoolQueue $channel + * @param PoolItem $channel * @param $retain_number */ - protected function pop(PoolQueue $channel, $retain_number): void + protected function pop(PoolItem $channel, $retain_number): void { - while ($channel->length() > $retain_number) { - $connection = $channel->pop(0.001); - if ($connection instanceof StopHeartbeatCheck) { - $connection->stopHeartbeatCheck(); - } - } + $channel->tailor($retain_number); } /** * @param $name * @return array - * @throws ConfigException */ public function check($name): array { - $channel = $this->channel($name); - if ($channel->length() < 1) { - return [0, 0]; - } - - if ($this->status->is(StatusEnum::EXIT)) { - $channel->close(); - return [0, 0]; - } - - $success = 0; - $lists = []; - $count = $channel->length(); - while ($this->status->is(StatusEnum::EXIT) === false) { - if (!(($pdo = $channel->pop(0.001)) instanceof PDO)) { - break; - } - if ($pdo->check()) { - $success += 1; - } - $lists[] = $pdo; - } - if ($this->status->is(StatusEnum::EXIT) === false) { - foreach ($lists as $list) { - $channel->push($list); - } - } else { - $channel->close(); - } - return [$count, $success]; +// $channel = $this->channel($name); +// if ($channel->size() < 1) { +// return [0, 0]; +// } +// +// if ($this->status->is(StatusEnum::EXIT)) { +// $channel->close(); +// return [0, 0]; +// } +// +// $success = 0; +// $lists = []; +// $count = $channel->size(); +// while ($this->status->is(StatusEnum::EXIT) === false) { +// if (!(($pdo = $channel->pop(0.001)) instanceof PDO)) { +// break; +// } +// if ($pdo->check()) { +// $success += 1; +// } +// $lists[] = $pdo; +// } +// if ($this->status->is(StatusEnum::EXIT) === false) { +// foreach ($lists as $list) { +// $channel->push($list); +// } +// } else { +// $channel->close(); +// } +// return [$count, $success]; + return [0, 0]; } /** * @param $name * @param int $max + * @param \Closure $closure */ - public function initConnections($name, int $max = 60) + public function initConnections($name, int $max, \Closure $closure) { - $channel = static::$_connections[$name] ?? null; - if (($channel instanceof PoolQueue) && !$channel->isClose()) { - return; + if (!isset($this->_connections[$name])) { + $this->_connections[$name] = new PoolItem($max, $closure); } - static::$_connections[$name] = new PoolQueue($max); } /** * @param $name - * @return PoolQueue + * @return PoolItem * @throws ConfigException * @throws Exception */ - public function channel($name): PoolQueue + public function channel($name): PoolItem { - $channel = static::$_connections[$name] ?? null; - if (!($channel instanceof PoolQueue)) { + if (!isset($this->_connections[$name])) { throw new Exception('Channel is not exists.'); } - if ($channel->isClose()) { - throw new Exception('Channel is Close.'); - } - return $channel; + return $this->_connections[$name]; } public function hasChannel($name): bool { - $channel = static::$_connections[$name] ?? null; - if (!($channel instanceof PoolQueue)) { - return false; - } - if ($channel->isClose()) { + if (!isset($this->_connections[$name])) { return false; } return true; @@ -149,32 +140,13 @@ class Pool extends Component /** - * @param $name - * @param $callback + * @param string $name * @return array * @throws ConfigException - * @throws Exception */ - public function get($name, $callback): mixed + public function get(string $name): mixed { - $channel = $this->channel($name); - if (!$channel->isEmpty()) { - return $channel->pop(); - } - return $callback(); - } - - - /** - * @param $channel - * @param $minx - * @return void - */ - protected function maxIdleQuantity($channel, $minx): void - { - if ($channel->length() > $minx) { - $this->pop($channel, $minx); - } + return $this->channel($name)->pop(); } @@ -207,8 +179,8 @@ class Pool extends Component */ public function hasItem(string $name): bool { - $channel = static::$_connections[$name] ?? null; - if (!($channel instanceof PoolQueue) || $channel->isClose()) { + $channel = $this->_connections[$name] ?? null; + if ($channel === null) { return false; } return !$channel->isEmpty(); @@ -221,11 +193,11 @@ class Pool extends Component */ public function size(string $name): int { - $channel = static::$_connections[$name] ?? null; - if (!($channel instanceof PoolQueue) || $channel->isClose()) { + $channel = $this->_connections[$name] ?? null; + if ($channel === null) { return 0; } - return $channel->length(); + return $channel->size(); } @@ -236,10 +208,7 @@ class Pool extends Component */ public function push(string $name, mixed $client) { - $channel = $this->channel($name); - if (!$channel->isFull()) { - $channel->push($client); - } + $this->channel($name)->push($client); } @@ -255,23 +224,17 @@ class Pool extends Component } - /** * @param string $name * @throws Exception */ public function clean(string $name) { - $channel = static::$_connections[$name] ?? null; - if (!($channel instanceof PoolQueue) || $channel->isClose()) { + $channel = $this->_connections[$name] ?? null; + if ($channel === null) { return; } - while ($channel->length() > 0) { - $client = $channel->pop(); - if ($client instanceof StopHeartbeatCheck) { - $client->stopHeartbeatCheck(); - } - } + $channel->tailor(0); $channel->close(); } @@ -281,7 +244,7 @@ class Pool extends Component */ protected function channels(): array { - return static::$_connections; + return $this->_connections; } diff --git a/kiri-engine/Pool/PoolItem.php b/kiri-engine/Pool/PoolItem.php new file mode 100644 index 00000000..3078dc72 --- /dev/null +++ b/kiri-engine/Pool/PoolItem.php @@ -0,0 +1,110 @@ +_items = new PoolQueue($this->maxCreated); + } + + + /** + * @param PoolQueue $items + */ + public function setItems(PoolQueue $items): void + { + $this->_items = $items; + } + + + /** + * @param mixed $item + * @return void + */ + public function push(mixed $item): void + { + $this->_items->push($item); + } + + + /** + * @return bool + */ + public function isEmpty(): bool + { + return $this->_items->isEmpty(); + } + + + /** + * @return bool + */ + public function size(): bool + { + return $this->_items->length(); + } + + + /** + * @return bool + */ + public function close(): bool + { + return $this->_items->close(); + } + + + /** + * @param int $min + * @return void + */ + public function tailor(int $min = 0): void + { + while ($this->_items->length() > $min) { + $connection = $this->_items->pop(0.000001); + if ($connection instanceof StopHeartbeatCheck) { + $connection->stopHeartbeatCheck(); + } + $connection = null; + $this->created -= 1; + } + } + + + /** + * @param int $waite + * @return mixed + */ + public function pop(int $waite = 10): mixed + { + if ($this->created < $this->maxCreated) { + $this->created += 1; + return call_user_func($this->callback); + } + return $this->_items->pop($waite); + } +} diff --git a/kiri-engine/Pool/QueueInterface.php b/kiri-engine/Pool/QueueInterface.php index 8f29b20f..6e24515e 100644 --- a/kiri-engine/Pool/QueueInterface.php +++ b/kiri-engine/Pool/QueueInterface.php @@ -6,26 +6,54 @@ interface QueueInterface { + /** + * @return bool + */ public function isEmpty(): bool; - public function push(mixed $data, float $timeout = -1): bool; + + /** + * @param mixed $data + * @param float $timeout + * @return void + */ + public function push(mixed $data, float $timeout = -1): void; + /** + * @param float $timeout + * @return mixed + */ public function pop(float $timeout = -1): mixed; + /** + * @return array + */ public function stats(): array; + /** + * @return bool + */ public function close(): bool; + /** + * @return int + */ public function length(): int; + /** + * @return bool + */ public function isFull(): bool; + /** + * @return bool + */ public function isClose(): bool; }