17 Commits

Author SHA1 Message Date
as2252258 0b3310b82e Revert "改名"
This reverts commit fdf58326
2022-01-14 11:29:16 +08:00
as2252258 e8cf7f29f0 Revert "改名"
This reverts commit fdf58326
2022-01-13 19:02:07 +08:00
as2252258 62e3b001f0 Revert "改名"
This reverts commit fdf58326
2022-01-13 19:00:01 +08:00
as2252258 f655cebdf3 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:58:40 +08:00
as2252258 ee492d97ad Revert "改名"
This reverts commit fdf58326
2022-01-13 18:57:53 +08:00
as2252258 a16d841d98 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:56:41 +08:00
as2252258 f015662fd6 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:52:42 +08:00
as2252258 021dcae59b Revert "改名"
This reverts commit fdf58326
2022-01-13 18:50:19 +08:00
as2252258 c6dc34c821 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:47:34 +08:00
as2252258 88c3958aba Revert "改名"
This reverts commit fdf58326
2022-01-13 18:42:42 +08:00
as2252258 24ebfac360 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:41:16 +08:00
as2252258 b3c200b443 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:34:23 +08:00
as2252258 3e2b50bf60 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:33:56 +08:00
as2252258 89c142930e Revert "改名"
This reverts commit fdf58326
2022-01-13 18:29:51 +08:00
as2252258 1515d9695c Revert "改名"
This reverts commit fdf58326
2022-01-13 18:28:22 +08:00
as2252258 f1f7b1081d Revert "改名"
This reverts commit fdf58326
2022-01-13 18:27:36 +08:00
as2252258 07533e29f9 Revert "改名"
This reverts commit fdf58326
2022-01-13 18:25:29 +08:00
2 changed files with 62 additions and 27 deletions
+28 -20
View File
@@ -2,30 +2,29 @@
namespace Kiri\Rpc;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Annotation;
use Kiri\Annotation\Inject;
use Kiri\Consul\Agent;
use Kiri\Context;
use Kiri\Exception\ConfigException;
use Kiri\Message\Constrict\RequestInterface;
use Kiri\Message\Handler\Handler;
use Kiri\Message\Handler\Router;
use Kiri\Message\ServerRequest;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Consul\Agent;
use Kiri\Context;
use Kiri\Events\EventProvider;
use Kiri\Exception\ConfigException;
use Kiri;
use Kiri\Annotation\Inject;
use Kiri\Annotation\Annotation;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Http\Message\ServerRequestInterface;
use ReflectionException;
use Kiri\Server\Contract\OnCloseInterface;
use Kiri\Server\Contract\OnConnectInterface;
use Kiri\Server\Contract\OnReceiveInterface;
use Kiri\Server\Events\OnBeforeShutdown;
use Kiri\Server\Events\OnServerBeforeStart;
use Kiri\Server\Events\OnTaskerStart;
use Kiri\Server\Events\OnWorkerExit;
use Kiri\Server\Events\OnWorkerStart;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Http\Message\ServerRequestInterface;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Server;
@@ -47,9 +46,11 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
public Annotation $annotation;
private RpcManager $manager;
private int $timerId;
/**
*
* @throws \Exception
@@ -58,9 +59,10 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
{
$this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
scan_directory(APP_PATH . 'rpc', 'Rpc');
scan_directory(APP_PATH . 'rpc', 'app\Rpc');
$this->eventProvider->on(OnWorkerStart::class, [$this, 'consulWatches']);
$this->eventProvider->on(OnWorkerExit::class, [$this, 'onWorkerExit']);
$this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']);
$this->manager = Kiri::getDi()->get(RpcManager::class);
@@ -94,16 +96,22 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
return;
}
$async_time = (int)Config::get('consul.async_time', 1000);
Timer::tick(5000, static function ($timeId) {
if (env('state', 'start') == 'exit') {
Timer::clear($timeId);
return;
}
$this->timerId = Timer::tick($async_time, static function () {
Kiri::getDi()->get(RpcManager::class)->tick();
});
}
/**
* @param OnWorkerExit $exit
* @return void
*/
public function onWorkerExit(OnWorkerExit $exit)
{
Timer::clear($this->timerId);
}
/**
* @param OnServerBeforeStart $server
*/
+34 -7
View File
@@ -2,10 +2,11 @@
namespace Kiri\Rpc;
use Exception;
use Kiri;
use Kiri\Abstracts\Component;
use Kiri\Consul\Agent;
use Kiri\Consul\Health;
use Kiri;
use Kiri\Message\Handler\Handler;
use ReflectionException;
@@ -22,11 +23,12 @@ class RpcManager extends Component
/**
* @param $serviceName
* @return void
* @throws ReflectionException
* @throws \Exception
* @throws Exception
*/
public function async($serviceName): void
{
$this->reRegister($serviceName);
$lists = Kiri::getDi()->get(Health::class)->setQuery('passing=true')->service($serviceName);
if ($lists->getStatusCode() != 200) {
return;
@@ -42,12 +44,37 @@ class RpcManager extends Component
/**
* @throws ReflectionException
* @param string $serviceName
* @return void
* @throws Exception
*/
public function reRegister(string $serviceName)
{
$config = $this->_rpc[$serviceName] ?? [];
if (empty($config)) {
return;
}
$service = Kiri::getDi()->get(Agent::class);
$info = $service->service->service_health($config['config']['ID']);
if ($info->getStatusCode() == 200) {
return;
}
$service->service->register($config['config']);
}
/**
* @throws Exception
*/
public function tick(): void
{
foreach ($this->_rpc as $name => $list) {
$this->async($name);
try {
foreach ($this->_rpc as $name => $list) {
$this->async($name);
}
} catch (\Throwable $throwable) {
$this->logger()->error(error_trigger_format($throwable));
}
}
@@ -55,7 +82,7 @@ class RpcManager extends Component
/**
* @param $serviceName
* @return array
* @throws \Exception
* @throws Exception
*/
public function getServices($serviceName): array
{