getRouter(); foreach ($services as $service) { $this->addService($router, $server, $service); } } /** * @param $router * @param $server * @param $service * @throws Exception */ private function addService($router, $server, $service) { $mode = $service['mode'] ?? SWOOLE_SOCK_TCP6; $rpcServer = $server->addlistener($service['host'], $service['port'], $mode); $rpcServer->set([ 'open_tcp_keepalive' => true, 'tcp_keepidle' => 30, 'tcp_keepinterval' => 10, 'tcp_keepcount' => 10, 'open_http_protocol' => false, 'open_websocket_protocol' => false, ]); $router->addPortListen($service['port'], function () use ($service, $mode) { try { /** @var Request $request */ $request = Context::getContext('request'); if (($node = router()->find_path($this->replace($request, $service))) === null) { throw new Exception('Cmd not find.'); } return serialize($node->dispatch()); } catch (\Throwable $exception) { $this->addError($exception); return serialize(['state' => 'fail', 'message' => $exception->getMessage()]); } finally { fire(Event::SYSTEM_RESOURCE_RELEASES); } }); } /** * @param Request $request * @param array $service * @return Request * @throws Exception */ public function replace(Request $request, array $service): Request { $body = $request->params->getBodyAndClear(); if (is_null($serialize = unserialize($body))) { throw new Exception('Protocol format error.'); } if (!isset($serialize['cmd'])) { throw new Exception('Protocol format error.'); } $request->params->setPosts($serialize); $serialize['cmd'] = ltrim($serialize['cmd'], '/'); $header = $request->headers; $header->replace('request_uri', 'rpc/p' . $service['port'] . '/' . $serialize['cmd']); $header->replace('request_method', Request::HTTP_CMD); return $request; } }