From d90d2eb8445b96e8c89be369275cf97e5f57792b Mon Sep 17 00:00:00 2001 From: whwyy Date: Thu, 26 Feb 2026 14:39:04 +0800 Subject: [PATCH] eee --- composer.json | 24 +++++++ core/QueueLoop.php | 47 ++++++++++++++ core/Struct.php | 18 ++++++ core/Transport.php | 88 +++++++++++++++++++++++++ core/Websocket.php | 155 +++++++++++++++++++++++++++++++++++++++++++++ test.php | 131 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 463 insertions(+) create mode 100644 composer.json create mode 100644 core/QueueLoop.php create mode 100644 core/Struct.php create mode 100644 core/Transport.php create mode 100644 core/Websocket.php create mode 100644 test.php diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..909f42d --- /dev/null +++ b/composer.json @@ -0,0 +1,24 @@ +{ + "name": "game-worker/kiri-coroutine-server", + "description": "kiri coroutine server", + "authors": [ + { + "name": "XiangLin", + "email": "as2252258@163.com" + } + ], + "license": "MIT", + "require": { + "php": ">=8.4", + "ext-json": "*", + "ext-pdo": "*", + "game-worker/kiri-container": "^v1.6" + }, + "autoload": { + "psr-4": { + "Coroutine\\Server\\": "./core" + } + }, + "require-dev": { + } +} diff --git a/core/QueueLoop.php b/core/QueueLoop.php new file mode 100644 index 0000000..28a210a --- /dev/null +++ b/core/QueueLoop.php @@ -0,0 +1,47 @@ +cleanOnline(); + $redis = \Kiri::getDi()->get(Redis::class); + if ($redis instanceof Redis) { + $data = $redis->blPop(\config('redis.key'), 10); + if (!empty($data)) { + [$key, $data] = $data; + $json = json_decode($data, true); + + $this->transport->send($json['userId'], json_encode($json['data'])); + } + } else { + Coroutine::sleep(0.05); + } + $this->loop(); + } + + + /** + * @return void + */ + public function cleanOnline(): void + { + $client = new Client(config('notice.host'), config('notice.port')); + $client->post('/cleanOnline', []); + $client->close(); + } +} diff --git a/core/Struct.php b/core/Struct.php new file mode 100644 index 0000000..d8e9b9c --- /dev/null +++ b/core/Struct.php @@ -0,0 +1,18 @@ +fd = $fd; + $this->user = $user; + $this->ws = $ws; + } +} diff --git a/core/Transport.php b/core/Transport.php new file mode 100644 index 0000000..4603be0 --- /dev/null +++ b/core/Transport.php @@ -0,0 +1,88 @@ + + */ + private array $clients = []; + + + /** + * @param int $fd + * @param Struct $data + * @return void + */ + public function add(int $fd, Struct $data): void + { + if (isset($this->clients[$fd])) { + $this->clients[$fd]->ws->close(); + } + $this->clients[$fd] = $data; + } + + /** + * @param int $fd + * @param mixed $data + * @return void + */ + public function send(int $fd, mixed $data): void + { + if (isset($this->clients[$fd])) { + $this->clients[$fd]->ws->push($data); + } + } + + + /** + * @param int $fd + * @return void + */ + public function close(int $fd): void + { + if (isset($this->clients[$fd])) { + $this->clients[$fd]->ws->close(); + } + } + + + /** + * @param int $fd + * @return void + */ + public function remove(int $fd): void + { + if ($this->clients[$fd]) { + $this->clients[$fd]->ws->close(); + } + unset($this->clients[$fd]); + } + + + /** + * @return array + */ + public function getLists(): array + { + $array = []; + foreach ($this->clients as $fd => $client) { + $array[] = [ + 'userId' => $fd, + 'nickname' => $client->user['nickname'], + ]; + } + return $array; + } + + + /** + * @return int + */ + public function size(): int + { + return count($this->clients); + } +} diff --git a/core/Websocket.php b/core/Websocket.php new file mode 100644 index 0000000..858f1fe --- /dev/null +++ b/core/Websocket.php @@ -0,0 +1,155 @@ +clearAllOnlineUsers(); + $this->queueLoop(); + $this->server = new Server($host, $port, false); + $this->server->handle('/websocket', function (Request $request, Response $ws) { + $this->WebsocketHandler($request, $ws); + }); + $this->server->handle('/online/lists', function (Request $request, Response $ws) { + $data = json_encode($this->transport->getLists(), JSON_UNESCAPED_UNICODE); + $ws->end($data); + }); + echo 'websocket server start at ' . $host . ':' . $port . PHP_EOL; + $this->server->start(); + } + + + /** + * @return void + */ + abstract public function clearAllOnlineUsers(): void; + + + /** + * @return void + */ + private function queueLoop(): void + { + Coroutine::create(function () { + $QueueLoop = \Kiri::getDi()->get(QueueLoop::class); + $QueueLoop->loop(); + }); + } + + /** + * @param Request $request + * @param Response $ws + * @return void + */ + public function WebsocketHandler(Request $request, Response $ws): void + { + try { + $psr7Request = ConstrictRequest::builder($request); + if (!$psr7Request->hasQuery('auth')) { + throw new \Exception('Auth fail.', 401); + } + + if (!$this->onAuthority($psr7Request, $ws)) { + throw new \Exception('Auth fail.', 401); + } + + if ($psr7Request->getAuthority() == null || !$ws->upgrade()) { + throw new \Exception('Auth fail.', 401); + } + $userId = $psr7Request->getAuthority()->getUniqueId(); + + $ws->push($userId); + $user = [ + 'nickname' => $psr7Request->getAuthority()->getNickname(), + 'avatar' => $psr7Request->getAuthority()->getAvatar(), + ]; + + if (!$this->onConnected($psr7Request)) { + throw new \Exception('Auth fail.', 401); + } + $this->transport->add($userId, new Struct($user, $request->fd, $ws)); + + while (true) { + $frame = $ws->recv(); + if ($frame === '' || $frame === false || $frame->data == 'close' || get_class($frame) === CloseFrame::class) { + $ws->close(); + break; + } + + $this->onMessage($this->server, $frame); + } + $this->onDisconnect($this->server, $userId); + } catch (Throwable $throwable) { + echo sprintf("Message: %s \n + File: %s \n + Line: %d \n", $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()); + $ws->setStatusCode(500); + $ws->end($throwable->getMessage()); + $ws->close(); + } finally { + if (isset($userId)) { + $this->transport->remove($userId); + } + } + } + + + /** + * @param ConstrictRequest $request + * @return bool + */ + abstract public function onConnected(ConstrictRequest $request): bool; + + + /** + * @param ConstrictRequest $request + * @param Response $ws + * @return bool + */ + abstract public function onAuthority(ConstrictRequest $request, Response $ws): bool; + + + /** + * @param \Swoole\Server|Server $server + * @param Frame $frame + * @return void + */ + abstract public function onMessage(\Swoole\Server|Server $server, Frame $frame): void; + + /** + * @param \Swoole\WebSocket\Server|Server $server + * @param int $fd + * @return void + */ + abstract public function onDisconnect(\Swoole\WebSocket\Server|Server $server, int $fd): void; +} diff --git a/test.php b/test.php new file mode 100644 index 0000000..ed6af5a --- /dev/null +++ b/test.php @@ -0,0 +1,131 @@ +stateChange($request->getAuthority()->getUniqueId(), 1); + + return true; + } + + + /** + * @param ConstrictRequest $request + * @param Response $ws + * @return bool + */ + public function onAuthority(ConstrictRequest $request, Response $ws): bool + { + // TODO: Implement onAuthority() method. + $auth = $request->get['auth'] ?? ''; + $client = new Client(config('user.host'), config('user.port'), true); + $client->post('/handshake', [ + 'token' => $auth, + 'fd' => $request->fd, + 'ip' => current(swoole_get_local_ip()), + ]); + $client->close(); + if ($client->getStatusCode() != 200) { + return false; + } + + $data = json_decode($client->body, true); + + $request->withAuthority($data); + return true; + } + + /** + * @param \Swoole\Server|Server $server + * @param Frame $frame + * @return void + */ + public function onMessage(\Swoole\Server|Server $server, Frame $frame): void + { + // TODO: Implement onMessage() method. + } + + + /** + * @param Server|\Swoole\WebSocket\Server $server + * @param int $fd + * @return void + */ + public function onDisconnect(Server|\Swoole\WebSocket\Server $server, int $fd): void + { + // TODO: Implement onDisconnect() method. + $this->stateChange($fd, 0); + } + + + + /** + * @return void + */ + public function clearAllOnlineUsers(): void + { + $request = [ + 'query' => [ + 'bool' => [ + 'filter' => [['term' => ['is_online' => 1]]], + ], + ], + 'script' => [ + 'source' => "ctx._source['is_online'] = 0", + 'lang' => 'painless', + ], + ]; + $client = new Client(config('es_host'), +config('es_port')); + $client->setData(json_encode($request, JSON_NUMERIC_CHECK)); + $client->setHeaders(['Content-Type' => 'application/json']); + $client->setMethod('POST'); + $client->execute('/' . config('es_key') . '/_update_by_query'); + $client->close(); + } + + + /** + * @param int $userId + * @param int $status + * @return void + */ + private function stateChange(int $userId, int $status): void + { + $data = [ + 'doc' => [ + 'is_online' => $status, + ], + ]; + $client = new Client(config('es_host'), +config('es_port')); + $client->setData(json_encode($data)); + $client->setHeaders(['Content-Type' => 'application/json']); + $client->setMethod('POST'); + $client->execute('/' . config('es_key') . '/_doc/' . $userId . '/_update'); + $client->close(); + } +} + +run(function () { + $websocket = Kiri::getDi()->get(Test::class); + $websocket->start('0.0.0.0', 9504); +});