config = $config; $this->configureRuntime(); $this->onStart(); $server = new Server($config->host, $config->port, false); $this->configureServer($server); $server->handle($config->normalizePath($config->websocketPath), fn(Request $request, Response $ws) => $this->handler($request, $ws)); if ($config->exposeOnlineLists) { $server->handle($config->normalizePath($config->onlineListPath), fn(Request $request, Response $ws) => $this->getLists($request, $ws)); } echo 'websocket server start at ' . $config->host . ':' . $config->port . PHP_EOL; $server->start(); } /** * @param Request $request * @param Response $ws * @return void */ public function getLists(Request $request, Response $ws): void { try { if (!$this->config->exposeOnlineLists) { throw new \RuntimeException('Not found.', 404); } $psr7Request = ConstrictRequest::builder($request); if (!$this->authorizeOnlineListRequest($psr7Request)) { throw new \RuntimeException('Unauthorized.', 401); } $payload = json_encode($this->transport->getLists(), JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); if ($payload === false) { $payload = '[]'; } @$ws->header('Content-Type', 'application/json; charset=utf-8'); $ws->end($payload); } catch (Throwable $throwable) { $this->throwable($ws, $throwable, false); } } /** * @param Request $request * @param Response $ws * @return void */ public function handler(Request $request, Response $ws): void { $upgraded = false; $userId = null; $fd = null; try { $psr7Request = $this->authorizeConnection($request); if (!$ws->upgrade()) { throw new \RuntimeException('Connection upgrade to websocket failed.', 500); } $upgraded = true; $userId = $psr7Request->getAuthority()->getUniqueId(); $connection = new Struct($psr7Request->getAuthority(), $request, $ws); $fd = $connection->fd; $this->transport->add($userId, $connection); defer(function () use ($userId, $fd) { $this->transport->remove($userId, $fd); }); @$ws->push((string)$userId); $this->onBeforeMessageLoop(); while (true) { $frame = $ws->recv(); if ($frame === '' || $frame === false || $frame === null) { break; } if ($frame instanceof CloseFrame) { break; } $payload = (string)($frame->data ?? ''); if ($payload === 'close') { break; } $this->assertPayloadSize($payload); $this->transport->touch($userId, $fd); $this->onMessage($psr7Request, $payload); } } catch (Throwable $throwable) { $this->throwable($ws, $throwable, $upgraded); } finally { if ($userId !== null) { $this->onDisconnect($userId); } } } /** * @return void */ abstract public function onBeforeMessageLoop(): void; /** * @param Request $request * @return ConstrictRequest */ protected function authorizeConnection(Request $request): RequestInterface { /** @var RequestInterface $psr7Request */ $psr7Request = Context::set(RequestInterface::class, ConstrictRequest::builder($request)); if (!$psr7Request->hasQuery($this->config->authKey)) { throw new \RuntimeException('Missing auth parameter.', 400); } if (!$this->onConnected($psr7Request) || $psr7Request->getAuthority() === null) { throw new \RuntimeException('Unauthorized.', 401); } return $psr7Request; } /** * @param ConstrictRequest $request * @return bool */ protected function authorizeOnlineListRequest(RequestInterface $request): bool { if (!$request->hasQuery($this->config->authKey)) { return false; } return $this->canViewOnlineLists($request) && $request->getAuthority() !== null; } /** * @param ConstrictRequest $request * @return bool */ protected function canViewOnlineLists(ConstrictRequest $request): bool { return $this->onConnected($request); } /** * @return void */ private function configureRuntime(): void { if ($this->config->maxCoroutine > 0 && class_exists(Coroutine::class)) { Coroutine::set([ 'max_coroutine' => $this->config->maxCoroutine, ]); } } /** * @param Server $server * @return void */ private function configureServer(Server $server): void { if (!method_exists($server, 'set')) { return; } $settings = []; if ($this->config->maxPackageSize > 0) { $settings['package_max_length'] = $this->config->maxPackageSize; } if ($this->config->maxFrameSize > 0) { $settings['websocket_max_frame_size'] = $this->config->maxFrameSize; } if ($settings !== []) { $server->set($settings); } } /** * @param string $payload * @return void */ private function assertPayloadSize(string $payload): void { $size = strlen($payload); if ($this->config->maxPackageSize > 0 && $size > $this->config->maxPackageSize) { throw new \RuntimeException('Payload too large.', 413); } if ($this->config->maxFrameSize > 0 && $size > $this->config->maxFrameSize) { throw new \RuntimeException('Frame too large.', 413); } } /** * @param Response $ws * @param Throwable $throwable * @param bool $upgraded * @return void */ private function throwable(Response $ws, Throwable $throwable, bool $upgraded): void { $status = $throwable->getCode(); if (!in_array($status, [400, 401, 403, 404, 409, 413, 500], true)) { $status = 500; } $message = match ($status) { 400 => 'Bad Request', 401 => 'Unauthorized', 403 => 'Forbidden', 404 => 'Not Found', 409 => 'Conflict', 413 => 'Payload Too Large', default => 'Internal Server Error', }; error_log(sprintf( '[%s] %s in %s:%d', static::class, $throwable->getMessage(), $throwable->getFile(), $throwable->getLine(), )); if ($upgraded) { @$ws->close(); return; } @$ws->setStatusCode($status); $ws->end($message); } /** * @param RequestInterface $request * @return bool */ abstract public function onConnected(RequestInterface $request): bool; /** * @return void */ abstract public function onStart(): void; /** * @param RequestInterface $psr7Request * @param string $frame * @return void */ abstract public function onMessage(RequestInterface $psr7Request, string $frame): void; /** * @param int $userId * @return void */ abstract public function onDisconnect(int $userId): void; }