From 51c3637ed2d18c4a876d607429670c890a14b211 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mr=C2=B7x?= Date: Sat, 17 Jul 2021 02:16:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .phpstorm.meta.php | 13 + HttpServer/Http/Formatter/JsonFormatter.php | 2 +- HttpServer/Http/Response.php | 592 ++++++++++---------- HttpServer/Route/Router.php | 27 +- ex/BASEServerListener.php | 151 ++++- ex/HTTPServerListener.php | 22 +- ex/ListenerHelper.php | 12 +- ex/Manager/ServerBase.php | 37 ++ ex/Manager/ServerManager.php | 19 + ex/Router.php | 33 ++ ex/SInterface/CustomProcess.php | 37 ++ ex/TCPServerListener.php | 89 +-- ex/Task/ServerTask.php | 4 +- ex/UDPServerListener.php | 4 + ex/WebSocketServerListener.php | 110 +++- ex/Worker/ServerWorker.php | 52 ++ ex/server.php | 78 +-- ex/test/RelationshipSystemProcess.php | 50 ++ 18 files changed, 912 insertions(+), 420 deletions(-) create mode 100644 .phpstorm.meta.php create mode 100644 ex/Manager/ServerBase.php create mode 100644 ex/Manager/ServerManager.php create mode 100644 ex/Router.php create mode 100644 ex/SInterface/CustomProcess.php create mode 100644 ex/Worker/ServerWorker.php create mode 100644 ex/test/RelationshipSystemProcess.php diff --git a/.phpstorm.meta.php b/.phpstorm.meta.php new file mode 100644 index 00000000..2ea8ba78 --- /dev/null +++ b/.phpstorm.meta.php @@ -0,0 +1,13 @@ + JsonFormatter::class, - self::XML => XmlFormatter::class, - self::HTML => HtmlFormatter::class - ]; + private array $_format_maps = [ + self::JSON => JsonFormatter::class, + self::XML => XmlFormatter::class, + self::HTML => HtmlFormatter::class + ]; - public int $fd = 0; + public int $fd = 0; - /** - * @param $format - * @return $this - */ - public function setFormat($format): static - { - $this->format = $format; - return $this; - } + /** + * @param $format + * @return $this + */ + public function setFormat($format): static + { + $this->format = $format; + return $this; + } - /** - * 清理无用数据 - */ - public function clear(): void - { - $this->fd = 0; - $this->isWebSocket = false; - $this->format = null; - } + /** + * 清理无用数据 + */ + public function clear(): void + { + $this->fd = 0; + $this->format = null; + } - /** - * @return string - */ - public function getContentType(): string - { - if ($this->format == null || $this->format == static::JSON) { - return 'application/json;charset=utf-8'; - } else if ($this->format == static::XML) { - return 'application/xml;charset=utf-8'; - } else { - return 'text/html;charset=utf-8'; - } - } + /** + * @return string + */ + public function getContentType(): string + { + if ($this->format == null || $this->format == static::JSON) { + return 'application/json;charset=utf-8'; + } else if ($this->format == static::XML) { + return 'application/xml;charset=utf-8'; + } else { + return 'text/html;charset=utf-8'; + } + } - /** - * @param $content - * @return mixed - */ - public function toHtml($content): mixed - { - $this->format = self::HTML; - return $content; - } + /** + * @param $content + * @return string + */ + public function toHtml($content): string + { + $this->format = self::HTML; + return (string)$content; + } - /** - * @param $content - * @return mixed - */ - public function toJson($content): mixed - { - $this->format = self::JSON; - return $content; - } + /** + * @param $content + * @return string|bool + */ + public function toJson($content): string|bool + { + $this->format = self::JSON; + return json_encode($content, JSON_UNESCAPED_UNICODE); + } - /** - * @param $content - * @return mixed - */ - public function toXml($content): mixed - { - $this->format = self::XML; - return $content; - } + /** + * @param $content + * @return mixed + */ + public function toXml($content): mixed + { + $this->format = self::XML; + return $content; + } - /** - * @return mixed - * @throws Exception - */ - public function sender(): mixed - { - return $this->send(func_get_args()); - } + /** + * @return mixed + * @throws Exception + */ + public function sender(): mixed + { + return $this->send(func_get_args()); + } - /** - * @param $key - * @param $value - * @return Response - */ - public function addHeader($key, $value): static - { - $this->headers[$key] = $value; - return $this; - } + /** + * @param $key + * @param $value + * @return Response + */ + public function addHeader($key, $value): static + { + $this->headers[$key] = $value; + return $this; + } - /** - * @return bool - */ - private function isClient(): bool - { - return !($this->response instanceof SResponse) && !($this->response instanceof S2Response); - } + /** + * @param $name + * @param null $value + * @param null $expires + * @param null $path + * @param null $domain + * @param null $secure + * @param null $httponly + * @param null $samesite + * @param null $priority + * @return Response + */ + public function addCookie($name, $value = null, $expires = null, $path = null, $domain = null, $secure = null, $httponly = null, $samesite = null, $priority = null): static + { + $this->cookies[] = func_get_args(); + return $this; + } /** * @param mixed $context @@ -155,212 +162,211 @@ class Response extends HttpService * @return bool * @throws Exception */ - public function send(mixed $context = '', int $statusCode = 200): mixed - { - $sendData = $this->parseData($context); + public function send(mixed $context = '', int $statusCode = 200): mixed + { + $sendData = $this->parseData($context); + $response = Context::getContext('response'); + if ($response instanceof SResponse) { + $this->sendData($response, $sendData, $statusCode); + } else { + $this->printResult($sendData); + } + return $sendData; + } - $response = Context::getContext('response'); - if ($response instanceof SResponse) { - $this->sendData($response, $sendData, $statusCode); - } else { - if (!empty(request()->fd)) { - return ''; - } - $this->printResult($sendData); - } - return $sendData; - } + /** + * @param $context + * @return mixed + * @throws Exception + */ + private function parseData($context): mixed + { + if (!empty($context) && !is_string($context)) { + /** @var IFormatter $class */ + $class = $this->_format_maps[$this->format] ?? HtmlFormatter::class; - /** - * @param $context - * @return mixed - * @throws Exception - */ - private function parseData($context): mixed - { - if (empty($context)) { - return $context; - } - if (isset($this->_format_maps[$this->format])) { - $class['class'] = $this->_format_maps[$this->format]; - } else { - $class['class'] = HtmlFormatter::class; - } - $format = Snowflake::createObject($class); - return $format->send($context)->getData(); - } + $di = Snowflake::getDi()->get($class); + $context = $di->send($context)->getData(); + } + return $context; + } - /** - * @param $result - * @return void - * @throws Exception - */ - private function printResult($result): void - { - $result = Help::toString($result); - $string = PHP_EOL . 'Command Result: ' . PHP_EOL . PHP_EOL; + /** + * @param $result + * @return void + * @throws Exception + */ + private function printResult($result): void + { + $result = Help::toString($result); + $string = PHP_EOL . 'Command Result: ' . PHP_EOL . PHP_EOL; + fire('CONSOLE_END'); + if (str_contains((string)$result, 'Event::rshutdown(): Event::wait()')) { + return; + } + if (empty($result)) { + $string .= 'success!' . PHP_EOL . PHP_EOL; + } else { + $string .= $result . PHP_EOL . PHP_EOL; + } + $string .= 'Command End!' . PHP_EOL . PHP_EOL; + print_r($string); + } - fire('CONSOLE_END'); - if (str_contains((string)$result, 'Event::rshutdown(): Event::wait()')) { - return; - } - if (empty($result)) { - $string .= 'success!' . PHP_EOL . PHP_EOL; - } else { - $string .= $result . PHP_EOL . PHP_EOL; - } - $string .= 'Command End!' . PHP_EOL . PHP_EOL; - print_r($string); - } - - /** - * @param $response - * @param $sendData - * @param $status - * @throws Exception - */ - private function sendData($response, $sendData, $status): void - { - $server = Snowflake::app()->getSwoole(); - if (!$server->exist($response->fd)) { - return; - } - if (is_array($sendData)) { - $sendData = Json::encode($sendData); - } - $this->setHeaders($response, $status)->end($sendData); - } + /** + * @param SResponse $response + * @param $sendData + * @param $status + * @throws Exception + */ + private function sendData(SResponse $response, $sendData, $status): void + { + if (!$response->isWritable()) { + return; + } + $this->setHeaders($response); + $response->status($status); + $response->end($sendData); + } - /** - * @param SResponse $response - * @param $status - * @return SResponse - */ - private function setHeaders(SResponse $response, $status): SResponse - { - $response->status($status); - $response->header('Content-Type', $this->getContentType()); - $response->header('Run-Time', $this->getRuntime()); - - if (empty($this->headers) || !is_array($this->headers)) { - return $response; - } - foreach ($this->headers as $key => $header) { - $response->header($key, $header); - } - $this->headers = []; - return $response; - } + /** + * @param SResponse $response + * @return void + */ + private function setHeaders(SResponse $response): void + { + $response->header('Content-Type', $this->getContentType()); + $response->header('Run-Time', $this->getRuntime()); + if (empty($this->headers) || !is_array($this->headers)) { + return; + } + foreach ($this->headers as $key => $header) { + $response->header($key, $header); + } + $this->headers = []; + } - /** - * @param $url - * @param array $param - * @return int - */ - public function redirect($url, array $param = []): int - { - if (!empty($param)) { - $url .= '?' . http_build_query($param); - } - $url = ltrim($url, '/'); - if (!preg_match('/^http/', $url)) { - $url = '/' . $url; - } - return $this->response->redirect($url); - } - - /** - * @param null $response - * @return static - * @throws Exception - */ - public static function create($response = null): static - { - Context::setContext('response', $response); - - $ciResponse = Snowflake::getApp('response'); - $ciResponse->response = $response; - $ciResponse->startTime = microtime(true); - $ciResponse->format = self::JSON; - return $ciResponse; - } + /** + * @param SResponse $response + * @return void + */ + private function setCookies(SResponse $response): void + { + if (empty($this->cookies) || !is_array($this->cookies)) { + return; + } + foreach ($this->cookies as $header) { + $response->setCookie(...$header); + } + $this->cookies = []; + } - /** - * @param int $statusCode - * @param string $message - * @return mixed - * @throws Exception - */ - public function close(int $statusCode = 200, string $message = ''): mixed - { - return $this->send($message, $statusCode); - } + /** + * @param $url + * @param array $param + * @return int + */ + public function redirect($url, array $param = []): mixed + { + if (!empty($param)) { + $url .= '?' . http_build_query($param); + } + $url = ltrim($url, '/'); + if (!preg_match('/^http/', $url)) { + $url = '/' . $url; + } + /** @var SResponse $response */ + $response = Context::getContext('response'); + if (!empty($response)) { + return $response->redirect($url); + } + return false; + } - /** - * @param $clientId - * @param int $statusCode - * @param string $message - * @return mixed - */ - public function closeClient($clientId,int $statusCode = 200,string $message = ''): mixed - { - $socket = Snowflake::getWebSocket(); - if (!$socket->exist($clientId)) { - return true; - } - return $socket->close($clientId, true); - } + /** + * @param null $response + * @return static + * @throws Exception + */ + public static function create($response = null): static + { + Context::setContext('response', $response); + $ciResponse = Snowflake::getDi()->get(SResponse::class); + $ciResponse->startTime = microtime(true); + $ciResponse->format = self::JSON; + return $ciResponse; + } - /** - * @param string $path - * @param int $offset - * @param int $limit - * @param int $sleep - * @return string - */ - public function sendFile(string $path, int $offset = 0, int $limit = 1024000, int $sleep = 0): string - { - $open = fopen($path, 'r'); - - $stat = fstat($open); - - while ($file = fread($open, $limit)) { - $this->response->write($file); - fseek($open, $offset); - if ($sleep > 0) { - sleep($sleep); - } - if ($offset >= $stat['size']) { - break; - } - $offset += $limit; - } - $this->response->end(); - $this->response = null; - return ''; - } + /** + * @param int $statusCode + * @param string $message + * @return mixed + * @throws Exception + */ + public function close(int $statusCode = 200, string $message = ''): mixed + { + return $this->send($message, $statusCode); + } - /** - * @throws Exception - */ - public function sendNotFind() - { - $this->format = static::HTML; - $this->send('', 404); - } + /** + * @param $clientId + * @param int $statusCode + * @param string $message + * @return mixed + */ + public function closeClient($clientId, int $statusCode = 200, string $message = ''): mixed + { + $socket = Snowflake::getWebSocket(); + if (!$socket->exist($clientId)) { + return true; + } + return $socket->close($clientId, true); + } - /** - * @return string - */ - #[Pure] public function getRuntime(): string - { - return sprintf('%.5f', microtime(TRUE) - $this->startTime); - } + + /** + * @param string $path + * @param int $offset + * @param int $limit + * @param int $sleep + * @return string + */ + public function sendFile(string $path, int $offset = 0, int $limit = 1024000, int $sleep = 0): string + { + $open = fopen($path, 'r'); + + $stat = fstat($open); + + + /** @var SResponse $response */ + $response = Context::getContext('response'); + $response->header('Content-length', $stat['size']); + while ($file = fread($open, $limit)) { + $response->write($file); + fseek($open, $offset); + if ($sleep > 0) sleep($sleep); + if ($offset >= $stat['size']) { + break; + } + $offset += $limit; + } + $response->end(); + return ''; + } + + + /** + * @return string + */ + #[Pure] public function getRuntime(): string + { + return sprintf('%.5f', microtime(TRUE) - $this->startTime); + } } diff --git a/HttpServer/Route/Router.php b/HttpServer/Route/Router.php index 80f2e56c..43991dfa 100644 --- a/HttpServer/Route/Router.php +++ b/HttpServer/Route/Router.php @@ -127,7 +127,7 @@ class Router extends HttpService implements RouterInterface * @param string $method * @return ?Node */ - private function hash($path, $handler, $method = 'any'): ?Node + private function hash($path, $handler, string $method = 'any'): ?Node { $path = $this->resolve($path); @@ -162,7 +162,7 @@ class Router extends HttpService implements RouterInterface * @return Node * @throws Exception */ - private function tree($path, $handler, $method = 'any'): Node + private function tree($path, $handler, string $method = 'any'): Node { list($first, $explode) = $this->split($path); @@ -243,12 +243,13 @@ class Router extends HttpService implements RouterInterface } - /** - * @param $port - * @param callable $callback - * @return false|mixed - */ - public function addRpcService($port, callable $callback) + /** + * @param $port + * @param callable $callback + * @return mixed + * @throws Exception + */ + public function addRpcService($port, callable $callback): mixed { return call_user_func($callback, new Actuator($port)); } @@ -296,8 +297,8 @@ class Router extends HttpService implements RouterInterface /** * @param $route * @param $handler - * @return \HttpServer\Route\Node|null - * @throws \Exception + * @return Node|null + * @throws Exception */ public function head($route, $handler): ?Node { @@ -318,12 +319,12 @@ class Router extends HttpService implements RouterInterface /** * @param $value - * @param $index - * @param $method + * @param int $index + * @param string $method * @return Node * @throws */ - public function NodeInstance($value, $index = 0, $method = 'get'): Node + public function NodeInstance($value, int $index = 0, string $method = 'get'): Node { $node = new Node(); $node->childes = []; diff --git a/ex/BASEServerListener.php b/ex/BASEServerListener.php index a266205a..2049224c 100644 --- a/ex/BASEServerListener.php +++ b/ex/BASEServerListener.php @@ -1,6 +1,9 @@ sortService($configs['server']['ports']) as $config) { $this->startListenerHandler($context, $config); } + $this->addProcess(RelationshipSystemProcess::class); + $this->addServerEventCallback($this->getSystemEvents($configs)); $context->server->start(); } + /** + * @param string|CustomProcess $customProcess + * @param null $redirect_stdin_and_stdout + * @param int|null $pipe_type + * @param bool $enable_coroutine + */ + public function addProcess(string|CustomProcess $customProcess, $redirect_stdin_and_stdout = null, ?int $pipe_type = SOCK_DGRAM, bool $enable_coroutine = true) + { + if (is_string($customProcess)) { + $implements = class_implements($customProcess); + if (!in_array(CustomProcess::class, $implements)) { + trigger_error('custom process must implement ' . CustomProcess::class); + } + $customProcess = new $customProcess($this->server); + } + /** @var Process $process */ + $process = $this->server->addProcess( + new Process( + function (Process $soloProcess) use ($customProcess) { + $soloProcess->name($customProcess->getProcessName($soloProcess)); + /** @var \Swoole\Coroutine\Socket $export */ + $export = $soloProcess->exportSocket(); + loop(function () use ($export, $customProcess) { + $read = $export->recv(); + if (!empty($read)) { + $customProcess->receive($read); + } + }); + $customProcess->onHandler($soloProcess); + }, + $redirect_stdin_and_stdout, + $pipe_type, + $enable_coroutine + ) + ); + /** @var \Swoole\Coroutine\Socket $socket */ + $socket = $process->exportSocket(); + $socket->send(""); + } + + + /** + * @param array $ports + * @return array + */ + private function sortService(array $ports): array + { + $array = []; + foreach ($ports as $port) { + if ($port['type'] == static::SERVER_TYPE_WEBSOCKET) { + array_unshift($array, $port); + } else if ($port['type'] == static::SERVER_TYPE_HTTP) { + if (!empty($array) && $array[0]['type'] == self::SERVER_TYPE_WEBSOCKET) { + $array[] = $port; + } else { + array_unshift($array, $port); + } + } else { + $array[] = $port; + } + } + return $array; + } + + + /** + * @param array $configs + * @return array + */ + private function getSystemEvents(array $configs): array + { + return array_intersect_key($configs['server']['events'] ?? [], [ + BASEServerListener::SERVER_ON_PIPE_MESSAGE => '', + BASEServerListener::SERVER_ON_SHUTDOWN => '', + BASEServerListener::SERVER_ON_WORKER_START => '', + BASEServerListener::SERVER_ON_WORKER_ERROR => '', + BASEServerListener::SERVER_ON_WORKER_EXIT => '', + BASEServerListener::SERVER_ON_WORKER_STOP => '', + BASEServerListener::SERVER_ON_MANAGER_START => '', + BASEServerListener::SERVER_ON_MANAGER_STOP => '', + BASEServerListener::SERVER_ON_BEFORE_RELOAD => '', + BASEServerListener::SERVER_ON_AFTER_RELOAD => '', + BASEServerListener::SERVER_ON_START => '', + ]); + } + + /** * @param BASEServerListener $context * @param array $config @@ -143,18 +252,18 @@ class BASEServerListener private function addNewListener(string $type, string $host, int $port, int $mode, array $settings = []) { switch ($type) { - case self::SERVER_TYPE_TCP: - TCPServerListener::instance($this->server, $host, $port, $mode, $settings); - break; - case self::SERVER_TYPE_UDP: - UDPServerListener::instance($this->server, $host, $port, $mode, $settings); - break; - case self::SERVER_TYPE_HTTP: - HTTPServerListener::instance($this->server, $host, $port, $mode, $settings); - break; - case self::SERVER_TYPE_WEBSOCKET: - WebSocketServerListener::instance($this->server, $host, $port, $mode, $settings); - break; + case self::SERVER_TYPE_TCP: + TCPServerListener::instance($this->server, $host, $port, $mode, $settings); + break; + case self::SERVER_TYPE_UDP: + UDPServerListener::instance($this->server, $host, $port, $mode, $settings); + break; + case self::SERVER_TYPE_HTTP: + HTTPServerListener::instance($this->server, $host, $port, $mode, $settings); + break; + case self::SERVER_TYPE_WEBSOCKET: + WebSocketServerListener::instance($this->server, $host, $port, $mode, $settings); + break; } } @@ -186,7 +295,7 @@ class BASEServerListener */ private function addDefaultListener(string $type, array $settings): void { - if ($this->server->setting['task_worker_num'] > 0) $this->addTaskListener($settings['events']); + if (($this->server->setting['task_worker_num'] ?? 0) > 0) $this->addTaskListener($settings['events']); if ($type === BASEServerListener::SERVER_TYPE_WEBSOCKET) { $this->server->on('handshake', $settings['events'][static::SERVER_ON_HANDSHAKE] ?? [WebSocketServerListener::class, 'onHandshake']); $this->server->on('message', $settings['events'][static::SERVER_ON_MESSAGE] ?? [WebSocketServerListener::class, 'onMessage']); @@ -198,7 +307,19 @@ class BASEServerListener } else { $this->server->on('receive', $settings['events'][static::SERVER_ON_RECEIVE] ?? [TCPServerListener::class, 'onReceive']); } - foreach ($settings['events'] as $event_type => $callback) { + $this->addServerEventCallback($settings['events']); + } + + + /** + * @param array $events + */ + private function addServerEventCallback(array $events) + { + if (count($events) < 1) { + return; + } + foreach ($events as $event_type => $callback) { if ($this->server->getCallback($event_type) !== null) { continue; } diff --git a/ex/HTTPServerListener.php b/ex/HTTPServerListener.php index 32a32f4b..5b97719c 100644 --- a/ex/HTTPServerListener.php +++ b/ex/HTTPServerListener.php @@ -1,5 +1,7 @@ addlistener($host, $port, $mode); static::$_http->set($settings['settings'] ?? []); - if ($server->getCallback('request') === null) { - $server->on('request', $settings['events'][BASEServerListener::SERVER_ON_REQUEST] ?? [static::class, 'onRequest']); - } - static::onConnectAndClose($server, static::$_http); + static::$_http->on('request', $settings['events'][BASEServerListener::SERVER_ON_REQUEST] ?? [static::class, 'onRequest']); + static::onConnectAndClose($server, static::$_http); } @@ -38,7 +41,6 @@ class HTTPServerListener */ public static function onConnect(Server $server, int $fd) { - var_dump(__FILE__ . ':' . __LINE__); } @@ -48,7 +50,15 @@ class HTTPServerListener */ public static function onRequest(Request $request, Response $response) { - $response->setStatusCode(200); + $controller = Router::findPath($request->server['request_uri']); + if (empty($controller)) { + $response->status(404); + } else { + $response->status(200); + } + if (!$response->isWritable()) { + return; + } $response->end(''); } diff --git a/ex/ListenerHelper.php b/ex/ListenerHelper.php index 877d1eda..92d432f3 100644 --- a/ex/ListenerHelper.php +++ b/ex/ListenerHelper.php @@ -11,12 +11,12 @@ trait ListenerHelper */ public static function onConnectAndClose($server, $newServer) { - if (in_array($server->setting['dispatch_mode'] ?? 2, [1, 3])){ - return; - } - if (!($server->setting['enable_unsafe_event'] ?? false)) { - return; - } +// if (in_array($server->setting['dispatch_mode'] ?? 2, [1, 3])){ +// return; +// } +// if (!($server->setting['enable_unsafe_event'] ?? false)) { +// return; +// } $newServer->on('connect', $settings['events'][BASEServerListener::SERVER_ON_CONNECT] ?? [static::class, 'onConnect']); $newServer->on('close', $settings['events'][BASEServerListener::SERVER_ON_CLOSE] ?? [static::class, 'onClose']); } diff --git a/ex/Manager/ServerBase.php b/ex/Manager/ServerBase.php new file mode 100644 index 00000000..934afd99 --- /dev/null +++ b/ex/Manager/ServerBase.php @@ -0,0 +1,37 @@ +addlistener($host, $port, $mode); - static::$_tcp->set($settings); - static::$_tcp->on('receive', $settings['events'][BASEServerListener::SERVER_ON_RECEIVE] ?? [static::class, 'onReceive']); - static::onConnectAndClose($server, static::$_tcp); - } + /** + * UDPServerListener constructor. + * @param Server $server + * @param string $host + * @param int $port + * @param int $mode + * @param array|null $settings + */ + public static function instance(Server $server, string $host, int $port, int $mode, ?array $settings = []) + { + if (!in_array($mode, [SWOOLE_TCP, SWOOLE_TCP6])) { + trigger_error('Port mode ' . $host . '::' . $port . ' must is tcp listener type.'); + } + static::$_tcp = $server->addlistener($host, $port, $mode); + static::$_tcp->set($settings['settings'] ?? []); + static::$_tcp->on('receive', $settings['events'][BASEServerListener::SERVER_ON_RECEIVE] ?? [static::class, 'onReceive']); + static::onConnectAndClose($server, static::$_tcp); + } - /** - * @param Server $server - * @param int $fd - */ - public static function onConnect(Server $server, int $fd) - { - var_dump(__FILE__ . ':' . __LINE__); - } + /** + * @param Server $server + * @param int $fd + */ + public static function onConnect(Server $server, int $fd) + { + var_dump(__FILE__ . ':' . __LINE__); + } - /** - * @param Server $server - * @param int $fd - * @param int $reactor_id - * @param string $data - */ - public static function onReceive(Server $server, int $fd, int $reactor_id, string $data) - { - $server->send($fd, $data); - } + /** + * @param Server $server + * @param int $fd + * @param int $reactor_id + * @param string $data + */ + public static function onReceive(Server $server, int $fd, int $reactor_id, string $data) + { + var_dump($data); + $server->send($fd, $data); + } - /** - * @param Server $server - * @param int $fd - */ - public static function onClose(Server $server, int $fd) - { - } + /** + * @param Server $server + * @param int $fd + */ + public static function onClose(Server $server, int $fd) + { + } } diff --git a/ex/Task/ServerTask.php b/ex/Task/ServerTask.php index b84293b9..0d59e46a 100644 --- a/ex/Task/ServerTask.php +++ b/ex/Task/ServerTask.php @@ -28,7 +28,7 @@ class ServerTask } catch (\Throwable $exception) { $data = [$exception->getMessage()]; } finally { - $server->finish(serialize($data)); + $server->finish($data); } } @@ -48,7 +48,7 @@ class ServerTask } catch (\Throwable $exception) { $data = [$exception->getMessage()]; } finally { - $server->finish(serialize($data)); + $server->finish($data); } } diff --git a/ex/UDPServerListener.php b/ex/UDPServerListener.php index 845048bc..4d4669bc 100644 --- a/ex/UDPServerListener.php +++ b/ex/UDPServerListener.php @@ -1,5 +1,6 @@ addlistener($host, $port, $mode); static::$_udp->set($settings['settings'] ?? []); static::$_udp->on('packet', $settings['events'][BASEServerListener::SERVER_ON_PACKET] ?? [static::class, 'onPacket']); diff --git a/ex/WebSocketServerListener.php b/ex/WebSocketServerListener.php index 211c69ac..87ccc160 100644 --- a/ex/WebSocketServerListener.php +++ b/ex/WebSocketServerListener.php @@ -5,6 +5,8 @@ use Swoole\Http\Response; use Swoole\Server; use Swoole\WebSocket\Frame; +require_once 'ListenerHelper.php'; + /** * Class WebSocketServerListener @@ -13,7 +15,7 @@ use Swoole\WebSocket\Frame; class WebSocketServerListener { - protected static mixed $_http; + protected static Server\Port $_http; /** @@ -26,25 +28,125 @@ class WebSocketServerListener */ public static function instance(mixed $server, string $host, int $port, int $mode, ?array $settings = []) { + if (!in_array($mode, [SWOOLE_TCP, SWOOLE_TCP6])) { + trigger_error('Port mode ' . $host . '::' . $port . ' must is udp listener type.'); + } static::$_http = $server->addlistener($host, $port, $mode); static::$_http->set($settings['settings'] ?? []); static::$_http->on('handshake', $settings['events'][BASEServerListener::SERVER_ON_HANDSHAKE] ?? [static::class, 'onHandshake']); static::$_http->on('message', $settings['events'][BASEServerListener::SERVER_ON_MESSAGE] ?? [static::class, 'onMessage']); - static::$_http->on('connect', $settings['events'][BASEServerListener::SERVER_ON_CONNECT] ?? [static::class, 'onConnect']); static::$_http->on('close', $settings['events'][BASEServerListener::SERVER_ON_CLOSE] ?? [static::class, 'onClose']); - } + } /** * @param Request $request * @param Response $response + * @throws Exception */ public static function onHandshake(Request $request, Response $response) { + /** @var \Swoole\WebSocket\Server $server */ + $secWebSocketKey = $request->header['sec-websocket-key']; + $patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#'; + if (0 === preg_match($patten, $secWebSocketKey) || 16 !== strlen(base64_decode($secWebSocketKey))) { + throw new Exception('protocol error.', 500); + } + $key = base64_encode(sha1($request->header['sec-websocket-key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', TRUE)); + $headers = [ + 'Upgrade' => 'websocket', + 'Connection' => 'Upgrade', + 'Sec-websocket-Accept' => $key, + 'Sec-websocket-Version' => '13', + ]; + if (isset($request->header['sec-websocket-protocol'])) { + $headers['Sec-websocket-Protocol'] = $request->header['sec-websocket-protocol']; + } + foreach ($headers as $key => $val) { + $response->setHeader($key, $val); + } + $response->setStatusCode(101); + $response->end(); } +// +// public static function decode($received): ?string +// { +// $decoded = null; +// $buffer = $received; +// $len = ord($buffer[1]) & 127; +// if ($len === 126) { +// $masks = substr($buffer, 4, 4); +// $data = substr($buffer, 8); +// } else { +// if ($len === 127) { +// $masks = substr($buffer, 10, 4); +// $data = substr($buffer, 14); +// } else { +// $masks = substr($buffer, 2, 4); +// $data = substr($buffer, 6); +// } +// } +// for ($index = 0; $index < strlen($data); $index++) { +// $decoded .= $data[$index] ^ $masks[$index % 4]; +// } +// +// return $decoded; +// } +// +// const BINARY_TYPE_BLOB = "\x81"; +// +// +// public static function encode($buffer): string +// { +// $len = strlen($buffer); +// +// $first_byte = self::BINARY_TYPE_BLOB; +// +// if ($len <= 125) { +// $encode_buffer = $first_byte . chr($len) . $buffer; +// } else { +// if ($len <= 65535) { +// $encode_buffer = $first_byte . chr(126) . pack("n", $len) . $buffer; +// } else { +// //pack("xxxN", $len)pack函数只处理2的32次方大小的文件,实际上2的32次方已经4G了。 +// $encode_buffer = $first_byte . chr(127) . pack("xxxxN", $len) . $buffer; +// } +// } +// +// return $encode_buffer; +// } +// +// +// private static function socketConnection($server, $fd, $data) +// { +// $http_protocol = []; +// foreach ($data as $key => $datum) { +// if (empty($datum) || $key == 0) { +// continue; +// } +// [$key, $value] = explode(': ', $datum); +// +// $http_protocol[trim($key)] = trim($value); +// } +// +// $key = base64_encode(sha1($http_protocol['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', TRUE)); +// $headers = [ +// 'HTTP/1.1 101 Switching Protocols', +// 'Upgrade: websocket', +// 'Connection: Upgrade', +// 'Sec-WebSocket-Accept: ' . $key, +// 'Sec-WebSocket-Version: 13', +// ]; +// if (isset($http_protocol['Sec-WebSocket-Protocol'])) { +// $headers[] = 'Sec-WebSocket-Protocol: ' . $http_protocol['Sec-WebSocket-Protocol']; +// } +// $server->send($fd, implode("\r\n", $headers) . "\r\n\r\n"); +// } + + /** * @param Server $server * @param int $fd @@ -52,6 +154,7 @@ class WebSocketServerListener public static function onConnect(Server $server, int $fd) { var_dump(__FILE__ . ':' . __LINE__); + $server->confirm($fd); } @@ -70,6 +173,7 @@ class WebSocketServerListener */ public static function onClose(Server $server, int $fd) { + var_dump($server->getClientInfo($fd)); } } diff --git a/ex/Worker/ServerWorker.php b/ex/Worker/ServerWorker.php new file mode 100644 index 00000000..15666ee6 --- /dev/null +++ b/ex/Worker/ServerWorker.php @@ -0,0 +1,52 @@ + [ + 'server' => [ 'settings' => [ - 'worker_num' => swoole_cpu_num() * 3, + 'worker_num' => swoole_cpu_num(), 'reactor_num' => swoole_cpu_num(), - 'log_file' => APP_PATH . 'storage/request.log', - 'stats_file' => APP_PATH . 'storage/stats.log', 'dispatch_mode' => 3, 'task_worker_num' => 1, 'enable_coroutine' => true, @@ -21,24 +19,41 @@ return [ 'tcp_keepcount' => 2, 'max_wait_time' => 60, 'reload_async' => true, + 'enable_delay_receive' => true, 'tcp_fastopen' => 1, 'tcp_defer_accept' => 1 ], 'events' => [ - BASEServerListener::SERVER_ON_PIPE_MESSAGE => [], - BASEServerListener::SERVER_ON_SHUTDOWN => [], - BASEServerListener::SERVER_ON_TASK => [], - BASEServerListener::SERVER_ON_WORKER_START => [], - BASEServerListener::SERVER_ON_WORKER_ERROR => [], - BASEServerListener::SERVER_ON_WORKER_EXIT => [], - BASEServerListener::SERVER_ON_WORKER_STOP => [], - BASEServerListener::SERVER_ON_MANAGER_START => [], - BASEServerListener::SERVER_ON_MANAGER_STOP => [], - BASEServerListener::SERVER_ON_BEFORE_RELOAD => [], - BASEServerListener::SERVER_ON_AFTER_RELOAD => [], - BASEServerListener::SERVER_ON_START => [], + BASEServerListener::SERVER_ON_PIPE_MESSAGE => [ServerBase::class, 'onPipeMessage'], + BASEServerListener::SERVER_ON_SHUTDOWN => [ServerBase::class, 'onShutdown'], + BASEServerListener::SERVER_ON_WORKER_START => [ServerWorker::class, 'onWorkerStart'], + BASEServerListener::SERVER_ON_WORKER_ERROR => [ServerWorker::class, 'onWorkerError'], + BASEServerListener::SERVER_ON_WORKER_EXIT => [ServerWorker::class, 'onWorkerExit'], + BASEServerListener::SERVER_ON_WORKER_STOP => [ServerWorker::class, 'onWorkerStop'], + BASEServerListener::SERVER_ON_MANAGER_START => [ServerManager::class, 'onManagerStart'], + BASEServerListener::SERVER_ON_MANAGER_STOP => [ServerManager::class, 'onManagerStop'], + BASEServerListener::SERVER_ON_BEFORE_RELOAD => [ServerBase::class, 'onBeforeReload'], + BASEServerListener::SERVER_ON_AFTER_RELOAD => [ServerBase::class, 'onAfterReload'], + BASEServerListener::SERVER_ON_START => [ServerBase::class, 'onStart'], ], - 'handler' => [ + 'ports' => [ + [ + 'type' => BASEServerListener::SERVER_TYPE_HTTP, + 'host' => '0.0.0.0', + 'port' => 9002, + 'mode' => SWOOLE_SOCK_TCP, + 'events' => [ + BASEServerListener::SERVER_ON_REQUEST => [HTTPServerListener::class, 'onRequest'], + ], + 'settings' => [ + 'open_http_protocol' => true, + 'open_http2_protocol' => false, + 'http_parse_cookie' => true, + 'http_compression' => true, + 'http_compression_level' => 5, + 'enable_unsafe_event' => false, + ] + ], [ 'type' => BASEServerListener::SERVER_TYPE_WEBSOCKET, 'host' => '0.0.0.0', @@ -54,38 +69,23 @@ return [ BASEServerListener::SERVER_ON_MESSAGE => [WebSocketServerListener::class, 'onMessage'], BASEServerListener::SERVER_ON_CLOSE => [WebSocketServerListener::class, 'onClose'], ] - ], [ - 'type' => BASEServerListener::SERVER_TYPE_HTTP, - 'host' => '0.0.0.0', - 'port' => 9001, - 'mode' => SWOOLE_SOCK_TCP, - 'events' => [ - BASEServerListener::SERVER_ON_REQUEST => [HTTPServerListener::class, 'onRequest'], - ], - 'settings' => [ - 'open_http_protocol' => true, - 'open_http2_protocol' => false, - 'upload_tmp_dir' => APP_PATH . 'storage', - 'http_parse_cookie' => true, - 'http_compression' => true, - 'http_compression_level' => 5, - 'enable_unsafe_event' => false, - ] - ], [ + ], + [ 'type' => BASEServerListener::SERVER_TYPE_TCP, 'host' => '0.0.0.0', - 'port' => 9001, + 'port' => 9003, 'mode' => SWOOLE_SOCK_TCP, 'events' => [ BASEServerListener::SERVER_ON_CONNECT => [TCPServerListener::class, 'onConnect'], BASEServerListener::SERVER_ON_RECEIVE => [TCPServerListener::class, 'onReceive'], BASEServerListener::SERVER_ON_CLOSE => [TCPServerListener::class, 'onClose'], ] - ], [ + ], + [ 'type' => BASEServerListener::SERVER_TYPE_UDP, 'host' => '0.0.0.0', - 'port' => 9001, - 'mode' => SWOOLE_SOCK_TCP, + 'port' => 9004, + 'mode' => SWOOLE_SOCK_UDP, 'events' => [ BASEServerListener::SERVER_ON_PACKET => [UDPServerListener::class, 'onPacket'], ] diff --git a/ex/test/RelationshipSystemProcess.php b/ex/test/RelationshipSystemProcess.php new file mode 100644 index 00000000..8f738e43 --- /dev/null +++ b/ex/test/RelationshipSystemProcess.php @@ -0,0 +1,50 @@ +pid . ']'; + } + + + /** + * @param mixed $data + */ + public function receive(mixed $data): void + { + + } + + + /** + * + */ + public function onHandler(Process $process): void + { + // TODO: Implement onHandler() method. + } +}