This commit is contained in:
2023-10-24 17:22:31 +08:00
parent 10a64ee646
commit 0a705b27bc
7 changed files with 46 additions and 39 deletions
+33 -19
View File
@@ -5,13 +5,11 @@ namespace Kiri\Rpc;
use Exception; use Exception;
use Kiri; use Kiri;
use Kiri\Abstracts\Component; use Kiri\Abstracts\Component;
use Kiri\Events\EventProvider;
use Kiri\Exception\ConfigException;
use Kiri\Pool\Pool; use Kiri\Pool\Pool;
use Kiri\Di\Inject\Container;
use Kiri\Server\Events\OnBeforeShutdown; use Kiri\Server\Events\OnBeforeShutdown;
use ReflectionException; use ReflectionException;
use Swoole\Coroutine\Client; use Swoole\Coroutine\Client as CoroutineClient;
use Swoole\Client as AsyncClient;
/** /**
@@ -29,6 +27,9 @@ class ClientPool extends Component
private array $names = []; private array $names = [];
/**
* @return void
*/
public function init(): void public function init(): void
{ {
on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']); on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
@@ -51,8 +52,7 @@ class ClientPool extends Component
/** /**
* @param $config * @param $config
* @return resource * @return resource
* @throws ConfigException * @throws Exception
* @throws ReflectionException
*/ */
public function get($config): mixed public function get($config): mixed
{ {
@@ -67,34 +67,48 @@ class ClientPool extends Component
/** /**
* @param Client|\Swoole\Client $client * @param CoroutineClient|AsyncClient $client
* @param $host * @param $host
* @param $port * @param $port
* @throws ConfigException|ReflectionException * @throws Exception
*/ */
public function push(Client|\Swoole\Client $client, $host, $port) public function push(CoroutineClient|AsyncClient $client, $host, $port)
{ {
$this->getPool($host, $port)->push($host . '::' . $port, $client); $this->getPool($host, $port)->push($host . '::' . $port, $client);
} }
/** /**
* @param $host * @param string $host
* @param $port * @param int $port
* @return Pool * @return Pool
* @throws ReflectionException * @throws ReflectionException
*/ */
public function getPool($host, $port): Pool public function getPool(string $host, int $port): Pool
{ {
$pool = Kiri::getDi()->get(Pool::class); $pool = Kiri::getDi()->get(Pool::class);
$pool->created($host . '::' . $port, 10, function () use ($host, $port) { $pool->created($host . '::' . $port, 10, [$this, 'connect']);
$client = stream_socket_client("tcp://$host:$port", $errCode, $errMessage, 3);
if ($client === false) {
throw new Exception('Connect ' . $host . '::' . $port . ' fail');
}
return $client;
});
return $pool; return $pool;
} }
/**
* @param $host
* @param $port
* @return CoroutineClient|AsyncClient
* @throws Exception
*/
public function connect($host, $port): CoroutineClient|AsyncClient
{
if (Kiri\Di\Context::inCoroutine()) {
$client = new CoroutineClient(SWOOLE_SOCK_TCP);
} else {
$client = new AsyncClient(SWOOLE_SOCK_TCP);
}
if (!$client->connect($host, $port, 3)) {
throw new Exception('Connect ' . $host . '::' . $port . ' fail');
}
return $client;
}
} }
+2 -3
View File
@@ -5,6 +5,7 @@ namespace Kiri\Rpc;
use Exception; use Exception;
use Kiri\Di\Inject\Container; use Kiri\Di\Inject\Container;
use Kiri\Exception\ConfigException; use Kiri\Exception\ConfigException;
use ReflectionException;
class JsonRpcPoolTransporter implements JsonRpcTransporterInterface class JsonRpcPoolTransporter implements JsonRpcTransporterInterface
{ {
@@ -21,9 +22,8 @@ class JsonRpcPoolTransporter implements JsonRpcTransporterInterface
* @param string $content * @param string $content
* @param string $service * @param string $service
* @return string|bool * @return string|bool
* @throws ConfigException
* @throws RpcServiceException * @throws RpcServiceException
* @throws \ReflectionException * @throws ReflectionException
*/ */
public function push(string $content, string $service): string|bool public function push(string $content, string $service): string|bool
{ {
@@ -38,7 +38,6 @@ class JsonRpcPoolTransporter implements JsonRpcTransporterInterface
/** /**
* @throws ConfigException
* @throws Exception * @throws Exception
*/ */
private function getClient() private function getClient()
-1
View File
@@ -9,7 +9,6 @@ use Kiri\Di\LocalService;
use Kiri\Abstracts\Component; use Kiri\Abstracts\Component;
use Kiri\Core\Json; use Kiri\Core\Json;
use Kiri\Events\EventProvider; use Kiri\Events\EventProvider;
use Kiri\Exception\ConfigException;
use Kiri\Server\Contract\OnCloseInterface; use Kiri\Server\Contract\OnCloseInterface;
use Kiri\Server\Contract\OnConnectInterface; use Kiri\Server\Contract\OnConnectInterface;
use Kiri\Server\Contract\OnReceiveInterface; use Kiri\Server\Contract\OnReceiveInterface;
-6
View File
@@ -26,12 +26,6 @@ class RpcProcess extends BaseProcess
*/ */
public function process(?Process $process): void public function process(?Process $process): void
{ {
// TODO: Implement process() method.
while (true) {
$read = $process->read();
}
} }
-2
View File
@@ -1,8 +1,6 @@
<?php <?php
use Kiri\Rpc\AbstractRpcClient; use Kiri\Rpc\AbstractRpcClient;
use Kiri\Rpc\Annotation\JsonRpc;
use Kiri\Rpc\JsonRpcTransporterInterface;
class TestRpc extends AbstractRpcClient class TestRpc extends AbstractRpcClient
+6 -6
View File
@@ -28,13 +28,13 @@ trait TraitTransporter
* @param $content * @param $content
* @return string|bool * @return string|bool
*/ */
private function request(mixed $client, $content): string|bool protected function request(mixed $client, $content): string|bool
{ {
socket_write($client, $content, mb_strlen($content)); socket_write($client, \msgpack_pack($content), mb_strlen($content));
socket_read($client, 1024); socket_read($client, 1024);
return $client->recv(); return \msgpack_unpack($client->recv());
} }
@@ -43,7 +43,7 @@ trait TraitTransporter
* @return $this * @return $this
* @throws RpcServiceException * @throws RpcServiceException
*/ */
private function get_consul(string $service): static protected function get_consul(string $service): static
{ {
if (empty($service)) { if (empty($service)) {
throw new RpcServiceException('You need set rpc service name if used.'); throw new RpcServiceException('You need set rpc service name if used.');
@@ -62,7 +62,7 @@ trait TraitTransporter
* @return array * @return array
*/ */
#[ArrayShape(['Address' => "mixed", 'Port' => "mixed"])] #[ArrayShape(['Address' => "mixed", 'Port' => "mixed"])]
private function _loadRand($services): array protected function _loadRand($services): array
{ {
$array = []; $array = [];
foreach ($services as $value) { foreach ($services as $value) {
@@ -86,7 +86,7 @@ trait TraitTransporter
* @return Client|Coroutine\Client * @return Client|Coroutine\Client
* @throws Exception * @throws Exception
*/ */
private function newClient(): Coroutine\Client|Client protected function newClient(): Coroutine\Client|Client
{ {
if (Context::inCoroutine()) { if (Context::inCoroutine()) {
$client = new Coroutine\Client(SWOOLE_SOCK_TCP); $client = new Coroutine\Client(SWOOLE_SOCK_TCP);
+4 -1
View File
@@ -11,7 +11,10 @@
"require": { "require": {
"php": ">=8.0", "php": ">=8.0",
"ext-json": "*", "ext-json": "*",
"start-point/etcd-php": "^1.1" "ext-msgpack": "*",
"start-point/etcd-php": "^1.1",
"game-worker/kiri-pool": "~v1.0",
"linkorb/etcd-php": "^1.6"
}, },
"replace": { "replace": {
"symfony/polyfill-apcu": "*", "symfony/polyfill-apcu": "*",