改名
This commit is contained in:
+181
-180
@@ -21,216 +21,217 @@ use Swoole\Coroutine\Channel;
|
||||
class Pool extends Component
|
||||
{
|
||||
|
||||
/** @var Channel[] */
|
||||
private static array $_connections = [];
|
||||
/** @var Channel[] */
|
||||
private static array $_connections = [];
|
||||
|
||||
public int $max = 60;
|
||||
public int $max = 60;
|
||||
|
||||
use Alias;
|
||||
use Alias;
|
||||
|
||||
|
||||
/**
|
||||
* @param $channel
|
||||
* @param $retain_number
|
||||
* @throws Exception
|
||||
*/
|
||||
public function flush($channel, $retain_number)
|
||||
{
|
||||
$this->pop($channel, $retain_number);
|
||||
}
|
||||
/**
|
||||
* @param $channel
|
||||
* @param $retain_number
|
||||
* @throws Exception
|
||||
*/
|
||||
public function flush($channel, $retain_number)
|
||||
{
|
||||
$this->pop($channel, $retain_number);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Channel $channel
|
||||
* @param $retain_number
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function pop(Channel $channel, $retain_number): void
|
||||
{
|
||||
while ($channel->length() > $retain_number) {
|
||||
if (Context::inCoroutine()) {
|
||||
$connection = $channel->pop();
|
||||
if ($connection instanceof StopHeartbeatCheck) {
|
||||
$connection->stopHeartbeatCheck();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @param Channel $channel
|
||||
* @param $retain_number
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function pop(Channel $channel, $retain_number): void
|
||||
{
|
||||
while ($channel->length() > $retain_number) {
|
||||
if (Context::inCoroutine()) {
|
||||
$connection = $channel->pop();
|
||||
if ($connection instanceof StopHeartbeatCheck) {
|
||||
$connection->stopHeartbeatCheck();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param $name
|
||||
* @param false $isMaster
|
||||
* @param int $max
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function initConnections($name, bool $isMaster = false, int $max = 60)
|
||||
{
|
||||
$name = $this->name($name, $isMaster);
|
||||
if (isset(static::$_connections[$name])) {
|
||||
$value = static::$_connections[$name];
|
||||
if ($value instanceof Channel || $value instanceof SplQueue) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
$this->newChannel($name, $max);
|
||||
$this->max = $max;
|
||||
}
|
||||
/**
|
||||
* @param $name
|
||||
* @param false $isMaster
|
||||
* @param int $max
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function initConnections($name, bool $isMaster = false, int $max = 60)
|
||||
{
|
||||
$name = $this->name($name, $isMaster);
|
||||
if (isset(static::$_connections[$name])) {
|
||||
$value = static::$_connections[$name];
|
||||
if ($value instanceof Channel || $value instanceof SplQueue) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
$this->newChannel($name, $max);
|
||||
$this->max = $max;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param $name
|
||||
* @return Channel|SplQueue
|
||||
* @throws ConfigException
|
||||
* @throws Exception
|
||||
*/
|
||||
private function getChannel($name): Channel|SplQueue
|
||||
{
|
||||
if (!isset(static::$_connections[$name])) {
|
||||
$this->newChannel($name);
|
||||
}
|
||||
if (static::$_connections[$name]->errCode == SWOOLE_CHANNEL_CLOSED) {
|
||||
throw new Exception('Channel is Close.');
|
||||
}
|
||||
return static::$_connections[$name];
|
||||
}
|
||||
/**
|
||||
* @param $name
|
||||
* @return Channel|SplQueue
|
||||
* @throws ConfigException
|
||||
* @throws Exception
|
||||
*/
|
||||
private function getChannel($name): Channel|SplQueue
|
||||
{
|
||||
if (!isset(static::$_connections[$name])) {
|
||||
$this->newChannel($name);
|
||||
}
|
||||
if (static::$_connections[$name]->errCode == SWOOLE_CHANNEL_CLOSED) {
|
||||
throw new Exception('Channel is Close.');
|
||||
}
|
||||
return static::$_connections[$name];
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @throws ConfigException
|
||||
*/
|
||||
private function newChannel($name, $max = null)
|
||||
{
|
||||
if ($max == null) {
|
||||
$max = Config::get('databases.pool.max', 10);
|
||||
}
|
||||
if (Coroutine::getCid() === -1) {
|
||||
static::$_connections[$name] = new SplQueue($max);
|
||||
} else {
|
||||
static::$_connections[$name] = new Channel($max);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @throws ConfigException
|
||||
*/
|
||||
private function newChannel($name, $max = null)
|
||||
{
|
||||
if ($max == null) {
|
||||
$max = Config::get('databases.pool.max', 10);
|
||||
}
|
||||
if (Coroutine::getCid() === -1) {
|
||||
static::$_connections[$name] = new SplQueue($max);
|
||||
} else {
|
||||
static::$_connections[$name] = new Channel($max);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param $name
|
||||
* @param $callback
|
||||
* @return array
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function get($name, $callback): mixed
|
||||
{
|
||||
$channel = $this->getChannel($name);
|
||||
if (!$channel->isEmpty()) {
|
||||
$connection = $channel->pop();
|
||||
defer(fn() => $this->maxIdleQuantity($channel));
|
||||
if ($this->checkCanUse($name, $connection)) {
|
||||
return $connection;
|
||||
}
|
||||
}
|
||||
return $callback();
|
||||
}
|
||||
/**
|
||||
* @param $name
|
||||
* @param $callback
|
||||
* @return array
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function get($name, $callback): mixed
|
||||
{
|
||||
$channel = $this->getChannel($name);
|
||||
if (!$channel->isEmpty()) {
|
||||
$connection = $this->maxIdleQuantity($channel);
|
||||
if ($this->checkCanUse($name, $connection)) {
|
||||
return $connection;
|
||||
}
|
||||
}
|
||||
return $callback();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param $channel
|
||||
* @throws ConfigException
|
||||
* @throws Exception
|
||||
*/
|
||||
private function maxIdleQuantity($channel): void
|
||||
{
|
||||
$minx = Config::get('databases.pool.min', 1);
|
||||
if ($channel->length() > $minx) {
|
||||
$this->pop($channel, $minx);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @param $channel
|
||||
* @return mixed
|
||||
* @throws ConfigException
|
||||
*/
|
||||
private function maxIdleQuantity($channel): mixed
|
||||
{
|
||||
$connection = $channel->pop();
|
||||
$minx = Config::get('databases.pool.min', 1);
|
||||
if ($channel->length() > $minx) {
|
||||
$this->pop($channel, $minx);
|
||||
}
|
||||
return $connection;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param $name
|
||||
* @return bool
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function isNull($name): bool
|
||||
{
|
||||
return $this->getChannel($name)->isEmpty();
|
||||
}
|
||||
/**
|
||||
* @param $name
|
||||
* @return bool
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function isNull($name): bool
|
||||
{
|
||||
return $this->getChannel($name)->isEmpty();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param string $name
|
||||
* @param mixed $client
|
||||
* @return bool
|
||||
* 检查连接可靠性
|
||||
*/
|
||||
public function checkCanUse(string $name, mixed $client): bool
|
||||
{
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* @param string $name
|
||||
* @param mixed $client
|
||||
* @return bool
|
||||
* 检查连接可靠性
|
||||
*/
|
||||
public function checkCanUse(string $name, mixed $client): bool
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param string $name
|
||||
* @return bool
|
||||
*/
|
||||
public function hasItem(string $name): bool
|
||||
{
|
||||
if (isset(static::$_connections[$name])) {
|
||||
return !static::$_connections[$name]->isEmpty();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
/**
|
||||
* @param string $name
|
||||
* @return bool
|
||||
*/
|
||||
public function hasItem(string $name): bool
|
||||
{
|
||||
if (isset(static::$_connections[$name])) {
|
||||
return !static::$_connections[$name]->isEmpty();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param string $name
|
||||
* @return mixed
|
||||
*/
|
||||
public function size(string $name): mixed
|
||||
{
|
||||
if (!isset(static::$_connections[$name])) {
|
||||
return 0;
|
||||
}
|
||||
return static::$_connections[$name]->length();
|
||||
}
|
||||
/**
|
||||
* @param string $name
|
||||
* @return mixed
|
||||
*/
|
||||
public function size(string $name): mixed
|
||||
{
|
||||
if (!isset(static::$_connections[$name])) {
|
||||
return 0;
|
||||
}
|
||||
return static::$_connections[$name]->length();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param string $name
|
||||
* @param mixed $client
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function push(string $name, mixed $client)
|
||||
{
|
||||
$channel = $this->getChannel($name);
|
||||
if (!$channel->isFull()) {
|
||||
$channel->push($client);
|
||||
}
|
||||
unset($client);
|
||||
}
|
||||
/**
|
||||
* @param string $name
|
||||
* @param mixed $client
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function push(string $name, mixed $client)
|
||||
{
|
||||
$channel = $this->getChannel($name);
|
||||
if (!$channel->isFull()) {
|
||||
$channel->push($client);
|
||||
}
|
||||
unset($client);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param string $name
|
||||
* @throws Exception
|
||||
*/
|
||||
public function clean(string $name)
|
||||
{
|
||||
if (!isset(static::$_connections[$name])) {
|
||||
return;
|
||||
}
|
||||
static::$_connections[$name] = null;
|
||||
unset(static::$_connections[$name]);
|
||||
}
|
||||
/**
|
||||
* @param string $name
|
||||
* @throws Exception
|
||||
*/
|
||||
public function clean(string $name)
|
||||
{
|
||||
if (!isset(static::$_connections[$name])) {
|
||||
return;
|
||||
}
|
||||
static::$_connections[$name] = null;
|
||||
unset(static::$_connections[$name]);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return Channel[]
|
||||
*/
|
||||
protected function getChannels(): array
|
||||
{
|
||||
return static::$_connections;
|
||||
}
|
||||
/**
|
||||
* @return Channel[]
|
||||
*/
|
||||
protected function getChannels(): array
|
||||
{
|
||||
return static::$_connections;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user