161 lines
3.5 KiB
PHP
161 lines
3.5 KiB
PHP
<?php
|
|
|
|
namespace Coroutine\Server;
|
|
|
|
use Kiri\Di\Inject\Container;
|
|
use Kiri\Router\Constrict\ConstrictRequest;
|
|
use Swoole\Coroutine\Http\Server;
|
|
use Swoole\Http\Request;
|
|
use Swoole\Http\Response;
|
|
use Swoole\WebSocket\CloseFrame;
|
|
use Throwable;
|
|
|
|
/**
 * Base class for a coroutine WebSocket endpoint built on Swoole\Coroutine\Http\Server.
 *
 * start() registers two routes on the server:
 *  - `/websocket`     -> handler(): authenticates, upgrades and runs the receive loop.
 *  - `/online/lists`  -> getLists(): returns the transport's list as JSON.
 *
 * Concrete subclasses supply the lifecycle hooks (onStart, onConnected,
 * onBeforeMessageLoop, onMessage, onDisconnect).
 */
abstract class Websocket
{

    /**
     * Registry of active connections, keyed by the id returned from
     * getAuthority()->getUniqueId(). Injected through the Container attribute.
     */
    #[Container(Transport::class)]
    public Transport $transport;


    /**
     * Server configuration; assigned by start() before onStart() runs.
     *
     * The get hook simply returns the stored value.
     *
     * @var Config
     */
    public Config $config {
        get {
            return $this->config;
        }
    }


    /**
     * Store the configuration, run the onStart() hook, then create the
     * coroutine HTTP server, register both routes and start it.
     *
     * @param Config $config provides at least `host`, `port` and `authKey`
     * @return void
     */
    public function start(Config $config): void
    {
        $this->config = $config;
        $this->onStart();
        $server = new Server($config->host, $config->port, false);
        $server->handle('/websocket', fn(Request $request, Response $ws) => $this->handler($request, $ws));
        $server->handle('/online/lists', fn(Request $request, Response $ws) => $this->getLists($request, $ws));
        echo 'websocket server start at ' . $config->host . ':' . $config->port . PHP_EOL;
        $server->start();
    }


    /**
     * Respond with the transport's list encoded as JSON.
     *
     * If the list cannot be encoded, a 500 response carrying the JSON error
     * message is sent instead of an empty 200 body.
     *
     * @param Request $request incoming HTTP request (unused)
     * @param Response $ws HTTP response to write to
     * @return void
     */
    public function getLists(Request $request, Response $ws): void
    {
        $data = json_encode($this->transport->getLists(), JSON_UNESCAPED_UNICODE);
        if ($data === false) {
            $ws->setStatusCode(500);
            $ws->end(json_last_error_msg());
            return;
        }
        $ws->end($data);
    }


    /**
     * Handle one `/websocket` request from authentication to disconnect.
     *
     * Steps: require the auth query parameter, let onConnected() authenticate,
     * drop any previous transport entry for the same id, upgrade the
     * connection, push the id to the client, register the connection, then
     * loop on recv() and forward each frame's data to onMessage() until the
     * peer closes, an error occurs or the literal message `close` arrives.
     *
     * Any Throwable is routed to throwable(). onDisconnect() is called only
     * when the upgrade succeeded.
     *
     * @param Request $request
     * @param Response $ws
     * @return void
     */
    public function handler(Request $request, Response $ws): void
    {
        // Stays null until the upgrade succeeds; drives the finally block.
        $userId = null;
        $upgraded = false;
        try {
            $psr7Request = ConstrictRequest::builder($request);
            if (!$psr7Request->hasQuery($this->config->authKey)) {
                throw new \Exception('Params required.');
            }

            // onConnected() must run before the authority is read.
            if (!$this->onConnected($psr7Request)) {
                throw new \Exception('Token error, unable to obtain user.');
            }
            $authority = $psr7Request->getAuthority();
            if ($authority === null) {
                throw new \Exception('Token error, unable to obtain user.');
            }

            $uniqueId = $authority->getUniqueId();

            // Only one registered connection per id: drop the previous entry.
            // NOTE(review): the previous connection's coroutine still has its
            // deferred remove() pending and may later remove THIS connection's
            // entry - confirm how Transport::remove() behaves.
            if ($this->transport->has($uniqueId)) {
                $this->transport->remove($uniqueId);
            }

            if (!$ws->upgrade()) {
                throw new \Exception('Connection upgrade to websocket failed.');
            }
            $upgraded = true;

            $userId = $uniqueId;
            $ws->push($userId);

            // Registered before add() so the entry is removed when the
            // coroutine exits, whatever happens afterwards.
            defer(function () use ($userId) {
                $this->transport->remove($userId);
            });

            $this->transport->add($userId, new Struct($authority, $request, $ws));

            $this->onBeforeMessageLoop();

            while (true) {
                $frame = $ws->recv();
                // '' / false: no frame was received (closed or failed).
                $finished = $frame === ''
                    || $frame === false
                    || $frame instanceof CloseFrame
                    || $frame->data === 'close';
                if ($finished) {
                    $ws->close();
                    break;
                }

                $this->onMessage($psr7Request, $frame->data);
            }
        } catch (Throwable $throwable) {
            $this->throwable($ws, $throwable, $upgraded);
        } finally {
            if ($userId !== null) {
                $this->onDisconnect($userId);
            }
        }
    }


    /**
     * Hook invoked once per connection, after it has been registered in the
     * transport and before the receive loop starts.
     *
     * @return void
     */
    abstract public function onBeforeMessageLoop(): void;


    /**
     * Report a failure from handler(): answer the client, close the
     * connection and echo the message, file and line.
     *
     * @param Response $ws response object of the failing connection
     * @param Throwable $throwable the error being reported
     * @param bool $upgraded true once the WebSocket upgrade has succeeded;
     *                       the HTTP 500 response is then skipped because the
     *                       handshake response has already been sent
     * @return void
     */
    private function throwable(Response $ws, Throwable $throwable, bool $upgraded = false): void
    {
        // Single-line format: no raw source line breaks/indentation in the output.
        $error = sprintf(
            "Message: %s \nFile: %s \nLine: %d \n",
            $throwable->getMessage(),
            $throwable->getFile(),
            $throwable->getLine()
        );

        if (!$upgraded) {
            $ws->setStatusCode(500);
            $ws->end($throwable->getMessage());
        }
        $ws->close();

        echo $error;
    }


    /**
     * Authenticate an incoming connection.
     *
     * handler() rejects the connection when this returns false, or when the
     * request has no authority afterwards.
     *
     * @param ConstrictRequest $request
     * @return bool
     */
    abstract public function onConnected(ConstrictRequest $request): bool;


    /**
     * Hook invoked by start() after the config is stored and before the
     * server is created.
     *
     * @return void
     */
    abstract public function onStart(): void;


    /**
     * Hook invoked for every received frame that does not end the loop.
     *
     * @param ConstrictRequest $psr7Request request built for this connection
     * @param string $frame the frame's data payload
     * @return void
     */
    abstract public function onMessage(ConstrictRequest $psr7Request, string $frame): void;


    /**
     * Hook invoked when handler() finishes for a connection whose upgrade
     * succeeded.
     *
     * @param int $userId id returned by getAuthority()->getUniqueId()
     * @return void
     */
    abstract public function onDisconnect(int $userId): void;
}
|