From 39af292171f4e04fc09bf945d03ae48c617f3d42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mr=C2=B7x?= Date: Wed, 11 Aug 2021 19:14:26 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Annotation/Rpc/Consumer.php | 44 ----- Console/Command.php | 17 +- HttpServer/Http/Response.php | 36 +++- HttpServer/Route/Router.php | 1 - Rpc/Actuator.php | 45 ----- Rpc/Producer.php | 188 --------------------- Rpc/config.php | 42 ----- Server/Constrict/ResponseEmitter.php | 29 +++- System/Di/Attributes.php | 8 +- rpc-service/composer.json | 26 +++ rpc-service/src/Annotation/RpcConsumer.php | 48 ++++++ rpc-service/src/Annotation/RpcService.php | 57 +++++++ {Rpc => rpc-service/src}/Client.php | 29 +--- {Rpc => rpc-service/src}/Consumer.php | 0 rpc-service/src/DefaultRpcController.php | 29 ++++ {Rpc => rpc-service/src}/IProducer.php | 0 rpc-service/src/Protocol.php | 129 ++++++++++++++ rpc-service/src/Registry.php | 10 ++ {Rpc => rpc-service/src}/Service.php | 96 +++++++---- rpc-service/src/TestRpcClient.php | 52 ++++++ rpc-service/src/config.php | 39 +++++ 21 files changed, 533 insertions(+), 392 deletions(-) delete mode 100644 Annotation/Rpc/Consumer.php delete mode 100644 Rpc/Actuator.php delete mode 100644 Rpc/Producer.php delete mode 100644 Rpc/config.php create mode 100644 rpc-service/composer.json create mode 100644 rpc-service/src/Annotation/RpcConsumer.php create mode 100644 rpc-service/src/Annotation/RpcService.php rename {Rpc => rpc-service/src}/Client.php (78%) rename {Rpc => rpc-service/src}/Consumer.php (100%) create mode 100644 rpc-service/src/DefaultRpcController.php rename {Rpc => rpc-service/src}/IProducer.php (100%) create mode 100644 rpc-service/src/Protocol.php create mode 100644 rpc-service/src/Registry.php rename {Rpc => rpc-service/src}/Service.php (57%) create mode 100644 rpc-service/src/TestRpcClient.php create mode 100644 rpc-service/src/config.php diff --git a/Annotation/Rpc/Consumer.php b/Annotation/Rpc/Consumer.php deleted file mode 100644 index 6077af03..00000000 --- a/Annotation/Rpc/Consumer.php +++ /dev/null @@ -1,44 +0,0 @@ -getRpc(); - $rpc->addConsumer($this->cmd, [$class, $method]); - return true; // TODO: Change the autogenerated stub - } - - -} diff --git a/Console/Command.php b/Console/Command.php index b9ddf138..30311706 100644 --- a/Console/Command.php +++ b/Console/Command.php @@ -3,6 +3,7 @@ declare(strict_types=1); namespace Console; +use Exception; use ReflectionException; use Kiri\Abstracts\BaseObject; use Kiri\Abstracts\TraitApplication; @@ -44,13 +45,13 @@ abstract class Command extends BaseObject implements CommandInterface } - /** - * @param $name - * @return mixed - * @throws ComponentException - * @throws NotFindClassException - * @throws ReflectionException - */ + /** + * @param $name + * @return mixed + * @throws NotFindClassException + * @throws ReflectionException + * @throws Exception + */ public function __get($name): mixed { if ($this->has($name)) { @@ -63,6 +64,7 @@ abstract class Command extends BaseObject implements CommandInterface /** * @param $name * @return bool + * @throws */ private function has($name): bool { @@ -74,7 +76,6 @@ abstract class Command extends BaseObject implements CommandInterface * @param $name * @return mixed * @throws ReflectionException - * @throws ComponentException * @throws NotFindClassException */ private function get($name): mixed diff --git a/HttpServer/Http/Response.php b/HttpServer/Http/Response.php index 537c67ba..7af8deeb 100644 --- a/HttpServer/Http/Response.php +++ b/HttpServer/Http/Response.php @@ -16,8 +16,9 @@ use HttpServer\Http\Formatter\HtmlFormatter; use HttpServer\Http\Formatter\JsonFormatter; use HttpServer\Http\Formatter\XmlFormatter; use HttpServer\IInterface\IFormatter; -use Server\ResponseInterface; use Kiri\Exception\NotFindClassException; +use Server\ResponseInterface; +use Server\ServerManager; use Swoole\Http\Response as SResponse; /** @@ -53,6 +54,29 @@ class Response extends HttpService implements ResponseInterface ]; public int $fd = 0; + private int $clientId = 0; + private int $reactorId = 0; + + + /** + * @param int $int + * @param int $reID + */ + public function setClientId(int $int, int $reID) + { + $this->clientId = $int; + $this->reactorId = $reID; + } + + + /** + * @return mixed + */ + public function getClientInfo(): mixed + { + $server = ServerManager::getContext()->getServer(); + return $server->getClientInfo($this->clientId, $this->reactorId); + } /** @@ -64,6 +88,15 @@ class Response extends HttpService implements ResponseInterface } + /** + * @return int + */ + public function getClientId(): int + { + return $this->clientId; + } + + /** * @param $format * @return $this @@ -117,6 +150,7 @@ class Response extends HttpService implements ResponseInterface /** * @param string $path * @param bool $isChunk + * @param int $offset * @param int $limit * @return $this|Response * @throws Exception diff --git a/HttpServer/Route/Router.php b/HttpServer/Route/Router.php index 573cc883..77b80fe5 100644 --- a/HttpServer/Route/Router.php +++ b/HttpServer/Route/Router.php @@ -14,7 +14,6 @@ use HttpServer\IInterface\Middleware; use HttpServer\IInterface\RouterInterface; use JetBrains\PhpStorm\Pure; use ReflectionException; -use Rpc\Actuator; use Server\RequestInterface; use Kiri\Abstracts\Config; use Kiri\Exception\ConfigException; diff --git a/Rpc/Actuator.php b/Rpc/Actuator.php deleted file mode 100644 index 20ff5e72..00000000 --- a/Rpc/Actuator.php +++ /dev/null @@ -1,45 +0,0 @@ -router = Kiri::getApp('router'); - } - - - /** - * @param string $path - * @param string|callable $callback - * @throws Exception - */ - public function addListener(string $path, string|callable $callback): void - { - $this->router->addRoute('rpc/p' . $this->port . '/' . ltrim($path, '/'), $callback); - } - - -} diff --git a/Rpc/Producer.php b/Rpc/Producer.php deleted file mode 100644 index 77d3e4ac..00000000 --- a/Rpc/Producer.php +++ /dev/null @@ -1,188 +0,0 @@ -getName($name, $producer); - - $snowflake = Kiri::app(); - if (!$snowflake->has($producerName)) { - return $snowflake->set($producerName, $this->definer($name, $producer)); - } else { - return $snowflake->get($producerName); - } - } - - - /** - * @param string $name - * @param string|null $host - * @return Client - * @throws Exception - */ - public function consumer(string $name, string $host = null): Client - { - return $this->getClient($name, $host); - } - - - /** - * @param $name - * @param $producer - * @return array - */ - #[ArrayShape(['class' => "string", 'service' => "", 'config' => ""])] - private function definer($name, $producer): array - { - return ['class' => Client::class, 'service' => $name, 'config' => $producer]; - } - - - /** - * @param $name - * @return Client|bool - * @throws Exception - */ - public function __get($name): Client|bool - { - return $this->get($name); // TODO: Change the autogenerated stub - } - - - - /** - * @param $name - * @param $config - * @return string - */ - private function getName($name, $config): string - { - return 'rpc.client.' . $name . '.' . $config['host']; - } - -} diff --git a/Rpc/config.php b/Rpc/config.php deleted file mode 100644 index a05df00d..00000000 --- a/Rpc/config.php +++ /dev/null @@ -1,42 +0,0 @@ -addRpcService(9527, function (\Rpc\Actuator $actuator) { - $actuator->addListener('', ''); - $actuator->addListener('', ''); -}); - -return [ - 'rpc' => [ - 'type' => Server::TCP, - 'host' => '0.0.0.0', - 'mode' => SWOOLE_SOCK_TCP, - 'port' => 5377, - 'setting' => [ - 'open_tcp_keepalive' => true, - 'tcp_keepidle' => 30, - 'tcp_keepinterval' => 10, - 'tcp_keepcount' => 10, - 'open_http_protocol' => false, - 'open_websocket_protocol' => false, - ], - 'events' => [ - Server::SERVER_ON_CONNECT => [], - Server::SERVER_ON_CLOSE => [], - ], - 'registry' => [ - 'protocol' => 'consul', - 'address' => [ - 'host' => '47.14.25.45', - 'port' => 5527, - 'path' => '' - ], - ], - ] - -]; diff --git a/Server/Constrict/ResponseEmitter.php b/Server/Constrict/ResponseEmitter.php index a6007bee..2205cec6 100644 --- a/Server/Constrict/ResponseEmitter.php +++ b/Server/Constrict/ResponseEmitter.php @@ -4,9 +4,11 @@ namespace Server\Constrict; use Exception; use HttpServer\Http\Formatter\FileFormatter; +use HttpServer\IInterface\IFormatter; +use Kiri\Exception\NotFindClassException; use ReflectionException; use Server\ResponseInterface; -use Kiri\Exception\NotFindClassException; +use Swoole\Server; /** @@ -17,15 +19,19 @@ class ResponseEmitter /** - * @param \Swoole\Http\Response|\Swoole\Http2\Response $response + * @param \Swoole\Http\Response|\Swoole\Http2\Response|Server $response * @param ResponseInterface $emitter - * @throws ReflectionException * @throws NotFindClassException + * @throws ReflectionException * @throws Exception */ - public function sender(\Swoole\Http\Response|\Swoole\Http2\Response $response, ResponseInterface $emitter) + public function sender(\Swoole\Http\Response|\Swoole\Http2\Response|Server $response, ResponseInterface $emitter) { $content = $emitter->configure($response)->getContent(); + if ($response instanceof Server) { + $this->sendTcpData($response, $emitter, $content); + return; + } if ($content instanceof FileFormatter) { $this->download($content->getData(), $response); } else { @@ -35,6 +41,21 @@ class ResponseEmitter } + /** + * @param Server $response + * @param ResponseInterface $emitter + * @param IFormatter $formatter + */ + private function sendTcpData(Server $response, ResponseInterface $emitter, IFormatter $formatter) + { + if ($formatter instanceof FileFormatter) { + $response->sendfile($emitter->getClientId(), $formatter->getData()); + } else { + $response->send($emitter->getClientId(), $formatter->getData()); + } + } + + const IMAGES = [ 'png' => 'image/png', 'jpeg' => 'image/jpeg', diff --git a/System/Di/Attributes.php b/System/Di/Attributes.php index 1d617cd7..4ec7445d 100644 --- a/System/Di/Attributes.php +++ b/System/Di/Attributes.php @@ -112,11 +112,15 @@ trait Attributes /** - * @param ReflectionClass $class + * @param ReflectionClass|string $class * @return array + * @throws \ReflectionException */ - #[Pure] public function getMethods(ReflectionClass $class): array + public function getMethods(ReflectionClass|string $class): array { + if (is_string($class)) { + $class = $this->getReflect($class); + } return $this->_classMethod[$class->getName()] ?? []; } diff --git a/rpc-service/composer.json b/rpc-service/composer.json new file mode 100644 index 00000000..edc9f1de --- /dev/null +++ b/rpc-service/composer.json @@ -0,0 +1,26 @@ +{ + "name": "game-worker/rpc-server", + "description": "db", + "authors": [ + { + "name": "XiangLin", + "email": "as2252258@163.com" + } + ], + "license": "MIT", + "require": { + "php": ">=8.0", + "ext-json": "*", + "ext-pdo": "*", + "game-worker/snowflake": "dev-master", + "game-worker/validator": "dev-master" + }, + "autoload": { + "psr-4": { + "Rpc\\": "src/" + } + }, + "require-dev": { + "kwn/php-rdkafka-stubs": "^2.0" + } +} diff --git a/rpc-service/src/Annotation/RpcConsumer.php b/rpc-service/src/Annotation/RpcConsumer.php new file mode 100644 index 00000000..5f8d8420 --- /dev/null +++ b/rpc-service/src/Annotation/RpcConsumer.php @@ -0,0 +1,48 @@ +getRouter(); + + $methods = Kiri::getDi()->getMethods($class::class); + foreach ($methods as $method => $reflectionMethod) { + $router->addRoute(':rpc/' . $this->package . '/' . $method . '/' . $this->version, [$class, $method], $this->server); + } + return $router; + } + + +} diff --git a/Rpc/Client.php b/rpc-service/src/Client.php similarity index 78% rename from Rpc/Client.php rename to rpc-service/src/Client.php index 12861cca..bee7eb69 100644 --- a/Rpc/Client.php +++ b/rpc-service/src/Client.php @@ -6,7 +6,6 @@ namespace Rpc; use Exception; use Kiri\Abstracts\Component; -use Kiri\Channel; use Kiri\Core\Json; use Swoole\Coroutine\Client as CClient; @@ -28,12 +27,13 @@ class Client extends Component /** - * @param string $cmd + * @param string $package + * @param string $method * @param array $param * @return mixed * @throws Exception */ - public function dispatch(string $cmd, array $param): mixed + public function dispatch(string $package, string $method, array $param): mixed { if (empty($this->config)) { return $this->addError('Related service not found(404)'); @@ -44,12 +44,11 @@ class Client extends Component if (!$this->client->isConnected() && !$this->connect()) { return false; } - - $isSend = $this->client->send(implode("\n", [$cmd, '', serialize($param)])); + $isSend = $this->client->send(Protocol::encode($package, $method, $param)); if ($isSend === false) { return $this->addError($this->client->errMsg . '(' . $this->client->errCode . ')'); } - defer(fn() => $this->clientRecover()); + defer(fn() => $this->client->close()); if (is_bool($unpack = Json::decode($this->client->recv()))) { $unpack = $this->addError('Service return data format error(500)'); } @@ -57,22 +56,6 @@ class Client extends Component } - /** - * @return Client - * @throws Exception - */ - public function clientRecover(): static - { - $host = $this->config['host'] ?? '127.0.0.1'; - $port = $this->config['port'] ?? 0; - if ($port < 0) { - return $this; - } - $this->client = null; - return $this; - } - - /** * @return bool * @throws Exception @@ -113,7 +96,7 @@ class Client extends Component if ($port < 0) { throw new Exception('Related service not have port(404)'); } - $client = new CClient($this->config['mode'] ?? SWOOLE_SOCK_TCP); + $client = new CClient(SWOOLE_SOCK_TCP); $client->set([ 'timeout' => 0.5, 'connect_timeout' => 1.0, diff --git a/Rpc/Consumer.php b/rpc-service/src/Consumer.php similarity index 100% rename from Rpc/Consumer.php rename to rpc-service/src/Consumer.php diff --git a/rpc-service/src/DefaultRpcController.php b/rpc-service/src/DefaultRpcController.php new file mode 100644 index 00000000..33a4ce62 --- /dev/null +++ b/rpc-service/src/DefaultRpcController.php @@ -0,0 +1,29 @@ +int('a') + $request->int('b'); + } + +} diff --git a/Rpc/IProducer.php b/rpc-service/src/IProducer.php similarity index 100% rename from Rpc/IProducer.php rename to rpc-service/src/IProducer.php diff --git a/rpc-service/src/Protocol.php b/rpc-service/src/Protocol.php new file mode 100644 index 00000000..4fe1fb2c --- /dev/null +++ b/rpc-service/src/Protocol.php @@ -0,0 +1,129 @@ +version; + } + + /** + * @return array + */ + public function getHeaders(): array + { + return $this->headers; + } + + /** + * @return mixed + */ + public function getData(): mixed + { + return $this->data; + } + + + /** + * @throws Exception + */ + public static function parse($data): static + { + $protocol = new Protocol(); + $protocol->parseHeaders(...explode("\r\n\r\n", $data)); + return $protocol; + } + + + /** + * @param string $service + * @param string $cmd + * @param array $param + * @return string + */ + public static function encode(string $service, string $cmd, array $param = []): string + { + $proto = 'REQUEST tcp/other.protocol v1.0' . "\r\n"; + $proto .= ':Source: ' . implode(',', swoole_get_local_ip()) . "\r\n"; + $proto .= ':Package: ' . $service . "\r\n"; + $proto .= ':Path: ' . $cmd . "\r\n"; + $proto .= ':Content-Type: ' . $cmd . "\r\n"; + $proto .= ':Method: json-rpc' . "\r\n\r\n"; + + return $proto . json_encode($param) . "\r\n\r\n"; + } + + + /** + * @param string $body + * @return void + */ + private function parseBody(string $body): void + { + if ($this->headers['Content-Type'] == RpcService::PROTOCOL_JSON) { + $this->data = json_decode($body, true); + } else { + $this->data = unserialize($body); + } + } + + + /** + * @param string $headers + * @param string $body + * @return void + * @throws Exception + */ + private function parseHeaders(string $headers, string $body): void + { + $explode = explode("\r\n", $headers); + $this->headers = []; + foreach ($explode as $key => $value) { + if ($key == 0) { + if (!str_starts_with($value, 'REQUEST tcp/other.protocol')) { + throw new Exception('Protocol format error.'); + } + $this->version = str_replace('REQUEST tcp/other.protocol ', '', $value); + continue; + } + [$name, $item] = explode(': ', $value); + $this->headers[str_replace(':', '', $name)] = $item; + } + if (count(array_diff_key(Service::A_DEFAULT, $this->headers)) > 0) { + throw new Exception('Protocol format error.'); + } + $this->parseBody($body); + } + + + /** + * @return string + */ + #[Pure] public function parseUrl(): string + { + return ':rpc/' . $this->headers['Package'] . '/' . $this->headers['Path'] . '/' . $this->getVersion(); + } + +} diff --git a/rpc-service/src/Registry.php b/rpc-service/src/Registry.php new file mode 100644 index 00000000..6fa90f2e --- /dev/null +++ b/rpc-service/src/Registry.php @@ -0,0 +1,10 @@ + '', + 'Package' => '', + 'Path' => '', + 'Content-Type' => '', + 'Method' => '' + ]; private Router $router; @@ -36,17 +51,36 @@ class Service extends \Server\Abstracts\Server public EventProvider $eventProvider; + public Response $response; + + /** @var EventDispatch */ #[Inject(EventDispatch::class)] public EventDispatch $eventDispatch; + #[Inject(ResponseEmitter::class)] + public ResponseEmitter $responseEmitter; + + + /** + * @var ExceptionHandlerInterface + */ + public ExceptionHandlerInterface $exceptionHandler; + + /** * @throws Exception */ public function init() { $this->router = Kiri::getApp('router'); + + $exceptionHandler = Config::get('exception.http', ExceptionHandlerDispatcher::class); + if (!in_array(ExceptionHandlerInterface::class, class_implements($exceptionHandler))) { + $exceptionHandler = ExceptionHandlerDispatcher::class; + } + $this->exceptionHandler = Kiri::getDi()->get($exceptionHandler); } @@ -103,15 +137,18 @@ class Service extends \Server\Abstracts\Server { defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest())); try { - $client = $server->getClientInfo($fd, $reID); - - $request = $this->requestSpl((int)$client['server_port'], $data, $fd); - - $result = $this->router->find_path($request)?->dispatch(); - - $server->send($fd, $result); + $node = $this->router->Branch_search($this->requestSpl($data, $fd)); + if (!($node instanceof Node)) { + throw new RequestException('

HTTP 404 Not Found


Powered by Swoole', 404); + } + $this->response->setClientId($fd, $reID); + if (!(($responseData = $node->dispatch()) instanceof ResponseInterface)) { + $responseData = $this->response->setContent($responseData)->setStatusCode(200); + } } catch (\Throwable $exception) { - $server->send($fd, $exception->getMessage()); + $responseData = $this->exceptionHandler->emit($exception, $this->response); + } finally { + $this->responseEmitter->sender($server, $responseData); } } @@ -138,34 +175,25 @@ class Service extends \Server\Abstracts\Server /** - * @param int $server_port * @param string $data * @param int $fd * @return mixed + * @throws NotFindClassException + * @throws \ReflectionException * @throws Exception */ - public function requestSpl(int $server_port, string $data, int $fd = 0): \HttpServer\Http\Request + public function requestSpl(string $data, int $fd = 0): RequestInterface { + $data = Protocol::parse($data); $sRequest = new Request(); - - [$cmd, $repeat, $body] = explode("\n", $data); - if (is_null($body) || is_null($cmd) || !empty($repeat)) { - throw new Exception('Protocol format error.'); - } - - if (is_string($body) && is_null($data = Json::decode($body))) { - throw new Exception('Protocol format error.'); - } - - $sRequest->fd = $fd; - $sRequest->post = $data; - $sRequest->header['request_uri'] = 'rpc/p' . $server_port . '/' . ltrim($cmd, '/'); - $sRequest->header['request_method'] = 'rpc'; - + $sRequest->setClientId($fd); + $sRequest->setPosts($data->getData()); + $sRequest->setHeaders(array_merge($data->getHeaders(), [ + 'request_uri' => $data->parseUrl(), + 'request_method' => $data->getHeaders()['Method'] + ])); Context::setContext(Request::class, $sRequest); - - return \request(); + return di(cRequest::class); } - } diff --git a/rpc-service/src/TestRpcClient.php b/rpc-service/src/TestRpcClient.php new file mode 100644 index 00000000..a88a48b7 --- /dev/null +++ b/rpc-service/src/TestRpcClient.php @@ -0,0 +1,52 @@ +getRegistry(); + + $client = di(Client::class); + $client->config = ['host' => $host, 'port' => $port, 'timeout' => $this->timeout]; + $client->dispatch($this->package, $this->method, $param); + } + +} diff --git a/rpc-service/src/config.php b/rpc-service/src/config.php new file mode 100644 index 00000000..1bbc8541 --- /dev/null +++ b/rpc-service/src/config.php @@ -0,0 +1,39 @@ + [ + 'name' => 'json-rpc', + 'type' => Constant::SERVER_TYPE_TCP, + 'host' => '0.0.0.0', + 'mode' => SWOOLE_SOCK_TCP, + 'port' => 5377, + 'setting' => [ + 'open_tcp_keepalive' => true, + 'tcp_keepidle' => 30, + 'tcp_keepinterval' => 10, + 'tcp_keepcount' => 10, + 'open_http_protocol' => false, + 'open_websocket_protocol' => false, + ], + 'events' => [ + Constant::CONNECT => [], + Constant::CLOSE => [], + ], + 'registry' => [ + 'protocol' => 'consul', + 'address' => [ + 'host' => '47.14.25.45', + 'port' => 5527, + 'path' => '' + ], + ], + + 'consumers' => [ + + ] + ] + +];