exception = Kiri::getDi()->get($exception); }

    /**
     * Registers the task-related callbacks on the server.
     *
     * The 'finish' and 'task' handlers are attached only when the
     * task-worker-count option is present in the server settings and is
     * at least 1; otherwise the server is left untouched.
     *
     * @param Server $server
     * @return void
     */
    public function initTaskWorker(Server $server): void
    {
        // A missing option is treated the same as "no task workers".
        if (($server->setting[Constant::OPTION_TASK_WORKER_NUM] ?? 0) < 1) {
            return;
        }

        $server->on('finish', [$this, 'onFinish']);
        $server->on('task', [$this, 'onTask']);
    }

    /**
     * 'finish' callback: publishes an OnTaskFinish event carrying the task id
     * and the data handed back by the task worker.
     *
     * NOTE(review): anything raised by event() propagates to the caller —
     * confirm which exception types the dispatcher can raise.
     *
     * @param Server $server
     * @param int $task_id
     * @param mixed $data
     * @return void
     */
    public function onFinish(Server $server, int $task_id, mixed $data): void
    {
        event(new OnTaskFinish($task_id, $data));
    }

    /**
     * 'task' callback: decodes the task payload, builds the handler through
     * the DI container, runs it and reports the outcome via $server->finish().
     *
     * The payload is expected to be a JSON document that decodes to an array
     * whose element 0 is the handler class and whose element 1 holds the
     * arguments passed to the container's make() call. Any payload that does
     * not have this shape is answered with 'task data format failed.'.
     *
     * Every Throwable raised while decoding, building or running the handler
     * is caught and converted into the response through the throwable()
     * helper; $server->finish() is always invoked, success or failure.
     *
     * @param Server $server
     * @param int $task_id
     * @param int $src_worker_id
     * @param mixed $data
     * @return void
     */
    public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data): void
    {
        $response = 'task data format failed.';
        try {
            $payload = json_decode($data, true);

            // Only touch offsets 0/1 once we know the payload is an array that
            // actually names a handler; scalars, strings and empty arrays keep
            // the default "format failed" response instead of raising
            // undefined-offset warnings and an unrelated container error.
            if (is_array($payload) && isset($payload[0])) {
                // A missing argument slot is forwarded as null, exactly as before,
                // but without the undefined-offset warning.
                $handler = Kiri::getDi()->make($payload[0], $payload[1] ?? null);
                if (!($handler instanceof OnTaskInterface)) {
                    throw new \Exception('Task process must implements ' . OnTaskInterface::class);
                }

                $response = $handler->process($task_id, $src_worker_id);
            }
        } catch (\Throwable $throwable) {
            $response = throwable($throwable);
        } finally {
            // Always answer the originating worker, even when the handler failed.
            $server->finish($response);
        }
    }
}