From 219d246d4a41e30d7c9e1fb471f423ecff5e9de2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Wed, 2 Mar 2022 15:15:36 +0800 Subject: [PATCH] modify plugin name --- JsonRpcConsumers.php | 29 ++++++++++++++++++---------- RpcManager.php | 45 +++++++++++++------------------------------- 2 files changed, 32 insertions(+), 42 deletions(-) diff --git a/JsonRpcConsumers.php b/JsonRpcConsumers.php index 6c06ad9..2c89e56 100644 --- a/JsonRpcConsumers.php +++ b/JsonRpcConsumers.php @@ -12,6 +12,7 @@ use Kiri\Pool\Pool; use Psr\Http\Client\ClientExceptionInterface; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ServerRequestInterface; +use Kiri\Annotation\Inject; /** * @@ -26,6 +27,20 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface public Pool $pool; + /** + * @var RpcManager + */ + #[Inject(RpcManager::class)] + public RpcManager $manager; + + + /** + * @var RpcClientInterface + */ + #[Inject(RpcClientInterface::class)] + public RpcClientInterface $client; + + protected string $name = ''; @@ -38,9 +53,7 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface */ public function notify(string $method, mixed $data, string $version = '2.0'): void { - $config = $this->get_consul($this->name); - $transporter = Kiri::getDi()->get(RpcClientInterface::class); - $transporter->withConfig($config)->sendRequest( + $this->client->withConfig($this->get_consul($this->name))->sendRequest( $this->requestBody([ 'jsonrpc' => $version, 'service' => $this->name, @@ -75,9 +88,7 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface { if (empty($id)) $id = Number::create(time()); - $config = $this->get_consul($this->name); - $transporter = Kiri::getDi()->get(RpcClientInterface::class); - return $transporter->withConfig($config)->sendRequest( + return $this->client->withConfig($this->get_consul($this->name))->sendRequest( $this->requestBody([ 'jsonrpc' => $version, 'service' => $this->name, @@ -97,9 +108,7 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface */ public function batch(array $data): mixed { - $config = $this->get_consul($this->name); - $transporter = Kiri::getDi()->get(RpcClientInterface::class); - return $transporter->withConfig($config)->sendRequest( + return $this->client->withConfig($this->get_consul($this->name))->sendRequest( $this->requestBody($data) ); } @@ -116,7 +125,7 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface if (empty($service)) { throw new RpcServiceException('You need set rpc service name if used.'); } - $sf = Kiri::getDi()->get(RpcManager::class)->getServices($service); + $sf = $this->manager->getServices($service); if (empty($sf) || !is_array($sf)) { throw new RpcServiceException('You need set rpc service name if used.'); } diff --git a/RpcManager.php b/RpcManager.php index 7267cef..a4f9e15 100644 --- a/RpcManager.php +++ b/RpcManager.php @@ -5,6 +5,7 @@ namespace Kiri\Rpc; use Exception; use Kiri; use Kiri\Abstracts\Component; +use Kiri\Annotation\Inject; use Kiri\Consul\Agent; use Kiri\Consul\Health; use Kiri\Message\Handler\Handler; @@ -20,27 +21,8 @@ class RpcManager extends Component private array $_rpc = []; - /** - * @param $serviceName - * @return void - * @throws Exception - */ - public function async($serviceName): void - { - $this->reRegister($serviceName); - - $lists = Kiri::getDi()->get(Health::class)->setQuery('passing=true')->service($serviceName); - if ($lists->getStatusCode() != 200) { - return; - } - $body = json_decode($lists->getBody(), true); - $file = storage('.rpc.clients.' . md5($serviceName), 'rpc'); - if (!empty($body) && is_array($body)) { - file_put_contents($file, json_encode(array_column($body, 'Service')), LOCK_EX); - } else { - file_put_contents($file, json_encode([]), LOCK_EX); - } - } + #[Inject(Health::class)] + public Health $health; /** @@ -71,7 +53,7 @@ class RpcManager extends Component { try { foreach ($this->_rpc as $name => $list) { - $this->async($name); + $this->reRegister($name); } } catch (\Throwable $throwable) { $this->logger->error(error_trigger_format($throwable)); @@ -81,20 +63,19 @@ class RpcManager extends Component /** * @param $serviceName - * @return array - * @throws Exception + * @return array|null */ - public function getServices($serviceName): array + public function getServices($serviceName): ?array { - $file = storage('.rpc.clients.' . md5($serviceName), 'rpc'); - if (!file_exists($file) || filesize($file) < 10) { - $this->async($serviceName); + $lists = $this->health->setQuery('passing=true')->service($serviceName); + if ($lists->getStatusCode() != 200) { + return null; } - $content = json_decode(file_get_contents($file), true); - if (empty($content) || !is_array($content)) { - return []; + $body = json_decode($lists->getBody(), true); + if (empty($body)) { + return null; } - return $content; + return array_column($body, 'Service'); }