qqq
This commit is contained in:
+86
-66
@@ -25,78 +25,98 @@ class Task implements TaskInterface
|
||||
|
||||
|
||||
/**
|
||||
* @param Server $server
|
||||
* @return void
|
||||
*/
|
||||
public function initTaskWorker(Server $server): void
|
||||
{
|
||||
if (!isset($server->setting[Constant::OPTION_TASK_WORKER_NUM])) {
|
||||
return;
|
||||
}
|
||||
if ($server->setting[Constant::OPTION_TASK_WORKER_NUM] < 1) {
|
||||
return;
|
||||
}
|
||||
$server->on('finish', [$this, 'onFinish']);
|
||||
$server->on('task', [$this, 'onTask']);
|
||||
}
|
||||
* @return void
|
||||
* @throws ReflectionException
|
||||
*/
|
||||
public function init(): void
|
||||
{
|
||||
$this->server = Kiri::getDi()->get(ServerInterface::class);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Server $server
|
||||
* @param int $task_id
|
||||
* @param mixed $data
|
||||
* @return void
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
* @throws ReflectionException
|
||||
*/
|
||||
public function onFinish(Server $server, int $task_id, mixed $data): void
|
||||
{
|
||||
event(new OnTaskFinish($task_id, $data));
|
||||
}
|
||||
/**
|
||||
* @param Server $server
|
||||
* @return void
|
||||
*/
|
||||
public function initTaskWorker(Server $server): void
|
||||
{
|
||||
if (!isset($server->setting[Constant::OPTION_TASK_WORKER_NUM])) {
|
||||
return;
|
||||
}
|
||||
if ($server->setting[Constant::OPTION_TASK_WORKER_NUM] < 1) {
|
||||
return;
|
||||
}
|
||||
$server->on('finish', [$this, 'onFinish']);
|
||||
$server->on('task', [$this, 'onTask']);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Server $server
|
||||
* @param int $task_id
|
||||
* @param int $src_worker_id
|
||||
* @param mixed $data
|
||||
* @return mixed
|
||||
* @throws ReflectionException
|
||||
*/
|
||||
public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data): mixed
|
||||
{
|
||||
$data = json_decode($data, true);
|
||||
if (is_null($data)) {
|
||||
return null;
|
||||
}
|
||||
$data[0] = Kiri::getDi()->get($data[0]);
|
||||
return call_user_func($data, $task_id, $src_worker_id);
|
||||
}
|
||||
/**
|
||||
* @param Server $server
|
||||
* @param int $task_id
|
||||
* @param mixed $data
|
||||
* @return void
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
* @throws ReflectionException
|
||||
*/
|
||||
public function onFinish(Server $server, int $task_id, mixed $data): void
|
||||
{
|
||||
event(new OnTaskFinish($task_id, $data));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param array|string|object $handler
|
||||
* @param int|null $workerId
|
||||
* @return void
|
||||
* @throws ReflectionException
|
||||
*/
|
||||
public function dispatch(array|string|object $handler, ?int $workerId = null): void
|
||||
{
|
||||
if (is_null($workerId)) {
|
||||
$workerId = rand(0, $this->server->setting[Constant::OPTION_TASK_WORKER_NUM] - 1);
|
||||
}
|
||||
if (is_string($handler)) {
|
||||
$this->server->task(serialize([di($handler), 'handle']), $workerId);
|
||||
} else if (is_array($handler)) {
|
||||
if (is_string($handler[0])) {
|
||||
$handler[0] = di($handler[0]);
|
||||
}
|
||||
$this->server->task(serialize($handler), $workerId);
|
||||
} else {
|
||||
$this->server->task(serialize([$handler, 'handle']), $workerId);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @param Server $server
|
||||
* @param int $task_id
|
||||
* @param int $src_worker_id
|
||||
* @param mixed $data
|
||||
* @return mixed
|
||||
* @throws ReflectionException
|
||||
*/
|
||||
public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data): mixed
|
||||
{
|
||||
$data = json_decode($data, true);
|
||||
if (is_null($data)) {
|
||||
return null;
|
||||
}
|
||||
$data[0] = Kiri::getDi()->get($data[0]);
|
||||
return call_user_func($data, $task_id, $src_worker_id);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param mixed $data
|
||||
* @param float $timeout
|
||||
* @param int $dstWorkerId
|
||||
* @return mixed
|
||||
*/
|
||||
public function taskWait(mixed $data, float $timeout = 0.5, int $dstWorkerId = -1): mixed
|
||||
{
|
||||
return $this->server->taskwait($data, $timeout, $dstWorkerId);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param array $tasks
|
||||
* @param float $timeout
|
||||
* @return false|array
|
||||
*/
|
||||
public function taskCo(array $tasks, float $timeout = 0.5): false|array
|
||||
{
|
||||
return $this->server->taskCo($tasks, $timeout);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param array $tasks
|
||||
* @param float $timeout
|
||||
* @return false|array
|
||||
*/
|
||||
public function taskWaitMulti(array $tasks, float $timeout = 0.5): false|array
|
||||
{
|
||||
return $this->server->taskWaitMulti($tasks, $timeout);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
+20
-6
@@ -6,12 +6,26 @@ interface TaskInterface
|
||||
{
|
||||
|
||||
|
||||
/**
|
||||
* @param array $handler
|
||||
* @param int|null $workerId
|
||||
* @return void
|
||||
*/
|
||||
public function dispatch(array $handler, ?int $workerId = null): void;
|
||||
/**
|
||||
* @param array $tasks
|
||||
* @param float $timeout
|
||||
* @return false|array
|
||||
*/
|
||||
public function taskWaitMulti(array $tasks, float $timeout = 0.5): false|array;
|
||||
|
||||
/**
|
||||
* @param array $tasks
|
||||
* @param float $timeout
|
||||
* @return false|array
|
||||
*/
|
||||
public function taskCo(array $tasks, float $timeout = 0.5): false|array;
|
||||
|
||||
/**
|
||||
* @param mixed $data
|
||||
* @param float $timeout
|
||||
* @param int $dstWorkerId
|
||||
* @return mixed
|
||||
*/
|
||||
public function taskWait(mixed $data, float $timeout = 0.5, int $dstWorkerId = -1): mixed;
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user