2020-08-31 01:27:08 +08:00
|
|
|
<?php
|
2020-10-29 18:17:25 +08:00
|
|
|
declare(strict_types=1);
|
2020-08-31 01:27:08 +08:00
|
|
|
|
|
|
|
|
namespace Snowflake\Abstracts;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
use Exception;
|
2020-12-17 14:12:44 +08:00
|
|
|
|
2021-01-04 18:40:11 +08:00
|
|
|
use HttpServer\Http\Context;
|
2021-02-15 16:09:36 +08:00
|
|
|
use JetBrains\PhpStorm\Pure;
|
2021-01-05 10:23:20 +08:00
|
|
|
use PDO;
|
|
|
|
|
use Redis;
|
2021-02-23 16:56:34 +08:00
|
|
|
use Snowflake\Exception\ComponentException;
|
2021-02-24 14:53:27 +08:00
|
|
|
use Snowflake\Exception\ConfigException;
|
2021-02-20 13:20:37 +08:00
|
|
|
use Snowflake\Pool\Timeout;
|
2020-08-31 01:27:08 +08:00
|
|
|
use Swoole\Coroutine;
|
|
|
|
|
use Swoole\Coroutine\Channel;
|
2021-02-15 20:31:01 +08:00
|
|
|
use Swoole\Timer;
|
2020-08-31 01:27:08 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Class Pool
|
2020-08-31 12:38:32 +08:00
|
|
|
* @package Snowflake\Snowflake\Pool
|
2020-08-31 01:27:08 +08:00
|
|
|
*/
|
|
|
|
|
abstract class Pool extends Component
|
|
|
|
|
{
|
|
|
|
|
|
2021-02-23 16:56:34 +08:00
|
|
|
/** @var Channel[] */
|
|
|
|
|
private array $_items = [];
|
|
|
|
|
|
2021-02-24 14:44:11 +08:00
|
|
|
public int $max = 60;
|
2021-02-23 16:56:34 +08:00
|
|
|
|
2021-02-24 14:53:27 +08:00
|
|
|
public int $creates = -1;
|
|
|
|
|
|
|
|
|
|
public int $lastTime = 0;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @return array
|
|
|
|
|
* @throws ConfigException
|
|
|
|
|
*/
|
|
|
|
|
private function getClearTime(): array
|
|
|
|
|
{
|
|
|
|
|
$firstClear = Config::get('pool.clear.start', false, 600);
|
|
|
|
|
$lastClear = Config::get('pool.clear.end', false, 300);
|
|
|
|
|
return [$firstClear, $lastClear];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @throws Exception
|
|
|
|
|
*/
|
|
|
|
|
public function Heartbeat_detection()
|
|
|
|
|
{
|
|
|
|
|
if ($this->lastTime == 0) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
[$firstClear, $lastClear] = $this->getClearTime();
|
|
|
|
|
if ($this->lastTime + $firstClear < time()) {
|
|
|
|
|
$this->flush(0);
|
|
|
|
|
} else if ($this->lastTime + $lastClear < time()) {
|
|
|
|
|
$this->flush(2);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param $retain_number
|
|
|
|
|
* @throws Exception
|
|
|
|
|
*/
|
|
|
|
|
protected function flush($retain_number)
|
|
|
|
|
{
|
|
|
|
|
$channels = $this->getChannels();
|
|
|
|
|
foreach ($channels as $name => $channel) {
|
|
|
|
|
$names[] = $name;
|
|
|
|
|
$this->pop($channel, $name, $retain_number);
|
|
|
|
|
}
|
|
|
|
|
if ($retain_number == 0) {
|
|
|
|
|
$this->debug('release Timer::tick');
|
|
|
|
|
Timer::clear($this->creates);
|
|
|
|
|
$this->creates = -1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param $channel
|
|
|
|
|
* @param $name
|
|
|
|
|
* @param $retain_number
|
|
|
|
|
* @throws Exception
|
|
|
|
|
*/
|
|
|
|
|
protected function pop($channel, $name, $retain_number)
|
|
|
|
|
{
|
|
|
|
|
while ($channel->length() > $retain_number) {
|
|
|
|
|
$connection = $channel->pop();
|
|
|
|
|
if ($connection) {
|
|
|
|
|
unset($connection);
|
|
|
|
|
}
|
|
|
|
|
$this->desc($name);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2021-02-23 16:56:34 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param $driver
|
|
|
|
|
* @param $name
|
|
|
|
|
* @param false $isMaster
|
|
|
|
|
* @param int $max
|
|
|
|
|
*/
|
|
|
|
|
public function initConnections($driver, $name, $isMaster = false, $max = 60)
|
|
|
|
|
{
|
|
|
|
|
$name = $this->name($driver, $name, $isMaster);
|
|
|
|
|
if (isset($this->_items[$name]) && $this->_items[$name] instanceof Channel) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (!Context::inCoroutine()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
$this->_items[$name] = new Channel((int)$max);
|
|
|
|
|
$this->max = (int)$max;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param $name
|
|
|
|
|
* @param array $callback
|
|
|
|
|
* @return array
|
|
|
|
|
* @throws Exception
|
|
|
|
|
*/
|
2021-02-23 16:57:35 +08:00
|
|
|
protected function getFromChannel($name, mixed $callback): mixed
|
2021-02-23 16:56:34 +08:00
|
|
|
{
|
|
|
|
|
if (!Context::inCoroutine()) {
|
|
|
|
|
return $this->createClient($name, $callback);
|
|
|
|
|
}
|
|
|
|
|
if (!$this->hasItem($name)) {
|
2021-02-24 14:53:27 +08:00
|
|
|
$this->createByCallback($name, $callback);
|
2021-02-23 16:56:34 +08:00
|
|
|
}
|
|
|
|
|
$connection = $this->_items[$name]->pop(-1);
|
|
|
|
|
if (!$this->checkCanUse($name, $connection)) {
|
|
|
|
|
return $this->createClient($name, $callback);
|
|
|
|
|
} else {
|
|
|
|
|
return $connection;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2021-02-24 14:53:27 +08:00
|
|
|
/**
|
|
|
|
|
* @param $name
|
|
|
|
|
* @param mixed $callback
|
|
|
|
|
*/
|
|
|
|
|
private function createByCallback($name, mixed $callback)
|
|
|
|
|
{
|
|
|
|
|
if ($this->creates === -1 && !is_callable($callback)) {
|
|
|
|
|
$this->creates = Timer::tick(1000, [$this, 'Heartbeat_detection']);
|
|
|
|
|
}
|
|
|
|
|
if (!Context::hasContext('create::client::ing::' . $name)) {
|
|
|
|
|
$this->push($name, $this->createClient($name, $callback));
|
2021-02-25 13:35:48 +08:00
|
|
|
Context::remove('create::client::ing::' . $name);
|
2021-02-24 14:53:27 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2021-02-23 16:56:34 +08:00
|
|
|
/**
|
|
|
|
|
* @param $cds
|
|
|
|
|
* @param $coroutineName
|
|
|
|
|
* @param false $isBefore
|
|
|
|
|
* @throws ComponentException
|
|
|
|
|
*/
|
|
|
|
|
public function printClients($cds, $coroutineName, $isBefore = false)
|
|
|
|
|
{
|
2021-02-26 19:28:25 +08:00
|
|
|
// $this->warning(($isBefore ? 'before ' : '') . 'create client[address: ' . $cds . ', ' . env('workerId') . ', coroutine: ' . Coroutine::getCid() . ', has num: ' . $this->size($coroutineName) . ', has create: ' . $this->_create . ']');
|
2021-02-23 16:56:34 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
abstract public function createClient(string $name, mixed $config): mixed;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param $driver
|
|
|
|
|
* @param $cds
|
|
|
|
|
* @param false $isMaster
|
|
|
|
|
* @return string
|
|
|
|
|
*/
|
|
|
|
|
#[Pure] public function name($driver, $cds, $isMaster = false): string
|
|
|
|
|
{
|
|
|
|
|
if ($isMaster === true) {
|
|
|
|
|
return $cds . '_master';
|
|
|
|
|
} else {
|
|
|
|
|
return $cds . '_slave';
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param string $name
|
|
|
|
|
* @param $client
|
|
|
|
|
* @return mixed
|
|
|
|
|
* 检查连接可靠性
|
|
|
|
|
*/
|
|
|
|
|
public function checkCanUse(string $name, mixed $client): mixed
|
|
|
|
|
{
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param $name
|
|
|
|
|
* @throws Exception
|
|
|
|
|
*/
|
|
|
|
|
public function desc(string $name)
|
|
|
|
|
{
|
|
|
|
|
throw new Exception('Undefined system processing function.');
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param array $config
|
|
|
|
|
* @param $isMaster
|
|
|
|
|
* @return mixed
|
|
|
|
|
* @throws Exception
|
|
|
|
|
*/
|
|
|
|
|
public function get(mixed $config, bool $isMaster): mixed
|
|
|
|
|
{
|
|
|
|
|
throw new Exception('Undefined system processing function.');
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param $name
|
|
|
|
|
* @return bool
|
|
|
|
|
*/
|
|
|
|
|
public function hasItem(string $name): bool
|
|
|
|
|
{
|
|
|
|
|
if (isset($this->_items[$name])) {
|
|
|
|
|
return !$this->_items[$name]->isEmpty();
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param $name
|
|
|
|
|
* @return mixed
|
|
|
|
|
*/
|
|
|
|
|
public function size(string $name): mixed
|
|
|
|
|
{
|
|
|
|
|
if (!Context::inCoroutine()) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
if (!isset($this->_items[$name])) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
return $this->_items[$name]->length();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param $name
|
|
|
|
|
* @param $client
|
|
|
|
|
*/
|
|
|
|
|
public function push(string $name, mixed $client)
|
|
|
|
|
{
|
|
|
|
|
if (!Context::inCoroutine()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (!isset($this->_items[$name])) {
|
|
|
|
|
$this->_items[$name] = new Channel($this->max);
|
|
|
|
|
}
|
|
|
|
|
if (!$this->_items[$name]->isFull()) {
|
|
|
|
|
$this->_items[$name]->push($client);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param string $name
|
|
|
|
|
* @throws Exception
|
|
|
|
|
*/
|
|
|
|
|
public function clean(string $name)
|
|
|
|
|
{
|
|
|
|
|
if (!Context::inCoroutine()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (!isset($this->_items[$name])) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
$channel = $this->_items[$name];
|
|
|
|
|
while ($client = $channel->pop(0.001)) {
|
|
|
|
|
unset($client);
|
|
|
|
|
$this->desc($name);
|
|
|
|
|
}
|
2021-02-28 18:27:06 +08:00
|
|
|
$this->_items[$name]->close();
|
|
|
|
|
$this->_items[$name] = null;
|
2021-02-23 16:56:34 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @return Channel[]
|
|
|
|
|
*/
|
|
|
|
|
protected function getChannels(): array
|
|
|
|
|
{
|
|
|
|
|
return $this->_items;
|
|
|
|
|
}
|
2020-09-06 05:37:00 +08:00
|
|
|
|
|
|
|
|
|
2020-08-31 01:27:08 +08:00
|
|
|
}
|