From e1fcefde2d0d53bd2d791ddefc52fba37b7d64a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Thu, 28 Oct 2021 10:20:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Consumers.php | 91 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 84 insertions(+), 7 deletions(-) diff --git a/src/Consumers.php b/src/Consumers.php index fc87cea..5bbdb32 100644 --- a/src/Consumers.php +++ b/src/Consumers.php @@ -3,10 +3,13 @@ namespace Kiri\Rpc; +use Exception; +use Kiri\Consul\Catalog\Catalog; +use Kiri\Context; use Kiri\Kiri; use Kiri\Pool\Pool; -use SensioLabs\Consul\ServiceFactory; -use SensioLabs\Consul\Services\Agent; +use Swoole\Client; +use Swoole\Coroutine; /** * @@ -26,10 +29,18 @@ class Consumers implements OnRpcConsumerInterface * @param string $method * @param mixed $data * @param string $version + * @throws Exception */ public function notify(string $service, string $method, mixed $data, string $version = '2.0'): void { - + $config = $this->get_consul($service); + if (Context::inCoroutine()) { + $client = $this->clientOnCoroutine($config); + } else { + $client = $this->clientNotCoroutine($config); + } + $client->send(json_encode(['jsonrpc' => $version, 'method' => $method, 'params' => $data])); + $client->close(); } @@ -40,21 +51,87 @@ class Consumers implements OnRpcConsumerInterface * @param string $version * @param string $id * @return mixed + * @throws Exception */ public function get(string $service, string $method, mixed $data, string $version = '2.0', string $id = ''): mixed { - + $config = $this->get_consul($service); + if (Context::inCoroutine()) { + $client = $this->clientOnCoroutine($config); + } else { + $client = $this->clientNotCoroutine($config); + } + $client->send(json_encode(['jsonrpc' => $version, 'method' => $method, 'params' => $data, 'id' => $id])); + $read = $client->recv(); + $client->close(); + return json_decode($read, true); } - private function get_consul($service) + /** + * @param string $service + * @param array $data + * @return mixed + * @throws Exception + */ + public function batch(string $service, array $data): mixed { - $sf = Kiri::getDi()->get(\Kiri\Consul\Agent::class); + $config = $this->get_consul($service); + if (Context::inCoroutine()) { + $client = $this->clientOnCoroutine($config); + } else { + $client = $this->clientNotCoroutine($config); + } + $client->send(json_encode($data, true)); + $read = $client->recv(); + $client->close(); + return json_decode($read, true); + } - $content = $sf->service->service($service)->getBody()->getContents(); + + /** + * @param $service + * @return array + */ + private function get_consul($service): array + { + $sf = Kiri::getDi()->get(Catalog::class); + + $content = $sf->service($service)->getBody()->getContents(); $content = json_decode($content, true); + + return $content[array_rand($content)]; } + /** + * @param $config + * @return Coroutine\Client + * @throws Exception + */ + private function clientOnCoroutine($config): Coroutine\Client + { + $client = new Coroutine\Client(SWOOLE_SOCK_TCP); + if (!$client->connect($config['ServiceAddress'], $config['ServicePort'], 60)) { + throw new Exception('connect fail.'); + } + return $client; + } + + + /** + * @param $config + * @return Client + * @throws Exception + */ + private function clientNotCoroutine($config): Client + { + $client = new Client(SWOOLE_SOCK_TCP); + if (!$client->connect($config['ServiceAddress'], $config['ServicePort'], 60)) { + throw new Exception('connect fail.'); + } + return $client; + } + }