From b969c86f7c3255f6b9eee0aed4d83ab00704d256 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Wed, 3 Nov 2021 16:06:33 +0800 Subject: [PATCH] e --- a.php | 23 + http-server/Abstracts/BaseProcess.php | 126 ---- http-server/Abstracts/OnTaskerStart.php | 36 -- http-server/Abstracts/OnWorkerStart.php | 40 -- http-server/Abstracts/Server.php | 55 -- http-server/Abstracts/WorkerStart.php | 88 --- http-server/Constant.php | 132 ----- http-server/Events/OnAfterCommandExecute.php | 16 - http-server/Events/OnAfterReload.php | 18 - http-server/Events/OnAfterWorkerStart.php | 8 - http-server/Events/OnBeforeCommandExecute.php | 8 - http-server/Events/OnBeforeReload.php | 18 - http-server/Events/OnBeforeShutdown.php | 18 - http-server/Events/OnBeforeWorkerStart.php | 12 - http-server/Events/OnManagerStart.php | 16 - http-server/Events/OnManagerStop.php | 18 - http-server/Events/OnShutdown.php | 18 - http-server/Events/OnStart.php | 18 - http-server/Events/OnTaskerStart.php | 23 - http-server/Events/OnWorkerError.php | 26 - http-server/Events/OnWorkerExit.php | 23 - http-server/Events/OnWorkerStart.php | 23 - http-server/Events/OnWorkerStop.php | 23 - http-server/Handler/OnPipeMessage.php | 38 -- http-server/Handler/OnServer.php | 58 -- http-server/Handler/OnServerManager.php | 48 -- http-server/Handler/OnServerReload.php | 43 -- http-server/Handler/OnServerTask.php | 101 ---- http-server/Handler/OnServerWorker.php | 135 ----- http-server/SInterface/OnBeforeShutdown.php | 8 - http-server/SInterface/OnCloseInterface.php | 22 - http-server/SInterface/OnConnectInterface.php | 18 - .../SInterface/OnDisconnectInterface.php | 19 - .../SInterface/OnDownloadInterface.php | 12 - .../SInterface/OnHandshakeInterface.php | 22 - http-server/SInterface/OnMessageInterface.php | 19 - http-server/SInterface/OnOpenInterface.php | 18 - http-server/SInterface/OnPacketInterface.php | 19 - .../SInterface/OnPipeMessageInterface.php | 21 - http-server/SInterface/OnProcessInterface.php | 43 -- http-server/SInterface/OnReceiveInterface.php | 25 - http-server/SInterface/OnTaskInterface.php | 17 - http-server/Server.php | 137 ----- http-server/ServerCommand.php | 126 ---- http-server/ServerManager.php | 540 ------------------ http-server/ServerProviders.php | 34 -- http-server/SwooleServerInterface.php | 14 - kiri-process/OnProcessInterface.php | 10 + kiri-process/Process.php | 88 +++ kiri-process/TestProcess.php | 52 ++ p.php | 175 ++---- socket.html | 2 +- test.php | 36 +- 53 files changed, 256 insertions(+), 2430 deletions(-) create mode 100644 a.php delete mode 100644 http-server/Abstracts/BaseProcess.php delete mode 100644 http-server/Abstracts/OnTaskerStart.php delete mode 100644 http-server/Abstracts/OnWorkerStart.php delete mode 100644 http-server/Abstracts/Server.php delete mode 100644 http-server/Abstracts/WorkerStart.php delete mode 100644 http-server/Constant.php delete mode 100644 http-server/Events/OnAfterCommandExecute.php delete mode 100644 http-server/Events/OnAfterReload.php delete mode 100644 http-server/Events/OnAfterWorkerStart.php delete mode 100644 http-server/Events/OnBeforeCommandExecute.php delete mode 100644 http-server/Events/OnBeforeReload.php delete mode 100644 http-server/Events/OnBeforeShutdown.php delete mode 100644 http-server/Events/OnBeforeWorkerStart.php delete mode 100644 http-server/Events/OnManagerStart.php delete mode 100644 http-server/Events/OnManagerStop.php delete mode 100644 http-server/Events/OnShutdown.php delete mode 100644 http-server/Events/OnStart.php delete mode 100644 http-server/Events/OnTaskerStart.php delete mode 100644 http-server/Events/OnWorkerError.php delete mode 100644 http-server/Events/OnWorkerExit.php delete mode 100644 http-server/Events/OnWorkerStart.php delete mode 100644 http-server/Events/OnWorkerStop.php delete mode 100644 http-server/Handler/OnPipeMessage.php delete mode 100644 http-server/Handler/OnServer.php delete mode 100644 http-server/Handler/OnServerManager.php delete mode 100644 http-server/Handler/OnServerReload.php delete mode 100644 http-server/Handler/OnServerTask.php delete mode 100644 http-server/Handler/OnServerWorker.php delete mode 100644 http-server/SInterface/OnBeforeShutdown.php delete mode 100644 http-server/SInterface/OnCloseInterface.php delete mode 100644 http-server/SInterface/OnConnectInterface.php delete mode 100644 http-server/SInterface/OnDisconnectInterface.php delete mode 100644 http-server/SInterface/OnDownloadInterface.php delete mode 100644 http-server/SInterface/OnHandshakeInterface.php delete mode 100644 http-server/SInterface/OnMessageInterface.php delete mode 100644 http-server/SInterface/OnOpenInterface.php delete mode 100644 http-server/SInterface/OnPacketInterface.php delete mode 100644 http-server/SInterface/OnPipeMessageInterface.php delete mode 100644 http-server/SInterface/OnProcessInterface.php delete mode 100644 http-server/SInterface/OnReceiveInterface.php delete mode 100644 http-server/SInterface/OnTaskInterface.php delete mode 100644 http-server/Server.php delete mode 100644 http-server/ServerCommand.php delete mode 100644 http-server/ServerManager.php delete mode 100644 http-server/ServerProviders.php delete mode 100644 http-server/SwooleServerInterface.php create mode 100644 kiri-process/OnProcessInterface.php create mode 100644 kiri-process/Process.php create mode 100644 kiri-process/TestProcess.php diff --git a/a.php b/a.php new file mode 100644 index 00000000..6b81885d --- /dev/null +++ b/a.php @@ -0,0 +1,23 @@ + 0) { + $shift = (int)array_shift($first); + $endShift = (int)array_shift($end); + if ($endShift == $shift) { + continue; + } + if ($endShift < $shift) { + return true; + } else { + return false; + } + } + return false; +} + +var_dump(version('1.4.4','1.4.3')); diff --git a/http-server/Abstracts/BaseProcess.php b/http-server/Abstracts/BaseProcess.php deleted file mode 100644 index 13b83572..00000000 --- a/http-server/Abstracts/BaseProcess.php +++ /dev/null @@ -1,126 +0,0 @@ -isStop = true; - } - - - /** - * @return bool - */ - public function checkProcessIsStop(): bool - { - return $this->isStop === true; - } - - - - /** - * @param Process $process - */ - public function signListen(Process $process): void - { -// if (Coroutine::getCid() === -1) { -// Process::signal(SIGTERM | SIGKILL, function ($signo) use ($process) { -// if ($signo) { -// $lists = Kiri::app()->getProcess(); -// foreach ($lists as $process) { -// $process->exit(0); -// } -// } -// }); -// } else { -// Coroutine::create(function () use ($process) { -// /** @var Coroutine\Socket $message */ -// $message = $process->exportSocket(); -// if ($message->recv() == 0x03455343213212) { -// $this->waiteExit($process); -// } -// }); -// Coroutine::create(function () use ($process) { -// $data = Coroutine::waitSignal(SIGTERM | SIGKILL, -1); -// if ($data) { -// $lists = Kiri::app()->getProcess(); -// foreach ($lists as $name => $process) { -// foreach ($process as $item) { -// /** @var Coroutine\Socket $export */ -// $export = $item->exportSocket(); -// $export->send(0x03455343213212); -// } -// } -// } -// }); -// } - } - - - /** - * - */ - protected function exit(): void - { - putenv('process.status=idle'); - } - - - /** - * @return bool - */ - #[Pure] public function isWorking(): bool - { - return env('process.status', 'working') == 'working'; - } - - - /** - * - */ - private function waiteExit(Process $process): void - { - $this->onProcessStop(); - while ($this->isWorking()) { - $this->sleep(); - } - $process->exit(0); - } - - - /** - * - */ - private function sleep(): void - { - if ($this->enableSwooleCoroutine) { - Coroutine::sleep(0.1); - } else { - usleep(100); - } - } - -} diff --git a/http-server/Abstracts/OnTaskerStart.php b/http-server/Abstracts/OnTaskerStart.php deleted file mode 100644 index 9c5c6bc2..00000000 --- a/http-server/Abstracts/OnTaskerStart.php +++ /dev/null @@ -1,36 +0,0 @@ -interpretDirectory(); -// } - - $this->mixed($event, false, $time); - } - - -} diff --git a/http-server/Abstracts/OnWorkerStart.php b/http-server/Abstracts/OnWorkerStart.php deleted file mode 100644 index e4e366aa..00000000 --- a/http-server/Abstracts/OnWorkerStart.php +++ /dev/null @@ -1,40 +0,0 @@ -router->read_files(); -// $this->interpretDirectory(); -// } - $this->mixed($event, true, $time); - } - -} diff --git a/http-server/Abstracts/Server.php b/http-server/Abstracts/Server.php deleted file mode 100644 index a5dc5c82..00000000 --- a/http-server/Abstracts/Server.php +++ /dev/null @@ -1,55 +0,0 @@ -isMac()) { - return; - } - $name = Config::get('id', 'system-service'); - if (!empty($prefix)) { - $name .= '.' . $prefix; - } - swoole_set_process_name($name); - } - - - /** - * Server constructor. - * @throws Exception - */ - public function __construct() - { - } - -} diff --git a/http-server/Abstracts/WorkerStart.php b/http-server/Abstracts/WorkerStart.php deleted file mode 100644 index d93bfe1a..00000000 --- a/http-server/Abstracts/WorkerStart.php +++ /dev/null @@ -1,88 +0,0 @@ -annotation->read(APP_PATH . 'app', 'App'); - - $fileLists = $this->annotation->read(APP_PATH . 'app'); - foreach ($fileLists->runtime(APP_PATH . 'app') as $class) { - foreach (NoteManager::getTargetNote($class) as $value) { - $value->execute($class); - } - $methods = $di->getMethodAttribute($class); - foreach ($methods as $method => $attribute) { - if (empty($attribute)) { - continue; - } - foreach ($attribute as $item) { - $item->execute($class, $method); - } - } - } - } - - - /** - * @param $event - * @param $isWorker - * @param $time - * @throws \Kiri\Exception\ConfigException - */ - protected function mixed($event, $isWorker, $time) - { - $name = Config::get('id', 'system-service'); - echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]Builder %s[%d].%d use time %s.", $name, $isWorker ? 'Worker' : 'Taker', - $event->server->worker_pid, $event->workerId, round(microtime(true) - $time, 6) . 's') . PHP_EOL; - } - - /** - * @param $prefix - * @throws ConfigException - */ - protected function setProcessName($prefix) - { - if (Kiri::getPlatform()->isMac()) { - return; - } - $name = Config::get('id', 'system-service'); - if (!empty($prefix)) { - $name .= '.' . $prefix; - } - swoole_set_process_name($name); - } - -} diff --git a/http-server/Constant.php b/http-server/Constant.php deleted file mode 100644 index 5710e84b..00000000 --- a/http-server/Constant.php +++ /dev/null @@ -1,132 +0,0 @@ -HTTP 404 Not Found
Powered by Swoole'; - const STATUS_405_MESSAGE = '

HTTP 405 Method allow


Powered by Swoole'; - - - const OPTION_REACTOR_NUM = 'reactor_num'; - const OPTION_WORKER_NUM = 'worker_num'; - const OPTION_MAX_REQUEST = 'max_request'; - const OPTION_MAX_CONN = 'max_connection'; - const OPTION_TASK_WORKER_NUM = 'task_worker_num'; - const OPTION_TASK_IPC_MODE = 'task_ipc_mode'; - const OPTION_TASK_MAX_REQUEST = 'task_max_request'; - const OPTION_TASK_TMPDIR = 'task_tmpdir'; - const OPTION_TASK_ENABLE_COROUTINE = 'task_enable_coroutine'; - const OPTION_TASK_USE_OBJECT = 'task_use_object'; - const OPTION_DISPATCH_MODE = 'dispatch_mode'; - const OPTION_DISPATCH_FUNC = 'dispatch_func'; - const OPTION_MESSAGE_QUEUE_KEY = 'message_queue_key'; - const OPTION_DAEMONIZE = 'daemonize'; - const OPTION_BACKLOG = 'backlog'; - const OPTION_LOG_FILE = 'log_file'; - const OPTION_LOG_LEVEL = 'log_level'; - const OPTION_LOG_DATE_WITH_MICROSECONDS = 'log_date_with_microseconds'; - const OPTION_LOG_ROTATION = 'log_rotation'; - const OPTION_LOG_DATE_FORMAT = 'log_date_format'; - const OPTION_OPEN_TCP_KEEPALIVE = 'open_tcp_keepalive'; - const OPTION_HEARTBEAT_CHECK_INTERVAL = 'heartbeat_check_interval'; - const OPTION_HEARTBEAT_IDLE_TIME = 'heartbeat_idle_time'; - const OPTION_OPEN_EOF_CHECK = 'open_eof_check'; - const OPTION_OPEN_EOF_SPLIT = 'open_eof_split'; - const OPTION_PACKAGE_EOF = 'package_eof'; - const OPTION_OPEN_LENGTH_CHECK = 'open_length_check'; - const OPTION_PACKAGE_LENGTH_TYPE = 'package_length_type'; - const OPTION_PACKAGE_LENGTH_FUNC = 'package_length_func'; - const OPTION_PACKAGE_MAX_LENGTH = 'package_max_length'; - const OPTION_OPEN_HTTP_PROTOCOL = 'open_http_protocol'; - const OPTION_OPEN_MQTT_PROTOCOL = 'open_mqtt_protocol'; - const OPTION_OPEN_REDIS_PROTOCOL = 'open_redis_protocol'; - const OPTION_OPEN_WEBSOCKET_PROTOCOL = 'open_websocket_protocol'; - const OPTION_OPEN_WEBSOCKET_CLOSE_FRAME = 'open_websocket_close_frame'; - const OPTION_OPEN_TCP_NODELAY = 'open_tcp_nodelay'; - const OPTION_OPEN_CPU_AFFINITY = 'open_cpu_affinity'; - const OPTION_CPU_AFFINITY_IGNORE = 'cpu_affinity_ignore'; - const OPTION_TCP_DEFER_ACCEPT = 'tcp_defer_accept'; - const OPTION_SSL_CERT_FILE = 'ssl_cert_file'; - const OPTION_SSL_KEY_FILE = 'ssl_key_file'; - const OPTION_SSL_METHOD = 'ssl_method'; - const OPTION_SSL_PROTOCOLS = 'ssl_protocols'; - const OPTION_SSL_SNI_CERTS = 'ssl_sni_certs'; - const OPTION_SSL_CIPHERS = 'ssl_ciphers'; - const OPTION_SSL_VERIFY_PEER = 'ssl_verify_peer'; - const OPTION_SSL_ALLOW_SELF_SIGNED = 'ssl_allow_self_signed'; - const OPTION_SSL_CLIENT_CERT_FILE = 'ssl_client_cert_file'; - const OPTION_SSL_COMPRESS = 'ssl_compress'; - const OPTION_SSL_VERIFY_DEPTH = 'ssl_verify_depth'; - const OPTION_SSL_PREFER_SERVER_CIPHERS = 'ssl_prefer_server_ciphers'; - const OPTION_SSL_DHPARAM = 'ssl_dhparam'; - const OPTION_SSL_ECDH_CURVE = 'ssl_ecdh_curve'; - const OPTION_USER = 'user'; - const OPTION_GROUP = 'group'; - const OPTION_CHROOT = 'chroot'; - const OPTION_PID_FILE = 'pid_file'; - const OPTION_BUFFER_INPUT_SIZE = 'buffer_input_size'; - const OPTION_BUFFER_OUTPUT_SIZE = 'buffer_output_size'; - const OPTION_SOCKET_BUFFER_SIZE = 'socket_buffer_size'; - const OPTION_ENABLE_UNSAFE_EVENT = 'enable_unsafe_event'; - const OPTION_DISCARD_TIMEOUT_REQUEST = 'discard_timeout_request'; - const OPTION_ENABLE_REUSE_PORT = 'enable_reuse_port'; - const OPTION_ENABLE_DELAY_RECEIVE = 'enable_delay_receive'; - const OPTION_RELOAD_ASYNC = 'reload_async'; - const OPTION_MAX_WAIT_TIME = 'max_wait_time'; - const OPTION_TCP_FASTOPEN = 'tcp_fastopen'; - const OPTION_REQUEST_SLOWLOG_FILE = 'request_slowlog_file'; - const OPTION_ENABLE_COROUTINE = 'enable_coroutine'; - const OPTION_MAX_COROUTINE = 'max_coroutine'; - const OPTION_SEND_YIELD = 'send_yield'; - const OPTION_SEND_TIMEOUT = 'send_timeout'; - const OPTION_HOOK_FLAGS = 'hook_flags'; - const OPTION_BUFFER_HIGH_WATERMARK = 'buffer_high_watermark'; - const OPTION_BUFFER_LOW_WATERMARK = 'buffer_low_watermark'; - const OPTION_TCP_USER_TIMEOUT = 'tcp_user_timeout'; - const OPTION_STATS_FILE = 'stats_file'; - const OPTION_EVENT_OBJECT = 'event_object'; - const OPTION_START_SESSION_ID = 'start_session_id'; - const OPTION_SINGLE_THREAD = 'single_thread'; - const OPTION_MAX_QUEUED_BYTES = 'max_queued_bytes'; - - -} diff --git a/http-server/Events/OnAfterCommandExecute.php b/http-server/Events/OnAfterCommandExecute.php deleted file mode 100644 index 03805b62..00000000 --- a/http-server/Events/OnAfterCommandExecute.php +++ /dev/null @@ -1,16 +0,0 @@ -setProcessName(sprintf('start[%d].server', $server->master_pid)); - - $this->eventDispatch->dispatch(new OnStart($server)); - } - - - /** - * @param \Swoole\Server $server - */ - public function onBeforeShutdown(\Swoole\Server $server) - { - $this->eventDispatch->dispatch(new OnBeforeShutdown($server)); - } - - - /** - * @param \Swoole\Server $server - */ - public function onShutdown(\Swoole\Server $server) - { - $this->eventDispatch->dispatch(new OnShutdown($server)); - } - - -} diff --git a/http-server/Handler/OnServerManager.php b/http-server/Handler/OnServerManager.php deleted file mode 100644 index 391a36fb..00000000 --- a/http-server/Handler/OnServerManager.php +++ /dev/null @@ -1,48 +0,0 @@ -setProcessName(sprintf('manger[%d].0', $server->manager_pid)); - - $this->eventDispatch->dispatch(new OnManagerStart($server)); - } - - - /** - * @param \Swoole\Server $server - */ - public function onManagerStop(\Swoole\Server $server) - { - $this->eventDispatch->dispatch(new OnManagerStop($server)); - } - - -} diff --git a/http-server/Handler/OnServerReload.php b/http-server/Handler/OnServerReload.php deleted file mode 100644 index ca19e4fa..00000000 --- a/http-server/Handler/OnServerReload.php +++ /dev/null @@ -1,43 +0,0 @@ -eventDispatch->dispatch(new OnBeforeReload($server)); - } - - - /** - * @param Server $server - */ - public function onAfterReload(Server $server) - { - $this->eventDispatch->dispatch(new OnAfterReload($server)); - } - -} diff --git a/http-server/Handler/OnServerTask.php b/http-server/Handler/OnServerTask.php deleted file mode 100644 index d912e45f..00000000 --- a/http-server/Handler/OnServerTask.php +++ /dev/null @@ -1,101 +0,0 @@ -resolve($data); - } catch (\Throwable $exception) { - $data = jTraceEx($exception); - - $this->logger->error('task', [$data]); - } finally { - $server->finish($data); - } - } - - - /** - * @param Server $server - * @param Server\Task $task - * @throws ConfigException - */ - public function onCoroutineTask(Server $server, Server\Task $task) - { - try { - $data = $this->resolve($task->data); - } catch (\Throwable $exception) { - $data = jTraceEx($exception); - - $this->logger->error('task', [$data]); - } finally { - $server->finish($data); - } - } - - - /** - * @param $data - * @return null - * @throws ReflectionException - */ - private function resolve($data) - { - [$class, $params] = json_encode($data, true); - - $reflect = Kiri::getDi()->getReflect($class); - - if (!$reflect->isInstantiable()) { - return null; - } - $class = $reflect->newInstanceArgs($params); - return $class->execute(); - } - - - /** - * @param Server $server - * @param int $task_id - * @param mixed $data - */ - public function onFinish(Server $server, int $task_id, mixed $data) - { - if (!($data instanceof OnTaskInterface)) { - return; - } - $data->finish($server, $task_id); - } - - -} diff --git a/http-server/Handler/OnServerWorker.php b/http-server/Handler/OnServerWorker.php deleted file mode 100644 index d90ace7d..00000000 --- a/http-server/Handler/OnServerWorker.php +++ /dev/null @@ -1,135 +0,0 @@ -eventDispatch->dispatch(new OnBeforeWorkerStart($workerId)); - if ($workerId < $server->setting['worker_num']) { - $this->eventDispatch->dispatch(new OnWorkerStart($server, $workerId)); - $this->setProcessName(sprintf('Worker[%d].%d', $server->worker_pid, $workerId)); - } else { - $this->eventDispatch->dispatch(new OnTaskStart($server, $workerId)); - $this->setProcessName(sprintf('Tasker[%d].%d', $server->worker_pid, $workerId)); - } - $this->eventDispatch->dispatch(new OnAfterWorkerStart()); - } - - - /** - * @param OnBeforeWorkerStart $worker - * @throws Exception - */ - public function setConfigure(OnBeforeWorkerStart $worker) - { - ServerManager::setEnv('worker', $worker->workerId); - $serialize = file_get_contents(storage(Runtime::CONFIG_NAME)); - if (!empty($serialize)) { - Config::sets(unserialize($serialize)); - } - } - - - /** - * @param Server $server - * @param int $workerId - * @throws Exception - */ - public function onWorkerStop(Server $server, int $workerId) - { - $this->eventDispatch->dispatch(new OnWorkerStop($server, $workerId)); - - Timer::clearAll(); - } - - - /** - * @param Server $server - * @param int $workerId - * @throws Exception - */ - public function onWorkerExit(Server $server, int $workerId) - { - $this->eventDispatch->dispatch(new OnWorkerExit($server, $workerId)); - - ServerManager::setEnv('state', 'exit'); - } - - - /** - * @param Server $server - * @param int $worker_id - * @param int $worker_pid - * @param int $exit_code - * @param int $signal - * @throws Exception - */ - public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal) - { - $this->eventDispatch->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal)); - - $message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s', - $worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9) - ); - - $this->logger->error($message); - - $this->system_mail($message); - } - - - /** - * @param $messageContent - * @throws Exception - */ - protected function system_mail($messageContent) - { - try { - $email = Config::get('email'); - if (!empty($email) && ($email['enable'] ?? false) == true) { - Help::sendEmail($email, 'Service Error', $messageContent); - } - } catch (\Throwable $e) { - error($e, 'email'); - } - } - -} diff --git a/http-server/SInterface/OnBeforeShutdown.php b/http-server/SInterface/OnBeforeShutdown.php deleted file mode 100644 index 9e70758b..00000000 --- a/http-server/SInterface/OnBeforeShutdown.php +++ /dev/null @@ -1,8 +0,0 @@ -process[] = $process; - } - - - /** - * @return string start server - * - * start server - * @throws ConfigException - * @throws Exception - */ - public function start(): string - { - $this->manager->initBaseServer(Config::get('server', [], true), $this->daemon); - - $rpcService = Config::get('rpc', []); - if (!empty($rpcService)) { - $this->manager->addListener($rpcService['type'], $rpcService['host'], $rpcService['port'], - $rpcService['mode'], $rpcService); - } - - $processes = array_merge($this->process, Config::get('processes', [])); - foreach ($processes as $process) { - $this->manager->addProcess($process); - } - - return $this->manager->getServer()->start(); - } - - - /** - * @return void - * - * start server - * @throws Exception - */ - public function shutdown() - { - $configs = Config::get('server', [], true); - foreach ($this->manager->sortService($configs['ports'] ?? []) as $config) { - $this->manager->stopServer($config['port']); - } - $this->eventDispatch->dispatch(new OnShutdown()); - } - - - /** - * @return bool - * @throws ConfigException - */ - public function isRunner(): bool - { - return $this->manager->isRunner(); - } - - - /** - * @param $daemon - * @return Server - */ - public function setDaemon($daemon): static - { - if (!in_array($daemon, [0, 1])) { - return $this; - } - $this->daemon = $daemon; - return $this; - } - - - /** - * @return \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null - */ - #[Pure] public function getServer(): \Swoole\Http\Server|\Swoole\Server|\Swoole\WebSocket\Server|null - { - return $this->manager->getServer(); - } - -} diff --git a/http-server/ServerCommand.php b/http-server/ServerCommand.php deleted file mode 100644 index 84dee579..00000000 --- a/http-server/ServerCommand.php +++ /dev/null @@ -1,126 +0,0 @@ -setName('sw:server') - ->setDescription('server start|stop|reload|restart') - ->addArgument('action', InputArgument::REQUIRED) - ->addOption('daemon', 'd', InputOption::VALUE_OPTIONAL,'is run daemonize',-1); - } - - - /** - * @param InputInterface $input - * @param OutputInterface $output - * @return int - * @throws Exception - */ - public function execute(InputInterface $input, OutputInterface $output): int - { - try { - $manager = Kiri::app()->getServer(); - $manager->setDaemon((int)is_null($input->getOption('daemon'))); - if (!in_array($input->getArgument('action'), self::ACTIONS)) { - throw new Exception('I don\'t know what I want to do.'); - } - if ($manager->isRunner() && $input->getArgument('action') == 'start') { - throw new Exception('Service is running. Please use restart.'); - } - $manager->shutdown(); - if ($input->getArgument('action') == 'stop') { - throw new Exception('shutdown success'); - } - $this->generate_runtime_builder($manager); - } catch (\Throwable $throwable) { - $output->write(jTraceEx($throwable)); - } finally { - return 1; - } - } - - - /** - * @throws ConfigException - */ - private function configure_set() - { - $enable_coroutine = Config::get('servers.settings.enable_coroutine', false); - if ($enable_coroutine != true) { - return; - } - Coroutine::set([ - 'hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_BLOCKING_FUNCTION, - 'enable_deadlock_check' => FALSE, - 'exit_condition' => function () { - return Coroutine::stats()['coroutine_num'] === 0; - } - ]); - } - - - /** - * @param $manager - * @throws ConfigException - */ - private function generate_runtime_builder($manager): void - { - $this->configure_set(); - - exec(PHP_BINARY . ' ' . APP_PATH . 'kiri.php runtime:builder'); - - $this->eventProvider->on(OnBeforeWorkerStart::class, [di(OnServerWorker::class), 'setConfigure']); - $this->eventProvider->on(OnWorkerStart::class, [di(WorkerDispatch::class), 'dispatch']); - $this->eventProvider->on(OnTaskerStart::class, [di(TaskerDispatch::class), 'dispatch']); - $manager->start(); - } - -} diff --git a/http-server/ServerManager.php b/http-server/ServerManager.php deleted file mode 100644 index c006ed15..00000000 --- a/http-server/ServerManager.php +++ /dev/null @@ -1,540 +0,0 @@ - */ - public array $ports = []; - - public int $mode = SWOOLE_TCP; - - - private Server|null $server = null; - - - /** - * @var ContainerInterface - */ - #[Inject(ContainerInterface::class)] - public ContainerInterface $container; - - - const DEFAULT_EVENT = [ - Constant::WORKER_START => [OnServerWorker::class, 'onWorkerStart'], - Constant::WORKER_EXIT => [OnServerWorker::class, 'onWorkerExit'], - Constant::WORKER_STOP => [OnServerWorker::class, 'onWorkerStop'], - Constant::WORKER_ERROR => [OnServerWorker::class, 'onWorkerError'], - Constant::MANAGER_START => [OnServerManager::class, 'onManagerStart'], - Constant::MANAGER_STOP => [OnServerManager::class, 'onManagerStop'], - Constant::BEFORE_RELOAD => [OnServerReload::class, 'onBeforeReload'], - Constant::AFTER_RELOAD => [OnServerReload::class, 'onAfterReload'], - Constant::START => [OnServer::class, 'onStart'], - Constant::BEFORE_SHUTDOWN => [OnServer::class, 'onBeforeShutdown'], - Constant::SHUTDOWN => [OnServer::class, 'onShutdown'], - ]; - - - private array $eventInterface = [ - OnReceiveInterface::class => 'receive', - OnPacketInterface::class => 'packet', - OnHandshakeInterface::class => 'handshake', - OnMessageInterface::class => 'message', - OnConnectInterface::class => 'connect', - OnCloseInterface::class => 'close', - OnDisconnectInterface::class => 'disconnect' - ]; - - - /** - * @return Server|WServer|HServer|null - */ - public function getServer(): Server|WServer|HServer|null - { - return $this->server; - } - - - /** - * @param string $type - * @param string $host - * @param int $port - * @param int $mode - * @param array $settings - * @throws ReflectionException - * @throws ConfigException - * @throws Exception - */ - public function addListener(string $type, string $host, int $port, int $mode, array $settings = []) - { - if ($this->checkPortIsAlready($port)) $this->stopServer($port); - if (!$this->server) { - $this->createBaseServer($type, $host, $port, $mode, $settings); - } else { - if (!isset($settings['settings'])) { - $settings['settings'] = []; - } - $this->addNewListener($type, $host, $port, $mode, $settings); - } - } - - - /** - * @throws ReflectionException - * @throws ConfigException - */ - public function initBaseServer($configs, int $daemon = 0): void - { - $context = di(ServerManager::class); - foreach ($this->sortService($configs['ports']) as $config) { - $this->startListenerHandler($context, $config, $daemon); - } - $this->bindCallback($this->server, [Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]); -// $this->bindCallback($this->server, $this->getSystemEvents($configs)); - } - - - /** - * @return bool - * @throws ConfigException - * @throws Exception - */ - public function isRunner(): bool - { - $configs = Config::get('server', [], true); - foreach ($this->sortService($configs['ports']) as $config) { - if ($this->checkPortIsAlready($config['port'])) { - return true; - } - } - return false; - } - - - /** - * @param string|OnProcessInterface $customProcess - * @param null $redirect_stdin_and_stdout - * @param int|null $pipe_type - * @param bool $enable_coroutine - * @throws Exception - */ - public function addProcess(string|OnProcessInterface $customProcess, $redirect_stdin_and_stdout = null, ?int $pipe_type = SOCK_DGRAM, bool $enable_coroutine = true) - { - $process = $this->initProcess($customProcess, $redirect_stdin_and_stdout, $pipe_type, $enable_coroutine); - $this->server->addProcess($process); - if ($customProcess instanceof OnProcessInterface) { - Kiri::app()->addProcess($customProcess::class, $process); - } else { - Kiri::app()->addProcess($customProcess, $process); - } - } - - - /** - * @param $customProcess - * @param $redirect_stdin_and_stdout - * @param $pipe_type - * @param $enable_coroutine - * @return Process - */ - private function initProcess($customProcess, $redirect_stdin_and_stdout, $pipe_type, $enable_coroutine): Process - { - $server = $this->server; - return new Process(function (Process $soloProcess) use ($customProcess, $server) { - $time = microtime(true); - if (is_string($customProcess)) { - $customProcess = Kiri::createObject($customProcess, [$server]); - } - - $name = $customProcess->getProcessName($soloProcess); - if (is_enable_file_modification_listening()) { - scan_directory(directory('app'), 'App'); - } - - $system = sprintf('%s.process[%d]', Config::get('id', 'system-service'), $soloProcess->pid); - if (Kiri::getPlatform()->isLinux()) { - $soloProcess->name($system . '.' . $name . ' start.'); - } - $name = Config::get('id', 'system-service'); - echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]Builder %s[%d].%d use time %s.", $name, 'Process ' . $name, - $server->master_pid, $soloProcess->pid, round(microtime(true) - $time, 6) . 's') . PHP_EOL; - $customProcess->signListen($soloProcess); - $customProcess->onHandler($soloProcess); - }, - $redirect_stdin_and_stdout, - $pipe_type, - $enable_coroutine - ); - } - - - /** - * @return array - */ - public function getSetting(): array - { - return $this->server->setting; - } - - - /** - * @param array $ports - * @return array - */ - public function sortService(array $ports): array - { - $array = []; - foreach ($ports as $port) { - if ($port['type'] == Constant::SERVER_TYPE_WEBSOCKET) { - array_unshift($array, $port); - } else if ($port['type'] == Constant::SERVER_TYPE_HTTP) { - if (!empty($array) && $array[0]['type'] == Constant::SERVER_TYPE_WEBSOCKET) { - $array[] = $port; - } else { - array_unshift($array, $port); - } - } else { - $array[] = $port; - } - } - return $array; - } - - - /** - * @param string $key - * @param string|int $value - */ - public static function setEnv(string $key, string|int $value): void - { - putenv(sprintf('%s=%s', $key, (string)$value)); - } - - - /** - * @param ServerManager $context - * @param array $config - * @param int $daemon - * @throws ConfigException - * @throws ReflectionException - * @throws Exception - */ - private function startListenerHandler(ServerManager $context, array $config, int $daemon = 0) - { - if (!$this->server) { - $config = $this->mergeConfig($config, $daemon); - } - $context->addListener( - $config['type'], $config['host'], $config['port'], $config['mode'], - $config); - } - - - /** - * @param $config - * @param $daemon - * @return array - * @throws Exception - */ - private function mergeConfig($config, $daemon): array - { - $config['settings'] = $config['settings'] ?? []; - if (!isset($config['settings']['daemonize']) || !$config['settings']['daemonize'] != $daemon) { - $config['settings']['daemonize'] = $daemon; - } - if (!isset($config['settings']['log_file'])) { - $config['settings']['log_file'] = storage('system.log'); - } - $config['settings']['pid_file'] = storage('.swoole.pid'); - $config['events'] = $config['events'] ?? []; - return $config; - } - - - /** - * @param string $type - * @param string $host - * @param int $port - * @param int $mode - * @param array $settings - * @throws Exception - */ - private function addNewListener(string $type, string $host, int $port, int $mode, array $settings = []) - { - $id = Config::get('id', 'system-service'); - - echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]$type service %s::%d start.", $id, $host, $port) . PHP_EOL; - /** @var Server\Port $service */ - $this->ports[$port] = $this->server->addlistener($host, $port, $mode); - if ($this->ports[$port] === false) { - throw new Exception("The port is already in use[$host::$port]"); - } - if ($type == Constant::SERVER_TYPE_HTTP && !isset($settings['settings']['open_http_protocol'])) { - $settings['settings']['open_http_protocol'] = true; - if (in_array($this->server->setting['dispatch_mode'], [2, 4])) { - $settings['settings']['open_http2_protocol'] = true; - } - } - if ($type == Constant::SERVER_TYPE_WEBSOCKET && !isset($settings['settings']['open_websocket_protocol'])) { - $settings['settings']['open_websocket_protocol'] = true; - } - $this->ports[$port]->set($settings['settings'] ?? []); - $this->addServiceEvents($settings['events'] ?? [], $this->ports[$port]); - } - - - /** - * @param int $port - * @param string $event - * @return Closure|array|null - */ - public function getPortCallback(int $port, string $event): Closure|array|null - { - /** @var Server\Port $_port */ - $_port = $this->ports[$port] ?? null; - if (is_null($_port)) { - return null; - } - return $_port->getCallback($event); - } - - - /** - * @param string $type - * @param string $host - * @param int $port - * @param int $mode - * @param array $settings - * @throws ReflectionException - * @throws ConfigException - */ - private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = []) - { - $match = match ($type) { - Constant::SERVER_TYPE_BASE, Constant::SERVER_TYPE_TCP, - Constant::SERVER_TYPE_UDP => Server::class, - Constant::SERVER_TYPE_HTTP => HServer::class, - Constant::SERVER_TYPE_WEBSOCKET => WServer::class - }; - $this->server = new $match($host, $port, SWOOLE_PROCESS, $mode); - $this->server->set(array_merge(Config::get('server.settings', []), $settings['settings'])); - - $id = Config::get('id', 'system-service'); - - echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]$type service %s::%d start.", $id, $host, $port) . PHP_EOL; - - $this->addDefaultListener($settings); - } - - - /** - * @param int $port - * @throws Exception - */ - public function stopServer(int $port) - { - if (!($pid = $this->checkPortIsAlready($port))) { - return; - } - while ($this->checkPortIsAlready($port)) { - Process::kill($pid, SIGTERM); - usleep(300); - } - } - - - /** - * @param $port - * @return bool|string - * @throws Exception - */ - private function checkPortIsAlready($port): bool|string - { - if (!Kiri::getPlatform()->isLinux()) { - exec("lsof -i :" . $port . " | grep -i 'LISTEN' | awk '{print $2}'", $output); - if (empty($output)) return false; - $output = explode(PHP_EOL, $output[0]); - return $output[0]; - } - - $serverPid = file_get_contents(storage('.swoole.pid')); - if (!empty($serverPid) && shell_exec('ps -ef | grep ' . $serverPid . ' | grep -v grep')) { - Process::kill($serverPid, SIGTERM); - } - - exec('netstat -lnp | grep ' . $port . ' | grep "LISTEN" | awk \'{print $7}\'', $output); - if (empty($output)) { - return false; - } - return explode('/', $output[0])[0]; - } - - - /** - * @param array $settings - * @throws ReflectionException - * @throws Exception - */ - private function addDefaultListener(array $settings): void - { - if (($this->server->setting['task_worker_num'] ?? 0) > 0) { - $this->addTaskListener($settings['events']); - } - $this->container->setBindings(SwooleServerInterface::class, $this->server); - $this->addServiceEvents(ServerManager::DEFAULT_EVENT, $this->server); - if (!empty($settings['events']) && is_array($settings['events'])) { - $this->addServiceEvents($settings['events'], $this->server); - } - } - - - /** - * @param array $events - * @param Server|Port $server - */ - private function addServiceEvents(array $events, Server|Port $server) - { - foreach ($events as $name => $event) { - if (is_array($event) && is_string($event[0])) { - $event[0] = $this->container->get($event[0]); - } - $server->on($name, $event); - } - } - - - /** - * - */ - public function start() - { - $this->server->start(); - } - - - /** - * @param string $class - * @return object - */ - private function getNewInstance(string $class): object - { - return $this->container->create($class); - } - - - /** - * @param OnTaskInterface|string $handler - * @param array $params - * @param int|null $workerId - * @throws ReflectionException - * @throws Exception - */ - public function task(OnTaskInterface|string $handler, array $params = [], int $workerId = null) - { - if ($workerId === null || $workerId <= $this->server->setting['worker_num']) { - $workerId = random_int($this->server->setting['worker_num'] + 1, - $this->server->setting['worker_num'] + 1 + $this->server->setting['task_worker_num']); - } - if (is_string($handler)) { - $implements = $this->container->getReflect($handler); - if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) { - throw new Exception('Task must instance ' . OnTaskInterface::class); - } - $handler = $implements->newInstanceArgs($params); - } - $this->server->task(serialize($handler), $workerId); - } - - - /** - * @param mixed $message - * @param int $workerId - * @return mixed - */ - public function sendMessage(mixed $message, int $workerId): mixed - { - return $this->server?->sendMessage($message, $workerId); - } - - - /** - * @param array $events - * @throws ReflectionException - */ - private function addTaskListener(array $events = []): void - { - $task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false; - $reflect = $this->container->getReflect(OnServerTask::class)?->newInstance(); - if ($task_use_object || $this->server->setting['task_enable_coroutine']) { - $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']); - } else { - $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onTask']); - } - $this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']); - } - - - /** - * @param Port|Server $server - * @param array|null $settings - */ - public function bindCallback(Port|Server $server, ?array $settings = []) - { - // TODO: Implement bindCallback() method. - if (count($settings) < 1) { - return; - } - foreach ($settings as $event_type => $callback) { - if ($this->server->getCallback($event_type) !== null) { - continue; - } - if (is_array($callback) && !is_object($callback[0])) { - $callback[0] = $this->container->get($callback[0]); - } - $this->server->on($event_type, $callback); - } - } -} diff --git a/http-server/ServerProviders.php b/http-server/ServerProviders.php deleted file mode 100644 index 19ce79ea..00000000 --- a/http-server/ServerProviders.php +++ /dev/null @@ -1,34 +0,0 @@ -set('server', ['class' => Server::class]); - - $container = Kiri::getDi(); - - $console = $container->get(\Symfony\Component\Console\Application::class); - $console->add($container->get(ServerCommand::class)); - - } -} diff --git a/http-server/SwooleServerInterface.php b/http-server/SwooleServerInterface.php deleted file mode 100644 index 038180fb..00000000 --- a/http-server/SwooleServerInterface.php +++ /dev/null @@ -1,14 +0,0 @@ -process; + } + + /** + * @return mixed + */ + public function getRedirectStdinAndStdout(): mixed + { + return $this->redirect_stdin_and_stdout; + } + + /** + * @return int + */ + public function getPipeType(): int + { + return $this->pipe_type; + } + + /** + * @return bool + */ + public function isEnableCoroutine(): bool + { + return $this->enable_coroutine; + } + + /** + * @return string + */ + public function getName(): string + { + return $this->name; + } + + + /** + * @param \Swoole\Process $process + */ + public function start(\Swoole\Process $process) + { + $this->process = $process; + } + + +} diff --git a/kiri-process/TestProcess.php b/kiri-process/TestProcess.php new file mode 100644 index 00000000..7b3abd49 --- /dev/null +++ b/kiri-process/TestProcess.php @@ -0,0 +1,52 @@ +getRedirectStdinAndStdout(), + $class->getPipeType(), $class->isEnableCoroutine()); + $process->start(); + + array_push($array, $process); +} +run(function () use ($array) { + + foreach ($array as $value) { + var_dump($value->getCallback()); + } + +}); diff --git a/p.php b/p.php index 0cf8b3d6..50412261 100644 --- a/p.php +++ b/p.php @@ -1,132 +1,51 @@ write("11111"); +// +// after($process); +// }); +//} +// +// +//function read(Process $process, Coroutine\Channel $channel) +//{ +// $data = $process->read(); +// +// $channel->push($data); +// +// var_dump($channel->length()); +// +// read($process, $channel); +//} +// +//$process = new Process(function (Process $process) { +// +// $array = new Coroutine\Channel(9999999); +// +// $barrier = Barrier::make(); +// +// Coroutine::create('read', $process, $array); +// Coroutine::create(function (Coroutine\Channel $channel) { +// var_dump(1111); +// function ch(Coroutine\Channel $channel) { +// $data = $channel->pop(); +// +// var_dump($data); +// +// ch($channel); +// }; +// ch($channel); +// }, $array); +// Barrier::wait($barrier); +//}, null, SWOOLE_UNIX_STREAM, true); +//$process->start(); +// +//Coroutine\run(function () use ($process) { +// after($process); +//}); - public bool $isLoop = false; - - - const LOOP_TYPE_YEAR = 0; - const LOOP_TYPE_MONTH = 1; - const LOOP_TYPE_DAY = 2; - const LOOP_TYPE_HOUR = 3; - const LOOP_TYPE_MINUTE = 4; - const LOOP_TYPE_SECOND = 5; - - - public string $crontab = '2021 * * * */2 */5'; - - - public int $loopType = Crontab::LOOP_TYPE_MINUTE; - - - public int $loopTime = 2; - - - private int|string $month = '*'; - - private int|string $day = '*'; - - private int|string $hour = '*'; - - private int|string $minute = '*/2'; - - private int|string $second = '1-30'; - - private int|string $week = '*'; - - - public function __construct() - { - } - - - /** - * @return bool - */ - public function canExecute(): bool - { - $match = $this->next(); - if (str_contains($match, '^')) { - return false; - } - return true; - } - - - public function next(): string - { - $time = time(); - return sprintf('%s-%s-%s %s:%s:%s %s', - date('Y'), - $this->format($time, $this->month, 'm', 'month'), - $this->format($time, $this->day, 'd', 'day'), - $this->format($time, $this->hour, 'H', 'hour'), - $this->format($time, $this->minute, 'i', 'minute'), - $this->format($time, $this->second, 's', 'second'), - $this->format($time, $this->week, 'N'), - ); - } - - - /** - * @param int $startTime - * @param string $text - * @param string $match - * @param string|null $format - * @return string - */ - private function format(int &$startTime, string $text, string $match, ?string $format = null): string - { - $time = date($match); - if ($text == '*' || $text == '*/1') { - return $time; - } - if (str_contains($text, ',')) { - $explode = explode(',', $text); - sort($explode, SORT_NUMERIC); - if (in_array($time, $explode)) { - return $explode[array_search($time, $explode) + 1]; - } - return '^'; - } - if (str_contains($text, '-')) { - $explode = explode('-', $text); - if ($time >= $explode[0] && $time <= $explode[1]) { - return intval($time); - } - return '^'; - } - if (str_contains($text, '/')) { - $explode = explode('/', $text); - if ($time % $explode[1] !== 0) { - return '^'; - } - if ($explode[0] != '*') { - return $explode[0] == $text ? $time : '^'; - } - return $time; - } - return $time == $text ? $time : '^'; - } - - -} - -//$date = date('Y-m-d H:i:s'); -//var_dump(date('Y-m-d H:i:s', strtotime('+10 month', strtotime($date)))); -//var_dump(date('Y-m-d H:i:s', strtotime('+10 day', strtotime($date)))); -//var_dump(date('Y-m-d H:i:s', strtotime('+10 hour', strtotime($date)))); -//var_dump(date('Y-m-d H:i:s', strtotime('+10 minute', strtotime($date)))); -//var_dump(date('Y-m-d H:i:s', strtotime('+10 second', strtotime($date)))); -//var_dump(date('Y-m-d H:i:s', strtotime('+10 week', strtotime($date)))); - -$c = new Crontab(); - -while (true) { - var_dump($c->next()); - - sleep(1); -} diff --git a/socket.html b/socket.html index b999b445..fbdd4e0c 100644 --- a/socket.html +++ b/socket.html @@ -48,7 +48,7 @@ } function connect() { - sock = new WebSocket('ws://47.92.194.207:9528/'); + sock = new WebSocket('ws://localhost:8080/websocket'); sock.onopen = function () { if (tick) { clearInterval(tick) diff --git a/test.php b/test.php index a26217ba..76c980d2 100644 --- a/test.php +++ b/test.php @@ -1,3 +1,37 @@ get('/v1/agent/services?filter=Service == FriendRpcService'); + $client->close(); + var_dump($client->getBody()); + +});