From 439e3c566e81e1605876adeb214f0b8612f30b1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Thu, 2 Dec 2021 15:25:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/JsonRpcConsumers.php | 38 ++++++++++++++++++------------ src/RpcJsonp.php | 14 +++-------- src/RpcManager.php | 46 ++++++++++++++++++++++++++++++++++++- src/RpcServiceException.php | 8 +++++++ 4 files changed, 79 insertions(+), 27 deletions(-) create mode 100644 src/RpcServiceException.php diff --git a/src/JsonRpcConsumers.php b/src/JsonRpcConsumers.php index 50817ee..a3719e0 100644 --- a/src/JsonRpcConsumers.php +++ b/src/JsonRpcConsumers.php @@ -6,7 +6,6 @@ namespace Kiri\Rpc; use Exception; use Http\Message\ServerRequest; use Http\Message\Stream; -use Kiri\Consul\Agent; use Kiri\Core\Number; use Kiri\Kiri; use Kiri\Pool\Pool; @@ -110,32 +109,41 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface /** * @param $service * @return array - * @throws Exception + * @throws RpcServiceException */ private function get_consul($service): array { if (empty($service)) { - throw new Exception('You need set rpc service name if used.'); + throw new RpcServiceException('You need set rpc service name if used.'); } - $sf = Kiri::getDi()->get(Agent::class); + $sf = Kiri::getDi()->get(RpcManager::class)->getServices($service); + if (empty($sf) || !is_array($sf)) { + throw new RpcServiceException('You need set rpc service name if used.'); + } + return $this->_loadRand($sf); + } - $response = $sf->service->setQuery('filter=Service == ' . $service)->list(); - if ($response->getStatusCode() != 200 || $response->getBody()->getSize() <= 2) { - throw new Exception('No microservices found [' . $service . '].'); - } + + /** + * @param $services + * @return array + */ + private function _loadRand($services): array + { $array = []; - - $content = json_decode($response->getBody()->getContents(), true); - foreach ($content as $value) { - $array[] = ['id' => $value['ID'], 'Weights' => $value['Weights']['Passing']]; + foreach ($services as $value) { + $value['Weight'] = $value['Weights']['Passing']; + $array[] = $value; } - if (count($array) < 2) { $luck = $array[0]; } else { - $luck = Luckdraw::luck($array, 'Weights'); + $luck = Luckdraw::luck($array, 'Weight'); } - return ['Address' => $luck['Address'], 'Port' => $luck['Port']]; + return [ + 'Address' => $luck['TaggedAddresses']['wan_ipv4']['Address'], + 'Port' => $luck['TaggedAddresses']['wan_ipv4']['Port'] + ]; } } diff --git a/src/RpcJsonp.php b/src/RpcJsonp.php index 36b3bd6..3ae2c05 100644 --- a/src/RpcJsonp.php +++ b/src/RpcJsonp.php @@ -53,7 +53,6 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa public ContainerInterface $container; - private RpcManager $manager; /** @@ -95,16 +94,9 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa */ public function consulWatches(OnWorkerStart|OnTaskerStart $server) { -// Timer::tick(1000, static function () { -// $lists = Kiri::getDi()->get(RpcManager::class)->doneList(); -// $health = Kiri::getDi()->get(Agent::class)->checks; -// foreach ($lists as $list) { -// -// $health->checks(); -// -// -// } -// }); + Timer::tick(1000, static function () { + Kiri::getDi()->get(RpcManager::class)->tick(); + }); } diff --git a/src/RpcManager.php b/src/RpcManager.php index 3a81815..e3bb265 100644 --- a/src/RpcManager.php +++ b/src/RpcManager.php @@ -3,7 +3,7 @@ namespace Kiri\Rpc; use Kiri\Consul\Agent; -use Kiri\Core\Json; +use Kiri\Consul\Health; use Kiri\Kiri; use ReflectionException; @@ -17,6 +17,50 @@ class RpcManager private array $_rpc = []; + private array $_services = []; + + + /** + * @param $serviceName + * @return array + * @throws ReflectionException + */ + public function async($serviceName): array + { + $lists = Kiri::getDi()->get(Health::class)->setQuery('passing=true')->service($serviceName); + if ($lists->getStatusCode() != 200) { + return []; + } + var_dump($lists->getBody()); + $body = json_decode($lists->getBody(), true); + if (empty($body) || !is_array($body)) { + return $this->_services = []; + } + return $this->_services[$serviceName] = array_column($body, 'service'); + } + + + /** + * @throws ReflectionException + */ + public function tick(): void + { + foreach ($this->_rpc as $name => $list) { + $this->async($name); + } + } + + + /** + * @param $serviceName + * @return array + */ + public function getServices($serviceName): array + { + return $this->_services[$serviceName] ?? []; + } + + /** * @param string $name * @param string $class diff --git a/src/RpcServiceException.php b/src/RpcServiceException.php new file mode 100644 index 0000000..fecc542 --- /dev/null +++ b/src/RpcServiceException.php @@ -0,0 +1,8 @@ +