This commit is contained in:
2023-08-16 00:16:16 +08:00
parent 2c46a752f9
commit e5e2c2ea74
7 changed files with 22 additions and 573 deletions
@@ -24,6 +24,7 @@ use Psr\Log\LoggerInterface;
use Kiri\Events\EventProvider; use Kiri\Events\EventProvider;
use ReflectionException; use ReflectionException;
use Monolog\Logger; use Monolog\Logger;
use Kiri\Pool\{Pool, PoolInterface};
use Kiri\Error\StdoutLogger; use Kiri\Error\StdoutLogger;
/** /**
@@ -90,6 +91,7 @@ abstract class BaseApplication extends Component
public function mapping(ConfigProvider $config): void public function mapping(ConfigProvider $config): void
{ {
$this->container->bind(LoggerInterface::class, new StdoutLogger()); $this->container->bind(LoggerInterface::class, new StdoutLogger());
$this->container->set(PoolInterface::class, Pool::class);
foreach ($config->get('mapping', []) as $interface => $class) { foreach ($config->get('mapping', []) as $interface => $class) {
$this->container->set($interface, $class); $this->container->set($interface, $class);
} }
-194
View File
@@ -1,194 +0,0 @@
<?php
namespace Kiri\Pool;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Exception\ConfigException;
/**
* Class Pool
* @package Kiri\Pool
*/
class Pool extends Component
{
/** @var array<PoolItem> */
private array $_connections = [];
/**
* @param $name
* @param $retain_number
* @throws Exception
*/
public function flush($name, $retain_number): void
{
if ($this->hasChannel($name)) {
$channel = $this->channel($name);
$channel->tailor($retain_number);
}
}
/**
* @param PoolItem $channel
* @param $retain_number
*/
protected function pop(PoolItem $channel, $retain_number): void
{
$channel->tailor($retain_number);
}
/**
* @param $name
* @param int $max
* @param callable $closure
*/
public function created($name, int $max, callable $closure): void
{
if (!isset($this->_connections[$name])) {
$this->_connections[$name] = new PoolItem($max, $closure);
}
}
/**
* @param $name
* @return PoolItem
* @throws ConfigException
* @throws Exception
*/
public function channel($name): PoolItem
{
if (!isset($this->_connections[$name])) {
throw new Exception('Channel is not exists.');
}
return $this->_connections[$name];
}
/**
* @param $name
* @return bool
*/
public function hasChannel($name): bool
{
return isset($this->_connections[$name]) && $this->_connections[$name] instanceof PoolItem;
}
/**
* @param string $name
* @return array
* @throws ConfigException
*/
public function get(string $name): mixed
{
return $this->channel($name)->pop();
}
/**
* @param $name
* @return bool
* @throws ConfigException
*/
public function isNull($name): bool
{
return $this->channel($name)->isEmpty();
}
/**
* @param string $name
* @param mixed $client
* @return bool
* 检查连接可靠性
*/
public function checkCanUse(string $name, mixed $client): bool
{
return true;
}
/**
* @param string $name
* @return bool
*/
public function hasItem(string $name): bool
{
$channel = $this->_connections[$name] ?? null;
if ($channel === null) {
return false;
}
return !$channel->isEmpty();
}
/**
* @param string $name
* @return int
*/
public function size(string $name): int
{
$channel = $this->_connections[$name] ?? null;
if ($channel === null) {
return 0;
}
return $channel->size();
}
/**
* @param string $name
* @param mixed $client
* @throws ConfigException
*/
public function push(string $name, mixed $client): void
{
$this->channel($name)->push($client);
}
/**
* @param $name
* @param int $time
* @return array
* @throws ConfigException
*/
public function waite($name, int $time = 30): mixed
{
return $this->channel($name)->pop($time);
}
/**
* @param string $name
* @throws Exception
*/
public function clean(string $name): void
{
$channel = $this->_connections[$name] ?? null;
if ($channel === null) {
return;
}
$channel->tailor(0);
$channel->close();
}
/**
* @return PoolItem[]
*/
protected function channels(): array
{
return $this->_connections;
}
}
-118
View File
@@ -1,118 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Pool;
use Closure;
use Kiri\Di\Context;
use Swoole\Coroutine\Channel;
class PoolItem
{
/**
* @var Channel|SplQueue
*/
private Channel|SplQueue $_items;
/**
* @var int
*/
private int $created = 0;
/**
* @param int $maxCreated
* @param Closure|array $callback
*/
public function __construct(readonly public int $maxCreated, readonly public Closure|array $callback)
{
if (Context::inCoroutine()) {
$this->_items = new Channel($this->maxCreated);
} else {
$this->_items = new SplQueue($this->maxCreated);
}
}
/**
* @param Channel|SplQueue $items
*/
public function setItems(Channel|SplQueue $items): void
{
$this->_items = $items;
}
/**
* @param mixed $item
* @return void
*/
public function push(mixed $item): void
{
if (is_null($item)) {
$item = call_user_func($this->callback);
}
$this->_items->push($item);
}
/**
* @return bool
*/
public function isEmpty(): bool
{
return $this->_items->isEmpty();
}
/**
* @return int
*/
public function size(): int
{
return $this->_items->length();
}
/**
* @return bool
*/
public function close(): bool
{
return $this->_items->close();
}
/**
* @param int $min
* @return void
*/
public function tailor(int $min = 0): void
{
while ($this->_items->length() > $min) {
$connection = $this->_items->pop(0.000001);
if ($connection instanceof StopHeartbeatCheck) {
$connection->stopHeartbeatCheck();
}
$connection = null;
$this->created -= 1;
}
}
/**
* @param int $waite
* @return mixed
*/
public function pop(int $waite = 10): mixed
{
if ($this->_items->isEmpty()) {
return call_user_func($this->callback);
} else {
return $this->_items->pop($waite);
}
}
}
-62
View File
@@ -1,62 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Pool;
interface QueueInterface
{
/**
* @return bool
*/
public function isEmpty(): bool;
/**
* @param mixed $data
* @param float $timeout
* @return bool
*/
public function push(mixed $data, float $timeout = -1): bool;
/**
* @param float $timeout
* @return mixed
*/
public function pop(float $timeout = -1): mixed;
/**
* @return array
*/
public function stats(): array;
/**
* @return bool
*/
public function close(): bool;
/**
* @return int
*/
public function length(): int;
/**
* @return bool
*/
public function isFull(): bool;
/**
* @return bool
*/
public function isClose(): bool;
}
-129
View File
@@ -1,129 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Pool;
use JetBrains\PhpStorm\Pure;
/**
*
*/
class SplQueue implements QueueInterface
{
/**
* @var \SplQueue
*/
private \SplQueue $channel;
/**
* @var int
*/
public int $errCode = 0;
/**
* @param int $max
*/
#[Pure] public function __construct(public int $max)
{
$this->channel = new \SplQueue();
}
/**
* @return bool
*/
public function isEmpty(): bool
{
// TODO: Implement isEmpty() method.
return $this->channel->count() < 1;
}
/**
* @param mixed $data
* @param float $timeout
* @return bool
*/
public function push(mixed $data, float $timeout = -1): bool
{
// TODO: Implement push() method.
if ($this->isFull()) {
return false;
}
$this->channel->enqueue($data);
return true;
}
/**
* @param float $timeout
* @return mixed
*/
public function pop(float $timeout = -1): mixed
{
// TODO: Implement pop() method.
if ($this->channel->count() < 1) {
return null;
}
return $this->channel->dequeue();
}
/**
* @return array
*/
public function stats(): array
{
// TODO: Implement stats() method.
return [
'consumer_num' => 0,
'producer_num' => 0,
'queue_num' => $this->length()
];
}
/**
* @return bool
*/
public function close(): bool
{
// TODO: Implement close() method.
return false;
}
/**
* @return int
*/
public function length(): int
{
// TODO: Implement length() method.
return $this->channel->count();
}
/**
* @return bool
*/
public function isFull(): bool
{
// TODO: Implement isFull() method.
return $this->channel->count() >= $this->max;
}
/**
* @return bool
*/
public function isClose(): bool
{
return false;
}
}
-13
View File
@@ -1,13 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Pool;
interface StopHeartbeatCheck
{
public function stopHeartbeatCheck();
}
+20 -57
View File
@@ -10,19 +10,10 @@ declare(strict_types=1);
namespace Kiri\Redis; namespace Kiri\Redis;
use Exception; use Exception;
use JetBrains\PhpStorm\ArrayShape;
use Kiri; use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Events\EventProvider;
use Kiri\Di\Inject\Container;
use Kiri\Exception\ConfigException;
use Kiri\Exception\RedisConnectException; use Kiri\Exception\RedisConnectException;
use Kiri\Pool\Pool; use Kiri\Pool\Pool;
use Kiri\Server\Events\OnWorkerExit; use Kiri\Server\Events\OnWorkerExit;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use RedisException;
use ReflectionException; use ReflectionException;
/** /**
@@ -156,10 +147,7 @@ SCRIPT;
* @param $name * @param $name
* @param $arguments * @param $arguments
* @return mixed * @return mixed
* @throws ConfigException * @throws
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
*/ */
public function proxy($name, $arguments): mixed public function proxy($name, $arguments): mixed
{ {
@@ -193,57 +181,32 @@ SCRIPT;
{ {
$pool = Kiri::getPool(); $pool = Kiri::getPool();
if (!$pool->hasChannel($this->host)) { if (!$pool->hasChannel($this->host)) {
$config = $this->get_config(); $pool->created($this->host, \config('cache.redis.pool.max', 10), [$this, 'connect']);
$length = \config('cache.redis.pool.max', 10);
$pool->created($config['host'], $length, $this->connect($config));
} }
return $pool; return $pool;
} }
/** /**
* @param $config * @return \Redis
* @return \Closure * @throws RedisConnectException
*/ */
protected function connect($config): \Closure protected function connect(): \Redis
{ {
return static function () use ($config) { $redis = new \Redis();
$redis = new \Redis(); if (!$redis->connect($this->host, $this->port, $this->timeout)) {
if (!$redis->connect($config['host'], $config['port'], $config['timeout'])) { throw new RedisConnectException(sprintf('The Redis Connect %s::%d Fail.', $this->host, $this->port));
throw new RedisConnectException(sprintf('The Redis Connect %s::%d Fail.', $config['host'], $config['port'])); }
} if (!empty($this->auth) && !$redis->auth($this->auth)) {
if (!empty($config['auth']) && !$redis->auth($config['auth'])) { throw new RedisConnectException(sprintf('Redis Error: %s, Host %s, Auth %s', $redis->getLastError(), $this->host, $this->auth));
throw new RedisConnectException(sprintf('Redis Error: %s, Host %s, Auth %s', $redis->getLastError(), $config['host'], $config['auth'])); }
} $redis->select($this->databases);
if ($config['read_timeout'] < 0) { if ($this->read_timeout > 0) {
$config['read_timeout'] = 0; $redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->read_timeout);
} }
$redis->select($config['databases']); if (!empty($this->prefix)) {
if ($config['read_timeout'] > 0) { $redis->setOption(\Redis::OPT_PREFIX, $this->prefix);
$redis->setOption(\Redis::OPT_READ_TIMEOUT, $config['read_timeout']); }
} return $redis;
$redis->setOption(\Redis::OPT_PREFIX, $config['prefix']);
return $redis;
};
} }
/**
* @return array
*/
#[ArrayShape(['host' => "string", 'port' => "int", 'prefix' => "string", 'auth' => "string", 'databases' => "int", 'timeout' => "int", 'read_timeout' => "int", 'pool' => "array|int[]"])]
public function get_config(): array
{
return [
'host' => $this->host,
'port' => $this->port,
'prefix' => $this->prefix,
'auth' => $this->auth,
'databases' => $this->databases,
'timeout' => $this->timeout,
'read_timeout' => $this->read_timeout,
'pool' => $this->pool
];
}
} }