This commit is contained in:
2021-10-28 18:24:46 +08:00
parent d29020f065
commit c7f4b8a0f7
8 changed files with 292 additions and 34 deletions
+3 -1
View File
@@ -11,7 +11,9 @@
"require": {
"php": ">=8.0",
"ext-json": "*",
"game-worker/kiri-consul": "dev-master"
"game-worker/kiri-consul": "dev-master",
"psr/http-client": "^1.0",
"psr/http-message": "^1.0"
},
"autoload": {
"psr-4": {
+25
View File
@@ -0,0 +1,25 @@
<?php
namespace Kiri\Rpc;
use Kiri\Pool\Pool;
/**
*
*/
class ClientPool extends Pool
{
const POOL_NAME = 'rpc.client.pool';
public int $max;
public int $min;
public int $waite;
}
+47 -33
View File
@@ -4,9 +4,14 @@ namespace Kiri\Rpc;
use Exception;
use Kiri\Context;
use Http\Message\ServerRequest;
use Http\Message\Stream;
use Kiri\Core\Number;
use Kiri\Kiri;
use Kiri\Pool\Pool;
use Psr\Http\Client\ClientExceptionInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Swoole\Client;
use Swoole\Coroutine;
@@ -31,18 +36,31 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
* @param mixed $data
* @param string $version
* @throws Exception
* @throws ClientExceptionInterface
*/
public function notify(string $method, mixed $data, string $version = '2.0'): void
{
$config = $this->get_consul($this->name);
if (Context::inCoroutine()) {
$client = $this->clientOnCoroutine($config);
} else {
$client = $this->clientNotCoroutine($config);
}
$client->send(json_encode(['jsonrpc' => $version, 'service' => $this->name, 'method' => $method, 'params' => $data]));
$client->recv(1);
$client->close();
$transporter = Kiri::getDi()->get(RpcClientInterface::class);
$transporter->withConfig($config)->sendRequest(
$this->requestBody([
'jsonrpc' => $version,
'service' => $this->name,
'method' => $method,
'params' => $data,
])
);
}
/**
* @param array $data
* @return ServerRequestInterface
*/
private function requestBody(array $data): ServerRequestInterface
{
$server = Kiri::getDi()->get(ServerRequest::class);
return $server->withBody(new Stream(json_encode($data)));
}
@@ -53,43 +71,39 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
* @param string $id
* @return mixed
* @throws Exception
* @throws ClientExceptionInterface
*/
public function get(string $method, mixed $data, string $version = '2.0', string $id = ''): mixed
public function get(string $method, mixed $data, string $version = '2.0', string $id = ''): ResponseInterface
{
$config = $this->get_consul($this->name);
if (Context::inCoroutine()) {
$client = $this->clientOnCoroutine($config);
} else {
$client = $this->clientNotCoroutine($config);
}
if (empty($id)) $id = Number::create(time());
$client->send(json_encode(['jsonrpc' => $version, 'service' => $this->name, 'method' => $method, 'params' => $data, 'id' => $id]));
$read = $client->recv();
$client->close();
return json_decode($read, true);
$config = $this->get_consul($this->name);
$transporter = Kiri::getDi()->get(RpcClientInterface::class);
return $transporter->withConfig($config)->sendRequest(
$this->requestBody([
'jsonrpc' => $version,
'service' => $this->name,
'method' => $method,
'params' => $data,
'id' => $id
])
);
}
/**
* @param string $service
* @param array $data
* @return mixed
* @throws ClientExceptionInterface
* @throws Exception
*/
public function batch(string $service, array $data): mixed
public function batch(array $data): mixed
{
$config = $this->get_consul($service);
if (Context::inCoroutine()) {
$client = $this->clientOnCoroutine($config);
} else {
$client = $this->clientNotCoroutine($config);
}
$client->send(json_encode($data, true));
$read = $client->recv();
$client->close();
return json_decode($read, true);
$config = $this->get_consul($this->name);
$transporter = Kiri::getDi()->get(RpcClientInterface::class);
return $transporter->withConfig($config)->sendRequest(
$this->requestBody($data)
);
}
+68
View File
@@ -0,0 +1,68 @@
<?php
namespace Kiri\Rpc;
use Exception;
use Http\Message\Response;
use Http\Message\Stream;
use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Swoole\Coroutine\Client;
class JsonRpcPoolTransporter implements ClientInterface
{
use TraitTransporter;
public ClientPool $pool;
const POOL_NAME = 'rpc.client.pool';
/**
* @throws ConfigException
*/
public function init()
{
$config = Config::get('rpc.pool', null);
$this->pool = Kiri::getDi()->get(ClientPool::class, [], $config);
$this->pool->initConnections(self::POOL_NAME, true, $config['max']);
}
/**
* @param RequestInterface $request
* @return ResponseInterface
* @throws Exception
*/
public function sendRequest(RequestInterface $request): ResponseInterface
{
$content = $request->getBody()->getContents();
return (new Response())->withBody(
new Stream($this->request($this->getClient(), $content))
);
}
/**
* @return Client|\Swoole\Client
* @throws ConfigException
* @throws Exception
*/
private function getClient(): Client|\Swoole\Client
{
return $this->pool->get(self::POOL_NAME, function () {
return $this->newClient();
});
}
}
+36
View File
@@ -0,0 +1,36 @@
<?php
namespace Kiri\Rpc;
use Http\Message\Response;
use Http\Message\Stream;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
/**
*
*/
class JsonRpcTransporter implements ClientInterface
{
use TraitTransporter;
/**
* @param RequestInterface $request
* @return ResponseInterface
* @throws \Exception
*/
public function sendRequest(RequestInterface $request): ResponseInterface
{
$content = $request->getBody()->getContents();
return (new Response())->withBody(
new Stream($this->request($this->newClient(), $content))
);
}
}
+14
View File
@@ -0,0 +1,14 @@
<?php
namespace Kiri\Rpc;
use Psr\Http\Client\ClientInterface;
/**
* @mixin JsonRpcTransporter
*/
interface RpcClientInterface extends ClientInterface
{
}
+12
View File
@@ -6,6 +6,8 @@ use Annotation\Annotation;
use Annotation\Inject;
use Http\Handler\Router;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Di\ContainerInterface;
use Kiri\Di\NoteManager;
use Kiri\Kiri;
use Server\SInterface\OnCloseInterface;
@@ -31,6 +33,10 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
public Annotation $annotation;
#[Inject(ContainerInterface::class)]
public ContainerInterface $container;
/**
*
* @throws \Exception
@@ -56,6 +62,12 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
}
}
}
$config = Config::get('rpc.pool', null);
if (!is_null($config)) {
$this->container->mapping(RpcClientInterface::class, JsonRpcPoolTransporter::class);
} else {
$this->container->mapping(RpcClientInterface::class, JsonRpcTransporter::class);
}
}
+87
View File
@@ -0,0 +1,87 @@
<?php
namespace Kiri\Rpc;
use Exception;
use Kiri\Context;
use Swoole\Client;
use Swoole\Coroutine;
trait TraitTransporter
{
protected array $config;
/**
* @param $config
* @return $this
*/
public function withConfig($config): static
{
$this->config = $config;
return $this;
}
/**
* @param Client|Coroutine\Client $client
* @param $content
* @return mixed
*/
private function request(Client|Coroutine\Client $client, $content): mixed
{
$client->send($content);
$read = $client->recv();
$client->close();
return $read;
}
/**
* @return Client|Coroutine\Client
* @throws Exception
*/
private function newClient(): Coroutine\Client|Client
{
if (Context::inCoroutine()) {
$client = $this->clientOnCoroutine($this->config);
} else {
$client = $this->clientNotCoroutine($this->config);
}
return $client;
}
/**
* @param $config
* @return Coroutine\Client
* @throws Exception
*/
private function clientOnCoroutine($config): Coroutine\Client
{
$client = new Coroutine\Client(SWOOLE_SOCK_TCP);
if (!$client->connect($config['ServiceAddress'], $config['ServicePort'], 60)) {
throw new Exception('connect fail.');
}
return $client;
}
/**
* @param $config
* @return Client
* @throws Exception
*/
private function clientNotCoroutine($config): Client
{
$client = new Client(SWOOLE_SOCK_TCP);
if (!$client->connect($config['ServiceAddress'], $config['ServicePort'], 60)) {
throw new Exception('connect fail.');
}
return $client;
}
}