This commit is contained in:
2021-08-24 15:47:31 +08:00
parent 386e855939
commit 16d8d44437
3 changed files with 292 additions and 159 deletions
+28
View File
@@ -0,0 +1,28 @@
<?php
namespace Kiri\Pool\Helper;
interface QueueInterface
{
public function isEmpty(): bool;
public function push(mixed $data, float $timeout = -1): bool;
public function pop(float $timeout = -1): mixed;
public function stats(): array;
public function close(): bool;
public function length(): int;
public function isFull(): bool;
}
+101
View File
@@ -0,0 +1,101 @@
<?php
namespace Kiri\Pool\Helper;
use JetBrains\PhpStorm\Pure;
/**
*
*/
class SplQueue implements QueueInterface
{
private \SplQueue $channel;
public int $errCode = 0;
/**
* @param int $max
*/
#[Pure] public function __construct(public int $max)
{
$this->channel = new \SplQueue();
}
/**
* @return bool
*/
public function isEmpty(): bool
{
// TODO: Implement isEmpty() method.
return $this->channel->count() < 1;
}
/**
* @param mixed $data
* @param float $timeout
* @return bool
*/
public function push(mixed $data, float $timeout = -1): bool
{
// TODO: Implement push() method.
$this->channel->enqueue($data);
return true;
}
/**
* @param float $timeout
* @return mixed
*/
public function pop(float $timeout = -1): mixed
{
// TODO: Implement pop() method.
return $this->channel->dequeue();
}
/**
* @return array
*/
public function stats(): array
{
// TODO: Implement stats() method.
return [];
}
/**
* @return bool
*/
public function close(): bool
{
// TODO: Implement close() method.
return false;
}
/**
* @return int
*/
public function length(): int
{
// TODO: Implement length() method.
return $this->channel->count();
}
/**
* @return bool
*/
public function isFull(): bool
{
// TODO: Implement isFull() method.
return $this->channel->count() >= $this->max;
}
}
+163 -159
View File
@@ -5,14 +5,12 @@ namespace Kiri\Pool;
use Exception;
use JetBrains\PhpStorm\Pure;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri\Pool\Helper\SplQueue;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Timer;
/**
@@ -22,79 +20,94 @@ use Swoole\Timer;
class Pool extends Component
{
/** @var Channel[] */
private static array $_connections = [];
/** @var Channel[] */
private static array $_connections = [];
public int $max = 60;
public int $max = 60;
use Alias;
use Alias;
/**
* @param $channel
* @param $retain_number
* @throws Exception
*/
public function flush($channel, $retain_number)
{
$this->pop($channel, $retain_number);
}
/**
* @param $channel
* @param $retain_number
* @throws Exception
*/
public function flush($channel, $retain_number)
{
$this->pop($channel, $retain_number);
}
/**
* @param Channel $channel
* @param $retain_number
* @throws Exception
*/
protected function pop(Channel $channel, $retain_number): void
{
if (Coroutine::getCid() === -1) {
return;
}
while ($channel->length() > $retain_number) {
$connection = $channel->pop();
if ($connection instanceof StopHeartbeatCheck) {
$connection->stopHeartbeatCheck();
}
}
}
/**
* @param Channel $channel
* @param $retain_number
* @throws Exception
*/
protected function pop(Channel $channel, $retain_number): void
{
while ($channel->length() > $retain_number) {
$connection = $channel->pop();
if ($connection instanceof StopHeartbeatCheck) {
$connection->stopHeartbeatCheck();
}
}
}
/**
* @param $name
* @param false $isMaster
* @param int $max
*/
public function initConnections($name, bool $isMaster = false, int $max = 60)
{
$name = $this->name($name, $isMaster);
if (isset(static::$_connections[$name]) && static::$_connections[$name] instanceof Channel) {
return;
}
if (Coroutine::getCid() === -1) {
return;
}
static::$_connections[$name] = new Channel($max);
$this->max = $max;
}
/**
* @param $name
* @param false $isMaster
* @param int $max
* @throws ConfigException
*/
public function initConnections($name, bool $isMaster = false, int $max = 60)
{
$name = $this->name($name, $isMaster);
if (isset(static::$_connections[$name])) {
return;
}
$value = static::$_connections[$name];
if ($value instanceof Channel || $value instanceof SplQueue) {
return;
}
$this->newChannel($name, $max);
$this->max = $max;
}
/**
* @param $name
* @return Channel
* @throws ConfigException
* @throws Exception
*/
private function getChannel($name): Channel
{
if (!isset(static::$_connections[$name])) {
static::$_connections[$name] = new Channel(Config::get('databases.pool.max', 10));
}
if (static::$_connections[$name]->errCode == SWOOLE_CHANNEL_CLOSED) {
throw new Exception('Channel is Close.');
}
return static::$_connections[$name];
}
/**
* @param $name
* @return Channel
* @throws ConfigException
* @throws Exception
*/
private function getChannel($name): Channel
{
if (!isset(static::$_connections[$name])) {
$this->newChannel($name);
}
if (static::$_connections[$name]->errCode == SWOOLE_CHANNEL_CLOSED) {
throw new Exception('Channel is Close.');
}
return static::$_connections[$name];
}
/**
* @throws ConfigException
*/
private function newChannel($name, $max = null)
{
if ($max !== null) {
$max = Config::get('databases.pool.max', 10);
}
if (Coroutine::getCid() === -1) {
static::$_connections[$name] = new SplQueue($max);
} else {
static::$_connections[$name] = new Channel($max);
}
}
/**
@@ -103,113 +116,104 @@ class Pool extends Component
* @return array
* @throws ConfigException
*/
public function get($name, $callback): mixed
{
if (Coroutine::getCid() === -1) {
return $callback();
}
$channel = $this->getChannel($name);
if (!$channel->isEmpty()) {
$connection = $channel->pop();
if ($this->checkCanUse($name, $connection)) {
return $connection;
}
}
return $callback();
}
public function get($name, $callback): mixed
{
$channel = $this->getChannel($name);
if (!$channel->isEmpty()) {
$connection = $channel->pop();
if ($this->checkCanUse($name, $connection)) {
return $connection;
}
}
return $callback();
}
/**
* @param $name
* @return bool
* @throws ConfigException
*/
public function isNull($name): bool
{
return $this->getChannel($name)->isEmpty();
}
/**
* @param $name
* @return bool
* @throws ConfigException
*/
public function isNull($name): bool
{
return $this->getChannel($name)->isEmpty();
}
/**
* @param string $name
* @param mixed $client
* @return bool
* 检查连接可靠性
*/
public function checkCanUse(string $name, mixed $client): bool
{
return true;
}
/**
* @param string $name
* @param mixed $client
* @return bool
* 检查连接可靠性
*/
public function checkCanUse(string $name, mixed $client): bool
{
return true;
}
/**
* @param string $name
* @return bool
*/
public function hasItem(string $name): bool
{
if (isset(static::$_connections[$name])) {
return !static::$_connections[$name]->isEmpty();
}
return false;
}
/**
* @param string $name
* @return bool
*/
public function hasItem(string $name): bool
{
if (isset(static::$_connections[$name])) {
return !static::$_connections[$name]->isEmpty();
}
return false;
}
/**
* @param string $name
* @return mixed
*/
public function size(string $name): mixed
{
if (Coroutine::getCid() === -1) {
return 0;
}
if (!isset(static::$_connections[$name])) {
return 0;
}
return static::$_connections[$name]->length();
}
/**
* @param string $name
* @return mixed
*/
public function size(string $name): mixed
{
if (!isset(static::$_connections[$name])) {
return 0;
}
return static::$_connections[$name]->length();
}
/**
* @param string $name
* @param mixed $client
* @throws ConfigException
*/
public function push(string $name, mixed $client)
{
if (Coroutine::getCid() === -1) {
return;
}
$channel = $this->getChannel($name);
if (!$channel->isFull()) {
$channel->push($client);
}
unset($client);
}
/**
* @param string $name
* @param mixed $client
* @throws ConfigException
*/
public function push(string $name, mixed $client)
{
$channel = $this->getChannel($name);
if (!$channel->isFull()) {
$channel->push($client);
}
unset($client);
}
/**
* @param string $name
* @throws Exception
*/
public function clean(string $name)
{
if (Coroutine::getCid() === -1 || !isset(static::$_connections[$name])) {
return;
}
$channel = static::$_connections[$name];
$this->pop($channel, 0);
}
/**
* @param string $name
* @throws Exception
*/
public function clean(string $name)
{
if (!isset(static::$_connections[$name])) {
return;
}
$channel = static::$_connections[$name];
$this->pop($channel, 0);
}
/**
* @return Channel[]
*/
protected function getChannels(): array
{
return static::$_connections;
}
/**
* @return Channel[]
*/
protected function getChannels(): array
{
return static::$_connections;
}
}