From 4ae3001fe34ba84ff954f201056888a96de6eaf7 Mon Sep 17 00:00:00 2001 From: whwyy Date: Fri, 17 Apr 2026 14:41:13 +0800 Subject: [PATCH] eee --- core/Config.php | 74 +++----------- core/Struct.php | 27 ++++- core/Transport.php | 146 +++++++++++++++------------ core/Websocket.php | 242 +++++++++++++++++++++++++++++---------------- 4 files changed, 277 insertions(+), 212 deletions(-) diff --git a/core/Config.php b/core/Config.php index 82e30ec..617c2c2 100644 --- a/core/Config.php +++ b/core/Config.php @@ -4,76 +4,30 @@ namespace Coroutine\Server; class Config { + public string $host = '0.0.0.0'; - public int $port { - get => $this->port; + public int $port = 9501; - set(int $value) { - $this->port = $value; - } - } + public string $authKey = 'access_token'; + public int $writeTimeout = 0; - public string $host { - get => $this->host; + public int $maxCoroutine = 100000; + public int $readTimeout = 0; - set(string $value) { - $this->host = $value; - } - } + public int $maxPackageSize = 2097152; + public int $maxFrameSize = 2097152; - public string $authKey { - get => $this->authKey; + public bool $exposeOnlineLists = false; - set(string $value) { - $this->authKey = $value; - } - } + public string $websocketPath = '/websocket'; + public string $onlineListPath = '/online/lists'; - public int $writeTimeout { - get => $this->writeTimeout; - - set(int $value) { - $this->writeTimeout = $value; - } - } - - - public int $maxCoroutine { - get => $this->maxCoroutine; - - set(int $value) { - $this->maxCoroutine = $value; - } - } - - - public int $readTimeout { - get => $this->readTimeout; - - set(int $value) { - $this->readTimeout = $value; - } - } - - - public int $maxPackageSize { - get => $this->maxPackageSize; - - set(int $value) { - $this->maxPackageSize = $value; - } - } - - - public int $maxFrameSize { - get => $this->maxFrameSize; - - set(int $value) { - $this->maxFrameSize = $value; - } + public function normalizePath(string $path): string + { + return '/' . ltrim($path, '/'); } } diff --git a/core/Struct.php b/core/Struct.php index 97ee379..02d7eb1 100644 --- a/core/Struct.php +++ b/core/Struct.php @@ -8,14 +8,33 @@ use Swoole\Http\Response; class Struct { + public int $userId; + + public int $fd; + + public int $connectedAt; + + public int $lastSeenAt; + public AuthorizationInterface $user; - public Request $request; - public Response $ws; + + public Request $request; + + public Response $ws; public function __construct(AuthorizationInterface $user, Request $request, Response $ws) { + $this->user = $user; $this->request = $request; - $this->user = $user; - $this->ws = $ws; + $this->ws = $ws; + $this->userId = $user->getUniqueId(); + $this->fd = (int)($request->fd ?? 0); + $this->connectedAt = time(); + $this->lastSeenAt = $this->connectedAt; + } + + public function touch(): void + { + $this->lastSeenAt = time(); } } diff --git a/core/Transport.php b/core/Transport.php index 904d136..68a96fd 100644 --- a/core/Transport.php +++ b/core/Transport.php @@ -4,126 +4,148 @@ namespace Coroutine\Server; class Transport { - /** - * @var array + * @var array */ private array $clients = []; - /** - * @param int $userId - * @param Struct $data - * @return void + * @var array */ - public function add(int $userId, Struct $data): void + private array $userIdByFd = []; + + public function add(int $userId, Struct $data): ?Struct { - if (isset($this->clients[$userId])) { - $this->clients[$userId]->ws->close(); - } + $previous = $this->clients[$userId] ?? null; + $this->clients[$userId] = $data; + if ($data->fd > 0) { + $this->userIdByFd[$data->fd] = $userId; + } + + if ($previous && $previous->fd !== $data->fd) { + unset($this->userIdByFd[$previous->fd]); + @$previous->ws->close(); + } + + return $previous; } - /** - * @param int $userId - * @param mixed $data - * @return void - */ public function sendUserId(int $userId, mixed $data): void { - if (isset($this->clients[$userId])) { - $this->clients[$userId]->ws->push($data); + $struct = $this->clients[$userId] ?? null; + if ($struct === null) { + return; } + + $struct->touch(); + @$struct->ws->push($this->normalizePayload($data)); } - /** - * @param int $fd - * @param mixed $data - * @return void - */ public function sendFd(int $fd, mixed $data): void { $struct = $this->getClientId($fd); - $struct?->ws->push($data); + if ($struct === null) { + return; + } + + $struct->touch(); + @$struct->ws->push($this->normalizePayload($data)); } - - /** - * @param int $userId - * @return void - */ public function close(int $userId): void { - if (isset($this->clients[$userId])) { - $this->clients[$userId]->ws->close(); + $struct = $this->clients[$userId] ?? null; + if ($struct === null) { + return; } + + @$struct->ws->close(); } - - /** - * @param int $userId - * @return void - */ - public function remove(int $userId): void + public function remove(int $userId, ?int $fd = null): void { - if ($this->has($userId)) { - $this->clients[$userId]?->ws->close(); + $struct = $this->clients[$userId] ?? null; + if ($struct === null) { + return; } + + if ($fd !== null && $struct->fd !== $fd) { + return; + } + unset($this->clients[$userId]); + unset($this->userIdByFd[$struct->fd]); + + @$struct->ws->close(); } - - /** - * @param int $fd - * @return Struct|null - */ public function getClientId(int $fd): ?Struct { - return array_find($this->clients, fn($client) => $client->fd == $fd); + $userId = $this->userIdByFd[$fd] ?? null; + if ($userId === null) { + return null; + } + + return $this->clients[$userId] ?? null; } - - /** - * @param int $userId - * @return Struct|null - */ public function getUserId(int $userId): ?Struct { return $this->clients[$userId] ?? null; } - - /** - * @param int $userId - * @return bool - */ public function has(int $userId): bool { return isset($this->clients[$userId]); } + public function touch(int $userId, ?int $fd = null): void + { + $struct = $this->clients[$userId] ?? null; + if ($struct === null) { + return; + } + + if ($fd !== null && $struct->fd !== $fd) { + return; + } + + $struct->touch(); + } - /** - * @return array - */ public function getLists(): array { $array = []; foreach ($this->clients as $userId => $client) { $array[] = [ - 'userId' => $userId, + 'userId' => $userId, + 'fd' => $client->fd, 'nickname' => $client->user->getNickname(), + 'connectedAt' => $client->connectedAt, + 'lastSeenAt' => $client->lastSeenAt, ]; } + return $array; } - - /** - * @return int - */ public function size(): int { return count($this->clients); } + + private function normalizePayload(mixed $data): string + { + if (is_string($data)) { + return $data; + } + + if (is_scalar($data) || $data === null) { + return (string)$data; + } + + $payload = json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); + return $payload === false ? '' : $payload; + } } diff --git a/core/Websocket.php b/core/Websocket.php index 37a0e0d..c1a341a 100644 --- a/core/Websocket.php +++ b/core/Websocket.php @@ -12,149 +12,219 @@ use Throwable; abstract class Websocket { - #[Container(Transport::class)] public Transport $transport; + public Config $config; - /** - * @var Config - */ - public Config $config { - get { - return $this->config; - } - } - - - /** - * @param Config $config - * @return void - */ public function start(Config $config): void { $this->config = $config; + $this->configureRuntime(); $this->onStart(); + $server = new Server($config->host, $config->port, false); - $server->handle('/websocket', fn(Request $request, Response $ws) => $this->handler($request, $ws)); - $server->handle('/online/lists', fn(Request $request, Response $ws) => $this->getLists($request, $ws)); + $this->configureServer($server); + + $server->handle($config->normalizePath($config->websocketPath), fn(Request $request, Response $ws) => $this->handler($request, $ws)); + if ($config->exposeOnlineLists) { + $server->handle($config->normalizePath($config->onlineListPath), fn(Request $request, Response $ws) => $this->getLists($request, $ws)); + } + echo 'websocket server start at ' . $config->host . ':' . $config->port . PHP_EOL; $server->start(); } - /** - * @param Request $request - * @param Response $ws - * @return void - */ public function getLists(Request $request, Response $ws): void { - $data = json_encode($this->transport->getLists(), JSON_UNESCAPED_UNICODE); - $ws->end($data); + try { + if (!$this->config->exposeOnlineLists) { + throw new \RuntimeException('Not found.', 404); + } + + $psr7Request = ConstrictRequest::builder($request); + if (!$this->authorizeOnlineListRequest($psr7Request)) { + throw new \RuntimeException('Unauthorized.', 401); + } + + $payload = json_encode($this->transport->getLists(), JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); + if ($payload === false) { + $payload = '[]'; + } + + @$ws->header('Content-Type', 'application/json; charset=utf-8'); + $ws->end($payload); + } catch (Throwable $throwable) { + $this->throwable($ws, $throwable, false); + } } - - - /** - * @param Request $request - * @param Response $ws - * @return void - */ public function handler(Request $request, Response $ws): void { + $upgraded = false; + $userId = null; + $fd = null; + try { - $psr7Request = ConstrictRequest::builder($request); - if (!$psr7Request->hasQuery($this->config->authKey)) { - throw new \Exception('Params required.'); - } - - if (!$this->onConnected($psr7Request) || $psr7Request->getAuthority() == null) { - throw new \Exception('Token error, unable to obtain user.'); - } - - if ($this->transport->has($psr7Request->getAuthority()->getUniqueId())) { - $this->transport->remove($psr7Request->getAuthority()->getUniqueId()); - } + $psr7Request = $this->authorizeConnection($request); if (!$ws->upgrade()) { - throw new \Exception('Connection upgrade to websocket failed.'); + throw new \RuntimeException('Connection upgrade to websocket failed.', 500); } + $upgraded = true; $userId = $psr7Request->getAuthority()->getUniqueId(); - $ws->push($userId); - defer(function () use ($userId) { - $this->transport->remove($userId); + + $connection = new Struct($psr7Request->getAuthority(), $request, $ws); + $fd = $connection->fd; + + $this->transport->add($userId, $connection); + defer(function () use ($userId, $fd) { + $this->transport->remove($userId, $fd); }); - $this->transport->add($userId, new Struct($psr7Request->getAuthority(), $request, $ws)); + @$ws->push((string)$userId); $this->onBeforeMessageLoop(); while (true) { $frame = $ws->recv(); - if ($frame === '' || $frame === false || $frame->data == 'close' || get_class($frame) === CloseFrame::class) { - $ws->close(); + if ($frame === '' || $frame === false || $frame === null) { break; } - $this->onMessage($psr7Request, $frame->data); + if ($frame instanceof CloseFrame) { + break; + } + + $payload = (string)($frame->data ?? ''); + if ($payload === 'close') { + break; + } + + $this->assertPayloadSize($payload); + $this->transport->touch($userId, $fd); + $this->onMessage($psr7Request, $payload); } } catch (Throwable $throwable) { - $this->throwable($ws, $throwable); + $this->throwable($ws, $throwable, $upgraded); } finally { - if (isset($userId)) { + if ($userId !== null) { $this->onDisconnect($userId); } } } - abstract public function onBeforeMessageLoop(): void; - - /** - * @param $ws - * @param Throwable $throwable - * @return void - */ - private function throwable($ws, Throwable $throwable): void + protected function authorizeConnection(Request $request): ConstrictRequest { - $error = sprintf("Message: %s \n - File: %s \n - Line: %d \n", $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()); + $psr7Request = ConstrictRequest::builder($request); + if (!$psr7Request->hasQuery($this->config->authKey)) { + throw new \RuntimeException('Missing auth parameter.', 400); + } - $ws->setStatusCode(500); - $ws->end($throwable->getMessage()); - $ws->close(); + if (!$this->onConnected($psr7Request) || $psr7Request->getAuthority() === null) { + throw new \RuntimeException('Unauthorized.', 401); + } - echo $error; + return $psr7Request; } + protected function authorizeOnlineListRequest(ConstrictRequest $request): bool + { + if (!$request->hasQuery($this->config->authKey)) { + return false; + } + return $this->canViewOnlineLists($request) && $request->getAuthority() !== null; + } + + protected function canViewOnlineLists(ConstrictRequest $request): bool + { + return $this->onConnected($request); + } + + private function configureRuntime(): void + { + if ($this->config->maxCoroutine > 0 && class_exists(\Swoole\Coroutine::class)) { + \Swoole\Coroutine::set([ + 'max_coroutine' => $this->config->maxCoroutine, + ]); + } + } + + private function configureServer(Server $server): void + { + if (!method_exists($server, 'set')) { + return; + } + + $settings = []; + if ($this->config->maxPackageSize > 0) { + $settings['package_max_length'] = $this->config->maxPackageSize; + } + if ($this->config->maxFrameSize > 0) { + $settings['websocket_max_frame_size'] = $this->config->maxFrameSize; + } + + if ($settings !== []) { + $server->set($settings); + } + } + + private function assertPayloadSize(string $payload): void + { + $size = strlen($payload); + + if ($this->config->maxPackageSize > 0 && $size > $this->config->maxPackageSize) { + throw new \RuntimeException('Payload too large.', 413); + } + + if ($this->config->maxFrameSize > 0 && $size > $this->config->maxFrameSize) { + throw new \RuntimeException('Frame too large.', 413); + } + } + + private function throwable(Response $ws, Throwable $throwable, bool $upgraded): void + { + $status = $throwable->getCode(); + if (!in_array($status, [400, 401, 403, 404, 409, 413, 500], true)) { + $status = 500; + } + + $message = match ($status) { + 400 => 'Bad Request', + 401 => 'Unauthorized', + 403 => 'Forbidden', + 404 => 'Not Found', + 409 => 'Conflict', + 413 => 'Payload Too Large', + default => 'Internal Server Error', + }; + + error_log(sprintf( + '[%s] %s in %s:%d', + static::class, + $throwable->getMessage(), + $throwable->getFile(), + $throwable->getLine(), + )); + + if ($upgraded) { + @$ws->close(); + return; + } + + @$ws->setStatusCode($status); + $ws->end($message); + } - /** - * @param ConstrictRequest $request - * @return bool - */ abstract public function onConnected(ConstrictRequest $request): bool; - - /** - * @return void - */ abstract public function onStart(): void; - - /** - * @param ConstrictRequest $psr7Request - * @param string $frame - * @return void - */ abstract public function onMessage(ConstrictRequest $psr7Request, string $frame): void; - /** - * @param int $userId - * @return void - */ abstract public function onDisconnect(int $userId): void; }