This commit is contained in:
2021-10-28 18:46:14 +08:00
parent 627b0560de
commit 9ccf9638dc
2 changed files with 54 additions and 2 deletions
+53 -1
View File
@@ -2,17 +2,26 @@
namespace Kiri\Rpc; namespace Kiri\Rpc;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Context;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri\Pool\Alias;
use Kiri\Pool\Pool; use Kiri\Pool\Pool;
use Swoole\Client;
/** /**
* *
*/ */
class ClientPool extends Pool class ClientPool extends Component
{ {
const POOL_NAME = 'rpc.client.pool'; const POOL_NAME = 'rpc.client.pool';
use Alias;
public int $max; public int $max;
@@ -22,4 +31,47 @@ class ClientPool extends Pool
public int $waite; public int $waite;
/**
* @param $config
* @param callable $callback
* @return mixed
* @throws ConfigException
* @throws Exception
*/
public function get($config, callable $callback): mixed
{
$coroutineName = $this->name(self::POOL_NAME . '::' . $config['ServiceAddress'] . '::' . $config['ServicePort'], true);
$pool = $config['pool'] ?? ['min' => 1, 'max' => 100];
$clients = $this->getPool()->get($coroutineName, $callback(), $pool['min'] ?? 1);
return Context::setContext($coroutineName, $clients);
}
/**
* @param \Swoole\Coroutine\Client|Client $client
* @param $host
* @param $port
* @throws ConfigException
* @throws Exception
*/
public function push(\Swoole\Coroutine\Client|Client $client, $host, $port)
{
$coroutineName = $this->name(self::POOL_NAME . '::' . $host . '::' . $port, true);
$this->getPool()->push($coroutineName, $client);
}
/**
* @return Pool
* @throws Exception
*/
public function getPool(): Pool
{
return Kiri::getDi()->get(Pool::class);
}
} }
+1 -1
View File
@@ -49,7 +49,7 @@ class JsonRpcPoolTransporter implements ClientInterface
$response = $this->request($client = $this->getClient(), $content, false); $response = $this->request($client = $this->getClient(), $content, false);
$this->pool->push(self::POOL_NAME, $client); $this->pool->push($client, $this->config['ServiceAddress'], $this->config['ServicePort']);
return (new Response())->withBody(new Stream($response)); return (new Response())->withBody(new Stream($response));
} }