modify plugin name

This commit is contained in:
2022-06-17 11:59:19 +08:00
parent 0a70a95b63
commit 92b5b248a3
7 changed files with 257 additions and 29 deletions
@@ -0,0 +1,27 @@
<?php
namespace Kiri\Abstracts;
use Kiri\Coordinator;
class CoordinatorManager
{
private static array $_waite = [];
/**
* @param string $category
* @return Coordinator
*/
public static function utility(string $category): Coordinator
{
if (!((static::$_waite[$category] ?? null) instanceof Coordinator)) {
static::$_waite[$category] = new Coordinator();
}
return static::$_waite[$category];
}
}
+55
View File
@@ -0,0 +1,55 @@
<?php
namespace Kiri;
class Coordinator
{
const WORKER_START = 'worker:start';
private bool $waite = true;
private static array $_waite = [];
/**
* @return bool
*/
public function isWaite(): bool
{
return $this->waite;
}
/**
* @return void
*/
public function yield(): void
{
if ($this->waite === false) {
return;
}
$this->yield();
}
/**
* @return void
*/
public function waite(): void
{
$this->waite = true;
}
/**
* @return void
*/
public function done(): void
{
$this->waite = false;
}
}
+30 -5
View File
@@ -13,6 +13,7 @@ use Kiri\Abstracts\Config;
use Kiri\Context;
use Swoole\Error;
use Throwable;
use Kiri\Abstracts\CoordinatorManager;
/**
* Class Connection
@@ -96,8 +97,21 @@ class Connection extends Component
public function get(mixed $config): ?PDO
{
$minx = Config::get('databases.pool.min', 1);
return $this->pool->get($config['cds'], static function () use ($config) {
$connect = Kiri::getDi()->create(PDO::class, [$config]);
CoordinatorManager::utility($config['cds'])->yield();
return $this->pool->get($config['cds'], $this->generate($config), $minx);
}
/**
* @param array $config
* @return Closure
*/
public function generate(array $config): Closure
{
return static function () use ($config) {
$connect = new PDO($config);
if (!Db::inTransactionsActive()) {
return $connect;
}
@@ -105,7 +119,18 @@ class Connection extends Component
$connect->beginTransaction();
}
return $connect;
}, $minx);
};
}
/**
* @param string $name
* @return void
* @throws Kiri\Exception\ConfigException
*/
public function check(string $name): void
{
$this->pool->check($name);
}
@@ -117,7 +142,7 @@ class Connection extends Component
public function create($coroutineName, $config): Closure
{
return static function () use ($coroutineName, $config) {
return Kiri::getDi()->create(PDO::class, [$config]);
return new PDO($config);
};
}
@@ -129,7 +154,7 @@ class Connection extends Component
* @throws Kiri\Exception\ConfigException
* @throws Exception
*/
public function addItem(string $name, PDO $PDO)
public function addItem(string $name, PDO $PDO): void
{
$this->pool->push($name, $PDO);
}
+32 -24
View File
@@ -4,12 +4,13 @@
namespace Kiri\Pool;
use Database\Mysql\PDO;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Abstracts\CoordinatorManager;
use Kiri\Context;
use Kiri\Exception\ConfigException;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
@@ -20,7 +21,7 @@ use Swoole\Coroutine\Channel;
class Pool extends Component
{
/** @var Channel[] */
/** @var array<PoolQueue> */
private static array $_connections = [];
public int $max = 60;
@@ -40,10 +41,10 @@ class Pool extends Component
/**
* @param Channel|SplQueue $channel
* @param PoolQueue $channel
* @param $retain_number
*/
protected function pop(Channel|SplQueue $channel, $retain_number): void
protected function pop(PoolQueue $channel, $retain_number): void
{
while ($channel->length() > $retain_number) {
if (Context::inCoroutine()) {
@@ -56,6 +57,24 @@ class Pool extends Component
}
/**
* @param $name
* @return void
* @throws ConfigException
*/
public function check($name): void
{
CoordinatorManager::utility($name)->waite();
$channel = $this->channel($name);
while (($pdo = $channel->pop()) instanceof PDO) {
$pdo->check();
}
CoordinatorManager::utility($name)->done();
}
/**
* @param $name
* @param int $max
@@ -65,7 +84,7 @@ class Pool extends Component
{
if (isset(static::$_connections[$name])) {
$value = static::$_connections[$name];
if ($value instanceof Channel || $value instanceof SplQueue) {
if ($value instanceof PoolQueue) {
return;
}
}
@@ -76,11 +95,11 @@ class Pool extends Component
/**
* @param $name
* @return Channel|SplQueue
* @return PoolQueue
* @throws ConfigException
* @throws Exception
*/
private function getChannel($name): Channel|SplQueue
public function channel($name): PoolQueue
{
if (!isset(static::$_connections[$name])) {
$this->newChannel($name);
@@ -100,11 +119,7 @@ class Pool extends Component
if ($max == null) {
$max = Config::get('databases.pool.max', 10);
}
if (Coroutine::getCid() === -1) {
static::$_connections[$name] = new SplQueue($max);
} else {
static::$_connections[$name] = new Channel($max);
}
static::$_connections[$name] = new PoolQueue($max);
}
@@ -118,7 +133,7 @@ class Pool extends Component
*/
public function get($name, $callback, $minx): mixed
{
$channel = $this->getChannel($name);
$channel = $this->channel($name);
if (!$channel->isEmpty()) {
return $this->maxIdleQuantity($channel, $minx);
}
@@ -149,7 +164,7 @@ class Pool extends Component
*/
public function isNull($name): bool
{
return $this->getChannel($name)->isEmpty();
return $this->channel($name)->isEmpty();
}
@@ -198,7 +213,7 @@ class Pool extends Component
*/
public function push(string $name, mixed $client)
{
$channel = $this->getChannel($name);
$channel = $this->channel($name);
if (!$channel->isFull()) {
$channel->push($client);
}
@@ -216,13 +231,6 @@ class Pool extends Component
return;
}
while (static::$_connections[$name]->length() > 0) {
if (static::$_connections[$name] instanceof Channel)
{
if (!Context::inCoroutine())
{
break;
}
}
$client = static::$_connections[$name]->pop();
if ($client instanceof StopHeartbeatCheck) {
$client->stopHeartbeatCheck();
@@ -234,9 +242,9 @@ class Pool extends Component
/**
* @return Channel[]
* @return PoolQueue[]
*/
protected function getChannels(): array
protected function channels(): array
{
return static::$_connections;
}
+100
View File
@@ -0,0 +1,100 @@
<?php
namespace Kiri\Pool;
use Kiri\Context;
use Swoole\Coroutine\Channel;
class PoolQueue implements QueueInterface
{
private Channel|SplQueue $queue;
public function __construct(public int $max)
{
if (Context::inCoroutine()) {
$this->queue = new Channel($this->max);
} else {
$this->queue = new SplQueue($this->max);
}
}
/**
* @return bool
*/
public function isEmpty(): bool
{
return $this->queue->isEmpty();
}
/**
* @param mixed $data
* @param float $timeout
* @return bool
*/
public function push(mixed $data, float $timeout = -1): bool
{
return $this->queue->push($data, $timeout);
}
/**
* @param float $timeout
* @return mixed
*/
public function pop(float $timeout = -1): mixed
{
return $this->queue->pop($timeout);
}
/**
* @return array
*/
public function stats(): array
{
return $this->queue->stats();
}
/**
* @return bool
*/
public function close(): bool
{
return $this->queue->close();
}
/**
* @return int
*/
public function length(): int
{
return $this->queue->length();
}
/**
* @return bool
*/
public function isFull(): bool
{
return $this->queue->isFull();
}
/**
* @return bool
*/
public function isClose(): bool
{
if ($this->queue instanceof Channel) {
return $this->queue->errCode == SWOOLE_CHANNEL_CLOSED;
}
return false;
}
}
+3
View File
@@ -25,4 +25,7 @@ interface QueueInterface
public function isFull(): bool;
public function isClose(): bool;
}
+10
View File
@@ -98,4 +98,14 @@ class SplQueue implements QueueInterface
// TODO: Implement isFull() method.
return $this->channel->count() >= $this->max;
}
/**
* @return bool
*/
public function isClose(): bool
{
return false;
}
}