diff --git a/src/JsonRpcConsumers.php b/src/JsonRpcConsumers.php index 858875d..577997c 100644 --- a/src/JsonRpcConsumers.php +++ b/src/JsonRpcConsumers.php @@ -128,34 +128,4 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface return ['ServiceAddress' => '127.0.0.1', 'ServicePort' => 9526]; } - - /** - * @param $config - * @return Coroutine\Client - * @throws Exception - */ - private function clientOnCoroutine($config): Coroutine\Client - { - $client = new Coroutine\Client(SWOOLE_SOCK_TCP); - if (!$client->connect($config['ServiceAddress'], $config['ServicePort'], 60)) { - throw new Exception('connect fail.'); - } - return $client; - } - - - /** - * @param $config - * @return Client - * @throws Exception - */ - private function clientNotCoroutine($config): Client - { - $client = new Client(SWOOLE_SOCK_TCP); - if (!$client->connect($config['ServiceAddress'], $config['ServicePort'], 60)) { - throw new Exception('connect fail.'); - } - return $client; - } - } diff --git a/src/JsonRpcPoolTransporter.php b/src/JsonRpcPoolTransporter.php index 322cd7c..6b872f1 100644 --- a/src/JsonRpcPoolTransporter.php +++ b/src/JsonRpcPoolTransporter.php @@ -46,9 +46,12 @@ class JsonRpcPoolTransporter implements ClientInterface public function sendRequest(RequestInterface $request): ResponseInterface { $content = $request->getBody()->getContents(); - return (new Response())->withBody( - new Stream($this->request($this->getClient(), $content)) - ); + + $response = $this->request($client = $this->newClient(), $content, false); + + $this->pool->push(self::POOL_NAME, $client); + + return (new Response())->withBody(new Stream($response)); } diff --git a/src/JsonRpcTransporter.php b/src/JsonRpcTransporter.php index 7684e3e..c937546 100644 --- a/src/JsonRpcTransporter.php +++ b/src/JsonRpcTransporter.php @@ -27,9 +27,10 @@ class JsonRpcTransporter implements ClientInterface public function sendRequest(RequestInterface $request): ResponseInterface { $content = $request->getBody()->getContents(); - return (new Response())->withBody( - new Stream($this->request($this->newClient(), $content)) - ); + + $response = $this->request($this->newClient(), $content, true); + + return (new Response())->withBody(new Stream($response)); } diff --git a/src/RpcJsonp.php b/src/RpcJsonp.php index 85dfaf4..4c0b05c 100644 --- a/src/RpcJsonp.php +++ b/src/RpcJsonp.php @@ -32,11 +32,6 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa #[Inject(Annotation::class)] public Annotation $annotation; - - #[Inject(ContainerInterface::class)] - public ContainerInterface $container; - - /** * * @throws \Exception @@ -62,12 +57,6 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa } } } - $config = Config::get('rpc.pool', null); - if (!is_null($config)) { - $this->container->mapping(RpcClientInterface::class, JsonRpcPoolTransporter::class); - } else { - $this->container->mapping(RpcClientInterface::class, JsonRpcTransporter::class); - } } diff --git a/src/TraitTransporter.php b/src/TraitTransporter.php index 32b1045..5aa97db 100644 --- a/src/TraitTransporter.php +++ b/src/TraitTransporter.php @@ -28,13 +28,16 @@ trait TraitTransporter /** * @param Client|Coroutine\Client $client * @param $content + * @param bool $isClose * @return mixed */ - private function request(Client|Coroutine\Client $client, $content): mixed + private function request(Client|Coroutine\Client $client, $content, bool $isClose): mixed { $client->send($content); $read = $client->recv(); - $client->close(); + if ($isClose) { + $client->close(); + } return $read; }