This commit is contained in:
2023-04-03 13:45:59 +08:00
parent 7d574e9172
commit ac588a6f6f
5 changed files with 217 additions and 169 deletions
-24
View File
@@ -1,24 +0,0 @@
<?php
namespace Kiri\Pool;
use JetBrains\PhpStorm\Pure;
trait Alias
{
/**
* @param $cds
* @param false $isMaster
* @return string
*/
#[Pure] public function name($cds, bool $isMaster = false): string
{
if ($isMaster === true) {
return $cds . '_master';
} else {
return $cds . '_slave';
}
}
}
+12 -41
View File
@@ -92,53 +92,23 @@ class Connection extends Component
/**
* @param mixed $config
* @param string $cds
* @return PDO|null
* @throws ConfigException
* @throws Exception
*/
public function get(mixed $config): ?\Database\Mysql\PDO
public function get(string $cds): ?\Database\Mysql\PDO
{
if (!$this->pool->hasChannel($config['cds'])) {
$this->pool->initConnections($config['cds'], $config['pool']['max']);
if (!$this->pool->hasChannel($cds)) {
throw new Exception('Queue not exists.');
}
return $this->pool->get($config['cds'], $this->create($config));
}
/**
* @param array $config
* @return Closure
*/
public function generate(array $config): Closure
{
return static function () use ($config) {
Kiri::getDi()->get(Kiri\Error\StdoutLoggerInterface::class)->alert('create database connect(' . $config['cds'] . ')');
$link = new \PDO('mysql:dbname=' . $config['dbname'] . ';host=' . $config['cds'], $config['username'], $config['password'], [
\PDO::ATTR_EMULATE_PREPARES => false,
\PDO::ATTR_CASE => \PDO::CASE_NATURAL,
\PDO::ATTR_PERSISTENT => true,
\PDO::ATTR_TIMEOUT => $config['connect_timeout'],
\PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . ($config['charset'] ?? 'utf8mb4')
]);
$link->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
$link->setAttribute(\PDO::ATTR_STRINGIFY_FETCHES, false);
$link->setAttribute(\PDO::ATTR_ORACLE_NULLS, \PDO::NULL_EMPTY_STRING);
foreach ($config['attributes'] as $key => $attribute) {
$link->setAttribute($key, $attribute);
}
if (Db::inTransactionsActive()) {
$link->beginTransaction();
}
return $link;
};
return $this->pool->get($cds);
}
/**
* @param string $name
* @return array
* @throws Kiri\Exception\ConfigException
*/
public function check(string $name): array
{
@@ -178,13 +148,14 @@ class Connection extends Component
/**
* @param $name
* @param $max
* @throws Exception
* @param array $config
* @param int $max
*/
public function initConnections($name, $max)
public function initConnections(array $config, int $max)
{
$this->pool->initConnections($name, $max);
$this->pool->initConnections($config['cds'], $max, function () use ($config) {
return new \Database\Mysql\PDO($config);
});
}
+66 -103
View File
@@ -9,6 +9,7 @@ use Exception;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Di\ContainerInterface;
use Kiri\Exception\ConfigException;
use Kiri\Server\Abstracts\StatusEnum;
use Kiri\Server\WorkerStatus;
@@ -21,8 +22,8 @@ use Kiri\Server\WorkerStatus;
class Pool extends Component
{
/** @var array<PoolQueue> */
private static array $_connections = [];
/** @var array<PoolItem> */
private array $_connections = [];
/**
* @var WorkerStatus
@@ -30,8 +31,11 @@ class Pool extends Component
#[Inject(WorkerStatus::class)]
public WorkerStatus $status;
use Alias;
/**
* @var ContainerInterface
*/
#[Inject(ContainerInterface::class)]
public ContainerInterface $container;
/**
@@ -42,106 +46,93 @@ class Pool extends Component
public function flush($name, $retain_number)
{
if ($this->hasChannel($name)) {
$this->pop($this->channel($name), $retain_number);
$channel = $this->channel($name);
$channel->tailor($retain_number);
}
}
/**
* @param PoolQueue $channel
* @param PoolItem $channel
* @param $retain_number
*/
protected function pop(PoolQueue $channel, $retain_number): void
protected function pop(PoolItem $channel, $retain_number): void
{
while ($channel->length() > $retain_number) {
$connection = $channel->pop(0.001);
if ($connection instanceof StopHeartbeatCheck) {
$connection->stopHeartbeatCheck();
}
}
$channel->tailor($retain_number);
}
/**
* @param $name
* @return array
* @throws ConfigException
*/
public function check($name): array
{
$channel = $this->channel($name);
if ($channel->length() < 1) {
return [0, 0];
}
if ($this->status->is(StatusEnum::EXIT)) {
$channel->close();
return [0, 0];
}
$success = 0;
$lists = [];
$count = $channel->length();
while ($this->status->is(StatusEnum::EXIT) === false) {
if (!(($pdo = $channel->pop(0.001)) instanceof PDO)) {
break;
}
if ($pdo->check()) {
$success += 1;
}
$lists[] = $pdo;
}
if ($this->status->is(StatusEnum::EXIT) === false) {
foreach ($lists as $list) {
$channel->push($list);
}
} else {
$channel->close();
}
return [$count, $success];
// $channel = $this->channel($name);
// if ($channel->size() < 1) {
// return [0, 0];
// }
//
// if ($this->status->is(StatusEnum::EXIT)) {
// $channel->close();
// return [0, 0];
// }
//
// $success = 0;
// $lists = [];
// $count = $channel->size();
// while ($this->status->is(StatusEnum::EXIT) === false) {
// if (!(($pdo = $channel->pop(0.001)) instanceof PDO)) {
// break;
// }
// if ($pdo->check()) {
// $success += 1;
// }
// $lists[] = $pdo;
// }
// if ($this->status->is(StatusEnum::EXIT) === false) {
// foreach ($lists as $list) {
// $channel->push($list);
// }
// } else {
// $channel->close();
// }
// return [$count, $success];
return [0, 0];
}
/**
* @param $name
* @param int $max
* @param \Closure $closure
*/
public function initConnections($name, int $max = 60)
public function initConnections($name, int $max, \Closure $closure)
{
$channel = static::$_connections[$name] ?? null;
if (($channel instanceof PoolQueue) && !$channel->isClose()) {
return;
if (!isset($this->_connections[$name])) {
$this->_connections[$name] = new PoolItem($max, $closure);
}
static::$_connections[$name] = new PoolQueue($max);
}
/**
* @param $name
* @return PoolQueue
* @return PoolItem
* @throws ConfigException
* @throws Exception
*/
public function channel($name): PoolQueue
public function channel($name): PoolItem
{
$channel = static::$_connections[$name] ?? null;
if (!($channel instanceof PoolQueue)) {
if (!isset($this->_connections[$name])) {
throw new Exception('Channel is not exists.');
}
if ($channel->isClose()) {
throw new Exception('Channel is Close.');
}
return $channel;
return $this->_connections[$name];
}
public function hasChannel($name): bool
{
$channel = static::$_connections[$name] ?? null;
if (!($channel instanceof PoolQueue)) {
return false;
}
if ($channel->isClose()) {
if (!isset($this->_connections[$name])) {
return false;
}
return true;
@@ -149,32 +140,13 @@ class Pool extends Component
/**
* @param $name
* @param $callback
* @param string $name
* @return array
* @throws ConfigException
* @throws Exception
*/
public function get($name, $callback): mixed
public function get(string $name): mixed
{
$channel = $this->channel($name);
if (!$channel->isEmpty()) {
return $channel->pop();
}
return $callback();
}
/**
* @param $channel
* @param $minx
* @return void
*/
protected function maxIdleQuantity($channel, $minx): void
{
if ($channel->length() > $minx) {
$this->pop($channel, $minx);
}
return $this->channel($name)->pop();
}
@@ -207,8 +179,8 @@ class Pool extends Component
*/
public function hasItem(string $name): bool
{
$channel = static::$_connections[$name] ?? null;
if (!($channel instanceof PoolQueue) || $channel->isClose()) {
$channel = $this->_connections[$name] ?? null;
if ($channel === null) {
return false;
}
return !$channel->isEmpty();
@@ -221,11 +193,11 @@ class Pool extends Component
*/
public function size(string $name): int
{
$channel = static::$_connections[$name] ?? null;
if (!($channel instanceof PoolQueue) || $channel->isClose()) {
$channel = $this->_connections[$name] ?? null;
if ($channel === null) {
return 0;
}
return $channel->length();
return $channel->size();
}
@@ -236,10 +208,7 @@ class Pool extends Component
*/
public function push(string $name, mixed $client)
{
$channel = $this->channel($name);
if (!$channel->isFull()) {
$channel->push($client);
}
$this->channel($name)->push($client);
}
@@ -255,23 +224,17 @@ class Pool extends Component
}
/**
* @param string $name
* @throws Exception
*/
public function clean(string $name)
{
$channel = static::$_connections[$name] ?? null;
if (!($channel instanceof PoolQueue) || $channel->isClose()) {
$channel = $this->_connections[$name] ?? null;
if ($channel === null) {
return;
}
while ($channel->length() > 0) {
$client = $channel->pop();
if ($client instanceof StopHeartbeatCheck) {
$client->stopHeartbeatCheck();
}
}
$channel->tailor(0);
$channel->close();
}
@@ -281,7 +244,7 @@ class Pool extends Component
*/
protected function channels(): array
{
return static::$_connections;
return $this->_connections;
}
+110
View File
@@ -0,0 +1,110 @@
<?php
namespace Kiri\Pool;
use Kiri\Annotation\Inject;
use Kiri\Di\Context;
use Swoole\Coroutine\Channel;
class PoolItem
{
/**
* @var PoolQueue
*/
private PoolQueue $_items;
/**
* @var int
*/
private int $created = 0;
/**
* @param int $maxCreated
* @param \Closure $callback
*/
public function __construct(readonly public int $maxCreated, readonly public \Closure $callback)
{
$this->_items = new PoolQueue($this->maxCreated);
}
/**
* @param PoolQueue $items
*/
public function setItems(PoolQueue $items): void
{
$this->_items = $items;
}
/**
* @param mixed $item
* @return void
*/
public function push(mixed $item): void
{
$this->_items->push($item);
}
/**
* @return bool
*/
public function isEmpty(): bool
{
return $this->_items->isEmpty();
}
/**
* @return bool
*/
public function size(): bool
{
return $this->_items->length();
}
/**
* @return bool
*/
public function close(): bool
{
return $this->_items->close();
}
/**
* @param int $min
* @return void
*/
public function tailor(int $min = 0): void
{
while ($this->_items->length() > $min) {
$connection = $this->_items->pop(0.000001);
if ($connection instanceof StopHeartbeatCheck) {
$connection->stopHeartbeatCheck();
}
$connection = null;
$this->created -= 1;
}
}
/**
* @param int $waite
* @return mixed
*/
public function pop(int $waite = 10): mixed
{
if ($this->created < $this->maxCreated) {
$this->created += 1;
return call_user_func($this->callback);
}
return $this->_items->pop($waite);
}
}
+29 -1
View File
@@ -6,26 +6,54 @@ interface QueueInterface
{
/**
* @return bool
*/
public function isEmpty(): bool;
public function push(mixed $data, float $timeout = -1): bool;
/**
* @param mixed $data
* @param float $timeout
* @return void
*/
public function push(mixed $data, float $timeout = -1): void;
/**
* @param float $timeout
* @return mixed
*/
public function pop(float $timeout = -1): mixed;
/**
* @return array
*/
public function stats(): array;
/**
* @return bool
*/
public function close(): bool;
/**
* @return int
*/
public function length(): int;
/**
* @return bool
*/
public function isFull(): bool;
/**
* @return bool
*/
public function isClose(): bool;
}