modify
This commit is contained in:
@@ -5,6 +5,7 @@ namespace Snowflake\Process;
|
||||
|
||||
|
||||
use ReflectionException;
|
||||
use Snowflake\Core\Json;
|
||||
use Snowflake\Crontab;
|
||||
use Snowflake\Exception\ComponentException;
|
||||
use Snowflake\Exception\ConfigException;
|
||||
@@ -23,67 +24,70 @@ use Swoole\Timer;
|
||||
class CrontabProcess extends Process
|
||||
{
|
||||
|
||||
|
||||
public Channel $channel;
|
||||
/** @var Crontab[] $names */
|
||||
public array $names = [];
|
||||
|
||||
|
||||
/**
|
||||
* @param \Swoole\Process $process
|
||||
*/
|
||||
public function onHandler(\Swoole\Process $process): void
|
||||
{
|
||||
$this->channel = new Channel(5000);
|
||||
|
||||
Coroutine::create([$this, 'execute']);
|
||||
|
||||
Timer::tick(1000, [$this, 'systemLoop']);
|
||||
}
|
||||
/**
|
||||
* @param \Swoole\Process $process
|
||||
*/
|
||||
public function onHandler(\Swoole\Process $process): void
|
||||
{
|
||||
while (true) {
|
||||
$content = $process->read();
|
||||
$_content = json_decode($content, true);
|
||||
if (is_null($_content)) {
|
||||
$this->jobDelivery($content);
|
||||
} else {
|
||||
$this->otherAction($content);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function execute()
|
||||
{
|
||||
while (true) {
|
||||
/** @var Crontab $list */
|
||||
$list = $this->channel->pop(-1);
|
||||
$list->execute();
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @param $content
|
||||
*/
|
||||
private function otherAction($content)
|
||||
{
|
||||
call_user_func(match ($content['action']) {
|
||||
'clear' => function ($content) {
|
||||
if (!isset($this->names[$content['name']])) {
|
||||
return;
|
||||
}
|
||||
$this->names[$content['name']]->clearTimer();
|
||||
},
|
||||
'clearAll' => function () {
|
||||
foreach ($this->names as $name => $crontab) {
|
||||
$crontab->clearTimer();
|
||||
}
|
||||
},
|
||||
default => function () {
|
||||
$this->application->error('unknown action');
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @throws ReflectionException
|
||||
* @throws ComponentException
|
||||
* @throws ConfigException
|
||||
* @throws NotFindClassException
|
||||
* @throws Exception
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function systemLoop()
|
||||
{
|
||||
$score = time();
|
||||
$redis = Snowflake::app()->getRedis();
|
||||
/**
|
||||
* @param $content
|
||||
*/
|
||||
private function jobDelivery($content)
|
||||
{
|
||||
$content = unserialize($content);
|
||||
$this->names[$content->getName()] = $content;
|
||||
if (!($content instanceof Crontab)) {
|
||||
return;
|
||||
}
|
||||
$runTicker = [$content, 'execute'];
|
||||
$timer = $content->getTickTime() * 1000;
|
||||
if ($content->isLoop()) {
|
||||
$timerId = Timer::tick($timer, $runTicker);
|
||||
} else {
|
||||
$timerId = Timer::after($timer, $runTicker);
|
||||
}
|
||||
$content->setTimerId($timerId);
|
||||
}
|
||||
|
||||
$lists = $redis->zRangeByScore('system:crontab', '0', (string)$score);
|
||||
$redis->zRemRangeByScore('system:crontab', '0', (string)$score);
|
||||
|
||||
if (empty($lists)) {
|
||||
$redis->release();
|
||||
return;
|
||||
}
|
||||
|
||||
$barrier = Barrier::make();
|
||||
foreach ($lists as $list) {
|
||||
$list = unserialize($list);
|
||||
if (!($list instanceof Crontab)) {
|
||||
continue;
|
||||
}
|
||||
$this->channel->push($list);
|
||||
}
|
||||
Barrier::wait($barrier);
|
||||
$redis->release();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user