This commit is contained in:
2021-12-02 15:25:41 +08:00
parent 14d395be5e
commit 439e3c566e
4 changed files with 79 additions and 27 deletions
+23 -15
View File
@@ -6,7 +6,6 @@ namespace Kiri\Rpc;
use Exception;
use Http\Message\ServerRequest;
use Http\Message\Stream;
use Kiri\Consul\Agent;
use Kiri\Core\Number;
use Kiri\Kiri;
use Kiri\Pool\Pool;
@@ -110,32 +109,41 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
/**
* @param $service
* @return array
* @throws Exception
* @throws RpcServiceException
*/
private function get_consul($service): array
{
if (empty($service)) {
throw new Exception('You need set rpc service name if used.');
throw new RpcServiceException('You need set rpc service name if used.');
}
$sf = Kiri::getDi()->get(Agent::class);
$sf = Kiri::getDi()->get(RpcManager::class)->getServices($service);
if (empty($sf) || !is_array($sf)) {
throw new RpcServiceException('You need set rpc service name if used.');
}
return $this->_loadRand($sf);
}
$response = $sf->service->setQuery('filter=Service == ' . $service)->list();
if ($response->getStatusCode() != 200 || $response->getBody()->getSize() <= 2) {
throw new Exception('No microservices found [' . $service . '].');
}
/**
* @param $services
* @return array
*/
private function _loadRand($services): array
{
$array = [];
$content = json_decode($response->getBody()->getContents(), true);
foreach ($content as $value) {
$array[] = ['id' => $value['ID'], 'Weights' => $value['Weights']['Passing']];
foreach ($services as $value) {
$value['Weight'] = $value['Weights']['Passing'];
$array[] = $value;
}
if (count($array) < 2) {
$luck = $array[0];
} else {
$luck = Luckdraw::luck($array, 'Weights');
$luck = Luckdraw::luck($array, 'Weight');
}
return ['Address' => $luck['Address'], 'Port' => $luck['Port']];
return [
'Address' => $luck['TaggedAddresses']['wan_ipv4']['Address'],
'Port' => $luck['TaggedAddresses']['wan_ipv4']['Port']
];
}
}
+3 -11
View File
@@ -53,7 +53,6 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
public ContainerInterface $container;
private RpcManager $manager;
/**
@@ -95,16 +94,9 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
*/
public function consulWatches(OnWorkerStart|OnTaskerStart $server)
{
// Timer::tick(1000, static function () {
// $lists = Kiri::getDi()->get(RpcManager::class)->doneList();
// $health = Kiri::getDi()->get(Agent::class)->checks;
// foreach ($lists as $list) {
//
// $health->checks();
//
//
// }
// });
Timer::tick(1000, static function () {
Kiri::getDi()->get(RpcManager::class)->tick();
});
}
+45 -1
View File
@@ -3,7 +3,7 @@
namespace Kiri\Rpc;
use Kiri\Consul\Agent;
use Kiri\Core\Json;
use Kiri\Consul\Health;
use Kiri\Kiri;
use ReflectionException;
@@ -17,6 +17,50 @@ class RpcManager
private array $_rpc = [];
private array $_services = [];
/**
* @param $serviceName
* @return array
* @throws ReflectionException
*/
public function async($serviceName): array
{
$lists = Kiri::getDi()->get(Health::class)->setQuery('passing=true')->service($serviceName);
if ($lists->getStatusCode() != 200) {
return [];
}
var_dump($lists->getBody());
$body = json_decode($lists->getBody(), true);
if (empty($body) || !is_array($body)) {
return $this->_services = [];
}
return $this->_services[$serviceName] = array_column($body, 'service');
}
/**
* @throws ReflectionException
*/
public function tick(): void
{
foreach ($this->_rpc as $name => $list) {
$this->async($name);
}
}
/**
* @param $serviceName
* @return array
*/
public function getServices($serviceName): array
{
return $this->_services[$serviceName] ?? [];
}
/**
* @param string $name
* @param string $class
+8
View File
@@ -0,0 +1,8 @@
<?php
namespace Kiri\Rpc;
class RpcServiceException extends \Exception
{
}