From 550197de96689b91dab3f9cf4e3131b41d166fbe Mon Sep 17 00:00:00 2001 From: as2252258 Date: Sun, 9 Jan 2022 03:49:02 +0800 Subject: [PATCH] e --- Abstracts/BaseProcess.php | 222 ++++---- Abstracts/Server.php | 110 ++-- Constant.php | 266 ++++----- Contract/OnBeforeShutdown.php | 16 +- Contract/OnCloseInterface.php | 42 +- Contract/OnConnectInterface.php | 36 +- Contract/OnDisconnectInterface.php | 36 +- Contract/OnHandshakeInterface.php | 44 +- Contract/OnMessageInterface.php | 36 +- Contract/OnOpenInterface.php | 32 +- Contract/OnPacketInterface.php | 38 +- Contract/OnPipeMessageInterface.php | 42 +- Contract/OnProcessInterface.php | 50 +- Contract/OnReceiveInterface.php | 50 +- Contract/OnTaskInterface.php | 34 +- Events/OnAfterReload.php | 36 +- Events/OnAfterWorkerStart.php | 16 +- Events/OnBeforeReload.php | 36 +- Events/OnBeforeShutdown.php | 36 +- Events/OnBeforeWorkerStart.php | 24 +- Events/OnManagerStart.php | 32 +- Events/OnManagerStop.php | 36 +- Events/OnServerBeforeStart.php | 16 +- Events/OnShutdown.php | 36 +- Events/OnStart.php | 36 +- Events/OnTaskerStart.php | 46 +- Events/OnWorkerError.php | 52 +- Events/OnWorkerExit.php | 46 +- Events/OnWorkerStart.php | 46 +- Events/OnWorkerStop.php | 46 +- Handler/OnPipeMessage.php | 76 +-- Handler/OnServer.php | 124 ++--- Handler/OnServerManager.php | 100 ++-- Handler/OnServerReload.php | 90 +-- Handler/OnServerWorker.php | 242 ++++---- Server.php | 292 +++++----- ServerCommand.php | 240 ++++---- ServerManager.php | 824 ++++++++++++++-------------- ServerProviders.php | 68 +-- State.php | 112 ++-- SwooleServerInterface.php | 28 +- Tasker/AsyncTaskExecute.php | 184 +++---- Tasker/OnServerTask.php | 188 +++---- TraitServer.php | 64 +-- 44 files changed, 2113 insertions(+), 2113 deletions(-) diff --git a/Abstracts/BaseProcess.php b/Abstracts/BaseProcess.php index e4e8e34..34994be 100644 --- a/Abstracts/BaseProcess.php +++ b/Abstracts/BaseProcess.php @@ -1,111 +1,111 @@ -name; - } - - - /** - * @return bool - */ - public function isStop(): bool - { - return $this->isStop; - } - - /** - * @return bool - */ - public function getRedirectStdinAndStdout(): bool - { - return $this->redirect_stdin_and_stdout; - } - - /** - * @return int - */ - public function getPipeType(): int - { - return $this->pipe_type; - } - - /** - * @return bool - */ - public function isEnableCoroutine(): bool - { - return $this->enable_coroutine; - } - - - /** - * - */ - public function onProcessStop(): void - { - $this->isStop = true; - } - - - /** - * - */ - public function onSigterm(): static - { - if (!Context::inCoroutine()) { - Process::signal(SIGTERM, fn($data) => $this->onShutdown($data)); - } else { - Coroutine::create(function () { - $data = Coroutine::waitSignal(SIGTERM, -1); - if ($data) { - $this->onShutdown($data); - } - }); - } - return $this; - } - - - /** - * @param $data - */ - protected function onShutdown($data): void - { - $this->isStop = true; - } - - -} +name; + } + + + /** + * @return bool + */ + public function isStop(): bool + { + return $this->isStop; + } + + /** + * @return bool + */ + public function getRedirectStdinAndStdout(): bool + { + return $this->redirect_stdin_and_stdout; + } + + /** + * @return int + */ + public function getPipeType(): int + { + return $this->pipe_type; + } + + /** + * @return bool + */ + public function isEnableCoroutine(): bool + { + return $this->enable_coroutine; + } + + + /** + * + */ + public function onProcessStop(): void + { + $this->isStop = true; + } + + + /** + * + */ + public function onSigterm(): static + { + if (!Context::inCoroutine()) { + Process::signal(SIGTERM, fn($data) => $this->onShutdown($data)); + } else { + Coroutine::create(function () { + $data = Coroutine::waitSignal(SIGTERM, -1); + if ($data) { + $this->onShutdown($data); + } + }); + } + return $this; + } + + + /** + * @param $data + */ + protected function onShutdown($data): void + { + $this->isStop = true; + } + + +} diff --git a/Abstracts/Server.php b/Abstracts/Server.php index b972d58..dfba1f2 100644 --- a/Abstracts/Server.php +++ b/Abstracts/Server.php @@ -1,55 +1,55 @@ -isMac()) { - return; - } - $name = '[' . Config::get('id', 'system-service') . ']'; - if (!empty($prefix)) { - $name .= '.' . $prefix; - } - swoole_set_process_name($name); - } - - - /** - * Server constructor. - * @throws Exception - */ - public function __construct() - { - } - -} +isMac()) { + return; + } + $name = '[' . Config::get('id', 'system-service') . ']'; + if (!empty($prefix)) { + $name .= '.' . $prefix; + } + swoole_set_process_name($name); + } + + + /** + * Server constructor. + * @throws Exception + */ + public function __construct() + { + } + +} diff --git a/Constant.php b/Constant.php index 0a67242..eea519f 100644 --- a/Constant.php +++ b/Constant.php @@ -1,133 +1,133 @@ -HTTP 404 Not Found
Powered by Swoole'; - const STATUS_405_MESSAGE = '

HTTP 405 Method allow


Powered by Swoole'; - - - const OPTION_REACTOR_NUM = 'reactor_num'; - const OPTION_WORKER_NUM = 'worker_num'; - const OPTION_MAX_REQUEST = 'max_request'; - const OPTION_MAX_CONN = 'max_connection'; - const OPTION_TASK_WORKER_NUM = 'task_worker_num'; - const OPTION_TASK_IPC_MODE = 'task_ipc_mode'; - const OPTION_TASK_MAX_REQUEST = 'task_max_request'; - const OPTION_TASK_TMPDIR = 'task_tmpdir'; - const OPTION_TASK_ENABLE_COROUTINE = 'task_enable_coroutine'; - const OPTION_TASK_USE_OBJECT = 'task_use_object'; - const OPTION_DISPATCH_MODE = 'dispatch_mode'; - const OPTION_DISPATCH_FUNC = 'dispatch_func'; - const OPTION_MESSAGE_QUEUE_KEY = 'message_queue_key'; - const OPTION_DAEMONIZE = 'daemonize'; - const OPTION_BACKLOG = 'backlog'; - const OPTION_LOG_FILE = 'log_file'; - const OPTION_LOG_LEVEL = 'log_level'; - const OPTION_LOG_DATE_WITH_MICROSECONDS = 'log_date_with_microseconds'; - const OPTION_LOG_ROTATION = 'log_rotation'; - const OPTION_LOG_DATE_FORMAT = 'log_date_format'; - const OPTION_OPEN_TCP_KEEPALIVE = 'open_tcp_keepalive'; - const OPTION_HEARTBEAT_CHECK_INTERVAL = 'heartbeat_check_interval'; - const OPTION_HEARTBEAT_IDLE_TIME = 'heartbeat_idle_time'; - const OPTION_OPEN_EOF_CHECK = 'open_eof_check'; - const OPTION_OPEN_EOF_SPLIT = 'open_eof_split'; - const OPTION_PACKAGE_EOF = 'package_eof'; - const OPTION_OPEN_LENGTH_CHECK = 'open_length_check'; - const OPTION_PACKAGE_LENGTH_TYPE = 'package_length_type'; - const OPTION_PACKAGE_LENGTH_FUNC = 'package_length_func'; - const OPTION_PACKAGE_MAX_LENGTH = 'package_max_length'; - const OPTION_OPEN_HTTP_PROTOCOL = 'open_http_protocol'; - const OPTION_OPEN_MQTT_PROTOCOL = 'open_mqtt_protocol'; - const OPTION_OPEN_REDIS_PROTOCOL = 'open_redis_protocol'; - const OPTION_OPEN_WEBSOCKET_PROTOCOL = 'open_websocket_protocol'; - const OPTION_OPEN_WEBSOCKET_CLOSE_FRAME = 'open_websocket_close_frame'; - const OPTION_OPEN_TCP_NODELAY = 'open_tcp_nodelay'; - const OPTION_OPEN_CPU_AFFINITY = 'open_cpu_affinity'; - const OPTION_CPU_AFFINITY_IGNORE = 'cpu_affinity_ignore'; - const OPTION_TCP_DEFER_ACCEPT = 'tcp_defer_accept'; - const OPTION_SSL_CERT_FILE = 'ssl_cert_file'; - const OPTION_SSL_KEY_FILE = 'ssl_key_file'; - const OPTION_SSL_METHOD = 'ssl_method'; - const OPTION_SSL_PROTOCOLS = 'ssl_protocols'; - const OPTION_SSL_SNI_CERTS = 'ssl_sni_certs'; - const OPTION_SSL_CIPHERS = 'ssl_ciphers'; - const OPTION_SSL_VERIFY_PEER = 'ssl_verify_peer'; - const OPTION_SSL_ALLOW_SELF_SIGNED = 'ssl_allow_self_signed'; - const OPTION_SSL_CLIENT_CERT_FILE = 'ssl_client_cert_file'; - const OPTION_SSL_COMPRESS = 'ssl_compress'; - const OPTION_SSL_VERIFY_DEPTH = 'ssl_verify_depth'; - const OPTION_SSL_PREFER_SERVER_CIPHERS = 'ssl_prefer_server_ciphers'; - const OPTION_SSL_DHPARAM = 'ssl_dhparam'; - const OPTION_SSL_ECDH_CURVE = 'ssl_ecdh_curve'; - const OPTION_USER = 'user'; - const OPTION_GROUP = 'group'; - const OPTION_CHROOT = 'chroot'; - const OPTION_PID_FILE = 'pid_file'; - const OPTION_BUFFER_INPUT_SIZE = 'buffer_input_size'; - const OPTION_BUFFER_OUTPUT_SIZE = 'buffer_output_size'; - const OPTION_SOCKET_BUFFER_SIZE = 'socket_buffer_size'; - const OPTION_ENABLE_UNSAFE_EVENT = 'enable_unsafe_event'; - const OPTION_DISCARD_TIMEOUT_REQUEST = 'discard_timeout_request'; - const OPTION_ENABLE_REUSE_PORT = 'enable_reuse_port'; - const OPTION_ENABLE_DELAY_RECEIVE = 'enable_delay_receive'; - const OPTION_RELOAD_ASYNC = 'reload_async'; - const OPTION_MAX_WAIT_TIME = 'max_wait_time'; - const OPTION_TCP_FASTOPEN = 'tcp_fastopen'; - const OPTION_REQUEST_SLOWLOG_FILE = 'request_slowlog_file'; - const OPTION_ENABLE_COROUTINE = 'enable_coroutine'; - const OPTION_MAX_COROUTINE = 'max_coroutine'; - const OPTION_SEND_YIELD = 'send_yield'; - const OPTION_SEND_TIMEOUT = 'send_timeout'; - const OPTION_HOOK_FLAGS = 'hook_flags'; - const OPTION_BUFFER_HIGH_WATERMARK = 'buffer_high_watermark'; - const OPTION_BUFFER_LOW_WATERMARK = 'buffer_low_watermark'; - const OPTION_TCP_USER_TIMEOUT = 'tcp_user_timeout'; - const OPTION_STATS_FILE = 'stats_file'; - const OPTION_EVENT_OBJECT = 'event_object'; - const OPTION_START_SESSION_ID = 'start_session_id'; - const OPTION_SINGLE_THREAD = 'single_thread'; - const OPTION_MAX_QUEUED_BYTES = 'max_queued_bytes'; - - -} +HTTP 404 Not Found
Powered by Swoole'; + const STATUS_405_MESSAGE = '

HTTP 405 Method allow


Powered by Swoole'; + + + const OPTION_REACTOR_NUM = 'reactor_num'; + const OPTION_WORKER_NUM = 'worker_num'; + const OPTION_MAX_REQUEST = 'max_request'; + const OPTION_MAX_CONN = 'max_connection'; + const OPTION_TASK_WORKER_NUM = 'task_worker_num'; + const OPTION_TASK_IPC_MODE = 'task_ipc_mode'; + const OPTION_TASK_MAX_REQUEST = 'task_max_request'; + const OPTION_TASK_TMPDIR = 'task_tmpdir'; + const OPTION_TASK_ENABLE_COROUTINE = 'task_enable_coroutine'; + const OPTION_TASK_USE_OBJECT = 'task_use_object'; + const OPTION_DISPATCH_MODE = 'dispatch_mode'; + const OPTION_DISPATCH_FUNC = 'dispatch_func'; + const OPTION_MESSAGE_QUEUE_KEY = 'message_queue_key'; + const OPTION_DAEMONIZE = 'daemonize'; + const OPTION_BACKLOG = 'backlog'; + const OPTION_LOG_FILE = 'log_file'; + const OPTION_LOG_LEVEL = 'log_level'; + const OPTION_LOG_DATE_WITH_MICROSECONDS = 'log_date_with_microseconds'; + const OPTION_LOG_ROTATION = 'log_rotation'; + const OPTION_LOG_DATE_FORMAT = 'log_date_format'; + const OPTION_OPEN_TCP_KEEPALIVE = 'open_tcp_keepalive'; + const OPTION_HEARTBEAT_CHECK_INTERVAL = 'heartbeat_check_interval'; + const OPTION_HEARTBEAT_IDLE_TIME = 'heartbeat_idle_time'; + const OPTION_OPEN_EOF_CHECK = 'open_eof_check'; + const OPTION_OPEN_EOF_SPLIT = 'open_eof_split'; + const OPTION_PACKAGE_EOF = 'package_eof'; + const OPTION_OPEN_LENGTH_CHECK = 'open_length_check'; + const OPTION_PACKAGE_LENGTH_TYPE = 'package_length_type'; + const OPTION_PACKAGE_LENGTH_FUNC = 'package_length_func'; + const OPTION_PACKAGE_MAX_LENGTH = 'package_max_length'; + const OPTION_OPEN_HTTP_PROTOCOL = 'open_http_protocol'; + const OPTION_OPEN_MQTT_PROTOCOL = 'open_mqtt_protocol'; + const OPTION_OPEN_REDIS_PROTOCOL = 'open_redis_protocol'; + const OPTION_OPEN_WEBSOCKET_PROTOCOL = 'open_websocket_protocol'; + const OPTION_OPEN_WEBSOCKET_CLOSE_FRAME = 'open_websocket_close_frame'; + const OPTION_OPEN_TCP_NODELAY = 'open_tcp_nodelay'; + const OPTION_OPEN_CPU_AFFINITY = 'open_cpu_affinity'; + const OPTION_CPU_AFFINITY_IGNORE = 'cpu_affinity_ignore'; + const OPTION_TCP_DEFER_ACCEPT = 'tcp_defer_accept'; + const OPTION_SSL_CERT_FILE = 'ssl_cert_file'; + const OPTION_SSL_KEY_FILE = 'ssl_key_file'; + const OPTION_SSL_METHOD = 'ssl_method'; + const OPTION_SSL_PROTOCOLS = 'ssl_protocols'; + const OPTION_SSL_SNI_CERTS = 'ssl_sni_certs'; + const OPTION_SSL_CIPHERS = 'ssl_ciphers'; + const OPTION_SSL_VERIFY_PEER = 'ssl_verify_peer'; + const OPTION_SSL_ALLOW_SELF_SIGNED = 'ssl_allow_self_signed'; + const OPTION_SSL_CLIENT_CERT_FILE = 'ssl_client_cert_file'; + const OPTION_SSL_COMPRESS = 'ssl_compress'; + const OPTION_SSL_VERIFY_DEPTH = 'ssl_verify_depth'; + const OPTION_SSL_PREFER_SERVER_CIPHERS = 'ssl_prefer_server_ciphers'; + const OPTION_SSL_DHPARAM = 'ssl_dhparam'; + const OPTION_SSL_ECDH_CURVE = 'ssl_ecdh_curve'; + const OPTION_USER = 'user'; + const OPTION_GROUP = 'group'; + const OPTION_CHROOT = 'chroot'; + const OPTION_PID_FILE = 'pid_file'; + const OPTION_BUFFER_INPUT_SIZE = 'buffer_input_size'; + const OPTION_BUFFER_OUTPUT_SIZE = 'buffer_output_size'; + const OPTION_SOCKET_BUFFER_SIZE = 'socket_buffer_size'; + const OPTION_ENABLE_UNSAFE_EVENT = 'enable_unsafe_event'; + const OPTION_DISCARD_TIMEOUT_REQUEST = 'discard_timeout_request'; + const OPTION_ENABLE_REUSE_PORT = 'enable_reuse_port'; + const OPTION_ENABLE_DELAY_RECEIVE = 'enable_delay_receive'; + const OPTION_RELOAD_ASYNC = 'reload_async'; + const OPTION_MAX_WAIT_TIME = 'max_wait_time'; + const OPTION_TCP_FASTOPEN = 'tcp_fastopen'; + const OPTION_REQUEST_SLOWLOG_FILE = 'request_slowlog_file'; + const OPTION_ENABLE_COROUTINE = 'enable_coroutine'; + const OPTION_MAX_COROUTINE = 'max_coroutine'; + const OPTION_SEND_YIELD = 'send_yield'; + const OPTION_SEND_TIMEOUT = 'send_timeout'; + const OPTION_HOOK_FLAGS = 'hook_flags'; + const OPTION_BUFFER_HIGH_WATERMARK = 'buffer_high_watermark'; + const OPTION_BUFFER_LOW_WATERMARK = 'buffer_low_watermark'; + const OPTION_TCP_USER_TIMEOUT = 'tcp_user_timeout'; + const OPTION_STATS_FILE = 'stats_file'; + const OPTION_EVENT_OBJECT = 'event_object'; + const OPTION_START_SESSION_ID = 'start_session_id'; + const OPTION_SINGLE_THREAD = 'single_thread'; + const OPTION_MAX_QUEUED_BYTES = 'max_queued_bytes'; + + +} diff --git a/Contract/OnBeforeShutdown.php b/Contract/OnBeforeShutdown.php index e5397e5..67f1b66 100644 --- a/Contract/OnBeforeShutdown.php +++ b/Contract/OnBeforeShutdown.php @@ -1,8 +1,8 @@ -setProcessName(sprintf('start[%d].server', $server->master_pid)); - - $this->eventDispatch->dispatch(new OnStart($server)); - } - - - /** - * @param \Swoole\Server $server - * @throws ReflectionException - */ - public function onBeforeShutdown(\Swoole\Server $server) - { - $this->eventDispatch->dispatch(new OnBeforeShutdown($server)); - } - - - /** - * @param \Swoole\Server $server - * @throws ReflectionException - */ - public function onShutdown(\Swoole\Server $server) - { - $this->eventDispatch->dispatch(new OnShutdown($server)); - } - - -} +setProcessName(sprintf('start[%d].server', $server->master_pid)); + + $this->eventDispatch->dispatch(new OnStart($server)); + } + + + /** + * @param \Swoole\Server $server + * @throws ReflectionException + */ + public function onBeforeShutdown(\Swoole\Server $server) + { + $this->eventDispatch->dispatch(new OnBeforeShutdown($server)); + } + + + /** + * @param \Swoole\Server $server + * @throws ReflectionException + */ + public function onShutdown(\Swoole\Server $server) + { + $this->eventDispatch->dispatch(new OnShutdown($server)); + } + + +} diff --git a/Handler/OnServerManager.php b/Handler/OnServerManager.php index 1d4b028..3ac1145 100644 --- a/Handler/OnServerManager.php +++ b/Handler/OnServerManager.php @@ -1,50 +1,50 @@ -setProcessName(sprintf('manger[%d].0', $server->manager_pid)); - - $this->eventDispatch->dispatch(new OnManagerStart($server)); - } - - - /** - * @param \Swoole\Server $server - * @throws ReflectionException - */ - public function onManagerStop(\Swoole\Server $server) - { - $this->eventDispatch->dispatch(new OnManagerStop($server)); - } - - -} +setProcessName(sprintf('manger[%d].0', $server->manager_pid)); + + $this->eventDispatch->dispatch(new OnManagerStart($server)); + } + + + /** + * @param \Swoole\Server $server + * @throws ReflectionException + */ + public function onManagerStop(\Swoole\Server $server) + { + $this->eventDispatch->dispatch(new OnManagerStop($server)); + } + + +} diff --git a/Handler/OnServerReload.php b/Handler/OnServerReload.php index 1d1d015..631df7b 100644 --- a/Handler/OnServerReload.php +++ b/Handler/OnServerReload.php @@ -1,45 +1,45 @@ -eventDispatch->dispatch(new OnBeforeReload($server)); - } - - - /** - * @param Server $server - * @throws \ReflectionException - */ - public function onAfterReload(Server $server) - { - $this->eventDispatch->dispatch(new OnAfterReload($server)); - } - -} +eventDispatch->dispatch(new OnBeforeReload($server)); + } + + + /** + * @param Server $server + * @throws \ReflectionException + */ + public function onAfterReload(Server $server) + { + $this->eventDispatch->dispatch(new OnAfterReload($server)); + } + +} diff --git a/Handler/OnServerWorker.php b/Handler/OnServerWorker.php index 5d8ce45..ed4f2ad 100644 --- a/Handler/OnServerWorker.php +++ b/Handler/OnServerWorker.php @@ -1,121 +1,121 @@ -eventDispatch->dispatch(new OnBeforeWorkerStart($workerId)); - if ($workerId < $server->setting['worker_num']) { - $this->eventDispatch->dispatch(new OnWorkerStart($server, $workerId)); - $this->setProcessName(sprintf('Worker[%d].%d', $server->worker_pid, $workerId)); - set_env('environmental', Kiri::WORKER); - } else { - $this->eventDispatch->dispatch(new OnTaskStart($server, $workerId)); - $this->setProcessName(sprintf('Tasker[%d].%d', $server->worker_pid, $workerId)); - set_env('environmental', Kiri::TASK); - } - $this->eventDispatch->dispatch(new OnAfterWorkerStart()); - } - - - /** - * @param Server $server - * @param int $workerId - * @throws Exception - */ - public function onWorkerStop(Server $server, int $workerId) - { - Timer::clearAll(); - $this->eventDispatch->dispatch(new OnWorkerStop($server, $workerId)); - } - - - /** - * @param Server $server - * @param int $workerId - * @throws Exception - */ - public function onWorkerExit(Server $server, int $workerId) - { - set_env('state', 'exit'); - - $this->eventDispatch->dispatch(new OnWorkerExit($server, $workerId)); - } - - - /** - * @param Server $server - * @param int $worker_id - * @param int $worker_pid - * @param int $exit_code - * @param int $signal - * @throws Exception - */ - public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal) - { - $this->eventDispatch->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal)); - - $message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s', - $worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9) - ); - - $this->logger->error($message); - - $this->system_mail($message); - } - - - /** - * @param $messageContent - * @throws Exception - */ - protected function system_mail($messageContent) - { - try { - $email = Config::get('email', ['enable' => false]); - if (!empty($email) && ($email['enable'] ?? false) == true) { - Help::sendEmail($email, 'Service Error', $messageContent); - } - } catch (\Throwable $e) { - error($e, 'email'); - } - } - -} +eventDispatch->dispatch(new OnBeforeWorkerStart($workerId)); + if ($workerId < $server->setting['worker_num']) { + $this->eventDispatch->dispatch(new OnWorkerStart($server, $workerId)); + $this->setProcessName(sprintf('Worker[%d].%d', $server->worker_pid, $workerId)); + set_env('environmental', Kiri::WORKER); + } else { + $this->eventDispatch->dispatch(new OnTaskStart($server, $workerId)); + $this->setProcessName(sprintf('Tasker[%d].%d', $server->worker_pid, $workerId)); + set_env('environmental', Kiri::TASK); + } + $this->eventDispatch->dispatch(new OnAfterWorkerStart()); + } + + + /** + * @param Server $server + * @param int $workerId + * @throws Exception + */ + public function onWorkerStop(Server $server, int $workerId) + { + Timer::clearAll(); + $this->eventDispatch->dispatch(new OnWorkerStop($server, $workerId)); + } + + + /** + * @param Server $server + * @param int $workerId + * @throws Exception + */ + public function onWorkerExit(Server $server, int $workerId) + { + set_env('state', 'exit'); + + $this->eventDispatch->dispatch(new OnWorkerExit($server, $workerId)); + } + + + /** + * @param Server $server + * @param int $worker_id + * @param int $worker_pid + * @param int $exit_code + * @param int $signal + * @throws Exception + */ + public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal) + { + $this->eventDispatch->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal)); + + $message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s', + $worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9) + ); + + $this->logger->error($message); + + $this->system_mail($message); + } + + + /** + * @param $messageContent + * @throws Exception + */ + protected function system_mail($messageContent) + { + try { + $email = Config::get('email', ['enable' => false]); + if (!empty($email) && ($email['enable'] ?? false) == true) { + Help::sendEmail($email, 'Service Error', $messageContent); + } + } catch (\Throwable $e) { + error($e, 'email'); + } + } + +} diff --git a/Server.php b/Server.php index 68b0082..1e1eff5 100644 --- a/Server.php +++ b/Server.php @@ -1,146 +1,146 @@ -process[] = $process; - } - - - /** - * @return string - * @throws ConfigException - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @throws \ReflectionException - * @throws Exception - */ - public function start(): string - { - $this->manager()->initBaseServer(Config::get('server', [], true), $this->daemon); - - $rpcService = Config::get('rpc', []); - if (!empty($rpcService)) { - $this->manager()->addListener($rpcService['type'], $rpcService['host'], $rpcService['port'], - $rpcService['mode'], $rpcService); - } - - $processes = array_merge($this->process, Config::get('processes', [])); - foreach ($processes as $process) { - $this->manager()->addProcess($process); - } - - return $this->manager()->getServer()->start(); - } - - - /** - * @return void - * @throws ConfigException - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @throws \ReflectionException - * @throws Exception - */ - public function shutdown() - { - $configs = Config::get('server', [], true); - foreach ($this->manager()->sortService($configs['ports'] ?? []) as $config) { - $this->state->exit($config['port']); - } - $this->container->get(EventDispatch::class)->dispatch(new OnShutdown()); - } - - - /** - * @return bool - * @throws ConfigException - * @throws Exception - */ - public function isRunner(): bool - { - return $this->state->isRunner(); - } - - - /** - * @param $daemon - * @return Server - */ - public function setDaemon($daemon): static - { - if (!in_array($daemon, [0, 1])) { - return $this; - } - $this->daemon = $daemon; - return $this; - } - - - /** - * @return \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null - * @throws \ReflectionException - */ - public function getServer(): \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null - { - return $this->manager()->getServer(); - } - - - /** - * @return ServerManager - */ - private function manager(): ServerManager - { - return Kiri::getDi()->get(ServerManager::class); - } - -} +process[] = $process; + } + + + /** + * @return string + * @throws ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws \ReflectionException + * @throws Exception + */ + public function start(): string + { + $this->manager()->initBaseServer(Config::get('server', [], true), $this->daemon); + + $rpcService = Config::get('rpc', []); + if (!empty($rpcService)) { + $this->manager()->addListener($rpcService['type'], $rpcService['host'], $rpcService['port'], + $rpcService['mode'], $rpcService); + } + + $processes = array_merge($this->process, Config::get('processes', [])); + foreach ($processes as $process) { + $this->manager()->addProcess($process); + } + + return $this->manager()->getServer()->start(); + } + + + /** + * @return void + * @throws ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws \ReflectionException + * @throws Exception + */ + public function shutdown() + { + $configs = Config::get('server', [], true); + foreach ($this->manager()->sortService($configs['ports'] ?? []) as $config) { + $this->state->exit($config['port']); + } + $this->container->get(EventDispatch::class)->dispatch(new OnShutdown()); + } + + + /** + * @return bool + * @throws ConfigException + * @throws Exception + */ + public function isRunner(): bool + { + return $this->state->isRunner(); + } + + + /** + * @param $daemon + * @return Server + */ + public function setDaemon($daemon): static + { + if (!in_array($daemon, [0, 1])) { + return $this; + } + $this->daemon = $daemon; + return $this; + } + + + /** + * @return \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null + * @throws \ReflectionException + */ + public function getServer(): \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null + { + return $this->manager()->getServer(); + } + + + /** + * @return ServerManager + */ + private function manager(): ServerManager + { + return Kiri::getDi()->get(ServerManager::class); + } + +} diff --git a/ServerCommand.php b/ServerCommand.php index 52677bc..278ff06 100644 --- a/ServerCommand.php +++ b/ServerCommand.php @@ -1,120 +1,120 @@ -setName('sw:server') - ->setDescription('server start|stop|reload|restart') - ->addArgument('action', InputArgument::OPTIONAL, 'run action', 'start') - ->addOption('daemon', 'd', InputOption::VALUE_OPTIONAL, 'is run daemonize'); - } - - - /** - * @param InputInterface $input - * @param OutputInterface $output - * @return int - * @throws ConfigException - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @throws \ReflectionException - * @throws Exception - */ - public function execute(InputInterface $input, OutputInterface $output): int - { - $manager = Kiri::app()->getServer(); - $manager->setDaemon((int)!is_null($input->getOption('daemon'))); - if (is_null($input->getArgument('action'))) { - throw new Exception('I don\'t know what I want to do.'); - } - if (!in_array($input->getArgument('action'), self::ACTIONS)) { - throw new Exception('I don\'t know what I want to do.'); - } - if ($input->getArgument('action') == 'restart') { - $manager->shutdown(); - } - if ($input->getArgument('action') != 'stop') { - return $this->generate_runtime_builder($manager); - } - $manager->shutdown(); - return 0; - } - - - /** - * @throws ConfigException - */ - private function configure_set() - { - $enable_coroutine = Config::get('servers.settings.enable_coroutine', false); - Config::set('servers.settings.enable_coroutine', true); - if ($enable_coroutine != true) { - return; - } - Coroutine::set([ - 'hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_BLOCKING_FUNCTION, - 'enable_deadlock_check' => FALSE, - 'exit_condition' => function () { - return Coroutine::stats()['coroutine_num'] === 0; - } - ]); - } - - - /** - * @param $manager - * @return int - * @throws ConfigException - * @throws Exception - */ - private function generate_runtime_builder($manager): int - { - $this->configure_set(); - - Kiri::app()->getRouter()->read_files(); - - $manager->start(); - - return 1; - } - -} +setName('sw:server') + ->setDescription('server start|stop|reload|restart') + ->addArgument('action', InputArgument::OPTIONAL, 'run action', 'start') + ->addOption('daemon', 'd', InputOption::VALUE_OPTIONAL, 'is run daemonize'); + } + + + /** + * @param InputInterface $input + * @param OutputInterface $output + * @return int + * @throws ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws \ReflectionException + * @throws Exception + */ + public function execute(InputInterface $input, OutputInterface $output): int + { + $manager = Kiri::app()->getServer(); + $manager->setDaemon((int)!is_null($input->getOption('daemon'))); + if (is_null($input->getArgument('action'))) { + throw new Exception('I don\'t know what I want to do.'); + } + if (!in_array($input->getArgument('action'), self::ACTIONS)) { + throw new Exception('I don\'t know what I want to do.'); + } + if ($input->getArgument('action') == 'restart') { + $manager->shutdown(); + } + if ($input->getArgument('action') != 'stop') { + return $this->generate_runtime_builder($manager); + } + $manager->shutdown(); + return 0; + } + + + /** + * @throws ConfigException + */ + private function configure_set() + { + $enable_coroutine = Config::get('servers.settings.enable_coroutine', false); + Config::set('servers.settings.enable_coroutine', true); + if ($enable_coroutine != true) { + return; + } + Coroutine::set([ + 'hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_BLOCKING_FUNCTION, + 'enable_deadlock_check' => FALSE, + 'exit_condition' => function () { + return Coroutine::stats()['coroutine_num'] === 0; + } + ]); + } + + + /** + * @param $manager + * @return int + * @throws ConfigException + * @throws Exception + */ + private function generate_runtime_builder($manager): int + { + $this->configure_set(); + + Kiri::app()->getRouter()->read_files(); + + $manager->start(); + + return 1; + } + +} diff --git a/ServerManager.php b/ServerManager.php index 5cb9372..16f096c 100644 --- a/ServerManager.php +++ b/ServerManager.php @@ -1,412 +1,412 @@ - */ - public array $ports = []; - - public int $mode = SWOOLE_TCP; - - - private Server|null $server = null; - - - protected array $initProcesses = []; - - - const DEFAULT_EVENT = [ - Constant::WORKER_START => [OnServerWorker::class, 'onWorkerStart'], - Constant::WORKER_EXIT => [OnServerWorker::class, 'onWorkerExit'], - Constant::WORKER_STOP => [OnServerWorker::class, 'onWorkerStop'], - Constant::WORKER_ERROR => [OnServerWorker::class, 'onWorkerError'], - Constant::MANAGER_START => [OnServerManager::class, 'onManagerStart'], - Constant::MANAGER_STOP => [OnServerManager::class, 'onManagerStop'], - Constant::BEFORE_RELOAD => [OnServerReload::class, 'onBeforeReload'], - Constant::AFTER_RELOAD => [OnServerReload::class, 'onAfterReload'], - Constant::START => [OnServer::class, 'onStart'], - Constant::BEFORE_SHUTDOWN => [OnServer::class, 'onBeforeShutdown'], - Constant::SHUTDOWN => [OnServer::class, 'onShutdown'], - ]; - - - private array $eventInterface = [ - OnReceiveInterface::class => 'receive', - OnPacketInterface::class => 'packet', - OnHandshakeInterface::class => 'handshake', - OnMessageInterface::class => 'message', - OnConnectInterface::class => 'connect', - OnCloseInterface::class => 'close', - OnDisconnectInterface::class => 'disconnect' - ]; - - - /** - * @return Server|WServer|HServer|null - * @throws ReflectionException - */ - public function getServer(): Server|WServer|HServer|null - { - di(EventDispatch::class)->dispatch(new OnServerBeforeStart()); - return $this->server; - } - - - /** - * @param string $type - * @param string $host - * @param int $port - * @param int $mode - * @param array $settings - * @throws ConfigException - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - public function addListener(string $type, string $host, int $port, int $mode, array $settings = []) - { - if (!$this->server) { - $this->createBaseServer($type, $host, $port, $mode, $settings); - } else { - if (!isset($settings['settings'])) { - $settings['settings'] = []; - } - $this->addNewListener($type, $host, $port, $mode, $settings); - } - } - - - /** - * @param $configs - * @param int $daemon - * @throws ConfigException - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - public function initBaseServer($configs, int $daemon = 0): void - { - $context = di(ServerManager::class); - foreach ($this->sortService($configs['ports']) as $config) { - $this->startListenerHandler($context, $config, $daemon); - } - $this->bindCallback([Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]); - } - - - /** - * @param string|OnProcessInterface|BaseProcess $customProcess - * @throws Exception - */ - public function addProcess(string|OnProcessInterface|BaseProcess $customProcess) - { - if (is_string($customProcess)) { - $customProcess = Kiri::getDi()->get($customProcess); - } - $system = sprintf('[%s].process', Config::get('id', 'system-service')); - $this->logger->debug($system . ' ' . $customProcess->getName() . ' start.'); - $this->server->addProcess(new Process(function (Process $process) use ($customProcess, $system) { - if (Kiri::getPlatform()->isLinux()) { - $process->name($system . '(' . $customProcess->getName() . ')'); - } - $customProcess->onSigterm()->process($process); - }, - $customProcess->getRedirectStdinAndStdout(), - $customProcess->getPipeType(), - $customProcess->isEnableCoroutine() - )); - } - - - /** - * @return array - */ - public function getProcesses(): array - { - return $this->initProcesses; - } - - - /** - * @return array - */ - public function getSetting(): array - { - return $this->server->setting; - } - - - /** - * @param ServerManager $context - * @param array $config - * @param int $daemon - * @throws ConfigException - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @throws Exception - */ - private function startListenerHandler(ServerManager $context, array $config, int $daemon = 0) - { - if (!$this->server) { - $config = $this->mergeConfig($config, $daemon); - } - $context->addListener( - $config['type'], $config['host'], $config['port'], $config['mode'], - $config); - } - - - /** - * @param $config - * @param $daemon - * @return array - * @throws Exception - */ - private function mergeConfig($config, $daemon): array - { - $config['settings'] = $config['settings'] ?? []; - $config['settings']['daemonize'] = $daemon; - if (!isset($config['settings']['log_file'])) { - $config['settings']['log_file'] = storage('system.log'); - } - $config['settings']['pid_file'] = storage('.swoole.pid'); - $config['settings'][Constant::OPTION_ENABLE_REUSE_PORT] = true; - $config['events'] = $config['events'] ?? []; - return $config; - } - - - /** - * @param string $type - * @param string $host - * @param int $port - * @param int $mode - * @param array $settings - * @throws ConfigException - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @throws Exception - */ - private function addNewListener(string $type, string $host, int $port, int $mode, array $settings = []) - { - $id = Config::get('id', 'system-service'); - - $this->logger->debug(sprintf('[%s].' . $type . ' service %s::%d start', $id, $host, $port)); - - /** @var Server\Port $service */ - $this->ports[$port] = $this->server->addlistener($host, $port, $mode); - if ($this->ports[$port] === false) { - throw new Exception("The port is already in use[$host::$port]"); - } - if ($type == Constant::SERVER_TYPE_HTTP && !isset($settings['settings']['open_http_protocol'])) { - $settings['settings']['open_http_protocol'] = true; - if (in_array($this->server->setting['dispatch_mode'], [2, 4])) { - $settings['settings']['open_http2_protocol'] = true; - } - } - if ($type == Constant::SERVER_TYPE_WEBSOCKET && !isset($settings['settings']['open_websocket_protocol'])) { - $settings['settings']['open_websocket_protocol'] = true; - } - $this->ports[$port]->set($settings['settings'] ?? []); - $this->addServiceEvents($settings['events'] ?? [], $this->ports[$port]); - } - - - /** - * @param string $type - * @param string $host - * @param int $port - * @param int $mode - * @param array $settings - * @throws ConfigException - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @throws Exception - */ - private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = []) - { - $match = match ($type) { - Constant::SERVER_TYPE_BASE, Constant::SERVER_TYPE_TCP, - Constant::SERVER_TYPE_UDP => Server::class, - Constant::SERVER_TYPE_HTTP => HServer::class, - Constant::SERVER_TYPE_WEBSOCKET => WServer::class - }; - $this->server = new $match($host, $port, SWOOLE_PROCESS, $mode); - $this->server->set(array_merge(Config::get('server.settings', []), $settings['settings'])); - - $data = new Table($this->container->get(OutputInterface::class)); - $data->setHeaders(['key', 'value']); - - $array = []; - foreach ($this->server->setting as $key => $value) { - $array[] = [$key, $value]; - $array[] = new TableSeparator(); - } - - array_pop($array); - - $data->setStyle('box-double'); - $data->setRows($array); - $data->render(); - - $id = Config::get('id', 'system-service'); - - $this->logger->debug(sprintf('[%s].' . $type . ' service %s::%d start', $id, $host, $port)); - - $this->addDefaultListener($settings); - } - - - /** - * @param array $settings - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - private function addDefaultListener(array $settings): void - { - if (($this->server->setting['task_worker_num'] ?? 0) > 0) { - $this->addTaskListener($settings['events']); - } - $this->container->setBindings(SwooleServerInterface::class, $this->server); - $this->addServiceEvents(ServerManager::DEFAULT_EVENT, $this->server); - if (!empty($settings['events']) && is_array($settings['events'])) { - $this->addServiceEvents($settings['events'], $this->server); - } - } - - - /** - * @param array $events - * @param Server|Port $server - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - private function addServiceEvents(array $events, Server|Port $server) - { - foreach ($events as $name => $event) { - if (is_array($event) && is_string($event[0])) { - $event[0] = $this->container->get($event[0]); - } - $server->on($name, $event); - } - } - - - /** - * - */ - public function start() - { - $this->server->start(); - } - - - /** - * @param mixed $message - * @param int $workerId - * @return mixed - */ - public function sendMessage(mixed $message, int $workerId): mixed - { - return $this->server?->sendMessage($message, $workerId); - } - - - /** - * @param array $events - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - private function addTaskListener(array $events = []): void - { - $task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false; - $reflect = $this->container->get(OnServerTask::class); - $this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']); - if ($task_use_object || $this->server->setting['task_enable_coroutine']) { - $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']); - } else { - $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onTask']); - } - } - - - /** - * @param array|null $settings - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - public function bindCallback(?array $settings = []) - { - if (count($settings) < 1) { - return; - } - foreach ($settings as $event_type => $callback) { - if ($this->server->getCallback($event_type) !== null) { - continue; - } - if (is_array($callback) && !is_object($callback[0])) { - $callback[0] = $this->container->get($callback[0]); - } - $this->server->on($event_type, $callback); - } - } -} + */ + public array $ports = []; + + public int $mode = SWOOLE_TCP; + + + private Server|null $server = null; + + + protected array $initProcesses = []; + + + const DEFAULT_EVENT = [ + Constant::WORKER_START => [OnServerWorker::class, 'onWorkerStart'], + Constant::WORKER_EXIT => [OnServerWorker::class, 'onWorkerExit'], + Constant::WORKER_STOP => [OnServerWorker::class, 'onWorkerStop'], + Constant::WORKER_ERROR => [OnServerWorker::class, 'onWorkerError'], + Constant::MANAGER_START => [OnServerManager::class, 'onManagerStart'], + Constant::MANAGER_STOP => [OnServerManager::class, 'onManagerStop'], + Constant::BEFORE_RELOAD => [OnServerReload::class, 'onBeforeReload'], + Constant::AFTER_RELOAD => [OnServerReload::class, 'onAfterReload'], + Constant::START => [OnServer::class, 'onStart'], + Constant::BEFORE_SHUTDOWN => [OnServer::class, 'onBeforeShutdown'], + Constant::SHUTDOWN => [OnServer::class, 'onShutdown'], + ]; + + + private array $eventInterface = [ + OnReceiveInterface::class => 'receive', + OnPacketInterface::class => 'packet', + OnHandshakeInterface::class => 'handshake', + OnMessageInterface::class => 'message', + OnConnectInterface::class => 'connect', + OnCloseInterface::class => 'close', + OnDisconnectInterface::class => 'disconnect' + ]; + + + /** + * @return Server|WServer|HServer|null + * @throws ReflectionException + */ + public function getServer(): Server|WServer|HServer|null + { + di(EventDispatch::class)->dispatch(new OnServerBeforeStart()); + return $this->server; + } + + + /** + * @param string $type + * @param string $host + * @param int $port + * @param int $mode + * @param array $settings + * @throws ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function addListener(string $type, string $host, int $port, int $mode, array $settings = []) + { + if (!$this->server) { + $this->createBaseServer($type, $host, $port, $mode, $settings); + } else { + if (!isset($settings['settings'])) { + $settings['settings'] = []; + } + $this->addNewListener($type, $host, $port, $mode, $settings); + } + } + + + /** + * @param $configs + * @param int $daemon + * @throws ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function initBaseServer($configs, int $daemon = 0): void + { + $context = di(ServerManager::class); + foreach ($this->sortService($configs['ports']) as $config) { + $this->startListenerHandler($context, $config, $daemon); + } + $this->bindCallback([Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]); + } + + + /** + * @param string|OnProcessInterface|BaseProcess $customProcess + * @throws Exception + */ + public function addProcess(string|OnProcessInterface|BaseProcess $customProcess) + { + if (is_string($customProcess)) { + $customProcess = Kiri::getDi()->get($customProcess); + } + $system = sprintf('[%s].process', Config::get('id', 'system-service')); + $this->logger->debug($system . ' ' . $customProcess->getName() . ' start.'); + $this->server->addProcess(new Process(function (Process $process) use ($customProcess, $system) { + if (Kiri::getPlatform()->isLinux()) { + $process->name($system . '(' . $customProcess->getName() . ')'); + } + $customProcess->onSigterm()->process($process); + }, + $customProcess->getRedirectStdinAndStdout(), + $customProcess->getPipeType(), + $customProcess->isEnableCoroutine() + )); + } + + + /** + * @return array + */ + public function getProcesses(): array + { + return $this->initProcesses; + } + + + /** + * @return array + */ + public function getSetting(): array + { + return $this->server->setting; + } + + + /** + * @param ServerManager $context + * @param array $config + * @param int $daemon + * @throws ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws Exception + */ + private function startListenerHandler(ServerManager $context, array $config, int $daemon = 0) + { + if (!$this->server) { + $config = $this->mergeConfig($config, $daemon); + } + $context->addListener( + $config['type'], $config['host'], $config['port'], $config['mode'], + $config); + } + + + /** + * @param $config + * @param $daemon + * @return array + * @throws Exception + */ + private function mergeConfig($config, $daemon): array + { + $config['settings'] = $config['settings'] ?? []; + $config['settings']['daemonize'] = $daemon; + if (!isset($config['settings']['log_file'])) { + $config['settings']['log_file'] = storage('system.log'); + } + $config['settings']['pid_file'] = storage('.swoole.pid'); + $config['settings'][Constant::OPTION_ENABLE_REUSE_PORT] = true; + $config['events'] = $config['events'] ?? []; + return $config; + } + + + /** + * @param string $type + * @param string $host + * @param int $port + * @param int $mode + * @param array $settings + * @throws ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws Exception + */ + private function addNewListener(string $type, string $host, int $port, int $mode, array $settings = []) + { + $id = Config::get('id', 'system-service'); + + $this->logger->debug(sprintf('[%s].' . $type . ' service %s::%d start', $id, $host, $port)); + + /** @var Server\Port $service */ + $this->ports[$port] = $this->server->addlistener($host, $port, $mode); + if ($this->ports[$port] === false) { + throw new Exception("The port is already in use[$host::$port]"); + } + if ($type == Constant::SERVER_TYPE_HTTP && !isset($settings['settings']['open_http_protocol'])) { + $settings['settings']['open_http_protocol'] = true; + if (in_array($this->server->setting['dispatch_mode'], [2, 4])) { + $settings['settings']['open_http2_protocol'] = true; + } + } + if ($type == Constant::SERVER_TYPE_WEBSOCKET && !isset($settings['settings']['open_websocket_protocol'])) { + $settings['settings']['open_websocket_protocol'] = true; + } + $this->ports[$port]->set($settings['settings'] ?? []); + $this->addServiceEvents($settings['events'] ?? [], $this->ports[$port]); + } + + + /** + * @param string $type + * @param string $host + * @param int $port + * @param int $mode + * @param array $settings + * @throws ConfigException + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws Exception + */ + private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = []) + { + $match = match ($type) { + Constant::SERVER_TYPE_BASE, Constant::SERVER_TYPE_TCP, + Constant::SERVER_TYPE_UDP => Server::class, + Constant::SERVER_TYPE_HTTP => HServer::class, + Constant::SERVER_TYPE_WEBSOCKET => WServer::class + }; + $this->server = new $match($host, $port, SWOOLE_PROCESS, $mode); + $this->server->set(array_merge(Config::get('server.settings', []), $settings['settings'])); + + $data = new Table($this->container->get(OutputInterface::class)); + $data->setHeaders(['key', 'value']); + + $array = []; + foreach ($this->server->setting as $key => $value) { + $array[] = [$key, $value]; + $array[] = new TableSeparator(); + } + + array_pop($array); + + $data->setStyle('box-double'); + $data->setRows($array); + $data->render(); + + $id = Config::get('id', 'system-service'); + + $this->logger->debug(sprintf('[%s].' . $type . ' service %s::%d start', $id, $host, $port)); + + $this->addDefaultListener($settings); + } + + + /** + * @param array $settings + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + private function addDefaultListener(array $settings): void + { + if (($this->server->setting['task_worker_num'] ?? 0) > 0) { + $this->addTaskListener($settings['events']); + } + $this->container->setBindings(SwooleServerInterface::class, $this->server); + $this->addServiceEvents(ServerManager::DEFAULT_EVENT, $this->server); + if (!empty($settings['events']) && is_array($settings['events'])) { + $this->addServiceEvents($settings['events'], $this->server); + } + } + + + /** + * @param array $events + * @param Server|Port $server + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + private function addServiceEvents(array $events, Server|Port $server) + { + foreach ($events as $name => $event) { + if (is_array($event) && is_string($event[0])) { + $event[0] = $this->container->get($event[0]); + } + $server->on($name, $event); + } + } + + + /** + * + */ + public function start() + { + $this->server->start(); + } + + + /** + * @param mixed $message + * @param int $workerId + * @return mixed + */ + public function sendMessage(mixed $message, int $workerId): mixed + { + return $this->server?->sendMessage($message, $workerId); + } + + + /** + * @param array $events + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + private function addTaskListener(array $events = []): void + { + $task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false; + $reflect = $this->container->get(OnServerTask::class); + $this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']); + if ($task_use_object || $this->server->setting['task_enable_coroutine']) { + $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']); + } else { + $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onTask']); + } + } + + + /** + * @param array|null $settings + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function bindCallback(?array $settings = []) + { + if (count($settings) < 1) { + return; + } + foreach ($settings as $event_type => $callback) { + if ($this->server->getCallback($event_type) !== null) { + continue; + } + if (is_array($callback) && !is_object($callback[0])) { + $callback[0] = $this->container->get($callback[0]); + } + $this->server->on($event_type, $callback); + } + } +} diff --git a/ServerProviders.php b/ServerProviders.php index 19ce79e..9137e72 100644 --- a/ServerProviders.php +++ b/ServerProviders.php @@ -1,34 +1,34 @@ -set('server', ['class' => Server::class]); - - $container = Kiri::getDi(); - - $console = $container->get(\Symfony\Component\Console\Application::class); - $console->add($container->get(ServerCommand::class)); - - } -} +set('server', ['class' => Server::class]); + + $container = Kiri::getDi(); + + $console = $container->get(\Symfony\Component\Console\Application::class); + $console->add($container->get(ServerCommand::class)); + + } +} diff --git a/State.php b/State.php index 6aa4d94..0c1f473 100644 --- a/State.php +++ b/State.php @@ -1,56 +1,56 @@ -servers = Config::get('server.ports'); - } - - - /** - * @return bool - * @throws Exception - */ - public function isRunner(): bool - { - $ports = $this->sortService($this->servers); - foreach ($ports as $config) { - if (checkPortIsAlready($config['port'])) { - return true; - } - } - return false; - } - - - /** - * @param $port - * @throws Exception - */ - public function exit($port) - { - if (!($pid = checkPortIsAlready($port))) { - return; - } - while (checkPortIsAlready($port)) { - Process::kill($pid, SIGTERM); - usleep(300); - } - } - -} +servers = Config::get('server.ports'); + } + + + /** + * @return bool + * @throws Exception + */ + public function isRunner(): bool + { + $ports = $this->sortService($this->servers); + foreach ($ports as $config) { + if (checkPortIsAlready($config['port'])) { + return true; + } + } + return false; + } + + + /** + * @param $port + * @throws Exception + */ + public function exit($port) + { + if (!($pid = checkPortIsAlready($port))) { + return; + } + while (checkPortIsAlready($port)) { + Process::kill($pid, SIGTERM); + usleep(300); + } + } + +} diff --git a/SwooleServerInterface.php b/SwooleServerInterface.php index 038180f..cf5a536 100644 --- a/SwooleServerInterface.php +++ b/SwooleServerInterface.php @@ -1,14 +1,14 @@ -hashMap = new HashMap(); - } - - - /** - * @param string $key - * @param $handler - */ - public function reg(string $key, $handler) - { - $this->hashMap->put($key, $handler); - } - - - /** - * @param OnTaskInterface|string $handler - * @param array $params - * @param int|null $workerId - * @throws Exception - */ - public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = null) - { - if (!$this->server) { - $this->server = Kiri::getDi()->get(SwooleServerInterface::class); - } - if ($workerId === null || $workerId <= $this->server->setting['worker_num']) { - $workerNum = $this->server->setting['worker_num']; - $taskerNum = $workerNum + $this->server->setting['task_worker_num']; - $workerId = random_int($workerNum, $taskerNum - 1); - } - if (is_string($handler)) { - $handler = $this->handle($handler, $params); - } - $this->server->task(serialize($handler), $workerId); - } - - - /** - * @param $handler - * @param $params - * @return object - * @throws ReflectionException - * @throws Exception - */ - private function handle($handler, $params): object - { - if (!class_exists($handler) && $this->hashMap->has($handler)) { - $handler = $this->hashMap->get($handler); - } - $implements = $this->container->getReflect($handler); - if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) { - throw new Exception('Task must instance ' . OnTaskInterface::class); - } - return $implements->newInstanceArgs($params); - } - - -} +hashMap = new HashMap(); + } + + + /** + * @param string $key + * @param $handler + */ + public function reg(string $key, $handler) + { + $this->hashMap->put($key, $handler); + } + + + /** + * @param OnTaskInterface|string $handler + * @param array $params + * @param int|null $workerId + * @throws Exception + */ + public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = null) + { + if (!$this->server) { + $this->server = Kiri::getDi()->get(SwooleServerInterface::class); + } + if ($workerId === null || $workerId <= $this->server->setting['worker_num']) { + $workerNum = $this->server->setting['worker_num']; + $taskerNum = $workerNum + $this->server->setting['task_worker_num']; + $workerId = random_int($workerNum, $taskerNum - 1); + } + if (is_string($handler)) { + $handler = $this->handle($handler, $params); + } + $this->server->task(serialize($handler), $workerId); + } + + + /** + * @param $handler + * @param $params + * @return object + * @throws ReflectionException + * @throws Exception + */ + private function handle($handler, $params): object + { + if (!class_exists($handler) && $this->hashMap->has($handler)) { + $handler = $this->hashMap->get($handler); + } + $implements = $this->container->getReflect($handler); + if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) { + throw new Exception('Task must instance ' . OnTaskInterface::class); + } + return $implements->newInstanceArgs($params); + } + + +} diff --git a/Tasker/OnServerTask.php b/Tasker/OnServerTask.php index 283ffd3..d765b0d 100644 --- a/Tasker/OnServerTask.php +++ b/Tasker/OnServerTask.php @@ -1,94 +1,94 @@ -resolve($data); - } catch (\Throwable $exception) { - $data = jTraceEx($exception); - - $this->logger->error('task', [$data]); - } finally { - $server->finish($data); - } - } - - - /** - * @param Server $server - * @param Server\Task $task - * @throws ConfigException - */ - public function onCoroutineTask(Server $server, Server\Task $task) - { - try { - $data = $this->resolve($task->data); - } catch (\Throwable $exception) { - $data = jTraceEx($exception); - - $this->logger->error('task', [$data]); - } finally { - $server->finish($data); - } - } - - - /** - * @param $data - * @return null - */ - private function resolve($data) - { - $execute = unserialize($data); - if ($execute instanceof OnTaskInterface) { - return $execute->execute(); - } - return null; - } - - - /** - * @param Server $server - * @param int $task_id - * @param mixed $data - */ - public function onFinish(Server $server, int $task_id, mixed $data) - { - if (!($data instanceof OnTaskInterface)) { - return; - } - $data->finish($server, $task_id); - } - - -} +resolve($data); + } catch (\Throwable $exception) { + $data = jTraceEx($exception); + + $this->logger->error('task', [$data]); + } finally { + $server->finish($data); + } + } + + + /** + * @param Server $server + * @param Server\Task $task + * @throws ConfigException + */ + public function onCoroutineTask(Server $server, Server\Task $task) + { + try { + $data = $this->resolve($task->data); + } catch (\Throwable $exception) { + $data = jTraceEx($exception); + + $this->logger->error('task', [$data]); + } finally { + $server->finish($data); + } + } + + + /** + * @param $data + * @return null + */ + private function resolve($data) + { + $execute = unserialize($data); + if ($execute instanceof OnTaskInterface) { + return $execute->execute(); + } + return null; + } + + + /** + * @param Server $server + * @param int $task_id + * @param mixed $data + */ + public function onFinish(Server $server, int $task_id, mixed $data) + { + if (!($data instanceof OnTaskInterface)) { + return; + } + $data->finish($server, $task_id); + } + + +} diff --git a/TraitServer.php b/TraitServer.php index 293cc5f..dc20d1f 100644 --- a/TraitServer.php +++ b/TraitServer.php @@ -1,32 +1,32 @@ -