diff --git a/Abstracts/ProcessManager.php b/Abstracts/ProcessManager.php index 307e2ce..aa85f0f 100644 --- a/Abstracts/ProcessManager.php +++ b/Abstracts/ProcessManager.php @@ -10,6 +10,7 @@ use Kiri\Abstracts\Component; use Kiri\Server\Contract\OnProcessInterface; use Psr\Container\ContainerExceptionInterface; use Psr\Container\NotFoundExceptionInterface; +use Swoole\Coroutine; use Swoole\Process; use Kiri\Annotation\Inject; use Kiri\Di\ContainerInterface; @@ -21,7 +22,7 @@ class ProcessManager extends Component { - /** @var array */ + /** @var array */ private array $_process = []; @@ -54,8 +55,20 @@ class ProcessManager extends Component public function OnServerBeforeStart(OnServerBeforeStart $beforeStart): void { $server = $this->container->get(ServerInterface::class); - foreach ($this->_process as $process) { - $server->addProcess($process); + foreach ($this->_process as $custom) { + if (Kiri\Di\Context::inCoroutine()) { + Coroutine::create(function () use ($custom) { + $custom->onSigterm()->process(null); + }); + } else { + $server->addProcess(new Process(function (Process $process) use ($custom) { + $this->extracted($custom, $process); + }, + $custom->getRedirectStdinAndStdout(), + $custom->getPipeType(), + $custom->isEnableCoroutine() + )); + } } } @@ -78,16 +91,12 @@ class ProcessManager extends Component if (is_string($custom)) { $custom = Kiri::getDi()->get($custom); } + if (isset($this->_process[$custom->getName()])) { throw new Exception('Process(' . $custom->getName() . ') is exists.'); } - $this->_process[$custom->getName()] = new Process(function (Process $process) use ($custom) { - $this->extracted($custom, $process); - }, - $custom->getRedirectStdinAndStdout(), - $custom->getPipeType(), - $custom->isEnableCoroutine() - ); + + $this->_process[$custom->getName()] = $custom; } @@ -96,9 +105,9 @@ class ProcessManager extends Component */ public function shutdown(): void { - foreach ($this->_process as $process) { - Process::kill($process->pid, 0) && Process::kill($process->pid, 15); - } +// foreach ($this->_process as $process) { +// Process::kill($process->pid, 0) && Process::kill($process->pid, 15); +// } } @@ -121,17 +130,17 @@ class ProcessManager extends Component */ public function get(?string $name = null, string $tag = 'default'): array|Process|null { - $process = $this->_process[$tag] ?? null; - if (empty($process)) { - return null; - } - if (!empty($name)) { - if (!isset($process[$name])) { - return null; - } - return $process[$name]; - } - return $process; +// $process = $this->_process[$tag] ?? null; +// if (empty($process)) { +// return null; +// } +// if (!empty($name)) { +// if (!isset($process[$name])) { +// return null; +// } +// return $process[$name]; +// } + return null; } @@ -140,9 +149,9 @@ class ProcessManager extends Component */ public function stop(): void { - foreach ($this->_process as $process) { - Process::kill($process->pid, 0) && Process::kill($process->pid, 15); - } +// foreach ($this->_process as $process) { +// Process::kill($process->pid, 0) && Process::kill($process->pid, 15); +// } } @@ -169,11 +178,11 @@ class ProcessManager extends Component */ public function push(string $name, string $message): void { - if (!isset($this->_process[$name])) { - return; - } - $process = $this->_process[$name]; - $process->write($message); +// if (!isset($this->_process[$name])) { +// return; +// } +// $process = $this->_process[$name]; +// $process->write($message); } /** diff --git a/Contract/OnProcessInterface.php b/Contract/OnProcessInterface.php index c327c64..264ad1e 100644 --- a/Contract/OnProcessInterface.php +++ b/Contract/OnProcessInterface.php @@ -16,9 +16,9 @@ interface OnProcessInterface /** - * @param Process $process + * @param ?Process $process */ - public function process(Process $process): void; + public function process(?Process $process): void; diff --git a/CoroutineServer.php b/CoroutineServer.php index 1f8acc1..9bca633 100644 --- a/CoroutineServer.php +++ b/CoroutineServer.php @@ -25,24 +25,25 @@ use Swoole\Coroutine\Server as ScServer; use Swoole\Coroutine\Http\Server as SchServer; use Swoole\Http\Request; use Swoole\Http\Response; +use Swoole\Process; use Swoole\Server; class CoroutineServer implements ServerInterface { - + use TraitServer; - - + + /** @var array */ private array $servers = []; - - + + /** * @var Server|null */ private Server|null $server = null; - - + + /** * @param Config $config * @param ContainerInterface $container @@ -57,8 +58,8 @@ class CoroutineServer implements ServerInterface public ProcessManager $processManager) { } - - + + /** * @param array $service * @param int $daemon @@ -85,8 +86,8 @@ class CoroutineServer implements ServerInterface // $this->processManager->batch(Config::get('processes', [])); } - - + + /** * @param SConfig $config * @throws ContainerExceptionInterface @@ -95,19 +96,19 @@ class CoroutineServer implements ServerInterface public function addListener(SConfig $config): void { $server = new SchServer($config->getHost(), $config->getPort(), false, true); - + $events = $config->getEvents()[Constant::REQUEST] ?? null; if (is_null($events)) { $events = [\Kiri\Message\Server::class, 'onRequest']; } - + $events[0] = $this->container->get($events[0]); $server->handle('/', $events); - + $this->servers[] = $server; } - - + + /** * @param string $name * @return ScServer|SchServer|null @@ -116,8 +117,8 @@ class CoroutineServer implements ServerInterface { return $this->servers[$name] ?? null; } - - + + /** * @return bool * @throws ContainerExceptionInterface @@ -128,13 +129,13 @@ class CoroutineServer implements ServerInterface foreach ($this->servers as $server) { $server->shutdown(); } - + $this->dispatch->dispatch(new OnShutdown()); - + return true; } - - + + /** * @param $no * @param array $signInfo @@ -149,8 +150,8 @@ class CoroutineServer implements ServerInterface $this->logger->error($exception->getMessage()); } } - - + + /** * @param Server\Port|Server $base * @param array $events @@ -167,8 +168,8 @@ class CoroutineServer implements ServerInterface $base->on($name, $event); } } - - + + /** * @return void */ @@ -176,9 +177,9 @@ class CoroutineServer implements ServerInterface { Coroutine\run(function () { $this->dispatch->dispatch(new OnServerBeforeStart()); - + $this->onSignal(Config::get('signal', [])); - + $this->onTasker(); foreach ($this->servers as $server) { Coroutine::create(static function () use ($server) { @@ -192,10 +193,10 @@ class CoroutineServer implements ServerInterface } }); } - - + + private Coroutine\Channel $channel; - + /** * @return void * @throws ConfigException @@ -205,32 +206,32 @@ class CoroutineServer implements ServerInterface private function onTasker(): void { $config = Config::get('server.settings', []); - + if (isset($config[Constant::OPTION_TASK_WORKER_NUM])) { if ($config[Constant::OPTION_TASK_WORKER_NUM] < 1) { return; } } - + $taskEvents = $config['events'][Constant::TASK] ?? null; $finishEvents = $config['events'][Constant::FINISH] ?? null; - + if (is_null($taskEvents)) { return; } - + $taskEvents[0] = $this->container->get($taskEvents[0]); if (!is_null($finishEvents)) { $finishEvents[0] = $this->container->get($finishEvents[0]); } - + $this->channel = new Coroutine\Channel($config[Constant::OPTION_TASK_WORKER_NUM]); for ($i = 0; $i < $config[Constant::OPTION_TASK_WORKER_NUM]; $i++) { Coroutine::create(static fn() => $this->taskRunner($i, $taskEvents, $finishEvents)); } } - - + + /** * @param $taskId * @param $callback @@ -248,5 +249,5 @@ class CoroutineServer implements ServerInterface } $this->taskRunner($taskId, $callback, $finishEvents); } - + }