Files
kiri-core/kiri-task/CoroutineTaskExecute.php
T
as2252258 d8222366b1 Revert "改名"
This reverts commit fdf58326
2022-01-17 18:45:00 +08:00

86 lines
1.4 KiB
PHP

<?php
namespace Kiri\Task;
use Kiri\Abstracts\Component;
use Kiri\Core\HashMap;
use Kiri\Exception\ConfigException;
use ReflectionException;
use Swoole\Coroutine;
use Swoole\Server\Task;
class CoroutineTaskExecute extends Component
{
use TaskResolve;
private HashMap $hashMap;
private Coroutine\Channel $channel;
private OnServerTask $taskServer;
private int $total = 50;
/**
*
*/
public function init()
{
$this->hashMap = new HashMap();
$this->channel = new Coroutine\Channel($this->total);
$this->taskServer = \Kiri::getDi()->get(OnServerTask::class);
Coroutine::create(function () {
$barrier = Coroutine\Barrier::make();
for ($i = 0; $i < 50; $i++) {
Coroutine::create(function () {
$this->handler();
});
}
Coroutine\Barrier::wait($barrier);
});
}
/**
* @return void
* @throws ConfigException
*/
protected function handler()
{
$data = $this->channel->pop(-1);
$task = new Task();
$task->data = $data;
$this->taskServer->onCoroutineTask(null, $task);
$this->handler();
}
/**
* @param OnTaskInterface|string $handler
* @param array $params
* @param int $workerId
* @return void
* @throws ReflectionException
*/
public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = -1)
{
if (is_string($handler)) {
$handler = $this->handle($handler, $params);
}
$this->channel->push(serialize($handler));
}
}