modify plugin name
This commit is contained in:
+19
-10
@@ -12,6 +12,7 @@ use Kiri\Pool\Pool;
|
||||
use Psr\Http\Client\ClientExceptionInterface;
|
||||
use Psr\Http\Message\ResponseInterface;
|
||||
use Psr\Http\Message\ServerRequestInterface;
|
||||
use Kiri\Annotation\Inject;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -26,6 +27,20 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
|
||||
public Pool $pool;
|
||||
|
||||
|
||||
/**
|
||||
* @var RpcManager
|
||||
*/
|
||||
#[Inject(RpcManager::class)]
|
||||
public RpcManager $manager;
|
||||
|
||||
|
||||
/**
|
||||
* @var RpcClientInterface
|
||||
*/
|
||||
#[Inject(RpcClientInterface::class)]
|
||||
public RpcClientInterface $client;
|
||||
|
||||
|
||||
protected string $name = '';
|
||||
|
||||
|
||||
@@ -38,9 +53,7 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
|
||||
*/
|
||||
public function notify(string $method, mixed $data, string $version = '2.0'): void
|
||||
{
|
||||
$config = $this->get_consul($this->name);
|
||||
$transporter = Kiri::getDi()->get(RpcClientInterface::class);
|
||||
$transporter->withConfig($config)->sendRequest(
|
||||
$this->client->withConfig($this->get_consul($this->name))->sendRequest(
|
||||
$this->requestBody([
|
||||
'jsonrpc' => $version,
|
||||
'service' => $this->name,
|
||||
@@ -75,9 +88,7 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
|
||||
{
|
||||
if (empty($id)) $id = Number::create(time());
|
||||
|
||||
$config = $this->get_consul($this->name);
|
||||
$transporter = Kiri::getDi()->get(RpcClientInterface::class);
|
||||
return $transporter->withConfig($config)->sendRequest(
|
||||
return $this->client->withConfig($this->get_consul($this->name))->sendRequest(
|
||||
$this->requestBody([
|
||||
'jsonrpc' => $version,
|
||||
'service' => $this->name,
|
||||
@@ -97,9 +108,7 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
|
||||
*/
|
||||
public function batch(array $data): mixed
|
||||
{
|
||||
$config = $this->get_consul($this->name);
|
||||
$transporter = Kiri::getDi()->get(RpcClientInterface::class);
|
||||
return $transporter->withConfig($config)->sendRequest(
|
||||
return $this->client->withConfig($this->get_consul($this->name))->sendRequest(
|
||||
$this->requestBody($data)
|
||||
);
|
||||
}
|
||||
@@ -116,7 +125,7 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
|
||||
if (empty($service)) {
|
||||
throw new RpcServiceException('You need set rpc service name if used.');
|
||||
}
|
||||
$sf = Kiri::getDi()->get(RpcManager::class)->getServices($service);
|
||||
$sf = $this->manager->getServices($service);
|
||||
if (empty($sf) || !is_array($sf)) {
|
||||
throw new RpcServiceException('You need set rpc service name if used.');
|
||||
}
|
||||
|
||||
+13
-32
@@ -5,6 +5,7 @@ namespace Kiri\Rpc;
|
||||
use Exception;
|
||||
use Kiri;
|
||||
use Kiri\Abstracts\Component;
|
||||
use Kiri\Annotation\Inject;
|
||||
use Kiri\Consul\Agent;
|
||||
use Kiri\Consul\Health;
|
||||
use Kiri\Message\Handler\Handler;
|
||||
@@ -20,27 +21,8 @@ class RpcManager extends Component
|
||||
private array $_rpc = [];
|
||||
|
||||
|
||||
/**
|
||||
* @param $serviceName
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function async($serviceName): void
|
||||
{
|
||||
$this->reRegister($serviceName);
|
||||
|
||||
$lists = Kiri::getDi()->get(Health::class)->setQuery('passing=true')->service($serviceName);
|
||||
if ($lists->getStatusCode() != 200) {
|
||||
return;
|
||||
}
|
||||
$body = json_decode($lists->getBody(), true);
|
||||
$file = storage('.rpc.clients.' . md5($serviceName), 'rpc');
|
||||
if (!empty($body) && is_array($body)) {
|
||||
file_put_contents($file, json_encode(array_column($body, 'Service')), LOCK_EX);
|
||||
} else {
|
||||
file_put_contents($file, json_encode([]), LOCK_EX);
|
||||
}
|
||||
}
|
||||
#[Inject(Health::class)]
|
||||
public Health $health;
|
||||
|
||||
|
||||
/**
|
||||
@@ -71,7 +53,7 @@ class RpcManager extends Component
|
||||
{
|
||||
try {
|
||||
foreach ($this->_rpc as $name => $list) {
|
||||
$this->async($name);
|
||||
$this->reRegister($name);
|
||||
}
|
||||
} catch (\Throwable $throwable) {
|
||||
$this->logger->error(error_trigger_format($throwable));
|
||||
@@ -81,20 +63,19 @@ class RpcManager extends Component
|
||||
|
||||
/**
|
||||
* @param $serviceName
|
||||
* @return array
|
||||
* @throws Exception
|
||||
* @return array|null
|
||||
*/
|
||||
public function getServices($serviceName): array
|
||||
public function getServices($serviceName): ?array
|
||||
{
|
||||
$file = storage('.rpc.clients.' . md5($serviceName), 'rpc');
|
||||
if (!file_exists($file) || filesize($file) < 10) {
|
||||
$this->async($serviceName);
|
||||
$lists = $this->health->setQuery('passing=true')->service($serviceName);
|
||||
if ($lists->getStatusCode() != 200) {
|
||||
return null;
|
||||
}
|
||||
$content = json_decode(file_get_contents($file), true);
|
||||
if (empty($content) || !is_array($content)) {
|
||||
return [];
|
||||
$body = json_decode($lists->getBody(), true);
|
||||
if (empty($body)) {
|
||||
return null;
|
||||
}
|
||||
return $content;
|
||||
return array_column($body, 'Service');
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user