diff --git a/Database/Command.php b/Database/Command.php index ab57649d..3984b4e9 100644 --- a/Database/Command.php +++ b/Database/Command.php @@ -15,6 +15,7 @@ use PDO; use PDOStatement; use Snowflake\Abstracts\Component; use Snowflake\Core\Json; +use Swoole\Coroutine; /** * Class Command @@ -174,7 +175,7 @@ class Command extends Component if (!($query = $connect->query($this->sql))) { return $this->addError($connect->errorInfo()[2] ?? '数据库异常, 请稍后再试.'); } - defer(function () use ($query) { + Coroutine::defer(function () use ($query) { $query->closeCursor(); }); if ($type === static::FETCH_COLUMN) { diff --git a/HttpServer/Server.php b/HttpServer/Server.php index e1b8c774..1988d552 100644 --- a/HttpServer/Server.php +++ b/HttpServer/Server.php @@ -3,6 +3,7 @@ namespace HttpServer; +use Exception; use HttpServer\Abstracts\HttpService; use HttpServer\Events\OnClose; use HttpServer\Events\OnConnect; @@ -10,16 +11,13 @@ use HttpServer\Events\OnPacket; use HttpServer\Events\OnReceive; use HttpServer\Events\OnRequest; use HttpServer\Service\Http; -use HttpServer\Service\Receive; use HttpServer\Service\Packet; +use HttpServer\Service\Receive; use HttpServer\Service\Websocket; -use Exception; -use ReflectionException; use Snowflake\Abstracts\Config; use Snowflake\Error\LoggerProcess; use Snowflake\Event; use Snowflake\Exception\ConfigException; -use Snowflake\Exception\NotFindClassException; use Snowflake\Process\Biomonitoring; use Snowflake\Snowflake; use Swoole\Coroutine; @@ -35,241 +33,245 @@ defined('PID_PATH') or define('PID_PATH', APP_PATH . 'storage/server.pid'); class Server extends HttpService { - const HTTP = 'HTTP'; - const TCP = 'TCP'; - const PACKAGE = 'PACKAGE'; - const WEBSOCKET = 'WEBSOCKET'; + const HTTP = 'HTTP'; + const TCP = 'TCP'; + const PACKAGE = 'PACKAGE'; + const WEBSOCKET = 'WEBSOCKET'; - private array $server = [ - 'HTTP' => [SWOOLE_TCP, Http::class], - 'TCP' => [SWOOLE_TCP, Receive::class], - 'PACKAGE' => [SWOOLE_UDP, Packet::class], - 'WEBSOCKET' => [SWOOLE_SOCK_TCP, Websocket::class], - ]; + private array $server = [ + 'HTTP' => [SWOOLE_TCP, Http::class], + 'TCP' => [SWOOLE_TCP, Receive::class], + 'PACKAGE' => [SWOOLE_UDP, Packet::class], + 'WEBSOCKET' => [SWOOLE_SOCK_TCP, Websocket::class], + ]; - private Packet|Websocket|Receive|null|Http $swoole = null; + private Packet|Websocket|Receive|null|Http $swoole = null; - public int $daemon = 0; + public int $daemon = 0; - private array $process = [ - 'biomonitoring' => Biomonitoring::class, - 'logger_process' => LoggerProcess::class - ]; + private array $process = [ + 'biomonitoring' => Biomonitoring::class, + 'logger_process' => LoggerProcess::class + ]; - private array $params = []; + private array $params = []; - /** - * @param $name - * @param $process - * @param array $params - */ - public function addProcess($name, $process, $params = []) - { - $this->process[$name] = $process; - $this->params[$name] = $params; - } + /** + * @param $name + * @param $process + * @param array $params + */ + public function addProcess($name, $process, $params = []) + { + $this->process[$name] = $process; + $this->params[$name] = $params; + } - /** - * @return array - */ - public function getProcesses(): array - { - return $this->process ?? []; - } + /** + * @return array + */ + public function getProcesses(): array + { + return $this->process ?? []; + } - /** - * @param $configs - * @return Packet|Websocket|Receive|Http|null - * @throws Exception - */ - private function initCore($configs): Packet|Websocket|Receive|Http|null - { - $servers = $this->sortServers($configs); - foreach ($servers as $server) { - $this->create($server); - if (!$this->swoole) { - throw new Exception('Base service create fail.'); - } - } - return $this->startRpcService(); - } + /** + * @param $configs + * @return Packet|Websocket|Receive|Http|null + * @throws Exception + */ + private function initCore($configs): Packet|Websocket|Receive|Http|null + { + $servers = $this->sortServers($configs); + foreach ($servers as $server) { + $this->create($server); + if (!$this->swoole) { + throw new Exception('Base service create fail.'); + } + } + return $this->startRpcService(); + } - /** - * @return string start server - * - * start server - * @throws ConfigException - * @throws Exception - */ - public function start(): string - { - $configs = Config::get('servers', [], true); + /** + * @return string start server + * + * start server + * @throws ConfigException + * @throws Exception + */ + public function start(): string + { + $configs = Config::get('servers', [], true); - $baseServer = $this->initCore($configs); - if (!$baseServer) { - return 'ok'; - } + $baseServer = $this->initCore($configs); + if (!$baseServer) { + return 'ok'; + } - Runtime::enableCoroutine(true, SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_BLOCKING_FUNCTION); + Runtime::enableCoroutine(true, SWOOLE_HOOK_TCP | + SWOOLE_HOOK_UNIX | SWOOLE_HOOK_UDP | SWOOLE_HOOK_UDG | + SWOOLE_HOOK_SSL | SWOOLE_HOOK_TLS | SWOOLE_HOOK_SLEEP | + SWOOLE_HOOK_STREAM_FUNCTION | SWOOLE_HOOK_PROC + ); - Coroutine::set(['enable_deadlock_check' => false]); + Coroutine::set(['enable_deadlock_check' => false]); - return $this->execute($baseServer); - } + return $this->execute($baseServer); + } - /** - * @param $baseServer - * @return mixed - * @throws Exception - */ - private function execute($baseServer): mixed - { - $app = Snowflake::app(); - $app->set('base-server', $baseServer); - return $baseServer->start(); - } + /** + * @param $baseServer + * @return mixed + * @throws Exception + */ + private function execute($baseServer): mixed + { + $app = Snowflake::app(); + $app->set('base-server', $baseServer); + return $baseServer->start(); + } - /** - * @param $host - * @param $Port - * @return Packet|Websocket|Receive|Http|null - * @throws Exception - */ - public function error_stop($host, $Port): Packet|Websocket|Receive|Http|null - { - $this->error(sprintf('Port %s::%d is already.', $host, $Port)); - if ($this->swoole) { - $this->swoole->shutdown(); - } else { - $this->shutdown(); - } - return $this->swoole; - } + /** + * @param $host + * @param $Port + * @return Packet|Websocket|Receive|Http|null + * @throws Exception + */ + public function error_stop($host, $Port): Packet|Websocket|Receive|Http|null + { + $this->error(sprintf('Port %s::%d is already.', $host, $Port)); + if ($this->swoole) { + $this->swoole->shutdown(); + } else { + $this->shutdown(); + } + return $this->swoole; + } - /** - * @return bool - * @throws ConfigException - * @throws Exception - */ - public function isRunner(): bool - { - $port = $this->sortServers(Config::get('servers')); - if (empty($port)) { - return false; - } - foreach ($port as $value) { - if ($this->checkPort($value['port'])) { - return true; - } - } - return false; - } + /** + * @return bool + * @throws ConfigException + * @throws Exception + */ + public function isRunner(): bool + { + $port = $this->sortServers(Config::get('servers')); + if (empty($port)) { + return false; + } + foreach ($port as $value) { + if ($this->checkPort($value['port'])) { + return true; + } + } + return false; + } - /** - * @param $port - * @return bool - * @throws Exception - */ - private function checkPort($port): bool - { - if (Snowflake::getPlatform()->isLinux()) { - exec('netstat -tunlp | grep ' . $port, $output); - } else { - exec('lsof -i :' . $port . ' | grep -i "LISTEN"', $output); - } - return !empty($output); - } + /** + * @param $port + * @return bool + * @throws Exception + */ + private function checkPort($port): bool + { + if (Snowflake::getPlatform()->isLinux()) { + exec('netstat -tunlp | grep ' . $port, $output); + } else { + exec('lsof -i :' . $port . ' | grep -i "LISTEN"', $output); + } + return !empty($output); + } - /** - * @return void - * - * start server - * @throws Exception - */ - public function shutdown() - { - /** @var Shutdown $shutdown */ - $shutdown = Snowflake::app()->get('shutdown'); - $shutdown->shutdown(); - } + /** + * @return void + * + * start server + * @throws Exception + */ + public function shutdown() + { + /** @var Shutdown $shutdown */ + $shutdown = Snowflake::app()->get('shutdown'); + $shutdown->shutdown(); + } - /** - * @throws ConfigException - * @throws Exception - */ - public function onProcessListener(): \Swoole\Server|null|Packet|Receive|Http|Websocket - { - if (!($this->swoole instanceof \Swoole\Server)) { - return $this->swoole; - } + /** + * @throws ConfigException + * @throws Exception + */ + public function onProcessListener(): \Swoole\Server|null|Packet|Receive|Http|Websocket + { + if (!($this->swoole instanceof \Swoole\Server)) { + return $this->swoole; + } - $processes = Config::get('processes'); - if (!empty($processes) && is_array($processes)) { - $this->deliveryProcess(merge($processes, $this->process)); - } else { - $this->deliveryProcess($this->process); - } - return $this->swoole; - } + $processes = Config::get('processes'); + if (!empty($processes) && is_array($processes)) { + $this->deliveryProcess(merge($processes, $this->process)); + } else { + $this->deliveryProcess($this->process); + } + return $this->swoole; + } - /** - * @param $processes - * @throws Exception - */ - private function deliveryProcess($processes) - { - $application = Snowflake::app(); - if (empty($processes) || !is_array($processes)) { - return; - } - foreach ($processes as $name => $process) { - $this->debug(sprintf('Process %s', $process)); - if (!is_string($process)) { - continue; - } - $system = Snowflake::createObject($process, [Snowflake::app(), $name, true]); - if (isset($this->params[$name]) && !empty($this->params[$name])) { - $system->write(swoole_serialize($this->params[$name])); - } - $this->swoole->addProcess($system); - $application->set($process, $system); - } - } + /** + * @param $processes + * @throws Exception + */ + private function deliveryProcess($processes) + { + $application = Snowflake::app(); + if (empty($processes) || !is_array($processes)) { + return; + } + foreach ($processes as $name => $process) { + $this->debug(sprintf('Process %s', $process)); + if (!is_string($process)) { + continue; + } + $system = Snowflake::createObject($process, [Snowflake::app(), $name, true]); + if (isset($this->params[$name]) && !empty($this->params[$name])) { + $system->write(swoole_serialize($this->params[$name])); + } + $this->swoole->addProcess($system); + $application->set($process, $system); + } + } - /** - * @param $daemon - * @return Server - */ - public function setDaemon($daemon): static - { - if (!in_array($daemon, [0, 1])) { - return $this; - } - $this->daemon = $daemon; - return $this; - } + /** + * @param $daemon + * @return Server + */ + public function setDaemon($daemon): static + { + if (!in_array($daemon, [0, 1])) { + return $this; + } + $this->daemon = $daemon; + return $this; + } - /** - * @return Packet|Websocket|Receive|Http|null - */ - public function getServer(): Packet|Websocket|Receive|Http|null - { - return $this->swoole; - } + /** + * @return Packet|Websocket|Receive|Http|null + */ + public function getServer(): Packet|Websocket|Receive|Http|null + { + return $this->swoole; + } /** @@ -278,229 +280,229 @@ class Server extends HttpService * @throws ConfigException * @throws Exception */ - private function create($config): \Swoole\Server|null|Packet|Receive|Http|Websocket - { - $settings = Config::get('settings', []); - if (!isset($this->server[$config['type']])) { - throw new Exception('Unknown server type(' . $config['type'] . ').'); - } - $server = $this->dispatchCreate($config, $settings); - if (isset($config['events'])) { - $this->createEventListen($config); - } - return $server; - } + private function create($config): \Swoole\Server|null|Packet|Receive|Http|Websocket + { + $settings = Config::get('settings', []); + if (!isset($this->server[$config['type']])) { + throw new Exception('Unknown server type(' . $config['type'] . ').'); + } + $server = $this->dispatchCreate($config, $settings); + if (isset($config['events'])) { + $this->createEventListen($config); + } + return $server; + } - /** - * @param $config - * @throws Exception - */ - protected function createEventListen($config) - { - if (!is_array($config['events'])) { - return; - } - foreach ($config['events'] as $name => $_event) { - Event::on('listen ' . $config['port'] . ' ' . $name, $_event); - } - } + /** + * @param $config + * @throws Exception + */ + protected function createEventListen($config) + { + if (!is_array($config['events'])) { + return; + } + foreach ($config['events'] as $name => $_event) { + Event::on('listen ' . $config['port'] . ' ' . $name, $_event); + } + } - /** - * @param $config - * @param $settings - * @return \Swoole\Server|Packet|Receive|Http|Websocket|null - * @throws Exception - */ - private function dispatchCreate($config, $settings): \Swoole\Server|Packet|Receive|Http|Websocket|null - { - if (Snowflake::port_already($config['port'])) { - return $this->error_stop($config['host'], $config['port']); - } - if (!($this->swoole instanceof \Swoole\Server)) { - return $this->parseServer($config, $settings); - } - return $this->addListener($config); - } + /** + * @param $config + * @param $settings + * @return \Swoole\Server|Packet|Receive|Http|Websocket|null + * @throws Exception + */ + private function dispatchCreate($config, $settings): \Swoole\Server|Packet|Receive|Http|Websocket|null + { + if (Snowflake::port_already($config['port'])) { + return $this->error_stop($config['host'], $config['port']); + } + if (!($this->swoole instanceof \Swoole\Server)) { + return $this->parseServer($config, $settings); + } + return $this->addListener($config); + } - /** - * @param $config - * @return Http|Packet|Receive|Websocket|null - * @throws Exception - */ - private function addListener($config): Packet|Websocket|Receive|Http|null - { - $newListener = $this->swoole->addlistener($config['host'], $config['port'], $config['mode']); - if (!$newListener) { - exit($this->addError(sprintf('Listen %s::%d fail.', $config['host'], $config['port']))); - } + /** + * @param $config + * @return Http|Packet|Receive|Websocket|null + * @throws Exception + */ + private function addListener($config): Packet|Websocket|Receive|Http|null + { + $newListener = $this->swoole->addlistener($config['host'], $config['port'], $config['mode']); + if (!$newListener) { + exit($this->addError(sprintf('Listen %s::%d fail.', $config['host'], $config['port']))); + } - if (isset($config['settings']) && is_array($config['settings'])) { - $newListener->set($config['settings']); - } - $this->onListenerBind($config); + if (isset($config['settings']) && is_array($config['settings'])) { + $newListener->set($config['settings']); + } + $this->onListenerBind($config); - return $this->swoole; - } + return $this->swoole; + } - /** - * @param $mode - * @return bool - */ - public static function isTcp($mode) - { - return in_array($mode, [SWOOLE_SOCK_TCP, SWOOLE_TCP, SWOOLE_TCP6, SWOOLE_SOCK_TCP6]); - } + /** + * @param $mode + * @return bool + */ + public static function isTcp($mode) + { + return in_array($mode, [SWOOLE_SOCK_TCP, SWOOLE_TCP, SWOOLE_TCP6, SWOOLE_SOCK_TCP6]); + } - /** - * @param $mode - * @return bool - */ - public static function isUdp($mode) - { - return in_array($mode, [SWOOLE_SOCK_UDP, SWOOLE_UDP, SWOOLE_UDP6, SWOOLE_SOCK_UDP6]); - } + /** + * @param $mode + * @return bool + */ + public static function isUdp($mode) + { + return in_array($mode, [SWOOLE_SOCK_UDP, SWOOLE_UDP, SWOOLE_UDP6, SWOOLE_SOCK_UDP6]); + } - /** - * @return Packet|Websocket|Receive|Http|null - * @throws ConfigException - * @throws Exception - */ - private function startRpcService(): Packet|Websocket|Receive|Http|null - { - $rpcService = Config::get('rpc', []); - if (empty($rpcService)) { - return $this->swoole; - } - $this->addListener($rpcService); - return $this->swoole; - } + /** + * @return Packet|Websocket|Receive|Http|null + * @throws ConfigException + * @throws Exception + */ + private function startRpcService(): Packet|Websocket|Receive|Http|null + { + $rpcService = Config::get('rpc', []); + if (empty($rpcService)) { + return $this->swoole; + } + $this->addListener($rpcService); + return $this->swoole; + } - /** - * @param $config - * @param $settings - * @return Packet|Websocket|Receive|Http|null - * @throws Exception - */ - private function parseServer($config, $settings): Packet|Websocket|Receive|Http|null - { - $class = $this->dispatch($config['type']); - if (is_array($config['settings'] ?? null)) { - $settings = array_merge($settings, $config['settings']); - } - $this->debug(Snowflake::listen($config)); - $this->swoole = $this->createServer($class, $config); - $this->swoole->set(array_merge($settings, [ - 'daemonize' => $this->daemon, - 'pid_file' => $settings['pid_file'] ?? PID_PATH - ])); - return $this->onProcessListener(); - } + /** + * @param $config + * @param $settings + * @return Packet|Websocket|Receive|Http|null + * @throws Exception + */ + private function parseServer($config, $settings): Packet|Websocket|Receive|Http|null + { + $class = $this->dispatch($config['type']); + if (is_array($config['settings'] ?? null)) { + $settings = array_merge($settings, $config['settings']); + } + $this->debug(Snowflake::listen($config)); + $this->swoole = $this->createServer($class, $config); + $this->swoole->set(array_merge($settings, [ + 'daemonize' => $this->daemon, + 'pid_file' => $settings['pid_file'] ?? PID_PATH + ])); + return $this->onProcessListener(); + } - /** - * @param $class - * @param $config - * @return mixed - */ - private function createServer($class, $config): mixed - { - return new $class($config['host'], $config['port'], SWOOLE_PROCESS, $config['mode']); - } + /** + * @param $class + * @param $config + * @return mixed + */ + private function createServer($class, $config): mixed + { + return new $class($config['host'], $config['port'], SWOOLE_PROCESS, $config['mode']); + } - /** - * @param $config - * @return Packet|Websocket|Receive|Http|null - * @throws Exception - */ - private function onListenerBind($config): Packet|Websocket|Receive|Http|null - { - $this->bindServerEvent($config['type']); + /** + * @param $config + * @return Packet|Websocket|Receive|Http|null + * @throws Exception + */ + private function onListenerBind($config): Packet|Websocket|Receive|Http|null + { + $this->bindServerEvent($config['type']); - $this->debug(sprintf('Check listen %s::%d -> ok', $config['host'], $config['port'])); + $this->debug(sprintf('Check listen %s::%d -> ok', $config['host'], $config['port'])); - return $this->swoole; - } + return $this->swoole; + } /** * @param string $type * @throws Exception */ - private function bindServerEvent($type = self::TCP) - { - if (in_array($type, [self::PACKAGE, self::TCP])) { - $this->onBindCallback('connect', [make(OnConnect::class), 'onHandler']); - $this->onBindCallback('close', [make(OnClose::class), 'onHandler']); - if ($type == self::PACKAGE) { - $this->onBindCallback('packet', [make(OnPacket::class), 'onHandler']); - } else if ($type == self::TCP) { - $this->onBindCallback('receive', [make(OnReceive::class), 'onHandler']); - } - } else if ($type === self::HTTP) { - $this->onBindCallback('request', [make(OnRequest::class), 'onHandler']); - } else { - throw new Exception('Unknown server type(' . $type . ').'); - } - } + private function bindServerEvent($type = self::TCP) + { + if (in_array($type, [self::PACKAGE, self::TCP])) { + $this->onBindCallback('connect', [make(OnConnect::class), 'onHandler']); + $this->onBindCallback('close', [make(OnClose::class), 'onHandler']); + if ($type == self::PACKAGE) { + $this->onBindCallback('packet', [make(OnPacket::class), 'onHandler']); + } else if ($type == self::TCP) { + $this->onBindCallback('receive', [make(OnReceive::class), 'onHandler']); + } + } else if ($type === self::HTTP) { + $this->onBindCallback('request', [make(OnRequest::class), 'onHandler']); + } else { + throw new Exception('Unknown server type(' . $type . ').'); + } + } - /** - * @param $name - * @param $callback - * @throws Exception - */ - public function onBindCallback($name, $callback) - { - if ($this->swoole->getCallback($name) !== null) { - return; - } - $this->swoole->on($name, $callback); - } + /** + * @param $name + * @param $callback + * @throws Exception + */ + public function onBindCallback($name, $callback) + { + if ($this->swoole->getCallback($name) !== null) { + return; + } + $this->swoole->on($name, $callback); + } - /** - * @param $type - * @return string - */ - private function dispatch($type): string - { - return match ($type) { - self::HTTP => Http::class, - self::WEBSOCKET => Websocket::class, - self::PACKAGE => Packet::class, - default => Receive::class - }; - } + /** + * @param $type + * @return string + */ + private function dispatch($type): string + { + return match ($type) { + self::HTTP => Http::class, + self::WEBSOCKET => Websocket::class, + self::PACKAGE => Packet::class, + default => Receive::class + }; + } - /** - * @param $servers - * @return array - */ - private function sortServers($servers): array - { - $array = []; - foreach ($servers as $server) { - switch ($server['type']) { - case self::WEBSOCKET: - array_unshift($array, $server); - break; - case self::HTTP: - case self::PACKAGE | self::TCP: - $array[] = $server; - break; - default: - $array[] = $server; - } - } - return $array; - } + /** + * @param $servers + * @return array + */ + private function sortServers($servers): array + { + $array = []; + foreach ($servers as $server) { + switch ($server['type']) { + case self::WEBSOCKET: + array_unshift($array, $server); + break; + case self::HTTP: + case self::PACKAGE | self::TCP: + $array[] = $server; + break; + default: + $array[] = $server; + } + } + return $array; + } }