diff --git a/Abstracts/AsyncServer.php b/Abstracts/AsyncServer.php index 026c270..ed9b2f8 100644 --- a/Abstracts/AsyncServer.php +++ b/Abstracts/AsyncServer.php @@ -61,6 +61,7 @@ class AsyncServer implements ServerInterface * @throws ContainerExceptionInterface * @throws NotFindClassException * @throws NotFoundExceptionInterface + * @throws Exception */ public function initCoreServers(array $service, int $daemon = 0): void { @@ -73,8 +74,7 @@ class AsyncServer implements ServerInterface if (!empty($rpcService)) { $this->addListener(instance(SConfig::class, [], $rpcService)); } - $this->processManager->batch(Config::get('processes', []), $this->server); - $this->processManager->batch($this->getProcess(), $this->server); + $this->processManager->batch(Config::get('processes', [])); } @@ -254,6 +254,7 @@ class AsyncServer implements ServerInterface * @return void * @throws ContainerExceptionInterface * @throws NotFoundExceptionInterface + * @throws ReflectionException */ public function start(): void { diff --git a/Abstracts/ProcessManager.php b/Abstracts/ProcessManager.php index 4f42bc2..49e861c 100644 --- a/Abstracts/ProcessManager.php +++ b/Abstracts/ProcessManager.php @@ -3,20 +3,21 @@ namespace Kiri\Server\Abstracts; use Closure; +use Exception; use Kiri; use Kiri\Abstracts\Config; -use Kiri\Context; -use Kiri\Events\EventDispatch; -use Kiri\Exception\ConfigException; -use Kiri\Server\Broadcast\Message; +use Kiri\Abstracts\Component; use Kiri\Server\Contract\OnProcessInterface; -use Kiri\Server\Events\OnProcessStart; -use Psr\Log\LoggerInterface; +use Psr\Container\ContainerExceptionInterface; +use Psr\Container\NotFoundExceptionInterface; use Swoole\Process; -use Kiri\Server\Events\OnProcessStop; +use Kiri\Annotation\Inject; use Kiri\Di\ContainerInterface; +use Kiri\Events\EventProvider; +use Kiri\Server\ServerInterface; +use Kiri\Server\Events\OnServerBeforeStart; -class ProcessManager +class ProcessManager extends Component { @@ -25,35 +26,65 @@ class ProcessManager /** - * @param ContainerInterface $container - * @param LoggerInterface $logger + * @var ContainerInterface */ - public function __construct(public ContainerInterface $container, public LoggerInterface $logger) + #[Inject(ContainerInterface::class)] + public ContainerInterface $container; + + + #[Inject(EventProvider::class)] + public EventProvider $provider; + + /** + * @return void + * @throws Exception + */ + public function init(): void { + $this->provider->on(OnServerBeforeStart::class, [$this, 'OnServerBeforeStart']); } /** - * @param string|OnProcessInterface|BaseProcess $customProcess - * @return array + * @param OnServerBeforeStart $beforeStart + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface */ - public function add(string|OnProcessInterface|BaseProcess $customProcess): array + public function OnServerBeforeStart(OnServerBeforeStart $beforeStart): void { - if (is_string($customProcess)) { - $customProcess = Kiri::getDi()->get($customProcess); + $server = $this->container->get(ServerInterface::class); + foreach ($this->_process as $process) { + $server->addProcess($process); } + } - if (Context::inCoroutine()) { - return [$customProcess, $this->resolve($customProcess)]; + + /** + * @param string|OnProcessInterface|BaseProcess $custom + * @throws Exception + */ + public function add(string|OnProcessInterface|BaseProcess $custom): void + { + if (is_string($custom)) { + $custom = Kiri::getDi()->get($custom); } - - $process = new Process($this->resolve($customProcess), - $customProcess->getRedirectStdinAndStdout(), - $customProcess->getPipeType(), - $customProcess->isEnableCoroutine() + if (isset($this->process[$custom->getName()])) { + throw new Exception('Process(' . $custom->getName() . ') is exists.'); + } + $this->_process[$custom->getName()] = new Process(function (Process $process) use ($custom) { + set_env('environmental', Kiri::PROCESS); + $system = sprintf('[%s].Custom Process', Config::get('id', 'system-service')); + Kiri::getLogger()->alert($system . ' ' . $custom->getName() . ' start.'); + if (Kiri::getPlatform()->isLinux()) { + $process->name($system . '[' . $process->pid . '].' . $custom->getName()); + } + $custom->onSigterm()->process($process); + }, + $custom->getRedirectStdinAndStdout(), + $custom->getPipeType(), + $custom->isEnableCoroutine() ); - - return [$customProcess, $process]; } @@ -79,7 +110,7 @@ class ProcessManager $system = sprintf('[%s].Custom Process', Config::get('id', 'system-service')); Kiri::getLogger()->alert($system . ' ' . $customProcess->getName() . ' start.'); if (Kiri::getPlatform()->isLinux()) { - $process->name($system .'['.$process->pid.'].'. $customProcess->getName()); + $process->name($system . '[' . $process->pid . '].' . $customProcess->getName()); } $customProcess->onSigterm()->process($process); }; @@ -120,21 +151,16 @@ class ProcessManager /** * @param array|null $processes - * @param \Swoole\Server|null $server * @return void - * @throws ConfigException + * @throws Exception */ - public function batch(?array $processes, ?\Swoole\Server $server = null): void + public function batch(?array $processes): void { if (empty($processes)) { return; } foreach ($processes as $process) { - [$customProcess, $sProcess] = $this->add($process); - - $this->_process[$customProcess->getName()] = $customProcess; - - $server->addProcess($sProcess); + $this->add($process); } } @@ -142,18 +168,15 @@ class ProcessManager /** * @param string $message * @param string $name - * @param string $tag * @return void */ - public function push(string $message, string $name = '', string $tag = 'default'): void + public function push(string $name, string $message): void { - $processes = $this->_process; - if (!empty($this->_process[$name])) { - $processes = [$this->_process[$name]]; - } - foreach ($processes as $process) { - $process->write(serialize(new Message($message))); + if (!isset($this->_process[$name])) { + return; } + $process = $this->_process[$name]; + $process->write($message); } diff --git a/Abstracts/TraitServer.php b/Abstracts/TraitServer.php index 0869945..6d1ea3f 100644 --- a/Abstracts/TraitServer.php +++ b/Abstracts/TraitServer.php @@ -2,6 +2,7 @@ namespace Kiri\Server\Abstracts; +use Exception; use Swoole\Http\Server as HServer; use Swoole\Server; use Kiri\Server\Constant; @@ -13,22 +14,22 @@ trait TraitServer private array $_process = []; - - + + /** * @param string|array|BaseProcess $class * @return void + * @throws Exception */ public function addProcess(string|array|BaseProcess $class): void { - if (is_object($class)) { - $this->_process[] = $class; - } else if (is_string($class)) { - $this->_process[] = $class; - } else { - foreach ($class as $name) { - $this->_process[] = $name; - } + $container = \Kiri::getDi()->get(ProcessManager::class); + + if (!is_array($class)) { + $class = [$class]; + } + foreach ($class as $name) { + $container->add($name); } } diff --git a/Server.php b/Server.php index bdf92ba..39bbe51 100644 --- a/Server.php +++ b/Server.php @@ -39,9 +39,6 @@ defined('PID_PATH') or define('PID_PATH', APP_PATH . 'storage/server.pid'); class Server extends HttpService { - private array $process = []; - - private mixed $daemon = 0; @@ -91,10 +88,11 @@ class Server extends HttpService /** * @param $process + * @throws Exception */ public function addProcess($process) { - $this->process[] = $process; + $this->processManager->add($process); } @@ -108,7 +106,6 @@ class Server extends HttpService public function start(): void { $this->onHotReload(); - $this->manager->addProcess($this->process); $this->manager->initCoreServers(Config::get('server', [], true), $this->daemon); $this->manager->onSignal(Config::get('signal', [])); $this->dispatch->dispatch(new OnServerBeforeStart());