This commit is contained in:
xl
2023-05-20 23:05:38 +08:00
parent de2698ee12
commit d72ff2f82c
7 changed files with 183 additions and 163 deletions
+65 -70
View File
@@ -5,13 +5,13 @@ namespace Kiri\Rpc;
use Exception;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Annotation\Inject;
use Kiri\Events\EventProvider;
use Kiri\Exception\ConfigException;
use Kiri\Pool\Alias;
use Kiri\Pool\Pool;
use Kiri\Di\Inject\Container;
use Kiri\Server\Events\OnBeforeShutdown;
use Swoole\Client;
use ReflectionException;
use Swoole\Coroutine\Client;
/**
@@ -20,86 +20,81 @@ use Swoole\Client;
class ClientPool extends Component
{
const POOL_NAME = 'rpc.client.pool';
use Alias;
public int $max;
public int $max;
public int $min;
public int $min;
private array $names = [];
public int $waite;
public function init(): void
{
on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
}
#[Inject(EventProvider::class)]
public EventProvider $provider;
/**
* @return void
* @throws Exception
*/
public function onBeforeShutdown(): void
{
$pool = Kiri::getDi()->get(Pool::class);
foreach ($this->names as $name) {
$pool->clean($name);
}
}
private array $names = [];
/**
* @param $config
* @return resource
* @throws ConfigException
* @throws ReflectionException
*/
public function get($config): mixed
{
$coroutineName = $config['Address'] . '::' . $config['Port'];
if (!in_array($coroutineName, $this->names)) {
$this->names[] = $coroutineName;
}
return $this->getPool($config['Address'], $config['Port'])->get($coroutineName);
}
public function init()
{
$this->provider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
}
/**
* @param Client|\Swoole\Client $client
* @param $host
* @param $port
* @throws ConfigException|ReflectionException
*/
public function push(Client|\Swoole\Client $client, $host, $port)
{
$this->getPool($host, $port)->push($host . '::' . $port, $client);
}
/**
* @return void
* @throws Exception
*/
public function onBeforeShutdown(): void
{
foreach ($this->names as $name) {
$this->getPool()->clean($name);
}
}
/**
* @param $config
* @param callable $callback
* @return mixed
* @throws ConfigException
* @throws Exception
*/
public function get($config, callable $callback): mixed
{
$coroutineName = $this->name(self::POOL_NAME . '::' . $config['Address'] . '::' . $config['Port'], true);
$pool = $config['pool'] ?? ['min' => 1, 'max' => 100];
$this->names[] = $coroutineName;
return $this->getPool()->get($coroutineName, $callback, $pool['min'] ?? 1);
}
/**
* @param \Swoole\Coroutine\Client|Client $client
* @param $host
* @param $port
* @throws ConfigException
* @throws Exception
*/
public function push(\Swoole\Coroutine\Client|Client $client, $host, $port)
{
$coroutineName = $this->name(self::POOL_NAME . '::' . $host . '::' . $port, true);
$this->getPool()->push($coroutineName, $client);
}
/**
* @return Pool
* @throws Exception
*/
public function getPool(): Pool
{
return Kiri::getDi()->get(Pool::class);
}
/**
* @param $host
* @param $port
* @return Pool
* @throws ReflectionException
*/
public function getPool($host, $port): Pool
{
$pool = Kiri::getDi()->get(Pool::class);
$pool->initConnections($host . '::' . $port, 10, function () use ($host, $port) {
$client = stream_socket_client("tcp://$host:$port", $errCode, $errMessage, 3);
if ($client === false) {
throw new Exception('Connect ' . $host . '::' . $port . ' fail');
}
return $client;
});
return $pool;
}
}