diff --git a/kiri-websocket-server/Sender.php b/kiri-websocket-server/Sender.php index dc805e3f..71da0b03 100644 --- a/kiri-websocket-server/Sender.php +++ b/kiri-websocket-server/Sender.php @@ -4,12 +4,14 @@ namespace Kiri\Websocket; use Kiri; use Swoole\{Coroutine\Http\Server as AliasServer, WebSocket\Server}; +use Psr\Container\ContainerExceptionInterface; +use Psr\Container\NotFoundExceptionInterface; /** * */ -class Sender implements WebSocketInterface +class Sender extends Kiri\Abstracts\Component implements WebSocketInterface { @@ -19,6 +21,20 @@ class Sender implements WebSocketInterface private AliasServer|Server|null $server = null; + private FdCollector $collector; + + + /** + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function init() + { + $this->collector = $this->getContainer()->get(FdCollector::class); + } + + /** * @param AliasServer|Server $server */ @@ -43,9 +59,8 @@ class Sender implements WebSocketInterface if ($this->server instanceof Server) { return $this->server->push($fd, $data, $opcode, $flags); } - $collector = Kiri::getContainer()->get(FdCollector::class); - $response = $collector->get($fd); + $response = $this->collector->get($fd); if (!empty($response)) { return $response->push($data, $opcode, $flags); } diff --git a/kiri-websocket-server/Server.php b/kiri-websocket-server/Server.php index 28bbf190..3e7d5f20 100644 --- a/kiri-websocket-server/Server.php +++ b/kiri-websocket-server/Server.php @@ -10,6 +10,7 @@ use Kiri\Server\Contract\OnCloseInterface; use Kiri\Server\Contract\OnHandshakeInterface; use Kiri\Server\Contract\OnMessageInterface; use Kiri\Server\Contract\OnOpenInterface; +use Kiri\Server\SwooleServerInterface; use Psr\Container\ContainerExceptionInterface; use Psr\Container\NotFoundExceptionInterface; use Swoole\Http\Request; @@ -35,14 +36,26 @@ class Server extends AbstractServer public Sender $sender; + public FdCollector $collector; + + /** * @throws ContainerExceptionInterface * @throws NotFoundExceptionInterface */ public function init() { - $this->router = $this->getContainer()->get(DataGrip::class)->get('ws'); + $container = $this->getContainer(); + + $this->router = $container->get(DataGrip::class)->get('ws'); $handler = $this->router->find('/', 'GET'); + + $this->collector = $container->get(FdCollector::class); + + $this->sender = $container->get(Sender::class); + if ($container->has(SwooleServerInterface::class)) { + $this->sender->setServer($container->get(SwooleServerInterface::class)); + } if (is_int($handler) || is_null($handler)) { return; } @@ -52,55 +65,19 @@ class Server extends AbstractServer /** * @param int $fd - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface */ public function onClose(int $fd): void { - $collector = $this->getContainer()->get(Sender::class); - -// $fds = $this->getContainer()->get(FdCollector::class); - - if (!$collector->isEstablished($fd)) { + $this->collector->remove($fd); + if (!$this->sender->isEstablished($fd)) { return; } -// $fds->remove($fd); - if ($this->callback instanceof OnCloseInterface) { $this->callback->onClose($fd); } } - /** - * @param Request $request - * @param Response $response - * @return void - * @throws Exception - */ - protected function setWebSocketProtocol(Request $request, Response $response) - { - $secWebSocketKey = $request->header['sec-websocket-key']; - $patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#'; - if (preg_match($patten, $secWebSocketKey) === 0 || strlen(base64_decode($secWebSocketKey)) !== 16) { - throw new Exception('protocol error.', 500); - } - $key = base64_encode(sha1($request->header['sec-websocket-key'] . self::SHA1_KEY, true)); - $headers = [ - 'Upgrade' => 'websocket', - 'Connection' => 'Upgrade', - 'Sec-Websocket-Accept' => $key, - 'Sec-Websocket-Version' => '13', - ]; - if (isset($request->header['sec-websocket-protocol'])) { - $headers['Sec-Websocket-Protocol'] = $request->header['sec-websocket-protocol']; - } - foreach ($headers as $key => $val) { - $response->header($key, $val); - } - } - - /** * @param Request $request * @param Response $response @@ -108,32 +85,35 @@ class Server extends AbstractServer public function onHandshake(Request $request, Response $response): void { try { - $this->setWebSocketProtocol($request, $response); + $secWebSocketKey = $request->header['sec-websocket-key']; + $patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#'; + if (preg_match($patten, $secWebSocketKey) === 0 || strlen(base64_decode($secWebSocketKey)) !== 16) { + throw new Exception('protocol error.', 500); + } + $key = base64_encode(sha1($request->header['sec-websocket-key'] . self::SHA1_KEY, true)); + $headers = [ + 'Upgrade' => 'websocket', + 'Connection' => 'Upgrade', + 'Sec-Websocket-Accept' => $key, + 'Sec-Websocket-Version' => '13', + ]; + if (isset($request->header['sec-websocket-protocol'])) { + $headers['Sec-Websocket-Protocol'] = $request->header['sec-websocket-protocol']; + } + foreach ($headers as $key => $val) { + $response->header($key, $val); + } + if ($this->callback instanceof OnHandshakeInterface) { $this->callback->onHandshake($request, $response); } else { $response->setStatusCode(101, 'connection success.'); $response->end(); - } -// if ($this->server instanceof \Swoole\Coroutine\Http\Server) { -// $response->upgrade(); -// $this->deferOpen($request); -// while (true) { -// $receive = $response->recv(); -// if ($receive === '' || $receive instanceof CloseFrame) { -// $response->close(); -// if ($this->callback instanceof OnCloseInterface) { -// $this->callback->onClose($this->server, $response->fd); -// } -// break; -// } -// $this->callback->onMessage($this->server, $receive); -// } -// } else { -// $this->deferOpen($request); -// } - if ($response->isWritable()) { - $this->deferOpen($request); + + if ($this->callback instanceof OnOpenInterface) { + $this->callback->onOpen($request); + } + } } catch (\Throwable $throwable) { $response->status(4000 + $throwable->getCode(), $throwable->getMessage()); @@ -142,20 +122,17 @@ class Server extends AbstractServer } - private function deferOpen($request) - { - if ($this->callback instanceof OnOpenInterface) { - $this->callback->onOpen($request); - } - } - - /** * @param Frame $frame */ public function onMessage(Frame $frame): void { - if ($this->callback instanceof OnMessageInterface) { + if ($frame->opcode == 0x08) { + $this->collector->remove($frame->fd); + } else { + if (!($this->callback instanceof OnMessageInterface)) { + return; + } $this->callback->onMessage($frame); } }