From b77321ff4bdce3a2ee1850a5fa24d75602ecec83 Mon Sep 17 00:00:00 2001 From: whwyy Date: Sat, 4 Apr 2026 10:29:39 +0800 Subject: [PATCH] eee --- core/Config.php | 79 ++++++++++++++++++++++++ core/QueueLoop.php | 47 -------------- core/Struct.php | 17 ++--- core/Transport.php | 38 ++++++++++-- core/Websocket.php | 151 ++++++++++++++++++++++----------------------- test.php | 113 +++++++++++---------------------- 6 files changed, 232 insertions(+), 213 deletions(-) create mode 100644 core/Config.php delete mode 100644 core/QueueLoop.php diff --git a/core/Config.php b/core/Config.php new file mode 100644 index 0000000..82e30ec --- /dev/null +++ b/core/Config.php @@ -0,0 +1,79 @@ + $this->port; + + set(int $value) { + $this->port = $value; + } + } + + + public string $host { + get => $this->host; + + + set(string $value) { + $this->host = $value; + } + } + + + public string $authKey { + get => $this->authKey; + + set(string $value) { + $this->authKey = $value; + } + } + + + public int $writeTimeout { + get => $this->writeTimeout; + + set(int $value) { + $this->writeTimeout = $value; + } + } + + + public int $maxCoroutine { + get => $this->maxCoroutine; + + set(int $value) { + $this->maxCoroutine = $value; + } + } + + + public int $readTimeout { + get => $this->readTimeout; + + set(int $value) { + $this->readTimeout = $value; + } + } + + + public int $maxPackageSize { + get => $this->maxPackageSize; + + set(int $value) { + $this->maxPackageSize = $value; + } + } + + + public int $maxFrameSize { + get => $this->maxFrameSize; + + set(int $value) { + $this->maxFrameSize = $value; + } + } +} diff --git a/core/QueueLoop.php b/core/QueueLoop.php deleted file mode 100644 index 28a210a..0000000 --- a/core/QueueLoop.php +++ /dev/null @@ -1,47 +0,0 @@ -cleanOnline(); - $redis = \Kiri::getDi()->get(Redis::class); - if ($redis instanceof Redis) { - $data = $redis->blPop(\config('redis.key'), 10); - if (!empty($data)) { - [$key, $data] = $data; - $json = json_decode($data, true); - - $this->transport->send($json['userId'], json_encode($json['data'])); - } - } else { - Coroutine::sleep(0.05); - } - $this->loop(); - } - - - /** - * @return void - */ - public function cleanOnline(): void - { - $client = new Client(config('notice.host'), config('notice.port')); - $client->post('/cleanOnline', []); - $client->close(); - } -} diff --git a/core/Struct.php b/core/Struct.php index d8e9b9c..97ee379 100644 --- a/core/Struct.php +++ b/core/Struct.php @@ -1,18 +1,21 @@ fd = $fd; - $this->user = $user; - $this->ws = $ws; + $this->request = $request; + $this->user = $user; + $this->ws = $ws; } } diff --git a/core/Transport.php b/core/Transport.php index 4603be0..595fd7d 100644 --- a/core/Transport.php +++ b/core/Transport.php @@ -50,18 +50,48 @@ class Transport /** - * @param int $fd + * @param int $fd * @return void */ public function remove(int $fd): void { - if ($this->clients[$fd]) { - $this->clients[$fd]->ws->close(); + if ($this->has($fd)) { + $this->clients[$fd]?->ws->close(); } unset($this->clients[$fd]); } + /** + * @param int $fd + * @return Struct|null + */ + public function getClientId(int $fd): ?Struct + { + return array_find($this->clients, fn($client) => $client->fd == $fd); + } + + + /** + * @param int $fd + * @return Struct|null + */ + public function getUserId(int $fd): ?Struct + { + return $this->clients[$fd] ?? null; + } + + + /** + * @param int $fd + * @return bool + */ + public function has(int $fd): bool + { + return isset($this->clients[$fd]); + } + + /** * @return array */ @@ -71,7 +101,7 @@ class Transport foreach ($this->clients as $fd => $client) { $array[] = [ 'userId' => $fd, - 'nickname' => $client->user['nickname'], + 'nickname' => $client->user->getNickname(), ]; } return $array; diff --git a/core/Websocket.php b/core/Websocket.php index 2753e03..37a0e0d 100644 --- a/core/Websocket.php +++ b/core/Websocket.php @@ -4,17 +4,13 @@ namespace Coroutine\Server; use Kiri\Di\Inject\Container; use Kiri\Router\Constrict\ConstrictRequest; -use Kiri\Server\Contract\OnDisconnectInterface; -use Kiri\Server\Contract\OnMessageInterface; -use Swoole\Coroutine; use Swoole\Coroutine\Http\Server; use Swoole\Http\Request; use Swoole\Http\Response; use Swoole\WebSocket\CloseFrame; -use Swoole\WebSocket\Frame; use Throwable; -abstract class Websocket implements OnMessageInterface, OnDisconnectInterface +abstract class Websocket { #[Container(Transport::class)] @@ -22,50 +18,28 @@ abstract class Websocket implements OnMessageInterface, OnDisconnectInterface /** - * @var Server + * @var Config */ - private Server $server; - - - /** - * @return Server - */ - public function getServer(): Server - { - return $this->server; - } - - /** - * @param string $host - * @param int $port - * @return void - */ - public function start(string $host, int $port): void - { - $this->clearAllOnlineUsers(); - $this->queueLoop(); - $this->server = new Server($host, $port, false); - $this->server->handle('/websocket', function (Request $request, Response $ws) { - $this->WebsocketHandler($request, $ws); - }); - $this->server->handle('/online/lists', function (Request $request, Response $ws) { - $data = json_encode($this->transport->getLists(), JSON_UNESCAPED_UNICODE); - $ws->end($data); - }); - echo 'websocket server start at ' . $host . ':' . $port . PHP_EOL; - $this->server->start(); + public Config $config { + get { + return $this->config; + } } /** + * @param Config $config * @return void */ - private function queueLoop(): void + public function start(Config $config): void { - Coroutine::create(function () { - $QueueLoop = \Kiri::getDi()->get(QueueLoop::class); - $QueueLoop->loop(); - }); + $this->config = $config; + $this->onStart(); + $server = new Server($config->host, $config->port, false); + $server->handle('/websocket', fn(Request $request, Response $ws) => $this->handler($request, $ws)); + $server->handle('/online/lists', fn(Request $request, Response $ws) => $this->getLists($request, $ws)); + echo 'websocket server start at ' . $config->host . ':' . $config->port . PHP_EOL; + $server->start(); } /** @@ -73,33 +47,48 @@ abstract class Websocket implements OnMessageInterface, OnDisconnectInterface * @param Response $ws * @return void */ - public function WebsocketHandler(Request $request, Response $ws): void + public function getLists(Request $request, Response $ws): void + { + $data = json_encode($this->transport->getLists(), JSON_UNESCAPED_UNICODE); + $ws->end($data); + } + + + + /** + * @param Request $request + * @param Response $ws + * @return void + */ + public function handler(Request $request, Response $ws): void { try { $psr7Request = ConstrictRequest::builder($request); - if (!$psr7Request->hasQuery('auth')) { - throw new \Exception('Auth fail.', 401); + if (!$psr7Request->hasQuery($this->config->authKey)) { + throw new \Exception('Params required.'); } - if (!$this->onAuthority($psr7Request, $ws)) { - throw new \Exception('Auth fail.', 401); + if (!$this->onConnected($psr7Request) || $psr7Request->getAuthority() == null) { + throw new \Exception('Token error, unable to obtain user.'); } - if ($psr7Request->getAuthority() == null || !$ws->upgrade()) { - throw new \Exception('Auth fail.', 401); + if ($this->transport->has($psr7Request->getAuthority()->getUniqueId())) { + $this->transport->remove($psr7Request->getAuthority()->getUniqueId()); } + + if (!$ws->upgrade()) { + throw new \Exception('Connection upgrade to websocket failed.'); + } + $userId = $psr7Request->getAuthority()->getUniqueId(); - $ws->push($userId); - $user = [ - 'nickname' => $psr7Request->getAuthority()->getNickname(), - 'avatar' => $psr7Request->getAuthority()->getAvatar(), - ]; + defer(function () use ($userId) { + $this->transport->remove($userId); + }); - if (!$this->onConnected($psr7Request)) { - throw new \Exception('Auth fail.', 401); - } - $this->transport->add($userId, new Struct($user, $request->fd, $ws)); + $this->transport->add($userId, new Struct($psr7Request->getAuthority(), $request, $ws)); + + $this->onBeforeMessageLoop(); while (true) { $frame = $ws->recv(); @@ -108,28 +97,39 @@ abstract class Websocket implements OnMessageInterface, OnDisconnectInterface break; } - $this->onMessage($this->server, $frame); + $this->onMessage($psr7Request, $frame->data); } - $this->onDisconnect($this->server, $userId); } catch (Throwable $throwable) { - echo sprintf("Message: %s \n - File: %s \n - Line: %d \n", $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()); - $ws->setStatusCode(500); - $ws->end($throwable->getMessage()); - $ws->close(); + $this->throwable($ws, $throwable); } finally { if (isset($userId)) { - $this->transport->remove($userId); + $this->onDisconnect($userId); } } } + abstract public function onBeforeMessageLoop(): void; + + /** + * @param $ws + * @param Throwable $throwable * @return void */ - abstract public function clearAllOnlineUsers(): void; + private function throwable($ws, Throwable $throwable): void + { + $error = sprintf("Message: %s \n + File: %s \n + Line: %d \n", $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()); + + $ws->setStatusCode(500); + $ws->end($throwable->getMessage()); + $ws->close(); + + echo $error; + } + /** @@ -140,24 +140,21 @@ abstract class Websocket implements OnMessageInterface, OnDisconnectInterface /** - * @param ConstrictRequest $request - * @param Response $ws - * @return bool + * @return void */ - abstract public function onAuthority(ConstrictRequest $request, Response $ws): bool; + abstract public function onStart(): void; /** - * @param \Swoole\Server|Server $server - * @param Frame $frame + * @param ConstrictRequest $psr7Request + * @param string $frame * @return void */ - abstract public function onMessage(\Swoole\Server|Server $server, Frame $frame): void; + abstract public function onMessage(ConstrictRequest $psr7Request, string $frame): void; /** - * @param \Swoole\WebSocket\Server|Server $server - * @param int $fd + * @param int $userId * @return void */ - abstract public function onDisconnect(\Swoole\WebSocket\Server|Server $server, int $fd): void; + abstract public function onDisconnect(int $userId): void; } diff --git a/test.php b/test.php index d08122e..174b7d7 100644 --- a/test.php +++ b/test.php @@ -1,12 +1,9 @@ withAuthority($authorization);方法将认证后的用户与链接进行关联 + * * @param ConstrictRequest $request * @return bool */ public function onConnected(ConstrictRequest $request): bool { // TODO: Implement onConnected() method. - $this->stateChange($request->getAuthority()->getUniqueId(), 1); - return true; + + // $request->withAuthority($authorization); + + return false; } /** - * @param ConstrictRequest $request - * @param Response $ws - * @return bool - */ - public function onAuthority(ConstrictRequest $request, Response $ws): bool - { - // TODO: Implement onAuthority() method. - $auth = $request->get['auth'] ?? ''; - $client = new Client(config('user.host'), config('user.port'), true); - $client->post('/handshake', [ - 'token' => $auth, - 'fd' => $request->fd, - 'ip' => current(swoole_get_local_ip()), - ]); - $client->close(); - if ($client->getStatusCode() != 200) { - return false; - } else { - $request->withAuthority(json_decode($client->body)); - return true; - } - } - - /** - * @param \Swoole\Server|Server $server - * @param Frame $frame + * + * 客户端发送的消息回调 + * + * @param ConstrictRequest $psr7Request + * @param string $frame * @return void */ - public function onMessage(\Swoole\Server|Server $server, Frame $frame): void + public function onMessage(ConstrictRequest $psr7Request, string $frame): void { // TODO: Implement onMessage() method. } /** - * @param Server|\Swoole\WebSocket\Server $server - * @param int $fd + * 消息循环之前调用的函数(此时链接已加入管理中) * @return void + * */ - public function onDisconnect(Server|\Swoole\WebSocket\Server $server, int $fd): void + public function onBeforeMessageLoop(): void { - // TODO: Implement onDisconnect() method. - $this->stateChange($fd, 0); - } - - - - /** - * @return void - */ - public function clearAllOnlineUsers(): void - { - $request = [ - 'query' => [ - 'bool' => [ - 'filter' => [['term' => ['is_online' => 1]]], - ], - ], - 'script' => [ - 'source' => "ctx._source['is_online'] = 0", - 'lang' => 'painless', - ], - ]; - $client = new Client(config('es_host'), +config('es_port')); - $client->setData(json_encode($request, JSON_NUMERIC_CHECK)); - $client->setHeaders(['Content-Type' => 'application/json']); - $client->setMethod('POST'); - $client->execute('/' . config('es_key') . '/_update_by_query'); - $client->close(); + // TODO: Implement onBeforeMessageLoop() method. } /** * @param int $userId - * @param int $status * @return void */ - private function stateChange(int $userId, int $status): void + public function onDisconnect(int $userId): void { - $data = [ - 'doc' => [ - 'is_online' => $status, - ], - ]; - $client = new Client(config('es_host'), +config('es_port')); - $client->setData(json_encode($data)); - $client->setHeaders(['Content-Type' => 'application/json']); - $client->setMethod('POST'); - $client->execute('/' . config('es_key') . '/_doc/' . $userId . '/_update'); - $client->close(); + // TODO: Implement onDisconnect() method. } + + + /** + * @return void + */ + public function onStart(): void + { + } + } run(function () { + $config = new Config; + $config->port = 9501; + $config->host = '0.0.0.0'; + $config->authKey = 'access_token'; + $websocket = Kiri::getDi()->get(Test::class); - $websocket->start('0.0.0.0', 9504); + $websocket->start($config); });