From 567ceb69c0625f9ebf4ddfa02f526bc8ca65c8a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Mon, 14 Feb 2022 10:42:27 +0800 Subject: [PATCH] modify plugin name --- Coroutine/Http.php | 215 ++++++++++++++++++--------------------------- Server.php | 4 +- 2 files changed, 89 insertions(+), 130 deletions(-) diff --git a/Coroutine/Http.php b/Coroutine/Http.php index 5b24b89..96e77a6 100644 --- a/Coroutine/Http.php +++ b/Coroutine/Http.php @@ -3,7 +3,6 @@ namespace Kiri\Server\Coroutine; use Kiri\Abstracts\Component; -use Kiri\Events\EventDispatch; use Kiri\Exception\ConfigException; use Kiri\Server\Constant; use Kiri\Server\ProcessManager; @@ -79,146 +78,106 @@ class Http extends Component } - public function start(array $config) - { - $this->configs = $this->sortService($config['ports']); - run(function () use ($config) { - $event = \Kiri::getDi()->get(EventDispatch::class); - - $this->startTaskWorker($config); - - foreach ($config as $value) { - $value = $this->resolveCallback($value); - if ($value['type'] == Constant::SERVER_TYPE_HTTP) { - $onRequest = $value['events'][Constant::REQUEST] ?? null; - if (is_null($onRequest)) { - throw new \Exception('Server callback con\'t null.'); - } - Coroutine::create(function () use ($value, $event, $onRequest) { - $this->bindHttpService($value, $onRequest); - }); - } else if ($value['type'] == Constant::SERVER_TYPE_WEBSOCKET) { - $handshake = $value['events'][Constant::HANDSHAKE] ?? null; - - $open = $value['events'][Constant::OPEN] ?? null; - - $close = $value['events'][Constant::CLOSE] ?? null; - $message = $value['events'][Constant::MESSAGE] ?? null; - if (is_null($message)) { - throw new \Exception('Server callback con\'t null.'); - } - - Coroutine::create(function () use ($value, $handshake, $open, $close, $message) { - $this->WebSocketService($value, $handshake, $open, $close, $message); - }); - } else { - $message = $value['events'][Constant::RECEIVE] ?? null; - if (is_null($message)) { - throw new \Exception('Server callback con\'t null.'); - } - $conn = $value['events'][Constant::CONNECT] ?? null; - $close = $value['events'][Constant::CLOSE] ?? null; - - Coroutine::create(function () use ($value, $message, $conn, $close) { - $this->bindTcpService($value, $message, $conn, $close); - }); - } - } - }); - } - - /** - * @param $value - * @param $onRequest - * @return void - * @throws \Exception - */ - protected function bindHttpService($value, $onRequest) - { - $server = new Coroutine\Http\Server($value['host'], $value['port'], null, true); - - $this->servers[$value['port']] = $server; - - $server->handle('/', function (Request $request, Response $response) use ($onRequest) { - call_user_func($onRequest, $request, $response); - }); - $server->start(); - - $this->bindHttpService($value, $onRequest); - } - - - /** - * @param $value - * @param $message - * @param $conn - * @param $close + * @param array $config + * @param $daemon * @return void + * @throws ContainerExceptionInterface * @throws Exception + * @throws NotFoundExceptionInterface */ - protected function bindTcpService($value, $message, $conn, $close) + public function initBaseServer(array $config, $daemon) { - $server = new Coroutine\Server($value['host'], $value['port'], null, true); - - $this->servers[$value['port']] = $server; - - $server->handle(function (Coroutine\Server\Connection $connection) use ($message, $conn, $close) { - if (!is_null($conn)) { - call_user_func($conn, $connection); - } - while (true) { - $data = $connection->recv(1024); - if ($data === '' || $data === false) { - defer(function () use ($close, $connection) { - call_user_func($close, $connection); - }); - $connection->close(); - break; + $this->configs = $config; + foreach ($config as $value) { + $value = $this->resolveCallback($value); + if ($value['type'] == Constant::SERVER_TYPE_HTTP) { + $onRequest = $value['events'][Constant::REQUEST] ?? null; + if (is_null($onRequest)) { + throw new \Exception('Server callback con\'t null.'); } - call_user_func($message, $data); - } - }); - $server->start(); + $server = new Coroutine\Http\Server($value['host'], $value['port'], null, true); - $this->bindTcpService($value, $message, $conn, $close); + $this->servers[$value['port']] = $server; + + $server->handle('/', function (Request $request, Response $response) use ($onRequest) { + call_user_func($onRequest, $request, $response); + }); + } else if ($value['type'] == Constant::SERVER_TYPE_WEBSOCKET) { + $handshake = $value['events'][Constant::HANDSHAKE] ?? null; + + $open = $value['events'][Constant::OPEN] ?? null; + + $close = $value['events'][Constant::CLOSE] ?? null; + $message = $value['events'][Constant::MESSAGE] ?? null; + if (is_null($message)) { + throw new \Exception('Server callback con\'t null.'); + } + + $server = new Coroutine\Http\Server($value['host'], $value['port'], null, true); + + $sender = $this->getContainer()->get(Sender::class); + $sender->setServer($server); + + $this->servers[$value['port']] = $server; + + $server->handle('/', function (Request $request, Response $response) use ($handshake, $open, $close, $message) { + if (is_null($handshake)) { + $response->upgrade(); + } else { + call_user_func($handshake, $request, $response); + } + if ($response->isWritable() && is_callable($open)) { + call_user_func($open, $response); + } + while (($data = $response->recv()) instanceof Frame) { + call_user_func($message, $data); + } + call_user_func($close, $response->fd); + }); + } else { + $message = $value['events'][Constant::RECEIVE] ?? null; + if (is_null($message)) { + throw new \Exception('Server callback con\'t null.'); + } + $conn = $value['events'][Constant::CONNECT] ?? null; + $close = $value['events'][Constant::CLOSE] ?? null; + + $server = new Coroutine\Server($value['host'], $value['port'], null, true); + + $this->servers[$value['port']] = $server; + + $server->handle(function (Coroutine\Server\Connection $connection) use ($message, $conn, $close) { + if (!is_null($conn)) { + call_user_func($conn, $connection); + } + while (true) { + $data = $connection->recv(1024); + if ($data === '' || $data === false) { + defer(function () use ($close, $connection) { + call_user_func($close, $connection); + }); + $connection->close(); + break; + } + call_user_func($message, $data); + } + }); + } + } } - /** - * @param $value - * @param $handshake - * @param $open - * @param $close - * @param $message - * @return void - */ - private function WebSocketService($value, $handshake, $open, $close, $message) + public function start() { - $server = new Coroutine\Http\Server($value['host'], $value['port'], null, true); - - $sender = $this->getContainer()->get(Sender::class); - $sender->setServer($server); - - $this->servers[$value['port']] = $server; - - $server->handle('/', function (Request $request, Response $response) use ($handshake, $open, $close, $message) { - if (is_null($handshake)) { - $response->upgrade(); - } else { - call_user_func($handshake, $request, $response); + run(function () { + $this->startTaskWorker($this->configs); + foreach ($this->servers as $value) { + Coroutine::create(function () use ($value) { + $value->start(); + }); } - if ($response->isWritable() && is_callable($open)) { - call_user_func($open, $response); - } - while (($data = $response->recv()) instanceof Frame) { - call_user_func($message, $data); - } - call_user_func($close, $response->fd); }); - $server->start(); - - $this->WebSocketService($value, $handshake, $open, $close, $message); } diff --git a/Server.php b/Server.php index fc60603..dd4e380 100644 --- a/Server.php +++ b/Server.php @@ -42,7 +42,7 @@ class Server extends HttpService public State $state; - public ServerManager $manager; + public Kiri\Server\Coroutine\Http $manager; /** @@ -50,7 +50,7 @@ class Server extends HttpService */ public function init() { - $this->manager = Kiri::getContainer()->get(ServerManager::class); + $this->manager = Kiri::getContainer()->get(Kiri\Server\Coroutine\Http::class); $enable_coroutine = Config::get('servers.settings.enable_coroutine', false); Config::set('servers.settings.enable_coroutine', true); if ($enable_coroutine != true) {