diff --git a/Abstracts/BaseProcess.php b/Abstracts/BaseProcess.php index d8a3ecc..8c75a21 100644 --- a/Abstracts/BaseProcess.php +++ b/Abstracts/BaseProcess.php @@ -48,9 +48,9 @@ abstract class BaseProcess implements OnProcessInterface /** * @return bool - */ + */ public function getRedirectStdinAndStdout(): bool - { + { return $this->redirect_stdin_and_stdout; } @@ -80,6 +80,9 @@ abstract class BaseProcess implements OnProcessInterface } + abstract public function onBroadcast($message); + + /** * */ @@ -105,6 +108,10 @@ abstract class BaseProcess implements OnProcessInterface protected function onShutdown($data): void { $this->isStop = true; + $value = Context::getContext('waite:process:message'); + if (Coroutine::exists($value)) { + Coroutine::cancel($value); + } } diff --git a/Broadcast.php b/Broadcast.php new file mode 100644 index 0000000..c484a60 --- /dev/null +++ b/Broadcast.php @@ -0,0 +1,29 @@ +get(ProcessManager::class)->push($message); + + $server = $di->get(SwooleServerInterface::class); + + $total = $server->setting['worker_num'] + $server->setting['task_worker_num']; + for ($i = 0; $i < $total; $i++) { + $server->sendMessage($message, $i); + } + } + + +} diff --git a/ProcessManager.php b/ProcessManager.php new file mode 100644 index 0000000..ed687ce --- /dev/null +++ b/ProcessManager.php @@ -0,0 +1,101 @@ + */ + private array $_process = []; + + + /** + * @param string|OnProcessInterface|BaseProcess $customProcess + * @return void + * @throws ConfigException + */ + public function add(string|OnProcessInterface|BaseProcess $customProcess) + { + $server = Kiri::getDi()->get(SwooleServerInterface::class); + if (is_string($customProcess)) { + $customProcess = Kiri::getDi()->get($customProcess); + } + $system = sprintf('[%s].process', Config::get('id', 'system-service')); + $server->logger->debug($system . ' ' . $customProcess->getName() . ' start.'); + $server->addProcess($process = $this->parse($customProcess, $system)); + $this->_process[$customProcess->getName()] = $process; + } + + + /** + * @param $customProcess + * @param $system + * @return Process + */ + private function parse($customProcess, $system): Process + { + return new Process(function (Process $process) use ($customProcess, $system) { + if (Kiri::getPlatform()->isLinux()) { + $process->name($system . '(' . $customProcess->getName() . ')'); + } + + $channel = Coroutine::create(function () use ($process, $customProcess) { + while (!$customProcess->isStop()) { + $message = $process->read(); + if (!empty($message)) { + $customProcess->onBroadcast($message); + } + } + }); + Context::setContext('waite:process:message', $channel); + + $customProcess->onSigterm()->process($process); + }, + $customProcess->getRedirectStdinAndStdout(), + $customProcess->getPipeType(), + $customProcess->isEnableCoroutine() + ); + } + + + /** + * @param array $processes + * @return void + * @throws ConfigException + */ + public function batch(array $processes) + { + foreach ($processes as $process) { + $this->add($process); + } + } + + + /** + * @param string $message + * @param string $name + * @return void + */ + public function push(string $message, string $name = '') + { + $processes = $this->_process; + if (!empty($this->_process[$name])) { + $processes = [$this->_process[$name]]; + } + foreach ($processes as $process) { + $process->write($message); + } + } + + +} diff --git a/Server.php b/Server.php index 731ffe6..38ceaf2 100644 --- a/Server.php +++ b/Server.php @@ -4,16 +4,16 @@ namespace Kiri\Server; use Exception; -use Kiri\Message\Handler\Abstracts\HttpService; -use Kiri\Message\Handler\Router; use Kiri\Abstracts\Config; +use Kiri\Annotation\Inject; use Kiri\Events\EventDispatch; use Kiri\Exception\ConfigException; use Kiri\Kiri; -use Kiri\Annotation\Inject; +use Kiri\Message\Handler\Abstracts\HttpService; +use Kiri\Message\Handler\Router; +use Kiri\Server\Events\OnShutdown; use Psr\Container\ContainerExceptionInterface; use Psr\Container\NotFoundExceptionInterface; -use Kiri\Server\Events\OnShutdown; use Swoole\Coroutine; @@ -76,53 +76,51 @@ class Server extends HttpService } $processes = array_merge($this->process, Config::get('processes', [])); - foreach ($processes as $process) { - $this->manager()->addProcess($process); - } + + $this->container->get(ProcessManager::class)->batch($processes); return $this->manager()->getServer()->start(); } - /** - * @throws ConfigException - */ - private function configure_set() - { - $enable_coroutine = Config::get('servers.settings.enable_coroutine', false); - Config::set('servers.settings.enable_coroutine', true); - if ($enable_coroutine != true) { - return; - } - Coroutine::set([ - 'hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_BLOCKING_FUNCTION, - 'enable_deadlock_check' => FALSE, - 'exit_condition' => function () { - return Coroutine::stats()['coroutine_num'] === 0; - } - ]); - } + /** + * @throws ConfigException + */ + private function configure_set() + { + $enable_coroutine = Config::get('servers.settings.enable_coroutine', false); + Config::set('servers.settings.enable_coroutine', true); + if ($enable_coroutine != true) { + return; + } + Coroutine::set([ + 'hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_BLOCKING_FUNCTION, + 'enable_deadlock_check' => FALSE, + 'exit_condition' => function () { + return Coroutine::stats()['coroutine_num'] === 0; + } + ]); + } - /** - * @return void - * @throws ConfigException - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @throws \ReflectionException - * @throws \Exception - */ - public function runtime_start(): void - { - $this->configure_set(); + /** + * @return void + * @throws ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws \ReflectionException + * @throws \Exception + */ + public function runtime_start(): void + { + $this->configure_set(); - $this->container->get(Router::class)->read_files(); + $this->container->get(Router::class)->read_files(); - $this->start(); - } + $this->start(); + } - - /** + /** * @return void * @throws ConfigException * @throws ContainerExceptionInterface diff --git a/ServerManager.php b/ServerManager.php index 35081df..2f48085 100644 --- a/ServerManager.php +++ b/ServerManager.php @@ -5,14 +5,12 @@ namespace Kiri\Server; use Exception; use Kiri\Abstracts\Component; use Kiri\Abstracts\Config; +use Kiri\Annotation\Inject; +use Kiri\Context; use Kiri\Error\Logger; use Kiri\Events\EventDispatch; use Kiri\Exception\ConfigException; use Kiri\Kiri; -use Kiri\Annotation\Inject; -use Psr\Container\ContainerExceptionInterface; -use Psr\Container\NotFoundExceptionInterface; -use ReflectionException; use Kiri\Server\Abstracts\BaseProcess; use Kiri\Server\Contract\OnCloseInterface; use Kiri\Server\Contract\OnConnectInterface; @@ -29,15 +27,15 @@ use Kiri\Server\Handler\OnServerManager; use Kiri\Server\Handler\OnServerReload; use Kiri\Server\Handler\OnServerWorker; use Kiri\Server\Tasker\OnServerTask; +use Psr\Container\ContainerExceptionInterface; +use Psr\Container\NotFoundExceptionInterface; +use ReflectionException; +use Swoole\Coroutine; use Swoole\Http\Server as HServer; use Swoole\Process; use Swoole\Server; use Swoole\Server\Port; use Swoole\WebSocket\Server as WServer; -use Symfony\Component\Console\Helper\Table; -use Symfony\Component\Console\Helper\TableSeparator; -use Symfony\Component\Console\Output\ConsoleOutput; -use Symfony\Component\Console\Output\OutputInterface; /** @@ -151,30 +149,6 @@ class ServerManager extends Component } - /** - * @param string|OnProcessInterface|BaseProcess $customProcess - * @throws Exception - */ - public function addProcess(string|OnProcessInterface|BaseProcess $customProcess) - { - if (is_string($customProcess)) { - $customProcess = Kiri::getDi()->get($customProcess); - } - $system = sprintf('[%s].process', Config::get('id', 'system-service')); - $this->logger->debug($system . ' ' . $customProcess->getName() . ' start.'); - $this->server->addProcess(new Process(function (Process $process) use ($customProcess, $system) { - if (Kiri::getPlatform()->isLinux()) { - $process->name($system . '(' . $customProcess->getName() . ')'); - } - $customProcess->onSigterm()->process($process); - }, - $customProcess->getRedirectStdinAndStdout(), - $customProcess->getPipeType(), - $customProcess->isEnableCoroutine() - )); - } - - /** * @return array */ @@ -282,20 +256,20 @@ class ServerManager extends Component * * * - $data = new Table($this->container->get(OutputInterface::class)); - $data->setHeaders(['key', 'value']); - - $array = []; - foreach ($this->server->setting as $key => $value) { - $array[] = [$key, $value]; - $array[] = new TableSeparator(); - } - - array_pop($array); - - $data->setStyle('box-double'); - $data->setRows($array); - $data->render(); + * $data = new Table($this->container->get(OutputInterface::class)); + * $data->setHeaders(['key', 'value']); + * + * $array = []; + * foreach ($this->server->setting as $key => $value) { + * $array[] = [$key, $value]; + * $array[] = new TableSeparator(); + * } + * + * array_pop($array); + * + * $data->setStyle('box-double'); + * $data->setRows($array); + * $data->render(); */ private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = []) {