From b0f2b1077ea20b8becd86c65c5dd4e09aed6b553 Mon Sep 17 00:00:00 2001 From: "as2252258@163.com" Date: Tue, 20 Jul 2021 01:37:16 +0800 Subject: [PATCH] modify --- HttpServer/Server.php | 345 +++++---------------------------------- Rpc/Service.php | 60 +------ Server/ServerManager.php | 5 +- 3 files changed, 44 insertions(+), 366 deletions(-) diff --git a/HttpServer/Server.php b/HttpServer/Server.php index 1523d410..9df59d97 100644 --- a/HttpServer/Server.php +++ b/HttpServer/Server.php @@ -5,22 +5,18 @@ namespace HttpServer; use Exception; use HttpServer\Abstracts\HttpService; -use HttpServer\Events\OnClose; -use HttpServer\Events\OnConnect; -use HttpServer\Events\OnPacket; -use HttpServer\Events\OnReceive; -use HttpServer\Events\OnRequest; use HttpServer\Service\Http; use HttpServer\Service\Packet; use HttpServer\Service\Receive; use HttpServer\Service\Websocket; +use Rpc\Service; +use Server\Constant; +use Server\ServerManager; use Snowflake\Abstracts\Config; use Snowflake\Error\LoggerProcess; -use Snowflake\Event; use Snowflake\Exception\ConfigException; use Snowflake\Process\Biomonitoring; use Snowflake\Snowflake; -use Swoole\Coroutine; use Swoole\Runtime; @@ -33,29 +29,22 @@ defined('PID_PATH') or define('PID_PATH', APP_PATH . 'storage/server.pid'); class Server extends HttpService { - const HTTP = 'HTTP'; - const TCP = 'TCP'; - const PACKAGE = 'PACKAGE'; - const WEBSOCKET = 'WEBSOCKET'; - - private array $server = [ - 'HTTP' => [SWOOLE_TCP, Http::class], - 'TCP' => [SWOOLE_TCP, Receive::class], - 'PACKAGE' => [SWOOLE_UDP, Packet::class], - 'WEBSOCKET' => [SWOOLE_SOCK_TCP, Websocket::class], - ]; - - private Packet|Websocket|Receive|null|Http $swoole = null; - - public int $daemon = 0; - - private array $process = [ 'biomonitoring' => Biomonitoring::class, 'logger_process' => LoggerProcess::class ]; - private array $params = []; + + private ServerManager $manager; + + + /** + * + */ + public function init() + { + $this->manager = ServerManager::getContext(); + } /** @@ -63,37 +52,9 @@ class Server extends HttpService * @param $process * @param array $params */ - public function addProcess($name, $process, array $params = []) + public function addProcess($process) { - $this->process[$name] = $process; - $this->params[$name] = $params; - } - - - /** - * @return array - */ - public function getProcesses(): array - { - return $this->process ?? []; - } - - - /** - * @param $configs - * @return Packet|Websocket|Receive|Http|null - * @throws Exception - */ - private function initCore($configs): Packet|Websocket|Receive|Http|null - { - $servers = $this->sortServers($configs); - foreach ($servers as $server) { - $this->create($server); - if (!$this->swoole) { - throw new Exception('Base service create fail.'); - } - } - return $this->startRpcService(); + $this->manager->addProcess($process); } @@ -106,35 +67,34 @@ class Server extends HttpService */ public function start(): string { - $configs = Config::get('servers', [], true); + $this->manager->initBaseServer(Config::get('servers', [], true)); - $baseServer = $this->initCore($configs); - if (!$baseServer) { - return 'ok'; + $rpcService = Config::get('rpc', []); + if (!empty($rpcService)) { + $this->rpcListener($rpcService); + } + foreach ($this->process as $process) { + $this->manager->addProcess($process); } - Runtime::enableCoroutine(true, SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_BLOCKING_FUNCTION); - $settings['enable_deadlock_check'] = false; - $settings['exit_condition'] = function () { - return Coroutine::stats()['coroutine_num'] === 0; - }; - Coroutine::set($settings); - - return $this->execute($baseServer); + return $this->manager->getServer()->start(); } /** - * @param $baseServer - * @return mixed - * @throws Exception + * @param $rpcService + * @throws \ReflectionException + * @throws \Snowflake\Exception\NotFindClassException */ - private function execute($baseServer): mixed + private function rpcListener($rpcService) { - $app = Snowflake::app(); - $app->set('base-server', $baseServer); - return $baseServer->start(); + $rpcService['events'][Constant::CONNECT] = [Service::class, 'onConnect']; + $rpcService['events'][Constant::DISCONNECT] = [Service::class, 'onClose']; + $rpcService['events'][Constant::CLOSE] = [Service::class, 'onClose']; + $rpcService['events'][Constant::RECEIVE] = [Service::class, 'onReceive']; + $rpcService['events'][Constant::PACKET] = [Service::class, 'onPacket']; + $this->manager->addListener($rpcService['type'], $rpcService['host'], $rpcService['port'], $rpcService['mode'], $rpcService); } @@ -163,7 +123,7 @@ class Server extends HttpService */ public function isRunner(): bool { - $port = $this->sortServers(Config::get('servers')); + $port = Config::get('servers'); if (empty($port)) { return false; } @@ -206,51 +166,6 @@ class Server extends HttpService } - /** - * @throws ConfigException - * @throws Exception - */ - public function onProcessListener(): \Swoole\Server|null|Packet|Receive|Http|Websocket - { - if (!($this->swoole instanceof \Swoole\Server)) { - return $this->swoole; - } - - $processes = Config::get('processes'); - if (!empty($processes) && is_array($processes)) { - $this->deliveryProcess(merge($processes, $this->process)); - } else { - $this->deliveryProcess($this->process); - } - return $this->swoole; - } - - - /** - * @param $processes - * @throws Exception - */ - private function deliveryProcess($processes) - { - $application = Snowflake::app(); - if (empty($processes) || !is_array($processes)) { - return; - } - foreach ($processes as $name => $process) { - $this->debug(sprintf('Process %s', $process)); - if (!is_string($process)) { - continue; - } - $system = Snowflake::createObject($process, [Snowflake::app(), $name, true]); - if (isset($this->params[$name]) && !empty($this->params[$name])) { - $system->write(swoole_serialize($this->params[$name])); - } - $this->swoole->addProcess($system); - $application->set($process, $system); - } - } - - /** * @param $daemon * @return Server @@ -266,156 +181,11 @@ class Server extends HttpService /** - * @return Packet|Websocket|Receive|Http|null + * @return \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null */ - public function getServer(): Packet|Websocket|Receive|Http|null + public function getServer(): \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null { - return $this->swoole; - } - - - /** - * @param $config - * @return mixed - * @throws ConfigException - * @throws Exception - */ - private function create($config): \Swoole\Server|null|Packet|Receive|Http|Websocket - { - $settings = Config::get('settings', []); - if (!isset($this->server[$config['type']])) { - throw new Exception('Unknown server type(' . $config['type'] . ').'); - } - $server = $this->dispatchCreate($config, $settings); - if (isset($config['events'])) { - $this->createEventListen($server, $config); - } - return $server; - } - - - /** - * @param $config - * @throws Exception - */ - protected function createEventListen($server, $config) - { - if (!is_array($config['events'])) { - return; - } - foreach ($config['events'] as $name => $_event) { - $server->on(strtolower($name), $_event); - } - } - - /** - * @param $config - * @param $settings - * @return mixed - * @throws Exception - */ - private function dispatchCreate($config, $settings): mixed - { - if (!($this->swoole instanceof \Swoole\Server)) { - return $this->parseServer($config, $settings); - } - return $this->addListener($config); - } - - - /** - * @param $config - * @return Http|Packet|Receive|Websocket|null - * @throws Exception - */ - private function addListener($config): Packet|Websocket|Receive|Http|null - { - $newListener = $this->swoole->addlistener($config['host'], $config['port'], $config['mode']); - if (!$newListener) { - exit($this->addError(sprintf('Listen %s::%d fail.', $config['host'], $config['port']))); - } - - $newListener->set($config['settings'] ?? []); - $this->onListenerBind($newListener, $config); - - return $this->swoole; - } - - - /** - * @return Packet|Websocket|Receive|Http|null - * @throws ConfigException - * @throws Exception - */ - private function startRpcService(): Packet|Websocket|Receive|Http|null - { - $rpcService = Config::get('rpc', []); - if (empty($rpcService)) { - return $this->swoole; - } - $this->addListener($rpcService); - return $this->swoole; - } - - - /** - * @param $config - * @param $settings - * @return Packet|Websocket|Receive|Http|null - * @throws Exception - */ - private function parseServer($config, $settings): Packet|Websocket|Receive|Http|null - { - $class = $this->dispatch($config['type']); - if (is_array($config['settings'] ?? null)) { - $settings = array_merge($settings, $config['settings']); - } - $this->debug(Snowflake::listen($config)); - $this->swoole = $this->createServer($class, $config); - $this->swoole->set(array_merge($settings, [ - 'daemonize' => $this->daemon, - 'pid_file' => $settings['pid_file'] ?? PID_PATH - ])); - return $this->onProcessListener(); - } - - - /** - * @param $class - * @param $config - * @return mixed - */ - private function createServer($class, $config): mixed - { - return new $class($config['host'], $config['port'], SWOOLE_PROCESS, $config['mode']); - } - - - /** - * @param $config - * @return Packet|Websocket|Receive|Http|null - * @throws Exception - */ - private function onListenerBind($server, $config): Packet|Websocket|Receive|Http|null - { - $http = function () {}; - if (self::PACKAGE == $config['type']) { - $this->onBindCallback($server, 'packet', $config['events'][Event::SERVER_ON_PACKET] ?? [make(OnPacket::class), 'onHandler']); - } else if ($config['type'] == self::TCP) { - $this->onBindCallback($server, 'connect', $config['events'][Event::SERVER_ON_CONNECT] ?? [make(OnConnect::class), 'onHandler']); - $this->onBindCallback($server, 'close', $config['events'][Event::SERVER_ON_CLOSE] ?? [make(OnClose::class), 'onHandler']); - $this->onBindCallback($server, 'receive', $config['events'][Event::SERVER_ON_RECEIVE] ?? [make(OnReceive::class), 'onHandler']); - } else if ($config['type'] === self::HTTP) { - $this->onBindCallback($server, 'request', $config['events'][Event::SERVER_ON_REQUEST] ?? [make(OnRequest::class), 'onHandler']); - $server->on('connect', $http); - $server->on('close', $http); - } else { - throw new Exception('Unknown server type(' . $config['type'] . ').'); - } - - $this->debug(sprintf('Check listen %s::%d -> ok', $config['host'], $config['port'])); - - return $this->swoole; + return $this->manager->getServer(); } @@ -432,43 +202,4 @@ class Server extends HttpService $server->on($name, $callback); } - - /** - * @param $type - * @return string - */ - private function dispatch($type): string - { - return match ($type) { - self::HTTP => Http::class, - self::WEBSOCKET => Websocket::class, - self::PACKAGE => Packet::class, - default => Receive::class - }; - } - - /** - * @param $servers - * @return array - */ - private function sortServers($servers): array - { - $array = []; - foreach ($servers as $server) { - switch ($server['type']) { - case self::WEBSOCKET: - array_unshift($array, $server); - break; - case self::HTTP: - case self::PACKAGE | self::TCP: - $array[] = $server; - break; - default: - $array[] = $server; - } - } - return $array; - } - - } diff --git a/Rpc/Service.php b/Rpc/Service.php index b6e7d964..310e8acb 100644 --- a/Rpc/Service.php +++ b/Rpc/Service.php @@ -53,63 +53,13 @@ class Service extends Component } - /** - * @param Packet|Websocket|Receive|Http|null $server - * @throws ConfigException - * @throws Exception - */ - public function instance(Packet|Websocket|Receive|null|Http $server): void - { - $service = Config::get('rpc'); - if (!is_array($service) || empty($service)) { - return; - } - - $listen_type = $service['mode'] ?? SWOOLE_SOCK_TCP6; - $rpcServer = $server->addlistener($service['host'], $service['port'], $listen_type); - if ($rpcServer === false) { - throw new Exception('Listen rpc service fail.'); - } - $this->debug(Snowflake::listen($service)); - - $rpcServer->set($service['setting'] ?? self::defaultConfig); - $this->addCallback($rpcServer, $service, $listen_type); - } - - - /** - * @param $rpcServer - * @param $config - * @param $mode - * @throws Exception - */ - private function addCallback($rpcServer, $config, $mode) - { - $tcp = [SWOOLE_SOCK_TCP, SWOOLE_TCP, SWOOLE_TCP6, SWOOLE_SOCK_TCP6]; - $server = Snowflake::app()->getServer(); - if (in_array($mode, $tcp)) { - $connectCallback = $config['events'][Event::SERVER_ON_CONNECT] ?? [$this, 'onConnect']; - $server->onBindCallback($rpcServer, 'connect', $connectCallback); - - $connectCallback = $config['events'][Event::SERVER_ON_CLOSE] ?? [$this, 'onClose']; - $server->onBindCallback($rpcServer, 'close', $connectCallback); - - $connectCallback = $config['events'][Event::SERVER_ON_CONNECT] ?? [$this, 'onReceive']; - $server->onBindCallback($rpcServer, 'receive', $connectCallback); - } else { - $connectCallback = $config['events'][Event::SERVER_ON_PACKET] ?? [$this, 'onPacket']; - $server->onBindCallback($rpcServer, 'packet', $connectCallback); - } - } - - /** * @param Server $server * @param int $fd * @param int $reactorId * @throws Exception */ - private function onConnect(Server $server, int $fd, int $reactorId) + public function onConnect(Server $server, int $fd, int $reactorId) { defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES)); @@ -127,7 +77,7 @@ class Service extends Component * on tcp client close * @throws Exception */ - private function onClose(Server $server, int $fd) + public function onClose(Server $server, int $fd) { defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES)); @@ -142,7 +92,7 @@ class Service extends Component * @param string $data * @throws Exception */ - private function onReceive(Server $server, int $fd, int $reID, string $data) + public function onReceive(Server $server, int $fd, int $reID, string $data) { defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES)); try { @@ -166,7 +116,7 @@ class Service extends Component * @param array $client * @throws Exception */ - private function onPacket(Server $server, string $data, array $client) + public function onPacket(Server $server, string $data, array $client) { defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES)); try { @@ -188,7 +138,7 @@ class Service extends Component * @return mixed * @throws Exception */ - private function requestSpl(int $server_port, string $data): mixed + public function requestSpl(int $server_port, string $data): mixed { $sRequest = new Request(); diff --git a/Server/ServerManager.php b/Server/ServerManager.php index 33a11ce3..802bd62d 100644 --- a/Server/ServerManager.php +++ b/Server/ServerManager.php @@ -85,16 +85,13 @@ class ServerManager extends Abstracts\Server * @throws NotFindClassException * @throws ReflectionException */ - public function start(): void + public function initBaseServer($configs): void { $context = ServerManager::getContext(); - $configs = require_once 'server.php'; - foreach ($this->sortService($configs['server']['ports']) as $config) { $this->startListenerHandler($context, $config); } $this->addServerEventCallback($this->getSystemEvents($configs)); - $context->server->start(); }