diff --git a/AbstractRpcClient.php b/AbstractRpcClient.php new file mode 100644 index 0000000..debdb9b --- /dev/null +++ b/AbstractRpcClient.php @@ -0,0 +1,69 @@ +transporter = $transporter; + } + + + /** + * @return string + */ + public function getService(): string + { + if (empty($this->service)) { + return get_called_class(); + } + return $this->service; + } + + + /** + * @param string $method + * @param ...$args + * @return string|bool + */ + protected function send(string $method, ...$args): string|bool + { + $result = $this->transporter->push(Json::encode([ + 'jsonrpc' => $this->version, + 'service' => $this->getService(), + 'method' => $method, + 'params' => $args, + 'id' => Number::create(time()) + ]), $this->getService()); + if (is_string($result)) { + return json_decode($result, true); + } else { + return $result; + } + } + +} diff --git a/Annotation/JsonRpc.php b/Annotation/JsonRpc.php deleted file mode 100644 index 97fc8e9..0000000 --- a/Annotation/JsonRpc.php +++ /dev/null @@ -1,41 +0,0 @@ -get(RpcManager::class); - - return $manager->add($this->service, $class); - } - -} diff --git a/ClientPool.php b/ClientPool.php index e3a796c..fe56d2c 100644 --- a/ClientPool.php +++ b/ClientPool.php @@ -51,7 +51,7 @@ class ClientPool extends Component * @return void * @throws Exception */ - public function onBeforeShutdown() + public function onBeforeShutdown(): void { foreach ($this->names as $name) { $this->getPool()->clean($name); diff --git a/Consul.php b/Consul.php index 4aefa89..8341eb8 100644 --- a/Consul.php +++ b/Consul.php @@ -50,7 +50,7 @@ class Consul extends Component * @throws ContainerExceptionInterface * @throws NotFoundExceptionInterface */ - public function deregister() + public function deregister(): void { if (env('environmental') != Kiri::WORKER && env('environmental_workerId') != 0) { return; diff --git a/JsonRpcConsumers.php b/JsonRpcConsumers.php deleted file mode 100644 index f04707d..0000000 --- a/JsonRpcConsumers.php +++ /dev/null @@ -1,161 +0,0 @@ -client->withConfig($this->get_consul($this->name))->sendRequest( - $this->requestBody([ - 'jsonrpc' => $version, - 'service' => str_starts_with($this->name, '/') ? $this->name : '/' . $this->name, - 'method' => $method, - 'params' => $data, - ]) - ); - } - - - /** - * @param array $data - * @return ServerRequestInterface - */ - private function requestBody(array $data): ServerRequestInterface - { - $server = Kiri::getDi()->get(ServerRequest::class); - return $server->withBody(new Stream(json_encode($data))); - } - - - /** - * @param string $method - * @param mixed $data - * @param string $version - * @param string $id - * @return mixed - * @throws Exception - * @throws ClientExceptionInterface - */ - public function get(string $method, mixed $data, string $version = '2.0', string $id = ''): ResponseInterface - { - if (empty($id)) $id = Number::create(time()); - - return $this->client->withConfig($this->get_consul($this->name))->sendRequest( - $this->requestBody([ - 'jsonrpc' => $version, - 'service' => str_starts_with($this->name, '/') ? $this->name : '/' . $this->name, - 'method' => $method, - 'params' => $data, - 'id' => $id - ]) - ); - } - - - /** - * @param array $data - * @return mixed - * @throws ClientExceptionInterface - * @throws Exception - */ - public function batch(array $data): mixed - { - return $this->client->withConfig($this->get_consul($this->name))->sendRequest( - $this->requestBody($data) - ); - } - - - /** - * @param $service - * @return array - * @throws RpcServiceException|\ReflectionException - * @throws Exception - */ - #[ArrayShape(['Address' => "mixed", 'Port' => "mixed"])] - private function get_consul($service): array - { - if (empty($service)) { - throw new RpcServiceException('You need set rpc service name if used.'); - } - $sf = $this->manager->getServices($service); - if (empty($sf) || !is_array($sf)) { - throw new RpcServiceException('You need set rpc service name if used.'); - } - return $this->_loadRand($sf); - } - - - /** - * @param $services - * @return array - */ - #[ArrayShape(['Address' => "mixed", 'Port' => "mixed"])] - private function _loadRand($services): array - { - $array = []; - foreach ($services as $value) { - $value['Weight'] = $value['Weights']['Passing']; - $array[] = $value; - } - if (count($array) < 2) { - $luck = $array[0]; - } else { - $luck = Luckdraw::luck($array, 'Weight'); - } - return [ - 'Address' => $luck['TaggedAddresses']['wan_ipv4']['Address'], - 'Port' => $luck['TaggedAddresses']['wan_ipv4']['Port'] - ]; - } - -} diff --git a/JsonRpcPoolTransporter.php b/JsonRpcPoolTransporter.php index 3b0b63a..fa040af 100644 --- a/JsonRpcPoolTransporter.php +++ b/JsonRpcPoolTransporter.php @@ -12,7 +12,7 @@ use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; use Swoole\Coroutine\Client; -class JsonRpcPoolTransporter implements RpcClientInterface +class JsonRpcPoolTransporter implements JsonRpcTransporterInterface { @@ -23,32 +23,21 @@ class JsonRpcPoolTransporter implements RpcClientInterface public ClientPool $pool; - const POOL_NAME = 'rpc.client.pool'; - - /** + * @param string $content + * @param string $service + * @return string|bool + * @throws ConfigException|RpcServiceException */ - public function init() + public function push(string $content, string $service): string|bool { - } - - - /** - * @param RequestInterface $request - * @return ResponseInterface - * @throws Exception - */ - public function sendRequest(RequestInterface $request): ResponseInterface - { - $content = $request->getBody()->getContents(); - - $client = $this->getClient(); + $client = $this->get_consul($service)->getClient(); $response = $this->request($client, $content); $this->pool->push($client, $this->config['Address'], $this->config['Port']); - return (new Response())->withBody(new Stream($response)); + return $response; } diff --git a/JsonRpcTransporter.php b/JsonRpcTransporter.php index 1e8c032..8e46e68 100644 --- a/JsonRpcTransporter.php +++ b/JsonRpcTransporter.php @@ -2,17 +2,13 @@ namespace Kiri\Rpc; -use Kiri\Message\Response; -use Kiri\Message\Stream; -use Psr\Http\Client\ClientInterface; -use Psr\Http\Message\RequestInterface; -use Psr\Http\Message\ResponseInterface; +use Exception; /** * */ -class JsonRpcTransporter implements RpcClientInterface +class JsonRpcTransporter implements JsonRpcTransporterInterface { @@ -20,19 +16,21 @@ class JsonRpcTransporter implements RpcClientInterface /** - * @param RequestInterface $request - * @return ResponseInterface - * @throws \Exception + * @param string $content + * @param string $service + * @return string|bool + * @throws RpcServiceException + * @throws Exception */ - public function sendRequest(RequestInterface $request): ResponseInterface + public function push(string $content, string $service): string|bool { - $content = $request->getBody()->getContents(); + $client = $this->get_consul($service)->newClient(); - $body = $this->request($this->newClient(), $content); + $body = $this->request($client, $content); - $response = \Kiri::getDi()->get(ResponseInterface::class); + $client->close(); - return $response->withBody(new Stream($body)); + return $body; } diff --git a/JsonRpcTransporterInterface.php b/JsonRpcTransporterInterface.php new file mode 100644 index 0000000..c2a52cc --- /dev/null +++ b/JsonRpcTransporterInterface.php @@ -0,0 +1,15 @@ +eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']); - scan_directory(APP_PATH . 'rpc', 'app\Rpc'); - $this->consul = Config::get('rpc.consul', null); - if (!empty($this->consul)) { - $this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']); - } +// $this->consul = Config::get('rpc.consul', null); +// if (!empty($this->consul)) { +// $this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']); +// } $this->collector = $this->dataGrip->get('rpc'); + $this->registerConsumers(); + } + + + /** + * @return void + * @throws ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function registerConsumers(): void + { + $consumers = Config::get('rpc.consumers', []); + if (!is_array($consumers)) { + return; + } + + $local = $this->container->get(LocalService::class); + foreach ($consumers as $consumer) { + $local->set($consumer['id'], $consumer); + } } @@ -134,10 +153,12 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa * @param int $fd * @param int $reactor_id * @param string $data + * @return bool */ - public function onReceive(Server $server, int $fd, int $reactor_id, string $data): void + public function onReceive(Server $server, int $fd, int $reactor_id, string $data): bool { try { + if (!isJson($data)) return $server->send($fd, 'success', $reactor_id); $data = json_decode($data, true); if (!is_array($data)) { throw new Exception('Parse error语法解析错误', -32700); @@ -145,11 +166,11 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa if (!isset($data['jsonrpc']) || !isset($data['method']) || $data['jsonrpc'] != '2.0') { throw new Exception('Invalid Request无效请求', -32600); } - $server->send($fd, $this->batchDispatch($data)); + return $server->send($fd, $this->batchDispatch($data), $reactor_id); } catch (\Throwable $throwable) { $this->logger->error('JsonRpc: ' . $throwable->getMessage()); $response = Json::encode($this->failure(-32700, $throwable->getMessage())); - $server->send($fd, $response); + return $server->send($fd, $response, $reactor_id); } } @@ -208,19 +229,18 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa private function dispatch($data): array { try { - $handler = $this->collector->find($data['service'], 'GET'); - if (is_integer($handler) || is_null($handler)) { + $class = $this->container->get(LocalService::class)->get($data['service']); + if (!$this->container->has($class)) { throw new Exception('Handler not found', -32601); } - $controller = $this->container->get($handler->callback[0]); + $controller = $this->container->get($class); if (!method_exists($controller, $data['method'])) { throw new Exception('Method not found', -32601); } - $params = $this->container->getArgs($data['method'], $controller::class); - - Context::setContext(RequestInterface::class, $this->createServerRequest($params)); - - return $this->handler($controller, $data['method'], $params); + if (!isset($data['params']) || !is_array($data['params'])) { + $data['params'] = []; + } + return $this->handler($controller, $data['method'], $data['params']); } catch (\Throwable $throwable) { $code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode(); return $this->failure($code, jTraceEx($throwable), [], $data['id'] ?? null); @@ -228,17 +248,6 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa } - /** - * @param $params - * @return ServerRequestInterface - * @throws Exception - */ - private function createServerRequest($params): ServerRequestInterface - { - return (new ServerRequest())->withParsedBody($params); - } - - /** * @param $controller * @param string $method diff --git a/TestRpc.php b/TestRpc.php new file mode 100644 index 0000000..32333ec --- /dev/null +++ b/TestRpc.php @@ -0,0 +1,27 @@ +send(__FUNCTION__, $data, $nba); + + return json_decode($resp, true); + } + + +} diff --git a/TraitTransporter.php b/TraitTransporter.php index 34e8b11..6f81b89 100644 --- a/TraitTransporter.php +++ b/TraitTransporter.php @@ -3,6 +3,8 @@ namespace Kiri\Rpc; use Exception; +use JetBrains\PhpStorm\ArrayShape; +use Kiri\Annotation\Inject; use Kiri\Context; use Kiri\Core\Json; use Swoole\Client; @@ -12,35 +14,72 @@ trait TraitTransporter { - protected array $config; - - - protected array $clients = []; - - /** - * @param $config - * @return $this + * @var RpcManager */ - public function withConfig($config): static - { - $this->config = $config; - return $this; - } + #[Inject(RpcManager::class)] + public RpcManager $manager; + + + protected array $config; /** * @param Client|Coroutine\Client $client * @param $content - * @return mixed + * @return string|bool */ - private function request(Client|Coroutine\Client $client, $content): mixed + private function request(Client|Coroutine\Client $client, $content): string|bool { $client->send($content); return $client->recv(); } + /** + * @param string $service + * @return $this + * @throws RpcServiceException + */ + private function get_consul(string $service): static + { + if (empty($service)) { + throw new RpcServiceException('You need set rpc service name if used.'); + } + $sf = $this->manager->getServices($service); + if (empty($sf) || !is_array($sf)) { + throw new RpcServiceException('You need set rpc service name if used.'); + } + $this->config = $this->_loadRand($sf); + return $this; + } + + + /** + * @param $services + * @return array + */ + #[ArrayShape(['Address' => "mixed", 'Port' => "mixed"])] + private function _loadRand($services): array + { + $array = []; + foreach ($services as $value) { + $value['Weight'] = $value['Weights']['Passing']; + $array[] = $value; + } + if (count($array) < 2) { + $luck = $array[0]; + } else { + $luck = Luckdraw::luck($array, 'Weight'); + } + return [ + 'Address' => $luck['TaggedAddresses']['wan_ipv4']['Address'], + 'Port' => $luck['TaggedAddresses']['wan_ipv4']['Port'] + ]; + } + + + /** * @return Client|Coroutine\Client * @throws Exception @@ -59,15 +98,4 @@ trait TraitTransporter return $client; } - - /** - * @param array $config - * @return string - */ - private function alias(array $config): string - { - return $config['Address'] . '::' . $config['Port']; - } - - } diff --git a/composer.json b/composer.json index d415657..41785e3 100644 --- a/composer.json +++ b/composer.json @@ -10,8 +10,7 @@ "license": "MIT", "require": { "php": ">=8.0", - "ext-json": "*", - "psr/http-client": "^1.0" + "ext-json": "*" }, "autoload": { "psr-4": { diff --git a/config.php b/config.php index 5556d43..25c20f7 100644 --- a/config.php +++ b/config.php @@ -17,17 +17,6 @@ return [ ], 'events' => [ Constant::RECEIVE => [RpcJsonp::class, 'onReceive'] - ], - - - 'consumers' => [ - 'class' => TestRpcService::class, - 'name' => 'test-rpc', - 'package' => 'test', - 'register' => [ - 'host' => '', - 'port' => '' - ] ] ], @@ -37,6 +26,7 @@ return [ "datacenter" => "dc1", "id" => "40e4a748-2192-161a-0510-9bf59fe950b5", "node" => "FriendRpcService", + 'class' => TestRpc::class, "skipNodeUpdate" => false, "service" => [ "id" => "redis1", @@ -58,13 +48,13 @@ return [ "port" => 8000 ], "check" => [ - "node" => "t2.320", - "checkId" => "service:redis1", - "name" => "Redis health check", - "Annotations" => "Script based health check", - "status" => "passing", - "serviceID" => "redis1", - "definition" => [ + "node" => "t2.320", + "checkId" => "service:redis1", + "name" => "Redis health check", + "Annotations" => "Script based health check", + "status" => "passing", + "serviceID" => "redis1", + "definition" => [ "http" => "172.26.221.211:9527", "interval" => "5s", "timeout" => "1s",