From 000d084abfb3cb2aedfa23868ce666b14e4f060c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Wed, 3 Nov 2021 15:17:52 +0800 Subject: [PATCH] first commit --- Abstracts/BaseProcess.php | 126 ++++++ Abstracts/OnTaskerStart.php | 36 ++ Abstracts/OnWorkerStart.php | 39 ++ Abstracts/Server.php | 55 +++ Abstracts/WorkerStart.php | 88 +++++ Constant.php | 132 +++++++ Events/OnAfterCommandExecute.php | 16 + Events/OnAfterReload.php | 18 + Events/OnAfterWorkerStart.php | 8 + Events/OnBeforeCommandExecute.php | 8 + Events/OnBeforeReload.php | 18 + Events/OnBeforeShutdown.php | 18 + Events/OnBeforeWorkerStart.php | 12 + Events/OnManagerStart.php | 16 + Events/OnManagerStop.php | 18 + Events/OnShutdown.php | 18 + Events/OnStart.php | 18 + Events/OnTaskerStart.php | 23 ++ Events/OnWorkerError.php | 26 ++ Events/OnWorkerExit.php | 23 ++ Events/OnWorkerStart.php | 23 ++ Events/OnWorkerStop.php | 23 ++ Handler/OnPipeMessage.php | 38 ++ Handler/OnServer.php | 58 +++ Handler/OnServerManager.php | 48 +++ Handler/OnServerReload.php | 43 +++ Handler/OnServerTask.php | 101 +++++ Handler/OnServerWorker.php | 135 +++++++ README.md | 0 SInterface/OnBeforeShutdown.php | 8 + SInterface/OnCloseInterface.php | 22 ++ SInterface/OnConnectInterface.php | 18 + SInterface/OnDisconnectInterface.php | 19 + SInterface/OnDownloadInterface.php | 12 + SInterface/OnHandshakeInterface.php | 22 ++ SInterface/OnMessageInterface.php | 19 + SInterface/OnOpenInterface.php | 18 + SInterface/OnPacketInterface.php | 19 + SInterface/OnPipeMessageInterface.php | 21 + SInterface/OnProcessInterface.php | 43 +++ SInterface/OnReceiveInterface.php | 25 ++ SInterface/OnTaskInterface.php | 17 + Server.php | 137 +++++++ ServerCommand.php | 126 ++++++ ServerManager.php | 537 ++++++++++++++++++++++++++ ServerProviders.php | 34 ++ SwooleServerInterface.php | 14 + composer.json | 21 + 48 files changed, 2317 insertions(+) create mode 100644 Abstracts/BaseProcess.php create mode 100644 Abstracts/OnTaskerStart.php create mode 100644 Abstracts/OnWorkerStart.php create mode 100644 Abstracts/Server.php create mode 100644 Abstracts/WorkerStart.php create mode 100644 Constant.php create mode 100644 Events/OnAfterCommandExecute.php create mode 100644 Events/OnAfterReload.php create mode 100644 Events/OnAfterWorkerStart.php create mode 100644 Events/OnBeforeCommandExecute.php create mode 100644 Events/OnBeforeReload.php create mode 100644 Events/OnBeforeShutdown.php create mode 100644 Events/OnBeforeWorkerStart.php create mode 100644 Events/OnManagerStart.php create mode 100644 Events/OnManagerStop.php create mode 100644 Events/OnShutdown.php create mode 100644 Events/OnStart.php create mode 100644 Events/OnTaskerStart.php create mode 100644 Events/OnWorkerError.php create mode 100644 Events/OnWorkerExit.php create mode 100644 Events/OnWorkerStart.php create mode 100644 Events/OnWorkerStop.php create mode 100644 Handler/OnPipeMessage.php create mode 100644 Handler/OnServer.php create mode 100644 Handler/OnServerManager.php create mode 100644 Handler/OnServerReload.php create mode 100644 Handler/OnServerTask.php create mode 100644 Handler/OnServerWorker.php create mode 100644 README.md create mode 100644 SInterface/OnBeforeShutdown.php create mode 100644 SInterface/OnCloseInterface.php create mode 100644 SInterface/OnConnectInterface.php create mode 100644 SInterface/OnDisconnectInterface.php create mode 100644 SInterface/OnDownloadInterface.php create mode 100644 SInterface/OnHandshakeInterface.php create mode 100644 SInterface/OnMessageInterface.php create mode 100644 SInterface/OnOpenInterface.php create mode 100644 SInterface/OnPacketInterface.php create mode 100644 SInterface/OnPipeMessageInterface.php create mode 100644 SInterface/OnProcessInterface.php create mode 100644 SInterface/OnReceiveInterface.php create mode 100644 SInterface/OnTaskInterface.php create mode 100644 Server.php create mode 100644 ServerCommand.php create mode 100644 ServerManager.php create mode 100644 ServerProviders.php create mode 100644 SwooleServerInterface.php create mode 100644 composer.json diff --git a/Abstracts/BaseProcess.php b/Abstracts/BaseProcess.php new file mode 100644 index 0000000..13b8357 --- /dev/null +++ b/Abstracts/BaseProcess.php @@ -0,0 +1,126 @@ +isStop = true; + } + + + /** + * @return bool + */ + public function checkProcessIsStop(): bool + { + return $this->isStop === true; + } + + + + /** + * @param Process $process + */ + public function signListen(Process $process): void + { +// if (Coroutine::getCid() === -1) { +// Process::signal(SIGTERM | SIGKILL, function ($signo) use ($process) { +// if ($signo) { +// $lists = Kiri::app()->getProcess(); +// foreach ($lists as $process) { +// $process->exit(0); +// } +// } +// }); +// } else { +// Coroutine::create(function () use ($process) { +// /** @var Coroutine\Socket $message */ +// $message = $process->exportSocket(); +// if ($message->recv() == 0x03455343213212) { +// $this->waiteExit($process); +// } +// }); +// Coroutine::create(function () use ($process) { +// $data = Coroutine::waitSignal(SIGTERM | SIGKILL, -1); +// if ($data) { +// $lists = Kiri::app()->getProcess(); +// foreach ($lists as $name => $process) { +// foreach ($process as $item) { +// /** @var Coroutine\Socket $export */ +// $export = $item->exportSocket(); +// $export->send(0x03455343213212); +// } +// } +// } +// }); +// } + } + + + /** + * + */ + protected function exit(): void + { + putenv('process.status=idle'); + } + + + /** + * @return bool + */ + #[Pure] public function isWorking(): bool + { + return env('process.status', 'working') == 'working'; + } + + + /** + * + */ + private function waiteExit(Process $process): void + { + $this->onProcessStop(); + while ($this->isWorking()) { + $this->sleep(); + } + $process->exit(0); + } + + + /** + * + */ + private function sleep(): void + { + if ($this->enableSwooleCoroutine) { + Coroutine::sleep(0.1); + } else { + usleep(100); + } + } + +} diff --git a/Abstracts/OnTaskerStart.php b/Abstracts/OnTaskerStart.php new file mode 100644 index 0000000..9c5c6bc --- /dev/null +++ b/Abstracts/OnTaskerStart.php @@ -0,0 +1,36 @@ +interpretDirectory(); +// } + + $this->mixed($event, false, $time); + } + + +} diff --git a/Abstracts/OnWorkerStart.php b/Abstracts/OnWorkerStart.php new file mode 100644 index 0000000..fa178d7 --- /dev/null +++ b/Abstracts/OnWorkerStart.php @@ -0,0 +1,39 @@ +router->read_files(); +// $this->interpretDirectory(); +// } + $this->mixed($event, true, $time); + } + +} diff --git a/Abstracts/Server.php b/Abstracts/Server.php new file mode 100644 index 0000000..a5dc5c8 --- /dev/null +++ b/Abstracts/Server.php @@ -0,0 +1,55 @@ +isMac()) { + return; + } + $name = Config::get('id', 'system-service'); + if (!empty($prefix)) { + $name .= '.' . $prefix; + } + swoole_set_process_name($name); + } + + + /** + * Server constructor. + * @throws Exception + */ + public function __construct() + { + } + +} diff --git a/Abstracts/WorkerStart.php b/Abstracts/WorkerStart.php new file mode 100644 index 0000000..d93bfe1 --- /dev/null +++ b/Abstracts/WorkerStart.php @@ -0,0 +1,88 @@ +annotation->read(APP_PATH . 'app', 'App'); + + $fileLists = $this->annotation->read(APP_PATH . 'app'); + foreach ($fileLists->runtime(APP_PATH . 'app') as $class) { + foreach (NoteManager::getTargetNote($class) as $value) { + $value->execute($class); + } + $methods = $di->getMethodAttribute($class); + foreach ($methods as $method => $attribute) { + if (empty($attribute)) { + continue; + } + foreach ($attribute as $item) { + $item->execute($class, $method); + } + } + } + } + + + /** + * @param $event + * @param $isWorker + * @param $time + * @throws \Kiri\Exception\ConfigException + */ + protected function mixed($event, $isWorker, $time) + { + $name = Config::get('id', 'system-service'); + echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]Builder %s[%d].%d use time %s.", $name, $isWorker ? 'Worker' : 'Taker', + $event->server->worker_pid, $event->workerId, round(microtime(true) - $time, 6) . 's') . PHP_EOL; + } + + /** + * @param $prefix + * @throws ConfigException + */ + protected function setProcessName($prefix) + { + if (Kiri::getPlatform()->isMac()) { + return; + } + $name = Config::get('id', 'system-service'); + if (!empty($prefix)) { + $name .= '.' . $prefix; + } + swoole_set_process_name($name); + } + +} diff --git a/Constant.php b/Constant.php new file mode 100644 index 0000000..5710e84 --- /dev/null +++ b/Constant.php @@ -0,0 +1,132 @@ +HTTP 404 Not Found
Powered by Swoole'; + const STATUS_405_MESSAGE = '

HTTP 405 Method allow


Powered by Swoole'; + + + const OPTION_REACTOR_NUM = 'reactor_num'; + const OPTION_WORKER_NUM = 'worker_num'; + const OPTION_MAX_REQUEST = 'max_request'; + const OPTION_MAX_CONN = 'max_connection'; + const OPTION_TASK_WORKER_NUM = 'task_worker_num'; + const OPTION_TASK_IPC_MODE = 'task_ipc_mode'; + const OPTION_TASK_MAX_REQUEST = 'task_max_request'; + const OPTION_TASK_TMPDIR = 'task_tmpdir'; + const OPTION_TASK_ENABLE_COROUTINE = 'task_enable_coroutine'; + const OPTION_TASK_USE_OBJECT = 'task_use_object'; + const OPTION_DISPATCH_MODE = 'dispatch_mode'; + const OPTION_DISPATCH_FUNC = 'dispatch_func'; + const OPTION_MESSAGE_QUEUE_KEY = 'message_queue_key'; + const OPTION_DAEMONIZE = 'daemonize'; + const OPTION_BACKLOG = 'backlog'; + const OPTION_LOG_FILE = 'log_file'; + const OPTION_LOG_LEVEL = 'log_level'; + const OPTION_LOG_DATE_WITH_MICROSECONDS = 'log_date_with_microseconds'; + const OPTION_LOG_ROTATION = 'log_rotation'; + const OPTION_LOG_DATE_FORMAT = 'log_date_format'; + const OPTION_OPEN_TCP_KEEPALIVE = 'open_tcp_keepalive'; + const OPTION_HEARTBEAT_CHECK_INTERVAL = 'heartbeat_check_interval'; + const OPTION_HEARTBEAT_IDLE_TIME = 'heartbeat_idle_time'; + const OPTION_OPEN_EOF_CHECK = 'open_eof_check'; + const OPTION_OPEN_EOF_SPLIT = 'open_eof_split'; + const OPTION_PACKAGE_EOF = 'package_eof'; + const OPTION_OPEN_LENGTH_CHECK = 'open_length_check'; + const OPTION_PACKAGE_LENGTH_TYPE = 'package_length_type'; + const OPTION_PACKAGE_LENGTH_FUNC = 'package_length_func'; + const OPTION_PACKAGE_MAX_LENGTH = 'package_max_length'; + const OPTION_OPEN_HTTP_PROTOCOL = 'open_http_protocol'; + const OPTION_OPEN_MQTT_PROTOCOL = 'open_mqtt_protocol'; + const OPTION_OPEN_REDIS_PROTOCOL = 'open_redis_protocol'; + const OPTION_OPEN_WEBSOCKET_PROTOCOL = 'open_websocket_protocol'; + const OPTION_OPEN_WEBSOCKET_CLOSE_FRAME = 'open_websocket_close_frame'; + const OPTION_OPEN_TCP_NODELAY = 'open_tcp_nodelay'; + const OPTION_OPEN_CPU_AFFINITY = 'open_cpu_affinity'; + const OPTION_CPU_AFFINITY_IGNORE = 'cpu_affinity_ignore'; + const OPTION_TCP_DEFER_ACCEPT = 'tcp_defer_accept'; + const OPTION_SSL_CERT_FILE = 'ssl_cert_file'; + const OPTION_SSL_KEY_FILE = 'ssl_key_file'; + const OPTION_SSL_METHOD = 'ssl_method'; + const OPTION_SSL_PROTOCOLS = 'ssl_protocols'; + const OPTION_SSL_SNI_CERTS = 'ssl_sni_certs'; + const OPTION_SSL_CIPHERS = 'ssl_ciphers'; + const OPTION_SSL_VERIFY_PEER = 'ssl_verify_peer'; + const OPTION_SSL_ALLOW_SELF_SIGNED = 'ssl_allow_self_signed'; + const OPTION_SSL_CLIENT_CERT_FILE = 'ssl_client_cert_file'; + const OPTION_SSL_COMPRESS = 'ssl_compress'; + const OPTION_SSL_VERIFY_DEPTH = 'ssl_verify_depth'; + const OPTION_SSL_PREFER_SERVER_CIPHERS = 'ssl_prefer_server_ciphers'; + const OPTION_SSL_DHPARAM = 'ssl_dhparam'; + const OPTION_SSL_ECDH_CURVE = 'ssl_ecdh_curve'; + const OPTION_USER = 'user'; + const OPTION_GROUP = 'group'; + const OPTION_CHROOT = 'chroot'; + const OPTION_PID_FILE = 'pid_file'; + const OPTION_BUFFER_INPUT_SIZE = 'buffer_input_size'; + const OPTION_BUFFER_OUTPUT_SIZE = 'buffer_output_size'; + const OPTION_SOCKET_BUFFER_SIZE = 'socket_buffer_size'; + const OPTION_ENABLE_UNSAFE_EVENT = 'enable_unsafe_event'; + const OPTION_DISCARD_TIMEOUT_REQUEST = 'discard_timeout_request'; + const OPTION_ENABLE_REUSE_PORT = 'enable_reuse_port'; + const OPTION_ENABLE_DELAY_RECEIVE = 'enable_delay_receive'; + const OPTION_RELOAD_ASYNC = 'reload_async'; + const OPTION_MAX_WAIT_TIME = 'max_wait_time'; + const OPTION_TCP_FASTOPEN = 'tcp_fastopen'; + const OPTION_REQUEST_SLOWLOG_FILE = 'request_slowlog_file'; + const OPTION_ENABLE_COROUTINE = 'enable_coroutine'; + const OPTION_MAX_COROUTINE = 'max_coroutine'; + const OPTION_SEND_YIELD = 'send_yield'; + const OPTION_SEND_TIMEOUT = 'send_timeout'; + const OPTION_HOOK_FLAGS = 'hook_flags'; + const OPTION_BUFFER_HIGH_WATERMARK = 'buffer_high_watermark'; + const OPTION_BUFFER_LOW_WATERMARK = 'buffer_low_watermark'; + const OPTION_TCP_USER_TIMEOUT = 'tcp_user_timeout'; + const OPTION_STATS_FILE = 'stats_file'; + const OPTION_EVENT_OBJECT = 'event_object'; + const OPTION_START_SESSION_ID = 'start_session_id'; + const OPTION_SINGLE_THREAD = 'single_thread'; + const OPTION_MAX_QUEUED_BYTES = 'max_queued_bytes'; + + +} diff --git a/Events/OnAfterCommandExecute.php b/Events/OnAfterCommandExecute.php new file mode 100644 index 0000000..03805b6 --- /dev/null +++ b/Events/OnAfterCommandExecute.php @@ -0,0 +1,16 @@ +setProcessName(sprintf('start[%d].server', $server->master_pid)); + + $this->eventDispatch->dispatch(new OnStart($server)); + } + + + /** + * @param \Swoole\Server $server + */ + public function onBeforeShutdown(\Swoole\Server $server) + { + $this->eventDispatch->dispatch(new OnBeforeShutdown($server)); + } + + + /** + * @param \Swoole\Server $server + */ + public function onShutdown(\Swoole\Server $server) + { + $this->eventDispatch->dispatch(new OnShutdown($server)); + } + + +} diff --git a/Handler/OnServerManager.php b/Handler/OnServerManager.php new file mode 100644 index 0000000..391a36f --- /dev/null +++ b/Handler/OnServerManager.php @@ -0,0 +1,48 @@ +setProcessName(sprintf('manger[%d].0', $server->manager_pid)); + + $this->eventDispatch->dispatch(new OnManagerStart($server)); + } + + + /** + * @param \Swoole\Server $server + */ + public function onManagerStop(\Swoole\Server $server) + { + $this->eventDispatch->dispatch(new OnManagerStop($server)); + } + + +} diff --git a/Handler/OnServerReload.php b/Handler/OnServerReload.php new file mode 100644 index 0000000..ca19e4f --- /dev/null +++ b/Handler/OnServerReload.php @@ -0,0 +1,43 @@ +eventDispatch->dispatch(new OnBeforeReload($server)); + } + + + /** + * @param Server $server + */ + public function onAfterReload(Server $server) + { + $this->eventDispatch->dispatch(new OnAfterReload($server)); + } + +} diff --git a/Handler/OnServerTask.php b/Handler/OnServerTask.php new file mode 100644 index 0000000..d912e45 --- /dev/null +++ b/Handler/OnServerTask.php @@ -0,0 +1,101 @@ +resolve($data); + } catch (\Throwable $exception) { + $data = jTraceEx($exception); + + $this->logger->error('task', [$data]); + } finally { + $server->finish($data); + } + } + + + /** + * @param Server $server + * @param Server\Task $task + * @throws ConfigException + */ + public function onCoroutineTask(Server $server, Server\Task $task) + { + try { + $data = $this->resolve($task->data); + } catch (\Throwable $exception) { + $data = jTraceEx($exception); + + $this->logger->error('task', [$data]); + } finally { + $server->finish($data); + } + } + + + /** + * @param $data + * @return null + * @throws ReflectionException + */ + private function resolve($data) + { + [$class, $params] = json_encode($data, true); + + $reflect = Kiri::getDi()->getReflect($class); + + if (!$reflect->isInstantiable()) { + return null; + } + $class = $reflect->newInstanceArgs($params); + return $class->execute(); + } + + + /** + * @param Server $server + * @param int $task_id + * @param mixed $data + */ + public function onFinish(Server $server, int $task_id, mixed $data) + { + if (!($data instanceof OnTaskInterface)) { + return; + } + $data->finish($server, $task_id); + } + + +} diff --git a/Handler/OnServerWorker.php b/Handler/OnServerWorker.php new file mode 100644 index 0000000..d90ace7 --- /dev/null +++ b/Handler/OnServerWorker.php @@ -0,0 +1,135 @@ +eventDispatch->dispatch(new OnBeforeWorkerStart($workerId)); + if ($workerId < $server->setting['worker_num']) { + $this->eventDispatch->dispatch(new OnWorkerStart($server, $workerId)); + $this->setProcessName(sprintf('Worker[%d].%d', $server->worker_pid, $workerId)); + } else { + $this->eventDispatch->dispatch(new OnTaskStart($server, $workerId)); + $this->setProcessName(sprintf('Tasker[%d].%d', $server->worker_pid, $workerId)); + } + $this->eventDispatch->dispatch(new OnAfterWorkerStart()); + } + + + /** + * @param OnBeforeWorkerStart $worker + * @throws Exception + */ + public function setConfigure(OnBeforeWorkerStart $worker) + { + ServerManager::setEnv('worker', $worker->workerId); + $serialize = file_get_contents(storage(Runtime::CONFIG_NAME)); + if (!empty($serialize)) { + Config::sets(unserialize($serialize)); + } + } + + + /** + * @param Server $server + * @param int $workerId + * @throws Exception + */ + public function onWorkerStop(Server $server, int $workerId) + { + $this->eventDispatch->dispatch(new OnWorkerStop($server, $workerId)); + + Timer::clearAll(); + } + + + /** + * @param Server $server + * @param int $workerId + * @throws Exception + */ + public function onWorkerExit(Server $server, int $workerId) + { + $this->eventDispatch->dispatch(new OnWorkerExit($server, $workerId)); + + ServerManager::setEnv('state', 'exit'); + } + + + /** + * @param Server $server + * @param int $worker_id + * @param int $worker_pid + * @param int $exit_code + * @param int $signal + * @throws Exception + */ + public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal) + { + $this->eventDispatch->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal)); + + $message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s', + $worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9) + ); + + $this->logger->error($message); + + $this->system_mail($message); + } + + + /** + * @param $messageContent + * @throws Exception + */ + protected function system_mail($messageContent) + { + try { + $email = Config::get('email'); + if (!empty($email) && ($email['enable'] ?? false) == true) { + Help::sendEmail($email, 'Service Error', $messageContent); + } + } catch (\Throwable $e) { + error($e, 'email'); + } + } + +} diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/SInterface/OnBeforeShutdown.php b/SInterface/OnBeforeShutdown.php new file mode 100644 index 0000000..9e70758 --- /dev/null +++ b/SInterface/OnBeforeShutdown.php @@ -0,0 +1,8 @@ +process[] = $process; + } + + + /** + * @return string start server + * + * start server + * @throws ConfigException + * @throws Exception + */ + public function start(): string + { + $this->manager->initBaseServer(Config::get('server', [], true), $this->daemon); + + $rpcService = Config::get('rpc', []); + if (!empty($rpcService)) { + $this->manager->addListener($rpcService['type'], $rpcService['host'], $rpcService['port'], + $rpcService['mode'], $rpcService); + } + + $processes = array_merge($this->process, Config::get('processes', [])); + foreach ($processes as $process) { + $this->manager->addProcess($process); + } + + return $this->manager->getServer()->start(); + } + + + /** + * @return void + * + * start server + * @throws Exception + */ + public function shutdown() + { + $configs = Config::get('server', [], true); + foreach ($this->manager->sortService($configs['ports'] ?? []) as $config) { + $this->manager->stopServer($config['port']); + } + $this->eventDispatch->dispatch(new OnShutdown()); + } + + + /** + * @return bool + * @throws ConfigException + */ + public function isRunner(): bool + { + return $this->manager->isRunner(); + } + + + /** + * @param $daemon + * @return Server + */ + public function setDaemon($daemon): static + { + if (!in_array($daemon, [0, 1])) { + return $this; + } + $this->daemon = $daemon; + return $this; + } + + + /** + * @return \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null + */ + #[Pure] public function getServer(): \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null + { + return $this->manager->getServer(); + } + +} diff --git a/ServerCommand.php b/ServerCommand.php new file mode 100644 index 0000000..84dee57 --- /dev/null +++ b/ServerCommand.php @@ -0,0 +1,126 @@ +setName('sw:server') + ->setDescription('server start|stop|reload|restart') + ->addArgument('action', InputArgument::REQUIRED) + ->addOption('daemon', 'd', InputOption::VALUE_OPTIONAL,'is run daemonize',-1); + } + + + /** + * @param InputInterface $input + * @param OutputInterface $output + * @return int + * @throws Exception + */ + public function execute(InputInterface $input, OutputInterface $output): int + { + try { + $manager = Kiri::app()->getServer(); + $manager->setDaemon((int)is_null($input->getOption('daemon'))); + if (!in_array($input->getArgument('action'), self::ACTIONS)) { + throw new Exception('I don\'t know what I want to do.'); + } + if ($manager->isRunner() && $input->getArgument('action') == 'start') { + throw new Exception('Service is running. Please use restart.'); + } + $manager->shutdown(); + if ($input->getArgument('action') == 'stop') { + throw new Exception('shutdown success'); + } + $this->generate_runtime_builder($manager); + } catch (\Throwable $throwable) { + $output->write(jTraceEx($throwable)); + } finally { + return 1; + } + } + + + /** + * @throws ConfigException + */ + private function configure_set() + { + $enable_coroutine = Config::get('servers.settings.enable_coroutine', false); + if ($enable_coroutine != true) { + return; + } + Coroutine::set([ + 'hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_BLOCKING_FUNCTION, + 'enable_deadlock_check' => FALSE, + 'exit_condition' => function () { + return Coroutine::stats()['coroutine_num'] === 0; + } + ]); + } + + + /** + * @param $manager + * @throws ConfigException + */ + private function generate_runtime_builder($manager): void + { + $this->configure_set(); + + exec(PHP_BINARY . ' ' . APP_PATH . 'kiri.php runtime:builder'); + + $this->eventProvider->on(OnBeforeWorkerStart::class, [di(OnServerWorker::class), 'setConfigure']); + $this->eventProvider->on(OnWorkerStart::class, [di(WorkerDispatch::class), 'dispatch']); + $this->eventProvider->on(OnTaskerStart::class, [di(TaskerDispatch::class), 'dispatch']); + $manager->start(); + } + +} diff --git a/ServerManager.php b/ServerManager.php new file mode 100644 index 0000000..6bbd16f --- /dev/null +++ b/ServerManager.php @@ -0,0 +1,537 @@ + */ + public array $ports = []; + + public int $mode = SWOOLE_TCP; + + + private Server|null $server = null; + + + /** + * @var ContainerInterface + */ + #[Inject(ContainerInterface::class)] + public ContainerInterface $container; + + + const DEFAULT_EVENT = [ + Constant::WORKER_START => [OnServerWorker::class, 'onWorkerStart'], + Constant::WORKER_EXIT => [OnServerWorker::class, 'onWorkerExit'], + Constant::WORKER_STOP => [OnServerWorker::class, 'onWorkerStop'], + Constant::WORKER_ERROR => [OnServerWorker::class, 'onWorkerError'], + Constant::MANAGER_START => [OnServerManager::class, 'onManagerStart'], + Constant::MANAGER_STOP => [OnServerManager::class, 'onManagerStop'], + Constant::BEFORE_RELOAD => [OnServerReload::class, 'onBeforeReload'], + Constant::AFTER_RELOAD => [OnServerReload::class, 'onAfterReload'], + Constant::START => [OnServer::class, 'onStart'], + Constant::BEFORE_SHUTDOWN => [OnServer::class, 'onBeforeShutdown'], + Constant::SHUTDOWN => [OnServer::class, 'onShutdown'], + ]; + + + private array $eventInterface = [ + OnReceiveInterface::class => 'receive', + OnPacketInterface::class => 'packet', + OnHandshakeInterface::class => 'handshake', + OnMessageInterface::class => 'message', + OnConnectInterface::class => 'connect', + OnCloseInterface::class => 'close', + OnDisconnectInterface::class => 'disconnect' + ]; + + + /** + * @return Server|WServer|HServer|null + */ + public function getServer(): Server|WServer|HServer|null + { + return $this->server; + } + + + /** + * @param string $type + * @param string $host + * @param int $port + * @param int $mode + * @param array $settings + * @throws ReflectionException + * @throws ConfigException + * @throws Exception + */ + public function addListener(string $type, string $host, int $port, int $mode, array $settings = []) + { + if ($this->checkPortIsAlready($port)) $this->stopServer($port); + if (!$this->server) { + $this->createBaseServer($type, $host, $port, $mode, $settings); + } else { + if (!isset($settings['settings'])) { + $settings['settings'] = []; + } + $this->addNewListener($type, $host, $port, $mode, $settings); + } + } + + + /** + * @throws ReflectionException + * @throws ConfigException + */ + public function initBaseServer($configs, int $daemon = 0): void + { + $context = di(ServerManager::class); + foreach ($this->sortService($configs['ports']) as $config) { + $this->startListenerHandler($context, $config, $daemon); + } + $this->bindCallback($this->server, [Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]); +// $this->bindCallback($this->server, $this->getSystemEvents($configs)); + } + + + /** + * @return bool + * @throws ConfigException + * @throws Exception + */ + public function isRunner(): bool + { + $configs = Config::get('server', [], true); + foreach ($this->sortService($configs['ports']) as $config) { + if ($this->checkPortIsAlready($config['port'])) { + return true; + } + } + return false; + } + + + /** + * @param string|OnProcessInterface $customProcess + * @param null $redirect_stdin_and_stdout + * @param int|null $pipe_type + * @param bool $enable_coroutine + * @throws Exception + */ + public function addProcess(string|OnProcessInterface $customProcess, $redirect_stdin_and_stdout = null, ?int $pipe_type = SOCK_DGRAM, bool $enable_coroutine = true) + { + $process = $this->initProcess($customProcess, $redirect_stdin_and_stdout, $pipe_type, $enable_coroutine); + $this->server->addProcess($process); + if ($customProcess instanceof OnProcessInterface) { + Kiri::app()->addProcess($customProcess::class, $process); + } else { + Kiri::app()->addProcess($customProcess, $process); + } + } + + + /** + * @param $customProcess + * @param $redirect_stdin_and_stdout + * @param $pipe_type + * @param $enable_coroutine + * @return Process + */ + private function initProcess($customProcess, $redirect_stdin_and_stdout, $pipe_type, $enable_coroutine): Process + { + $server = $this->server; + return new Process(function (Process $soloProcess) use ($customProcess, $server) { + $time = microtime(true); + if (is_string($customProcess)) { + $customProcess = Kiri::createObject($customProcess, [$server]); + } + + $name = $customProcess->getProcessName($soloProcess); + + $system = sprintf('%s.process[%d]', Config::get('id', 'system-service'), $soloProcess->pid); + if (Kiri::getPlatform()->isLinux()) { + $soloProcess->name($system . '.' . $name . ' start.'); + } + $name = Config::get('id', 'system-service'); + echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]Builder %s[%d].%d use time %s.", $name, 'Process ' . $name, + $server->master_pid, $soloProcess->pid, round(microtime(true) - $time, 6) . 's') . PHP_EOL; + $customProcess->signListen($soloProcess); + $customProcess->onHandler($soloProcess); + }, + $redirect_stdin_and_stdout, + $pipe_type, + $enable_coroutine + ); + } + + + /** + * @return array + */ + public function getSetting(): array + { + return $this->server->setting; + } + + + /** + * @param array $ports + * @return array + */ + public function sortService(array $ports): array + { + $array = []; + foreach ($ports as $port) { + if ($port['type'] == Constant::SERVER_TYPE_WEBSOCKET) { + array_unshift($array, $port); + } else if ($port['type'] == Constant::SERVER_TYPE_HTTP) { + if (!empty($array) && $array[0]['type'] == Constant::SERVER_TYPE_WEBSOCKET) { + $array[] = $port; + } else { + array_unshift($array, $port); + } + } else { + $array[] = $port; + } + } + return $array; + } + + + /** + * @param string $key + * @param string|int $value + */ + public static function setEnv(string $key, string|int $value): void + { + putenv(sprintf('%s=%s', $key, (string)$value)); + } + + + /** + * @param ServerManager $context + * @param array $config + * @param int $daemon + * @throws ConfigException + * @throws ReflectionException + * @throws Exception + */ + private function startListenerHandler(ServerManager $context, array $config, int $daemon = 0) + { + if (!$this->server) { + $config = $this->mergeConfig($config, $daemon); + } + $context->addListener( + $config['type'], $config['host'], $config['port'], $config['mode'], + $config); + } + + + /** + * @param $config + * @param $daemon + * @return array + * @throws Exception + */ + private function mergeConfig($config, $daemon): array + { + $config['settings'] = $config['settings'] ?? []; + if (!isset($config['settings']['daemonize']) || !$config['settings']['daemonize'] != $daemon) { + $config['settings']['daemonize'] = $daemon; + } + if (!isset($config['settings']['log_file'])) { + $config['settings']['log_file'] = storage('system.log'); + } + $config['settings']['pid_file'] = storage('.swoole.pid'); + $config['events'] = $config['events'] ?? []; + return $config; + } + + + /** + * @param string $type + * @param string $host + * @param int $port + * @param int $mode + * @param array $settings + * @throws Exception + */ + private function addNewListener(string $type, string $host, int $port, int $mode, array $settings = []) + { + $id = Config::get('id', 'system-service'); + + echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]$type service %s::%d start.", $id, $host, $port) . PHP_EOL; + /** @var Server\Port $service */ + $this->ports[$port] = $this->server->addlistener($host, $port, $mode); + if ($this->ports[$port] === false) { + throw new Exception("The port is already in use[$host::$port]"); + } + if ($type == Constant::SERVER_TYPE_HTTP && !isset($settings['settings']['open_http_protocol'])) { + $settings['settings']['open_http_protocol'] = true; + if (in_array($this->server->setting['dispatch_mode'], [2, 4])) { + $settings['settings']['open_http2_protocol'] = true; + } + } + if ($type == Constant::SERVER_TYPE_WEBSOCKET && !isset($settings['settings']['open_websocket_protocol'])) { + $settings['settings']['open_websocket_protocol'] = true; + } + $this->ports[$port]->set($settings['settings'] ?? []); + $this->addServiceEvents($settings['events'] ?? [], $this->ports[$port]); + } + + + /** + * @param int $port + * @param string $event + * @return Closure|array|null + */ + public function getPortCallback(int $port, string $event): Closure|array|null + { + /** @var Server\Port $_port */ + $_port = $this->ports[$port] ?? null; + if (is_null($_port)) { + return null; + } + return $_port->getCallback($event); + } + + + /** + * @param string $type + * @param string $host + * @param int $port + * @param int $mode + * @param array $settings + * @throws ReflectionException + * @throws ConfigException + */ + private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = []) + { + $match = match ($type) { + Constant::SERVER_TYPE_BASE, Constant::SERVER_TYPE_TCP, + Constant::SERVER_TYPE_UDP => Server::class, + Constant::SERVER_TYPE_HTTP => HServer::class, + Constant::SERVER_TYPE_WEBSOCKET => WServer::class + }; + $this->server = new $match($host, $port, SWOOLE_PROCESS, $mode); + $this->server->set(array_merge(Config::get('server.settings', []), $settings['settings'])); + + $id = Config::get('id', 'system-service'); + + echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]$type service %s::%d start.", $id, $host, $port) . PHP_EOL; + + $this->addDefaultListener($settings); + } + + + /** + * @param int $port + * @throws Exception + */ + public function stopServer(int $port) + { + if (!($pid = $this->checkPortIsAlready($port))) { + return; + } + while ($this->checkPortIsAlready($port)) { + Process::kill($pid, SIGTERM); + usleep(300); + } + } + + + /** + * @param $port + * @return bool|string + * @throws Exception + */ + private function checkPortIsAlready($port): bool|string + { + if (!Kiri::getPlatform()->isLinux()) { + exec("lsof -i :" . $port . " | grep -i 'LISTEN' | awk '{print $2}'", $output); + if (empty($output)) return false; + $output = explode(PHP_EOL, $output[0]); + return $output[0]; + } + + $serverPid = file_get_contents(storage('.swoole.pid')); + if (!empty($serverPid) && shell_exec('ps -ef | grep ' . $serverPid . ' | grep -v grep')) { + Process::kill($serverPid, SIGTERM); + } + + exec('netstat -lnp | grep ' . $port . ' | grep "LISTEN" | awk \'{print $7}\'', $output); + if (empty($output)) { + return false; + } + return explode('/', $output[0])[0]; + } + + + /** + * @param array $settings + * @throws ReflectionException + * @throws Exception + */ + private function addDefaultListener(array $settings): void + { + if (($this->server->setting['task_worker_num'] ?? 0) > 0) { + $this->addTaskListener($settings['events']); + } + $this->container->setBindings(SwooleServerInterface::class, $this->server); + $this->addServiceEvents(ServerManager::DEFAULT_EVENT, $this->server); + if (!empty($settings['events']) && is_array($settings['events'])) { + $this->addServiceEvents($settings['events'], $this->server); + } + } + + + /** + * @param array $events + * @param Server|Port $server + */ + private function addServiceEvents(array $events, Server|Port $server) + { + foreach ($events as $name => $event) { + if (is_array($event) && is_string($event[0])) { + $event[0] = $this->container->get($event[0]); + } + $server->on($name, $event); + } + } + + + /** + * + */ + public function start() + { + $this->server->start(); + } + + + /** + * @param string $class + * @return object + */ + private function getNewInstance(string $class): object + { + return $this->container->create($class); + } + + + /** + * @param OnTaskInterface|string $handler + * @param array $params + * @param int|null $workerId + * @throws ReflectionException + * @throws Exception + */ + public function task(OnTaskInterface|string $handler, array $params = [], int $workerId = null) + { + if ($workerId === null || $workerId <= $this->server->setting['worker_num']) { + $workerId = random_int($this->server->setting['worker_num'] + 1, + $this->server->setting['worker_num'] + 1 + $this->server->setting['task_worker_num']); + } + if (is_string($handler)) { + $implements = $this->container->getReflect($handler); + if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) { + throw new Exception('Task must instance ' . OnTaskInterface::class); + } + $handler = $implements->newInstanceArgs($params); + } + $this->server->task(serialize($handler), $workerId); + } + + + /** + * @param mixed $message + * @param int $workerId + * @return mixed + */ + public function sendMessage(mixed $message, int $workerId): mixed + { + return $this->server?->sendMessage($message, $workerId); + } + + + /** + * @param array $events + * @throws ReflectionException + */ + private function addTaskListener(array $events = []): void + { + $task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false; + $reflect = $this->container->getReflect(OnServerTask::class)?->newInstance(); + if ($task_use_object || $this->server->setting['task_enable_coroutine']) { + $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']); + } else { + $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onTask']); + } + $this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']); + } + + + /** + * @param Port|Server $server + * @param array|null $settings + */ + public function bindCallback(Port|Server $server, ?array $settings = []) + { + // TODO: Implement bindCallback() method. + if (count($settings) < 1) { + return; + } + foreach ($settings as $event_type => $callback) { + if ($this->server->getCallback($event_type) !== null) { + continue; + } + if (is_array($callback) && !is_object($callback[0])) { + $callback[0] = $this->container->get($callback[0]); + } + $this->server->on($event_type, $callback); + } + } +} diff --git a/ServerProviders.php b/ServerProviders.php new file mode 100644 index 0000000..19ce79e --- /dev/null +++ b/ServerProviders.php @@ -0,0 +1,34 @@ +set('server', ['class' => Server::class]); + + $container = Kiri::getDi(); + + $console = $container->get(\Symfony\Component\Console\Application::class); + $console->add($container->get(ServerCommand::class)); + + } +} diff --git a/SwooleServerInterface.php b/SwooleServerInterface.php new file mode 100644 index 0000000..038180f --- /dev/null +++ b/SwooleServerInterface.php @@ -0,0 +1,14 @@ +=8.0", + "ext-json": "*", + "composer-runtime-api": "^2.0", + "psr/http-server-middleware": "^1.0", + "psr/http-message": "^1.0", + "psr/event-dispatcher": "^1.0", + "game-worker/kiri-http-message": "dev-master" + } +}