From dcc380e6b26d0bb09987737dd3e65267bfd80af1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Wed, 22 Jun 2022 16:29:41 +0800 Subject: [PATCH] modify plugin name --- Abstracts/AsyncServer.php | 62 +++++++-- Abstracts/CoroutineServer.php | 242 ---------------------------------- Abstracts/ProcessManager.php | 26 +--- Server.php | 15 +-- ServerCommand.php | 12 +- ServerProviders.php | 19 +-- 6 files changed, 77 insertions(+), 299 deletions(-) delete mode 100644 Abstracts/CoroutineServer.php diff --git a/Abstracts/AsyncServer.php b/Abstracts/AsyncServer.php index 1f9179a..1859705 100644 --- a/Abstracts/AsyncServer.php +++ b/Abstracts/AsyncServer.php @@ -7,12 +7,14 @@ use Kiri; use Kiri\Abstracts\Config; use Kiri\Di\ContainerInterface; use Kiri\Exception\ConfigException; +use Kiri\Server\Events\OnBeforeShutdown; use Kiri\Server\Events\OnShutdown; use Psr\Container\ContainerExceptionInterface; use Psr\Container\NotFoundExceptionInterface; use Psr\Log\LoggerInterface; use ReflectionException; -use Swoole\Coroutine; +use Kiri\Server\Config as SConfig; +use Kiri\Di\LocalService; use Swoole\Server; use Kiri\Server\ServerInterface; use Kiri\Server\Constant; @@ -67,6 +69,10 @@ class AsyncServer implements ServerInterface foreach ($service as $value) { $this->addListener($value); } + $rpcService = Config::get('rpc', []); + if (!empty($rpcService)) { + $this->addListener(instance(SConfig::class, [], $rpcService)); + } $this->processManager->batch(Config::get('processes', []), $this->server); $this->processManager->batch($this->getProcess(), $this->server); } @@ -97,7 +103,7 @@ class AsyncServer implements ServerInterface /** - * @param \Kiri\Server\Config $config + * @param SConfig $config * @param int $daemon * @return void * @throws ConfigException @@ -105,7 +111,7 @@ class AsyncServer implements ServerInterface * @throws NotFindClassException * @throws NotFoundExceptionInterface */ - private function createBaseServer(\Kiri\Server\Config $config, int $daemon = 0): void + private function createBaseServer(SConfig $config, int $daemon = 0): void { $match = $this->getServerClass($config->type); if (is_null($match)) { @@ -125,13 +131,13 @@ class AsyncServer implements ServerInterface /** - * @param \Kiri\Server\Config $config + * @param SConfig $config * @param int $daemon * @return array * @throws Exception * @throws ConfigException */ - protected function systemConfig(\Kiri\Server\Config $config, int $daemon): array + protected function systemConfig(SConfig $config, int $daemon): array { $settings = array_merge(Config::get('server.settings', []), $config->settings); $settings[Constant::OPTION_DAEMONIZE] = (bool)$daemon; @@ -145,13 +151,13 @@ class AsyncServer implements ServerInterface /** - * @param \Kiri\Server\Config $config + * @param SConfig $config * @return void * @throws ContainerExceptionInterface * @throws NotFoundExceptionInterface * @throws Exception */ - public function addListener(\Kiri\Server\Config $config): void + public function addListener(SConfig $config): void { $port = $this->server->addlistener($config->host, $config->port, $config->mode); if ($port === false) { @@ -163,7 +169,47 @@ class AsyncServer implements ServerInterface $port->set($this->resetSettings($config->type, $config->settings)); $this->onEventListen($port, $config->getEvents()); - Kiri::app()->set($config->getName(), $port); + $this->container->get(LocalService::class)->set($config->getName(), $port); + } + + + /** + * @param array $signal + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws Exception + */ + public function onSignal(array $signal): void + { + pcntl_signal(SIGINT, [$this, 'onSigint']); + foreach ($signal as $sig => $value) { + if (is_array($value) && is_string($value[0])) { + $value[0] = $this->container->get($value[0]); + } + if (!is_callable($value, true)) { + throw new Exception('Register signal callback must can exec.'); + } + pcntl_signal($sig, $value); + } + } + + + /** + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws ReflectionException + */ + public function onSigint(): void + { + try { + $this->dispatch->dispatch(new OnBeforeShutdown()); + } catch (\Throwable $exception) { + $this->logger->error($exception->getMessage()); + } finally { + $this->shutdown(); + } } diff --git a/Abstracts/CoroutineServer.php b/Abstracts/CoroutineServer.php deleted file mode 100644 index f24e009..0000000 --- a/Abstracts/CoroutineServer.php +++ /dev/null @@ -1,242 +0,0 @@ - - */ - private array $servers = []; - - - use TraitServer; - - - private bool $isShutdown = false; - - - /** - * @param Config $config - * @param ContainerInterface $container - * @param EventDispatch $dispatch - * @param EventProvider $provider - * @param LoggerInterface $logger - * @param ProcessManager $processManager - * @param array $params - */ - public function __construct(public Config $config, - public ContainerInterface $container, - public EventDispatch $dispatch, - public EventProvider $provider, - public LoggerInterface $logger, - public ProcessManager $processManager, - public array $params = [] - ) - { - } - - - /** - * @param string $name - * @return Server|Coroutine\Server|Coroutine\Http\Server|null - */ - public function getServer(string $name = ''): Server|Coroutine\Server|Coroutine\Http\Server|null - { - if (empty($this->servers)) { - return null; - } - if (empty($name)) { - return current($this->servers); - } - return $this->servers[$name] ?? null; - } - - - /** - * @param array $service - * @param int $daemon - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - public function initCoreServers(array $service, int $daemon = 0): void - { - $this->provider->on(OnShutdown::class, function () { - $process = $this->container->get(ProcessManager::class); - $process->shutdown(); - }); - - // TODO: Implement initCoreServers() method. - $service = $this->genConfigService($service); - foreach ($service as $value) { - $this->addListener($value); - } - } - - - /** - * @param \Kiri\Server\Config $config - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - public function addListener(\Kiri\Server\Config $config): void - { - // TODO: Implement addListener() method. - $class = $this->getCoroutineServerClass($config->type); - - /** @var Coroutine\Server|Coroutine\Http\Server $server */ - $server = new $class($config->host, $config->port); - $server->set($config->settings); - if (!($server instanceof Coroutine\Server)) { - $this->onRequestCallback($server, $config); - } else { - $this->onTcpConnection($server, $config); - } - $this->servers[$config->name] = $server; - } - - - /** - * @param Coroutine\Http\Server $server - * @param \Kiri\Server\Config $config - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - public function onRequestCallback(Coroutine\Http\Server $server, \Kiri\Server\Config $config): void - { - $requestCallback = $config->events[Constant::REQUEST] ?? null; - if (empty($requestCallback)) { - return; - } - if (is_array($requestCallback) && is_string($requestCallback[0])) { - $requestCallback[0] = $this->container->get($requestCallback[0]); - } - $server->handle('/', $requestCallback); - } - - - /** - * @param Coroutine\Server $server - * @param \Kiri\Server\Config $config - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - public function onTcpConnection(Coroutine\Server $server, \Kiri\Server\Config $config): void - { - $requestCallback = $config->events[Constant::RECEIVE] ?? null; - if (is_null($requestCallback)) { - return; - } - if (is_array($requestCallback) && is_string($requestCallback[0])) { - $requestCallback[0] = $this->container->get($requestCallback[0]); - } - $closeCallback = $config->events[Constant::CLOSE] ?? null; - $server->handle(function (Coroutine\Server\Connection $connection) use ($requestCallback, $closeCallback) { - - defer(function () use ($connection, $closeCallback) { - call_user_func($closeCallback, $connection->exportSocket()->fd); - }); - while (true) { - $read = $connection->recv(); - if ($read === null || $read === false) { - break; - } - $requestCallback($read); - } - }); - } - - - /** - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @throws \ReflectionException - */ - public function shutdown(): void - { - $this->isShutdown = true; - $this->processManager->shutdown(); - foreach ($this->servers as $server) { - $server->shutdown(); - } - $this->dispatch->dispatch(new OnShutdown()); - } - - - /** - * @return void - * @throws ConfigException - */ - public function start(): void - { - run(function () { - $provider = $this->container->get(EventProvider::class); - - - $merge = array_merge(Config::get('processes', []), $this->getProcess()); - $this->processManager->batch($merge); - foreach ($this->servers as $server) { - Coroutine::create(function () use ($server) { - - - $this->runServer($server); - }); - } - }); - Process::wait(); - } - - - /** - * @param Coroutine\Http\Server|Coroutine\Server $server - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @throws \ReflectionException - */ - public function runServer(Coroutine\Http\Server|Coroutine\Server $server): void - { - $this->dispatch->dispatch(new OnWorkerStart($server, 0)); - try { - $server->start(); - } catch (\Throwable $throwable) { - $this->logger->error($throwable->getMessage(), [$throwable]); - } finally { - $this->dispatch->dispatch(new OnWorkerStop($server, 0)); - if ($this->isShutdown) { - return; - } - $this->runServer($server); - } - } - - -} diff --git a/Abstracts/ProcessManager.php b/Abstracts/ProcessManager.php index 3b83b59..743524b 100644 --- a/Abstracts/ProcessManager.php +++ b/Abstracts/ProcessManager.php @@ -139,34 +139,12 @@ class ProcessManager if (empty($processes)) { return; } - if (is_null($server)) { - $this->poolManager($processes); - } else { - foreach ($processes as $process) { - [$customProcess, $sProcess] = $this->add($process); - - $this->_process[$customProcess->getName()] = $customProcess; - - $server->addProcess($sProcess); - } - } - } - - - /** - * @param array $processes - * @return void - * @throws ConfigException - */ - protected function poolManager(array $processes): void - { foreach ($processes as $process) { - /** @var BaseProcess $customProcess */ [$customProcess, $sProcess] = $this->add($process); - $this->_process[$customProcess->getName()] = $sProcess; + $this->_process[$customProcess->getName()] = $customProcess; - $sProcess->start(); + $server->addProcess($sProcess); } } diff --git a/Server.php b/Server.php index f348ae2..0f27a05 100644 --- a/Server.php +++ b/Server.php @@ -55,7 +55,7 @@ class Server extends HttpService /** * @param State $state - * @param Abstracts\CoroutineServer $manager + * @param AsyncServer $manager * @param ContainerInterface $container * @param ProcessManager $processManager * @param EventDispatch $dispatch @@ -65,7 +65,7 @@ class Server extends HttpService * @throws Exception */ public function __construct(public State $state, - public CoroutineServer $manager, + public AsyncServer $manager, public ContainerInterface $container, public ProcessManager $processManager, public EventDispatch $dispatch, @@ -115,18 +115,9 @@ class Server extends HttpService */ public function start(): void { - $this->container->mapping(Emitter::class, ResponseEmitter::class); - $this->container->mapping(ResponseInterface::class, Response::class); - $this->container->mapping(RequestInterface::class, Request::class); - $this->manager->initCoreServers(Config::get('server', [], true), $this->daemon); - $rpcService = Config::get('rpc', []); - if (!empty($rpcService)) { - $this->manager->addListener($this->container->create(\Kiri\Server\Config::class, [], $rpcService)); - } - - pcntl_signal(SIGINT, [$this, 'onSigint']); + $this->manager->onSignal(Config::get('signal', [])); $this->onHotReload(); diff --git a/ServerCommand.php b/ServerCommand.php index 1cd5fdb..4ffb4db 100644 --- a/ServerCommand.php +++ b/ServerCommand.php @@ -28,11 +28,15 @@ class ServerCommand extends Command const ACTIONS = ['start', 'stop', 'restart']; + private Server $server; + + /** * @return void */ protected function configure(): void { + $this->server = Kiri::getDi()->get(Server::class); $this->setName('sw:server') ->setDescription('server start|stop|reload|restart') ->addArgument('action', InputArgument::OPTIONAL, 'run action', 'start') @@ -85,8 +89,7 @@ class ServerCommand extends Command */ protected function stop(): int { - $manager = Kiri::app()->getServer(); - $manager->shutdown(); + $this->server->shutdown(); return 1; } @@ -101,9 +104,8 @@ class ServerCommand extends Command protected function start(InputInterface $input): int { $this->scan_file(); - $manager = Kiri::app()->getServer(); - $manager->setDaemon((int)!is_null($input->getOption('daemon'))); - $manager->start(); + $this->server->setDaemon((int)!is_null($input->getOption('daemon'))); + $this->server->start(); return 1; } diff --git a/ServerProviders.php b/ServerProviders.php index f31c155..0818613 100644 --- a/ServerProviders.php +++ b/ServerProviders.php @@ -4,9 +4,11 @@ declare(strict_types=1); namespace Kiri\Server; -use Exception; use Kiri\Abstracts\Providers; -use Kiri\Application; +use Kiri\Di\LocalService; +use Psr\Container\ContainerExceptionInterface; +use Psr\Container\NotFoundExceptionInterface; +use Symfony\Component\Console\Application; use Kiri; /** @@ -18,14 +20,15 @@ class ServerProviders extends Providers /** - * @param Application $application - * @throws Exception + * @param LocalService $application + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface */ - public function onImport(Application $application) + public function onImport(LocalService $application) { - $container = Kiri::getDi(); + $server = $this->container->get(ServerCommand::class); - $console = $container->get(\Symfony\Component\Console\Application::class); - $console->add($container->get(ServerCommand::class)); + $console = $this->container->get(Application::class); + $console->add($server); } }