From d72ff2f82c5b6555bb3f3059fc56ceaf1844dff8 Mon Sep 17 00:00:00 2001 From: xl Date: Sat, 20 May 2023 23:05:38 +0800 Subject: [PATCH] qqq --- AbstractRpcClient.php | 5 +- ClientPool.php | 135 ++++++++++++++++++------------------- Etcd.php | 86 ++++++++++++++++++++++- JsonRpcPoolTransporter.php | 32 ++++----- RpcManager.php | 61 ----------------- TraitTransporter.php | 13 ++-- composer.json | 14 +++- 7 files changed, 183 insertions(+), 163 deletions(-) diff --git a/AbstractRpcClient.php b/AbstractRpcClient.php index debdb9b..16908c5 100644 --- a/AbstractRpcClient.php +++ b/AbstractRpcClient.php @@ -2,10 +2,9 @@ namespace Kiri\Rpc; -use JetBrains\PhpStorm\ArrayShape; -use Kiri\Annotation\Inject; use Kiri\Core\Json; use Kiri\Core\Number; +use Kiri\Di\Inject\Container; abstract class AbstractRpcClient { @@ -20,7 +19,7 @@ abstract class AbstractRpcClient /** * @var JsonRpcTransporterInterface */ - #[Inject(JsonRpcTransporterInterface::class)] + #[Container(JsonRpcTransporterInterface::class)] private JsonRpcTransporterInterface $transporter; diff --git a/ClientPool.php b/ClientPool.php index fe56d2c..281b99e 100644 --- a/ClientPool.php +++ b/ClientPool.php @@ -5,13 +5,13 @@ namespace Kiri\Rpc; use Exception; use Kiri; use Kiri\Abstracts\Component; -use Kiri\Annotation\Inject; use Kiri\Events\EventProvider; use Kiri\Exception\ConfigException; -use Kiri\Pool\Alias; use Kiri\Pool\Pool; +use Kiri\Di\Inject\Container; use Kiri\Server\Events\OnBeforeShutdown; -use Swoole\Client; +use ReflectionException; +use Swoole\Coroutine\Client; /** @@ -20,86 +20,81 @@ use Swoole\Client; class ClientPool extends Component { - const POOL_NAME = 'rpc.client.pool'; - - use Alias; + public int $max; - public int $max; + public int $min; - public int $min; + private array $names = []; - public int $waite; + public function init(): void + { + on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']); + } - #[Inject(EventProvider::class)] - public EventProvider $provider; + /** + * @return void + * @throws Exception + */ + public function onBeforeShutdown(): void + { + $pool = Kiri::getDi()->get(Pool::class); + foreach ($this->names as $name) { + $pool->clean($name); + } + } - private array $names = []; + /** + * @param $config + * @return resource + * @throws ConfigException + * @throws ReflectionException + */ + public function get($config): mixed + { + $coroutineName = $config['Address'] . '::' . $config['Port']; + + if (!in_array($coroutineName, $this->names)) { + $this->names[] = $coroutineName; + } + + return $this->getPool($config['Address'], $config['Port'])->get($coroutineName); + } - public function init() - { - $this->provider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']); - } + /** + * @param Client|\Swoole\Client $client + * @param $host + * @param $port + * @throws ConfigException|ReflectionException + */ + public function push(Client|\Swoole\Client $client, $host, $port) + { + $this->getPool($host, $port)->push($host . '::' . $port, $client); + } - /** - * @return void - * @throws Exception - */ - public function onBeforeShutdown(): void - { - foreach ($this->names as $name) { - $this->getPool()->clean($name); - } - } - - - /** - * @param $config - * @param callable $callback - * @return mixed - * @throws ConfigException - * @throws Exception - */ - public function get($config, callable $callback): mixed - { - $coroutineName = $this->name(self::POOL_NAME . '::' . $config['Address'] . '::' . $config['Port'], true); - - $pool = $config['pool'] ?? ['min' => 1, 'max' => 100]; - - $this->names[] = $coroutineName; - - return $this->getPool()->get($coroutineName, $callback, $pool['min'] ?? 1); - } - - - /** - * @param \Swoole\Coroutine\Client|Client $client - * @param $host - * @param $port - * @throws ConfigException - * @throws Exception - */ - public function push(\Swoole\Coroutine\Client|Client $client, $host, $port) - { - $coroutineName = $this->name(self::POOL_NAME . '::' . $host . '::' . $port, true); - - $this->getPool()->push($coroutineName, $client); - } - - - /** - * @return Pool - * @throws Exception - */ - public function getPool(): Pool - { - return Kiri::getDi()->get(Pool::class); - } + /** + * @param $host + * @param $port + * @return Pool + * @throws ReflectionException + */ + public function getPool($host, $port): Pool + { + $pool = Kiri::getDi()->get(Pool::class); + $pool->initConnections($host . '::' . $port, 10, function () use ($host, $port) { + $client = stream_socket_client("tcp://$host:$port", $errCode, $errMessage, 3); + if ($client === false) { + throw new Exception('Connect ' . $host . '::' . $port . ' fail'); + } + return $client; + }); + return $pool; + } } diff --git a/Etcd.php b/Etcd.php index b7c6cba..552a352 100644 --- a/Etcd.php +++ b/Etcd.php @@ -1,6 +1,90 @@ client = new Client('47.92.194.207:' . 2379, 'v3'); + $this->grant = $this->client->grant(60); + if ($this->grant instanceof BadResponseException) { + throw new Exception($this->grant->getMessage()); + } + + $key = 'center.service.' . gethostbyname(gethostname()); + pcntl_signal(SIGINT, function () use ($key) { + $this->isEnd = true; + $this->client->del($key); + }); + $this->client->put($key, json_encode([ + 'address' => gethostbyname(gethostname()) . ':10240', + 'nodeId' => Str::rand(32) + ]), ['lease' => (int)$this->grant["ID"]]); + } + + + /** + * @param string $key + * @return mixed + */ + public function get(string $key): mixed + { + return $this->config[$key] ?? null; + } + + + /** + * @param string $key + * @param mixed $value + * @return void + * @throws Exception + */ + public function put(string $key, mixed $value): void + { + $result = $this->client->put($key, $value); + if ($result instanceof BadResponseException) { + throw new Exception($result->getMessage()); + } + $this->config[$key] = $value; + } + + + /** + * @return void + */ + public function waite(): void + { + while ($this->isEnd == false) { + $this->client->keepAlive((int)$this->grant["ID"]); + sleep(1); + } + } + + } \ No newline at end of file diff --git a/JsonRpcPoolTransporter.php b/JsonRpcPoolTransporter.php index fa040af..aa73895 100644 --- a/JsonRpcPoolTransporter.php +++ b/JsonRpcPoolTransporter.php @@ -2,15 +2,9 @@ namespace Kiri\Rpc; -use Kiri\Annotation\Inject; use Exception; -use Kiri\Message\Response; -use Kiri\Message\Stream; -use Kiri\Abstracts\Config; +use Kiri\Di\Inject\Container; use Kiri\Exception\ConfigException; -use Psr\Http\Message\RequestInterface; -use Psr\Http\Message\ResponseInterface; -use Swoole\Coroutine\Client; class JsonRpcPoolTransporter implements JsonRpcTransporterInterface { @@ -19,16 +13,18 @@ class JsonRpcPoolTransporter implements JsonRpcTransporterInterface use TraitTransporter; - #[Inject(ClientPool::class)] + #[Container(ClientPool::class)] public ClientPool $pool; - /** - * @param string $content - * @param string $service - * @return string|bool - * @throws ConfigException|RpcServiceException - */ + /** + * @param string $content + * @param string $service + * @return string|bool + * @throws ConfigException + * @throws RpcServiceException + * @throws \ReflectionException + */ public function push(string $content, string $service): string|bool { $client = $this->get_consul($service)->getClient(); @@ -42,16 +38,12 @@ class JsonRpcPoolTransporter implements JsonRpcTransporterInterface /** - * @return Client|\Swoole\Client * @throws ConfigException * @throws Exception */ - private function getClient(): Client|\Swoole\Client + private function getClient() { - $this->config['pool'] = Config::get('rpc.pool', ['max' => 10, 'min' => 1, 'waite' => 60]); - return $this->pool->get($this->config, function () { - return $this->newClient(); - }); + return $this->pool->get($this->config); } diff --git a/RpcManager.php b/RpcManager.php index 9a2a128..4f24dda 100644 --- a/RpcManager.php +++ b/RpcManager.php @@ -2,14 +2,8 @@ namespace Kiri\Rpc; -use Exception; use Kiri; -use Kiri\Abstracts\Config; use Kiri\Abstracts\Component; -use Kiri\Annotation\Inject; -use Kiri\Consul\Agent; -use Kiri\Consul\Health; -use Kiri\Message\Handler\Router; /** @@ -18,59 +12,4 @@ use Kiri\Message\Handler\Router; class RpcManager extends Component { - /** - * @var Health - */ - #[Inject(Health::class)] - public Health $health; - - - /** - * @param $serviceName - * @return array|null - */ - public function getServices($serviceName): ?array - { - $lists = $this->health->setQuery('passing=true')->service($serviceName); - if ($lists->getStatusCode() != 200) { - return null; - } - $body = json_decode($lists->getBody(), true); - if (empty($body)) { - return null; - } - return array_column($body, 'Service'); - } - - - /** - * @param string $name - * @param string $class - * @return bool - */ - public function add(string $name, string $class): bool - { - Router::addServer('rpc', static function () use ($name, $class) { - Router::get($name, $class); - }); - return true; - } - - - /** - * @param array $config - * @return void - */ - public function register(array $config): void - { - $agent = Kiri::getDi()->get(Agent::class); - $agent->checks->deregister($config['ID']); - $agent->service->deregister($config['ID']); - $data = $agent->service->register($config); - if ($data->getStatusCode() != 200) { - $this->logger->error($data->getBody()); - } - - } - } diff --git a/TraitTransporter.php b/TraitTransporter.php index 1be471a..58c4024 100644 --- a/TraitTransporter.php +++ b/TraitTransporter.php @@ -4,8 +4,8 @@ namespace Kiri\Rpc; use Exception; use JetBrains\PhpStorm\ArrayShape; -use Kiri\Annotation\Inject; use Kiri\Di\Context; +use Kiri\Di\Inject\Container; use Swoole\Client; use Swoole\Coroutine; @@ -16,7 +16,7 @@ trait TraitTransporter /** * @var RpcManager */ - #[Inject(RpcManager::class)] + #[Container(RpcManager::class)] public RpcManager $manager; @@ -24,13 +24,16 @@ trait TraitTransporter /** - * @param Client|Coroutine\Client $client + * @param resource $client * @param $content * @return string|bool */ - private function request(Client|Coroutine\Client $client, $content): string|bool + private function request(mixed $client, $content): string|bool { - $client->send($content); + socket_write($client, $content, mb_strlen($content)); + + socket_read($client, 1024); + return $client->recv(); } diff --git a/composer.json b/composer.json index 41785e3..09a5989 100644 --- a/composer.json +++ b/composer.json @@ -10,13 +10,21 @@ "license": "MIT", "require": { "php": ">=8.0", - "ext-json": "*" + "ext-json": "*", + "start-point/etcd-php": "^1.1" + }, + "replace": { + "symfony/polyfill-apcu": "*", + "symfony/polyfill-php80": "*", + "symfony/polyfill-mbstring": "*", + "symfony/polyfill-ctype": "*", + "symfony/polyfill-php73": "*", + "symfony/polyfill-php72": "*", + "symfony/polyfill-php81": "*" }, "autoload": { "psr-4": { "Kiri\\Rpc\\": "./" } - }, - "require-dev": { } }