diff --git a/ClientPool.php b/ClientPool.php index 5f6baf2..0bede9c 100644 --- a/ClientPool.php +++ b/ClientPool.php @@ -5,13 +5,11 @@ namespace Kiri\Rpc; use Exception; use Kiri; use Kiri\Abstracts\Component; -use Kiri\Events\EventProvider; -use Kiri\Exception\ConfigException; use Kiri\Pool\Pool; -use Kiri\Di\Inject\Container; use Kiri\Server\Events\OnBeforeShutdown; use ReflectionException; -use Swoole\Coroutine\Client; +use Swoole\Coroutine\Client as CoroutineClient; +use Swoole\Client as AsyncClient; /** @@ -29,6 +27,9 @@ class ClientPool extends Component private array $names = []; + /** + * @return void + */ public function init(): void { on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']); @@ -51,8 +52,7 @@ class ClientPool extends Component /** * @param $config * @return resource - * @throws ConfigException - * @throws ReflectionException + * @throws Exception */ public function get($config): mixed { @@ -67,34 +67,48 @@ class ClientPool extends Component /** - * @param Client|\Swoole\Client $client + * @param CoroutineClient|AsyncClient $client * @param $host * @param $port - * @throws ConfigException|ReflectionException + * @throws Exception */ - public function push(Client|\Swoole\Client $client, $host, $port) + public function push(CoroutineClient|AsyncClient $client, $host, $port) { $this->getPool($host, $port)->push($host . '::' . $port, $client); } /** - * @param $host - * @param $port + * @param string $host + * @param int $port * @return Pool * @throws ReflectionException */ - public function getPool($host, $port): Pool + public function getPool(string $host, int $port): Pool { $pool = Kiri::getDi()->get(Pool::class); - $pool->created($host . '::' . $port, 10, function () use ($host, $port) { - $client = stream_socket_client("tcp://$host:$port", $errCode, $errMessage, 3); - if ($client === false) { - throw new Exception('Connect ' . $host . '::' . $port . ' fail'); - } - return $client; - }); + $pool->created($host . '::' . $port, 10, [$this, 'connect']); return $pool; } + + /** + * @param $host + * @param $port + * @return CoroutineClient|AsyncClient + * @throws Exception + */ + public function connect($host, $port): CoroutineClient|AsyncClient + { + if (Kiri\Di\Context::inCoroutine()) { + $client = new CoroutineClient(SWOOLE_SOCK_TCP); + } else { + $client = new AsyncClient(SWOOLE_SOCK_TCP); + } + if (!$client->connect($host, $port, 3)) { + throw new Exception('Connect ' . $host . '::' . $port . ' fail'); + } + return $client; + } + } diff --git a/JsonRpcPoolTransporter.php b/JsonRpcPoolTransporter.php index aa73895..2cf7c69 100644 --- a/JsonRpcPoolTransporter.php +++ b/JsonRpcPoolTransporter.php @@ -5,6 +5,7 @@ namespace Kiri\Rpc; use Exception; use Kiri\Di\Inject\Container; use Kiri\Exception\ConfigException; +use ReflectionException; class JsonRpcPoolTransporter implements JsonRpcTransporterInterface { @@ -21,9 +22,8 @@ class JsonRpcPoolTransporter implements JsonRpcTransporterInterface * @param string $content * @param string $service * @return string|bool - * @throws ConfigException * @throws RpcServiceException - * @throws \ReflectionException + * @throws ReflectionException */ public function push(string $content, string $service): string|bool { @@ -38,8 +38,7 @@ class JsonRpcPoolTransporter implements JsonRpcTransporterInterface /** - * @throws ConfigException - * @throws Exception + * @throws Exception */ private function getClient() { diff --git a/RpcJsonp.php b/RpcJsonp.php index f43f19a..4991192 100644 --- a/RpcJsonp.php +++ b/RpcJsonp.php @@ -9,7 +9,6 @@ use Kiri\Di\LocalService; use Kiri\Abstracts\Component; use Kiri\Core\Json; use Kiri\Events\EventProvider; -use Kiri\Exception\ConfigException; use Kiri\Server\Contract\OnCloseInterface; use Kiri\Server\Contract\OnConnectInterface; use Kiri\Server\Contract\OnReceiveInterface; diff --git a/RpcProcess.php b/RpcProcess.php index 30530da..7726aa5 100644 --- a/RpcProcess.php +++ b/RpcProcess.php @@ -26,12 +26,6 @@ class RpcProcess extends BaseProcess */ public function process(?Process $process): void { - // TODO: Implement process() method. - while (true) { - $read = $process->read(); - - - } } diff --git a/TestRpc.php b/TestRpc.php index 32333ec..ceacf16 100644 --- a/TestRpc.php +++ b/TestRpc.php @@ -1,8 +1,6 @@ recv(); + return \msgpack_unpack($client->recv()); } @@ -43,7 +43,7 @@ trait TraitTransporter * @return $this * @throws RpcServiceException */ - private function get_consul(string $service): static + protected function get_consul(string $service): static { if (empty($service)) { throw new RpcServiceException('You need set rpc service name if used.'); @@ -62,7 +62,7 @@ trait TraitTransporter * @return array */ #[ArrayShape(['Address' => "mixed", 'Port' => "mixed"])] - private function _loadRand($services): array + protected function _loadRand($services): array { $array = []; foreach ($services as $value) { @@ -86,7 +86,7 @@ trait TraitTransporter * @return Client|Coroutine\Client * @throws Exception */ - private function newClient(): Coroutine\Client|Client + protected function newClient(): Coroutine\Client|Client { if (Context::inCoroutine()) { $client = new Coroutine\Client(SWOOLE_SOCK_TCP); diff --git a/composer.json b/composer.json index 09a5989..e27c238 100644 --- a/composer.json +++ b/composer.json @@ -11,7 +11,10 @@ "require": { "php": ">=8.0", "ext-json": "*", - "start-point/etcd-php": "^1.1" + "ext-msgpack": "*", + "start-point/etcd-php": "^1.1", + "game-worker/kiri-pool": "~v1.0", + "linkorb/etcd-php": "^1.6" }, "replace": { "symfony/polyfill-apcu": "*",