This commit is contained in:
2023-07-31 23:09:00 +08:00
parent 493a58281f
commit 10a64ee646
7 changed files with 287 additions and 309 deletions
+24 -54
View File
@@ -7,26 +7,23 @@ use JetBrains\PhpStorm\ArrayShape;
use Kiri;
use Kiri\Di\LocalService;
use Kiri\Abstracts\Component;
use Kiri\Annotation\Annotation;
use Kiri\Consul\Agent;
use Kiri\Core\Json;
use Kiri\Events\EventProvider;
use Kiri\Exception\ConfigException;
use Kiri\Message\Handler\DataGrip;
use Kiri\Message\Handler\Router;
use Kiri\Message\Handler\RouterCollector;
use Kiri\Server\Contract\OnCloseInterface;
use Kiri\Server\Contract\OnConnectInterface;
use Kiri\Server\Contract\OnReceiveInterface;
use Kiri\Server\Events\OnBeforeShutdown;
use Kiri\Server\Events\OnServerBeforeStart;
use Psr\Container\ContainerExceptionInterface;
use Kiri\Di\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Swoole\Coroutine;
use Kiri\Router\RouterCollector;
use Psr\Container\ContainerInterface;
use Swoole\Coroutine\Channel;
use Swoole\Server;
use Kiri\Router\DataGrip;
/**
*
@@ -35,46 +32,36 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
{
private array $consul = [];
public RouterCollector $collector;
/**
* @param ContainerInterface $container
* @param Router $router
* @param Annotation $annotation
* @param DataGrip $dataGrip
* @param RpcManager $manager
* @param EventProvider $eventProvider
* @param array $config
* @throws Exception
*/
public function __construct(public ContainerInterface $container,
public Router $router,
public Annotation $annotation,
public DataGrip $dataGrip,
public RpcManager $manager,
public EventProvider $eventProvider,
array $config = [])
public EventProvider $eventProvider)
{
parent::__construct($config);
parent::__construct();
}
/**
* @return void
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
/**
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws ReflectionException
* @throws Exception
*/
public function init(): void
{
$this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
// $this->consul = \config('rpc.consul', null);
// if (!empty($this->consul)) {
// $this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']);
// }
$this->collector = $this->dataGrip->get('rpc');
$this->registerConsumers();
}
@@ -82,8 +69,7 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
/**
* @return void
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function registerConsumers(): void
@@ -100,40 +86,24 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
}
/**
* @param OnBeforeShutdown $beforeShutdown
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
/**
* @param OnBeforeShutdown $beforeShutdown
* @return void
*/
public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown): void
{
if (env('environmental') != Kiri::WORKER && env('environmental_workerId') != 0) {
return;
}
$agent = $this->container->get(Agent::class);
$this->logger->debug("disconnect consul.");
$agent->service->deregister($this->consul['ID']);
$agent->checks->deregister($this->consul['Check']['CheckId']);
}
/**
* @param OnServerBeforeStart $server
* @throws ConfigException
*/
public function register(OnServerBeforeStart $server)
{
$consumers = \config("rpc.consumers", []);
if (!empty($consumers)) {
foreach ($consumers as $service => $consumer) {
$this->manager->add($service, $consumer);
}
}
$this->manager->register($this->consul);
}
@@ -168,7 +138,6 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
}
return $server->send($fd, $this->batchDispatch($data), $reactor_id);
} catch (\Throwable $throwable) {
$this->logger->error('JsonRpc: ' . $throwable->getMessage());
$response = Json::encode($this->failure(-32700, $throwable->getMessage()));
return $server->send($fd, $response, $reactor_id);
}
@@ -291,11 +260,12 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
}
/**
* @param int $fd
* @return void
*/
public function OnClose(int $fd): void
/**
* @param Server $server
* @param int $fd
* @return void
*/
public function OnClose(Server $server, int $fd): void
{
// TODO: Implement onClose() method.
}