diff --git a/Task/OnTaskInterface.php b/Task/OnTaskInterface.php new file mode 100644 index 0000000..e5feb43 --- /dev/null +++ b/Task/OnTaskInterface.php @@ -0,0 +1,17 @@ +get($handler[0]); - return call_user_func($handler, $task_id, $src_worker_id, $params); + $handler = Kiri::getDi()->make($handler, $params); + if (!($handler instanceof OnTaskInterface)) { + throw new \Exception('Task process must implements ' . OnTaskInterface::class); + } + + $response = call_user_func([$handler, 'process'], $task_id, $src_worker_id); + } } catch (\Throwable $throwable) { - return $this->exception->emit($throwable, response()); + $response = throwable($throwable); + } finally { + $server->finish($response); } } diff --git a/Task/TaskExecute.php b/Task/TaskExecute.php index d3121c5..f20e904 100644 --- a/Task/TaskExecute.php +++ b/Task/TaskExecute.php @@ -7,17 +7,34 @@ use Kiri\Server\ServerInterface; class TaskExecute implements TaskInterface { + /** + * @param OnTaskInterface $handler + * @param mixed $data + * @param int $dstWorkerId + * @param callable|null $finishFinishCallback + * @return void + */ + public function task(OnTaskInterface $handler, mixed $data, int $dstWorkerId = -1, ?callable $finishFinishCallback = null): void + { + $server = \Kiri::getDi()->get(ServerInterface::class); + + $server->task([$handler, $data], $dstWorkerId, $finishFinishCallback); + } + + + /** + * @param OnTaskInterface $handler * @param mixed $data * @param float $timeout * @param int $dstWorkerId * @return mixed */ - public function taskWait(mixed $data, float $timeout = 0.5, int $dstWorkerId = -1): mixed + public function taskWait(OnTaskInterface $handler, mixed $data, float $timeout = 0.5, int $dstWorkerId = -1): mixed { $server = \Kiri::getDi()->get(ServerInterface::class); - return $server->taskwait($data, $timeout, $dstWorkerId); + return $server->taskwait([$handler, $data], $timeout, $dstWorkerId); } @@ -45,6 +62,4 @@ class TaskExecute implements TaskInterface return $server->taskWaitMulti($tasks, $timeout); } - - -} \ No newline at end of file +} diff --git a/Task/TaskInterface.php b/Task/TaskInterface.php index 4a43615..4c35149 100644 --- a/Task/TaskInterface.php +++ b/Task/TaskInterface.php @@ -7,11 +7,24 @@ interface TaskInterface /** - * @param array $tasks - * @param float $timeout - * @return false|array + * @param OnTaskInterface $handler + * @param mixed $data + * @param int $dstWorkerId + * @param callable|null $finishFinishCallback + * @return void */ - public function taskWaitMulti(array $tasks, float $timeout = 0.5): false|array; + public function task(OnTaskInterface $handler, mixed $data, int $dstWorkerId = -1, ?callable $finishFinishCallback = null): void; + + + /** + * @param OnTaskInterface $handler + * @param mixed $data + * @param float $timeout + * @param int $dstWorkerId + * @return mixed + */ + public function taskWait(OnTaskInterface $handler, mixed $data, float $timeout = 0.5, int $dstWorkerId = -1): mixed; + /** * @param array $tasks @@ -20,12 +33,12 @@ interface TaskInterface */ public function taskCo(array $tasks, float $timeout = 0.5): false|array; + /** - * @param mixed $data + * @param array $tasks * @param float $timeout - * @param int $dstWorkerId - * @return mixed + * @return false|array */ - public function taskWait(mixed $data, float $timeout = 0.5, int $dstWorkerId = -1): mixed; + public function taskWaitMulti(array $tasks, float $timeout = 0.5): false|array; } diff --git a/Task/TestTask.php b/Task/TestTask.php new file mode 100644 index 0000000..818cf7b --- /dev/null +++ b/Task/TestTask.php @@ -0,0 +1,24 @@ +