From 4daad7d111e5fb24883cfcc259058c205ba64adb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Thu, 16 Jun 2022 17:38:23 +0800 Subject: [PATCH] modify plugin name --- .phpstorm.meta.php | 2 +- Kiri.php | 23 +- function.php | 49 +++- kiri-coroutine-server/Command.php | 75 +++++ kiri-coroutine-server/Server.php | 271 ++++++++++++++++++ kiri-engine/Abstracts/BaseApplication.php | 2 +- kiri-engine/Abstracts/Logger.php | 10 +- kiri-engine/Reload/Inotify.php | 212 ++++++++++++++ kiri-engine/Reload/Scaner.php | 168 +++++++++++ kiri-task/Annotation/AsynchronousTask.php | 8 +- kiri-task/OnServerTask.php | 6 +- kiri-task/OnTaskInterface.php | 2 +- .../{TaskManager.php => TaskContainer.php} | 28 +- .../{AsyncTaskExecute.php => TaskExecute.php} | 53 +++- .../Contract/OnCloseInterface.php | 22 ++ .../Contract/OnHandshakeInterface.php | 21 ++ .../Contract/OnMessageInterface.php | 22 ++ .../Contract/OnOpenInterface.php | 23 ++ kiri-websocket-server/Dispatcher.php | 8 + kiri-websocket-server/Server.php | 219 ++++++++++++++ kiri-websocket-server/TestSocketServer.php | 38 +++ p.php | 14 +- 22 files changed, 1206 insertions(+), 70 deletions(-) create mode 100644 kiri-coroutine-server/Command.php create mode 100644 kiri-coroutine-server/Server.php create mode 100644 kiri-engine/Reload/Inotify.php create mode 100644 kiri-engine/Reload/Scaner.php rename kiri-task/{TaskManager.php => TaskContainer.php} (60%) rename kiri-task/{AsyncTaskExecute.php => TaskExecute.php} (51%) create mode 100644 kiri-websocket-server/Contract/OnCloseInterface.php create mode 100644 kiri-websocket-server/Contract/OnHandshakeInterface.php create mode 100644 kiri-websocket-server/Contract/OnMessageInterface.php create mode 100644 kiri-websocket-server/Contract/OnOpenInterface.php create mode 100644 kiri-websocket-server/Dispatcher.php create mode 100644 kiri-websocket-server/Server.php create mode 100644 kiri-websocket-server/TestSocketServer.php diff --git a/.phpstorm.meta.php b/.phpstorm.meta.php index e1102b5a..3872ac74 100644 --- a/.phpstorm.meta.php +++ b/.phpstorm.meta.php @@ -4,7 +4,7 @@ namespace PHPSTORM_META { // Reflect use Kiri\Di\Container; - use Psr\Container\ContainerInterface; + use Kiri\Di\ContainerInterface; override(ContainerInterface::get(0), map('@')); override(Container::get(0), map('@')); diff --git a/Kiri.php b/Kiri.php index 9ff4c39e..def907cf 100644 --- a/Kiri.php +++ b/Kiri.php @@ -14,7 +14,8 @@ use Kiri\Application; use Kiri\Core\Json; use Kiri\Di\Container; use Kiri\Environmental; -use Psr\Container\ContainerInterface; +use Kiri\Di\ContainerInterface; +use Kiri\Exception\ConfigException; use Swoole\Coroutine; use Swoole\Process; use Swoole\WebSocket\Server; @@ -149,7 +150,6 @@ class Kiri } - /** * @param $className * @param array $construct @@ -172,6 +172,24 @@ class Kiri } + /** + * @param $prefix + * @return void + * @throws ConfigException + */ + public static function setProcessName($prefix): void + { + if (Kiri::getPlatform()->isMac()) { + return; + } + $name = '[' . Config::get('id', 'system-service') . ']'; + if (!empty($prefix)) { + $name .= '.' . $prefix; + } + swoole_set_process_name($name); + } + + /** * @return string * @throws Exception @@ -311,7 +329,6 @@ class Kiri } - const PROCESS = 'process'; const TASK = 'task'; const WORKER = 'worker'; diff --git a/function.php b/function.php index 13ff8505..b123cabb 100644 --- a/function.php +++ b/function.php @@ -29,27 +29,48 @@ if (!function_exists('make')) { */ function make($name, $default = NULL): mixed { - if (class_exists($name)) { - return Kiri::createObject($name); + if (is_string($name)) { + if (Kiri::has($name)) { + return Kiri::app()->get($name); + } + if (empty($default)) { + throw new Exception("Unknown component ID: $name"); + } + if (Kiri::has($default)) { + return Kiri::app()->get($default); + } + return null; } - if (Kiri::has($name)) { - return Kiri::app()->get($name); - } - if (empty($default)) { - throw new Exception("Unknown component ID: $name"); - } - if (Kiri::has($default)) { - return Kiri::app()->get($default); - } - $class = Kiri::createObject($default); - class_alias($name, $default, TRUE); - return $class; + return Kiri::createObject($default); } } +if (!function_exists('call')) { + + + /** + * @param $handler + * @param mixed ...$params + * @return mixed + * @throws Exception + */ + function call($handler, ...$params): mixed + { + if (is_array($handler) && is_string($handler[0])) { + $handler[0] = di($handler[0]); + } + if (!is_callable($handler, true)) { + throw new Exception('Call function not exists.'); + } + return call_user_func($handler, ...$params); + } + +} + + if (!function_exists('map')) { diff --git a/kiri-coroutine-server/Command.php b/kiri-coroutine-server/Command.php new file mode 100644 index 00000000..c1de4614 --- /dev/null +++ b/kiri-coroutine-server/Command.php @@ -0,0 +1,75 @@ + */ + private array $servers = []; + + + /** + * @return void + * @throws \Exception + */ + public function init(): void + { + $this->getServers(); + run(function () { + $this->sig(); + + $waite = new Coroutine\WaitGroup(); + foreach ($this->servers as $server) { + $waite->add(); + $server->run($waite); + } + $waite->wait(); + }); + } + + + /** + * @return void + */ + public function sig(): void + { + Coroutine::create(function () { + $data = Coroutine::waitSignal(SIGTERM | SIGINT, -1); + if ($data) { + foreach ($this->servers as $server) { + $server->stop(); + } + } + }); + } + + + /** + * @return void + * @throws \Exception + */ + public function getServers(): void + { + foreach ($this->arrays as $array) { + if (isset($this->servers[$array['name']])) { + throw new \Exception(''); + } + + $server = new Server(\Kiri::getDi(), []); + $server->setReusePort($array['reuse_port']); + $server->setHost($array['host']); + $server->setPort($array['port']); + $server->setIsSsl($array['isSsl']); + + $this->servers[$array['name']] = $server; + } + } + +} diff --git a/kiri-coroutine-server/Server.php b/kiri-coroutine-server/Server.php new file mode 100644 index 00000000..fc2b0994 --- /dev/null +++ b/kiri-coroutine-server/Server.php @@ -0,0 +1,271 @@ +server = new Coroutine\Http\Server($this->host, $this->port, $this->isSsl, $this->reuse_port); + $this->server->set(['max_coroutine' => 500000]); + $this->server->handle('/', [$this, 'actor']); + } + + + /** + * @param Coroutine\WaitGroup $group + * @return void + */ + public function run(Coroutine\WaitGroup $group): void + { + Coroutine::create(function () use ($group) { + $this->dispatch->dispatch(new OnWorkerStart(null, 0)); + + $this->start($group); + }); + } + + + /** + * @param WaitGroup $group + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws ReflectionException + */ + public function start(WaitGroup $group): void + { + $this->server->start(); + $this->dispatch->dispatch(new OnBeforeShutdown()); + if ($this->isShutdown === false) { + $this->start($group); + } else { + $group->done(); + } + } + + + public function stop() + { + $this->isShutdown = true; + $this->server->shutdown(); + } + + /** + * @param $path + * @param $method + * @param array|Closure $closure + * @return void + */ + public function handler($path, $method, array|Closure $closure): void + { + $this->router[$path] = [strtolower($method), $closure]; + } + + + /** + * @param Request $request + * @param Response $response + * @return mixed + * @throws Exception + */ + public function actor(Request $request, Response $response): mixed + { + Context::setContext(ResponseInterface::class, new \Kiri\Message\Response()); + Context::setContext(RequestInterface::class, ServerRequest::createServerRequest($request)); + + if ($request['request_method'] === 'HEAD') { + return $this->write('', $response, 200); + } + + [$method, $handler] = $this->router[$request['request_uri']] ?? [null, null]; + if (is_null($handler)) { + return $this->write('Page not found.', $response, 404); + } + if (!is_callable($handler, true)) { + return $this->write('Page not found.', $response, 404); + } + if ($method !== $request['request_method']) { + return $this->write('Page allow.', $response, 405); + } + if (isset($request->header['upgrade']) && $request->header['upgrade'] == 'websocket') { + defer(function () use ($handler, $request) { + if (!$handler instanceof OnOpenInterface) { + return; + } + $handler->onOpen($this->server, $request); + }); + if ($handler instanceof OnHandshakeInterface) { + $handler->onHandshake($request, $response); + } else { + $response->upgrade(); + } + while (true) { + $read = $response->recv(); + if ($read === '' || $read === null || $read instanceof CloseFrame) { + break; + } + if ($handler instanceof OnMessageInterface) { + $handler->onMessage($this->server, $read); + } + } + if ($handler instanceof OnCloseInterface) { + $handler->onClose($this->server, $response->fd); + } + return null; + } + + $params = $this->container->getArgs($handler[0], $handler[1] ?? null); + $result = call_user_func($handler, ...$params); + if (is_null($result)) { + return $this->write("", $response); + } else { + return $this->write($result, $response); + } + } + + + /** + * @param mixed $message + * @param Response $response + * @param int $statusCode + * @return mixed + */ + private function write(mixed $message, Response $response, int $statusCode = 200): mixed + { + $result = $message; + if ($message instanceof ResponseInterface) { + $result = $result->getBody()->getContents(); + $response->setStatusCode($message->getStatusCode()); + } else { + $message = Context::getContext(ResponseInterface::class); + $response->setStatusCode($statusCode); + } + + $headers = $message->getHeaders(); + if (is_array($headers)) foreach ($headers as $key => $header) { + $response->setHeader($key, $header); + } + + if (!isset($response->header['content-type'])) { + $response->header('content-type', 'application/json'); + } else if (!isset($response->header['Content-Type'])) { + $response->header('content-type', 'application/json'); + } + + $headers = $message->getCookieParams(); + if (is_array($headers)) foreach ($headers as $key => $header) { + $response->cookie($key, ...$header); + } + + if (is_object($result)) { + $result = $result instanceof ToArray ? $result->toArray() : get_object_vars($result); + } + if (is_array($result)) { + $result = json_encode($result, JSON_UNESCAPED_UNICODE); + } + return $response->end($result); + } + + /** + * @param string $host + */ + public function setHost(string $host): void + { + $this->host = $host; + } + + /** + * @param int $port + */ + public function setPort(int $port): void + { + $this->port = $port; + } + + /** + * @param bool $isSsl + */ + public function setIsSsl(bool $isSsl): void + { + $this->isSsl = $isSsl; + } + + /** + * @param bool $reuse_port + */ + public function setReusePort(bool $reuse_port): void + { + $this->reuse_port = $reuse_port; + } + +} diff --git a/kiri-engine/Abstracts/BaseApplication.php b/kiri-engine/Abstracts/BaseApplication.php index 19d3f9af..0ccf3720 100644 --- a/kiri-engine/Abstracts/BaseApplication.php +++ b/kiri-engine/Abstracts/BaseApplication.php @@ -15,7 +15,7 @@ use Kiri; use Kiri\Di\LocalService; use Kiri\Error\{ErrorHandler, StdoutLogger, StdoutLoggerInterface}; use Kiri\Exception\{InitException}; -use Psr\Container\ContainerInterface; +use Kiri\Di\ContainerInterface; use Kiri\Server\{Server}; use Psr\Container\ContainerExceptionInterface; use Psr\Container\NotFoundExceptionInterface; diff --git a/kiri-engine/Abstracts/Logger.php b/kiri-engine/Abstracts/Logger.php index 056d6866..e4cba1a3 100644 --- a/kiri-engine/Abstracts/Logger.php +++ b/kiri-engine/Abstracts/Logger.php @@ -28,11 +28,9 @@ class Logger implements LoggerInterface const DEBUG = 'debug'; - const LOGGER_LEVELS = [Logger::EMERGENCY, Logger::ALERT, Logger::CRITICAL, Logger::ERROR, Logger::WARNING, Logger::NOTICE, Logger::INFO, Logger::DEBUG]; - /** * @param string $message * @param array $context @@ -158,12 +156,11 @@ class Logger implements LoggerInterface } - /** * @return void * @throws Exception */ - public function flush() + public function flush(): void { $this->removeFile(storage()); } @@ -173,7 +170,7 @@ class Logger implements LoggerInterface * @param string $dirname * @return void */ - private function removeFile(string $dirname) + private function removeFile(string $dirname): void { $paths = new DirectoryIterator($dirname); /** @var DirectoryIterator $path */ @@ -197,6 +194,9 @@ class Logger implements LoggerInterface */ private function _string($message, $context): string { + if ($context instanceof \Throwable) { + $context = ['file' => $context->getFile(), 'line' => $context->getLine()]; + } if (!empty($context)) { return $message . ' ' . PHP_EOL . print_r($context, true) . PHP_EOL; } diff --git a/kiri-engine/Reload/Inotify.php b/kiri-engine/Reload/Inotify.php new file mode 100644 index 00000000..e2188ef7 --- /dev/null +++ b/kiri-engine/Reload/Inotify.php @@ -0,0 +1,212 @@ +isStop()) { + break; + } + sleep(1); + } + return; + } + $this->dirs = Config::get('reload.inotify', []); + $this->start(); + } + + + public function onSigterm(): static + { + pcntl_signal(SIGTERM, function () { + $this->isStop = true; + }); + return $this; + } + + + /** + * @return void + */ + public function error(): void + { + + } + + + /** + * @throws Exception + */ + public function start() + { + $this->inotify = inotify_init(); + $this->events = IN_MODIFY | IN_DELETE | IN_CREATE | IN_MOVE; + foreach ($this->dirs as $dir) { + if (!is_dir($dir)) continue; + $this->watch($dir); + } + Event::add($this->inotify, [$this, 'check']); + Event::cycle(function () { + if ($this->isStop()) { + Event::del($this->inotify); + Event::exit(); + } + }, true); + Event::wait(); + } + + + /** + * 开始监听 + * @throws Exception + */ + public function check() + { + if (!($events = inotify_read($this->inotify))) { + return; + } + if ($this->isReloading) { + return; + } + + $LISTEN_TYPE = [IN_CREATE, IN_DELETE, IN_MODIFY, IN_MOVED_TO, IN_MOVED_FROM]; + foreach ($events as $ev) { + if (!in_array($ev['mask'], $LISTEN_TYPE)) { + continue; + } + + //非重启类型 + if (str_ends_with($ev['name'], '.php')) { + if ($this->isReloading) { + break; + } + $this->isReloading = TRUE; + Timer::after(3000, fn() => $this->reload()); + } + } + } + + /** + * @throws Exception + */ + public function reload() + { + $swollen = \Kiri::getDi()->get(SwooleServerInterface::class); + + $swollen->reload(); + + $this->clearWatch(); + foreach ($this->dirs as $root) { + $this->watch($root); + } + $this->isReloading = FALSE; + } + + + /** + * @throws Exception + */ + public function clearWatch() + { + foreach ($this->watchFiles as $wd) { + @inotify_rm_watch($this->inotify, $wd); + } + $this->watchFiles = []; + } + + + /** + * @param $dir + * @return bool + * @throws Exception + */ + public function watch($dir): bool + { + //目录不存在 + if (!is_dir($dir)) { + return $this->logger->addError("[$dir] is not a directory."); + } + //避免重复监听 + if (isset($this->watchFiles[$dir])) { + return FALSE; + } + + if (in_array($dir, self::IG_DIR)) { + return FALSE; + } + + $wd = @inotify_add_watch($this->inotify, $dir, $this->events); + $this->watchFiles[$dir] = $wd; + + $files = scandir($dir); + foreach ($files as $f) { + if ($f == '.' || $f == '..') { + continue; + } + $path = $dir . '/' . $f; + //递归目录 + if (is_dir($path)) { + $this->watch($path); + } else if (!str_ends_with($f, '.php')) { + continue; + } + //检测文件类型 + if (strstr($f, '.') == '.php') { + $wd = @inotify_add_watch($this->inotify, $path, $this->events); + $this->watchFiles[$path] = $wd; + } + } + return TRUE; + } +} diff --git a/kiri-engine/Reload/Scaner.php b/kiri-engine/Reload/Scaner.php new file mode 100644 index 00000000..fdc7a415 --- /dev/null +++ b/kiri-engine/Reload/Scaner.php @@ -0,0 +1,168 @@ +dirs = Config::get('reload.inotify', []); + + $this->loadDirs(); + $this->tick(); + } + + + /** + * @param bool $isReload + * @throws Exception + */ + private function loadDirs(bool $isReload = FALSE) + { + foreach ($this->dirs as $value) { + if (is_bool($path = realpath($value))) { + continue; + } + + if (!is_dir($path)) continue; + + $this->loadByDir($path, $isReload); + } + } + + + /** + * @param $path + * @param bool $isReload + * @return void + * @throws Exception + */ + private function loadByDir($path, bool $isReload = FALSE): void + { + if (!is_string($path)) { + return; + } + $path = rtrim($path, '/'); + foreach (glob(realpath($path) . '/*') as $value) { + if (is_dir($value)) { + $this->loadByDir($value, $isReload); + } + if (is_file($value)) { + if ($this->checkFile($value, $isReload)) { + if ($this->isReloading) { + break; + } + $this->isReloading = TRUE; + + sleep(2); + + $this->timerReload(); + break; + } + } + } + } + + + /** + * @param $value + * @param $isReload + * @return bool + */ + private function checkFile($value, $isReload): bool + { + $md5 = md5($value); + $mTime = filectime($value); + if (!isset($this->md5Map[$md5])) { + if ($isReload) { + return TRUE; + } + $this->md5Map[$md5] = $mTime; + } else { + if ($this->md5Map[$md5] != $mTime) { + if ($isReload) { + return TRUE; + } + $this->md5Map[$md5] = $mTime; + } + } + return FALSE; + } + + + /** + * @throws Exception + */ + public function timerReload() + { + $this->isReloading = TRUE; + + $this->logger->warning('file change'); + + $swow = \Kiri::getDi()->get(SwooleServerInterface::class); + + $swow->reload(); + + $this->loadDirs(); + + $this->isReloading = FALSE; + + $this->tick(); + } + + + /** + * @return $this + */ + public function onSigterm(): static + { + pcntl_signal(SIGTERM, function () { + $this->onProcessStop(); + }); + return $this; + } + + + /** + * @throws Exception + */ + public function tick() + { + if ($this->isStop) { + return; + } + + $this->loadDirs(TRUE); + + sleep(2); + + $this->tick(); + } + +} diff --git a/kiri-task/Annotation/AsynchronousTask.php b/kiri-task/Annotation/AsynchronousTask.php index ddc67e75..c5903c9d 100644 --- a/kiri-task/Annotation/AsynchronousTask.php +++ b/kiri-task/Annotation/AsynchronousTask.php @@ -4,7 +4,7 @@ namespace Kiri\Task\Annotation; use Kiri\Annotation\AbstractAttribute; -use Kiri\Task\TaskManager; +use Kiri\Task\TaskContainer; #[\Attribute(\Attribute::TARGET_CLASS)] class AsynchronousTask extends AbstractAttribute { @@ -25,9 +25,9 @@ use Kiri\Task\TaskManager; */ public function execute(mixed $class, mixed $method = ''): mixed { - $AsyncTaskExecute = \Kiri::getDi()->get(TaskManager::class); - $AsyncTaskExecute->add($this->name, $class::class); - return parent::execute($class, $method); // TODO: Change the autogenerated stub + // TODO: Change the autogenerated stub + di(TaskContainer::class)->add($this->name, $class::class); + return parent::execute($class, $method); } } diff --git a/kiri-task/OnServerTask.php b/kiri-task/OnServerTask.php index 513c6876..aac5242c 100644 --- a/kiri-task/OnServerTask.php +++ b/kiri-task/OnServerTask.php @@ -30,7 +30,7 @@ class OnServerTask * @param mixed $data * @throws ConfigException */ - public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data) + public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data): void { try { $data = $this->resolve($data); @@ -49,7 +49,7 @@ class OnServerTask * @param Server\Task $task * @throws ConfigException */ - public function onCoroutineTask(?Server $server, Server\Task $task) + public function onCoroutineTask(?Server $server, Server\Task $task): void { try { $data = $this->resolve($task->data); @@ -82,7 +82,7 @@ class OnServerTask * @param int $task_id * @param mixed $data */ - public function onFinish(Server $server, int $task_id, mixed $data) + public function onFinish(Server $server, int $task_id, mixed $data): void { if (!($data instanceof OnTaskInterface)) { return; diff --git a/kiri-task/OnTaskInterface.php b/kiri-task/OnTaskInterface.php index 7bcc389c..16cb0f7c 100644 --- a/kiri-task/OnTaskInterface.php +++ b/kiri-task/OnTaskInterface.php @@ -12,6 +12,6 @@ interface OnTaskInterface public function execute(); - public function finish(Server $server, int $task_id); + public function finish(?Server $server, int $task_id); } diff --git a/kiri-task/TaskManager.php b/kiri-task/TaskContainer.php similarity index 60% rename from kiri-task/TaskManager.php rename to kiri-task/TaskContainer.php index c398abf1..31c207f1 100644 --- a/kiri-task/TaskManager.php +++ b/kiri-task/TaskContainer.php @@ -7,11 +7,11 @@ use JetBrains\PhpStorm\Pure; use Kiri\Abstracts\Component; use Kiri\Core\HashMap; use Psr\Container\ContainerExceptionInterface; -use Psr\Container\ContainerInterface; +use Kiri\Di\ContainerInterface; use Psr\Container\NotFoundExceptionInterface; use Swoole\Server; -class TaskManager extends Component +class TaskContainer extends Component { @@ -30,30 +30,6 @@ class TaskManager extends Component } - /** - * @param Server $swollen - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - public function taskListener(Server $swollen) - { - if (!isset($swollen->setting['task_worker_num']) || $swollen->setting['task_worker_num'] < 1) { - return; - } - - $task_use_object = $swollen->setting['task_object'] ?? $swollen->setting['task_use_object'] ?? false; - $reflect = $this->container->get(OnServerTask::class); - - $swollen->on('finish', [$reflect, 'onFinish']); - if ($task_use_object || $swollen->setting['task_enable_coroutine']) { - $swollen->on('task', [$reflect, 'onCoroutineTask']); - } else { - $swollen->on('task', [$reflect, 'onTask']); - } - } - - /** * @param string $key * @param $handler diff --git a/kiri-task/AsyncTaskExecute.php b/kiri-task/TaskExecute.php similarity index 51% rename from kiri-task/AsyncTaskExecute.php rename to kiri-task/TaskExecute.php index aa90bd37..106110b0 100644 --- a/kiri-task/AsyncTaskExecute.php +++ b/kiri-task/TaskExecute.php @@ -5,26 +5,28 @@ namespace Kiri\Task; use Exception; use Kiri; use Kiri\Abstracts\Component; -use Kiri\Server\SwooleServerInterface; +use Kiri\Server\ServerInterface; use Psr\Container\ContainerExceptionInterface; -use Psr\Container\ContainerInterface; +use Kiri\Di\ContainerInterface; use Psr\Container\NotFoundExceptionInterface; +use Swoole\Coroutine; +use Swoole\Server; /** * */ -class AsyncTaskExecute extends Component +class TaskExecute extends Component { /** - * @param TaskManager $hashMap + * @param TaskContainer $hashMap * @param ContainerInterface $container * @param array $config * @throws Exception */ - public function __construct(public TaskManager $hashMap, public ContainerInterface $container, array $config = []) + public function __construct(public TaskContainer $hashMap, public ContainerInterface $container, array $config = []) { parent::__construct($config); } @@ -43,16 +45,45 @@ class AsyncTaskExecute extends Component if (is_string($handler)) { $handler = $this->handle($handler, $params); } - if ($this->container->has(SwooleServerInterface::class)) { - $server = $this->container->get(SwooleServerInterface::class); - if ($workerId < 0 || $workerId > $server->setting['task_worker_num']) { - $workerId = random_int(0, $server->setting['task_worker_num'] - 1); - } - $server->task(serialize($handler), $workerId); + if ($this->container->has(ServerInterface::class)) { + $this->onAsyncExec($handler, $workerId); + } else { + Coroutine::create(fn() => $this->onCoronExec($handler)); } } + /** + * @param OnTaskInterface|string $handler + * @param int $workerId + * @return bool + * @throws Exception + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + protected function onAsyncExec(OnTaskInterface|string $handler, int $workerId = -1): bool + { + /** @var Server $server */ + $server = $this->container->get(ServerInterface::class); + if ($workerId < 0 || $workerId > $server->setting['task_worker_num']) { + $workerId = random_int(0, $server->setting['task_worker_num'] - 1); + } + return (bool)$server->task(serialize($handler), $workerId); + } + + + /** + * @param OnTaskInterface|string $handler + * @return bool + */ + protected function onCoronExec(OnTaskInterface|string $handler): bool + { + $handler->execute(); + $handler->finish(null, Coroutine::getCid()); + return true; + } + + /** * @param $handler * @param $params diff --git a/kiri-websocket-server/Contract/OnCloseInterface.php b/kiri-websocket-server/Contract/OnCloseInterface.php new file mode 100644 index 00000000..253c8385 --- /dev/null +++ b/kiri-websocket-server/Contract/OnCloseInterface.php @@ -0,0 +1,22 @@ +router = $this->collector->get('wss'); + } + + + /** + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws ConfigException + */ + public function init(): void + { + $exception = Config::get('exception.http', ExceptionHandlerDispatcher::class); + if (!in_array(ExceptionHandlerInterface::class, class_implements($exception))) { + $exception = ExceptionHandlerDispatcher::class; + } + $this->exception = $this->container->get($exception); + + $this->handler = $this->router->find('/', 'GET'); + } + + /** + * @param Request $request + * @param Response $response + * @return void + * @throws Exception + */ + public function onHandshake(Request $request, Response $response): void + { + try { + /** @var \Kiri\Message\Response $psrResponse */ + $psrResponse = Context::setContext(ResponseInterface::class, new \Kiri\Message\Response()); + + /** @var ServerRequest $psrRequest */ + $psrRequest = Context::setContext(RequestInterface::class, ServerRequest::createServerRequest($request)); + + $handler = $this->router->find('/', 'GET'); + if (is_integer($handler)) { + $psrResponse->withContent('Allow Method[' . $request->getMethod() . '].')->withStatus(405); + } else if (is_null($handler)) { + $psrResponse->withContent('Page not found.')->withStatus(404); + } else { + $psrResponse = $this->dispatcher->with($handler)->handle($psrRequest); + + $executor = $handler->callback; + $response->upgrade(); + if ($handler instanceof OnHandshakeInterface) { + $statusCode = $handler->onHandshake($request, $response); + $response->setStatusCode($statusCode); + $response->write("connect ok."); + } + if ($executor instanceof OnOpenInterface) { + $executor->onOpen($this->server, $request); + } + while (true) { + $frame = $response->recv(); + if ($frame === false || $frame instanceof CloseFrame || $frame === '') { + $handler->onClose($this->server, $response->fd); + break; + } + $handler->onMessage($this->server, $frame); + } + } + } catch (\Throwable $throwable) { + $psrResponse = $this->exception->emit($throwable, di(CResponse::class)); + } finally { + $this->emitter->sender($response, $psrResponse); + } + } + + + /** + * @param WsServer|Coroutine\Http\Server $server + * @param Frame $frame + * @return void + */ + public function onMessage(WsServer|WhServer $server, Frame $frame): void + { + $handler = $this->handler->callback; + if ($handler instanceof OnMessageInterface) { + $handler->onMessage($server, $frame); + } + } + + + /** + * @param WsServer|Coroutine\Http\Server $server + * @param int $fd + * @return void + */ + public function onClose(WsServer|WhServer $server, int $fd): void + { + $handler = $this->handler->callback; + if ($handler instanceof OnCloseInterface) { + $handler->onClose($server, $fd); + } + } + + + /** + * @param WsServer|WhServer $server + * @param Request $request + * @return void + */ + public function onOpen(WsServer|WhServer $server, Request $request): void + { + $handler = $this->handler->callback; + if ($handler instanceof OnOpenInterface) { + $handler->onOpen($server, $request); + } + } + + +} diff --git a/kiri-websocket-server/TestSocketServer.php b/kiri-websocket-server/TestSocketServer.php new file mode 100644 index 00000000..e8f2d306 --- /dev/null +++ b/kiri-websocket-server/TestSocketServer.php @@ -0,0 +1,38 @@ +handle('/', function () { + }); + $server->start(); +});