diff --git a/.phpstorm.meta.php b/.phpstorm.meta.php index e1102b5..3872ac7 100644 --- a/.phpstorm.meta.php +++ b/.phpstorm.meta.php @@ -4,7 +4,7 @@ namespace PHPSTORM_META { // Reflect use Kiri\Di\Container; - use Psr\Container\ContainerInterface; + use Kiri\Di\ContainerInterface; override(ContainerInterface::get(0), map('@')); override(Container::get(0), map('@')); diff --git a/Consul.php b/Consul.php new file mode 100644 index 0000000..4aefa89 --- /dev/null +++ b/Consul.php @@ -0,0 +1,103 @@ +config = $settings; + } + + + /** + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function init(): void + { + $this->agent = $this->container->get(Agent::class); + } + + + /** + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function deregister() + { + if (env('environmental') != Kiri::WORKER && env('environmental_workerId') != 0) { + return; + } + + $agent = $this->container->get(Agent::class); + + $this->logger->debug("disconnect consul."); + + $agent->service->deregister($this->config['ID']); + $agent->checks->deregister($this->config['Check']['CheckId']); + } + + + /** + * @return void + */ + public function service_health(): void + { + $info = $this->agent->service->service_health($this->config['ID']); + if ($info->getStatusCode() == 200) { + return; + } + $this->agent->service->register($this->config); + } + + + public function watches() + { + + + } + + + /** + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function register(): void + { + $this->deregister(); + $data = $this->agent->service->register($this->config); + if ($data->getStatusCode() != 200) { + $this->logger->error($data->getBody()); + } + } + + +} diff --git a/RpcJsonp.php b/RpcJsonp.php index 0b1e949..e83b443 100644 --- a/RpcJsonp.php +++ b/RpcJsonp.php @@ -26,7 +26,7 @@ use Kiri\Server\Events\OnTaskerStart; use Kiri\Server\Events\OnWorkerExit; use Kiri\Server\Events\OnWorkerStart; use Psr\Container\ContainerExceptionInterface; -use Psr\Container\ContainerInterface; +use Kiri\Di\ContainerInterface; use Psr\Container\NotFoundExceptionInterface; use Psr\Http\Message\ServerRequestInterface; use ReflectionException; @@ -42,289 +42,255 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa { - private int $timerId = -1; + private array $consul = []; - /** - * @param ContainerInterface $container - * @param Router $router - * @param Annotation $annotation - * @param DataGrip $dataGrip - * @param RpcManager $manager - * @param RouterCollector $collector - * @param EventProvider $eventProvider - * @param array $config - * @throws Exception - */ - public function __construct(public ContainerInterface $container, - public Router $router, - public Annotation $annotation, - public DataGrip $dataGrip, - public RpcManager $manager, - public RouterCollector $collector, - public EventProvider $eventProvider, - array $config = []) - { - parent::__construct($config); - } + /** + * @param ContainerInterface $container + * @param Router $router + * @param Annotation $annotation + * @param DataGrip $dataGrip + * @param RpcManager $manager + * @param RouterCollector $collector + * @param EventProvider $eventProvider + * @param array $config + * @throws Exception + */ + public function __construct(public ContainerInterface $container, + public Router $router, + public Annotation $annotation, + public DataGrip $dataGrip, + public RpcManager $manager, + public RouterCollector $collector, + public EventProvider $eventProvider, + array $config = []) + { + parent::__construct($config); + } - /** - * @return void - * @throws ReflectionException - */ - public function init(): void - { - $this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']); - - scan_directory(APP_PATH . 'rpc', 'app\Rpc'); - - $this->eventProvider->on(OnWorkerStart::class, [$this, 'consulWatches']); - $this->eventProvider->on(OnWorkerExit::class, [$this, 'onWorkerExit']); - $this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']); - - $this->collector = $this->dataGrip->get('rpc'); - } + /** + * @return void + * @throws ReflectionException|ConfigException + */ + public function init(): void + { + $this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']); + scan_directory(APP_PATH . 'rpc', 'app\Rpc'); + $this->consul = Config::get('rpc.consul', null); + if (!empty($this->consul)) { + $this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']); + } + $this->collector = $this->dataGrip->get('rpc'); + } - /** - * @param OnBeforeShutdown $beforeShutdown - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface|ConfigException - */ - public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown): void - { - if (env('environmental_workerId') != 0) { + /** + * @param OnBeforeShutdown $beforeShutdown + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown): void + { + if (env('environmental') != Kiri::WORKER && env('environmental_workerId') != 0) { return; } - $agent = $this->container->get(Agent::class); - $value = Config::get("rpc.consul", []); - if (empty($value)) { - return; - } - + $agent = $this->container->get(Agent::class); + $this->logger->debug("disconnect consul."); - - $agent->service->deregister($value['ID']); - $agent->checks->deregister($value['Check']['CheckId']); - } + + $agent->service->deregister($this->consul['ID']); + $agent->checks->deregister($this->consul['Check']['CheckId']); + } - /** - * @param OnWorkerStart|OnTaskerStart $server - * @throws ConfigException - */ - public function consulWatches(OnWorkerStart|OnTaskerStart $server) - { - if ($server->workerId != 0) { - return; - } - $async_time = (int)Config::get('consul.async_time', 1000); - $this->timerId = Timer::tick($async_time, static function () { - Kiri::getDi()->get(RpcManager::class)->tick(); - }); - } - - - /** - * @param OnWorkerExit $exit - * @return void - */ - public function onWorkerExit(OnWorkerExit $exit): void - { - if ($this->timerId) { - Timer::clear($this->timerId); - } - } - - /** * @param OnServerBeforeStart $server * @throws ConfigException */ - public function register(OnServerBeforeStart $server) - { + public function register(OnServerBeforeStart $server) + { $consumers = Config::get("rpc.consumers", []); if (!empty($consumers)) { - $manager = Kiri::getDi()->get(RpcManager::class); foreach ($consumers as $service => $consumer) { - $manager->add($service, $consumer); + $this->manager->add($service, $consumer); } } - $this->manager->register(); - } + $this->manager->register($this->consul); + } - /** - * @param Server $server - * @param int $fd - */ - public function onConnect(Server $server, int $fd): void - { - // TODO: Implement onConnect() method. - } + /** + * @param Server $server + * @param int $fd + */ + public function onConnect(Server $server, int $fd): void + { + // TODO: Implement onConnect() method. + } - /** - * @param Server $server - * @param int $fd - * @param int $reactor_id - * @param string $data - */ - public function onReceive(Server $server, int $fd, int $reactor_id, string $data): void - { - $data = json_decode($data, true); - if (is_null($data)) { - $this->failure(-32700, 'Parse error语法解析错误'); - } else if (!isset($data['jsonrpc']) || !isset($data['method']) || $data['jsonrpc'] != '2.0') { - $this->failure(-32600, 'Invalid Request无效请求'); - } else { - $this->batchDispatch($server, $fd, $data); - } - } + /** + * @param Server $server + * @param int $fd + * @param int $reactor_id + * @param string $data + */ + public function onReceive(Server $server, int $fd, int $reactor_id, string $data): void + { + try { + $data = json_decode($data, true); + if (!is_array($data)) { + throw new Exception('Parse error语法解析错误', -32700); + } + if (!isset($data['jsonrpc']) || !isset($data['method']) || $data['jsonrpc'] != '2.0') { + throw new Exception('Invalid Request无效请求', -32600); + } + $server->send($fd, $this->batchDispatch($data)); + } catch (\Throwable $throwable) { + $this->logger->error('JsonRpc: ' . $throwable->getMessage()); + $server->send($fd, $this->failure(-32700, 'Parse error语法解析错误')); + } + } - /** - * @param Server $server - * @param int $fd - * @param array $data - * @return void - */ - private function batchDispatch(Server $server, int $fd, array $data): void - { - if (isset($data['jsonrpc'])) { - $dispatch = $this->dispatch($data); - if (!isset($data['id'])) { - $dispatch = [1]; - } - $result = json_encode($dispatch, JSON_UNESCAPED_UNICODE); - } else { - $channel = new Channel($total = count($data)); - foreach ($data as $datum) { - $this->_execute($channel, $datum); - } - $result = []; - for ($i = 0; $i < $total; $i++) { - $result[] = $channel->pop(); - } - $result = json_encode($result, JSON_UNESCAPED_UNICODE); - } - $server->send($fd, $result); - } + /** + * @param array $data + * @return string|bool + */ + private function batchDispatch(array $data): string|bool + { + if (isset($data['jsonrpc'])) { + $result = $this->dispatch($data); + if (!isset($data['id'])) { + $result = [1]; + } + } else { + $channel = new Channel($total = count($data)); + foreach ($data as $datum) { + $this->_execute($channel, $datum); + } + $result = []; + for ($i = 0; $i < $total; $i++) { + $result[] = $channel->pop(); + } + } + return json_encode($result, JSON_UNESCAPED_UNICODE); + } - /** - * @param $channel - * @param $datum - */ - private function _execute($channel, $datum) - { - Coroutine::create(function () use ($channel, $datum) { - if (empty($datum) || !isset($datum['jsonrpc'])) { - $channel->push($this->failure(-32700, 'Parse error语法解析错误')); - } else if (!isset($datum['method'])) { - $channel->push($this->failure(-32700, 'Parse error语法解析错误')); - } else { - $dispatch = $this->dispatch($datum); - if (!isset($dispatch['id'])) { - $dispatch = [1]; - } - $channel->push($dispatch); - } - }); - } + /** + * @param $channel + * @param $datum + */ + private function _execute($channel, $datum) + { + Coroutine::create(function () use ($channel, $datum) { + if (empty($datum) || !isset($datum['jsonrpc'])) { + $channel->push($this->failure(-32700, 'Parse error语法解析错误')); + } else if (!isset($datum['method'])) { + $channel->push($this->failure(-32700, 'Parse error语法解析错误')); + } else { + $dispatch = $this->dispatch($datum); + if (!isset($dispatch['id'])) { + $dispatch = [1]; + } + $channel->push($dispatch); + } + }); + } - /** - * @param $data - * @return array - */ - private function dispatch($data): array - { - try { - $handler = $this->collector->find($data['service'], 'GET'); - if (is_integer($handler) || is_null($handler)) { - throw new Exception('Handler not found', -32601); - } - $controller = $this->container->get($handler->callback[0]); - if (!method_exists($controller, $data['method'])) { - throw new Exception('Method not found', -32601); - } - $params = $this->container->getMethodParameters($controller::class, $data['method']); + /** + * @param $data + * @return array + */ + private function dispatch($data): array + { + try { + $handler = $this->collector->find($data['service'], 'GET'); + if (is_integer($handler) || is_null($handler)) { + throw new Exception('Handler not found', -32601); + } + $controller = $this->container->get($handler->callback[0]); + if (!method_exists($controller, $data['method'])) { + throw new Exception('Method not found', -32601); + } + $params = $this->container->getArgs($controller::class, $data['method']); - Context::setContext(RequestInterface::class, $this->createServerRequest($params)); + Context::setContext(RequestInterface::class, $this->createServerRequest($params)); - return $this->handler($controller, $data['method'], $params); - } catch (\Throwable $throwable) { - $code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode(); - return $this->failure($code, jTraceEx($throwable), [], $data['id'] ?? null); - } - } + return $this->handler($controller, $data['method'], $params); + } catch (\Throwable $throwable) { + $code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode(); + return $this->failure($code, jTraceEx($throwable), [], $data['id'] ?? null); + } + } - /** - * @param $params - * @return ServerRequestInterface - * @throws Exception - */ - private function createServerRequest($params): ServerRequestInterface - { - return (new ServerRequest())->withParsedBody($params); - } + /** + * @param $params + * @return ServerRequestInterface + * @throws Exception + */ + private function createServerRequest($params): ServerRequestInterface + { + return (new ServerRequest())->withParsedBody($params); + } - /** - * @param $controller - * @param string $method - * @param $params - * @return array - */ - #[ArrayShape([])] - private function handler($controller, string $method, $params): array - { - $result = call_user_func([$controller, $method], ...$params); - return [ - 'jsonrpc' => '2.0', - 'result' => $result, - 'id' => $data['id'] ?? null - ]; - } + /** + * @param $controller + * @param string $method + * @param $params + * @return array + */ + #[ArrayShape([])] + private function handler($controller, string $method, $params): array + { + $result = call_user_func([$controller, $method], ...$params); + return [ + 'jsonrpc' => '2.0', + 'result' => $result, + 'id' => $data['id'] ?? null + ]; + } - /** - * @param $code - * @param $message - * @param array $data - * @param null $id - * @return array - */ - #[ArrayShape([])] - protected function failure($code, $message, array $data = [], $id = null): array - { - $error = [ - 'jsonrpc' => '2.0', - 'error' => [ - 'code' => $code, - 'message' => $message, - 'data' => $data - ] - ]; - if (!is_null($id)) { - $error['id'] = $id; - } - return $error; - } + /** + * @param $code + * @param $message + * @param array $data + * @param null $id + * @return array + */ + #[ArrayShape([])] + protected function failure($code, $message, array $data = [], $id = null): array + { + $error = [ + 'jsonrpc' => '2.0', + 'error' => [ + 'code' => $code, + 'message' => $message, + 'data' => $data + ] + ]; + if (!is_null($id)) { + $error['id'] = $id; + } + return $error; + } - /** - * @param \Swoole\WebSocket\Server $server - * @param int $fd - * @return void - */ - public function onClose(\Swoole\WebSocket\Server $server, int $fd): void - { - // TODO: Implement onClose() method. - } + /** + * @param \Swoole\WebSocket\Server $server + * @param int $fd + * @return void + */ + public function onClose(\Swoole\WebSocket\Server $server, int $fd): void + { + // TODO: Implement onClose() method. + } } diff --git a/RpcManager.php b/RpcManager.php index 8cb8300..9a2a128 100644 --- a/RpcManager.php +++ b/RpcManager.php @@ -11,51 +11,20 @@ use Kiri\Consul\Agent; use Kiri\Consul\Health; use Kiri\Message\Handler\Router; + +/** + * class RpcManager + */ class RpcManager extends Component { - - + /** - * @var array + * @var Health */ - private array $_rpc = []; - - #[Inject(Health::class)] public Health $health; - - - /** - * @return void - * @throws Kiri\Exception\ConfigException - */ - public function reRegister(): void - { - $service = Kiri::getDi()->get(Agent::class); - - $config = Config::get("rpc.consul", null, true); - - $info = $service->service->service_health($config['ID']); - if ($info->getStatusCode() == 200) { - return; - } - $service->service->register($config); - } - - - /** - * @throws Exception - */ - public function tick(): void - { - try { - $this->reRegister(); - } catch (\Throwable $throwable) { - $this->logger->error(error_trigger_format($throwable)); - } - } - - + + /** * @param $serviceName * @return array|null @@ -72,8 +41,8 @@ class RpcManager extends Component } return array_column($body, 'Service'); } - - + + /** * @param string $name * @param string $class @@ -86,37 +55,22 @@ class RpcManager extends Component }); return true; } - - - /** - * @return array - */ - public function doneList(): array - { - $array = []; - foreach ($this->_rpc as $list) { - $array[] = $list; - } - return $array; - } - - + + /** + * @param array $config * @return void - * @throws Kiri\Exception\ConfigException */ - public function register(): void + public function register(array $config): void { $agent = Kiri::getDi()->get(Agent::class); - - $list = Config::get("rpc.consul", null, true); - - $agent->service->deregister($list['ID']); - $data = $agent->service->register($list); + $agent->checks->deregister($config['ID']); + $agent->service->deregister($config['ID']); + $data = $agent->service->register($config); if ($data->getStatusCode() != 200) { $this->logger->error($data->getBody()); } - + } - + }