This commit is contained in:
xl
2022-05-04 03:16:52 +08:00
parent c982e44631
commit abaa2547f7
+223 -220
View File
@@ -42,193 +42,196 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
{ {
private int $timerId; private int $timerId;
/** /**
* @param ContainerInterface $container * @param ContainerInterface $container
* @param Router $router * @param Router $router
* @param Annotation $annotation * @param Annotation $annotation
* @param DataGrip $dataGrip * @param DataGrip $dataGrip
* @param RpcManager $manager * @param RpcManager $manager
* @param RouterCollector $collector * @param RouterCollector $collector
* @param EventProvider $eventProvider * @param EventProvider $eventProvider
* @param array $config * @param array $config
* @throws Exception * @throws Exception
*/ */
public function __construct(public ContainerInterface $container, public function __construct(public ContainerInterface $container,
public Router $router, public Router $router,
public Annotation $annotation, public Annotation $annotation,
public DataGrip $dataGrip, public DataGrip $dataGrip,
public RpcManager $manager, public RpcManager $manager,
public RouterCollector $collector, public RouterCollector $collector,
public EventProvider $eventProvider, public EventProvider $eventProvider,
array $config = []) array $config = [])
{ {
parent::__construct($config); parent::__construct($config);
} }
/** /**
* @return void * @return void
* @throws ReflectionException * @throws ReflectionException
*/ */
public function init(): void public function init(): void
{ {
$this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']); $this->eventProvider->on(OnBeforeShutdown::class, [$this, 'onBeforeShutdown']);
scan_directory(APP_PATH . 'rpc', 'app\Rpc'); scan_directory(APP_PATH . 'rpc', 'app\Rpc');
$this->eventProvider->on(OnWorkerStart::class, [$this, 'consulWatches']); $this->eventProvider->on(OnWorkerStart::class, [$this, 'consulWatches']);
$this->eventProvider->on(OnWorkerExit::class, [$this, 'onWorkerExit']); $this->eventProvider->on(OnWorkerExit::class, [$this, 'onWorkerExit']);
$this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']); $this->eventProvider->on(OnServerBeforeStart::class, [$this, 'register']);
$this->collector = $this->dataGrip->get('rpc'); $this->collector = $this->dataGrip->get('rpc');
} }
/** /**
* @param OnBeforeShutdown $beforeShutdown * @param OnBeforeShutdown $beforeShutdown
* @return void * @return void
* @throws ContainerExceptionInterface * @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface * @throws NotFoundExceptionInterface
*/ */
public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown): void public function onBeforeShutdown(OnBeforeShutdown $beforeShutdown): void
{ {
$doneList = $this->manager->doneList(); $doneList = $this->manager->doneList();
$agent = $this->container->get(Agent::class); $agent = $this->container->get(Agent::class);
foreach ($doneList as $value) { foreach ($doneList as $value) {
$agent->service->deregister($value['config']['ID']); $agent->service->deregister($value['config']['ID']);
$agent->checks->deregister($value['config']['Check']['CheckId']); $agent->checks->deregister($value['config']['Check']['CheckId']);
} }
} }
/** /**
* @param OnWorkerStart|OnTaskerStart $server * @param OnWorkerStart|OnTaskerStart $server
* @throws ConfigException * @throws ConfigException
*/ */
public function consulWatches(OnWorkerStart|OnTaskerStart $server) public function consulWatches(OnWorkerStart|OnTaskerStart $server)
{ {
if ($server->workerId != 0) { if ($server->workerId != 0) {
return; return;
} }
$async_time = (int)Config::get('consul.async_time', 1000); $async_time = (int)Config::get('consul.async_time', 1000);
$this->timerId = Timer::tick($async_time, static function () { $this->timerId = Timer::tick($async_time, static function () {
Kiri::getDi()->get(RpcManager::class)->tick(); Kiri::getDi()->get(RpcManager::class)->tick();
}); });
} }
/** /**
* @param OnWorkerExit $exit * @param OnWorkerExit $exit
* @return void * @return void
*/ */
public function onWorkerExit(OnWorkerExit $exit): void public function onWorkerExit(OnWorkerExit $exit): void
{ {
Timer::clear($this->timerId); Timer::clear($this->timerId);
} }
/** /**
* @param OnServerBeforeStart $server * @param OnServerBeforeStart $server
*/ */
public function register(OnServerBeforeStart $server) public function register(OnServerBeforeStart $server)
{ {
$this->manager->register(); $this->manager->register();
} }
/** /**
* @param Server $server * @param Server $server
* @param int $fd * @param int $fd
*/ */
public function onConnect(Server $server, int $fd): void public function onConnect(Server $server, int $fd): void
{ {
// TODO: Implement onConnect() method. // TODO: Implement onConnect() method.
} }
/** /**
* @param Server $server * @param Server $server
* @param int $fd * @param int $fd
* @param int $reactor_id * @param int $reactor_id
* @param string $data * @param string $data
*/ */
public function onReceive(Server $server, int $fd, int $reactor_id, string $data): void public function onReceive(Server $server, int $fd, int $reactor_id, string $data): void
{ {
$data = json_decode($data, true); $data = json_decode($data, true);
if (is_null($data)) { if (is_null($data)) {
$this->failure(-32700, 'Parse error语法解析错误'); $this->failure(-32700, 'Parse error语法解析错误');
} else if (!isset($data['jsonrpc']) || !isset($data['method']) || $data['jsonrpc'] != '2.0') { } else if (!isset($data['jsonrpc']) || !isset($data['method']) || $data['jsonrpc'] != '2.0') {
$this->failure(-32600, 'Invalid Request无效请求'); $this->failure(-32600, 'Invalid Request无效请求');
} else { } else {
$this->batchDispatch($server, $fd, $data); $this->batchDispatch($server, $fd, $data);
} }
} }
/** /**
* @param Server $server * @param Server $server
* @param int $fd * @param int $fd
* @param array $data * @param array $data
* @return void * @return void
*/ */
private function batchDispatch(Server $server, int $fd, array $data): void private function batchDispatch(Server $server, int $fd, array $data): void
{ {
if (isset($data['jsonrpc'])) { if (isset($data['jsonrpc'])) {
$dispatch = $this->dispatch($data); $dispatch = $this->dispatch($data);
if (!isset($data['id'])) { if (!isset($data['id'])) {
$dispatch = [1]; $dispatch = [1];
} }
$result = json_encode($dispatch, JSON_UNESCAPED_UNICODE); $result = json_encode($dispatch, JSON_UNESCAPED_UNICODE);
} else { } else {
$channel = new Channel($total = count($data)); $channel = new Channel($total = count($data));
foreach ($data as $datum) { foreach ($data as $datum) {
$this->_execute($channel, $datum); $this->_execute($channel, $datum);
} }
$result = []; $result = [];
for ($i = 0; $i < $total; $i++) { for ($i = 0; $i < $total; $i++) {
$result[] = $channel->pop(); $result[] = $channel->pop();
} }
} }
$server->send($fd, json_encode($result, JSON_UNESCAPED_UNICODE)); $server->send($fd, json_encode($result, JSON_UNESCAPED_UNICODE));
} }
/** /**
* @param $channel * @param $channel
* @param $datum * @param $datum
*/ */
private function _execute($channel, $datum) private function _execute($channel, $datum)
{ {
Coroutine::create(function () use ($channel, $datum) { Coroutine::create(function () use ($channel, $datum) {
if (empty($datum) || !isset($datum['jsonrpc'])) { if (empty($datum) || !isset($datum['jsonrpc'])) {
$channel->push($this->failure(-32700, 'Parse error语法解析错误')); $channel->push($this->failure(-32700, 'Parse error语法解析错误'));
} else if (!isset($datum['method'])) { } else if (!isset($datum['method'])) {
$channel->push($this->failure(-32700, 'Parse error语法解析错误')); $channel->push($this->failure(-32700, 'Parse error语法解析错误'));
} else { } else {
$dispatch = $this->dispatch($datum); $dispatch = $this->dispatch($datum);
if (!isset($dispatch['id'])) { if (!isset($dispatch['id'])) {
$dispatch = [1]; $dispatch = [1];
} }
$channel->push($dispatch); $channel->push($dispatch);
} }
}); });
} }
/** /**
* @param $data * @param $data
* @return array * @return array
*/ */
private function dispatch($data): array private function dispatch($data): array
{ {
try { try {
$handler = $this->collector->find($data['service'], 'GET'); if (!str_starts_with($data['service'], '/')) {
var_dump($this->collector, $data); $data['service'] = '/' . $data['service'];
if (is_integer($handler) || is_null($handler)) { }
throw new Exception('Handler not found', -32601); $handler = $this->collector->find($data['service'], 'GET');
var_dump($handler, $data);
if (is_integer($handler) || is_null($handler)) {
throw new Exception('Handler not found', -32601);
} }
$controller = $handler->callback[0]; $controller = $handler->callback[0];
@@ -241,73 +244,73 @@ class RpcJsonp extends Component implements OnConnectInterface, OnReceiveInterfa
return $this->handler($controller, $data['method'], $params); return $this->handler($controller, $data['method'], $params);
} catch (\Throwable $throwable) { } catch (\Throwable $throwable) {
$code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode(); $code = $throwable->getCode() == 0 ? -32603 : $throwable->getCode();
return $this->failure($code, jTraceEx($throwable), [], $data['id'] ?? null); return $this->failure($code, jTraceEx($throwable), [], $data['id'] ?? null);
} }
} }
/** /**
* @param $params * @param $params
* @return ServerRequestInterface * @return ServerRequestInterface
* @throws Exception * @throws Exception
*/ */
private function createServerRequest($params): ServerRequestInterface private function createServerRequest($params): ServerRequestInterface
{ {
return (new ServerRequest())->withParsedBody($params); return (new ServerRequest())->withParsedBody($params);
} }
/** /**
* @param $controller * @param $controller
* @param string $method * @param string $method
* @param $params * @param $params
* @return array * @return array
*/ */
#[ArrayShape([])] #[ArrayShape([])]
private function handler($controller, string $method, $params): array private function handler($controller, string $method, $params): array
{ {
$result = call_user_func([$controller, $method], ...$params); $result = call_user_func([$controller, $method], ...$params);
return [ return [
'jsonrpc' => '2.0', 'jsonrpc' => '2.0',
'result' => $result, 'result' => $result,
'id' => $data['id'] ?? null 'id' => $data['id'] ?? null
]; ];
} }
/** /**
* @param $code * @param $code
* @param $message * @param $message
* @param array $data * @param array $data
* @param null $id * @param null $id
* @return array * @return array
*/ */
#[ArrayShape([])] #[ArrayShape([])]
protected function failure($code, $message, array $data = [], $id = null): array protected function failure($code, $message, array $data = [], $id = null): array
{ {
$error = [ $error = [
'jsonrpc' => '2.0', 'jsonrpc' => '2.0',
'error' => [ 'error' => [
'code' => $code, 'code' => $code,
'message' => $message, 'message' => $message,
'data' => $data 'data' => $data
] ]
]; ];
if (!is_null($id)) { if (!is_null($id)) {
$error['id'] = $id; $error['id'] = $id;
} }
return $error; return $error;
} }
/** /**
* @param \Swoole\WebSocket\Server $server * @param \Swoole\WebSocket\Server $server
* @param int $fd * @param int $fd
* @return void * @return void
*/ */
public function onClose(\Swoole\WebSocket\Server $server, int $fd): void public function onClose(\Swoole\WebSocket\Server $server, int $fd): void
{ {
// TODO: Implement onClose() method. // TODO: Implement onClose() method.
} }
} }