modify plugin name
This commit is contained in:
@@ -4,12 +4,14 @@ namespace Kiri\Websocket;
|
||||
|
||||
use Kiri;
|
||||
use Swoole\{Coroutine\Http\Server as AliasServer, WebSocket\Server};
|
||||
use Psr\Container\ContainerExceptionInterface;
|
||||
use Psr\Container\NotFoundExceptionInterface;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
class Sender implements WebSocketInterface
|
||||
class Sender extends Kiri\Abstracts\Component implements WebSocketInterface
|
||||
{
|
||||
|
||||
|
||||
@@ -19,6 +21,20 @@ class Sender implements WebSocketInterface
|
||||
private AliasServer|Server|null $server = null;
|
||||
|
||||
|
||||
private FdCollector $collector;
|
||||
|
||||
|
||||
/**
|
||||
* @return void
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
*/
|
||||
public function init()
|
||||
{
|
||||
$this->collector = $this->getContainer()->get(FdCollector::class);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param AliasServer|Server $server
|
||||
*/
|
||||
@@ -43,9 +59,8 @@ class Sender implements WebSocketInterface
|
||||
if ($this->server instanceof Server) {
|
||||
return $this->server->push($fd, $data, $opcode, $flags);
|
||||
}
|
||||
$collector = Kiri::getContainer()->get(FdCollector::class);
|
||||
|
||||
$response = $collector->get($fd);
|
||||
$response = $this->collector->get($fd);
|
||||
if (!empty($response)) {
|
||||
return $response->push($data, $opcode, $flags);
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use Kiri\Server\Contract\OnCloseInterface;
|
||||
use Kiri\Server\Contract\OnHandshakeInterface;
|
||||
use Kiri\Server\Contract\OnMessageInterface;
|
||||
use Kiri\Server\Contract\OnOpenInterface;
|
||||
use Kiri\Server\SwooleServerInterface;
|
||||
use Psr\Container\ContainerExceptionInterface;
|
||||
use Psr\Container\NotFoundExceptionInterface;
|
||||
use Swoole\Http\Request;
|
||||
@@ -35,14 +36,26 @@ class Server extends AbstractServer
|
||||
public Sender $sender;
|
||||
|
||||
|
||||
public FdCollector $collector;
|
||||
|
||||
|
||||
/**
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
*/
|
||||
public function init()
|
||||
{
|
||||
$this->router = $this->getContainer()->get(DataGrip::class)->get('ws');
|
||||
$container = $this->getContainer();
|
||||
|
||||
$this->router = $container->get(DataGrip::class)->get('ws');
|
||||
$handler = $this->router->find('/', 'GET');
|
||||
|
||||
$this->collector = $container->get(FdCollector::class);
|
||||
|
||||
$this->sender = $container->get(Sender::class);
|
||||
if ($container->has(SwooleServerInterface::class)) {
|
||||
$this->sender->setServer($container->get(SwooleServerInterface::class));
|
||||
}
|
||||
if (is_int($handler) || is_null($handler)) {
|
||||
return;
|
||||
}
|
||||
@@ -52,55 +65,19 @@ class Server extends AbstractServer
|
||||
|
||||
/**
|
||||
* @param int $fd
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
*/
|
||||
public function onClose(int $fd): void
|
||||
{
|
||||
$collector = $this->getContainer()->get(Sender::class);
|
||||
|
||||
// $fds = $this->getContainer()->get(FdCollector::class);
|
||||
|
||||
if (!$collector->isEstablished($fd)) {
|
||||
$this->collector->remove($fd);
|
||||
if (!$this->sender->isEstablished($fd)) {
|
||||
return;
|
||||
}
|
||||
// $fds->remove($fd);
|
||||
|
||||
if ($this->callback instanceof OnCloseInterface) {
|
||||
$this->callback->onClose($fd);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Request $request
|
||||
* @param Response $response
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function setWebSocketProtocol(Request $request, Response $response)
|
||||
{
|
||||
$secWebSocketKey = $request->header['sec-websocket-key'];
|
||||
$patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#';
|
||||
if (preg_match($patten, $secWebSocketKey) === 0 || strlen(base64_decode($secWebSocketKey)) !== 16) {
|
||||
throw new Exception('protocol error.', 500);
|
||||
}
|
||||
$key = base64_encode(sha1($request->header['sec-websocket-key'] . self::SHA1_KEY, true));
|
||||
$headers = [
|
||||
'Upgrade' => 'websocket',
|
||||
'Connection' => 'Upgrade',
|
||||
'Sec-Websocket-Accept' => $key,
|
||||
'Sec-Websocket-Version' => '13',
|
||||
];
|
||||
if (isset($request->header['sec-websocket-protocol'])) {
|
||||
$headers['Sec-Websocket-Protocol'] = $request->header['sec-websocket-protocol'];
|
||||
}
|
||||
foreach ($headers as $key => $val) {
|
||||
$response->header($key, $val);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Request $request
|
||||
* @param Response $response
|
||||
@@ -108,32 +85,35 @@ class Server extends AbstractServer
|
||||
public function onHandshake(Request $request, Response $response): void
|
||||
{
|
||||
try {
|
||||
$this->setWebSocketProtocol($request, $response);
|
||||
$secWebSocketKey = $request->header['sec-websocket-key'];
|
||||
$patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#';
|
||||
if (preg_match($patten, $secWebSocketKey) === 0 || strlen(base64_decode($secWebSocketKey)) !== 16) {
|
||||
throw new Exception('protocol error.', 500);
|
||||
}
|
||||
$key = base64_encode(sha1($request->header['sec-websocket-key'] . self::SHA1_KEY, true));
|
||||
$headers = [
|
||||
'Upgrade' => 'websocket',
|
||||
'Connection' => 'Upgrade',
|
||||
'Sec-Websocket-Accept' => $key,
|
||||
'Sec-Websocket-Version' => '13',
|
||||
];
|
||||
if (isset($request->header['sec-websocket-protocol'])) {
|
||||
$headers['Sec-Websocket-Protocol'] = $request->header['sec-websocket-protocol'];
|
||||
}
|
||||
foreach ($headers as $key => $val) {
|
||||
$response->header($key, $val);
|
||||
}
|
||||
|
||||
if ($this->callback instanceof OnHandshakeInterface) {
|
||||
$this->callback->onHandshake($request, $response);
|
||||
} else {
|
||||
$response->setStatusCode(101, 'connection success.');
|
||||
$response->end();
|
||||
}
|
||||
// if ($this->server instanceof \Swoole\Coroutine\Http\Server) {
|
||||
// $response->upgrade();
|
||||
// $this->deferOpen($request);
|
||||
// while (true) {
|
||||
// $receive = $response->recv();
|
||||
// if ($receive === '' || $receive instanceof CloseFrame) {
|
||||
// $response->close();
|
||||
// if ($this->callback instanceof OnCloseInterface) {
|
||||
// $this->callback->onClose($this->server, $response->fd);
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
// $this->callback->onMessage($this->server, $receive);
|
||||
// }
|
||||
// } else {
|
||||
// $this->deferOpen($request);
|
||||
// }
|
||||
if ($response->isWritable()) {
|
||||
$this->deferOpen($request);
|
||||
|
||||
if ($this->callback instanceof OnOpenInterface) {
|
||||
$this->callback->onOpen($request);
|
||||
}
|
||||
|
||||
}
|
||||
} catch (\Throwable $throwable) {
|
||||
$response->status(4000 + $throwable->getCode(), $throwable->getMessage());
|
||||
@@ -142,20 +122,17 @@ class Server extends AbstractServer
|
||||
}
|
||||
|
||||
|
||||
private function deferOpen($request)
|
||||
{
|
||||
if ($this->callback instanceof OnOpenInterface) {
|
||||
$this->callback->onOpen($request);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Frame $frame
|
||||
*/
|
||||
public function onMessage(Frame $frame): void
|
||||
{
|
||||
if ($this->callback instanceof OnMessageInterface) {
|
||||
if ($frame->opcode == 0x08) {
|
||||
$this->collector->remove($frame->fd);
|
||||
} else {
|
||||
if (!($this->callback instanceof OnMessageInterface)) {
|
||||
return;
|
||||
}
|
||||
$this->callback->onMessage($frame);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user