From adc63be3e4b9052ebb8ceee9fd1c8c70f72e4fc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Wed, 17 Nov 2021 16:39:12 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Abstracts/OnTaskerStart.php | 27 --------- Abstracts/OnWorkerStart.php | 31 ---------- Abstracts/WorkerStart.php | 47 ---------------- Handler/OnServerWorker.php | 19 ++----- ServerCommand.php | 50 +++++++---------- ServerManager.php | 100 +++++++-------------------------- TraitServer.php | 32 +++++++++++ Websocket/Server.php | 109 ------------------------------------ 8 files changed, 75 insertions(+), 340 deletions(-) delete mode 100644 Abstracts/OnTaskerStart.php delete mode 100644 Abstracts/OnWorkerStart.php delete mode 100644 Abstracts/WorkerStart.php create mode 100644 TraitServer.php delete mode 100644 Websocket/Server.php diff --git a/Abstracts/OnTaskerStart.php b/Abstracts/OnTaskerStart.php deleted file mode 100644 index 269949c..0000000 --- a/Abstracts/OnTaskerStart.php +++ /dev/null @@ -1,27 +0,0 @@ -isMac()) { - return; - } - $name = Config::get('id', 'system-service'); - if (!empty($prefix)) { - $name .= '.' . $prefix; - } - swoole_set_process_name($name); - } - -} diff --git a/Handler/OnServerWorker.php b/Handler/OnServerWorker.php index f6954a0..61e0b7e 100644 --- a/Handler/OnServerWorker.php +++ b/Handler/OnServerWorker.php @@ -7,6 +7,7 @@ use Exception; use Kiri\Abstracts\Config; use Kiri\Core\Help; use Kiri\Events\EventDispatch; +use Kiri\Kiri; use Kiri\Runtime; use Server\Events\OnAfterWorkerStart; use Server\Events\OnBeforeWorkerStart; @@ -46,28 +47,16 @@ class OnServerWorker extends \Server\Abstracts\Server if ($workerId < $server->setting['worker_num']) { $this->eventDispatch->dispatch(new OnWorkerStart($server, $workerId)); $this->setProcessName(sprintf('Worker[%d].%d', $server->worker_pid, $workerId)); + set_env('environmental', Kiri::WORKER); } else { $this->eventDispatch->dispatch(new OnTaskStart($server, $workerId)); $this->setProcessName(sprintf('Tasker[%d].%d', $server->worker_pid, $workerId)); + set_env('environmental', Kiri::TASK); } $this->eventDispatch->dispatch(new OnAfterWorkerStart()); } - /** - * @param OnBeforeWorkerStart $worker - * @throws Exception - */ - public function setConfigure(OnBeforeWorkerStart $worker) - { - ServerManager::setEnv('worker', $worker->workerId); - $serialize = file_get_contents(storage(Runtime::CONFIG_NAME)); - if (!empty($serialize)) { - Config::sets(unserialize($serialize)); - } - } - - /** * @param Server $server * @param int $workerId @@ -87,7 +76,7 @@ class OnServerWorker extends \Server\Abstracts\Server */ public function onWorkerExit(Server $server, int $workerId) { - ServerManager::setEnv('state', 'exit'); + set_env('state', 'exit'); $this->eventDispatch->dispatch(new OnWorkerExit($server, $workerId)); } diff --git a/ServerCommand.php b/ServerCommand.php index 660ece0..63b4a0b 100644 --- a/ServerCommand.php +++ b/ServerCommand.php @@ -10,12 +10,6 @@ use Kiri\Abstracts\Config; use Kiri\Events\EventProvider; use Kiri\Exception\ConfigException; use Kiri\Kiri; -use Server\Abstracts\OnTaskerStart as TaskerDispatch; -use Server\Abstracts\OnWorkerStart as WorkerDispatch; -use Server\Events\OnBeforeWorkerStart; -use Server\Events\OnTaskerStart; -use Server\Events\OnWorkerStart; -use Server\Handler\OnServerWorker; use Swoole\Coroutine; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; @@ -61,28 +55,22 @@ class ServerCommand extends Command */ public function execute(InputInterface $input, OutputInterface $output): int { - try { - $manager = Kiri::app()->getServer(); - $manager->setDaemon((int)!is_null($input->getOption('daemon'))); - if (is_null($input->getArgument('action'))) { - $input->setArgument('action', 'restart'); - } - if (!in_array($input->getArgument('action'), self::ACTIONS)) { - throw new Exception('I don\'t know what I want to do.'); - } - if ($manager->isRunner() && $input->getArgument('action') == 'start') { - throw new Exception('Service is running. Please use restart.'); - } - $manager->shutdown(); - if ($input->getArgument('action') == 'stop') { - throw new Exception('shutdown success'); - } - $this->generate_runtime_builder($manager); - } catch (\Throwable $throwable) { - $output->write(jTraceEx($throwable)); - } finally { - return 1; + $manager = Kiri::app()->getServer(); + $manager->setDaemon((int)!is_null($input->getOption('daemon'))); + if (is_null($input->getArgument('action'))) { + $input->setArgument('action', 'restart'); } + if (!in_array($input->getArgument('action'), self::ACTIONS)) { + throw new Exception('I don\'t know what I want to do.'); + } + if ($manager->isRunner() && $input->getArgument('action') == 'start') { + throw new Exception('Service is running. Please use restart.'); + } + $manager->shutdown(); + if ($input->getArgument('action') == 'stop') { + throw new Exception('shutdown success'); + } + return $this->generate_runtime_builder($manager); } @@ -107,19 +95,19 @@ class ServerCommand extends Command /** * @param $manager + * @return int * @throws ConfigException * @throws Exception */ - private function generate_runtime_builder($manager): void + private function generate_runtime_builder($manager): int { $this->configure_set(); Kiri::app()->getRouter()->read_files(); - $this->eventProvider->on(OnBeforeWorkerStart::class, [di(OnServerWorker::class), 'setConfigure']); - $this->eventProvider->on(OnWorkerStart::class, [di(WorkerDispatch::class), 'dispatch']); - $this->eventProvider->on(OnTaskerStart::class, [di(TaskerDispatch::class), 'dispatch']); $manager->start(); + + return 1; } } diff --git a/ServerManager.php b/ServerManager.php index be433e1..0b69a07 100644 --- a/ServerManager.php +++ b/ServerManager.php @@ -7,6 +7,7 @@ use Closure; use Exception; use Kiri\Abstracts\Config; use Kiri\Di\ContainerInterface; +use Kiri\Error\Logger; use Kiri\Exception\ConfigException; use Kiri\Kiri; use ReflectionException; @@ -40,12 +41,19 @@ use Swoole\WebSocket\Server as WServer; class ServerManager { + + use TraitServer; + /** @var string */ public string $host = ''; public int $port = 0; + #[Inject(Logger::class)] + public Logger $logger; + + /** @var array */ public array $ports = []; @@ -109,7 +117,6 @@ class ServerManager */ public function addListener(string $type, string $host, int $port, int $mode, array $settings = []) { - if ($this->checkPortIsAlready($port)) $this->stopServer($port); if (!$this->server) { $this->createBaseServer($type, $host, $port, $mode, $settings); } else { @@ -131,8 +138,7 @@ class ServerManager foreach ($this->sortService($configs['ports']) as $config) { $this->startListenerHandler($context, $config, $daemon); } - $this->bindCallback($this->server, [Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]); -// $this->bindCallback($this->server, $this->getSystemEvents($configs)); + $this->bindCallback([Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]); } @@ -145,7 +151,7 @@ class ServerManager { $configs = Config::get('server', [], true); foreach ($this->sortService($configs['ports']) as $config) { - if ($this->checkPortIsAlready($config['port'])) { + if (checkPortIsAlready($config['port'])) { return true; } } @@ -167,7 +173,7 @@ class ServerManager if (Kiri::getPlatform()->isLinux()) { $soloProcess->name($system . '(' . $customProcess->getName() . ')'); } - echo "\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m " . $system . $customProcess->getName() . ' start.' . PHP_EOL; + $this->logger->debug($system . $customProcess->getName() . ' start.'); $customProcess->signListen($soloProcess); $customProcess->onHandler($soloProcess); }, @@ -175,7 +181,7 @@ class ServerManager $customProcess->getPipeType(), $customProcess->isEnableCoroutine() ); -// $this->container->setBindings($customProcess::class, $process); + $this->container->setBindings($customProcess->getName(), $process); $this->server->addProcess($process); } @@ -189,30 +195,6 @@ class ServerManager } - /** - * @param array $ports - * @return array - */ - public function sortService(array $ports): array - { - $array = []; - foreach ($ports as $port) { - if ($port['type'] == Constant::SERVER_TYPE_WEBSOCKET) { - array_unshift($array, $port); - } else if ($port['type'] == Constant::SERVER_TYPE_HTTP) { - if (!empty($array) && $array[0]['type'] == Constant::SERVER_TYPE_WEBSOCKET) { - $array[] = $port; - } else { - array_unshift($array, $port); - } - } else { - $array[] = $port; - } - } - return $array; - } - - /** * @param string $key * @param string|int $value @@ -273,7 +255,8 @@ class ServerManager { $id = Config::get('id', 'system-service'); - echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]$type service %s::%d start.", $id, $host, $port) . PHP_EOL; + $this->logger->debug(sprintf('[%s]' . $type . ' service %s::%d start', $id, $host, $port)); + /** @var Server\Port $service */ $this->ports[$port] = $this->server->addlistener($host, $port, $mode); if ($this->ports[$port] === false) { @@ -293,22 +276,6 @@ class ServerManager } - /** - * @param int $port - * @param string $event - * @return Closure|array|null - */ - public function getPortCallback(int $port, string $event): Closure|array|null - { - /** @var Server\Port $_port */ - $_port = $this->ports[$port] ?? null; - if (is_null($_port)) { - return null; - } - return $_port->getCallback($event); - } - - /** * @param string $type * @param string $host @@ -317,6 +284,7 @@ class ServerManager * @param array $settings * @throws ReflectionException * @throws ConfigException + * @throws Exception */ private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = []) { @@ -331,7 +299,7 @@ class ServerManager $id = Config::get('id', 'system-service'); - echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]$type service %s::%d start.", $id, $host, $port) . PHP_EOL; + $this->logger->debug(sprintf('[%s]' . $type . ' service %s::%d start', $id, $host, $port)); $this->addDefaultListener($settings); } @@ -343,43 +311,16 @@ class ServerManager */ public function stopServer(int $port) { - if (!($pid = $this->checkPortIsAlready($port))) { + if (!($pid = checkPortIsAlready($port))) { return; } - while ($this->checkPortIsAlready($port)) { + while (checkPortIsAlready($port)) { Process::kill($pid, SIGTERM); usleep(300); } } - /** - * @param $port - * @return bool|string - * @throws Exception - */ - private function checkPortIsAlready($port): bool|string - { - if (!Kiri::getPlatform()->isLinux()) { - exec("lsof -i :" . $port . " | grep -i 'LISTEN' | awk '{print $2}'", $output); - if (empty($output)) return false; - $output = explode(PHP_EOL, $output[0]); - return $output[0]; - } - - $serverPid = file_get_contents(storage('.swoole.pid')); - if (!empty($serverPid) && shell_exec('ps -ef | grep ' . $serverPid . ' | grep -v grep')) { - Process::kill($serverPid, SIGTERM); - } - - exec('netstat -lnp | grep ' . $port . ' | grep "LISTEN" | awk \'{print $7}\'', $output); - if (empty($output)) { - return false; - } - return explode('/', $output[0])[0]; - } - - /** * @param array $settings * @throws ReflectionException @@ -475,20 +416,19 @@ class ServerManager { $task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false; $reflect = $this->container->getReflect(OnServerTask::class)?->newInstance(); + $this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']); if ($task_use_object || $this->server->setting['task_enable_coroutine']) { $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']); } else { $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onTask']); } - $this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']); } /** - * @param Port|Server $server * @param array|null $settings */ - public function bindCallback(Port|Server $server, ?array $settings = []) + public function bindCallback(?array $settings = []) { // TODO: Implement bindCallback() method. if (count($settings) < 1) { diff --git a/TraitServer.php b/TraitServer.php new file mode 100644 index 0000000..293cc5f --- /dev/null +++ b/TraitServer.php @@ -0,0 +1,32 @@ +server->start(); - } - - - /** - * @param string $Host - * @param int $Port - * @param bool $isSsl - * @param array $settings - */ - public function initCore(string $Host, int $Port, bool $isSsl, array $settings) - { - $this->server = new CoroutineServer($Host, $Port, $isSsl); - $this->server->set($settings['setting'] ?? []); - - $this->server->handle('/', function (Request $request, Response $response) { - $class = new \stdClass(); - if ($class instanceof OnHandshakeInterface) { - $class->onHandshake($request, $response); - } else if ($class instanceof OnOpenInterface) { - $response->upgrade(); - $class->onOpen($this->server, $request); - } - if (!($class instanceof OnMessageInterface)) { - $response->setStatusCode(200); - $response->end(); - } else { - $this->recover($class, $response); - } - }); - } - - - /** - * @param OnCloseInterface|OnMessageInterface $class - * @param Response $response - * @return bool - * @throws Exception - */ - private function recover(OnCloseInterface|OnMessageInterface $class, Response $response): bool - { - $frame = $response->recv(); - if ($frame === '' || $frame === FALSE) { - return $this->onClose($class, $response); - } - if ($frame->data == 'close' || get_class($frame) === CloseFrame::class) { - return $this->onClose($class, $response); - } - $class->onMessage($this->server, $frame); - return $this->recover($class, $response); - } - - - /** - * @param OnCloseInterface|OnMessageInterface $class - * @param Response $response - * @return bool - * @throws Exception - */ - private function onClose(OnCloseInterface|OnMessageInterface $class, Response $response): bool - { - if (!($close = $response->close())) { - Kiri::getDi()->get(Logger::class)->warning('close websocket fail.'); - } - $class->onClose($this->server, $response->fd); - return $close; - } - - - /** - * @return CoroutineServer - */ - public function getServer(): CoroutineServer - { - return $this->server; - } - -} \ No newline at end of file