This commit is contained in:
2021-10-28 18:31:25 +08:00
parent c7f4b8a0f7
commit d52920418a
5 changed files with 15 additions and 49 deletions
-30
View File
@@ -128,34 +128,4 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
return ['ServiceAddress' => '127.0.0.1', 'ServicePort' => 9526];
}
/**
* @param $config
* @return Coroutine\Client
* @throws Exception
*/
private function clientOnCoroutine($config): Coroutine\Client
{
$client = new Coroutine\Client(SWOOLE_SOCK_TCP);
if (!$client->connect($config['ServiceAddress'], $config['ServicePort'], 60)) {
throw new Exception('connect fail.');
}
return $client;
}
/**
* @param $config
* @return Client
* @throws Exception
*/
private function clientNotCoroutine($config): Client
{
$client = new Client(SWOOLE_SOCK_TCP);
if (!$client->connect($config['ServiceAddress'], $config['ServicePort'], 60)) {
throw new Exception('connect fail.');
}
return $client;
}
}
+6 -3
View File
@@ -46,9 +46,12 @@ class JsonRpcPoolTransporter implements ClientInterface
public function sendRequest(RequestInterface $request): ResponseInterface
{
$content = $request->getBody()->getContents();
return (new Response())->withBody(
new Stream($this->request($this->getClient(), $content))
);
$response = $this->request($client = $this->newClient(), $content, false);
$this->pool->push(self::POOL_NAME, $client);
return (new Response())->withBody(new Stream($response));
}
+4 -3
View File
@@ -27,9 +27,10 @@ class JsonRpcTransporter implements ClientInterface
public function sendRequest(RequestInterface $request): ResponseInterface
{
$content = $request->getBody()->getContents();
return (new Response())->withBody(
new Stream($this->request($this->newClient(), $content))
);
$response = $this->request($this->newClient(), $content, true);
return (new Response())->withBody(new Stream($response));
}
-11
View File
@@ -32,11 +32,6 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
#[Inject(Annotation::class)]
public Annotation $annotation;
#[Inject(ContainerInterface::class)]
public ContainerInterface $container;
/**
*
* @throws \Exception
@@ -62,12 +57,6 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
}
}
}
$config = Config::get('rpc.pool', null);
if (!is_null($config)) {
$this->container->mapping(RpcClientInterface::class, JsonRpcPoolTransporter::class);
} else {
$this->container->mapping(RpcClientInterface::class, JsonRpcTransporter::class);
}
}
+5 -2
View File
@@ -28,13 +28,16 @@ trait TraitTransporter
/**
* @param Client|Coroutine\Client $client
* @param $content
* @param bool $isClose
* @return mixed
*/
private function request(Client|Coroutine\Client $client, $content): mixed
private function request(Client|Coroutine\Client $client, $content, bool $isClose): mixed
{
$client->send($content);
$read = $client->recv();
$client->close();
if ($isClose) {
$client->close();
}
return $read;
}