Compare commits
46 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 845b8b36d1 | |||
| 07436f7035 | |||
| 4d93fcaff2 | |||
| 2e01b60e1e | |||
| 8d2ce1d45c | |||
| 8a3ed5aea5 | |||
| d50b6e6ec7 | |||
| 8f3bea7738 | |||
| eec4ad8a8b | |||
| 18d2ce66e6 | |||
| a66de3f016 | |||
| 6fcd26f80f | |||
| 3538393620 | |||
| 9a6575b438 | |||
| 7293e24f9d | |||
| 646f6b1258 | |||
| 1189ab7b0c | |||
| 7c36c32c65 | |||
| f2d60f6982 | |||
| 6d242d42b0 | |||
| 5991ea9d3f | |||
| 9af46754e7 | |||
| 23206bc683 | |||
| 439e3c566e | |||
| 14d395be5e | |||
| a8a202ad6b | |||
| a8d3d1472f | |||
| f374fad591 | |||
| 4e2c4633c1 | |||
| 31a086e338 | |||
| 87d0d28847 | |||
| c41bdd9c8e | |||
| e6ae1acd14 | |||
| d2ec972c69 | |||
| ce3647b5b7 | |||
| 19c9b692d4 | |||
| 7fef63fa39 | |||
| b279e56efd | |||
| 92a1e68bec | |||
| e8b9387252 | |||
| 46e6e2b2c2 | |||
| aa94fea000 | |||
| c6973dbe83 | |||
| 62e0eabe4f | |||
| adaa4ba14d | |||
| ddb55ee5a0 |
@@ -11,7 +11,6 @@
|
||||
"require": {
|
||||
"php": ">=8.0",
|
||||
"ext-json": "*",
|
||||
"game-worker/kiri-consul": "^v1.0",
|
||||
"psr/http-client": "^1.0",
|
||||
"psr/http-message": "^1.0"
|
||||
},
|
||||
|
||||
@@ -1,74 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Kiri\Rpc\Annotation;
|
||||
|
||||
use Annotation\Attribute;
|
||||
use Kiri\Abstracts\Config;
|
||||
use Kiri\Consul\Agent;
|
||||
use Kiri\Exception\ConfigException;
|
||||
use Kiri\Kiri;
|
||||
use Kiri\Rpc\RpcManager;
|
||||
use ReflectionException;
|
||||
|
||||
#[\Attribute(\Attribute::TARGET_CLASS)] class JsonRpc extends Attribute
|
||||
{
|
||||
|
||||
|
||||
/**
|
||||
* @param string $service
|
||||
* @param string $driver
|
||||
* @param array $checkOptions
|
||||
*/
|
||||
public function __construct(public string $service, public string $driver, public array $checkOptions = [
|
||||
"DeregisterCriticalServiceAfter" => "1m",
|
||||
"Http" => "http://127.0.0.1:9527",
|
||||
"Interval" => "1s",
|
||||
"Timeout" => "1s"
|
||||
])
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param mixed $class
|
||||
* @param mixed|string $method
|
||||
* @return mixed
|
||||
* @throws ReflectionException
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function execute(mixed $class, mixed $method = ''): bool
|
||||
{
|
||||
$default = $this->create();
|
||||
$agent = Kiri::getDi()->get(Agent::class);
|
||||
$data = $agent->service->register($default);
|
||||
if ($data->getStatusCode() != 200) {
|
||||
exit($data->getBody()->getContents());
|
||||
}
|
||||
return RpcManager::add($this->service, $class, $default['id']);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @throws ConfigException
|
||||
*/
|
||||
protected function create(): array
|
||||
{
|
||||
$content = current(swoole_get_local_ip());
|
||||
return [
|
||||
"id" => "rpc.json.{$this->service}." . md5(__DIR__ . '.' . md5($content)),
|
||||
"name" => $this->service,
|
||||
"address" => $content,
|
||||
"port" => 9526,
|
||||
"enableTagOverride" => true,
|
||||
"check" => [
|
||||
"DeregisterCriticalServiceAfter" => "1m",
|
||||
"TCP" => $content . ":" . Config::get('rpc.port'),
|
||||
"Interval" => "1s",
|
||||
"Timeout" => "1s"
|
||||
]
|
||||
];
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
+1
-1
@@ -41,7 +41,7 @@ class ClientPool extends Component
|
||||
*/
|
||||
public function get($config, callable $callback): mixed
|
||||
{
|
||||
$coroutineName = $this->name(self::POOL_NAME . '::' . $config['ServiceAddress'] . '::' . $config['ServicePort'], true);
|
||||
$coroutineName = $this->name(self::POOL_NAME . '::' . $config['Address'] . '::' . $config['Port'], true);
|
||||
|
||||
$pool = $config['pool'] ?? ['min' => 1, 'max' => 100];
|
||||
|
||||
|
||||
+24
-15
@@ -6,7 +6,6 @@ namespace Kiri\Rpc;
|
||||
use Exception;
|
||||
use Http\Message\ServerRequest;
|
||||
use Http\Message\Stream;
|
||||
use Kiri\Consul\Agent;
|
||||
use Kiri\Core\Number;
|
||||
use Kiri\Kiri;
|
||||
use Kiri\Pool\Pool;
|
||||
@@ -55,6 +54,7 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
|
||||
/**
|
||||
* @param array $data
|
||||
* @return ServerRequestInterface
|
||||
* @throws \ReflectionException
|
||||
*/
|
||||
private function requestBody(array $data): ServerRequestInterface
|
||||
{
|
||||
@@ -109,32 +109,41 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
|
||||
/**
|
||||
* @param $service
|
||||
* @return array
|
||||
* @throws Exception
|
||||
* @throws RpcServiceException|\ReflectionException
|
||||
*/
|
||||
private function get_consul($service): array
|
||||
{
|
||||
if (empty($service)) {
|
||||
throw new Exception('You need set rpc service name if used.');
|
||||
throw new RpcServiceException('You need set rpc service name if used.');
|
||||
}
|
||||
$sf = Kiri::getDi()->get(Agent::class);
|
||||
$sf = Kiri::getDi()->get(RpcManager::class)->getServices($service);
|
||||
if (empty($sf) || !is_array($sf)) {
|
||||
throw new RpcServiceException('You need set rpc service name if used.');
|
||||
}
|
||||
return $this->_loadRand($sf);
|
||||
}
|
||||
|
||||
$response = $sf->service->setQuery('filter=Service == ' . $service)->list();
|
||||
if ($response->getStatusCode() != 200 || $response->getBody()->getSize() <= 2) {
|
||||
throw new Exception('No microservices found [' . $service . '].');
|
||||
}
|
||||
|
||||
/**
|
||||
* @param $services
|
||||
* @return array
|
||||
*/
|
||||
private function _loadRand($services): array
|
||||
{
|
||||
$array = [];
|
||||
|
||||
$content = json_decode($response->getBody()->getContents(), true);
|
||||
foreach ($content as $value) {
|
||||
$array[] = ['id' => $value['ID'], 'Weights' => $value['Weights']['Passing']];
|
||||
foreach ($services as $value) {
|
||||
$value['Weight'] = $value['Weights']['Passing'];
|
||||
$array[] = $value;
|
||||
}
|
||||
|
||||
if (count($array) < 2) {
|
||||
$luck = $array[0];
|
||||
} else {
|
||||
$luck = Luckdraw::luck($array, 'Weights');
|
||||
$luck = Luckdraw::luck($array, 'Weight');
|
||||
}
|
||||
return ['Address' => $luck['Address'], 'Port' => $luck['Port']];
|
||||
return [
|
||||
'Address' => $luck['TaggedAddresses']['wan_ipv4']['Address'],
|
||||
'Port' => $luck['TaggedAddresses']['wan_ipv4']['Port']
|
||||
];
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
namespace Kiri\Rpc;
|
||||
|
||||
use Annotation\Inject;
|
||||
use Note\Inject;
|
||||
use Exception;
|
||||
use Http\Message\Response;
|
||||
use Http\Message\Stream;
|
||||
@@ -45,7 +45,7 @@ class JsonRpcPoolTransporter implements ClientInterface
|
||||
|
||||
$response = $this->request($client = $this->getClient(), $content, false);
|
||||
|
||||
$this->pool->push($client, $this->config['ServiceAddress'], $this->config['ServicePort']);
|
||||
$this->pool->push($client, $this->config['Address'], $this->config['Port']);
|
||||
|
||||
return (new Response())->withBody(new Stream($response));
|
||||
}
|
||||
|
||||
@@ -0,0 +1,87 @@
|
||||
<?php
|
||||
|
||||
namespace Kiri\Rpc\Note;
|
||||
|
||||
use Kiri\Abstracts\Config;
|
||||
use Kiri\Core\Network;
|
||||
use Kiri\Exception\ConfigException;
|
||||
use Kiri\Kiri;
|
||||
use Kiri\Rpc\RpcManager;
|
||||
use Note\Attribute;
|
||||
use ReflectionException;
|
||||
|
||||
#[\Attribute(\Attribute::TARGET_CLASS)] class JsonRpc extends Attribute
|
||||
{
|
||||
|
||||
|
||||
private string $uniqueId = '';
|
||||
|
||||
|
||||
/**
|
||||
* @param string $service
|
||||
* @param string $driver
|
||||
* @param array $tags
|
||||
* @param array $meta
|
||||
* @param array $checkOptions
|
||||
*/
|
||||
public function __construct(public string $service, public string $driver, public array $tags = [], public array $meta = [], public array $checkOptions = [])
|
||||
{
|
||||
$this->uniqueId = preg_replace('/(\w{11})(\w{4})(\w{3})(\w{8})(\w{6})/', '$1-$2-$3-$4-$5', md5(__DIR__ . '.' . md5(Network::local())));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param mixed $class
|
||||
* @param mixed|string $method
|
||||
* @return mixed
|
||||
* @throws ReflectionException
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function execute(mixed $class, mixed $method = ''): bool
|
||||
{
|
||||
return Kiri::getDi()->get(RpcManager::class)->add($this->service, $class, $this->create());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @throws ConfigException
|
||||
*/
|
||||
protected function create(): array
|
||||
{
|
||||
$rpcPort = Config::get('rpc.port');
|
||||
$defaultConfig = [
|
||||
"ID" => "rpc.json.{$this->service}." . $this->uniqueId,
|
||||
"Name" => $this->service,
|
||||
"EnableTagOverride" => false,
|
||||
"TaggedAddresses" => [
|
||||
"lan_ipv4" => [
|
||||
"address" => "127.0.0.1",
|
||||
"port" => $rpcPort
|
||||
],
|
||||
"wan_ipv4" => [
|
||||
"address" => Network::local(),
|
||||
"port" => $rpcPort
|
||||
]
|
||||
],
|
||||
"Check" => [
|
||||
"CheckId" => "service:rpc.json.{$this->service}." . $this->uniqueId,
|
||||
"Name" => "service " . $this->service . ' health check',
|
||||
"Notes" => "Script based health check",
|
||||
"ServiceID" => $this->service,
|
||||
"TCP" => Network::local() . ":" . Config::get('rpc.port'),
|
||||
"Interval" => "5s",
|
||||
"Timeout" => "1s",
|
||||
"DeregisterCriticalServiceAfter" => "30s"
|
||||
],
|
||||
];
|
||||
if (!empty($this->meta)) {
|
||||
$defaultConfig["Meta"] = $this->meta;
|
||||
}
|
||||
if (!empty($this->tags)) {
|
||||
$defaultConfig["tags"] = $this->tags;
|
||||
}
|
||||
return $defaultConfig;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
+68
-32
@@ -2,26 +2,34 @@
|
||||
|
||||
namespace Kiri\Rpc;
|
||||
|
||||
use Annotation\Annotation;
|
||||
use Annotation\Inject;
|
||||
use Http\Constrict\RequestInterface;
|
||||
use Http\Handler\Handler;
|
||||
use Http\Handler\Router;
|
||||
use Http\Message\ServerRequest;
|
||||
use Kiri\Abstracts\Component;
|
||||
use Kiri\Abstracts\Config;
|
||||
use Kiri\Consul\Agent;
|
||||
use Psr\Container\ContainerExceptionInterface;
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Kiri\Context;
|
||||
use Kiri\Events\EventProvider;
|
||||
use Kiri\Exception\ConfigException;
|
||||
use Kiri\Kiri;
|
||||
use Note\Inject;
|
||||
use Note\Note;
|
||||
use Psr\Container\ContainerExceptionInterface;
|
||||
use Psr\Container\NotFoundExceptionInterface;
|
||||
use Server\Events\OnBeforeShutdown;
|
||||
use Server\Events\OnStart;
|
||||
use Psr\Http\Message\ServerRequestInterface;
|
||||
use ReflectionException;
|
||||
use Server\Contract\OnCloseInterface;
|
||||
use Server\Contract\OnConnectInterface;
|
||||
use Server\Contract\OnReceiveInterface;
|
||||
use Server\Events\OnBeforeShutdown;
|
||||
use Server\Events\OnServerBeforeStart;
|
||||
use Server\Events\OnTaskerStart;
|
||||
use Server\Events\OnWorkerStart;
|
||||
use Swoole\Coroutine;
|
||||
use Swoole\Coroutine\Channel;
|
||||
use Swoole\Server;
|
||||
use Swoole\Timer;
|
||||
|
||||
|
||||
/**
|
||||
@@ -35,16 +43,15 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
|
||||
public Router $router;
|
||||
|
||||
|
||||
#[Inject(Annotation::class)]
|
||||
public Annotation $annotation;
|
||||
#[Inject(Note::class)]
|
||||
public Note $annotation;
|
||||
|
||||
|
||||
#[Inject(EventProvider::class)]
|
||||
public EventProvider $eventProvider;
|
||||
|
||||
|
||||
#[Inject(ContainerInterface::class)]
|
||||
public ContainerInterface $container;
|
||||
private RpcManager $manager;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -55,6 +62,11 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
|
||||
$this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
|
||||
|
||||
scan_directory(APP_PATH . 'rpc', 'Rpc');
|
||||
|
||||
$this->eventProvider->on(OnWorkerStart::class, [$this, 'consulWatches']);
|
||||
$this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']);
|
||||
|
||||
$this->manager = Kiri::getDi()->get(RpcManager::class);
|
||||
}
|
||||
|
||||
|
||||
@@ -65,29 +77,42 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
|
||||
*/
|
||||
public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown)
|
||||
{
|
||||
$doneList = RpcManager::doneList();
|
||||
$doneList = $this->manager->doneList();
|
||||
$agent = $this->container->get(Agent::class);
|
||||
foreach ($doneList as $value) {
|
||||
$agent->service->deregister($value);
|
||||
$agent->service->deregister($value['config']['ID']);
|
||||
$agent->checks->deregister($value['config']['Check']['CheckId']);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param OnStart $server
|
||||
* @param OnWorkerStart|OnTaskerStart $server
|
||||
* @throws ConfigException
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
*/
|
||||
public function register(OnStart $server)
|
||||
public function consulWatches(OnWorkerStart|OnTaskerStart $server)
|
||||
{
|
||||
$config = Config::get('rpc');
|
||||
|
||||
$agent = $this->container->get(Agent::class);
|
||||
$data = $agent->service->register($config['registry']['config']);
|
||||
if ($data->getStatusCode() != 200) {
|
||||
$server->server->shutdown();
|
||||
if ($server->workerId != 0) {
|
||||
return;
|
||||
}
|
||||
$async_time = (int)Config::get('consul.async_time', 1000);
|
||||
Timer::tick($async_time, static function ($timeId) {
|
||||
if (env('state', 'start') == 'exit') {
|
||||
Timer::clear($timeId);
|
||||
return;
|
||||
}
|
||||
Kiri::getDi()->get(RpcManager::class)->tick();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param OnServerBeforeStart $server
|
||||
* @throws ReflectionException
|
||||
*/
|
||||
public function register(OnServerBeforeStart $server)
|
||||
{
|
||||
$this->manager->register();
|
||||
}
|
||||
|
||||
|
||||
@@ -177,11 +202,13 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
|
||||
private function dispatch($data): array
|
||||
{
|
||||
try {
|
||||
[$handler, $params, $_] = RpcManager::get($data['service'], $data['method']);
|
||||
[$handler, $params] = $this->container->get(RpcManager::class)->get($data['service'], $data['method']);
|
||||
if (is_null($handler)) {
|
||||
throw new \Exception('Method not found', -32601);
|
||||
} else {
|
||||
return $this->handler($handler, $data);
|
||||
Context::setContext(RequestInterface::class, $this->createServerRequest($params));
|
||||
|
||||
return $this->handler($handler);
|
||||
}
|
||||
} catch (\Throwable $throwable) {
|
||||
$code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode();
|
||||
@@ -191,18 +218,27 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
|
||||
|
||||
|
||||
/**
|
||||
* @param array $handler
|
||||
* @param $data
|
||||
* @return array
|
||||
* @throws \ReflectionException
|
||||
* @param $params
|
||||
* @return ServerRequestInterface
|
||||
* @throws \Exception
|
||||
*/
|
||||
private function handler(array $handler, $data): array
|
||||
private function createServerRequest($params): ServerRequestInterface
|
||||
{
|
||||
$controller = Kiri::getDi()->get($handler[0]);
|
||||
return (new ServerRequest())->withParsedBody($params);
|
||||
}
|
||||
|
||||
$dispatcher = $controller->{$handler[1]}(...$data['params']);
|
||||
|
||||
return ['jsonrpc' => '2.0', 'result' => $dispatcher, 'id' => $data['id'] ?? null];
|
||||
/**
|
||||
* @param Handler $handler
|
||||
* @return array
|
||||
*/
|
||||
private function handler(Handler $handler): array
|
||||
{
|
||||
return [
|
||||
'jsonrpc' => '2.0',
|
||||
'result' => call_user_func($handler->callback, ...$handler->params),
|
||||
'id' => $data['id'] ?? null
|
||||
];
|
||||
}
|
||||
|
||||
|
||||
|
||||
+94
-12
@@ -2,56 +2,138 @@
|
||||
|
||||
namespace Kiri\Rpc;
|
||||
|
||||
use Http\Handler\Handler;
|
||||
use Kiri\Abstracts\Component;
|
||||
use Kiri\Consul\Agent;
|
||||
use Kiri\Consul\Health;
|
||||
use Kiri\Kiri;
|
||||
use ReflectionException;
|
||||
|
||||
class RpcManager
|
||||
class RpcManager extends Component
|
||||
{
|
||||
|
||||
private static array $_rpc = [];
|
||||
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
private array $_rpc = [];
|
||||
|
||||
|
||||
/**
|
||||
* @param $serviceName
|
||||
* @return void
|
||||
* @throws ReflectionException
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function async($serviceName): void
|
||||
{
|
||||
$lists = Kiri::getDi()->get(Health::class)->setQuery('passing=true')->service($serviceName);
|
||||
if ($lists->getStatusCode() != 200) {
|
||||
return;
|
||||
}
|
||||
$body = json_decode($lists->getBody(), true);
|
||||
$file = storage('.rpc.clients.' . md5($serviceName), 'rpc');
|
||||
if (!empty($body) && is_array($body)) {
|
||||
file_put_contents($file, json_encode(array_column($body, 'Service')), LOCK_EX);
|
||||
} else {
|
||||
file_put_contents($file, json_encode([]), LOCK_EX);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @throws ReflectionException
|
||||
*/
|
||||
public function tick(): void
|
||||
{
|
||||
foreach ($this->_rpc as $name => $list) {
|
||||
$this->async($name);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param $serviceName
|
||||
* @return array
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function getServices($serviceName): array
|
||||
{
|
||||
$file = storage('.rpc.clients.' . md5($serviceName), 'rpc');
|
||||
if (!file_exists($file) || filesize($file) < 10) {
|
||||
$this->async($serviceName);
|
||||
}
|
||||
$content = json_decode(file_get_contents($file), true);
|
||||
if (empty($content) || !is_array($content)) {
|
||||
return [];
|
||||
}
|
||||
return $content;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param string $name
|
||||
* @param string $class
|
||||
* @param string $serviceId
|
||||
* @param array $serviceConfig
|
||||
* @return bool
|
||||
* @throws ReflectionException
|
||||
*/
|
||||
public static function add(string $name, string $class, string $serviceId): bool
|
||||
public function add(string $name, string $class, array $serviceConfig): bool
|
||||
{
|
||||
$methods = Kiri::getDi()->getReflect($class);
|
||||
$lists = $methods->getMethods(\ReflectionMethod::IS_PUBLIC);
|
||||
|
||||
if (!isset(static::$_rpc[$name])) static::$_rpc[$name] = ['methods' => [], 'id' => $serviceId];
|
||||
if (!isset($this->_rpc[$name])) {
|
||||
$this->_rpc[$name] = ['methods' => [], 'id' => $serviceConfig['ID'], 'config' => $serviceConfig];
|
||||
}
|
||||
|
||||
foreach ($lists as $reflection) {
|
||||
if ($reflection->getDeclaringClass() != $class) {
|
||||
continue;
|
||||
}
|
||||
$methodName = $reflection->getName();
|
||||
|
||||
static::$_rpc[$name]['methods'][$methodName] = [[$class, $methodName], null];
|
||||
$this->_rpc[$name]['methods'][$methodName] = [new Handler('/', [$class, $methodName]), null];
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
public static function doneList(): array
|
||||
/**
|
||||
* @return array
|
||||
*/
|
||||
public function doneList(): array
|
||||
{
|
||||
$array = [];
|
||||
foreach (static::$_rpc as $list) {
|
||||
$array[] = $list['id'];
|
||||
foreach ($this->_rpc as $list) {
|
||||
$array[] = $list;
|
||||
}
|
||||
return $array;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @throws ReflectionException
|
||||
*/
|
||||
public function register()
|
||||
{
|
||||
$agent = Kiri::getDi()->get(Agent::class);
|
||||
foreach ($this->_rpc as $list) {
|
||||
$data = $agent->service->register($list['config']);
|
||||
if ($data->getStatusCode() != 200) {
|
||||
exit($data->getBody());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param string $name
|
||||
* @param string $method
|
||||
* @return mixed
|
||||
*/
|
||||
public static function get(string $name, string $method): array
|
||||
public function get(string $name, string $method): array
|
||||
{
|
||||
return static::$_rpc[$name]['methods'][$method] ?? [null, null];
|
||||
return $this->_rpc[$name]['methods'][$method] ?? [null, null];
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
<?php
|
||||
|
||||
namespace Kiri\Rpc;
|
||||
|
||||
class RpcServiceException extends \Exception
|
||||
{
|
||||
|
||||
}
|
||||
+14
-26
@@ -14,6 +14,9 @@ trait TraitTransporter
|
||||
protected array $config;
|
||||
|
||||
|
||||
protected array $clients = [];
|
||||
|
||||
|
||||
/**
|
||||
* @param $config
|
||||
* @return $this
|
||||
@@ -48,24 +51,14 @@ trait TraitTransporter
|
||||
*/
|
||||
private function newClient(): Coroutine\Client|Client
|
||||
{
|
||||
if (Context::inCoroutine()) {
|
||||
$client = $this->clientOnCoroutine($this->config);
|
||||
} else {
|
||||
$client = $this->clientNotCoroutine($this->config);
|
||||
$alias = $this->alias($this->config);
|
||||
$client = $this->clients[$alias] ?? null;
|
||||
if (is_null($client)) {
|
||||
$client = Context::inCoroutine() ? new Coroutine\Client(SWOOLE_SOCK_TCP) : new Client(SWOOLE_SOCK_TCP);
|
||||
$this->clients[$alias] = $client;
|
||||
}
|
||||
return $client;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param $config
|
||||
* @return Coroutine\Client
|
||||
* @throws Exception
|
||||
*/
|
||||
private function clientOnCoroutine($config): Coroutine\Client
|
||||
{
|
||||
$client = new Coroutine\Client(SWOOLE_SOCK_TCP);
|
||||
if (!$client->connect($config['ServiceAddress'], $config['ServicePort'], 60)) {
|
||||
[$host, $port] = [$this->config['Address'], $this->config['Port']];
|
||||
if (!$client->isConnected() && !$client->connect($host, $port, 60)) {
|
||||
throw new Exception('connect fail.');
|
||||
}
|
||||
return $client;
|
||||
@@ -73,17 +66,12 @@ trait TraitTransporter
|
||||
|
||||
|
||||
/**
|
||||
* @param $config
|
||||
* @return Client
|
||||
* @throws Exception
|
||||
* @param array $config
|
||||
* @return string
|
||||
*/
|
||||
private function clientNotCoroutine($config): Client
|
||||
private function alias(array $config): string
|
||||
{
|
||||
$client = new Client(SWOOLE_SOCK_TCP);
|
||||
if (!$client->connect($config['ServiceAddress'], $config['ServicePort'], 60)) {
|
||||
throw new Exception('connect fail.');
|
||||
}
|
||||
return $client;
|
||||
return $config['Address'] . '::' . $config['Port'];
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -29,5 +29,49 @@ return [
|
||||
'port' => ''
|
||||
]
|
||||
]
|
||||
],
|
||||
|
||||
|
||||
'service' => [
|
||||
[
|
||||
"datacenter" => "dc1",
|
||||
"id" => "40e4a748-2192-161a-0510-9bf59fe950b5",
|
||||
"node" => "FriendRpcService",
|
||||
"skipNodeUpdate" => false,
|
||||
"service" => [
|
||||
"id" => "redis1",
|
||||
"service" => "FriendRpcService",
|
||||
"address" => "172.26.221.211",
|
||||
"taggedAddresses" => [
|
||||
"lan" => [
|
||||
"address" => "127.0.0.1",
|
||||
"port" => 8000
|
||||
],
|
||||
"wan" => [
|
||||
"address" => "172.26.221.211",
|
||||
"port" => 80
|
||||
]
|
||||
],
|
||||
"meta" => [
|
||||
"redis_version" => "4.0"
|
||||
],
|
||||
"port" => 8000
|
||||
],
|
||||
"check" => [
|
||||
"node" => "t2.320",
|
||||
"checkId" => "service:redis1",
|
||||
"name" => "Redis health check",
|
||||
"notes" => "Script based health check",
|
||||
"status" => "passing",
|
||||
"serviceID" => "redis1",
|
||||
"definition" => [
|
||||
"http" => "172.26.221.211:9527",
|
||||
"interval" => "5s",
|
||||
"timeout" => "1s",
|
||||
"deregisterCriticalServiceAfter" => "30s"
|
||||
],
|
||||
],
|
||||
]
|
||||
|
||||
]
|
||||
];
|
||||
|
||||
Reference in New Issue
Block a user