This commit is contained in:
2023-11-30 17:02:20 +08:00
parent 0a705b27bc
commit 2ecb13c453
+175 -196
View File
@@ -5,7 +5,6 @@ namespace Kiri\Rpc;
use Exception; use Exception;
use JetBrains\PhpStorm\ArrayShape; use JetBrains\PhpStorm\ArrayShape;
use Kiri; use Kiri;
use Kiri\Di\LocalService;
use Kiri\Abstracts\Component; use Kiri\Abstracts\Component;
use Kiri\Core\Json; use Kiri\Core\Json;
use Kiri\Events\EventProvider; use Kiri\Events\EventProvider;
@@ -31,23 +30,23 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
{ {
public RouterCollector $collector; public RouterCollector $collector;
/** /**
* @param ContainerInterface $container * @param ContainerInterface $container
* @param DataGrip $dataGrip * @param DataGrip $dataGrip
* @param RpcManager $manager * @param RpcManager $manager
* @param EventProvider $eventProvider * @param EventProvider $eventProvider
* @throws Exception * @throws Exception
*/ */
public function __construct(public ContainerInterface $container, public function __construct(public ContainerInterface $container,
public DataGrip $dataGrip, public DataGrip $dataGrip,
public RpcManager $manager, public RpcManager $manager,
public EventProvider $eventProvider) public EventProvider $eventProvider)
{ {
parent::__construct(); parent::__construct();
} }
/** /**
@@ -57,206 +56,186 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
* @throws ReflectionException * @throws ReflectionException
* @throws Exception * @throws Exception
*/ */
public function init(): void public function init(): void
{ {
$this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']); $this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
$this->collector = $this->dataGrip->get('rpc'); $this->collector = $this->dataGrip->get('rpc');
$this->registerConsumers(); }
}
/**
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function registerConsumers(): void
{
$consumers = \config('rpc.consumers', []);
if (!is_array($consumers)) {
return;
}
$local = $this->container->get(LocalService::class);
foreach ($consumers as $consumer) {
$local->set($consumer['id'], $consumer);
}
}
/** /**
* @param OnBeforeShutdown $beforeShutdown * @param OnBeforeShutdown $beforeShutdown
* @return void * @return void
*/ */
public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown): void public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown): void
{ {
if (env('environmental') != Kiri::WORKER && env('environmental_workerId') != 0) { if (env('environmental') != Kiri::WORKER && env('environmental_workerId') != 0) {
return; return;
} }
} }
/** /**
* @param OnServerBeforeStart $server * @param OnServerBeforeStart $server
*/ */
public function register(OnServerBeforeStart $server) public function register(OnServerBeforeStart $server)
{ {
} }
/** /**
* @param Server $server * @param Server $server
* @param int $fd * @param int $fd
*/ */
public function onConnect(Server $server, int $fd): void public function onConnect(Server $server, int $fd): void
{ {
// TODO: Implement onConnect() method. // TODO: Implement onConnect() method.
} }
/** /**
* @param Server $server * @param Server $server
* @param int $fd * @param int $fd
* @param int $reactor_id * @param int $reactor_id
* @param string $data * @param string $data
* @return bool * @return bool
*/ */
public function onReceive(Server $server, int $fd, int $reactor_id, string $data): bool public function onReceive(Server $server, int $fd, int $reactor_id, string $data): bool
{ {
try { try {
$data = json_decode($data, true); $data = json_decode($data, true);
if (is_null($data)) return $server->send($fd, 'success', $reactor_id); if (is_null($data)) return $server->send($fd, 'success', $reactor_id);
$data = json_decode($data, true); $data = json_decode($data, true);
if (!is_array($data)) { if (!is_array($data)) {
throw new Exception('Parse error语法解析错误', -32700); throw new Exception('Parse error语法解析错误', -32700);
} }
if (!isset($data['jsonrpc']) || !isset($data['method']) || $data['jsonrpc'] != '2.0') { if (!isset($data['jsonrpc']) || !isset($data['method']) || $data['jsonrpc'] != '2.0') {
throw new Exception('Invalid Request无效请求', -32600); throw new Exception('Invalid Request无效请求', -32600);
} }
return $server->send($fd, $this->batchDispatch($data), $reactor_id); return $server->send($fd, $this->batchDispatch($data), $reactor_id);
} catch (\Throwable $throwable) { } catch (\Throwable $throwable) {
$response = Json::encode($this->failure(-32700, $throwable->getMessage())); $response = Json::encode($this->failure(-32700, $throwable->getMessage()));
return $server->send($fd, $response, $reactor_id); return $server->send($fd, $response, $reactor_id);
} }
} }
/** /**
* @param array $data * @param array $data
* @return string|bool * @return string|bool
*/ */
private function batchDispatch(array $data): string|bool private function batchDispatch(array $data): string|bool
{ {
if (isset($data['jsonrpc'])) { if (isset($data['jsonrpc'])) {
$result = $this->dispatch($data); $result = $this->dispatch($data);
if (!isset($data['id'])) { if (!isset($data['id'])) {
$result = [1]; $result = [1];
} }
} else { } else {
$channel = new Channel($total = count($data)); $channel = new Channel($total = count($data));
foreach ($data as $datum) { foreach ($data as $datum) {
$this->_execute($channel, $datum); $this->_execute($channel, $datum);
} }
$result = []; $result = [];
for ($i = 0; $i < $total; $i++) { for ($i = 0; $i < $total; $i++) {
$result[] = $channel->pop(); $result[] = $channel->pop();
} }
} }
return json_encode($result, JSON_UNESCAPED_UNICODE); return json_encode($result, JSON_UNESCAPED_UNICODE);
} }
/** /**
* @param $channel * @param $channel
* @param $datum * @param $datum
*/ */
private function _execute($channel, $datum) private function _execute($channel, $datum)
{ {
Coroutine::create(function () use ($channel, $datum) { Coroutine::create(function () use ($channel, $datum) {
if (empty($datum) || !isset($datum['jsonrpc'])) { if (empty($datum) || !isset($datum['jsonrpc'])) {
$channel->push($this->failure(-32700, 'Parse error语法解析错误')); $channel->push($this->failure(-32700, 'Parse error语法解析错误'));
} else if (!isset($datum['method'])) { } else if (!isset($datum['method'])) {
$channel->push($this->failure(-32700, 'Parse error语法解析错误')); $channel->push($this->failure(-32700, 'Parse error语法解析错误'));
} else { } else {
$dispatch = $this->dispatch($datum); $dispatch = $this->dispatch($datum);
if (!isset($dispatch['id'])) { if (!isset($dispatch['id'])) {
$dispatch = [1]; $dispatch = [1];
} }
$channel->push($dispatch); $channel->push($dispatch);
} }
}); });
} }
/** /**
* @param $data * @param $data
* @return array * @return array
*/ */
private function dispatch($data): array private function dispatch($data): array
{ {
try { // try {
$class = $this->container->get(LocalService::class)->get($data['service']); // $class = $this->collector->query($data['service'], 'tcp');
if (!$this->container->has($class)) { // if (!$this->container->has($class)) {
throw new Exception('Handler not found', -32601); // throw new Exception('Handler not found', -32601);
} // }
$controller = $this->container->get($class); // $controller = $this->container->get($class);
if (!method_exists($controller, $data['method'])) { // if (!method_exists($controller, $data['method'])) {
throw new Exception('Method not found', -32601); // throw new Exception('Method not found', -32601);
} // }
if (!isset($data['params']) || !is_array($data['params'])) { // if (!isset($data['params']) || !is_array($data['params'])) {
$data['params'] = []; // $data['params'] = [];
} // }
return $this->handler($controller, $data['method'], $data['params']); // return $this->handler($controller, $data['method'], $data['params']);
} catch (\Throwable $throwable) { // } catch (\Throwable $throwable) {
$code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode(); // $code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode();
return $this->failure($code, jTraceEx($throwable), [], $data['id'] ?? null); // return $this->failure($code, jTraceEx($throwable), [], $data['id'] ?? null);
} // }
} }
/** /**
* @param $controller * @param $controller
* @param string $method * @param string $method
* @param $params * @param $params
* @return array * @return array
*/ */
#[ArrayShape([])] #[ArrayShape([])]
private function handler($controller, string $method, $params): array private function handler($controller, string $method, $params): array
{ {
$result = call_user_func([$controller, $method], ...$params); $result = call_user_func([$controller, $method], ...$params);
return [ return [
'jsonrpc' => '2.0', 'jsonrpc' => '2.0',
'result' => $result, 'result' => $result,
'id' => $data['id'] ?? null 'id' => $data['id'] ?? null
]; ];
} }
/** /**
* @param $code * @param $code
* @param $message * @param $message
* @param array $data * @param array $data
* @param null $id * @param null $id
* @return array * @return array
*/ */
#[ArrayShape([])] #[ArrayShape([])]
protected function failure($code, $message, array $data = [], $id = null): array protected function failure($code, $message, array $data = [], $id = null): array
{ {
$error = [ $error = [
'jsonrpc' => '2.0', 'jsonrpc' => '2.0',
'error' => [ 'error' => [
'code' => $code, 'code' => $code,
'message' => $message, 'message' => $message,
'data' => $data 'data' => $data
] ]
]; ];
if (!is_null($id)) { if (!is_null($id)) {
$error['id'] = $id; $error['id'] = $id;
} }
return $error; return $error;
} }
/** /**
@@ -264,8 +243,8 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
* @param int $fd * @param int $fd
* @return void * @return void
*/ */
public function OnClose(Server $server, int $fd): void public function OnClose(Server $server, int $fd): void
{ {
// TODO: Implement onClose() method. // TODO: Implement onClose() method.
} }
} }