diff --git a/Contract/OnTaskInterface.php b/Contract/OnTaskInterface.php deleted file mode 100644 index 72d59c6..0000000 --- a/Contract/OnTaskInterface.php +++ /dev/null @@ -1,17 +0,0 @@ -sortService($configs['ports']) as $config) { $this->startListenerHandler($context, $config, $daemon); } - $this->bindCallback([Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]); + $this->bindPipeMessage(); + } + + + /** + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function bindPipeMessage(): void + { + $pipeMessage = $this->getContainer()->get(OnPipeMessage::class); + $this->server->on(Constant::PIPE_MESSAGE, [$pipeMessage, 'onPipeMessage']); + + if (!isset($this->server->setting['task_worker_num']) || $this->server->setting['task_worker_num'] < 1) { + return; + } + + $this->getContainer()->get(TaskManager::class)->taskListener($this->server); } @@ -247,22 +264,6 @@ class ServerManager extends Component * @throws ContainerExceptionInterface * @throws NotFoundExceptionInterface * @throws Exception - * - * - * - * $data = new Table($this->container->get(OutputInterface::class)); - * $data->setHeaders(['key', 'value']); - * - * $array = []; - * foreach ($this->server->setting as $key => $value) { - * $array[] = [$key, $value]; - * $array[] = new TableSeparator(); - * } - * - * array_pop($array); - * - * $data->setStyle('box-double'); - * $data->setRows($array); */ private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = []) { @@ -292,9 +293,6 @@ class ServerManager extends Component */ private function addDefaultListener(array $settings): void { - if (($this->server->setting['task_worker_num'] ?? 0) > 0) { - $this->addTaskListener($settings['events']); - } $this->addServiceEvents(ServerManager::DEFAULT_EVENT, $this->server); if (!empty($settings['events']) && is_array($settings['events'])) { $this->addServiceEvents($settings['events'], $this->server); @@ -328,55 +326,4 @@ class ServerManager extends Component } - /** - * @param mixed $message - * @param int $workerId - * @return mixed - */ - public function sendMessage(mixed $message, int $workerId): mixed - { - return $this->server?->sendMessage($message, $workerId); - } - - - /** - * @param array $events - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - private function addTaskListener(array $events = []): void - { - $task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false; - $reflect = $this->container->get(OnServerTask::class); - $this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']); - if ($task_use_object || $this->server->setting['task_enable_coroutine']) { - $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']); - } else { - $this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onTask']); - } - } - - - /** - * @param array|null $settings - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ - public function bindCallback(?array $settings = []) - { - if (count($settings) < 1) { - return; - } - foreach ($settings as $event_type => $callback) { - if ($this->server->getCallback($event_type) !== null) { - continue; - } - if (is_array($callback) && !is_object($callback[0])) { - $callback[0] = $this->container->get($callback[0]); - } - $this->server->on($event_type, $callback); - } - } } diff --git a/Tasker/AsyncTaskExecute.php b/Tasker/AsyncTaskExecute.php deleted file mode 100644 index 5476847..0000000 --- a/Tasker/AsyncTaskExecute.php +++ /dev/null @@ -1,91 +0,0 @@ -hashMap = new HashMap(); - } - - - /** - * @param string $key - * @param $handler - */ - public function reg(string $key, $handler) - { - $this->hashMap->put($key, $handler); - } - - - /** - * @param OnTaskInterface|string $handler - * @param array $params - * @param int $workerId - * @throws Exception - */ - public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = -1) - { - if (!$this->server) { - $this->server = Kiri::getDi()->get(SwooleServerInterface::class); - } - if ($workerId < 0 || $workerId > $this->server->setting['task_worker_num']) { - $workerId = random_int(0, $this->server->setting['task_worker_num'] - 1); - } - if (is_string($handler)) { - $handler = $this->handle($handler, $params); - } - $this->server->task(serialize($handler), $workerId); - } - - - /** - * @param $handler - * @param $params - * @return object - * @throws ReflectionException - * @throws Exception - */ - private function handle($handler, $params): object - { - if (!class_exists($handler) && $this->hashMap->has($handler)) { - $handler = $this->hashMap->get($handler); - } - $implements = $this->container->getReflect($handler); - if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) { - throw new Exception('Task must instance ' . OnTaskInterface::class); - } - return $implements->newInstanceArgs($params); - } - - -} diff --git a/Tasker/OnServerTask.php b/Tasker/OnServerTask.php deleted file mode 100644 index d99ab05..0000000 --- a/Tasker/OnServerTask.php +++ /dev/null @@ -1,94 +0,0 @@ -resolve($data); - } catch (\Throwable $exception) { - $data = jTraceEx($exception); - - $this->logger->error('task', [error_trigger_format($exception)]); - } finally { - $server->finish($data); - } - } - - - /** - * @param Server $server - * @param Server\Task $task - * @throws ConfigException - */ - public function onCoroutineTask(Server $server, Server\Task $task) - { - try { - $data = $this->resolve($task->data); - } catch (\Throwable $exception) { - $data = jTraceEx($exception); - - $this->logger->error('task', [error_trigger_format($exception)]); - } finally { - $task->finish($data); - } - } - - - /** - * @param $data - * @return null - */ - private function resolve($data) - { - $execute = unserialize($data); - if ($execute instanceof OnTaskInterface) { - return $execute->execute(); - } - return null; - } - - - /** - * @param Server $server - * @param int $task_id - * @param mixed $data - */ - public function onFinish(Server $server, int $task_id, mixed $data) - { - if (!($data instanceof OnTaskInterface)) { - return; - } - $data->finish($server, $task_id); - } - - -}