diff --git a/RpcJsonp.php b/RpcJsonp.php index 21537d6..ab5d84b 100644 --- a/RpcJsonp.php +++ b/RpcJsonp.php @@ -42,193 +42,196 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa { - private int $timerId; + private int $timerId; - /** - * @param ContainerInterface $container - * @param Router $router - * @param Annotation $annotation - * @param DataGrip $dataGrip - * @param RpcManager $manager - * @param RouterCollector $collector - * @param EventProvider $eventProvider - * @param array $config - * @throws Exception - */ - public function __construct(public ContainerInterface $container, - public Router $router, - public Annotation $annotation, - public DataGrip $dataGrip, - public RpcManager $manager, - public RouterCollector $collector, - public EventProvider $eventProvider, - array $config = []) - { - parent::__construct($config); - } + /** + * @param ContainerInterface $container + * @param Router $router + * @param Annotation $annotation + * @param DataGrip $dataGrip + * @param RpcManager $manager + * @param RouterCollector $collector + * @param EventProvider $eventProvider + * @param array $config + * @throws Exception + */ + public function __construct(public ContainerInterface $container, + public Router $router, + public Annotation $annotation, + public DataGrip $dataGrip, + public RpcManager $manager, + public RouterCollector $collector, + public EventProvider $eventProvider, + array $config = []) + { + parent::__construct($config); + } - /** - * @return void - * @throws ReflectionException - */ - public function init(): void - { - $this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']); + /** + * @return void + * @throws ReflectionException + */ + public function init(): void + { + $this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']); - scan_directory(APP_PATH . 'rpc', 'app\Rpc'); + scan_directory(APP_PATH . 'rpc', 'app\Rpc'); - $this->eventProvider->on(OnWorkerStart::class, [$this, 'consulWatches']); - $this->eventProvider->on(OnWorkerExit::class, [$this, 'onWorkerExit']); - $this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']); + $this->eventProvider->on(OnWorkerStart::class, [$this, 'consulWatches']); + $this->eventProvider->on(OnWorkerExit::class, [$this, 'onWorkerExit']); + $this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']); - $this->collector = $this->dataGrip->get('rpc'); - } + $this->collector = $this->dataGrip->get('rpc'); + } - /** - * @param OnBeforeShutdown $beforeShutdown - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown): void - { - $doneList = $this->manager->doneList(); - $agent = $this->container->get(Agent::class); - foreach ($doneList as $value) { - $agent->service->deregister($value['config']['ID']); - $agent->checks->deregister($value['config']['Check']['CheckId']); - } - } + /** + * @param OnBeforeShutdown $beforeShutdown + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown): void + { + $doneList = $this->manager->doneList(); + $agent = $this->container->get(Agent::class); + foreach ($doneList as $value) { + $agent->service->deregister($value['config']['ID']); + $agent->checks->deregister($value['config']['Check']['CheckId']); + } + } - /** - * @param OnWorkerStart|OnTaskerStart $server - * @throws ConfigException - */ - public function consulWatches(OnWorkerStart|OnTaskerStart $server) - { - if ($server->workerId != 0) { - return; - } - $async_time = (int)Config::get('consul.async_time', 1000); - $this->timerId = Timer::tick($async_time, static function () { - Kiri::getDi()->get(RpcManager::class)->tick(); - }); - } + /** + * @param OnWorkerStart|OnTaskerStart $server + * @throws ConfigException + */ + public function consulWatches(OnWorkerStart|OnTaskerStart $server) + { + if ($server->workerId != 0) { + return; + } + $async_time = (int)Config::get('consul.async_time', 1000); + $this->timerId = Timer::tick($async_time, static function () { + Kiri::getDi()->get(RpcManager::class)->tick(); + }); + } - /** - * @param OnWorkerExit $exit - * @return void - */ - public function onWorkerExit(OnWorkerExit $exit): void - { - Timer::clear($this->timerId); - } + /** + * @param OnWorkerExit $exit + * @return void + */ + public function onWorkerExit(OnWorkerExit $exit): void + { + Timer::clear($this->timerId); + } - /** - * @param OnServerBeforeStart $server - */ - public function register(OnServerBeforeStart $server) - { - $this->manager->register(); - } + /** + * @param OnServerBeforeStart $server + */ + public function register(OnServerBeforeStart $server) + { + $this->manager->register(); + } - /** - * @param Server $server - * @param int $fd - */ - public function onConnect(Server $server, int $fd): void - { - // TODO: Implement onConnect() method. - } + /** + * @param Server $server + * @param int $fd + */ + public function onConnect(Server $server, int $fd): void + { + // TODO: Implement onConnect() method. + } - /** - * @param Server $server - * @param int $fd - * @param int $reactor_id - * @param string $data - */ - public function onReceive(Server $server, int $fd, int $reactor_id, string $data): void - { - $data = json_decode($data, true); - if (is_null($data)) { - $this->failure(-32700, 'Parse error语法解析错误'); - } else if (!isset($data['jsonrpc']) || !isset($data['method']) || $data['jsonrpc'] != '2.0') { - $this->failure(-32600, 'Invalid Request无效请求'); - } else { - $this->batchDispatch($server, $fd, $data); - } - } + /** + * @param Server $server + * @param int $fd + * @param int $reactor_id + * @param string $data + */ + public function onReceive(Server $server, int $fd, int $reactor_id, string $data): void + { + $data = json_decode($data, true); + if (is_null($data)) { + $this->failure(-32700, 'Parse error语法解析错误'); + } else if (!isset($data['jsonrpc']) || !isset($data['method']) || $data['jsonrpc'] != '2.0') { + $this->failure(-32600, 'Invalid Request无效请求'); + } else { + $this->batchDispatch($server, $fd, $data); + } + } - /** - * @param Server $server - * @param int $fd - * @param array $data - * @return void - */ - private function batchDispatch(Server $server, int $fd, array $data): void - { - if (isset($data['jsonrpc'])) { - $dispatch = $this->dispatch($data); - if (!isset($data['id'])) { - $dispatch = [1]; - } - $result = json_encode($dispatch, JSON_UNESCAPED_UNICODE); - } else { - $channel = new Channel($total = count($data)); - foreach ($data as $datum) { - $this->_execute($channel, $datum); - } - $result = []; - for ($i = 0; $i < $total; $i++) { - $result[] = $channel->pop(); - } - } - $server->send($fd, json_encode($result, JSON_UNESCAPED_UNICODE)); - } + /** + * @param Server $server + * @param int $fd + * @param array $data + * @return void + */ + private function batchDispatch(Server $server, int $fd, array $data): void + { + if (isset($data['jsonrpc'])) { + $dispatch = $this->dispatch($data); + if (!isset($data['id'])) { + $dispatch = [1]; + } + $result = json_encode($dispatch, JSON_UNESCAPED_UNICODE); + } else { + $channel = new Channel($total = count($data)); + foreach ($data as $datum) { + $this->_execute($channel, $datum); + } + $result = []; + for ($i = 0; $i < $total; $i++) { + $result[] = $channel->pop(); + } + } + $server->send($fd, json_encode($result, JSON_UNESCAPED_UNICODE)); + } - /** - * @param $channel - * @param $datum - */ - private function _execute($channel, $datum) - { - Coroutine::create(function () use ($channel, $datum) { - if (empty($datum) || !isset($datum['jsonrpc'])) { - $channel->push($this->failure(-32700, 'Parse error语法解析错误')); - } else if (!isset($datum['method'])) { - $channel->push($this->failure(-32700, 'Parse error语法解析错误')); - } else { - $dispatch = $this->dispatch($datum); - if (!isset($dispatch['id'])) { - $dispatch = [1]; - } - $channel->push($dispatch); - } - }); - } + /** + * @param $channel + * @param $datum + */ + private function _execute($channel, $datum) + { + Coroutine::create(function () use ($channel, $datum) { + if (empty($datum) || !isset($datum['jsonrpc'])) { + $channel->push($this->failure(-32700, 'Parse error语法解析错误')); + } else if (!isset($datum['method'])) { + $channel->push($this->failure(-32700, 'Parse error语法解析错误')); + } else { + $dispatch = $this->dispatch($datum); + if (!isset($dispatch['id'])) { + $dispatch = [1]; + } + $channel->push($dispatch); + } + }); + } - /** - * @param $data - * @return array - */ - private function dispatch($data): array - { - try { - $handler = $this->collector->find($data['service'], 'GET'); - var_dump($this->collector, $data); - if (is_integer($handler) || is_null($handler)) { - throw new Exception('Handler not found', -32601); + /** + * @param $data + * @return array + */ + private function dispatch($data): array + { + try { + if (!str_starts_with($data['service'], '/')) { + $data['service'] = '/' . $data['service']; + } + $handler = $this->collector->find($data['service'], 'GET'); + var_dump($handler, $data); + if (is_integer($handler) || is_null($handler)) { + throw new Exception('Handler not found', -32601); } $controller = $handler->callback[0]; @@ -241,73 +244,73 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa return $this->handler($controller, $data['method'], $params); } catch (\Throwable $throwable) { - $code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode(); - return $this->failure($code, jTraceEx($throwable), [], $data['id'] ?? null); - } - } + $code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode(); + return $this->failure($code, jTraceEx($throwable), [], $data['id'] ?? null); + } + } - /** - * @param $params - * @return ServerRequestInterface - * @throws Exception - */ - private function createServerRequest($params): ServerRequestInterface - { - return (new ServerRequest())->withParsedBody($params); - } + /** + * @param $params + * @return ServerRequestInterface + * @throws Exception + */ + private function createServerRequest($params): ServerRequestInterface + { + return (new ServerRequest())->withParsedBody($params); + } - /** - * @param $controller - * @param string $method - * @param $params - * @return array - */ + /** + * @param $controller + * @param string $method + * @param $params + * @return array + */ #[ArrayShape([])] private function handler($controller, string $method, $params): array - { - $result = call_user_func([$controller, $method], ...$params); - return [ - 'jsonrpc' => '2.0', - 'result' => $result, - 'id' => $data['id'] ?? null - ]; - } + { + $result = call_user_func([$controller, $method], ...$params); + return [ + 'jsonrpc' => '2.0', + 'result' => $result, + 'id' => $data['id'] ?? null + ]; + } - /** - * @param $code - * @param $message - * @param array $data - * @param null $id - * @return array - */ + /** + * @param $code + * @param $message + * @param array $data + * @param null $id + * @return array + */ #[ArrayShape([])] - protected function failure($code, $message, array $data = [], $id = null): array - { - $error = [ - 'jsonrpc' => '2.0', - 'error' => [ - 'code' => $code, - 'message' => $message, - 'data' => $data - ] - ]; - if (!is_null($id)) { - $error['id'] = $id; - } - return $error; - } + protected function failure($code, $message, array $data = [], $id = null): array + { + $error = [ + 'jsonrpc' => '2.0', + 'error' => [ + 'code' => $code, + 'message' => $message, + 'data' => $data + ] + ]; + if (!is_null($id)) { + $error['id'] = $id; + } + return $error; + } - /** - * @param \Swoole\WebSocket\Server $server - * @param int $fd - * @return void - */ - public function onClose(\Swoole\WebSocket\Server $server, int $fd): void - { - // TODO: Implement onClose() method. - } + /** + * @param \Swoole\WebSocket\Server $server + * @param int $fd + * @return void + */ + public function onClose(\Swoole\WebSocket\Server $server, int $fd): void + { + // TODO: Implement onClose() method. + } }