This commit is contained in:
2026-04-04 10:29:39 +08:00
parent 4e999574a9
commit b77321ff4b
6 changed files with 232 additions and 213 deletions
+74 -77
View File
@@ -4,17 +4,13 @@ namespace Coroutine\Server;
use Kiri\Di\Inject\Container;
use Kiri\Router\Constrict\ConstrictRequest;
use Kiri\Server\Contract\OnDisconnectInterface;
use Kiri\Server\Contract\OnMessageInterface;
use Swoole\Coroutine;
use Swoole\Coroutine\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\WebSocket\CloseFrame;
use Swoole\WebSocket\Frame;
use Throwable;
abstract class Websocket implements OnMessageInterface, OnDisconnectInterface
abstract class Websocket
{
#[Container(Transport::class)]
@@ -22,50 +18,28 @@ abstract class Websocket implements OnMessageInterface, OnDisconnectInterface
/**
* @var Server
* @var Config
*/
private Server $server;
/**
* @return Server
*/
public function getServer(): Server
{
return $this->server;
}
/**
* @param string $host
* @param int $port
* @return void
*/
public function start(string $host, int $port): void
{
$this->clearAllOnlineUsers();
$this->queueLoop();
$this->server = new Server($host, $port, false);
$this->server->handle('/websocket', function (Request $request, Response $ws) {
$this->WebsocketHandler($request, $ws);
});
$this->server->handle('/online/lists', function (Request $request, Response $ws) {
$data = json_encode($this->transport->getLists(), JSON_UNESCAPED_UNICODE);
$ws->end($data);
});
echo 'websocket server start at ' . $host . ':' . $port . PHP_EOL;
$this->server->start();
public Config $config {
get {
return $this->config;
}
}
/**
* @param Config $config
* @return void
*/
private function queueLoop(): void
public function start(Config $config): void
{
Coroutine::create(function () {
$QueueLoop = \Kiri::getDi()->get(QueueLoop::class);
$QueueLoop->loop();
});
$this->config = $config;
$this->onStart();
$server = new Server($config->host, $config->port, false);
$server->handle('/websocket', fn(Request $request, Response $ws) => $this->handler($request, $ws));
$server->handle('/online/lists', fn(Request $request, Response $ws) => $this->getLists($request, $ws));
echo 'websocket server start at ' . $config->host . ':' . $config->port . PHP_EOL;
$server->start();
}
/**
@@ -73,33 +47,48 @@ abstract class Websocket implements OnMessageInterface, OnDisconnectInterface
* @param Response $ws
* @return void
*/
public function WebsocketHandler(Request $request, Response $ws): void
public function getLists(Request $request, Response $ws): void
{
$data = json_encode($this->transport->getLists(), JSON_UNESCAPED_UNICODE);
$ws->end($data);
}
/**
* @param Request $request
* @param Response $ws
* @return void
*/
public function handler(Request $request, Response $ws): void
{
try {
$psr7Request = ConstrictRequest::builder($request);
if (!$psr7Request->hasQuery('auth')) {
throw new \Exception('Auth fail.', 401);
if (!$psr7Request->hasQuery($this->config->authKey)) {
throw new \Exception('Params required.');
}
if (!$this->onAuthority($psr7Request, $ws)) {
throw new \Exception('Auth fail.', 401);
if (!$this->onConnected($psr7Request) || $psr7Request->getAuthority() == null) {
throw new \Exception('Token error, unable to obtain user.');
}
if ($psr7Request->getAuthority() == null || !$ws->upgrade()) {
throw new \Exception('Auth fail.', 401);
if ($this->transport->has($psr7Request->getAuthority()->getUniqueId())) {
$this->transport->remove($psr7Request->getAuthority()->getUniqueId());
}
if (!$ws->upgrade()) {
throw new \Exception('Connection upgrade to websocket failed.');
}
$userId = $psr7Request->getAuthority()->getUniqueId();
$ws->push($userId);
$user = [
'nickname' => $psr7Request->getAuthority()->getNickname(),
'avatar' => $psr7Request->getAuthority()->getAvatar(),
];
defer(function () use ($userId) {
$this->transport->remove($userId);
});
if (!$this->onConnected($psr7Request)) {
throw new \Exception('Auth fail.', 401);
}
$this->transport->add($userId, new Struct($user, $request->fd, $ws));
$this->transport->add($userId, new Struct($psr7Request->getAuthority(), $request, $ws));
$this->onBeforeMessageLoop();
while (true) {
$frame = $ws->recv();
@@ -108,28 +97,39 @@ abstract class Websocket implements OnMessageInterface, OnDisconnectInterface
break;
}
$this->onMessage($this->server, $frame);
$this->onMessage($psr7Request, $frame->data);
}
$this->onDisconnect($this->server, $userId);
} catch (Throwable $throwable) {
echo sprintf("Message: %s \n
File: %s \n
Line: %d \n", $throwable->getMessage(), $throwable->getFile(), $throwable->getLine());
$ws->setStatusCode(500);
$ws->end($throwable->getMessage());
$ws->close();
$this->throwable($ws, $throwable);
} finally {
if (isset($userId)) {
$this->transport->remove($userId);
$this->onDisconnect($userId);
}
}
}
abstract public function onBeforeMessageLoop(): void;
/**
* @param $ws
* @param Throwable $throwable
* @return void
*/
abstract public function clearAllOnlineUsers(): void;
private function throwable($ws, Throwable $throwable): void
{
$error = sprintf("Message: %s \n
File: %s \n
Line: %d \n", $throwable->getMessage(), $throwable->getFile(), $throwable->getLine());
$ws->setStatusCode(500);
$ws->end($throwable->getMessage());
$ws->close();
echo $error;
}
/**
@@ -140,24 +140,21 @@ abstract class Websocket implements OnMessageInterface, OnDisconnectInterface
/**
* @param ConstrictRequest $request
* @param Response $ws
* @return bool
* @return void
*/
abstract public function onAuthority(ConstrictRequest $request, Response $ws): bool;
abstract public function onStart(): void;
/**
* @param \Swoole\Server|Server $server
* @param Frame $frame
* @param ConstrictRequest $psr7Request
* @param string $frame
* @return void
*/
abstract public function onMessage(\Swoole\Server|Server $server, Frame $frame): void;
abstract public function onMessage(ConstrictRequest $psr7Request, string $frame): void;
/**
* @param \Swoole\WebSocket\Server|Server $server
* @param int $fd
* @param int $userId
* @return void
*/
abstract public function onDisconnect(\Swoole\WebSocket\Server|Server $server, int $fd): void;
abstract public function onDisconnect(int $userId): void;
}