getRouter(); foreach ($services as $service) { $this->addService($router, $server, $service); } } /** * @param $router * @param $server * @param $service * @throws Exception */ private function addService($router, $server, $service) { $mode = $service['mode'] ?? SWOOLE_SOCK_TCP6; $rpcServer = $server->addlistener($service['host'], $service['port'], $mode); $rpcServer->set([ 'open_tcp_keepalive' => true, 'tcp_keepidle' => 30, 'tcp_keepinterval' => 10, 'tcp_keepcount' => 10, 'open_http_protocol' => false, 'open_websocket_protocol' => false, ]); $router->addPortListen($service['port'], function () use ($service, $mode) { try { /** @var Request $request */ $request = Context::getContext('request'); if (($node = router()->find_path(Service::replace($request, $service))) === null) { throw new Exception('Cmd not find.'); } $response = $node->dispatch(); if (is_string($response)) { return $response; } return Json::encode($response); } catch (\Throwable $exception) { $this->addError($exception); return Json::encode(['state' => 'fail', 'message' => $exception->getMessage()]); } }); } /** * @param Request $request * @param array $service * @return Request * @throws Exception */ public static function replace(Request $request, array $service): Request { $body = $request->params->getBodyAndClear(); if (is_string($body) && is_null($body = Json::decode($body))) { throw new Exception('Protocol format error.'); } var_dump($body); if (!isset($body['cmd'])) { throw new Exception('Unknown system cmd.'); } $request->params->setPosts($body); $body['cmd'] = ltrim($body['cmd'], '/'); $header = $request->headers; $header->replace('request_uri', 'rpc/p' . $service['port'] . '/' . $body['cmd']); $header->replace('request_method', Request::HTTP_CMD); return $request; } }