Files
kiri-rpc/src/RpcJsonp.php
T

242 lines
5.3 KiB
PHP
Raw Normal View History

2021-10-26 18:58:09 +08:00
<?php
namespace Kiri\Rpc;
2021-10-28 15:20:45 +08:00
use Annotation\Annotation;
2021-10-26 18:58:09 +08:00
use Annotation\Inject;
use Http\Handler\Router;
2021-10-28 15:20:45 +08:00
use Kiri\Abstracts\Component;
2021-10-29 11:06:52 +08:00
use Kiri\Abstracts\Config;
2021-10-29 11:05:18 +08:00
use Kiri\Consul\Agent;
2021-11-29 10:58:45 +08:00
use Psr\Container\ContainerExceptionInterface;
2021-11-27 17:43:29 +08:00
use Psr\Container\ContainerInterface;
2021-10-29 14:06:56 +08:00
use Kiri\Events\EventProvider;
2021-10-29 11:06:52 +08:00
use Kiri\Exception\ConfigException;
2021-10-28 14:02:25 +08:00
use Kiri\Kiri;
2021-11-29 10:58:45 +08:00
use Psr\Container\NotFoundExceptionInterface;
2021-10-29 14:15:40 +08:00
use Server\Events\OnBeforeShutdown;
2021-10-29 14:06:56 +08:00
use Server\Events\OnStart;
2021-11-18 11:37:13 +08:00
use Server\Contract\OnCloseInterface;
use Server\Contract\OnConnectInterface;
use Server\Contract\OnReceiveInterface;
2021-10-26 18:58:09 +08:00
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Server;
/**
*
*/
2021-10-28 15:20:45 +08:00
class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterface, OnCloseInterface
2021-10-26 18:58:09 +08:00
{
#[Inject(Router::class)]
public Router $router;
2021-10-28 15:20:45 +08:00
#[Inject(Annotation::class)]
public Annotation $annotation;
2021-10-29 14:06:56 +08:00
#[Inject(EventProvider::class)]
public EventProvider $eventProvider;
2021-10-29 14:15:40 +08:00
#[Inject(ContainerInterface::class)]
public ContainerInterface $container;
2021-10-28 15:20:45 +08:00
/**
 * Component initialisation hook.
 *
 * Subscribes {@see onBeforeShutdown()} to the OnBeforeShutdown event and then
 * hands the `rpc` directory under APP_PATH to scan_directory() together with
 * the 'Rpc' argument.
 * NOTE(review): scan_directory() is defined outside this file; presumably it
 * discovers the RPC service classes - confirm against the helper.
 *
 * @throws \Exception
 */
public function init(): void
{
    $shutdownListener = [$this, 'onBeforeShutdown'];
    $this->eventProvider->on(OnBeforeShutdown::class, $shutdownListener);

    $rpcDirectory = APP_PATH . 'rpc';
    scan_directory($rpcDirectory, 'Rpc');
}
2021-10-29 14:15:40 +08:00
/**
 * Shutdown listener: deregisters every entry reported by
 * RpcManager::doneList() through the Agent resolved from the container.
 *
 * @param OnBeforeShutdown $beforeShutdown the event instance (not inspected here)
 * @throws ContainerExceptionInterface
 * @throws NotFoundExceptionInterface
 */
public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown)
{
    $registered = RpcManager::doneList();

    $consul = $this->container->get(Agent::class);

    foreach ($registered as $serviceId) {
        $consul->service->deregister($serviceId);
    }
}
2021-10-29 11:06:52 +08:00
/**
 * Registers the service described by the `registry.config` section of the
 * `rpc` configuration through the Agent resolved from the container.
 *
 * When the registry answers with a status code other than 200 the server
 * carried by the event is shut down.
 *
 * @param OnStart $server start event; its `server` property is the running server
 * @throws ConfigException
 * @throws ContainerExceptionInterface
 * @throws NotFoundExceptionInterface
 */
public function register(OnStart $server)
{
    $rpcConfig = Config::get('rpc');

    $consul = $this->container->get(Agent::class);

    $response = $consul->service->register($rpcConfig['registry']['config']);
    if ($response->getStatusCode() == 200) {
        return;
    }

    // Registration was not accepted: stop the server.
    $server->server->shutdown();
}
2021-10-26 18:58:09 +08:00
/**
 * Connection-open callback required by OnConnectInterface.
 *
 * Currently a stub: the body performs no work.
 *
 * @param Server $server
 * @param int $fd
 */
public function onConnect(Server $server, int $fd): void
{
// TODO: no connection-time handling is implemented yet.
}
/**
 * Entry point for a received payload: decodes it as JSON, validates the
 * JSON-RPC 2.0 envelope and forwards valid payloads to batchDispatch().
 *
 * Rejections are written back to the client instead of being discarded:
 *  - undecodable payload                         -> -32700 Parse error
 *  - scalar payload, empty array/object, or a
 *    single request object with a missing/wrong
 *    `jsonrpc` or missing `method` member        -> -32600 Invalid Request
 *
 * A non-empty JSON array (list) is treated as a batch and forwarded as is;
 * each member is validated individually further down the chain.
 *
 * @param Server $server
 * @param int $fd
 * @param int $reactor_id
 * @param string $data raw payload received from the client
 */
public function onReceive(Server $server, int $fd, int $reactor_id, string $data): void
{
    $payload = json_decode($data, true);

    $error = null;
    if (is_null($payload)) {
        $error = $this->failure(-32700, 'Parse error语法解析错误');
    } elseif (!is_array($payload) || $payload === []) {
        $error = $this->failure(-32600, 'Invalid Request无效请求');
    } elseif (array_values($payload) !== $payload) {
        // Not a list, so this is a single request object: check its envelope here.
        if (!isset($payload['jsonrpc'], $payload['method']) || $payload['jsonrpc'] != '2.0') {
            $error = $this->failure(-32600, 'Invalid Request无效请求');
        }
    }

    if (!is_null($error)) {
        $server->send($fd, json_encode($error, JSON_UNESCAPED_UNICODE));
        return;
    }

    $this->batchDispatch($server, $fd, $payload);
}
/**
 * Executes a decoded payload and sends the JSON-encoded reply to the client.
 *
 * A payload carrying a `jsonrpc` key is handled as one request; anything
 * else is handled as a batch whose members are each run through _execute()
 * and collected from a channel. The reply is encoded exactly once, right
 * before it is sent.
 *
 * When a single request has no (or a null) `id`, the dispatch result is
 * replaced by the placeholder `[1]` (existing behaviour, kept unchanged).
 *
 * @param Server $server
 * @param int $fd
 * @param array $data decoded request object or list of request objects
 * @return void
 */
private function batchDispatch(Server $server, int $fd, array $data): void
{
    if (isset($data['jsonrpc'])) {
        $result = $this->dispatch($data);
        if (!isset($data['id'])) {
            $result = [1];
        }
    } elseif ($data === []) {
        // An empty batch is not a valid request.
        $result = $this->failure(-32600, 'Invalid Request无效请求');
    } else {
        $total = count($data);
        $channel = new Channel($total);
        foreach ($data as $datum) {
            $this->_execute($channel, $datum);
        }

        // Collect one reply per batch member; arrival order is whatever the channel yields.
        $result = [];
        for ($i = 0; $i < $total; $i++) {
            $result[] = $channel->pop();
        }
    }

    $server->send($fd, json_encode($result, JSON_UNESCAPED_UNICODE));
}
/**
 * Runs one batch member in its own coroutine and pushes its reply to the
 * channel.
 *
 * A member that is not an array, lacks `jsonrpc` or `method`, or announces a
 * version other than '2.0' is answered with -32600 Invalid Request (the same
 * code and message onReceive() uses for a malformed single request), echoing
 * the member's `id` when one is present. Valid members go through dispatch();
 * a reply without a non-null `id` is replaced by the placeholder `[1]`
 * (existing behaviour, kept unchanged).
 *
 * @param Channel $channel receives exactly one value per call
 * @param mixed $datum one decoded member of the batch
 */
private function _execute($channel, $datum)
{
    Coroutine::create(function () use ($channel, $datum) {
        $isRequest = is_array($datum)
            && isset($datum['jsonrpc'], $datum['method'])
            && $datum['jsonrpc'] == '2.0';

        if (!$isRequest) {
            $id = is_array($datum) ? ($datum['id'] ?? null) : null;
            $channel->push($this->failure(-32600, 'Invalid Request无效请求', [], $id));
            return;
        }

        $reply = $this->dispatch($datum);
        if (!isset($reply['id'])) {
            $reply = [1];
        }
        $channel->push($reply);
    });
}
/**
 * Resolves the handler for `$data['service']` / `$data['method']` via
 * RpcManager::get() and invokes it through handler().
 *
 * Any Throwable raised on the way is converted into a JSON-RPC error
 * structure: an unresolved handler yields -32601, a throwable with code 0
 * yields -32603, any other throwable keeps its own code. The error message
 * is whatever jTraceEx() produces for the throwable.
 * NOTE(review): jTraceEx() is defined outside this file; if it renders a
 * stack trace, that trace is sent to the remote caller - confirm this is intended.
 *
 * @param array $data decoded request object
 * @return array JSON-RPC result or error structure
 */
private function dispatch($data): array
{
    try {
        // Only the first entry (the handler) is used here.
        [$handler] = RpcManager::get($data['service'], $data['method']);
        if (is_null($handler)) {
            throw new \Exception('Method not found', -32601);
        }

        return $this->handler($handler, $data);
    } catch (\Throwable $throwable) {
        $code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode();

        return $this->failure($code, jTraceEx($throwable), [], $data['id'] ?? null);
    }
}
/**
 * Invokes the resolved handler with the request parameters and wraps the
 * return value in a JSON-RPC 2.0 result structure.
 *
 * `params` is optional: when the request omits it the method is called
 * without arguments. A `params` value that is not an array cannot be
 * unpacked into arguments and is rejected with code -32602.
 *
 * @param array $handler pair of [class name resolved through the DI container, method name]
 * @param array $data decoded request object
 * @return array `jsonrpc`, `result` and `id` (null when the request has none)
 * @throws \ReflectionException
 * @throws \Exception when `params` is present but not an array
 */
private function handler(array $handler, $data): array
{
    $params = $data['params'] ?? [];
    if (!is_array($params)) {
        throw new \Exception('Invalid params', -32602);
    }

    $controller = Kiri::getDi()->get($handler[0]);

    $result = $controller->{$handler[1]}(...$params);

    return ['jsonrpc' => '2.0', 'result' => $result, 'id' => $data['id'] ?? null];
}
/**
 * Builds a JSON-RPC 2.0 error structure.
 *
 * The `id` member is appended only when a non-null id is supplied.
 *
 * @param int|string $code error code placed in `error.code`
 * @param string $message text placed in `error.message`
 * @param array $data extra payload placed in `error.data`
 * @param mixed $id request id to echo back, or null to omit the member
 * @return array
 */
protected function failure($code, $message, array $data = [], $id = null): array
{
    $response = [
        'jsonrpc' => '2.0',
        'error'   => ['code' => $code, 'message' => $message, 'data' => $data],
    ];

    return is_null($id) ? $response : $response + ['id' => $id];
}
/**
 * Connection-close callback required by OnCloseInterface.
 *
 * Currently a stub: the body performs no work.
 *
 * @param Server $server
 * @param int $fd
 */
public function onClose(Server $server, int $fd): void
{
// TODO: no close-time handling is implemented yet.
}
}