This commit is contained in:
2023-04-19 10:51:08 +08:00
parent be6953b83f
commit b2066bfad7
6 changed files with 30 additions and 328 deletions
-193
View File
@@ -1,193 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Pool;
use Exception;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Di\Context;
use Kiri\Exception\ConfigException;
use PDO;
use Swoole\Error;
use Throwable;
/**
* Class Connection
* @package Kiri\Pool
*/
class Connection extends Component
{
private array $master = [];
private int $total = 0;
/**
* @param Pool $pool
* @param array $config
* @throws Exception
*/
public function __construct(public Pool $pool, array $config = [])
{
parent::__construct();
}
/**
* @param $name
* @return bool
*/
public function inTransaction($name): bool
{
$connection = Context::get($name);
if ($connection instanceof PDO) {
return $connection->inTransaction();
}
return false;
}
/**
* @param $coroutineName
* @throws Exception
*/
public function beginTransaction($coroutineName)
{
$connection = $this->get($coroutineName);
if ($connection instanceof PDO) {
$connection->beginTransaction();
}
}
/**
* @param $coroutineName
* @throws Exception
*/
public function commit($coroutineName)
{
$connection = Context::get($coroutineName);
if ($connection instanceof PDO) {
$connection->commit();
}
}
/**
* @param $coroutineName
* @throws Exception
*/
public function rollback($coroutineName)
{
$connection = Context::get($coroutineName);
if ($connection instanceof PDO) {
$connection->rollBack();
}
}
/**
* @param string $cds
* @return PDO|bool|null
* @throws Exception
*/
public function get(string $cds): null|PDO|bool
{
return $this->pool->get($cds);
}
/**
* @param string $name
* @return array
*/
public function check(string $name): array
{
return $this->pool->check($name);
}
/**
* @param string $name
* @param PDO $PDO
* @return void
* @throws ConfigException
*/
public function addItem(string $name, PDO $PDO): void
{
$this->pool->push($name, $PDO);
}
/**
* @param $coroutineName
* @throws Kiri\Exception\ConfigException
* @throws Exception
*/
public function release($coroutineName)
{
$client = Context::get($coroutineName);
if (!($client instanceof PDO) || $client->inTransaction()) {
return;
}
$this->pool->push($coroutineName, $client);
Context::remove($coroutineName);
}
/**
* @throws Exception
*/
public function flush($coroutineName, $minNumber = 1)
{
$this->pool->flush($coroutineName, $minNumber);
}
/**
* batch release
* @throws Exception
*/
public function connection_clear($name)
{
$this->pool->clean($name);
}
/**
* @param string $name
* @param mixed $client
* @return bool
* @throws Exception
*/
public function checkCanUse(string $name, mixed $client): bool
{
try {
if (empty($client) || !($client instanceof PDO)) {
$result = false;
} else {
$result = true;
}
} catch (Error|Throwable $exception) {
$result = addError($exception, 'mysql');
} finally {
return $result;
}
}
/**
* @param $coroutineName
* @throws Exception
*/
public function disconnect($coroutineName)
{
Context::remove($coroutineName);
$this->pool->clean($coroutineName);
}
}
+5 -5
View File
@@ -26,7 +26,7 @@ class Pool extends Component
* @param $retain_number
* @throws Exception
*/
public function flush($name, $retain_number)
public function flush($name, $retain_number): void
{
if ($this->hasChannel($name)) {
$channel = $this->channel($name);
@@ -90,7 +90,7 @@ class Pool extends Component
* @param int $max
* @param \Closure $closure
*/
public function initConnections($name, int $max, \Closure $closure)
public function initConnections($name, int $max, \Closure $closure): void
{
if (!isset($this->_connections[$name])) {
$this->_connections[$name] = new PoolItem($max, $closure);
@@ -189,7 +189,7 @@ class Pool extends Component
* @param mixed $client
* @throws ConfigException
*/
public function push(string $name, mixed $client)
public function push(string $name, mixed $client): void
{
$this->channel($name)->push($client);
}
@@ -211,7 +211,7 @@ class Pool extends Component
* @param string $name
* @throws Exception
*/
public function clean(string $name)
public function clean(string $name): void
{
$channel = $this->_connections[$name] ?? null;
if ($channel === null) {
@@ -223,7 +223,7 @@ class Pool extends Component
/**
* @return PoolQueue[]
* @return PoolItem[]
*/
protected function channels(): array
{
-110
View File
@@ -1,110 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Pool;
use Kiri\Di\Context;
use Swoole\Coroutine\Channel;
class PoolQueue implements QueueInterface
{
private Channel|SplQueue $queue;
/**
* @param int $max
*/
public function __construct(public int $max)
{
if (Context::inCoroutine()) {
$this->queue = new Channel($this->max);
} else {
$this->queue = new SplQueue($this->max);
}
}
/**
* @return bool
*/
public function isEmpty(): bool
{
return $this->queue->isEmpty();
}
/**
* @param mixed $data
* @param float $timeout
* @return bool
*/
public function push(mixed $data, float $timeout = -1): bool
{
if ($this->isFull()) {
return false;
}
if (!$this->isClose()) {
return $this->queue->push($data, $timeout);
}
return false;
}
/**
* @param float $timeout
* @return mixed
*/
public function pop(float $timeout = 0): mixed
{
return $this->queue->pop($timeout);
}
/**
* @return array
*/
public function stats(): array
{
return $this->queue->stats();
}
/**
* @return bool
*/
public function close(): bool
{
return $this->queue->close();
}
/**
* @return int
*/
public function length(): int
{
return $this->queue->length();
}
/**
* @return bool
*/
public function isFull(): bool
{
return $this->queue->isFull();
}
/**
* @return bool
*/
public function isClose(): bool
{
if ($this->queue instanceof Channel) {
return $this->queue->errCode == SWOOLE_CHANNEL_CLOSED;
}
return false;
}
}
+11 -16
View File
@@ -22,6 +22,8 @@ use Kiri\Server\Events\OnWorkerExit;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use RedisException;
use ReflectionException;
/**
* Class Redis
@@ -66,12 +68,8 @@ class Redis extends Component
$config = $this->get_config();
$length = Config::get('cache.redis.pool.max', 10);
$eventProvider = Kiri::getDi()->get(EventProvider::class);
$eventProvider->on(OnWorkerExit::class, [$this, 'destroy'], 0);
$pool = Kiri::getDi()->get(Pool::class);
$pool->initConnections($config['host'], $length, static function () use ($config) {
on(OnWorkerExit::class, [$this, 'destroy']);
Kiri::getPool()->initConnections($config['host'], $length, static function () use ($config) {
$redis = new \Redis();
if (!$redis->connect($config['host'], $config['port'], $config['timeout'])) {
throw new RedisConnectException(sprintf('The Redis Connect %s::%d Fail.', $config['host'], $config['port']));
@@ -113,7 +111,7 @@ class Redis extends Component
* @param $key
* @param int $timeout
* @return bool
* @throws \RedisException
* @throws RedisException
*/
public function waite($key, int $timeout = 5): bool
{
@@ -162,12 +160,11 @@ SCRIPT;
/**
* @return void
* @throws \ReflectionException
* @throws
*/
public function destroy(): void
{
$pool = Kiri::getDi()->get(Pool::class);
$pool->clean($this->host);
Kiri::getPool()->clean($this->host);
}
@@ -178,7 +175,7 @@ SCRIPT;
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
* @throws ReflectionException
*/
public function proxy($name, $arguments): mixed
{
@@ -188,8 +185,7 @@ SCRIPT;
} catch (\Throwable $throwable) {
$response = addError($throwable, 'redis');
} finally {
$pool = Kiri::getDi()->get(Pool::class);
$pool->push($this->host, $client);
Kiri::getPool()->push($this->host, $client);
}
return $response;
}
@@ -198,12 +194,11 @@ SCRIPT;
/**
* @return \Redis
* @throws ConfigException
* @throws \ReflectionException
* @throws ReflectionException
*/
private function getClient(): \Redis
{
$pool = Kiri::getDi()->get(Pool::class);
return $pool->get($this->host);
return Kiri::getPool()->get($this->host);
}