From e5e2c2ea747d8d1544ad774ab180755d5e48eda0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Wed, 16 Aug 2023 00:16:16 +0800 Subject: [PATCH] qqq --- kiri-engine/Abstracts/BaseApplication.php | 2 + kiri-engine/Pool/Pool.php | 194 ---------------------- kiri-engine/Pool/PoolItem.php | 118 ------------- kiri-engine/Pool/QueueInterface.php | 62 ------- kiri-engine/Pool/SplQueue.php | 129 -------------- kiri-engine/Pool/StopHeartbeatCheck.php | 13 -- kiri-engine/Redis/Redis.php | 77 +++------ 7 files changed, 22 insertions(+), 573 deletions(-) delete mode 100644 kiri-engine/Pool/Pool.php delete mode 100644 kiri-engine/Pool/PoolItem.php delete mode 100644 kiri-engine/Pool/QueueInterface.php delete mode 100644 kiri-engine/Pool/SplQueue.php delete mode 100644 kiri-engine/Pool/StopHeartbeatCheck.php diff --git a/kiri-engine/Abstracts/BaseApplication.php b/kiri-engine/Abstracts/BaseApplication.php index fc08b859..7422bb87 100644 --- a/kiri-engine/Abstracts/BaseApplication.php +++ b/kiri-engine/Abstracts/BaseApplication.php @@ -24,6 +24,7 @@ use Psr\Log\LoggerInterface; use Kiri\Events\EventProvider; use ReflectionException; use Monolog\Logger; +use Kiri\Pool\{Pool, PoolInterface}; use Kiri\Error\StdoutLogger; /** @@ -90,6 +91,7 @@ abstract class BaseApplication extends Component public function mapping(ConfigProvider $config): void { $this->container->bind(LoggerInterface::class, new StdoutLogger()); + $this->container->set(PoolInterface::class, Pool::class); foreach ($config->get('mapping', []) as $interface => $class) { $this->container->set($interface, $class); } diff --git a/kiri-engine/Pool/Pool.php b/kiri-engine/Pool/Pool.php deleted file mode 100644 index 8f765a67..00000000 --- a/kiri-engine/Pool/Pool.php +++ /dev/null @@ -1,194 +0,0 @@ - */ - private array $_connections = []; - - - /** - * @param $name - * @param $retain_number - * @throws Exception - */ - public function flush($name, $retain_number): void - { - if ($this->hasChannel($name)) { - $channel = $this->channel($name); - $channel->tailor($retain_number); - } - } - - - /** - * @param PoolItem $channel - * @param $retain_number - */ - protected function pop(PoolItem $channel, $retain_number): void - { - $channel->tailor($retain_number); - } - - - /** - * @param $name - * @param int $max - * @param callable $closure - */ - public function created($name, int $max, callable $closure): void - { - if (!isset($this->_connections[$name])) { - $this->_connections[$name] = new PoolItem($max, $closure); - } - } - - - /** - * @param $name - * @return PoolItem - * @throws ConfigException - * @throws Exception - */ - public function channel($name): PoolItem - { - if (!isset($this->_connections[$name])) { - throw new Exception('Channel is not exists.'); - } - return $this->_connections[$name]; - } - - - /** - * @param $name - * @return bool - */ - public function hasChannel($name): bool - { - return isset($this->_connections[$name]) && $this->_connections[$name] instanceof PoolItem; - } - - - /** - * @param string $name - * @return array - * @throws ConfigException - */ - public function get(string $name): mixed - { - return $this->channel($name)->pop(); - } - - - /** - * @param $name - * @return bool - * @throws ConfigException - */ - public function isNull($name): bool - { - return $this->channel($name)->isEmpty(); - } - - - /** - * @param string $name - * @param mixed $client - * @return bool - * 检查连接可靠性 - */ - public function checkCanUse(string $name, mixed $client): bool - { - return true; - } - - - /** - * @param string $name - * @return bool - */ - public function hasItem(string $name): bool - { - $channel = $this->_connections[$name] ?? null; - if ($channel === null) { - return false; - } - return !$channel->isEmpty(); - } - - - /** - * @param string $name - * @return int - */ - public function size(string $name): int - { - $channel = $this->_connections[$name] ?? null; - if ($channel === null) { - return 0; - } - return $channel->size(); - } - - - /** - * @param string $name - * @param mixed $client - * @throws ConfigException - */ - public function push(string $name, mixed $client): void - { - $this->channel($name)->push($client); - } - - - /** - * @param $name - * @param int $time - * @return array - * @throws ConfigException - */ - public function waite($name, int $time = 30): mixed - { - return $this->channel($name)->pop($time); - } - - - /** - * @param string $name - * @throws Exception - */ - public function clean(string $name): void - { - $channel = $this->_connections[$name] ?? null; - if ($channel === null) { - return; - } - $channel->tailor(0); - $channel->close(); - } - - - /** - * @return PoolItem[] - */ - protected function channels(): array - { - return $this->_connections; - } - - -} diff --git a/kiri-engine/Pool/PoolItem.php b/kiri-engine/Pool/PoolItem.php deleted file mode 100644 index ae0de752..00000000 --- a/kiri-engine/Pool/PoolItem.php +++ /dev/null @@ -1,118 +0,0 @@ -_items = new Channel($this->maxCreated); - } else { - $this->_items = new SplQueue($this->maxCreated); - } - } - - - /** - * @param Channel|SplQueue $items - */ - public function setItems(Channel|SplQueue $items): void - { - $this->_items = $items; - } - - - /** - * @param mixed $item - * @return void - */ - public function push(mixed $item): void - { - if (is_null($item)) { - $item = call_user_func($this->callback); - } - $this->_items->push($item); - } - - - /** - * @return bool - */ - public function isEmpty(): bool - { - return $this->_items->isEmpty(); - } - - - /** - * @return int - */ - public function size(): int - { - return $this->_items->length(); - } - - - /** - * @return bool - */ - public function close(): bool - { - return $this->_items->close(); - } - - - /** - * @param int $min - * @return void - */ - public function tailor(int $min = 0): void - { - while ($this->_items->length() > $min) { - $connection = $this->_items->pop(0.000001); - if ($connection instanceof StopHeartbeatCheck) { - $connection->stopHeartbeatCheck(); - } - $connection = null; - $this->created -= 1; - } - } - - - /** - * @param int $waite - * @return mixed - */ - public function pop(int $waite = 10): mixed - { - if ($this->_items->isEmpty()) { - return call_user_func($this->callback); - } else { - return $this->_items->pop($waite); - } - } -} diff --git a/kiri-engine/Pool/QueueInterface.php b/kiri-engine/Pool/QueueInterface.php deleted file mode 100644 index 9a99ed61..00000000 --- a/kiri-engine/Pool/QueueInterface.php +++ /dev/null @@ -1,62 +0,0 @@ -channel = new \SplQueue(); - } - - - /** - * @return bool - */ - public function isEmpty(): bool - { - // TODO: Implement isEmpty() method. - return $this->channel->count() < 1; - } - - - /** - * @param mixed $data - * @param float $timeout - * @return bool - */ - public function push(mixed $data, float $timeout = -1): bool - { - // TODO: Implement push() method. - if ($this->isFull()) { - return false; - } - $this->channel->enqueue($data); - return true; - } - - - /** - * @param float $timeout - * @return mixed - */ - public function pop(float $timeout = -1): mixed - { - // TODO: Implement pop() method. - if ($this->channel->count() < 1) { - return null; - } - return $this->channel->dequeue(); - } - - - /** - * @return array - */ - public function stats(): array - { - // TODO: Implement stats() method. - return [ - 'consumer_num' => 0, - 'producer_num' => 0, - 'queue_num' => $this->length() - ]; - } - - - /** - * @return bool - */ - public function close(): bool - { - // TODO: Implement close() method. - return false; - } - - - /** - * @return int - */ - public function length(): int - { - // TODO: Implement length() method. - return $this->channel->count(); - } - - - /** - * @return bool - */ - public function isFull(): bool - { - // TODO: Implement isFull() method. - return $this->channel->count() >= $this->max; - } - - - /** - * @return bool - */ - public function isClose(): bool - { - return false; - } - -} diff --git a/kiri-engine/Pool/StopHeartbeatCheck.php b/kiri-engine/Pool/StopHeartbeatCheck.php deleted file mode 100644 index 898dcaae..00000000 --- a/kiri-engine/Pool/StopHeartbeatCheck.php +++ /dev/null @@ -1,13 +0,0 @@ -hasChannel($this->host)) { - $config = $this->get_config(); - $length = \config('cache.redis.pool.max', 10); - $pool->created($config['host'], $length, $this->connect($config)); + $pool->created($this->host, \config('cache.redis.pool.max', 10), [$this, 'connect']); } return $pool; } /** - * @param $config - * @return \Closure + * @return \Redis + * @throws RedisConnectException */ - protected function connect($config): \Closure + protected function connect(): \Redis { - return static function () use ($config) { - $redis = new \Redis(); - if (!$redis->connect($config['host'], $config['port'], $config['timeout'])) { - throw new RedisConnectException(sprintf('The Redis Connect %s::%d Fail.', $config['host'], $config['port'])); - } - if (!empty($config['auth']) && !$redis->auth($config['auth'])) { - throw new RedisConnectException(sprintf('Redis Error: %s, Host %s, Auth %s', $redis->getLastError(), $config['host'], $config['auth'])); - } - if ($config['read_timeout'] < 0) { - $config['read_timeout'] = 0; - } - $redis->select($config['databases']); - if ($config['read_timeout'] > 0) { - $redis->setOption(\Redis::OPT_READ_TIMEOUT, $config['read_timeout']); - } - $redis->setOption(\Redis::OPT_PREFIX, $config['prefix']); - return $redis; - }; + $redis = new \Redis(); + if (!$redis->connect($this->host, $this->port, $this->timeout)) { + throw new RedisConnectException(sprintf('The Redis Connect %s::%d Fail.', $this->host, $this->port)); + } + if (!empty($this->auth) && !$redis->auth($this->auth)) { + throw new RedisConnectException(sprintf('Redis Error: %s, Host %s, Auth %s', $redis->getLastError(), $this->host, $this->auth)); + } + $redis->select($this->databases); + if ($this->read_timeout > 0) { + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->read_timeout); + } + if (!empty($this->prefix)) { + $redis->setOption(\Redis::OPT_PREFIX, $this->prefix); + } + return $redis; } - - - /** - * @return array - */ - #[ArrayShape(['host' => "string", 'port' => "int", 'prefix' => "string", 'auth' => "string", 'databases' => "int", 'timeout' => "int", 'read_timeout' => "int", 'pool' => "array|int[]"])] - public function get_config(): array - { - return [ - 'host' => $this->host, - 'port' => $this->port, - 'prefix' => $this->prefix, - 'auth' => $this->auth, - 'databases' => $this->databases, - 'timeout' => $this->timeout, - 'read_timeout' => $this->read_timeout, - 'pool' => $this->pool - ]; - } - }