From dc47bd5106ff56bb4c5c9c8bba38930260b2de4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Fri, 11 Feb 2022 19:00:55 +0800 Subject: [PATCH] modify plugin name --- Coroutine/Http.php | 244 +++++++++++++++++++++++++++++++++++++++++++++ ProcessManager.php | 52 +++++++++- 2 files changed, 291 insertions(+), 5 deletions(-) create mode 100644 Coroutine/Http.php diff --git a/Coroutine/Http.php b/Coroutine/Http.php new file mode 100644 index 0000000..96b3b3e --- /dev/null +++ b/Coroutine/Http.php @@ -0,0 +1,244 @@ +getContainer()->get(ProcessManager::class); + $process->stop(); + + foreach ($this->servers as $server) { + $server->shutdown(); + } + } + + + public function shutdown() + { + + } + + + /** + * @param array $config + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws ConfigException + * + * 异步任务进程 + */ + public function startTaskWorker(array $config) + { + $task_worker_num = $config['settings']['task_worker_num'] ?? 0; + if ($task_worker_num < 1) { + return; + } + $task_enable_coroutine = $config['settings']['task_enable_coroutine']; + if ($task_enable_coroutine) { + $tasker = $this->getContainer()->get(CoroutineTaskExecute::class); + } else { + $tasker = $this->getContainer()->get(AsyncTaskExecute::class); + } + $tasker->setTotal($task_worker_num); + $tasker->start(); + } + + + public function start(array $config) + { + $this->configs = $this->sortService($config['ports']); + run(function () use ($config) { + $event = \Kiri::getDi()->get(EventDispatch::class); + + $this->startTaskWorker($config); + + foreach ($config as $value) { + $value = $this->resolveCallback($value); + if ($value['type'] == Constant::SERVER_TYPE_HTTP) { + $onRequest = $value['events'][Constant::REQUEST] ?? null; + if (is_null($onRequest)) { + throw new \Exception('Server callback con\'t null.'); + } + Coroutine::create(function () use ($value, $event, $onRequest) { + $this->bindHttpService($value, $onRequest); + }); + } else if ($value['type'] == Constant::SERVER_TYPE_WEBSOCKET) { + $handshake = $value['events'][Constant::HANDSHAKE] ?? null; + + $open = $value['events'][Constant::OPEN] ?? null; + + $close = $value['events'][Constant::CLOSE] ?? null; + $message = $value['events'][Constant::MESSAGE] ?? null; + if (is_null($message)) { + throw new \Exception('Server callback con\'t null.'); + } + + Coroutine::create(function () use ($value, $handshake, $open, $close, $message) { + $this->WebSocketService($value, $handshake, $open, $close, $message); + }); + } else { + $message = $value['events'][Constant::RECEIVE] ?? null; + if (is_null($message)) { + throw new \Exception('Server callback con\'t null.'); + } + $conn = $value['events'][Constant::CONNECT] ?? null; + $close = $value['events'][Constant::CLOSE] ?? null; + + Coroutine::create(function () use ($value, $message, $conn, $close) { + $this->bindTcpService($value, $message, $conn, $close); + }); + } + } + }); + } + + + /** + * @param $value + * @param $onRequest + * @return void + * @throws \Exception + */ + protected function bindHttpService($value, $onRequest) + { + $server = new Coroutine\Http\Server($value['host'], $value['port'], null, true); + + $this->servers[$value['port']] = $server; + + $server->handle('/', function (Request $request, Response $response) use ($onRequest) { + call_user_func($onRequest, $request, $response); + }); + $server->start(); + + $this->bindHttpService($value, $onRequest); + } + + + /** + * @param $value + * @param $message + * @param $conn + * @param $close + * @return void + * @throws Exception + */ + protected function bindTcpService($value, $message, $conn, $close) + { + $server = new Coroutine\Server($value['host'], $value['port'], null, true); + + $this->servers[$value['port']] = $server; + + $server->handle(function (Coroutine\Server\Connection $connection) use ($message, $conn, $close) { + if (!is_null($conn)) { + call_user_func($conn, $connection); + } + while (true) { + $data = $connection->recv(1024); + if ($data === '' || $data === false) { + defer(function () use ($close, $connection) { + call_user_func($close, $connection); + }); + $connection->close(); + break; + } + call_user_func($message, $data); + } + }); + $server->start(); + + $this->bindTcpService($value, $message, $conn, $close); + } + + + /** + * @param $value + * @param $handshake + * @param $open + * @param $close + * @param $message + * @return void + */ + private function WebSocketService($value, $handshake, $open, $close, $message) + { + $server = new Coroutine\Http\Server($value['host'], $value['port'], null, true); + + $this->servers[$value['port']] = $server; + + $server->handle('/', function (Request $request, Response $response) use ($handshake, $open, $close, $message) { + if (is_null($handshake)) { + $response->upgrade(); + } else { + call_user_func($handshake, $request, $response); + } + if ($response->isWritable() && is_callable($open)) { + call_user_func($open, $response); + } + while (($data = $response->recv()) instanceof Frame) { + call_user_func($message, $data); + } + call_user_func($close, $response->fd); + }); + $server->start(); + + $this->WebSocketService($value, $handshake, $open, $close, $message); + } + + + /** + * @param array $config + * @return array + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + protected function resolveCallback(array $config): array + { + if (!isset($config['events'])) { + return $config; + } + foreach ($config['events'] as $key => $event) { + if (is_array($event)) { + if (is_string($event[0])) { + $event[0] = $this->getContainer()->get($event[0]); + $config['events'][$key] = $event; + } + } + } + return $config; + } + + +} diff --git a/ProcessManager.php b/ProcessManager.php index 0d528b8..8f4813a 100644 --- a/ProcessManager.php +++ b/ProcessManager.php @@ -24,15 +24,20 @@ class ProcessManager private array $_process = []; + /** @var array */ + private array $_taskProcess = []; + + #[Inject(LoggerInterface::class)] public LoggerInterface $logger; /** * @param string|OnProcessInterface|BaseProcess $customProcess + * @param string $tag * @return void * @throws ConfigException */ - public function add(string|OnProcessInterface|BaseProcess $customProcess) + public function add(string|OnProcessInterface|BaseProcess $customProcess, string $tag = 'default') { $server = Kiri::getDi()->get(SwooleServerInterface::class); if (is_string($customProcess)) { @@ -48,7 +53,42 @@ class ProcessManager } else { $server->addProcess($process = $this->parse($customProcess, $system)); } - $this->_process[$customProcess->getName()] = $process; + $this->_process[$tag][$customProcess->getName()] = $process; + } + + + /** + * @param string|null $name + * @param string $tag + * @return Process|null + */ + public function get(?string $name = null, string $tag = 'default'): array|Process|null + { + $process = $this->_process[$tag] ?? null; + if (empty($process)) { + return null; + } + if (!empty($name)) { + if (!isset($process[$name])) { + return null; + } + return $process[$name]; + } + return $process; + } + + + /** + * @return void + */ + public function stop() + { + foreach ($this->_process as $process) { + $process->exit(0); + } + foreach ($this->_taskProcess as $process) { + $process->exit(0); + } } @@ -92,13 +132,14 @@ class ProcessManager /** * @param array $processes + * @param string $tag * @return void * @throws ConfigException */ - public function batch(array $processes) + public function batch(array $processes, string $tag = 'default') { foreach ($processes as $process) { - $this->add($process); + $this->add($process, $tag); } } @@ -106,9 +147,10 @@ class ProcessManager /** * @param string $message * @param string $name + * @param string $tag * @return void */ - public function push(string $message, string $name = '') + public function push(string $message, string $name = '', string $tag = 'default') { $processes = $this->_process; if (!empty($this->_process[$name])) {