*/
    private array $servers = [];

    /**
     * @var Server|null
     */
    private Server|null $server = null;

    /**
     * Stores the collaborators as public promoted properties.
     *
     * @param Config $config
     * @param ContainerInterface $container resolves event handler class names into instances
     * @param EventDispatch $dispatch publishes the OnServerBeforeStart / OnShutdown events
     * @param LoggerInterface $logger
     * @param ProcessManager $processManager
     */
    public function __construct(
        public Config $config,
        public ContainerInterface $container,
        public EventDispatch $dispatch,
        public LoggerInterface $logger,
        public ProcessManager $processManager
    ) {
    }

    /**
     * Registers a listener for every HTTP-typed service definition and, when the
     * `rpc` configuration entry is non-empty, one more listener built from it.
     *
     * @param array $service raw service definitions, normalised through genConfigService()
     * @param int $daemon accepted for interface compatibility; not used by this method
     * @return void
     * @throws ConfigException
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     * @throws Exception
     */
    public function initCoreServers(array $service, int $daemon = 0): void
    {
        $service = $this->genConfigService($service);
        foreach ($service as $value) {
            if ($value->getType() == Constant::SERVER_TYPE_HTTP) {
                $this->addListener($value);
            }
        }

        $rpcService = Config::get('rpc', []);
        if (!empty($rpcService)) {
            $this->addListener(instance(SConfig::class, [], $rpcService));
        }

        // Custom process registration is currently disabled:
        // $this->processManager->batch(Config::get('processes', []));
    }

    /**
     * Creates a server for the given listener configuration, attaches the request
     * handler to the "/" route and appends the server to the internal list.
     *
     * The handler comes from the config's REQUEST event; when none is configured,
     * \Kiri\Message\Server::onRequest is used.
     *
     * @param SConfig $config
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public function addListener(SConfig $config): void
    {
        // NOTE(review): the two trailing booleans are presumably "ssl = false" and
        // "reuse_port = true" - confirm against the class SchServer aliases.
        $server = new SchServer($config->getHost(), $config->getPort(), false, true);

        $events = $config->getEvents()[Constant::REQUEST] ?? [\Kiri\Message\Server::class, 'onRequest'];

        // Resolve the handler only when it is given as a class name, the same guard
        // onEventListen() applies; an already-built instance is used as-is.
        if (is_array($events) && is_string($events[0])) {
            $events[0] = $this->container->get($events[0]);
        }

        $server->handle('/', $events);

        $this->servers[] = $server;
    }

    /**
     * Returns the server stored under the given key, or null when there is none.
     *
     * NOTE(review): addListener() appends servers with sequential integer keys, so
     * a non-numeric $name (including the default '') never matches - confirm
     * whether servers are meant to be keyed by a name.
     *
     * @param string $name
     * @return ScServer|SchServer|null
     */
    public function getServer(string $name = ''): ScServer|SchServer|null
    {
        return $this->servers[$name] ?? null;
    }

    /**
     * Shuts down every registered server, then dispatches OnShutdown.
     *
     * @return bool always true
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public function shutdown(): bool
    {
        foreach ($this->servers as $server) {
            $server->shutdown();
        }
        $this->dispatch->dispatch(new OnShutdown());
        return true;
    }

    /**
     * Signal handler: logs the signal number together with the current pid and
     * shuts the servers down. Any failure during shutdown is logged, not rethrown.
     *
     * @param $no signal number
     * @param array $signInfo not used by this handler
     * @return void
     */
    public function onSigint($no, array $signInfo): void
    {
        try {
            $this->logger->alert('Pid ' . getmypid() . ' get signo ' . $no);
            $this->shutdown();
        } catch (\Throwable $exception) {
            $this->logger->error($exception->getMessage());
        }
    }

    /**
     * Binds each event handler onto the given server or port.
     *
     * A handler given as [className, method] has its class name resolved through
     * the container first; every other handler is passed through unchanged.
     *
     * @param Server\Port|Server $base
     * @param array $events event name => handler
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    private function onEventListen(Server\Port|Server $base, array $events): void
    {
        foreach ($events as $name => $event) {
            if (is_array($event) && is_string($event[0])) {
                $event[0] = $this->container->get($event[0]);
            }
            $base->on($name, $event);
        }
    }

    /**
     * Runs the coroutine entry point: dispatches OnServerBeforeStart, installs the
     * configured signal handlers, spawns the task workers and starts every
     * registered server in its own coroutine.
     *
     * @return void
     */
    public function start(): void
    {
        Coroutine\run(function () {
            $this->dispatch->dispatch(new OnServerBeforeStart());
            $this->onSignal(Config::get('signal', []));
            $this->onTasker();
            foreach ($this->servers as $server) {
                Coroutine::create(static function () use ($server) {
                    $server->start();
                });
            }
        });
    }

    /**
     * Queue the task workers pop their payloads from. Only initialised by
     * onTasker() once at least one worker is configured.
     */
    private Coroutine\Channel $channel;

    /**
     * Spawns the task worker coroutines described by the `server.settings` config.
     *
     * Nothing is started when the worker count is missing or below 1, or when no
     * TASK event handler is configured.
     *
     * @return void
     * @throws ConfigException
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    private function onTasker(): void
    {
        $config = Config::get('server.settings', []);

        // Read the count once with a default, so a missing key means "no task
        // workers" instead of an undefined-index read further down.
        $workerNum = (int)($config[Constant::OPTION_TASK_WORKER_NUM] ?? 0);
        if ($workerNum < 1) {
            return;
        }

        $taskEvents = $config['events'][Constant::TASK] ?? null;
        $finishEvents = $config['events'][Constant::FINISH] ?? null;
        if (is_null($taskEvents)) {
            return;
        }

        // Resolve class names through the container; leave instances untouched.
        if (is_array($taskEvents) && is_string($taskEvents[0])) {
            $taskEvents[0] = $this->container->get($taskEvents[0]);
        }
        if (is_array($finishEvents) && is_string($finishEvents[0])) {
            $finishEvents[0] = $this->container->get($finishEvents[0]);
        }

        $this->channel = new Coroutine\Channel($workerNum);
        for ($i = 0; $i < $workerNum; $i++) {
            // Must not be a static closure: the worker calls $this->taskRunner().
            Coroutine::create(fn() => $this->taskRunner($i, $taskEvents, $finishEvents));
        }
    }

    /**
     * Worker loop: pops payloads from the channel, passes each one to the task
     * callback and forwards the result to the finish callback when one is set.
     *
     * Implemented as a loop rather than self-recursion so that a long-lived
     * worker does not grow the call stack with every processed task. The loop
     * ends once the channel has been closed.
     *
     * @param $taskId index of this worker
     * @param $callback task handler, invoked as $callback($taskId, $taskData)
     * @param $finishEvents optional finish handler, invoked as $finishEvents($taskId, $result)
     * @return void
     */
    private function taskRunner($taskId, $callback, $finishEvents): void
    {
        while (true) {
            $taskData = $this->channel->pop();

            // pop() yields false on failure; on a closed channel it would do so
            // immediately and forever, so stop the worker in that case.
            if ($taskData === false && $this->channel->errCode === \SWOOLE_CHANNEL_CLOSED) {
                return;
            }
            if (is_null($taskData)) {
                continue;
            }

            $result = $callback($taskId, $taskData);
            if (is_callable($finishEvents, true)) {
                $finishEvents($taskId, $result);
            }
        }
    }
}