This commit is contained in:
2021-02-24 14:53:27 +08:00
parent e219b9a0ac
commit cfda39ebdd
3 changed files with 92 additions and 79 deletions
+88 -8
View File
@@ -11,6 +11,7 @@ use JetBrains\PhpStorm\Pure;
use PDO;
use Redis;
use Snowflake\Exception\ComponentException;
use Snowflake\Exception\ConfigException;
use Snowflake\Pool\Timeout;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
@@ -28,7 +29,76 @@ abstract class Pool extends Component
public int $max = 60;
use Timeout;
public int $creates = -1;
public int $lastTime = 0;
/**
* @return array
* @throws ConfigException
*/
private function getClearTime(): array
{
$firstClear = Config::get('pool.clear.start', false, 600);
$lastClear = Config::get('pool.clear.end', false, 300);
return [$firstClear, $lastClear];
}
/**
* @throws Exception
*/
public function Heartbeat_detection()
{
if ($this->lastTime == 0) {
return;
}
[$firstClear, $lastClear] = $this->getClearTime();
if ($this->lastTime + $firstClear < time()) {
$this->flush(0);
} else if ($this->lastTime + $lastClear < time()) {
$this->flush(2);
}
}
/**
* @param $retain_number
* @throws Exception
*/
protected function flush($retain_number)
{
$channels = $this->getChannels();
foreach ($channels as $name => $channel) {
$names[] = $name;
$this->pop($channel, $name, $retain_number);
}
if ($retain_number == 0) {
$this->debug('release Timer::tick');
Timer::clear($this->creates);
$this->creates = -1;
}
}
/**
* @param $channel
* @param $name
* @param $retain_number
* @throws Exception
*/
protected function pop($channel, $name, $retain_number)
{
while ($channel->length() > $retain_number) {
$connection = $channel->pop();
if ($connection) {
unset($connection);
}
$this->desc($name);
}
}
/**
* @param $driver
@@ -62,13 +132,7 @@ abstract class Pool extends Component
return $this->createClient($name, $callback);
}
if (!$this->hasItem($name)) {
if ($this->creates === -1 && !is_callable($callback)) {
$this->creates = Timer::tick(1000, [$this, 'Heartbeat_detection']);
}
if (!Context::hasContext('create::client::ing::' . $name)) {
$this->push($name, $this->createClient($name, $callback));
Context::deleteContext('create::client::ing::' . $name);
}
$this->createByCallback($name, $callback);
}
$connection = $this->_items[$name]->pop(-1);
if (!$this->checkCanUse($name, $connection)) {
@@ -79,6 +143,22 @@ abstract class Pool extends Component
}
/**
* @param $name
* @param mixed $callback
*/
private function createByCallback($name, mixed $callback)
{
if ($this->creates === -1 && !is_callable($callback)) {
$this->creates = Timer::tick(1000, [$this, 'Heartbeat_detection']);
}
if (!Context::hasContext('create::client::ing::' . $name)) {
$this->push($name, $this->createClient($name, $callback));
Context::deleteContext('create::client::ing::' . $name);
}
}
/**
* @param $cds
* @param $coroutineName