From 2870a64792e0f4ead28962b85366f7de4200f827 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Tue, 30 Nov 2021 14:32:56 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kiri-server/Abstracts/BaseProcess.php | 143 ++++++++++++ kiri-server/Constant.php | 133 +++++++++++ kiri-server/Contract/OnBeforeShutdown.php | 8 + kiri-server/Contract/OnCloseInterface.php | 22 ++ kiri-server/Contract/OnConnectInterface.php | 18 ++ .../Contract/OnDisconnectInterface.php | 19 ++ kiri-server/Contract/OnDownloadInterface.php | 12 + kiri-server/Contract/OnHandshakeInterface.php | 22 ++ kiri-server/Contract/OnMessageInterface.php | 19 ++ kiri-server/Contract/OnOpenInterface.php | 18 ++ kiri-server/Contract/OnPacketInterface.php | 19 ++ .../Contract/OnPipeMessageInterface.php | 21 ++ kiri-server/Contract/OnProcessInterface.php | 29 +++ kiri-server/Contract/OnReceiveInterface.php | 25 +++ kiri-server/Contract/OnTaskInterface.php | 17 ++ kiri-server/CoroutineServer.php | 207 ++++++++++++++++++ kiri-server/SoloAsyncServer.php | 142 ++++++++++++ kiri-server/SwooleServerInterface.php | 12 + kiri-server/TraitServer.php | 50 +++++ 19 files changed, 936 insertions(+) create mode 100644 kiri-server/Abstracts/BaseProcess.php create mode 100644 kiri-server/Constant.php create mode 100644 kiri-server/Contract/OnBeforeShutdown.php create mode 100644 kiri-server/Contract/OnCloseInterface.php create mode 100644 kiri-server/Contract/OnConnectInterface.php create mode 100644 kiri-server/Contract/OnDisconnectInterface.php create mode 100644 kiri-server/Contract/OnDownloadInterface.php create mode 100644 kiri-server/Contract/OnHandshakeInterface.php create mode 100644 kiri-server/Contract/OnMessageInterface.php create mode 100644 kiri-server/Contract/OnOpenInterface.php create mode 100644 kiri-server/Contract/OnPacketInterface.php create mode 100644 kiri-server/Contract/OnPipeMessageInterface.php create mode 100644 kiri-server/Contract/OnProcessInterface.php create mode 100644 kiri-server/Contract/OnReceiveInterface.php create mode 100644 kiri-server/Contract/OnTaskInterface.php create mode 100644 kiri-server/CoroutineServer.php create mode 100644 kiri-server/SoloAsyncServer.php create mode 100644 kiri-server/SwooleServerInterface.php create mode 100644 kiri-server/TraitServer.php diff --git a/kiri-server/Abstracts/BaseProcess.php b/kiri-server/Abstracts/BaseProcess.php new file mode 100644 index 00000000..56d8ad60 --- /dev/null +++ b/kiri-server/Abstracts/BaseProcess.php @@ -0,0 +1,143 @@ +name; + } + + + /** + * @return bool + */ + public function isStop(): bool + { + return $this->isStop; + } + + /** + * @return mixed + */ + public function getRedirectStdinAndStdout(): mixed + { + return $this->redirect_stdin_and_stdout; + } + + /** + * @return int + */ + public function getPipeType(): int + { + return $this->pipe_type; + } + + /** + * @return bool + */ + public function isEnableCoroutine(): bool + { + return $this->enable_coroutine; + } + + + /** + * + */ + public function onProcessStop(): void + { + $this->isStop = true; + } + + + /** + * @return bool + */ + public function checkProcessIsStop(): bool + { + return $this->isStop === true; + } + + + /** + * @param Process $process + */ + public function signListen(Process $process): void + { + } + + + /** + * + */ + protected function exit(): void + { + putenv('process.status=idle'); + } + + + /** + * @return bool + */ + #[Pure] public function isWorking(): bool + { + return env('process.status', 'working') == 'working'; + } + + + /** + * + */ + private function waiteExit(Process $process): void + { + $this->onProcessStop(); + while ($this->isWorking()) { + $this->sleep(); + } + $process->exit(0); + } + + + /** + * + */ + private function sleep(): void + { + if ($this->enable_coroutine) { + Coroutine::sleep(0.1); + } else { + usleep(100); + } + } + +} diff --git a/kiri-server/Constant.php b/kiri-server/Constant.php new file mode 100644 index 00000000..7c6efc39 --- /dev/null +++ b/kiri-server/Constant.php @@ -0,0 +1,133 @@ +HTTP 404 Not Found
Powered by Swoole'; + const STATUS_405_MESSAGE = '

HTTP 405 Method allow


Powered by Swoole'; + + + const OPTION_REACTOR_NUM = 'reactor_num'; + const OPTION_WORKER_NUM = 'worker_num'; + const OPTION_MAX_REQUEST = 'max_request'; + const OPTION_MAX_CONN = 'max_connection'; + const OPTION_TASK_WORKER_NUM = 'task_worker_num'; + const OPTION_TASK_IPC_MODE = 'task_ipc_mode'; + const OPTION_TASK_MAX_REQUEST = 'task_max_request'; + const OPTION_TASK_TMPDIR = 'task_tmpdir'; + const OPTION_TASK_ENABLE_COROUTINE = 'task_enable_coroutine'; + const OPTION_TASK_USE_OBJECT = 'task_use_object'; + const OPTION_DISPATCH_MODE = 'dispatch_mode'; + const OPTION_DISPATCH_FUNC = 'dispatch_func'; + const OPTION_MESSAGE_QUEUE_KEY = 'message_queue_key'; + const OPTION_DAEMONIZE = 'daemonize'; + const OPTION_BACKLOG = 'backlog'; + const OPTION_LOG_FILE = 'log_file'; + const OPTION_LOG_LEVEL = 'log_level'; + const OPTION_LOG_DATE_WITH_MICROSECONDS = 'log_date_with_microseconds'; + const OPTION_LOG_ROTATION = 'log_rotation'; + const OPTION_LOG_DATE_FORMAT = 'log_date_format'; + const OPTION_OPEN_TCP_KEEPALIVE = 'open_tcp_keepalive'; + const OPTION_HEARTBEAT_CHECK_INTERVAL = 'heartbeat_check_interval'; + const OPTION_HEARTBEAT_IDLE_TIME = 'heartbeat_idle_time'; + const OPTION_OPEN_EOF_CHECK = 'open_eof_check'; + const OPTION_OPEN_EOF_SPLIT = 'open_eof_split'; + const OPTION_PACKAGE_EOF = 'package_eof'; + const OPTION_OPEN_LENGTH_CHECK = 'open_length_check'; + const OPTION_PACKAGE_LENGTH_TYPE = 'package_length_type'; + const OPTION_PACKAGE_LENGTH_FUNC = 'package_length_func'; + const OPTION_PACKAGE_MAX_LENGTH = 'package_max_length'; + const OPTION_OPEN_HTTP_PROTOCOL = 'open_http_protocol'; + const OPTION_OPEN_MQTT_PROTOCOL = 'open_mqtt_protocol'; + const OPTION_OPEN_REDIS_PROTOCOL = 'open_redis_protocol'; + const OPTION_OPEN_WEBSOCKET_PROTOCOL = 'open_websocket_protocol'; + const OPTION_OPEN_WEBSOCKET_CLOSE_FRAME = 'open_websocket_close_frame'; + const OPTION_OPEN_TCP_NODELAY = 'open_tcp_nodelay'; + const OPTION_OPEN_CPU_AFFINITY = 'open_cpu_affinity'; + const OPTION_CPU_AFFINITY_IGNORE = 'cpu_affinity_ignore'; + const OPTION_TCP_DEFER_ACCEPT = 'tcp_defer_accept'; + const OPTION_SSL_CERT_FILE = 'ssl_cert_file'; + const OPTION_SSL_KEY_FILE = 'ssl_key_file'; + const OPTION_SSL_METHOD = 'ssl_method'; + const OPTION_SSL_PROTOCOLS = 'ssl_protocols'; + const OPTION_SSL_SNI_CERTS = 'ssl_sni_certs'; + const OPTION_SSL_CIPHERS = 'ssl_ciphers'; + const OPTION_SSL_VERIFY_PEER = 'ssl_verify_peer'; + const OPTION_SSL_ALLOW_SELF_SIGNED = 'ssl_allow_self_signed'; + const OPTION_SSL_CLIENT_CERT_FILE = 'ssl_client_cert_file'; + const OPTION_SSL_COMPRESS = 'ssl_compress'; + const OPTION_SSL_VERIFY_DEPTH = 'ssl_verify_depth'; + const OPTION_SSL_PREFER_SERVER_CIPHERS = 'ssl_prefer_server_ciphers'; + const OPTION_SSL_DHPARAM = 'ssl_dhparam'; + const OPTION_SSL_ECDH_CURVE = 'ssl_ecdh_curve'; + const OPTION_USER = 'user'; + const OPTION_GROUP = 'group'; + const OPTION_CHROOT = 'chroot'; + const OPTION_PID_FILE = 'pid_file'; + const OPTION_BUFFER_INPUT_SIZE = 'buffer_input_size'; + const OPTION_BUFFER_OUTPUT_SIZE = 'buffer_output_size'; + const OPTION_SOCKET_BUFFER_SIZE = 'socket_buffer_size'; + const OPTION_ENABLE_UNSAFE_EVENT = 'enable_unsafe_event'; + const OPTION_DISCARD_TIMEOUT_REQUEST = 'discard_timeout_request'; + const OPTION_ENABLE_REUSE_PORT = 'enable_reuse_port'; + const OPTION_ENABLE_DELAY_RECEIVE = 'enable_delay_receive'; + const OPTION_RELOAD_ASYNC = 'reload_async'; + const OPTION_MAX_WAIT_TIME = 'max_wait_time'; + const OPTION_TCP_FASTOPEN = 'tcp_fastopen'; + const OPTION_REQUEST_SLOWLOG_FILE = 'request_slowlog_file'; + const OPTION_ENABLE_COROUTINE = 'enable_coroutine'; + const OPTION_MAX_COROUTINE = 'max_coroutine'; + const OPTION_SEND_YIELD = 'send_yield'; + const OPTION_SEND_TIMEOUT = 'send_timeout'; + const OPTION_HOOK_FLAGS = 'hook_flags'; + const OPTION_BUFFER_HIGH_WATERMARK = 'buffer_high_watermark'; + const OPTION_BUFFER_LOW_WATERMARK = 'buffer_low_watermark'; + const OPTION_TCP_USER_TIMEOUT = 'tcp_user_timeout'; + const OPTION_STATS_FILE = 'stats_file'; + const OPTION_EVENT_OBJECT = 'event_object'; + const OPTION_START_SESSION_ID = 'start_session_id'; + const OPTION_SINGLE_THREAD = 'single_thread'; + const OPTION_MAX_QUEUED_BYTES = 'max_queued_bytes'; + + +} diff --git a/kiri-server/Contract/OnBeforeShutdown.php b/kiri-server/Contract/OnBeforeShutdown.php new file mode 100644 index 00000000..e5397e5c --- /dev/null +++ b/kiri-server/Contract/OnBeforeShutdown.php @@ -0,0 +1,8 @@ + Server::class, + Constant::SERVER_TYPE_TCP => Server::class, + Constant::SERVER_TYPE_UDP => Server::class, + Constant::SERVER_TYPE_HTTP => HServer::class, + Constant::SERVER_TYPE_WEBSOCKET => HServer::class, + ]; + + + /** + * @param array $configs + * @param bool $daemon + * @throws Exception + */ + public function initBaseServer(array $configs, bool $daemon) + { + $configs['ports'] = $this->sortService($configs['ports']); + foreach ($configs['ports'] as $n => $config) { + $this->servers[$config['name'] ?? $n] = $this->addListener($config); + } + } + + + /** + * @param array $config + * @return mixed + * @throws ReflectionException + */ + private function addListener(array $config): mixed + { + /** @var HServer|Server $port */ + $class = self::SERVER_CLASS[$config['type']]; + $port = new $class($config['host'], $config['port'], $config['isSsl'] ?? false, $config['reuse_port'] ?? true); + $port->set($config['settings'] ?? []); + if ($config['type'] == Constant::SERVER_TYPE_HTTP) { + $port->handle('/', fn($request, $response) => $this->onRequestHandle($request, $response, $config)); + } else if ($config['type'] == Constant::SERVER_TYPE_WEBSOCKET) { + $port->handle('/', fn($request, $response) => $this->onWebsocketHandle($request, $response, $config)); + } else { + $port->handle(fn(Connection $connection) => $this->onConnectionHandle($connection, $config)); + } + return $this->eventListener($port, $config); + } + + + /** + * @param Request $request + * @param Response $response + * @param $config + */ + protected function onRequestHandle(Request $request, Response $response, $config) + { + if (isset($config[Constant::REQUEST])) { + call_user_func($config[Constant::REQUEST], $request, $response); + return; + } + $response->status(505); + $response->end(); + } + + + /** + * @param Request $request + * @param Response $response + * @param $config + */ + protected function onWebsocketHandle(Request $request, Response $response, $config) + { + $handshake = $config[Constant::HANDSHAKE] ?? null; + if (!is_null($handshake)) { + call_user_func($handshake, $request, $response); + } else { + $response->upgrade(); + $open = $config[Constant::OPEN] ?? null; + if (!is_null($open)) { + call_user_func($open, $request); + } + } + $close = $config[Constant::CLOSE] ?? null; + $message = $config[Constant::MESSAGE] ?? null; + while (true) { + $data = $response->recv(); + if ($data === '' || $data === false) { + $response->close(); + call_user_func($close, $response->fd); + } else { + call_user_func($message, $data); + } + } + } + + + /** + * @param Connection $connection + * @param $config + */ + protected function onConnectionHandle(Connection $connection, $config) + { + call_user_func($config[Constant::RECEIVE] ?? null, $connection); + } + + + /** + * @throws ConfigException + * @throws ReflectionException + */ + public function start(): void + { + $this->startProcess(); + run(function () { + $this->startServers(); + }); + } + + + /** + * @return array + * @throws ConfigException|ReflectionException + */ + private function startProcess(): array + { + $processes = []; + $system = sprintf('[%s].process', Config::get('id', 'system-service')); + foreach ($this->process as $process) { + /** @var BaseProcess $process */ + if (is_string($process)) { + $process = Kiri::getDi()->get($process); + } + $swowProcess = new Process([$process, 'onHandler'], $process->getRedirectStdinAndStdout(), + $process->getPipeType(), $process->isEnableCoroutine()); + if (Kiri::getPlatform()->isLinux()) { + $swowProcess->name($system . '(' . $process->getName() . ')'); + } + $swowProcess->start(); + array_push($processes, $swowProcess); + } + return $processes; + } + + + private function startServers() + { + foreach ($this->servers as $server) { + Coroutine::create(fn() => $server->start()); + } + } + + + /** + * @param mixed $server + * @param array $config + * @return mixed + * @throws ReflectionException + */ + private function eventListener(mixed $server, array $config): mixed + { + foreach ($config['events'] as $key => $value) { + if (is_array($value) && is_string($value[0])) { + $value[0] = Kiri::getDi()->get($value[0]); + } + $server->on($key, $value); + } + return $server; + } + +} diff --git a/kiri-server/SoloAsyncServer.php b/kiri-server/SoloAsyncServer.php new file mode 100644 index 00000000..f2ab915a --- /dev/null +++ b/kiri-server/SoloAsyncServer.php @@ -0,0 +1,142 @@ + Server::class, + Constant::SERVER_TYPE_HTTP => HServer::class, + Constant::SERVER_TYPE_WEBSOCKET => WServer::class + ]; + + + /** + * @param array $configs + * @param bool $daemon + * @throws Exception + */ + public function initBaseServer(array $configs, bool $daemon) + { + $configs['ports'] = $this->sortService($configs['ports']); + foreach ($configs['ports'] as $config) { + $service = $this->addListener($config); + if (!$this->server) { + $this->server = $service; + } + } + $this->startProcess(); + } + + + /** + * @throws ConfigException|\ReflectionException + */ + private function startProcess() + { + $system = sprintf('[%s].process', Config::get('id', 'system-service')); + foreach ($this->process as $process) { + /** @var BaseProcess $process */ + if (is_string($process)) { + $process = Kiri::getDi()->get($process); + } + $sowProcess = new Process([$process, 'onHandler'], $process->getRedirectStdinAndStdout(), + $process->getPipeType(), $process->isEnableCoroutine()); + if (Kiri::getPlatform()->isLinux()) { + $sowProcess->name($system . '(' . $process->getName() . ')'); + } + $this->server->addProcess($sowProcess); + } + } + + + /** + * @param array $config + * @return mixed + * @throws Exception + */ + private function addListener(array $config): Server\Port + { + $config = $this->resetConfig($config); + if (!$this->server) { + $class = self::SERVER_CLASS[$config['type']]; + $port = new $class($config['host'], $config['port'], SWOOLE_PROCESS, $config['mode']); + $config['settings'] = array_merge(Config::get('server.settings', []), $config['settings']); + $config['settings'][Constant::OPTION_DAEMONIZE] = 0; + } else { + $port = $this->server->addlistener($config['host'], $config['port'], $config['mode']); + if ($port === false) { + throw new Exception("The port is already in use[{$config['host']}::{$config['port']}]"); + } + } + $port->set($config['settings'] ?? []); + return $this->eventListener($port, $config); + } + + + /** + * @param Server\Port|Server|HServer|WServer $server + * @throws \ReflectionException + */ + private function eventListener(mixed $server, array $config): Server\Port|HServer|Server|WServer + { + foreach ($config['events'] as $key => $value) { + if (is_array($value) && is_string($value[0])) { + $value[0] = Kiri::getDi()->get($value[0]); + } + $server->on($key, $value); + } + return $server; + } + + + public function start() + { + $this->server->start(); + } + + + /** + * @param array $config + * @return array + */ + private function resetConfig(array $config): array + { + if ($config['type'] == Constant::SERVER_TYPE_HTTP && !isset($config['settings']['open_http_protocol'])) { + $config['settings']['open_http_protocol'] = true; + if ($this->server && in_array($this->server->setting['dispatch_mode'], [2, 4])) { + $config['settings']['open_http2_protocol'] = true; + } + } + if ($config['type'] == Constant::SERVER_TYPE_WEBSOCKET && !isset($config['settings']['open_websocket_protocol'])) { + $config['settings']['open_websocket_protocol'] = true; + } + return $config; + } + + +} diff --git a/kiri-server/SwooleServerInterface.php b/kiri-server/SwooleServerInterface.php new file mode 100644 index 00000000..8b9867c0 --- /dev/null +++ b/kiri-server/SwooleServerInterface.php @@ -0,0 +1,12 @@ +process[] = $process; + } + + + /** + * @param array $ports + * @return array + */ + public function sortService(array $ports): array + { + $array = []; + foreach ($ports as $port) { + if ($port['type'] == Constant::SERVER_TYPE_WEBSOCKET) { + array_unshift($array, $port); + } else if ($port['type'] == Constant::SERVER_TYPE_HTTP) { + if (!empty($array) && $array[0]['type'] == Constant::SERVER_TYPE_WEBSOCKET) { + $array[] = $port; + } else { + array_unshift($array, $port); + } + } else { + $array[] = $port; + } + } + return $array; + } + +}