Files
kiri-core/kiri-engine/Pool/Pool.php
T

250 lines
4.5 KiB
PHP
Raw Normal View History

2020-08-31 01:27:08 +08:00
<?php
2021-08-11 01:04:57 +08:00
namespace Kiri\Pool;
2020-08-31 01:27:08 +08:00
2021-03-31 18:37:53 +08:00
use Exception;
2021-10-26 10:56:03 +08:00
use Kiri\Context;
2021-08-11 01:04:57 +08:00
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException;
2021-08-24 15:47:31 +08:00
use Kiri\Pool\Helper\SplQueue;
2021-07-12 17:45:04 +08:00
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
2020-08-31 01:27:08 +08:00
/**
* Class Pool
2021-08-11 01:04:57 +08:00
* @package Kiri\Pool
2020-08-31 01:27:08 +08:00
*/
2021-07-12 17:45:04 +08:00
class Pool extends Component
2020-08-31 01:27:08 +08:00
{
2021-09-07 14:47:44 +08:00
/**
* Registered pools, keyed by pool name. The property is static, so the map is
* shared by every Pool instance. Entries are created by newChannel() as either
* a Swoole Channel or the SplQueue helper.
*
* @var array<string, Channel|SplQueue>
*/
private static array $_connections = [];
/** @var int Capacity given to the last initConnections() call that actually created a pool. */
public int $max = 60;
// NOTE(review): name(), used by initConnections(), is presumably provided by this trait — confirm.
use Alias;
/**
* Trim a pool so that at most $retain_number idle connections remain.
*
* Public entry point that delegates directly to pop(). pop() type-hints its
* first argument as Channel, so $channel must satisfy that hint.
*
* @param Channel $channel pool to trim
* @param int $retain_number number of idle connections to keep
* @throws Exception
*/
public function flush($channel, $retain_number)
{
$this->pop($channel, $retain_number);
}
/**
 * Discard idle connections from $channel until at most $retain_number remain.
 *
 * Each discarded connection that implements StopHeartbeatCheck has its
 * heartbeat check stopped before it is dropped. Nothing is popped when the
 * caller is not running inside a coroutine.
 *
 * @param Channel $channel pool to trim
 * @param int $retain_number number of idle connections to keep; negative values are treated as 0
 * @throws Exception kept from the original contract; presumably raised by stopHeartbeatCheck() — not visible here
 */
protected function pop(Channel $channel, $retain_number): void
{
    // Popping is only ever attempted inside a coroutine. Decide that once, up
    // front: outside a coroutine the length can never shrink, so entering the
    // loop would spin forever.
    if (!Context::inCoroutine()) {
        return;
    }

    // A negative target can never be reached and would end in a pop() on an
    // already empty channel.
    $retain = max(0, (int)$retain_number);

    while ($channel->length() > $retain) {
        $connection = $channel->pop();
        if ($connection === false) {
            // pop() signals failure (for example a closed channel) with false;
            // stop rather than loop on a length that will not change.
            break;
        }
        if ($connection instanceof StopHeartbeatCheck) {
            $connection->stopHeartbeatCheck();
        }
    }
}
/**
 * Make sure a pool is registered for the given connection name.
 *
 * The name is first resolved through name() together with $isMaster. If a
 * Channel or SplQueue is already registered under the resolved name, nothing
 * changes. Otherwise a new pool with capacity $max is created and $max is
 * recorded on this instance.
 *
 * @param mixed $name     connection name, resolved via name()
 * @param bool  $isMaster forwarded to name()
 * @param int   $max      capacity for a newly created pool
 * @throws ConfigException
 */
public function initConnections($name, bool $isMaster = false, int $max = 60)
{
    $key = $this->name($name, $isMaster);

    $registered = static::$_connections[$key] ?? null;
    if ($registered instanceof Channel || $registered instanceof SplQueue) {
        return;
    }

    $this->newChannel($key, $max);
    $this->max = $max;
}
/**
 * Fetch the pool registered under $name, creating it on first use.
 *
 * A pool created here gets the capacity newChannel() falls back to when no
 * explicit size is passed.
 *
 * @param mixed $name pool name
 * @return Channel|SplQueue
 * @throws ConfigException
 * @throws Exception when the pool's errCode equals SWOOLE_CHANNEL_CLOSED
 */
private function getChannel($name): Channel|SplQueue
{
    $missing = !isset(static::$_connections[$name]);
    if ($missing) {
        $this->newChannel($name);
    }

    $pool = static::$_connections[$name];
    if ($pool->errCode == SWOOLE_CHANNEL_CLOSED) {
        throw new Exception('Channel is Close.');
    }

    return $pool;
}
/**
 * Create a pool and register it under $name, replacing any previous entry.
 *
 * When $max is loosely equal to null, the capacity is read from the
 * 'databases.pool.max' config key (default 10). An SplQueue is created when
 * Coroutine::getCid() reports -1, a Swoole Channel otherwise.
 *
 * @param mixed    $name pool name
 * @param int|null $max  capacity, or null to use the configured value
 * @throws ConfigException
 */
private function newChannel($name, $max = null)
{
    $capacity = ($max == null)
        ? Config::get('databases.pool.max', 10)
        : $max;

    $noCoroutine = Coroutine::getCid() === -1;

    static::$_connections[$name] = $noCoroutine
        ? new SplQueue($capacity)
        : new Channel($capacity);
}
/**
 * Obtain a connection from the named pool.
 *
 * When the pool holds at least one idle connection, one is taken from it via
 * maxIdleQuantity(), which also trims the pool towards $minx. When the pool
 * is empty, $callback is invoked and its result is returned instead.
 *
 * @param mixed    $name     pool name
 * @param callable $callback invoked with no arguments when the pool is empty
 * @param int      $minx     idle-connection threshold passed to maxIdleQuantity()
 * @return mixed a pooled connection, or whatever $callback returns
 * @throws ConfigException
 * @throws Exception
 */
public function get($name, $callback, $minx): mixed
{
    $pool = $this->getChannel($name);

    if ($pool->isEmpty()) {
        return $callback();
    }

    return $this->maxIdleQuantity($pool, $minx);
}
/**
 * Take one connection from $channel, then trim the surplus.
 *
 * After the connection has been taken, if more than $minx connections are
 * still queued, the excess is discarded through pop().
 *
 * @param Channel|SplQueue $channel pool to take from
 * @param int              $minx    number of idle connections allowed to remain
 * @return mixed the connection taken from the pool
 * @throws Exception
 */
protected function maxIdleQuantity($channel, $minx): mixed
{
    $taken = $channel->pop();

    $hasSurplus = $channel->length() > $minx;
    if ($hasSurplus) {
        $this->pop($channel, $minx);
    }

    return $taken;
}
/**
 * Report whether the named pool currently holds no idle connection.
 *
 * The pool is looked up through getChannel(), so it is created if it does
 * not exist yet.
 *
 * @param mixed $name pool name
 * @return bool true when the pool is empty
 * @throws ConfigException
 * @throws Exception
 */
public function isNull($name): bool
{
    $pool = $this->getChannel($name);

    return $pool->isEmpty();
}
/**
* Check whether a pooled connection is still usable (connection reliability check).
*
* This implementation performs no inspection: both arguments are ignored and
* true is always returned.
*
* @param string $name pool name (unused here)
* @param mixed $client connection to check (unused here)
* @return bool always true
*/
public function checkCanUse(string $name, mixed $client): bool
{
return true;
}
/**
 * Tell whether a pool is registered under $name and holds at least one
 * connection. Unlike isNull(), this never creates the pool.
 *
 * @param string $name pool name
 * @return bool false when the pool is missing or empty
 */
public function hasItem(string $name): bool
{
    $pool = static::$_connections[$name] ?? null;

    return $pool !== null && !$pool->isEmpty();
}
/**
 * Number of connections currently queued in the named pool.
 *
 * @param string $name pool name
 * @return mixed the pool's length(), or 0 when no pool is registered under $name
 */
public function size(string $name): mixed
{
    return isset(static::$_connections[$name])
        ? static::$_connections[$name]->length()
        : 0;
}
/**
 * Hand a connection back to the named pool.
 *
 * The connection is queued when the pool has room. When the pool is full the
 * connection is discarded; in that case its heartbeat check is stopped first
 * (if it implements StopHeartbeatCheck), matching how pop() and clean() treat
 * the connections they discard.
 *
 * @param string $name   pool name
 * @param mixed  $client connection being returned
 * @throws ConfigException
 * @throws Exception when the pool is reported as closed by getChannel()
 */
public function push(string $name, mixed $client)
{
    $channel = $this->getChannel($name);

    if (!$channel->isFull()) {
        $channel->push($client);
        return;
    }

    // No room left: this connection is dropped. Stop its heartbeat check so
    // the discarded connection is not left with one still running.
    if ($client instanceof StopHeartbeatCheck) {
        $client->stopHeartbeatCheck();
    }
}
/**
 * Drain the named pool and remove it from the registry.
 *
 * Each drained connection that implements StopHeartbeatCheck has its
 * heartbeat check stopped. Draining stops early when the pool is a Channel
 * and the caller is not inside a coroutine, or when pop() reports failure.
 * The registry entry is removed in every case. Nothing happens when no pool
 * is registered under $name.
 *
 * @param string $name pool name
 * @throws Exception kept from the original contract; presumably raised by stopHeartbeatCheck() — not visible here
 */
public function clean(string $name)
{
    $pool = static::$_connections[$name] ?? null;
    if ($pool === null) {
        return;
    }

    while ($pool->length() > 0) {
        // Only Channel instances are guarded: they are not popped outside a coroutine.
        if ($pool instanceof Channel && !Context::inCoroutine()) {
            break;
        }

        $client = $pool->pop();
        if ($client === false) {
            // pop() signals failure (for example a closed channel) with false;
            // the length would never change, so stop instead of spinning.
            break;
        }

        if ($client instanceof StopHeartbeatCheck) {
            $client->stopHeartbeatCheck();
        }
    }

    unset(static::$_connections[$name]);
}
/**
* Return the whole pool registry, keyed by pool name.
*
* Entries are whatever newChannel() stored: Swoole Channel instances or
* SplQueue helpers.
*
* @return array<string, Channel|SplQueue>
*/
protected function getChannels(): array
{
return static::$_connections;
}
2021-07-12 17:45:04 +08:00
2020-08-31 01:27:08 +08:00
}