diff --git a/System/Abstracts/BaseApplication.php b/System/Abstracts/BaseApplication.php index a40268bc..14f34dee 100644 --- a/System/Abstracts/BaseApplication.php +++ b/System/Abstracts/BaseApplication.php @@ -439,16 +439,6 @@ abstract class BaseApplication extends Component } - /** - * @return Pool - * @throws Exception - */ - public function getClientsPool(): Pool - { - return $this->get('clientsPool'); - } - - /** * @param $array * @throws \ReflectionException @@ -493,7 +483,6 @@ abstract class BaseApplication extends Component 'error' => ['class' => ErrorHandler::class], 'connections' => ['class' => Connection::class], 'redis_connections' => ['class' => SRedis::class], - 'clientsPool' => ['class' => Pool::class], 'config' => ['class' => Config::class], 'logger' => ['class' => Logger::class], 'annotation' => ['class' => SAnnotation::class], diff --git a/System/Abstracts/TraitApplication.php b/System/Abstracts/TraitApplication.php index 0120e218..5612bcd7 100644 --- a/System/Abstracts/TraitApplication.php +++ b/System/Abstracts/TraitApplication.php @@ -48,7 +48,6 @@ use Snowflake\Pool\Pool; * @property HttpFilter $filter * @property RPCProducer $rpc * @property Shutdown $shutdown - * @property Pool $clientsPool */ trait TraitApplication { diff --git a/System/Pool/Alias.php b/System/Pool/Alias.php new file mode 100644 index 00000000..6edc9441 --- /dev/null +++ b/System/Pool/Alias.php @@ -0,0 +1,24 @@ +getPool()->name('Mysql:' . $cds, true); - return Context::getContext('begin_' . $name) == 0; - } + /** + * @param $cds + * @return bool + * + * db is in transaction + * @throws Exception + */ + public function inTransaction($cds): bool + { + $name = $this->name('Mysql:' . $cds, true); + return Context::getContext('begin_' . $name) == 0; + } - /** - * @param $coroutineName - * @throws Exception - */ - public function beginTransaction($coroutineName) - { - $coroutineName = $this->getPool()->name('Mysql:' . $coroutineName, true); - if (!Context::hasContext('begin_' . $coroutineName)) { - Context::setContext('begin_' . $coroutineName, 0); - } - Context::increment('begin_' . $coroutineName); - if (Context::getContext('begin_' . $coroutineName) != 0) { - return; - } - $connection = Context::getContext($coroutineName); - if ($connection instanceof PDO && !$connection->inTransaction()) { - $connection->beginTransaction(); - } - } + /** + * @param $coroutineName + * @throws Exception + */ + public function beginTransaction($coroutineName) + { + $coroutineName = $this->name('Mysql:' . $coroutineName, true); + if (!Context::hasContext('begin_' . $coroutineName)) { + Context::setContext('begin_' . $coroutineName, 0); + } + if (Context::increment('begin_' . $coroutineName) != 0) { + return; + } + $connection = Context::getContext($coroutineName); + if (!$connection->inTransaction()) { + $connection->beginTransaction(); + } + } - /** - * @param $coroutineName - * @throws Exception - */ - public function commit($coroutineName) - { - $coroutineName = $this->getPool()->name('Mysql:' . $coroutineName, true); - if (!Context::hasContext('begin_' . $coroutineName)) { - return; - } - if (Context::decrement('begin_' . $coroutineName) > 0) { - return; - } - $connection = Context::getContext($coroutineName); - if (!($connection instanceof PDO)) { - return; - } - Context::setContext('begin_' . $coroutineName, 0); - if ($connection->inTransaction()) { - $connection->commit(); - } - } + /** + * @param $coroutineName + * @throws Exception + */ + public function commit($coroutineName) + { + $coroutineName = $this->name('Mysql:' . $coroutineName, true); + if (Context::decrement('begin_' . $coroutineName) != 0) { + return; + } + $connection = Context::getContext($coroutineName); + if ($connection->inTransaction()) { + $connection->commit(); + } + } - /** - * @param $coroutineName - * @throws Exception - */ - public function rollback($coroutineName) - { - $coroutineName = $this->getPool()->name('Mysql:' . $coroutineName, true); - if (!Context::hasContext('begin_' . $coroutineName)) { - return; - } - if (Context::decrement('begin_' . $coroutineName) > 0) { - return; - } - if (($connection = Context::getContext($coroutineName)) instanceof PDO) { - if ($connection->inTransaction()) { - $connection->rollBack(); - } - } - Context::setContext('begin_' . $coroutineName, 0); - } + /** + * @param $coroutineName + * @throws Exception + */ + public function rollback($coroutineName) + { + $coroutineName = $this->name('Mysql:' . $coroutineName, true); + if (Context::decrement('begin_' . $coroutineName) != 0) { + return; + } + if (($connection = Context::getContext($coroutineName)) instanceof PDO) { + if ($connection->inTransaction()) { + $connection->rollBack(); + } + } + } - /** - * @param mixed $config - * @param bool $isMaster - * @return mixed - * @throws Exception - */ - public function get(mixed $config, bool $isMaster = false): mixed - { - $coroutineName = $this->getPool()->name('Mysql:' . $config['cds'], $isMaster); - if (($pdo = Context::getContext($coroutineName)) instanceof PDO) { - return $pdo; - } - if (Coroutine::getCid() === -1) { - $connections = $this->createClient($coroutineName, $config); - } else { - /** @var PDO $connections */ - $connections = $this->getPool()->getFromChannel($coroutineName); - if (empty($connections)) { - $connections = $this->createClient($coroutineName, $config); - } - } - if ($number = Context::getContext('begin_' . $coroutineName)) { - $number > 0 && $connections->beginTransaction(); - } - return Context::setContext($coroutineName, $connections); - } + /** + * @param mixed $config + * @param bool $isMaster + * @return mixed + * @throws Exception + */ + public function get(mixed $config, bool $isMaster = false): ?PDO + { + $coroutineName = $this->name('Mysql:' . $config['cds'], $isMaster); + if (($pdo = Context::getContext($coroutineName)) instanceof PDO) { + return $pdo; + } + /** @var PDO $connections */ + $connections = $this->getPool()->get($coroutineName, $this->create($coroutineName, $config)); + if ($number = Context::getContext('begin_' . $coroutineName)) { + $number > 0 && $connections->beginTransaction(); + } + return Context::setContext($coroutineName, $connections); + } - /** - * @param $name - * @param $isMaster - * @param $max - * @throws Exception - */ - public function initConnections($name, $isMaster, $max) - { - $this->getPool()->initConnections($name, $isMaster, $max); - } + /** + * @param $coroutineName + * @param $config + * @return \Closure + */ + public function create($coroutineName, $config) + { + return static function () use ($coroutineName, $config) { + if (Coroutine::getCid() === -1) { + Runtime::enableCoroutine(false); + } + $cds = 'mysql:dbname=' . $config['database'] . ';host=' . $config['cds']; + $link = new PDO($cds, $config['username'], $config['password'], [ + PDO::ATTR_EMULATE_PREPARES => false, + PDO::ATTR_CASE => PDO::CASE_NATURAL, + PDO::ATTR_TIMEOUT => 60, + PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => true, + PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . ($config['charset'] ?? 'utf8mb4') + ]); + $link->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); + $link->setAttribute(PDO::ATTR_STRINGIFY_FETCHES, false); + $link->setAttribute(PDO::ATTR_ORACLE_NULLS, PDO::NULL_EMPTY_STRING); + return $link; + }; + } - /** - * @param string $name - * @param mixed $config - * @return PDO - * @throws Exception - */ - public function createClient(string $name, mixed $config): PDO - { - if (Coroutine::getCid() === -1) { - Runtime::enableCoroutine(false); - } - $cds = 'mysql:dbname=' . $config['database'] . ';host=' . $config['cds']; - $link = new PDO($cds, $config['username'], $config['password'], [ - PDO::ATTR_EMULATE_PREPARES => false, - PDO::ATTR_CASE => PDO::CASE_NATURAL, - PDO::ATTR_TIMEOUT => 60, - PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => true, - PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . ($config['charset'] ?? 'utf8mb4') - ]); - $link->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); - $link->setAttribute(PDO::ATTR_STRINGIFY_FETCHES, false); - $link->setAttribute(PDO::ATTR_ORACLE_NULLS, PDO::NULL_EMPTY_STRING); - return $link; - } + /** + * @param $name + * @param $isMaster + * @param $max + * @throws Exception + */ + public function initConnections($name, $isMaster, $max) + { + $this->getPool()->initConnections($name, $isMaster, $max); + } - /** - * @param $coroutineName - * @param $isMaster - * @throws Exception - */ - public function release($coroutineName, $isMaster) - { - $coroutineName = $this->getPool()->name('Mysql:' . $coroutineName, $isMaster); - - /** @var PDO $client */ - if (!($client = Context::getContext($coroutineName)) instanceof PDO) { - return; - } - if ($client->inTransaction()) { - return; - } - $this->getPool()->push($coroutineName, $client); - Context::remove($coroutineName); - } + /** + * @param $coroutineName + * @param $isMaster + * @throws Exception + */ + public function release($coroutineName, $isMaster) + { + $coroutineName = $this->name('Mysql:' . $coroutineName, $isMaster); + /** @var PDO $client */ + if (!($client = Context::getContext($coroutineName)) instanceof PDO) { + return; + } + if ($client->inTransaction()) { + return; + } + $this->getPool()->push($coroutineName, $client); + Context::remove($coroutineName); + } - /** - * @param $coroutineName - * @return bool - */ - private function hasClient($coroutineName): bool - { - return Context::hasContext($coroutineName); - } + /** + * @param $coroutineName + * @return bool + */ + private function hasClient($coroutineName): bool + { + return Context::hasContext($coroutineName); + } - /** - * batch release - * @throws Exception - */ - public function connection_clear($name, $isMaster) - { - $this->getPool()->clean($this->getPool()->name($name, $isMaster)); - } + /** + * batch release + * @throws Exception + */ + public function connection_clear($name, $isMaster) + { + $this->getPool()->clean($this->name($name, $isMaster)); + } - /** - * @param string $name - * @param mixed $client - * @return bool - * @throws Exception - */ - public function checkCanUse(string $name, mixed $client): bool - { - try { - if (empty($client) || !($client instanceof PDO)) { - $result = false; - } else { - $result = true; - } - } catch (Error | Throwable $exception) { - $result = $this->addError($exception, 'mysql'); - } finally { - if (!$result) { - $this->getPool()->decrement($name); - } - return $result; - } - } + /** + * @param string $name + * @param mixed $client + * @return bool + * @throws Exception + */ + public function checkCanUse(string $name, mixed $client): bool + { + try { + if (empty($client) || !($client instanceof PDO)) { + $result = false; + } else { + $result = true; + } + } catch (Error | Throwable $exception) { + $result = $this->addError($exception, 'mysql'); + } finally { + return $result; + } + } - /** - * @param $coroutineName - * @param bool $isMaster - * @throws Exception - */ - public function disconnect($coroutineName, bool $isMaster = false) - { - Context::remove($coroutineName); - $coroutineName = $this->getPool()->name('Mysql:' . $coroutineName, $isMaster); - $this->getPool()->clean($coroutineName); - } + /** + * @param $coroutineName + * @param bool $isMaster + * @throws Exception + */ + public function disconnect($coroutineName, bool $isMaster = false) + { + Context::remove($coroutineName); + $coroutineName = $this->name('Mysql:' . $coroutineName, $isMaster); + $this->getPool()->clean($coroutineName); + } - /** - * @return Pool - * @throws Exception - */ - public function getPool(): Pool - { - if (!$this->clientsPool) { - $this->clientsPool = Snowflake::app()->getClientsPool(); - } - return $this->clientsPool; - } + /** + * @return Pool + * @throws Exception + */ + public function getPool(): Pool + { + return Snowflake::getDi()->get(Pool::class); + } } diff --git a/System/Pool/Pool.php b/System/Pool/Pool.php index 1b9fd28c..24f84d26 100644 --- a/System/Pool/Pool.php +++ b/System/Pool/Pool.php @@ -22,311 +22,278 @@ use Swoole\Timer; class Pool extends Component { - /** @var Channel[] */ - private static array $_connections = []; + /** @var Channel[] */ + private static array $_connections = []; - public int $max = 60; + public int $max = 60; - public int $creates = -1; + public int $creates = -1; + + private array $_times = []; + + use Alias; - private array $_times = []; - - protected static array $hasCreate = []; + /** + * @return array + * @throws ConfigException + */ + private function getClearTime(): array + { + $firstClear = Config::get('pool.clear.start', 600); + $lastClear = Config::get('pool.clear.end', 300); + return [$firstClear, $lastClear]; + } - /** - * @param string $name - */ - public function increment(string $name) - { - if (!isset(static::$hasCreate[$name])) { - static::$hasCreate[$name] = 0; - } - static::$hasCreate[$name] += 1; - } + /** + * @throws Exception + */ + public function Heartbeat_detection($ticker) + { + if (env('state') == 'exit') { + Timer::clear($this->creates); + foreach (static::$_connections as $channel) { + $this->flush($channel, 0); + $channel->close(); + } + static::$_connections = []; + $this->creates = -1; + } else { + $this->heartbeat_flush(); + } + } - /** - * @param string $name - */ - public function decrement(string $name) - { - if (!isset(static::$hasCreate[$name])) { - return; - } - if (static::$hasCreate[$name] <= 0) { - return; - } - static::$hasCreate[$name] -= 1; - } + /** + * @throws ConfigException + * @throws Exception + */ + private function heartbeat_flush() + { + $num = []; + $total = 0; + $min = Config::get('databases.pool.min', 1); + foreach (static::$_connections as $key => $channel) { + if (!isset($num[$key])) { + $num[$key] = 0; + } + if (time() - ($this->_times[$key] ?? time()) > 120) { + $this->flush($channel, 0); + } else if ($channel->length() > $min) { + $this->flush($channel, $min); + } + $num[$key] += ($length = $channel->length()); + $total += $length; + } + $this->clear($total, $num); + } - /** - * @return array - * @throws ConfigException - */ - private function getClearTime(): array - { - $firstClear = Config::get('pool.clear.start', 600); - $lastClear = Config::get('pool.clear.end', 300); - return [$firstClear, $lastClear]; - } + /** + * @param $total + * @throws \Exception + */ + private function clear($total, $num) + { + write(var_export($num, true), 'connections'); + if ($total >= 1) { + return; + } + Timer::clear($this->creates); + if (Snowflake::isWorker() || Snowflake::isTask()) { + $this->debug('Worker #' . env('worker') . ' clear time tick.'); + } + $this->creates = -1; + } - /** - * @throws Exception - */ - public function Heartbeat_detection($ticker) - { - if (env('state') == 'exit') { - Timer::clear($this->creates); - foreach (static::$_connections as $channel) { - $this->flush($channel, 0); - $channel->close(); - } - static::$_connections = []; - $this->creates = -1; - } else { - $this->heartbeat_flush(); - } - } + /** + * @param $channel + * @param $retain_number + * @throws Exception + */ + public function flush($channel, $retain_number) + { + $this->pop($channel, $retain_number); + } - /** - * @throws ConfigException - * @throws Exception - */ - private function heartbeat_flush() - { - $num = []; - $total = 0; - $min = Config::get('databases.pool.min', 1); - foreach (static::$_connections as $key => $channel) { - if (!isset($num[$key])) { - $num[$key] = 0; - } - if (time() - ($this->_times[$key] ?? time()) > 120) { - $this->flush($channel, 0); - } else if ($channel->length() > $min) { - $this->flush($channel, $min); - } - $num[$key] += ($length = $channel->length()); - if (str_starts_with($key, 'Mysql') && (Snowflake::isWorker() || Snowflake::isTask()) && $length > 0) { - $this->debug('Worker #' . env('worker') . ' use client -> ' . $key . ':' . $length); - } - $total += $length; - } - write(var_export($num, true), 'connections'); - if ($total < 1) { - Timer::clear($this->creates); - if (Snowflake::isWorker() || Snowflake::isTask()) { - $this->debug('Worker #' . env('worker') . ' clear time tick.'); - } - $this->creates = -1; - } - } + /** + * @param Channel $channel + * @param $retain_number + * @throws Exception + */ + protected function pop(Channel $channel, $retain_number): void + { + if (Coroutine::getCid() === -1) { + return; + } + while ($channel->length() > $retain_number) { + $connection = $channel->pop(); + if ($connection) { + unset($connection); + } + } + } - /** - * @param $channel - * @param $retain_number - * @throws Exception - */ - public function flush($channel, $retain_number) - { - $this->pop($channel, $retain_number); - } + /** + * @param $name + * @param false $isMaster + * @param int $max + */ + public function initConnections($name, bool $isMaster = false, int $max = 60) + { + $name = $this->name($name, $isMaster); + if (isset(static::$_connections[$name]) && static::$_connections[$name] instanceof Channel) { + return; + } + if (Coroutine::getCid() === -1) { + return; + } + if ($this->creates === -1) { + $this->creates = Timer::tick(60000, [$this, 'Heartbeat_detection']); + } + static::$_connections[$name] = new Channel($max); + $this->max = $max; + } - /** - * @param Channel $channel - * @param $retain_number - * @throws Exception - */ - protected function pop(Channel $channel, $retain_number): void - { - if (Coroutine::getCid() === -1) { - return; - } - while ($channel->length() > $retain_number) { - $connection = $channel->pop(); - if ($connection) { - unset($connection); - } - } - } + /** + * @param $name + * @return Channel + * @throws ConfigException + * @throws Exception + */ + private function getChannel($name): Channel + { + if (!isset(static::$_connections[$name])) { + static::$_connections[$name] = new Channel(Config::get('databases.pool.max', 10)); + } + if (static::$_connections[$name]->errCode == SWOOLE_CHANNEL_CLOSED) { + throw new Exception('Channel is Close.'); + } + if ($this->creates === -1) { + $this->creates = Timer::tick(60000, [$this, 'Heartbeat_detection']); + } + return static::$_connections[$name]; + } - /** - * @param $name - * @param false $isMaster - * @param int $max - */ - public function initConnections($name, bool $isMaster = false, int $max = 60) - { - $name = $this->name($name, $isMaster); - if (isset(static::$_connections[$name]) && static::$_connections[$name] instanceof Channel) { - return; - } - if (Coroutine::getCid() === -1) { - return; - } - if ($this->creates === -1) { - $this->creates = Timer::tick(60000, [$this, 'Heartbeat_detection']); - } - static::$_connections[$name] = new Channel($max); - $this->max = $max; - } + /** + * @param $name + * @return array + * @throws Exception + */ + public function get($name, $callback): mixed + { + if (Coroutine::getCid() === -1) { + return $callback(); + } + $this->_times[$name] = time(); + $channel = $this->getChannel($name); + if (!$channel->isEmpty()) { + $connection = $channel->pop(); + if ($this->checkCanUse($name, $connection)) { + return $connection; + } + } + return $callback(); + } - /** - * @param $name - * @return Channel - * @throws ConfigException - * @throws Exception - */ - private function getChannel($name): Channel - { - if (!isset(static::$_connections[$name])) { - static::$_connections[$name] = new Channel(Config::get('databases.pool.max', 10)); - } - if (static::$_connections[$name]->errCode == SWOOLE_CHANNEL_CLOSED){ - throw new Exception('Channel is Close.'); - } - if ($this->creates === -1) { - $this->creates = Timer::tick(60000, [$this, 'Heartbeat_detection']); - } - return static::$_connections[$name]; - } + /** + * @param $name + * @return bool + * @throws \Snowflake\Exception\ConfigException + */ + public function isNull($name): bool + { + return $this->getChannel($name)->isEmpty(); + } - /** - * @param $name - * @return array - * @throws Exception - */ - public function getFromChannel($name): mixed - { - $this->_times[$name] = time(); - $channel = $this->getChannel($name); - if (!$channel->isEmpty()) { - $connection = $channel->pop(); - if ($this->checkCanUse($name, $connection)) { - return $connection; - } - } - return null; - } + /** + * @param string $name + * @param mixed $client + * @return bool + * 检查连接可靠性 + */ + public function checkCanUse(string $name, mixed $client): bool + { + return true; + } - /** - * @param $cds - * @param false $isMaster - * @return string - */ - #[Pure] public function name($cds, bool $isMaster = false): string - { - if ($isMaster === true) { - return $cds . '_master'; - } else { - return $cds . '_slave'; - } - } + /** + * @param string $name + * @return bool + */ + public function hasItem(string $name): bool + { + if (isset(static::$_connections[$name])) { + return !static::$_connections[$name]->isEmpty(); + } + return false; + } - /** - * @param string $name - * @param mixed $client - * @return bool - * 检查连接可靠性 - */ - public function checkCanUse(string $name, mixed $client): bool - { - return true; - } + /** + * @param string $name + * @return mixed + */ + public function size(string $name): mixed + { + if (Coroutine::getCid() === -1) { + return 0; + } + if (!isset(static::$_connections[$name])) { + return 0; + } + return static::$_connections[$name]->length(); + } - /** - * @param array $config - * @param bool $isMaster - * @return mixed - * @throws Exception - */ - public function get(mixed $config, bool $isMaster): mixed - { - throw new Exception('Undefined system processing function.'); - } + /** + * @param string $name + * @param mixed $client + * @throws ConfigException + */ + public function push(string $name, mixed $client) + { + if (Coroutine::getCid() === -1) { + return; + } + $channel = $this->getChannel($name); + if (!$channel->isFull()) { + $channel->push($client); + } + unset($client); + } - /** - * @param string $name - * @return bool - */ - public function hasItem(string $name): bool - { - if (isset(static::$_connections[$name])) { - return !static::$_connections[$name]->isEmpty(); - } - return false; - } + /** + * @param string $name + * @throws Exception + */ + public function clean(string $name) + { + if (Coroutine::getCid() === -1 || !isset(static::$_connections[$name])) { + return; + } + $channel = static::$_connections[$name]; + $this->pop($channel, 0); + } - /** - * @param string $name - * @return mixed - */ - public function size(string $name): mixed - { - if (Coroutine::getCid() === -1) { - return 0; - } - if (!isset(static::$_connections[$name])) { - return 0; - } - return static::$_connections[$name]->length(); - } - - - /** - * @param string $name - * @param mixed $client - * @throws ConfigException - */ - public function push(string $name, mixed $client) - { - if (Coroutine::getCid() === -1) { - return; - } - $channel = $this->getChannel($name); - if (!$channel->isFull()) { - $channel->push($client); - } - unset($client); - } - - - /** - * @param string $name - * @throws Exception - */ - public function clean(string $name) - { - if (Coroutine::getCid() === -1 || !isset(static::$_connections[$name])) { - return; - } - $channel = static::$_connections[$name]; - $this->pop($channel, 0); - } - - - /** - * @return Channel[] - */ - protected function getChannels(): array - { - return static::$_connections; - } + /** + * @return Channel[] + */ + protected function getChannels(): array + { + return static::$_connections; + } } diff --git a/System/Pool/Redis.php b/System/Pool/Redis.php index af5d3e10..af6708c4 100644 --- a/System/Pool/Redis.php +++ b/System/Pool/Redis.php @@ -22,120 +22,110 @@ use Swoole\Runtime; class Redis extends Component { - private ?Pool $clientsPool = null; + use Alias; - /** - * @param mixed $config - * @param bool $isMaster - * @return mixed - * @throws Exception - */ - public function get(mixed $config, bool $isMaster = false): mixed - { - $coroutineName = $this->getPool()->name('Redis:' . $config['host'], $isMaster); - if (Context::hasContext($coroutineName)) { - return Context::getContext($coroutineName); - } - if (Coroutine::getCid() === -1) { - return Context::setContext($coroutineName, $this->createClient($coroutineName, $config)); - } - $clients = $this->getPool()->getFromChannel($coroutineName); - if (empty($clients)) { - return Context::setContext($coroutineName, $this->createClient($coroutineName, $config)); - } - return Context::setContext($coroutineName, $clients); - } + /** + * @param mixed $config + * @param bool $isMaster + * @return mixed + * @throws Exception + */ + public function get(mixed $config, bool $isMaster = false): mixed + { + $coroutineName = $this->name('Redis:' . $config['host'], $isMaster); + if (Context::hasContext($coroutineName)) { + return Context::getContext($coroutineName); + } + $clients = $this->getPool()->get($coroutineName, $this->create($coroutineName, $config)); + return Context::setContext($coroutineName, $clients); + } - /** - * @param string $name - * @param mixed $config - * @return SRedis - * @throws RedisConnectException - * @throws Exception - */ - public function createClient(string $name, mixed $config): SRedis - { - if (Coroutine::getCid() === -1) { - Runtime::enableCoroutine(false); - } - $redis = new SRedis(); - if (!$redis->pconnect($config['host'], (int)$config['port'], $config['timeout'])) { - throw new RedisConnectException(sprintf('The Redis Connect %s::%d Fail.', $config['host'], $config['port'])); - } - if (!empty($config['auth']) && !$redis->auth($config['auth'])) { - throw new RedisConnectException(sprintf('Redis Error: %s, Host %s, Auth %s', $redis->getLastError(), $config['host'], $config['auth'])); - } - if (!isset($config['read_timeout'])) { - $config['read_timeout'] = 10; - } - $redis->select($config['databases']); - $redis->setOption(SRedis::OPT_READ_TIMEOUT, $config['read_timeout']); - $redis->setOption(SRedis::OPT_PREFIX, $config['prefix']); - - $this->getPool()->increment($name); - - return $redis; - } + /** + * @param string $name + * @param mixed $config + * @return SRedis + * @throws RedisConnectException + * @throws Exception + */ + public function create(string $name, mixed $config): \Closure + { + return static function () use ($name, $config) { + if (Coroutine::getCid() === -1) { + Runtime::enableCoroutine(false); + } + $redis = new SRedis(); + if (!$redis->pconnect($config['host'], (int)$config['port'], $config['timeout'])) { + throw new RedisConnectException(sprintf('The Redis Connect %s::%d Fail.', $config['host'], $config['port'])); + } + if (!empty($config['auth']) && !$redis->auth($config['auth'])) { + throw new RedisConnectException(sprintf('Redis Error: %s, Host %s, Auth %s', $redis->getLastError(), $config['host'], $config['auth'])); + } + if (!isset($config['read_timeout'])) { + $config['read_timeout'] = 10; + } + $redis->select($config['databases']); + $redis->setOption(SRedis::OPT_READ_TIMEOUT, $config['read_timeout']); + $redis->setOption(SRedis::OPT_PREFIX, $config['prefix']); + return $redis; + }; + } - /** - * @param array $config - * @param bool $isMaster - * @throws ConfigException - * @throws Exception - */ - public function release(array $config, bool $isMaster = false) - { - $coroutineName = $this->getPool()->name('Redis:' . $config['host'], $isMaster); - if (!Context::hasContext($coroutineName)) { - return; - } + /** + * @param array $config + * @param bool $isMaster + * @throws ConfigException + * @throws Exception + */ + public function release(array $config, bool $isMaster = false) + { + $coroutineName = $this->name('Redis:' . $config['host'], $isMaster); + if (!Context::hasContext($coroutineName)) { + return; + } - $this->getPool()->push($coroutineName, Context::getContext($coroutineName)); - Context::remove($coroutineName); - } + $this->getPool()->push($coroutineName, Context::getContext($coroutineName)); + Context::remove($coroutineName); + } - /** - * @param array $config - * @param bool $isMaster - * @throws Exception - */ - public function destroy(array $config, bool $isMaster = false) - { - $coroutineName = $this->getPool()->name('Redis:' . $config['host'], $isMaster); - if (Context::hasContext($coroutineName)) { - $this->getPool()->decrement($coroutineName); - } - $this->getPool()->clean($coroutineName); - Context::remove($coroutineName); - } + /** + * @param array $config + * @param bool $isMaster + * @throws Exception + */ + public function destroy(array $config, bool $isMaster = false) + { + $coroutineName = $this->name('Redis:' . $config['host'], $isMaster); + if (Context::hasContext($coroutineName)) { + $this->getPool()->decrement($coroutineName); + } + $this->getPool()->clean($coroutineName); + Context::remove($coroutineName); + } - /** - * @return Pool - * @throws Exception - */ - public function getPool(): Pool - { - if (!$this->clientsPool) { - $this->clientsPool = Snowflake::app()->getClientsPool(); - } - return $this->clientsPool; - } + /** + * @return Pool + * @throws Exception + */ + public function getPool(): Pool + { + return Snowflake::getDi()->get(Pool::class); + } - /** - * @param $name - * @param $isMaster - * @param $max - * @throws Exception - */ - public function initConnections($name, $isMaster, $max) - { - $this->getPool()->initConnections($name, $isMaster, $max); - } + /** + * @param $name + * @param $isMaster + * @param $max + * @throws Exception + */ + public function initConnections($name, $isMaster, $max) + { + $this->getPool()->initConnections($name, $isMaster, $max); + } }