diff --git a/src/ClientPool.php b/src/ClientPool.php index d9937ee..172a4e5 100644 --- a/src/ClientPool.php +++ b/src/ClientPool.php @@ -2,17 +2,26 @@ namespace Kiri\Rpc; +use Exception; +use Kiri\Abstracts\Component; +use Kiri\Context; +use Kiri\Exception\ConfigException; +use Kiri\Kiri; +use Kiri\Pool\Alias; use Kiri\Pool\Pool; +use Swoole\Client; /** * */ -class ClientPool extends Pool +class ClientPool extends Component { const POOL_NAME = 'rpc.client.pool'; + use Alias; + public int $max; @@ -22,4 +31,47 @@ class ClientPool extends Pool public int $waite; + + /** + * @param $config + * @param callable $callback + * @return mixed + * @throws ConfigException + * @throws Exception + */ + public function get($config, callable $callback): mixed + { + $coroutineName = $this->name(self::POOL_NAME . '::' . $config['ServiceAddress'] . '::' . $config['ServicePort'], true); + + $pool = $config['pool'] ?? ['min' => 1, 'max' => 100]; + + $clients = $this->getPool()->get($coroutineName, $callback(), $pool['min'] ?? 1); + return Context::setContext($coroutineName, $clients); + } + + + /** + * @param \Swoole\Coroutine\Client|Client $client + * @param $host + * @param $port + * @throws ConfigException + * @throws Exception + */ + public function push(\Swoole\Coroutine\Client|Client $client, $host, $port) + { + $coroutineName = $this->name(self::POOL_NAME . '::' . $host . '::' . $port, true); + + $this->getPool()->push($coroutineName, $client); + } + + + /** + * @return Pool + * @throws Exception + */ + public function getPool(): Pool + { + return Kiri::getDi()->get(Pool::class); + } + } diff --git a/src/JsonRpcPoolTransporter.php b/src/JsonRpcPoolTransporter.php index 6ec2f29..7b3c961 100644 --- a/src/JsonRpcPoolTransporter.php +++ b/src/JsonRpcPoolTransporter.php @@ -49,7 +49,7 @@ class JsonRpcPoolTransporter implements ClientInterface $response = $this->request($client = $this->getClient(), $content, false); - $this->pool->push(self::POOL_NAME, $client); + $this->pool->push($client, $this->config['ServiceAddress'], $this->config['ServicePort']); return (new Response())->withBody(new Stream($response)); }