From 19c9b692d43c0b87c220a8cd673d01066ea1a7eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Thu, 2 Dec 2021 14:06:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/JsonRpcConsumers.php | 1 + src/Note/JsonRpc.php | 68 +++++++++++++++++++++++----------------- src/RpcJsonp.php | 61 ++++++++++++++++++++--------------- src/RpcManager.php | 36 +++++++++++++++------ 4 files changed, 102 insertions(+), 64 deletions(-) diff --git a/src/JsonRpcConsumers.php b/src/JsonRpcConsumers.php index 88767d6..50817ee 100644 --- a/src/JsonRpcConsumers.php +++ b/src/JsonRpcConsumers.php @@ -55,6 +55,7 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface /** * @param array $data * @return ServerRequestInterface + * @throws \ReflectionException */ private function requestBody(array $data): ServerRequestInterface { diff --git a/src/Note/JsonRpc.php b/src/Note/JsonRpc.php index 14b5099..e7377c9 100644 --- a/src/Note/JsonRpc.php +++ b/src/Note/JsonRpc.php @@ -2,32 +2,31 @@ namespace Kiri\Rpc\Note; -use Kiri\Consul\Catalog\Catalog; -use Note\Attribute; use Kiri\Abstracts\Config; -use Kiri\Consul\Agent; +use Kiri\Core\Network; use Kiri\Exception\ConfigException; use Kiri\Kiri; use Kiri\Rpc\RpcManager; +use Note\Attribute; use ReflectionException; #[\Attribute(\Attribute::TARGET_CLASS)] class JsonRpc extends Attribute { + private string $uniqueId = ''; + + /** * @param string $service * @param string $driver + * @param array $tags + * @param array $meta * @param array $checkOptions */ - public function __construct(public string $service, public string $driver, public array $checkOptions = [ - "DeregisterCriticalServiceAfter" => "1m", - "Http" => "http://127.0.0.1:9527", - "Interval" => "1s", - "Timeout" => "1s" - ]) + public function __construct(public string $service, public string $driver, public array $tags = [], public array $meta = [], public array $checkOptions = []) { - + $this->uniqueId = preg_replace('/(\w{11})(\w{4})(\w{3})(\w{8})(\w{6})/', '$1-$2-$3-$4-$5', md5(__DIR__ . '.' . md5(Network::local()))); } @@ -40,13 +39,7 @@ use ReflectionException; */ public function execute(mixed $class, mixed $method = ''): bool { - $default = $this->create(); - $agent = Kiri::getDi()->get(Catalog::class); - $data = $agent->register($default); - if ($data->getStatusCode() != 200) { - exit($data->getBody()->getContents()); - } - return RpcManager::add($this->service, $class, $default['id']); + return Kiri::getDi()->get(RpcManager::class)->add($this->service, $class, $this->create()); } @@ -55,19 +48,36 @@ use ReflectionException; */ protected function create(): array { - $content = current(swoole_get_local_ip()); + $rpcPort = Config::get('rpc.port'); return [ - "id" => "rpc.json.{$this->service}." . md5(__DIR__ . '.' . md5($content)), - "name" => $this->service, - "address" => $content, - "port" => 9526, - "enableTagOverride" => true, - "check" => [ - "DeregisterCriticalServiceAfter" => "1m", - "TCP" => $content . ":" . Config::get('rpc.port'), - "Interval" => "1s", - "Timeout" => "1s" - ] + "ID" => "rpc.json.{$this->service}." . $this->uniqueId, + "Service" => $this->service, + "Address" => Network::local(), + "EnableTagOverride" => true, + "TaggedAddresses" => [ + "lan" => [ + "address" => "127.0.0.1", + "port" => $rpcPort + ], + "wan" => [ + "address" => Network::local(), + "port" => $rpcPort + ] + ], + "Meta" => $this->meta, + "Port" => $rpcPort, + "Check" => [ + "CheckId" => "service:rpc.json.{$this->service}." . $this->uniqueId, + "Name" => "service " . $this->service . ' health check', + "Notes" => "Script based health check", + "ServiceID" => $this->service, + "Definition" => [ + "TCP" => Network::local() . ":" . Config::get('rpc.port'), + "Interval" => "5s", + "Timeout" => "1s", + "DeregisterCriticalServiceAfter" => "30s" + ] + ], ]; } diff --git a/src/RpcJsonp.php b/src/RpcJsonp.php index ec4c650..4574df2 100644 --- a/src/RpcJsonp.php +++ b/src/RpcJsonp.php @@ -6,12 +6,9 @@ use Http\Constrict\RequestInterface; use Http\Handler\Router; use Http\Message\ServerRequest; use Kiri\Abstracts\Component; -use Kiri\Abstracts\Config; use Kiri\Consul\Agent; -use Kiri\Consul\Catalog\Catalog; use Kiri\Context; use Kiri\Events\EventProvider; -use Kiri\Exception\ConfigException; use Kiri\Kiri; use Note\Inject; use Note\Note; @@ -19,14 +16,18 @@ use Psr\Container\ContainerExceptionInterface; use Psr\Container\ContainerInterface; use Psr\Container\NotFoundExceptionInterface; use Psr\Http\Message\ServerRequestInterface; +use ReflectionException; use Server\Contract\OnCloseInterface; use Server\Contract\OnConnectInterface; use Server\Contract\OnReceiveInterface; use Server\Events\OnBeforeShutdown; +use Server\Events\OnServerBeforeStart; +use Server\Events\OnTaskerStart; use Server\Events\OnWorkerStart; use Swoole\Coroutine; use Swoole\Coroutine\Channel; use Swoole\Server; +use Swoole\Timer; /** @@ -51,6 +52,10 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa #[Inject(ContainerInterface::class)] public ContainerInterface $container; + + + private RpcManager $manager; + /** * * @throws \Exception @@ -61,7 +66,11 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa scan_directory(APP_PATH . 'rpc', 'Rpc'); - $this->eventProvider->on(OnWorkerStart::class, [$this, 'register']); + $this->eventProvider->on(OnWorkerStart::class, [$this, 'consulWatches']); + $this->eventProvider->on(OnTaskerStart::class, [$this, 'consulWatches']); + $this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']); + + $this->manager = Kiri::getDi()->get(RpcManager::class); } @@ -72,38 +81,40 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa */ public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown) { - $doneList = RpcManager::doneList(); + $doneList = $this->manager->doneList(); $agent = $this->container->get(Agent::class); foreach ($doneList as $value) { - $agent->service->deregister($value); + $agent->service->deregister($value['config']['ID']); + $agent->checks->deregister($value['config']['Check']['CheckId']); } } /** - * @param OnWorkerStart $server - * @throws ConfigException - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface + * @param OnWorkerStart|OnTaskerStart $server */ - public function register(OnWorkerStart $server) + public function consulWatches(OnWorkerStart|OnTaskerStart $server) { - if ($server->workerId != 0) { - return; - } + Timer::tick(1000, static function () { + $lists = Kiri::getDi()->get(RpcManager::class)->doneList(); + $health = Kiri::getDi()->get(Agent::class)->checks; + foreach ($lists as $list) { - $config = Config::get('rpc'); + $health->checks(); - $catalog = $this->container->get(Catalog::class); - $catalog->register([ - ]); + } + }); + } - $agent = $this->container->get(Agent::class); - $data = $agent->service->register($config['registry']['config']); - if ($data->getStatusCode() != 200) { - $server->server->shutdown(); - } + + /** + * @param OnServerBeforeStart $server + * @throws ReflectionException + */ + public function register(OnServerBeforeStart $server) + { + $this->manager->register(); } @@ -193,7 +204,7 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa private function dispatch($data): array { try { - [$handler, $params, $_] = RpcManager::get($data['service'], $data['method']); + [$handler, $params, $_] = $this->container->get(RpcManager::class)->get($data['service'], $data['method']); if (is_null($handler)) { throw new \Exception('Method not found', -32601); } else { @@ -223,7 +234,7 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa * @param array $handler * @param $request * @return array - * @throws \ReflectionException + * @throws ReflectionException */ private function handler(array $handler, $request): array { diff --git a/src/RpcManager.php b/src/RpcManager.php index 2383fc3..17df993 100644 --- a/src/RpcManager.php +++ b/src/RpcManager.php @@ -2,6 +2,7 @@ namespace Kiri\Rpc; +use Kiri\Consul\Agent; use Kiri\Kiri; use ReflectionException; @@ -12,27 +13,27 @@ class RpcManager /** * @var array */ - private static array $_rpc = []; + private array $_rpc = []; /** * @param string $name * @param string $class - * @param string $serviceId + * @param array $serviceConfig * @return bool * @throws ReflectionException */ - public static function add(string $name, string $class, string $serviceId): bool + public function add(string $name, string $class, array $serviceConfig): bool { $methods = Kiri::getDi()->getReflect($class); $lists = $methods->getMethods(\ReflectionMethod::IS_PUBLIC); - if (!isset(static::$_rpc[$name])) static::$_rpc[$name] = ['methods' => [], 'id' => $serviceId]; + if (!isset($this->_rpc[$name])) $this->_rpc[$name] = ['methods' => [], 'id' => $serviceConfig['id'], 'config' => $serviceConfig]; foreach ($lists as $reflection) { $methodName = $reflection->getName(); - static::$_rpc[$name]['methods'][$methodName] = [[$class, $methodName], null]; + $this->_rpc[$name]['methods'][$methodName] = [[$class, $methodName], null]; } return true; } @@ -41,24 +42,39 @@ class RpcManager /** * @return array */ - public static function doneList(): array + public function doneList(): array { $array = []; - foreach (static::$_rpc as $list) { - $array[] = $list['id']; + foreach ($this->_rpc as $list) { + $array[] = $list; } return $array; } + /** + * @throws ReflectionException + */ + public function register() + { + $agent = Kiri::getDi()->get(Agent::class); + foreach ($this->_rpc as $list) { + $data = $agent->service->register($list['config']); + if ($data->getStatusCode() != 200) { + exit($data->getBody()->getContents()); + } + } + } + + /** * @param string $name * @param string $method * @return mixed */ - public static function get(string $name, string $method): array + public function get(string $name, string $method): array { - return static::$_rpc[$name]['methods'][$method] ?? [null, null]; + return $this->_rpc[$name]['methods'][$method] ?? [null, null]; } }