From 8257e675ba41274e8d93c7d095668d221cda308e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=91=E6=9E=97?= Date: Thu, 17 Aug 2023 16:56:51 +0800 Subject: [PATCH] qqq --- Task/Task.php | 152 +++++++++++++++++++++++------------------ Task/TaskInterface.php | 26 +++++-- 2 files changed, 106 insertions(+), 72 deletions(-) diff --git a/Task/Task.php b/Task/Task.php index 65cb575..386d17b 100644 --- a/Task/Task.php +++ b/Task/Task.php @@ -25,78 +25,98 @@ class Task implements TaskInterface /** - * @param Server $server - * @return void - */ - public function initTaskWorker(Server $server): void - { - if (!isset($server->setting[Constant::OPTION_TASK_WORKER_NUM])) { - return; - } - if ($server->setting[Constant::OPTION_TASK_WORKER_NUM] < 1) { - return; - } - $server->on('finish', [$this, 'onFinish']); - $server->on('task', [$this, 'onTask']); - } + * @return void + * @throws ReflectionException + */ + public function init(): void + { + $this->server = Kiri::getDi()->get(ServerInterface::class); + } - /** - * @param Server $server - * @param int $task_id - * @param mixed $data - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @throws ReflectionException - */ - public function onFinish(Server $server, int $task_id, mixed $data): void - { - event(new OnTaskFinish($task_id, $data)); - } + /** + * @param Server $server + * @return void + */ + public function initTaskWorker(Server $server): void + { + if (!isset($server->setting[Constant::OPTION_TASK_WORKER_NUM])) { + return; + } + if ($server->setting[Constant::OPTION_TASK_WORKER_NUM] < 1) { + return; + } + $server->on('finish', [$this, 'onFinish']); + $server->on('task', [$this, 'onTask']); + } - /** - * @param Server $server - * @param int $task_id - * @param int $src_worker_id - * @param mixed $data - * @return mixed - * @throws ReflectionException - */ - public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data): mixed - { - $data = json_decode($data, true); - if (is_null($data)) { - return null; - } - $data[0] = Kiri::getDi()->get($data[0]); - return call_user_func($data, $task_id, $src_worker_id); - } + /** + * @param Server $server + * @param int $task_id + * @param mixed $data + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + * @throws ReflectionException + */ + public function onFinish(Server $server, int $task_id, mixed $data): void + { + event(new OnTaskFinish($task_id, $data)); + } - /** - * @param array|string|object $handler - * @param int|null $workerId - * @return void - * @throws ReflectionException - */ - public function dispatch(array|string|object $handler, ?int $workerId = null): void - { - if (is_null($workerId)) { - $workerId = rand(0, $this->server->setting[Constant::OPTION_TASK_WORKER_NUM] - 1); - } - if (is_string($handler)) { - $this->server->task(serialize([di($handler), 'handle']), $workerId); - } else if (is_array($handler)) { - if (is_string($handler[0])) { - $handler[0] = di($handler[0]); - } - $this->server->task(serialize($handler), $workerId); - } else { - $this->server->task(serialize([$handler, 'handle']), $workerId); - } - } + /** + * @param Server $server + * @param int $task_id + * @param int $src_worker_id + * @param mixed $data + * @return mixed + * @throws ReflectionException + */ + public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data): mixed + { + $data = json_decode($data, true); + if (is_null($data)) { + return null; + } + $data[0] = Kiri::getDi()->get($data[0]); + return call_user_func($data, $task_id, $src_worker_id); + } + + + /** + * @param mixed $data + * @param float $timeout + * @param int $dstWorkerId + * @return mixed + */ + public function taskWait(mixed $data, float $timeout = 0.5, int $dstWorkerId = -1): mixed + { + return $this->server->taskwait($data, $timeout, $dstWorkerId); + } + + + /** + * @param array $tasks + * @param float $timeout + * @return false|array + */ + public function taskCo(array $tasks, float $timeout = 0.5): false|array + { + return $this->server->taskCo($tasks, $timeout); + } + + + /** + * @param array $tasks + * @param float $timeout + * @return false|array + */ + public function taskWaitMulti(array $tasks, float $timeout = 0.5): false|array + { + return $this->server->taskWaitMulti($tasks, $timeout); + } } diff --git a/Task/TaskInterface.php b/Task/TaskInterface.php index 333f35e..4a43615 100644 --- a/Task/TaskInterface.php +++ b/Task/TaskInterface.php @@ -6,12 +6,26 @@ interface TaskInterface { - /** - * @param array $handler - * @param int|null $workerId - * @return void - */ - public function dispatch(array $handler, ?int $workerId = null): void; + /** + * @param array $tasks + * @param float $timeout + * @return false|array + */ + public function taskWaitMulti(array $tasks, float $timeout = 0.5): false|array; + /** + * @param array $tasks + * @param float $timeout + * @return false|array + */ + public function taskCo(array $tasks, float $timeout = 0.5): false|array; + + /** + * @param mixed $data + * @param float $timeout + * @param int $dstWorkerId + * @return mixed + */ + public function taskWait(mixed $data, float $timeout = 0.5, int $dstWorkerId = -1): mixed; }