34 Commits

Author SHA1 Message Date
as2252258 8824f711bb Revert "改名"
This reverts commit fdf58326
2022-01-20 19:04:16 +08:00
as2252258 f2d97832b5 Revert "改名"
This reverts commit fdf58326
2022-01-19 17:27:47 +08:00
as2252258 855e57a651 Revert "改名"
This reverts commit fdf58326
2022-01-19 16:39:29 +08:00
as2252258 0b3310b82e Revert "改名"
This reverts commit fdf58326
2022-01-14 11:29:16 +08:00
as2252258 e8cf7f29f0 Revert "改名"
This reverts commit fdf58326
2022-01-13 19:02:07 +08:00
as2252258 62e3b001f0 Revert "改名"
This reverts commit fdf58326
2022-01-13 19:00:01 +08:00
as2252258 f655cebdf3 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:58:40 +08:00
as2252258 ee492d97ad Revert "改名"
This reverts commit fdf58326
2022-01-13 18:57:53 +08:00
as2252258 a16d841d98 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:56:41 +08:00
as2252258 f015662fd6 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:52:42 +08:00
as2252258 021dcae59b Revert "改名"
This reverts commit fdf58326
2022-01-13 18:50:19 +08:00
as2252258 c6dc34c821 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:47:34 +08:00
as2252258 88c3958aba Revert "改名"
This reverts commit fdf58326
2022-01-13 18:42:42 +08:00
as2252258 24ebfac360 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:41:16 +08:00
as2252258 b3c200b443 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:34:23 +08:00
as2252258 3e2b50bf60 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:33:56 +08:00
as2252258 89c142930e Revert "改名"
This reverts commit fdf58326
2022-01-13 18:29:51 +08:00
as2252258 1515d9695c Revert "改名"
This reverts commit fdf58326
2022-01-13 18:28:22 +08:00
as2252258 f1f7b1081d Revert "改名"
This reverts commit fdf58326
2022-01-13 18:27:36 +08:00
as2252258 07533e29f9 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:25:29 +08:00
as2252258 4d60a24fac Revert "改名"
This reverts commit fdf58326
2022-01-12 14:10:33 +08:00
as2252258 4dc6bc661a Revert "改名"
This reverts commit fdf58326
2022-01-11 17:56:16 +08:00
as2252258 0d7fd5e356 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:53:41 +08:00
as2252258 da16b0685d Revert "改名"
This reverts commit fdf58326
2022-01-11 14:51:50 +08:00
as2252258 febbdea8c8 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:44:02 +08:00
as2252258 a45c3d875c Revert "改名"
This reverts commit fdf58326
2022-01-11 14:42:04 +08:00
as2252258 64d01c0a80 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:41:06 +08:00
as2252258 bf5fe594e4 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:30:58 +08:00
as2252258 5cfe3c6d6d Revert "改名"
This reverts commit fdf58326
2022-01-11 14:30:05 +08:00
as2252258 773e3c0f57 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:26:57 +08:00
as2252258 b1c91343ef Revert "改名"
This reverts commit fdf58326
2022-01-11 14:25:24 +08:00
as2252258 3f821ca9d0 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:23:48 +08:00
as2252258 d296b3107e Revert "改名"
This reverts commit fdf58326
2022-01-11 14:15:51 +08:00
as2252258 f606c58204 Revert "改名"
This reverts commit fdf58326
2022-01-11 14:13:51 +08:00
7 changed files with 76 additions and 41 deletions
+2 -2
View File
@@ -5,7 +5,7 @@ namespace Kiri\Rpc\Annotation;
use Kiri\Abstracts\Config;
use Kiri\Core\Network;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri;
use Kiri\Rpc\RpcManager;
use Kiri\Annotation\Attribute;
use ReflectionException;
@@ -70,7 +70,7 @@ use ReflectionException;
"Check" => [
"CheckId" => "service:rpc.json.{$this->service}." . $this->uniqueId,
"Name" => "service " . $this->service . ' health check',
"Annotations" => "Script based health check",
"Notes" => "Script based health check",
"ServiceID" => $this->service,
"TCP" => $this->checkUrl,
"Interval" => "5s",
+1 -1
View File
@@ -6,7 +6,7 @@ use Exception;
use Kiri\Abstracts\Component;
use Kiri\Context;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri;
use Kiri\Pool\Alias;
use Kiri\Pool\Pool;
use Swoole\Client;
+2 -1
View File
@@ -4,6 +4,7 @@ namespace Kiri\Rpc;
use JetBrains\PhpStorm\Pure;
use Throwable;
class InvalidRpcParamsException extends \Exception
@@ -15,7 +16,7 @@ class InvalidRpcParamsException extends \Exception
* @param int $code
* @param Throwable|null $previous
*/
public function __construct($message = "", $code = 0, Throwable $previous = null)
#[Pure] public function __construct(string $message = "", int $code = 0, Throwable $previous = null)
{
parent::__construct($message, -32602, $previous);
}
+1 -2
View File
@@ -7,7 +7,7 @@ use Exception;
use Kiri\Message\ServerRequest;
use Kiri\Message\Stream;
use Kiri\Core\Number;
use Kiri\Kiri;
use Kiri;
use Kiri\Pool\Pool;
use Psr\Http\Client\ClientExceptionInterface;
use Psr\Http\Message\ResponseInterface;
@@ -54,7 +54,6 @@ abstract class JsonRpcConsumers implements OnRpcConsumerInterface
/**
* @param array $data
* @return ServerRequestInterface
* @throws \ReflectionException
*/
private function requestBody(array $data): ServerRequestInterface
{
+33 -25
View File
@@ -2,30 +2,29 @@
namespace Kiri\Rpc;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Annotation;
use Kiri\Annotation\Inject;
use Kiri\Consul\Agent;
use Kiri\Context;
use Kiri\Exception\ConfigException;
use Kiri\Message\Constrict\RequestInterface;
use Kiri\Message\Handler\Handler;
use Kiri\Message\Handler\Router;
use Kiri\Message\ServerRequest;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Consul\Agent;
use Kiri\Context;
use Kiri\Events\EventProvider;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri\Annotation\Inject;
use Kiri\Annotation\Annotation;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Http\Message\ServerRequestInterface;
use ReflectionException;
use Kiri\Server\Contract\OnCloseInterface;
use Kiri\Server\Contract\OnConnectInterface;
use Kiri\Server\Contract\OnReceiveInterface;
use Kiri\Server\Events\OnBeforeShutdown;
use Kiri\Server\Events\OnServerBeforeStart;
use Kiri\Server\Events\OnTaskerStart;
use Kiri\Server\Events\OnWorkerExit;
use Kiri\Server\Events\OnWorkerStart;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Http\Message\ServerRequestInterface;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Server;
@@ -47,21 +46,24 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
public Annotation $annotation;
private RpcManager $manager;
private int $timerId;
/**
*
* @throws \Exception
*/
public function init(): void
{
$this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
$this->getEventProvider()->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
scan_directory(APP_PATH . 'rpc', 'Rpc');
scan_directory(APP_PATH . 'rpc', 'app\Rpc');
$this->eventProvider->on(OnWorkerStart::class, [$this, 'consulWatches']);
$this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']);
$this->getEventProvider()->on(OnWorkerStart::class, [$this, 'consulWatches']);
$this->getEventProvider()->on(OnWorkerExit::class, [$this, 'onWorkerExit']);
$this->getEventProvider()->on(OnServerBeforeStart::class, [$this, 'register']);
$this->manager = Kiri::getDi()->get(RpcManager::class);
}
@@ -76,7 +78,7 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown)
{
$doneList = $this->manager->doneList();
$agent = $this->container->get(Agent::class);
$agent = $this->getContainer()->get(Agent::class);
foreach ($doneList as $value) {
$agent->service->deregister($value['config']['ID']);
$agent->checks->deregister($value['config']['Check']['CheckId']);
@@ -94,16 +96,22 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
return;
}
$async_time = (int)Config::get('consul.async_time', 1000);
Timer::tick($async_time, static function ($timeId) {
if (env('state', 'start') == 'exit') {
Timer::clear($timeId);
return;
}
$this->timerId = Timer::tick($async_time, static function () {
Kiri::getDi()->get(RpcManager::class)->tick();
});
}
/**
* @param OnWorkerExit $exit
* @return void
*/
public function onWorkerExit(OnWorkerExit $exit)
{
Timer::clear($this->timerId);
}
/**
* @param OnServerBeforeStart $server
*/
@@ -199,7 +207,7 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
private function dispatch($data): array
{
try {
[$handler, $params] = $this->container->get(RpcManager::class)->get($data['service'], $data['method']);
[$handler, $params] = $this->getContainer()->get(RpcManager::class)->get($data['service'], $data['method']);
if (is_null($handler)) {
throw new \Exception('Method not found', -32601);
} else {
+36 -8
View File
@@ -2,11 +2,12 @@
namespace Kiri\Rpc;
use Kiri\Message\Handler\Handler;
use Exception;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Consul\Agent;
use Kiri\Consul\Health;
use Kiri\Kiri;
use Kiri\Message\Handler\Handler;
use ReflectionException;
class RpcManager extends Component
@@ -22,11 +23,12 @@ class RpcManager extends Component
/**
* @param $serviceName
* @return void
* @throws ReflectionException
* @throws \Exception
* @throws Exception
*/
public function async($serviceName): void
{
$this->reRegister($serviceName);
$lists = Kiri::getDi()->get(Health::class)->setQuery('passing=true')->service($serviceName);
if ($lists->getStatusCode() != 200) {
return;
@@ -42,12 +44,37 @@ class RpcManager extends Component
/**
* @throws ReflectionException
* @param string $serviceName
* @return void
* @throws Exception
*/
public function reRegister(string $serviceName)
{
$config = $this->_rpc[$serviceName] ?? [];
if (empty($config)) {
return;
}
$service = Kiri::getDi()->get(Agent::class);
$info = $service->service->service_health($config['config']['ID']);
if ($info->getStatusCode() == 200) {
return;
}
$service->service->register($config['config']);
}
/**
* @throws Exception
*/
public function tick(): void
{
foreach ($this->_rpc as $name => $list) {
$this->async($name);
try {
foreach ($this->_rpc as $name => $list) {
$this->async($name);
}
} catch (\Throwable $throwable) {
$this->logger()->error(error_trigger_format($throwable));
}
}
@@ -55,7 +82,7 @@ class RpcManager extends Component
/**
* @param $serviceName
* @return array
* @throws \Exception
* @throws Exception
*/
public function getServices($serviceName): array
{
@@ -117,6 +144,7 @@ class RpcManager extends Component
{
$agent = Kiri::getDi()->get(Agent::class);
foreach ($this->_rpc as $list) {
$agent->service->deregister($list['config']['ID']);
$data = $agent->service->register($list['config']);
if ($data->getStatusCode() != 200) {
return;
+1 -2
View File
@@ -11,8 +11,7 @@
"require": {
"php": ">=8.0",
"ext-json": "*",
"psr/http-client": "^1.0",
"psr/http-message": "^1.0"
"psr/http-client": "^1.0"
},
"autoload": {
"psr-4": {