diff --git a/Abstracts/AsyncServer.php b/Abstracts/AsyncServer.php index ae88192..f40d9a7 100644 --- a/Abstracts/AsyncServer.php +++ b/Abstracts/AsyncServer.php @@ -9,6 +9,7 @@ use Kiri\Server\Constant; use Kiri\Server\Events\OnServerBeforeStart; use Kiri\Server\Events\OnShutdown; use Kiri\Server\Handler\OnServer; +use Kiri\Server\Processes\TraitProcess; use Kiri\Server\ServerInterface; use Kiri\Server\Task\Task; use Swoole\Server; @@ -21,6 +22,7 @@ class AsyncServer implements ServerInterface { use TraitServer; + use TraitProcess; /** diff --git a/Abstracts/TraitServer.php b/Abstracts/TraitServer.php index 01a0375..a9db1bb 100644 --- a/Abstracts/TraitServer.php +++ b/Abstracts/TraitServer.php @@ -15,65 +15,6 @@ use Swoole\Process; trait TraitServer { - - /** - * @var array - */ - private array $_process = []; - - - /** - * @param string|array|AbstractProcess $class - * @return void - * @throws - */ - public function addProcess(string|array|AbstractProcess $class): void - { - if (!is_array($class)) { - $class = [$class]; - } - foreach ($class as $name) { - if (is_string($name)) { - $name = Kiri::getDi()->get($name); - } - if (isset($this->_process[$name->getName()])) { - throw new Exception('AbstractProcess(' . $name->getName() . ') is exists.'); - } - $process = $this->genProcess($name); - if ($name->isEnableQueue()) { - $process->useQueue(); - } - $this->_process[$name->getName()] = $process; - } - } - - - /** - * @param AbstractProcess $name - * @return Process - */ - private function genProcess(AbstractProcess $name): Process - { - return new Process(function (Process $process) use ($name) { - $process->name('[' . \config('id','system-service') . '].' . $name->getName()); - $name->onSigterm()->process($process); - }, - $name->getRedirectStdinAndStdout(), - $name->getPipeType(), - $name->isEnableCoroutine()); - } - - - /** - * @param string $name - * @return AbstractProcess|null - */ - public function getProcess(string $name): ?Process - { - return $this->_process[$name] ?? null; - } - - /** * @return void * @throws @@ -121,15 +62,6 @@ trait TraitServer } - /** - * @return array - */ - public function getProcesses(): array - { - return $this->_process; - } - - /** * @param array $ports * @return array diff --git a/Processes/AbstractProcess.php b/Processes/AbstractProcess.php index 6601f54..ce19e36 100644 --- a/Processes/AbstractProcess.php +++ b/Processes/AbstractProcess.php @@ -3,8 +3,8 @@ namespace Kiri\Server\Processes; -use Kiri\Di\Context; use Swoole\Coroutine; +use Swoole\Process; /** * @@ -15,6 +15,9 @@ abstract class AbstractProcess implements OnProcessInterface private bool $stop = false; + public Process $process; + + /** * @var bool */ @@ -109,24 +112,46 @@ abstract class AbstractProcess implements OnProcessInterface /** - * @return $this + * @return void */ - abstract public function onSigterm(): static; + abstract public function onSigterm(): void; + + + /** + * @param Process $process + * @return AbstractProcess + */ + public function onShutdown(Process $process): static + { + $this->process = $process; + if ($this->enable_coroutine) { + Coroutine::create(fn () => $this->coroutineWaitSignal()); + } else { + pcntl_signal(SIGTERM, [$this, 'pointWaitSignal']); + } + return $this; + } /** * @param $data * @return void */ - protected function onShutdown($data): void + private function pointWaitSignal($data): void { $this->stop = true; - $value = Context::get('waite:process:message'); - \Kiri::getLogger()->alert('AbstractProcess ' . $this->getName() . ' stop'); - if (!is_null($value) && Coroutine::exists((int)$value)) { - Coroutine::cancel((int)$value); - } } + /** + * @return void + */ + private function coroutineWaitSignal(): void + { + $value = Coroutine::waitSignal(SIGTERM); + if ($value) { + $this->stop = true; + } + } + } diff --git a/Processes/TraitProcess.php b/Processes/TraitProcess.php new file mode 100644 index 0000000..d53035c --- /dev/null +++ b/Processes/TraitProcess.php @@ -0,0 +1,77 @@ +get($name); + } + if (isset($this->_process[$name->getName()])) { + throw new Exception('AbstractProcess(' . $name->getName() . ') is exists.'); + } + $process = $this->genProcess($name); + if ($name->isEnableQueue()) { + $process->useQueue(); + } + $this->_process[$name->getName()] = $process; + } + } + + + /** + * @param AbstractProcess $name + * @return Process + */ + private function genProcess(AbstractProcess $name): Process + { + return new Process(function (Process $process) use ($name) { + $process->name('[' . \config('id', 'system-service') . '].' . $name->getName()); + $name->onShutdown($process)->process($process); + }, + $name->getRedirectStdinAndStdout(), + $name->getPipeType(), + $name->isEnableCoroutine()); + } + + + /** + * @param string $name + * @return AbstractProcess|null + */ + public function getProcess(string $name): ?Process + { + return $this->_process[$name] ?? null; + } + + + /** + * @return array + */ + public function getProcesses(): array + { + return $this->_process; + } +} \ No newline at end of file