From f091fd5ba03318ab7c6749b7c7f56a2fc0b177f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mr=C2=B7x?= Date: Sun, 10 Apr 2022 03:37:58 +0800 Subject: [PATCH] modify mysql result --- Handler/OnServerWorker.php | 173 ++++++++++++++++++---------------- Inotify.php | 188 +++++++++++++++++++++++++++++++++++++ Server.php | 27 +++--- 3 files changed, 295 insertions(+), 93 deletions(-) create mode 100644 Inotify.php diff --git a/Handler/OnServerWorker.php b/Handler/OnServerWorker.php index ebf6772..4ffd2a4 100644 --- a/Handler/OnServerWorker.php +++ b/Handler/OnServerWorker.php @@ -16,6 +16,9 @@ use Kiri\Server\Events\OnWorkerError; use Kiri\Server\Events\OnWorkerExit; use Kiri\Server\Events\OnWorkerStart; use Kiri\Server\Events\OnWorkerStop; +use Kiri\Message\Waite; +use Psr\Container\ContainerExceptionInterface; +use Psr\Container\NotFoundExceptionInterface; use Swoole\Server; use Swoole\Timer; @@ -28,100 +31,110 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server { - public Router $collector; + public Router $router; - /** - * @return void - */ - public function init() - { -// $this->collector = Kiri::getDi()->get(Router::class); - } + /** + * @return void + */ + public function init() + { + $this->router = Kiri::getDi()->get(Router::class); + } - /** - * @param Server $server - * @param int $workerId - * @throws Exception - */ - public function onWorkerStart(Server $server, int $workerId) - { - $dispatch = \Kiri::getDi()->get(EventDispatch::class); - $dispatch->dispatch(new OnBeforeWorkerStart($workerId)); - set_env('environmental_workerId', $workerId); - if ($workerId < $server->setting['worker_num']) { - set_env('environmental', Kiri::WORKER); - $this->setProcessName(sprintf('Worker Process[%d].%d', $server->worker_pid, $workerId)); - $dispatch->dispatch(new OnWorkerStart($server, $workerId)); - } else { - set_env('environmental', Kiri::TASK); - $this->setProcessName(sprintf('Tasker Process[%d].%d', $server->worker_pid, $workerId)); - $dispatch->dispatch(new OnTaskStart($server, $workerId)); - } - $dispatch->dispatch(new OnAfterWorkerStart()); - } + /** + * @param Server $server + * @param int $workerId + * @return void + * @throws Kiri\Exception\ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws Exception + */ + public function onWorkerStart(Server $server, int $workerId) + { + $dispatch = \Kiri::getDi()->get(EventDispatch::class); + $dispatch->dispatch(new OnBeforeWorkerStart($workerId)); + set_env('environmental_workerId', $workerId); + if ($workerId < $server->setting['worker_num']) { + set_env('environmental', Kiri::WORKER); + $this->setProcessName(sprintf('Worker Process[%d].%d', $server->worker_pid, $workerId)); + $dispatch->dispatch(new OnWorkerStart($server, $workerId)); + + $this->router->scan_build_route(); + } else { + set_env('environmental', Kiri::TASK); + $this->setProcessName(sprintf('Tasker Process[%d].%d', $server->worker_pid, $workerId)); + $dispatch->dispatch(new OnTaskStart($server, $workerId)); + } + $dispatch->dispatch(new OnAfterWorkerStart()); + } - /** - * @param Server $server - * @param int $workerId - * @throws Exception - */ - public function onWorkerStop(Server $server, int $workerId) - { - Timer::clearAll(); - \Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnWorkerStop($server, $workerId)); - } + /** + * @param Server $server + * @param int $workerId + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function onWorkerStop(Server $server, int $workerId) + { + Timer::clearAll(); + \Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnWorkerStop($server, $workerId)); + } - /** - * @param Server $server - * @param int $workerId - * @throws Exception - */ - public function onWorkerExit(Server $server, int $workerId) - { - \Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnWorkerExit($server, $workerId)); - } + /** + * @param Server $server + * @param int $workerId + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function onWorkerExit(Server $server, int $workerId) + { + \Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnWorkerExit($server, $workerId)); + } - /** - * @param Server $server - * @param int $worker_id - * @param int $worker_pid - * @param int $exit_code - * @param int $signal - * @throws Exception - */ - public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal) - { - \Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal)); + /** + * @param Server $server + * @param int $worker_id + * @param int $worker_pid + * @param int $exit_code + * @param int $signal + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws Exception + */ + public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal) + { + \Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal)); - $message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s', - $worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9) - ); + $message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s', + $worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9) + ); - $this->logger->error($message); + $this->logger->error($message); - $this->system_mail($message); - } + $this->system_mail($message); + } - /** - * @param $messageContent - * @throws Exception - */ - protected function system_mail($messageContent) - { - try { - $email = Config::get('email', ['enable' => false]); - if (!empty($email) && ($email['enable'] ?? false) == true) { - Help::sendEmail($email, 'Service Error', $messageContent); - } - } catch (\Throwable $e) { - error($e, 'email'); - } - } + /** + * @param $messageContent + * @throws Exception + */ + protected function system_mail($messageContent) + { + try { + $email = Config::get('email', ['enable' => false]); + if (!empty($email) && ($email['enable'] ?? false) == true) { + Help::sendEmail($email, 'Service Error', $messageContent); + } + } catch (\Throwable $e) { + error($e, 'email'); + } + } } diff --git a/Inotify.php b/Inotify.php new file mode 100644 index 0000000..adfc5f4 --- /dev/null +++ b/Inotify.php @@ -0,0 +1,188 @@ +dirs = [CONTROLLER_PATH, MODEL_PATH, APP_PATH . 'routes', APP_PATH . 'vendor/game-worker']; + + $this->start(); + } + + + /** + * @return void + */ + public function error(): void + { + + } + + + /** + * @throws Exception + */ + public function start() + { + $this->inotify = inotify_init(); + $this->events = IN_MODIFY | IN_DELETE | IN_CREATE | IN_MOVE; + foreach ($this->dirs as $dir) { + if (!is_dir($dir)) continue; + $this->watch($dir); + } + Event::add($this->inotify, [$this, 'check']); + Event::wait(); + } + + + public function clear() + { + Event::del($this->inotify); + Event::exit(); + } + + + /** + * 开始监听 + * @throws Exception + */ + public function check() + { + if (!($events = inotify_read($this->inotify))) { + return; + } + if ($this->isReloading) { + return; + } + + $LISTEN_TYPE = [IN_CREATE, IN_DELETE, IN_MODIFY, IN_MOVED_TO, IN_MOVED_FROM]; + foreach ($events as $ev) { + if (!in_array($ev['mask'], $LISTEN_TYPE)) { + continue; + } + + //非重启类型 + if (str_ends_with($ev['name'], '.php')) { + if ($this->isReloading) { + break; + } + $this->isReloading = TRUE; + Timer::after(3000, fn() => $this->reload()); + } + } + } + + /** + * @throws Exception + */ + public function reload() + { + $swollen = \Kiri::getDi()->get(SwooleServerInterface::class); + + $swollen->reload(); + + $this->clearWatch(); + foreach ($this->dirs as $root) { + $this->watch($root); + } + $this->isReloading = FALSE; + } + + + /** + * @throws Exception + */ + public function clearWatch() + { + foreach ($this->watchFiles as $wd) { + @inotify_rm_watch($this->inotify, $wd); + } + $this->watchFiles = []; + } + + + /** + * @param $dir + * @return bool + * @throws Exception + */ + public function watch($dir): bool + { + //目录不存在 + if (!is_dir($dir)) { + return $this->logger->addError("[$dir] is not a directory."); + } + //避免重复监听 + if (isset($this->watchFiles[$dir])) { + return FALSE; + } + + if (in_array($dir, self::IG_DIR)) { + return FALSE; + } + + $wd = @inotify_add_watch($this->inotify, $dir, $this->events); + $this->watchFiles[$dir] = $wd; + + $files = scandir($dir); + foreach ($files as $f) { + if ($f == '.' || $f == '..') { + continue; + } + $path = $dir . '/' . $f; + //递归目录 + if (is_dir($path)) { + $this->watch($path); + } else if (!str_ends_with($f, '.php')) { + continue; + } + //检测文件类型 + if (strstr($f, '.') == '.php') { + $wd = @inotify_add_watch($this->inotify, $path, $this->events); + $this->watchFiles[$path] = $wd; + } + } + return TRUE; + } +} diff --git a/Server.php b/Server.php index 6397791..99134a7 100644 --- a/Server.php +++ b/Server.php @@ -39,13 +39,16 @@ class Server extends HttpService private mixed $daemon = 0; - /** - * @param State $state - * @param ServerManager $manager - * @param ContainerInterface $container - * @param array $config - * @throws Exception - */ + /** + * @param State $state + * @param ServerManager $manager + * @param ContainerInterface $container + * @param ProcessManager $processManager + * @param EventDispatch $eventDispatch + * @param Router $router + * @param array $config + * @throws Exception + */ public function __construct(public State $state, public ServerManager $manager, public ContainerInterface $container, @@ -97,8 +100,7 @@ class Server extends HttpService * @throws ConfigException * @throws ContainerExceptionInterface * @throws NotFoundExceptionInterface - * @throws ReflectionException - * @throws Exception + * @throws Exception */ public function start(): void { @@ -109,14 +111,14 @@ class Server extends HttpService $this->manager->addListener($rpcService['type'], $rpcService['host'], $rpcService['port'], $rpcService['mode'], $rpcService); } + $this->process[] = Inotify::class; + $processes = array_merge($this->process, Config::get('processes', [])); $this->processManager->batch($processes); $this->eventDispatch->dispatch(new OnServerBeforeStart()); - $this->router->scan_build_route(); - $this->manager->start(); } @@ -126,8 +128,7 @@ class Server extends HttpService * @throws ConfigException * @throws ContainerExceptionInterface * @throws NotFoundExceptionInterface - * @throws ReflectionException - * @throws Exception + * @throws Exception */ public function shutdown() {