[SWOOLE_TCP, Http::class],
        'TCP'       => [SWOOLE_TCP, Receive::class],
        'PACKAGE'   => [SWOOLE_UDP, Packet::class],
        'WEBSOCKET' => [SWOOLE_SOCK_TCP, Websocket::class],
    ];

    /** Base server instance; stays null until the first configured server has been created. */
    private Packet|Websocket|Receive|null|Http $swoole = null;

    /** Value handed to the server as its `daemonize` setting. */
    public int $daemon = 0;

    /** Processes attached to the base server, keyed by name; values are class names. */
    private array $process = [
        'biomonitoring'  => Biomonitoring::class,
        'logger_process' => LoggerProcess::class
    ];

    /** Optional payload per process name, written to the process when it is attached. */
    private array $params = [];


    /**
     * Register an additional process to attach to the base server.
     *
     * @param $name    key under which the process (and its params) is stored
     * @param $process class name of the process
     * @param array $params payload written to the process once it is created
     */
    public function addProcess($name, $process, $params = [])
    {
        $this->process[$name] = $process;
        $this->params[$name] = $params;
    }


    /**
     * @return array the registered processes, keyed by name
     */
    public function getProcesses(): array
    {
        return $this->process;
    }


    /**
     * Create every configured server (in the order produced by sortServers())
     * and then attach the RPC listener, if one is configured.
     *
     * @param $configs list of server configurations
     * @return Packet|Websocket|Receive|Http|null
     * @throws Exception when no base server exists after a create() call
     */
    private function initCore($configs): Packet|Websocket|Receive|Http|null
    {
        $servers = $this->sortServers($configs);
        foreach ($servers as $server) {
            $this->create($server);
            if (!$this->swoole) {
                throw new Exception('Base service create fail.');
            }
        }
        return $this->startRpcService();
    }


    /**
     * Build all servers from the `servers` configuration and start the base server.
     *
     * @return string 'ok' when no base server was created, otherwise the
     *                string form of the base server's start() result
     * @throws ConfigException
     * @throws Exception
     */
    public function start(): string
    {
        $configs = Config::get('servers', [], true);

        $baseServer = $this->initCore($configs);
        if (!$baseServer) {
            return 'ok';
        }

        Runtime::enableCoroutine(true, SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_BLOCKING_FUNCTION);

        Coroutine::set([
            'enable_deadlock_check' => false,
            'exit_condition'        => static function () {
                return Coroutine::stats()['coroutine_num'] === 0;
            },
        ]);

        // execute() is typed `mixed`; cast explicitly so the declared string
        // return type holds regardless of the strict_types setting.
        return (string)$this->execute($baseServer);
    }


    /**
     * Publish the base server in the application container as 'base-server'
     * and start it.
     *
     * @param $baseServer
     * @return mixed whatever the server's start() returns
     * @throws Exception
     */
    private function execute($baseServer): mixed
    {
        $app = Snowflake::app();
        $app->set('base-server', $baseServer);
        return $baseServer->start();
    }


    /**
     * Report that a port is occupied and shut down: the base server if one
     * exists, otherwise the application-level shutdown component.
     *
     * @param $host
     * @param $Port
     * @return Packet|Websocket|Receive|Http|null the current base server (may be null)
     * @throws Exception
     */
    public function error_stop($host, $Port): Packet|Websocket|Receive|Http|null
    {
        $this->error(sprintf('Port %s::%d is already in use.', $host, $Port));
        if ($this->swoole) {
            $this->swoole->shutdown();
        } else {
            $this->shutdown();
        }
        return $this->swoole;
    }


    /**
     * Tell whether any port from the `servers` configuration currently shows
     * up as listening on this machine.
     *
     * @return bool
     * @throws ConfigException
     * @throws Exception
     */
    public function isRunner(): bool
    {
        $servers = $this->sortServers(Config::get('servers'));
        if (empty($servers)) {
            return false;
        }
        foreach ($servers as $server) {
            if ($this->checkPort($server['port'])) {
                return true;
            }
        }
        return false;
    }


    /**
     * Check via `netstat` (Linux) or `lsof` (other platforms) whether
     * something is listening on the given port.
     *
     * @param $port
     * @return bool false for values outside 1..65535
     * @throws Exception
     */
    private function checkPort($port): bool
    {
        // Normalise first: the value is interpolated into a shell command.
        $port = (int)$port;
        if ($port < 1 || $port > 65535) {
            return false;
        }

        $output = [];
        if (Snowflake::getPlatform()->isLinux()) {
            // Require ":<port>" followed by whitespace so that e.g. 80 does not
            // match 8080 or a PID that happens to contain the digits.
            $pattern = escapeshellarg(':' . $port . '[[:space:]]');
            exec('netstat -tunlp | grep -E ' . $pattern, $output);
        } else {
            exec('lsof -i :' . $port . ' | grep -i "LISTEN"', $output);
        }
        return !empty($output);
    }


    /**
     * Delegate to the application's 'shutdown' component.
     *
     * @return void
     * @throws Exception
     */
    public function shutdown()
    {
        /** @var Shutdown $shutdown */
        $shutdown = Snowflake::app()->get('shutdown');
        $shutdown->shutdown();
    }


    /**
     * Attach the configured (`processes` config) and registered processes to
     * the base server. Does nothing when the base server is not a \Swoole\Server.
     *
     * @return \Swoole\Server|Packet|Receive|Http|Websocket|null the base server
     * @throws ConfigException
     * @throws Exception
     */
    public function onProcessListener(): \Swoole\Server|null|Packet|Receive|Http|Websocket
    {
        if (!($this->swoole instanceof \Swoole\Server)) {
            return $this->swoole;
        }

        $processes = Config::get('processes');
        if (!empty($processes) && is_array($processes)) {
            $this->deliveryProcess(merge($processes, $this->process));
        } else {
            $this->deliveryProcess($this->process);
        }
        return $this->swoole;
    }


    /**
     * Instantiate each process class, write its params (if any), add it to
     * the base server and register the instance in the application container
     * under its class name. Entries that are not strings are skipped.
     *
     * @param $processes map of name => class name
     * @throws Exception
     */
    private function deliveryProcess($processes)
    {
        $application = Snowflake::app();
        if (empty($processes) || !is_array($processes)) {
            return;
        }
        foreach ($processes as $name => $process) {
            // Guard before logging: formatting a non-string entry with %s
            // would warn (array) or fail outright (object without __toString).
            if (!is_string($process)) {
                continue;
            }
            $this->debug(sprintf('Process %s', $process));

            $system = Snowflake::createObject($process, [Snowflake::app(), $name, true]);
            if (!empty($this->params[$name])) {
                $system->write(swoole_serialize($this->params[$name]));
            }
            $this->swoole->addProcess($system);
            $application->set($process, $system);
        }
    }


    /**
     * Set the daemonize flag. Values not loosely equal to 0 or 1 are ignored.
     *
     * @param $daemon
     * @return static
     */
    public function setDaemon($daemon): static
    {
        if (!in_array($daemon, [0, 1])) {
            return $this;
        }
        // The loose check also lets through true/false/'1'/null; normalise so
        // the int-typed property never receives a non-int value.
        $this->daemon = (int)$daemon;
        return $this;
    }


    /**
     * @return Packet|Websocket|Receive|Http|null the base server, or null if none was created
     */
    public function getServer(): Packet|Websocket|Receive|Http|null
    {
        return $this->swoole;
    }


    /**
     * Create the server (or extra listener) described by $config and register
     * its event callbacks when an `events` entry is present.
     *
     * @param $config single server configuration
     * @return \Swoole\Server|Packet|Receive|Http|Websocket|null
     * @throws ConfigException
     * @throws Exception when $config['type'] is not a known server type
     */
    private function create($config): \Swoole\Server|null|Packet|Receive|Http|Websocket
    {
        $settings = Config::get('settings', []);
        if (!isset($this->server[$config['type']])) {
            throw new Exception('Unknown server type(' . $config['type'] . ').');
        }
        $server = $this->dispatchCreate($config, $settings);
        if (isset($config['events'])) {
            $this->createEventListen($config);
        }
        return $server;
    }


    /**
     * Register the callbacks listed in $config['events'].
     * The client-close event is registered under its own name; every other
     * event is registered under "listen <port> <name>".
     *
     * @param $config
     * @throws Exception
     */
    protected function createEventListen($config)
    {
        if (!is_array($config['events'])) {
            return;
        }
        foreach ($config['events'] as $name => $_event) {
            if ($name === Event::SERVER_CLIENT_CLOSE) {
                Event::on($name, $_event);
            } else {
                Event::on('listen ' . $config['port'] . ' ' . $name, $_event);
            }
        }
    }


    /**
     * Decide how to bring up $config: stop when the port is taken, create the
     * base server when none exists yet, otherwise add a listener to it.
     *
     * @param $config
     * @param $settings
     * @return \Swoole\Server|Packet|Receive|Http|Websocket|null
     * @throws Exception
     */
    private function dispatchCreate($config, $settings): \Swoole\Server|Packet|Receive|Http|Websocket|null
    {
        if (Snowflake::port_already($config['port'])) {
            return $this->error_stop($config['host'], $config['port']);
        }
        if (!($this->swoole instanceof \Swoole\Server)) {
            return $this->parseServer($config, $settings);
        }
        return $this->addListener($config);
    }


    /**
     * Add an extra listening port to the base server, apply its per-listener
     * settings and bind the callbacks for its type. Terminates the script
     * when the listener cannot be added.
     *
     * @param $config
     * @return Http|Packet|Receive|Websocket|null the base server
     * @throws Exception
     */
    private function addListener($config): Packet|Websocket|Receive|Http|null
    {
        $newListener = $this->swoole->addlistener($config['host'], $config['port'], $config['mode']);
        if (!$newListener) {
            exit($this->addError(sprintf('Listen %s::%d fail.', $config['host'], $config['port'])));
        }
        if (isset($config['settings']) && is_array($config['settings'])) {
            $newListener->set($config['settings']);
        }
        $this->onListenerBind($config);
        return $this->swoole;
    }


    /**
     * Attach the listener described by the `rpc` configuration, if any.
     *
     * @return Packet|Websocket|Receive|Http|null the base server
     * @throws ConfigException
     * @throws Exception when an RPC listener is configured but no base server exists
     */
    private function startRpcService(): Packet|Websocket|Receive|Http|null
    {
        $rpcService = Config::get('rpc', []);
        if (empty($rpcService)) {
            return $this->swoole;
        }
        // Without a base server addListener() would call a method on null.
        if ($this->swoole === null) {
            throw new Exception('Rpc service requires a base server, but none is configured.');
        }
        $this->addListener($rpcService);
        return $this->swoole;
    }


    /**
     * Create the base server for $config, apply the merged settings together
     * with the daemonize flag and pid file, then attach the processes.
     *
     * @param $config
     * @param $settings global settings; $config['settings'] overrides them
     * @return Packet|Websocket|Receive|Http|null
     * @throws Exception
     */
    private function parseServer($config, $settings): Packet|Websocket|Receive|Http|null
    {
        $class = $this->dispatch($config['type']);
        if (is_array($config['settings'] ?? null)) {
            $settings = array_merge($settings, $config['settings']);
        }
        $this->debug(Snowflake::listen($config));

        $this->swoole = $this->createServer($class, $config);
        $this->swoole->set(array_merge($settings, [
            'daemonize' => $this->daemon,
            'pid_file'  => $settings['pid_file'] ?? PID_PATH
        ]));
        return $this->onProcessListener();
    }


    /**
     * Instantiate $class with host, port, SWOOLE_PROCESS and the configured mode.
     *
     * @param $class
     * @param $config
     * @return mixed
     */
    private function createServer($class, $config): mixed
    {
        return new $class($config['host'], $config['port'], SWOOLE_PROCESS, $config['mode']);
    }


    /**
     * Bind the callbacks for a freshly added listener and log the result.
     *
     * @param $config
     * @return Packet|Websocket|Receive|Http|null the base server
     * @throws Exception
     */
    private function onListenerBind($config): Packet|Websocket|Receive|Http|null
    {
        $this->bindServerEvent($config['type']);
        $this->debug(sprintf('Check listen %s::%d -> ok', $config['host'], $config['port']));
        return $this->swoole;
    }


    /**
     * Bind the callbacks that belong to a listener type:
     * PACKAGE/TCP get connect + close plus packet or receive respectively,
     * HTTP gets request. Any other type is rejected.
     *
     * @param string $type
     * @throws Exception for any type other than PACKAGE, TCP or HTTP
     */
    private function bindServerEvent($type = self::TCP)
    {
        if (in_array($type, [self::PACKAGE, self::TCP], true)) {
            $this->onBindCallback('connect', [make(OnConnect::class), 'onHandler']);
            $this->onBindCallback('close', [make(OnClose::class), 'onHandler']);
            if ($type === self::PACKAGE) {
                $this->onBindCallback('packet', [make(OnPacket::class), 'onHandler']);
            } else {
                $this->onBindCallback('receive', [make(OnReceive::class), 'onHandler']);
            }
        } else if ($type === self::HTTP) {
            $this->onBindCallback('request', [make(OnRequest::class), 'onHandler']);
        } else {
            throw new Exception('Unknown server type(' . $type . ').');
        }
    }


    /**
     * Register $callback for event $name on the base server unless a callback
     * for that event is already present.
     *
     * @param $name
     * @param $callback
     * @throws Exception
     */
    public function onBindCallback($name, $callback)
    {
        if ($this->swoole->getCallback($name) !== null) {
            return;
        }
        $this->swoole->on($name, $callback);
    }


    /**
     * Map a server type to the class used to create it; unrecognised types
     * fall back to Receive.
     *
     * @param $type
     * @return string
     */
    private function dispatch($type): string
    {
        return match ($type) {
            self::HTTP => Http::class,
            self::WEBSOCKET => Websocket::class,
            self::PACKAGE => Packet::class,
            default => Receive::class
        };
    }


    /**
     * Order server configurations so that WEBSOCKET entries come first;
     * all other entries keep their relative order after them.
     *
     * @param $servers
     * @return array empty when $servers is not iterable
     */
    private function sortServers($servers): array
    {
        if (!is_iterable($servers)) {
            return [];
        }

        $sorted = [];
        foreach ($servers as $server) {
            // Loose comparison on purpose: it mirrors the semantics of the
            // switch statement this replaces.
            if ($server['type'] == self::WEBSOCKET) {
                array_unshift($sorted, $server);
            } else {
                $sorted[] = $server;
            }
        }
        return $sorted;
    }
}