From eeb925b5b93f8449f891227fb64a7d4f8a34d8aa Mon Sep 17 00:00:00 2001 From: whwyy Date: Wed, 31 Dec 2025 00:19:28 +0800 Subject: [PATCH] eee --- Abstracts/Server.php | 26 +++-- Abstracts/TraitServer.php | 2 +- Handler/OnPipeMessage.php | 2 +- Handler/OnServerWorker.php | 230 ++++++++++++++++++------------------- Task/Task.php | 4 +- 5 files changed, 139 insertions(+), 125 deletions(-) diff --git a/Abstracts/Server.php b/Abstracts/Server.php index d7da7bb..02fec2c 100644 --- a/Abstracts/Server.php +++ b/Abstracts/Server.php @@ -4,6 +4,8 @@ namespace Kiri\Server\Abstracts; +use Kiri\Error\StdoutLogger; + /** * Class Server * @package Server\Abstracts @@ -11,12 +13,22 @@ namespace Kiri\Server\Abstracts; abstract class Server { - /** - * Server constructor. - * @throws - */ - public function __construct() - { - } + /** + * Server constructor. + * @throws + */ + public function __construct() + { + } + + + + /** + * @return StdoutLogger + */ + protected function getLogger(): StdoutLogger + { + return \Kiri::getLogger(); + } } diff --git a/Abstracts/TraitServer.php b/Abstracts/TraitServer.php index 9039f13..dcca763 100644 --- a/Abstracts/TraitServer.php +++ b/Abstracts/TraitServer.php @@ -46,7 +46,7 @@ trait TraitServer Kiri::getLogger()->alert('Pid ' . getmypid() . ' get signo ' . $no); $this->shutdown(); } catch (\Throwable $exception) { - error($exception); + \Kiri::getLogger()->json_log($exception); } } diff --git a/Handler/OnPipeMessage.php b/Handler/OnPipeMessage.php index dc1fcc3..1bd83f1 100644 --- a/Handler/OnPipeMessage.php +++ b/Handler/OnPipeMessage.php @@ -30,7 +30,7 @@ class OnPipeMessage extends Server } call_user_func([$message, 'process'], $server, $src_worker_id); } catch (\Throwable $throwable) { - Kiri::getLogger()->error(throwable($throwable)); + \Kiri::getLogger()->json_log($throwable, ['src_worker_id' => $src_worker_id, 'message' => $message]); } } diff --git a/Handler/OnServerWorker.php b/Handler/OnServerWorker.php index 5104544..d847608 100644 --- a/Handler/OnServerWorker.php +++ b/Handler/OnServerWorker.php @@ -30,139 +30,139 @@ class OnServerWorker extends Kiri\Server\Abstracts\Server { - /** - * @var EventDispatch - */ - #[Container(EventDispatch::class)] - public EventDispatch $dispatch; + /** + * @var EventDispatch + */ + #[Container(EventDispatch::class)] + public EventDispatch $dispatch; - /** - * @return void - */ - public function init(): void - { - on(OnBeforeWorkerStart::class, [$this, 'onWorkerNameAlias']); - } + /** + * @return void + */ + public function init(): void + { + on(OnBeforeWorkerStart::class, [$this, 'onWorkerNameAlias']); + } - /** - * @param OnBeforeWorkerStart $workerStart - * @return void - */ - public function onWorkerNameAlias(OnBeforeWorkerStart $workerStart): void - { - set_env('environmental_workerId', $workerStart->workerId); - if ($workerStart->workerId < $workerStart->server->setting['worker_num']) { - $this->processName($workerStart->server, 'Worker'); - set_env('environmental', Kiri::WORKER); - } else { - $this->processName($workerStart->server, 'Tasker'); - set_env('environmental', Kiri::TASK); - } - } + /** + * @param OnBeforeWorkerStart $workerStart + * @return void + */ + public function onWorkerNameAlias(OnBeforeWorkerStart $workerStart): void + { + if ($workerStart->workerId < $workerStart->server->setting['worker_num']) { + $this->processName($workerStart->server, 'Worker'); + set_env('environmental', Kiri::WORKER); + } else { + $this->processName($workerStart->server, 'Tasker'); + set_env('environmental', Kiri::TASK); + } + set_env('environmental_worker_id', $workerStart->workerId); + } - /** - * @param Server $server - * @param int $workerId - * @return void - * @throws - */ - public function onWorkerStart(Server $server, int $workerId): void - { - try { - $this->dispatch->dispatch(new OnBeforeWorkerStart($server, $workerId)); - if ($workerId < $server->setting['worker_num']) { - CoordinatorManager::utility(Coordinator::WORKER_START)->waite(); + /** + * @param Server $server + * @param int $workerId + * @return void + * @throws + */ + public function onWorkerStart(Server $server, int $workerId): void + { + try { + $this->dispatch->dispatch(new OnBeforeWorkerStart($server, $workerId)); + if ($workerId < $server->setting['worker_num']) { + CoordinatorManager::utility(Coordinator::WORKER_START)->waite(); - $this->dispatch->dispatch(new OnWorkerStart($server, $workerId)); - } else { - $this->dispatch->dispatch(new OnTaskerStart($server, $workerId)); - } - $this->dispatch->dispatch(new OnAfterWorkerStart($server, $workerId)); - } catch (Throwable $exception) { - Kiri::getLogger()->println(throwable($exception)); - } - } + $this->dispatch->dispatch(new OnWorkerStart($server, $workerId)); + } else { + $this->dispatch->dispatch(new OnTaskerStart($server, $workerId)); + } + } catch (Throwable $exception) { + \Kiri::getLogger()->json_log($exception); + } finally { + $this->dispatch->dispatch(new OnAfterWorkerStart($server, $workerId)); + } + } - /** - * @param Server $server - * @param string $prefix - * @return void - */ - protected function processName(Server $server, string $prefix): void - { - Kiri::setProcessName(sprintf($prefix . '[%d]', $server->worker_pid)); - } + /** + * @param Server $server + * @param string $prefix + * @return void + */ + protected function processName(Server $server, string $prefix): void + { + Kiri::setProcessName(sprintf($prefix . '[%d]', $server->worker_pid)); + } - /** - * @param Server $server - * @param int $workerId - * @throws - */ - public function onWorkerStop(Server $server, int $workerId): void - { - event(new OnWorkerStop($server, $workerId)); - Timer::clearAll(); - } + /** + * @param Server $server + * @param int $workerId + * @throws + */ + public function onWorkerStop(Server $server, int $workerId): void + { + event(new OnWorkerStop($server, $workerId)); + Timer::clearAll(); + } - /** - * @param Server $server - * @param int $workerId - * @throws - */ - public function onWorkerExit(Server $server, int $workerId): void - { - event(new OnWorkerExit($server, $workerId)); - } + /** + * @param Server $server + * @param int $workerId + * @throws + */ + public function onWorkerExit(Server $server, int $workerId): void + { + event(new OnWorkerExit($server, $workerId)); + } - /** - * @param Server $server - * @param int $worker_id - * @param int $worker_pid - * @param int $exit_code - * @param int $signal - * @throws - */ - public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal): void - { - event(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal)); + /** + * @param Server $server + * @param int $worker_id + * @param int $worker_pid + * @param int $exit_code + * @param int $signal + * @throws + */ + public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal): void + { + event(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal)); - debug_print_backtrace(DEBUG_BACKTRACE_PROVIDE_OBJECT); + debug_print_backtrace(DEBUG_BACKTRACE_PROVIDE_OBJECT); - /** @var RequestInterface $context */ - $context = Kiri\Di\Context::get(RequestInterface::class); - if (is_null($context)) { - $message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s', $worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), $signal)); - } else { - $message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s, method: %s, path: %s, query: %s', $worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), $signal), - $context->getMethod(), $context->getUri()->getPath(), $context->getUri()->getQuery()); - } - error($message . PHP_EOL); - - $this->system_mail($message); - } + /** @var RequestInterface $context */ + $context = Kiri\Di\Context::get(RequestInterface::class); + if (is_null($context)) { + $message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s', $worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), $signal)); + } else { + $message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s, method: %s, path: %s, query: %s', $worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), $signal), + $context->getMethod(), $context->getUri()->getPath(), $context->getUri()->getQuery()); + } + $this->getLogger()->println($message); + $this->system_mail($message); + } - /** - * @param $messageContent - * @throws - */ - protected function system_mail($messageContent): void - { - try { - $email = config('email', ['enable' => false]); - if (!empty($email) && ($email['enable'] ?? false)) { - Help::sendEmail($email, 'Service Error', $messageContent); - } - } catch (Throwable $e) { - error($e, ['email']); - } - } + /** + * @param $messageContent + * @throws + */ + protected function system_mail($messageContent): void + { + try { + $email = config('email', ['enable' => false]); + if (!empty($email) && ($email['enable'] ?? false)) { + Help::sendEmail($email, 'Service Error', $messageContent); + } + } catch (Throwable $e) { + \Kiri::getLogger()->json_log($e); + } + } } diff --git a/Task/Task.php b/Task/Task.php index f2161f1..0f01ba4 100644 --- a/Task/Task.php +++ b/Task/Task.php @@ -82,7 +82,9 @@ class Task $response = call_user_func([$handler, 'process'], $task_id, $src_worker_id); } catch (\Throwable $throwable) { - $response = throwable($throwable); + \Kiri::getLogger()->json_log($throwable, ['task_id' => $task_id, 'src_worker_id' => $src_worker_id, 'data' => $data]); + + $response = throwable($throwable); } finally { $server->finish($response); }