Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8824f711bb | |||
| f2d97832b5 | |||
| 855e57a651 | |||
| 0b3310b82e | |||
| e8cf7f29f0 | |||
| 62e3b001f0 | |||
| f655cebdf3 | |||
| ee492d97ad | |||
| a16d841d98 | |||
| f015662fd6 | |||
| 021dcae59b | |||
| c6dc34c821 | |||
| 88c3958aba | |||
| 24ebfac360 | |||
| b3c200b443 | |||
| 3e2b50bf60 | |||
| 89c142930e | |||
| 1515d9695c | |||
| f1f7b1081d | |||
| 07533e29f9 |
@@ -4,6 +4,7 @@ namespace Kiri\Rpc;
|
||||
|
||||
|
||||
|
||||
use JetBrains\PhpStorm\Pure;
|
||||
use Throwable;
|
||||
|
||||
class InvalidRpcParamsException extends \Exception
|
||||
@@ -15,7 +16,7 @@ class InvalidRpcParamsException extends \Exception
|
||||
* @param int $code
|
||||
* @param Throwable|null $previous
|
||||
*/
|
||||
public function __construct($message = "", $code = 0, Throwable $previous = null)
|
||||
#[Pure] public function __construct(string $message = "", int $code = 0, Throwable $previous = null)
|
||||
{
|
||||
parent::__construct($message, -32602, $previous);
|
||||
}
|
||||
|
||||
@@ -54,7 +54,6 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
|
||||
/**
|
||||
* @param array $data
|
||||
* @return ServerRequestInterface
|
||||
* @throws \ReflectionException
|
||||
*/
|
||||
private function requestBody(array $data): ServerRequestInterface
|
||||
{
|
||||
|
||||
+33
-25
@@ -2,30 +2,29 @@
|
||||
|
||||
namespace Kiri\Rpc;
|
||||
|
||||
use Kiri;
|
||||
use Kiri\Abstracts\Component;
|
||||
use Kiri\Abstracts\Config;
|
||||
use Kiri\Annotation\Annotation;
|
||||
use Kiri\Annotation\Inject;
|
||||
use Kiri\Consul\Agent;
|
||||
use Kiri\Context;
|
||||
use Kiri\Exception\ConfigException;
|
||||
use Kiri\Message\Constrict\RequestInterface;
|
||||
use Kiri\Message\Handler\Handler;
|
||||
use Kiri\Message\Handler\Router;
|
||||
use Kiri\Message\ServerRequest;
|
||||
use Kiri\Abstracts\Component;
|
||||
use Kiri\Abstracts\Config;
|
||||
use Kiri\Consul\Agent;
|
||||
use Kiri\Context;
|
||||
use Kiri\Events\EventProvider;
|
||||
use Kiri\Exception\ConfigException;
|
||||
use Kiri;
|
||||
use Kiri\Annotation\Inject;
|
||||
use Kiri\Annotation\Annotation;
|
||||
use Psr\Container\ContainerExceptionInterface;
|
||||
use Psr\Container\NotFoundExceptionInterface;
|
||||
use Psr\Http\Message\ServerRequestInterface;
|
||||
use ReflectionException;
|
||||
use Kiri\Server\Contract\OnCloseInterface;
|
||||
use Kiri\Server\Contract\OnConnectInterface;
|
||||
use Kiri\Server\Contract\OnReceiveInterface;
|
||||
use Kiri\Server\Events\OnBeforeShutdown;
|
||||
use Kiri\Server\Events\OnServerBeforeStart;
|
||||
use Kiri\Server\Events\OnTaskerStart;
|
||||
use Kiri\Server\Events\OnWorkerExit;
|
||||
use Kiri\Server\Events\OnWorkerStart;
|
||||
use Psr\Container\ContainerExceptionInterface;
|
||||
use Psr\Container\NotFoundExceptionInterface;
|
||||
use Psr\Http\Message\ServerRequestInterface;
|
||||
use Swoole\Coroutine;
|
||||
use Swoole\Coroutine\Channel;
|
||||
use Swoole\Server;
|
||||
@@ -47,21 +46,24 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
|
||||
public Annotation $annotation;
|
||||
|
||||
|
||||
|
||||
private RpcManager $manager;
|
||||
|
||||
|
||||
private int $timerId;
|
||||
|
||||
/**
|
||||
*
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function init(): void
|
||||
{
|
||||
$this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
|
||||
$this->getEventProvider()->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
|
||||
|
||||
scan_directory(APP_PATH . 'rpc', 'Rpc');
|
||||
scan_directory(APP_PATH . 'rpc', 'app\Rpc');
|
||||
|
||||
$this->eventProvider->on(OnWorkerStart::class, [$this, 'consulWatches']);
|
||||
$this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']);
|
||||
$this->getEventProvider()->on(OnWorkerStart::class, [$this, 'consulWatches']);
|
||||
$this->getEventProvider()->on(OnWorkerExit::class, [$this, 'onWorkerExit']);
|
||||
$this->getEventProvider()->on(OnServerBeforeStart::class, [$this, 'register']);
|
||||
|
||||
$this->manager = Kiri::getDi()->get(RpcManager::class);
|
||||
}
|
||||
@@ -76,7 +78,7 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
|
||||
public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown)
|
||||
{
|
||||
$doneList = $this->manager->doneList();
|
||||
$agent = $this->container->get(Agent::class);
|
||||
$agent = $this->getContainer()->get(Agent::class);
|
||||
foreach ($doneList as $value) {
|
||||
$agent->service->deregister($value['config']['ID']);
|
||||
$agent->checks->deregister($value['config']['Check']['CheckId']);
|
||||
@@ -94,16 +96,22 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
|
||||
return;
|
||||
}
|
||||
$async_time = (int)Config::get('consul.async_time', 1000);
|
||||
Timer::tick(5000, static function ($timeId) {
|
||||
if (env('state', 'start') == 'exit') {
|
||||
Timer::clear($timeId);
|
||||
return;
|
||||
}
|
||||
$this->timerId = Timer::tick($async_time, static function () {
|
||||
Kiri::getDi()->get(RpcManager::class)->tick();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param OnWorkerExit $exit
|
||||
* @return void
|
||||
*/
|
||||
public function onWorkerExit(OnWorkerExit $exit)
|
||||
{
|
||||
Timer::clear($this->timerId);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param OnServerBeforeStart $server
|
||||
*/
|
||||
@@ -199,7 +207,7 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
|
||||
private function dispatch($data): array
|
||||
{
|
||||
try {
|
||||
[$handler, $params] = $this->container->get(RpcManager::class)->get($data['service'], $data['method']);
|
||||
[$handler, $params] = $this->getContainer()->get(RpcManager::class)->get($data['service'], $data['method']);
|
||||
if (is_null($handler)) {
|
||||
throw new \Exception('Method not found', -32601);
|
||||
} else {
|
||||
|
||||
+34
-7
@@ -2,10 +2,11 @@
|
||||
|
||||
namespace Kiri\Rpc;
|
||||
|
||||
use Exception;
|
||||
use Kiri;
|
||||
use Kiri\Abstracts\Component;
|
||||
use Kiri\Consul\Agent;
|
||||
use Kiri\Consul\Health;
|
||||
use Kiri;
|
||||
use Kiri\Message\Handler\Handler;
|
||||
use ReflectionException;
|
||||
|
||||
@@ -22,11 +23,12 @@ class RpcManager extends Component
|
||||
/**
|
||||
* @param $serviceName
|
||||
* @return void
|
||||
* @throws ReflectionException
|
||||
* @throws \Exception
|
||||
* @throws Exception
|
||||
*/
|
||||
public function async($serviceName): void
|
||||
{
|
||||
$this->reRegister($serviceName);
|
||||
|
||||
$lists = Kiri::getDi()->get(Health::class)->setQuery('passing=true')->service($serviceName);
|
||||
if ($lists->getStatusCode() != 200) {
|
||||
return;
|
||||
@@ -42,12 +44,37 @@ class RpcManager extends Component
|
||||
|
||||
|
||||
/**
|
||||
* @throws ReflectionException
|
||||
* @param string $serviceName
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function reRegister(string $serviceName)
|
||||
{
|
||||
$config = $this->_rpc[$serviceName] ?? [];
|
||||
if (empty($config)) {
|
||||
return;
|
||||
}
|
||||
$service = Kiri::getDi()->get(Agent::class);
|
||||
|
||||
$info = $service->service->service_health($config['config']['ID']);
|
||||
if ($info->getStatusCode() == 200) {
|
||||
return;
|
||||
}
|
||||
$service->service->register($config['config']);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
*/
|
||||
public function tick(): void
|
||||
{
|
||||
foreach ($this->_rpc as $name => $list) {
|
||||
$this->async($name);
|
||||
try {
|
||||
foreach ($this->_rpc as $name => $list) {
|
||||
$this->async($name);
|
||||
}
|
||||
} catch (\Throwable $throwable) {
|
||||
$this->logger()->error(error_trigger_format($throwable));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,7 +82,7 @@ class RpcManager extends Component
|
||||
/**
|
||||
* @param $serviceName
|
||||
* @return array
|
||||
* @throws \Exception
|
||||
* @throws Exception
|
||||
*/
|
||||
public function getServices($serviceName): array
|
||||
{
|
||||
|
||||
+1
-2
@@ -11,8 +11,7 @@
|
||||
"require": {
|
||||
"php": ">=8.0",
|
||||
"ext-json": "*",
|
||||
"psr/http-client": "^1.0",
|
||||
"psr/http-message": "^1.0"
|
||||
"psr/http-client": "^1.0"
|
||||
},
|
||||
"autoload": {
|
||||
"psr-4": {
|
||||
|
||||
Reference in New Issue
Block a user