clearAllOnlineUsers(); $this->queueLoop(); $this->server = new Server($host, $port, false); $this->server->handle('/websocket', function (Request $request, Response $ws) { $this->WebsocketHandler($request, $ws); }); $this->server->handle('/online/lists', function (Request $request, Response $ws) { $data = json_encode($this->transport->getLists(), JSON_UNESCAPED_UNICODE); $ws->end($data); }); echo 'websocket server start at ' . $host . ':' . $port . PHP_EOL; $this->server->start(); }

    /**
     * Hook for clearing whatever record of online users the implementation keeps.
     *
     * @return void
     */
    abstract public function clearAllOnlineUsers(): void;

    /**
     * Spawns a coroutine that resolves QueueLoop from the Kiri DI container
     * and runs its loop() method.
     *
     * @return void
     */
    private function queueLoop(): void
    {
        Coroutine::create(function () {
            \Kiri::getDi()->get(QueueLoop::class)->loop();
        });
    }

    /**
     * Handles one `/websocket` request.
     *
     * Steps, in order:
     *  1. require the `auth` query parameter and a successful onAuthority();
     *  2. require a resolved authority object and a successful upgrade;
     *  3. push the user's unique id to the client and call onConnected();
     *  4. register the connection with the transport;
     *  5. forward every received frame to onMessage() until the connection
     *     ends, then call onDisconnect().
     *
     * Any Throwable raised along the way is echoed to stdout and answered with
     * an error response; nothing is rethrown to the caller.
     *
     * @param Request $request
     * @param Response $ws
     * @return void
     */
    public function WebsocketHandler(Request $request, Response $ws): void
    {
        $userId = null;
        // Set only once this handler is about to add its own transport entry,
        // so a connection rejected earlier never evicts an entry it does not own.
        $registered = false;

        try {
            $psr7Request = ConstrictRequest::builder($request);
            if (!$psr7Request->hasQuery('auth')) {
                throw new \Exception('Auth fail.', 401);
            }
            if (!$this->onAuthority($psr7Request, $ws)) {
                throw new \Exception('Auth fail.', 401);
            }

            // Read the authority once, after onAuthority() has run. Anything
            // that is not an object is rejected before the upgrade is attempted.
            $authority = $psr7Request->getAuthority();
            if (!is_object($authority) || !$ws->upgrade()) {
                throw new \Exception('Auth fail.', 401);
            }

            $userId = $authority->getUniqueId();
            $ws->push($userId);

            $user = [
                'nickname' => $authority->getNickname(),
                'avatar'   => $authority->getAvatar(),
            ];
            if (!$this->onConnected($psr7Request)) {
                throw new \Exception('Auth fail.', 401);
            }

            $registered = true;
            $this->transport->add($userId, new Struct($user, $request->fd, $ws));

            while (true) {
                $frame = $ws->recv();
                // '' and false are treated as "no more frames", as is a close
                // frame or a text frame whose payload is the literal 'close'.
                if (
                    $frame === ''
                    || $frame === false
                    || $frame instanceof CloseFrame
                    || $frame->data === 'close'
                ) {
                    $ws->close();
                    break;
                }
                $this->onMessage($this->server, $frame);
            }

            // NOTE(review): this hook is skipped when the loop above throws;
            // confirm whether implementations rely on it for cleanup.
            $this->onDisconnect($this->server, $userId);
        } catch (Throwable $throwable) {
            echo sprintf("Message: %s \n File: %s \n Line: %d \n", $throwable->getMessage(), $throwable->getFile(), $throwable->getLine());

            // Use the exception code as the HTTP status when it is a valid
            // error status (the auth failures above carry 401); otherwise 500.
            $code = $throwable->getCode();
            $status = (is_int($code) && $code >= 400 && $code <= 599) ? $code : 500;

            // NOTE(review): this also runs for failures after a successful
            // upgrade; confirm how the response object behaves in that state.
            $ws->setStatusCode($status);
            $ws->end($throwable->getMessage());
            $ws->close();
        } finally {
            if ($registered && $userId !== null) {
                $this->transport->remove($userId);
            }
        }
    }

    /**
     * Hook called by WebsocketHandler() after the upgrade and after the user's
     * unique id has been pushed to the client. Returning false aborts the
     * connection with an 'Auth fail.' error.
     *
     * @param ConstrictRequest $request
     * @return bool
     */
    abstract public function onConnected(ConstrictRequest $request): bool;

    /**
     * Hook called by WebsocketHandler() before the upgrade, once the `auth`
     * query parameter is known to be present. Returning false aborts the
     * request with an 'Auth fail.' error.
     *
     * Presumably the implementation also attaches the authority object that
     * WebsocketHandler() reads via getAuthority() afterwards; confirm against
     * the implementations.
     *
     * @param ConstrictRequest $request
     * @param Response $ws
     * @return bool
     */
    abstract public function onAuthority(ConstrictRequest $request, Response $ws): bool;

    /**
     * Hook called by WebsocketHandler() for every received frame that does not
     * end the connection.
     *
     * @param \Swoole\Server|Server $server
     * @param Frame $frame
     * @return void
     */
    abstract public function onMessage(\Swoole\Server|Server $server, Frame $frame): void;

    /**
     * Hook called by WebsocketHandler() after the receive loop ends normally.
     *
     * NOTE(review): the parameter is declared as `int $fd`, but
     * WebsocketHandler() passes the user's unique id; confirm which of the two
     * implementations expect.
     *
     * @param \Swoole\WebSocket\Server|Server $server
     * @param int $fd
     * @return void
     */
    abstract public function onDisconnect(\Swoole\WebSocket\Server|Server $server, int $fd): void;
}