diff --git a/Abstracts/AsyncServer.php b/Abstracts/AsyncServer.php index 6ee3221..04c8a8a 100644 --- a/Abstracts/AsyncServer.php +++ b/Abstracts/AsyncServer.php @@ -173,29 +173,6 @@ class AsyncServer implements ServerInterface $this->container->get(LocalService::class)->set($config->getName(), $port); } - - /** - * @param array $signal - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @throws Exception - */ - public function onSignal(array $signal): void - { - pcntl_signal(SIGINT, [$this, 'onSigint']); - foreach ($signal as $sig => $value) { - if (is_array($value) && is_string($value[0])) { - $value[0] = $this->container->get($value[0]); - } - if (!is_callable($value, true)) { - throw new Exception('Register signal callback must can exec.'); - } - pcntl_signal($sig, $value); - } - } - - /** * @param $no * @param array $signInfo @@ -254,7 +231,6 @@ class AsyncServer implements ServerInterface * @return void * @throws ContainerExceptionInterface * @throws NotFoundExceptionInterface - * @throws ReflectionException */ public function start(): void { diff --git a/Abstracts/TraitServer.php b/Abstracts/TraitServer.php index 6d1ea3f..defd075 100644 --- a/Abstracts/TraitServer.php +++ b/Abstracts/TraitServer.php @@ -3,6 +3,8 @@ namespace Kiri\Server\Abstracts; use Exception; +use Psr\Container\ContainerExceptionInterface; +use Psr\Container\NotFoundExceptionInterface; use Swoole\Http\Server as HServer; use Swoole\Server; use Kiri\Server\Constant; @@ -32,6 +34,30 @@ trait TraitServer $container->add($name); } } + + + + + /** + * @param array $signal + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws Exception + */ + public function onSignal(array $signal): void + { + pcntl_signal(SIGINT, [$this, 'onSigint']); + foreach ($signal as $sig => $value) { + if (is_array($value) && is_string($value[0])) { + $value[0] = $this->container->get($value[0]); + } + if (!is_callable($value, true)) { + throw new Exception('Register signal callback must can exec.'); + } + pcntl_signal($sig, $value); + } + } /** diff --git a/CoroutineServer.php b/CoroutineServer.php new file mode 100644 index 0000000..1a181c6 --- /dev/null +++ b/CoroutineServer.php @@ -0,0 +1,240 @@ + */ + private array $servers = []; + + + /** + * @var Server|null + */ + private Server|null $server = null; + + + /** + * @param Config $config + * @param ContainerInterface $container + * @param EventDispatch $dispatch + * @param LoggerInterface $logger + * @param ProcessManager $processManager + */ + public function __construct(public Config $config, + public ContainerInterface $container, + public EventDispatch $dispatch, + public LoggerInterface $logger, + public ProcessManager $processManager) + { + } + + + /** + * @param array $service + * @param int $daemon + * @return void + * @throws ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws Exception + */ + public function initCoreServers(array $service, int $daemon = 0): void + { + $service = $this->genConfigService($service); + foreach ($service as $value) { + if ($value['type'] == Constant::SERVER_TYPE_HTTP) { + $this->addListener($value); + } + } + $rpcService = Config::get('rpc', []); + if (!empty($rpcService)) { + $this->addListener(instance(SConfig::class, [], $rpcService)); + } +// $this->processManager->batch(Config::get('processes', [])); + } + + + /** + * @param SConfig $config + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function addListener(SConfig $config): void + { + $server = new Coroutine\Http\Server($config->getHost(), $config->getPort(), false, true); + + $events = $config->getEvents()[Constant::REQUEST] ?? null; + if (is_null($events)) { + $events = [\Kiri\Message\Server::class, 'onRequest']; + } + + $events[0] = $this->container->get($events[0]); + $server->handle('/', $events); + + $this->servers[] = $server; + } + + + /** + * @param string $name + * @return ScServer|SchServer|null + */ + public function getServer(string $name = ''): ScServer|SchServer|null + { + return $this->servers[$name] ?? null; + } + + + /** + * @return bool + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function shutdown(): bool + { + foreach ($this->servers as $server) { + $server->shutdown(); + } + + $this->dispatch->dispatch(new OnShutdown()); + + return true; + } + + + /** + * @param $no + * @param array $signInfo + * @return void + */ + public function onSigint($no, array $signInfo): void + { + try { + $this->logger->alert('Pid ' . getmypid() . ' get signo ' . $no); + $this->shutdown(); + } catch (\Throwable $exception) { + $this->logger->error($exception->getMessage()); + } + } + + + /** + * @param Server\Port|Server $base + * @param array $events + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + private function onEventListen(Server\Port|Server $base, array $events): void + { + foreach ($events as $name => $event) { + if (is_array($event) && is_string($event[0])) { + $event[0] = $this->container->get($event[0]); + } + $base->on($name, $event); + } + } + + + /** + * @return void + */ + public function start(): void + { + Coroutine\run(function () { + $this->dispatch->dispatch(new OnServerBeforeStart()); + + $this->onTasker(); + foreach ($this->servers as $server) { + Coroutine::create(function () use ($server) { + $server->start(); + }); + } + }); + } + + + private Coroutine\Channel $channel; + + /** + * @return void + * @throws ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + private function onTasker(): void + { + $config = Config::get('server.settings', []); + + if (isset($config[Constant::OPTION_TASK_WORKER_NUM])) { + if ($config[Constant::OPTION_TASK_WORKER_NUM] < 1) { + return; + } + } + + $taskEvents = $config['events'][Constant::TASK] ?? null; + $finishEvents = $config['events'][Constant::FINISH] ?? null; + + if (is_null($taskEvents)) { + return; + } + + $taskEvents[0] = $this->container->get($taskEvents[0]); + if (!is_null($finishEvents)) { + $finishEvents[0] = $this->container->get($finishEvents[0]); + } + + $this->channel = new Coroutine\Channel($config[Constant::OPTION_TASK_WORKER_NUM]); + for ($i = 0; $i < $config[Constant::OPTION_TASK_WORKER_NUM]; $i++) { + Coroutine::create(fn() => $this->taskRunner($i, $taskEvents, $finishEvents)); + } + } + + + /** + * @param $taskId + * @param $callback + * @param $finishEvents + * @return void + */ + private function taskRunner($taskId, $callback, $finishEvents): void + { + $taskData = $this->channel->pop(); + if (!is_null($taskData)) { + $result = $callback($taskId, $taskData); + if (is_callable($finishEvents, true)) { + $finishEvents($taskId, $result); + } + } + $this->taskRunner($taskId, $callback, $finishEvents); + } + +} diff --git a/Server.php b/Server.php index 32f2a11..35aada1 100644 --- a/Server.php +++ b/Server.php @@ -38,13 +38,13 @@ defined('PID_PATH') or define('PID_PATH', APP_PATH . 'storage/server.pid'); */ class Server extends HttpService { - + private mixed $daemon = 0; - - + + /** * @param State $state - * @param AsyncServer $manager + * @param CoroutineServer $manager * @param ContainerInterface $container * @param ProcessManager $processManager * @param EventDispatch $dispatch @@ -54,7 +54,7 @@ class Server extends HttpService * @throws Exception */ public function __construct(public State $state, - public AsyncServer $manager, + public CoroutineServer $manager, public ContainerInterface $container, public ProcessManager $processManager, public EventDispatch $dispatch, @@ -64,8 +64,8 @@ class Server extends HttpService { parent::__construct($config); } - - + + /** * @return void * @throws ConfigException @@ -77,15 +77,15 @@ class Server extends HttpService return; } Coroutine::set([ - 'hook_flags' => (SWOOLE_HOOK_ALL | SWOOLE_HOOK_CURL) ^ SWOOLE_HOOK_BLOCKING_FUNCTION, + 'hook_flags' => (SWOOLE_HOOK_ALL | SWOOLE_HOOK_CURL) ^ SWOOLE_HOOK_BLOCKING_FUNCTION, 'enable_deadlock_check' => FALSE, - 'exit_condition' => function () { + 'exit_condition' => function () { return Coroutine::stats()['coroutine_num'] === 0; } ]); } - - + + /** * @param $process * @throws Exception @@ -94,8 +94,8 @@ class Server extends HttpService { $this->processManager->add($process); } - - + + /** * @return void * @throws ConfigException @@ -110,8 +110,8 @@ class Server extends HttpService $this->manager->onSignal(Config::get('signal', [])); $this->manager->start(); } - - + + /** * @return void * @throws Exception @@ -122,8 +122,8 @@ class Server extends HttpService $this->provider->on(OnWorkerStart::class, [$this, 'setWorkerName']); $this->provider->on(OnTaskerStart::class, [$this, 'setTaskerName']); } - - + + /** * @param OnWorkerStart $onWorkerStart * @throws ConfigException @@ -135,11 +135,11 @@ class Server extends HttpService } $prefix = sprintf('Worker Process[%d].%d', $onWorkerStart->server->worker_pid, $onWorkerStart->workerId); set_env('environmental', Kiri::WORKER); - + Kiri::setProcessName($prefix); } - - + + /** * @param OnTaskerStart $onWorkerStart * @throws ConfigException @@ -151,11 +151,11 @@ class Server extends HttpService } $prefix = sprintf('Tasker Process[%d].%d', $onWorkerStart->server->worker_pid, $onWorkerStart->workerId); set_env('environmental', Kiri::TASK); - + Kiri::setProcessName($prefix); } - - + + /** * @return void * @throws ConfigException @@ -172,8 +172,8 @@ class Server extends HttpService $this->router->scan_build_route(); } } - - + + /** * @return void * @throws ConfigException @@ -189,8 +189,8 @@ class Server extends HttpService } $this->dispatch->dispatch(new OnShutdown()); } - - + + /** * @return bool * @throws ConfigException @@ -200,8 +200,8 @@ class Server extends HttpService { return $this->state->isRunner(); } - - + + /** * @param $daemon * @return Server @@ -214,14 +214,4 @@ class Server extends HttpService $this->daemon = $daemon; return $this; } - - - /** - * @return HServer|SServer|WsServer|null - */ - #[Pure] public function getServer(): HServer|SServer|WsServer|null - { - return $this->manager->getServer(); - } - }