diff --git a/composer.json b/composer.json index 736cb82..c9a4dcc 100644 --- a/composer.json +++ b/composer.json @@ -11,7 +11,9 @@ "require": { "php": ">=8.0", "ext-json": "*", - "game-worker/kiri-consul": "dev-master" + "game-worker/kiri-consul": "dev-master", + "psr/http-client": "^1.0", + "psr/http-message": "^1.0" }, "autoload": { "psr-4": { diff --git a/src/ClientPool.php b/src/ClientPool.php new file mode 100644 index 0000000..d9937ee --- /dev/null +++ b/src/ClientPool.php @@ -0,0 +1,25 @@ +get_consul($this->name); - if (Context::inCoroutine()) { - $client = $this->clientOnCoroutine($config); - } else { - $client = $this->clientNotCoroutine($config); - } - $client->send(json_encode(['jsonrpc' => $version, 'service' => $this->name, 'method' => $method, 'params' => $data])); - $client->recv(1); - $client->close(); + $transporter = Kiri::getDi()->get(RpcClientInterface::class); + $transporter->withConfig($config)->sendRequest( + $this->requestBody([ + 'jsonrpc' => $version, + 'service' => $this->name, + 'method' => $method, + 'params' => $data, + ]) + ); + } + + + /** + * @param array $data + * @return ServerRequestInterface + */ + private function requestBody(array $data): ServerRequestInterface + { + $server = Kiri::getDi()->get(ServerRequest::class); + return $server->withBody(new Stream(json_encode($data))); } @@ -53,43 +71,39 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface * @param string $id * @return mixed * @throws Exception + * @throws ClientExceptionInterface */ - public function get(string $method, mixed $data, string $version = '2.0', string $id = ''): mixed + public function get(string $method, mixed $data, string $version = '2.0', string $id = ''): ResponseInterface { - $config = $this->get_consul($this->name); - if (Context::inCoroutine()) { - $client = $this->clientOnCoroutine($config); - } else { - $client = $this->clientNotCoroutine($config); - } - if (empty($id)) $id = Number::create(time()); - $client->send(json_encode(['jsonrpc' => $version, 'service' => $this->name, 'method' => $method, 'params' => $data, 'id' => $id])); - $read = $client->recv(); - $client->close(); - return json_decode($read, true); + $config = $this->get_consul($this->name); + $transporter = Kiri::getDi()->get(RpcClientInterface::class); + return $transporter->withConfig($config)->sendRequest( + $this->requestBody([ + 'jsonrpc' => $version, + 'service' => $this->name, + 'method' => $method, + 'params' => $data, + 'id' => $id + ]) + ); } /** - * @param string $service * @param array $data * @return mixed + * @throws ClientExceptionInterface * @throws Exception */ - public function batch(string $service, array $data): mixed + public function batch(array $data): mixed { - $config = $this->get_consul($service); - if (Context::inCoroutine()) { - $client = $this->clientOnCoroutine($config); - } else { - $client = $this->clientNotCoroutine($config); - } - $client->send(json_encode($data, true)); - $read = $client->recv(); - $client->close(); - return json_decode($read, true); + $config = $this->get_consul($this->name); + $transporter = Kiri::getDi()->get(RpcClientInterface::class); + return $transporter->withConfig($config)->sendRequest( + $this->requestBody($data) + ); } diff --git a/src/JsonRpcPoolTransporter.php b/src/JsonRpcPoolTransporter.php new file mode 100644 index 0000000..322cd7c --- /dev/null +++ b/src/JsonRpcPoolTransporter.php @@ -0,0 +1,68 @@ +pool = Kiri::getDi()->get(ClientPool::class, [], $config); + $this->pool->initConnections(self::POOL_NAME, true, $config['max']); + } + + + /** + * @param RequestInterface $request + * @return ResponseInterface + * @throws Exception + */ + public function sendRequest(RequestInterface $request): ResponseInterface + { + $content = $request->getBody()->getContents(); + return (new Response())->withBody( + new Stream($this->request($this->getClient(), $content)) + ); + } + + + /** + * @return Client|\Swoole\Client + * @throws ConfigException + * @throws Exception + */ + private function getClient(): Client|\Swoole\Client + { + return $this->pool->get(self::POOL_NAME, function () { + return $this->newClient(); + }); + } + + +} diff --git a/src/JsonRpcTransporter.php b/src/JsonRpcTransporter.php new file mode 100644 index 0000000..7684e3e --- /dev/null +++ b/src/JsonRpcTransporter.php @@ -0,0 +1,36 @@ +getBody()->getContents(); + return (new Response())->withBody( + new Stream($this->request($this->newClient(), $content)) + ); + } + + +} diff --git a/src/RpcClientInterface.php b/src/RpcClientInterface.php new file mode 100644 index 0000000..34e7cba --- /dev/null +++ b/src/RpcClientInterface.php @@ -0,0 +1,14 @@ +container->mapping(RpcClientInterface::class, JsonRpcPoolTransporter::class); + } else { + $this->container->mapping(RpcClientInterface::class, JsonRpcTransporter::class); + } } diff --git a/src/TraitTransporter.php b/src/TraitTransporter.php new file mode 100644 index 0000000..32b1045 --- /dev/null +++ b/src/TraitTransporter.php @@ -0,0 +1,87 @@ +config = $config; + return $this; + } + + + /** + * @param Client|Coroutine\Client $client + * @param $content + * @return mixed + */ + private function request(Client|Coroutine\Client $client, $content): mixed + { + $client->send($content); + $read = $client->recv(); + $client->close(); + return $read; + } + + + /** + * @return Client|Coroutine\Client + * @throws Exception + */ + private function newClient(): Coroutine\Client|Client + { + if (Context::inCoroutine()) { + $client = $this->clientOnCoroutine($this->config); + } else { + $client = $this->clientNotCoroutine($this->config); + } + return $client; + } + + + /** + * @param $config + * @return Coroutine\Client + * @throws Exception + */ + private function clientOnCoroutine($config): Coroutine\Client + { + $client = new Coroutine\Client(SWOOLE_SOCK_TCP); + if (!$client->connect($config['ServiceAddress'], $config['ServicePort'], 60)) { + throw new Exception('connect fail.'); + } + return $client; + } + + + /** + * @param $config + * @return Client + * @throws Exception + */ + private function clientNotCoroutine($config): Client + { + $client = new Client(SWOOLE_SOCK_TCP); + if (!$client->connect($config['ServiceAddress'], $config['ServicePort'], 60)) { + throw new Exception('connect fail.'); + } + return $client; + } + + +}