diff --git a/System/Abstracts/Crontab.php b/System/Abstracts/Crontab.php index d366cd8e..21c73ad5 100644 --- a/System/Abstracts/Crontab.php +++ b/System/Abstracts/Crontab.php @@ -17,6 +17,8 @@ use Exception; class Crontab extends Component { + const CRONTAB_KEY = 'system:crontab'; + /** * @param \Snowflake\Crontab $crontab @@ -25,9 +27,15 @@ class Crontab extends Component */ public function dispatch(\Snowflake\Crontab $crontab) { - /** @var CrontabProcess $redis */ - $redis = Snowflake::app()->get(CrontabProcess::class); - $redis->write(serialize($crontab)); + $redis = Snowflake::app()->getRedis(); + + $name = md5($crontab->getName()); + + $redis->set('crontab:' . $name, serialize($crontab)); + + $tickTime = time() + $crontab->getTickTime() * 1000; + + $redis->zAdd(self::CRONTAB_KEY, $tickTime, $crontab->getName()); } @@ -37,9 +45,10 @@ class Crontab extends Component */ public function clear(string $name) { - /** @var CrontabProcess $redis */ - $redis = Snowflake::app()->get(CrontabProcess::class); - $redis->write(Json::encode(['action' => 'clear', 'name' => $name])); + $redis = Snowflake::app()->getRedis(); + + $redis->zRem(self::CRONTAB_KEY, $name); + $redis->del('crontab:' . md5($name)); } @@ -48,9 +57,12 @@ class Crontab extends Component */ public function clearAll() { - /** @var CrontabProcess $redis */ - $redis = Snowflake::app()->get(CrontabProcess::class); - $redis->write(Json::encode(['action' => 'clearAll'])); + $redis = Snowflake::app()->getRedis(); + $data = $redis->zRange(self::CRONTAB_KEY, 0, -1); + $redis->del(self::CRONTAB_KEY); + foreach ($data as $datum) { + $redis->del('crontab:' . md5($datum)); + } } diff --git a/System/Process/CrontabProcess.php b/System/Process/CrontabProcess.php index bd26d62d..563e88b5 100644 --- a/System/Process/CrontabProcess.php +++ b/System/Process/CrontabProcess.php @@ -5,6 +5,8 @@ namespace Snowflake\Process; use Snowflake\Crontab; +use Snowflake\Abstracts\Crontab as ACrontab; +use Snowflake\Snowflake; use Swoole\Coroutine; use Swoole\Coroutine\WaitGroup; use Swoole\Coroutine\Channel; @@ -35,18 +37,21 @@ class CrontabProcess extends Process */ public function onHandler(\Swoole\Process $process): void { - $this->readBySocket($process); Timer::tick(1000, function () { $startTime = time(); - var_dump($startTime); + $redis = Snowflake::app()->getRedis(); - $time = $this->scores[$startTime]; - unset($this->scores[$startTime]); - foreach ($time as $value) { + $range = $redis->zRangeByScore(ACrontab::CRONTAB_KEY, 0, $startTime); + $redis->zRemRangeByScore(ACrontab::CRONTAB_KEY, 0, $startTime); + foreach ($range as $value) { + $crontab = $redis->get('crontab:' . md5($value)); + if (empty($crontab) || !($crontab = unserialize($crontab))) { + continue; + } Coroutine::create(function (Crontab $value, int $startTime) { - $this->dispatch($value, $startTime); - }, $value, $startTime); + $this->dispatch($value); + }, $crontab, $startTime); } }); } @@ -57,14 +62,14 @@ class CrontabProcess extends Process * @param int $startTime * @throws \Exception */ - private function dispatch(Crontab $value, int $startTime) + private function dispatch(Crontab $value) { try { $value->increment()->execute(); if ($value->getExecuteNumber() < $value->getMaxExecuteNumber()) { - $this->addTask($value, $startTime + $value->getTickTime()); + $this->addTask($value); } else if ($value->isLoop()) { - $this->addTask($value, $startTime + $value->getTickTime()); + $this->addTask($value); } } catch (\Throwable $exception) { $this->application->error($exception->getMessage()); @@ -72,51 +77,6 @@ class CrontabProcess extends Process } - /** - * @param \Swoole\Process $process - * @throws \Exception - */ - public function readBySocket(\Swoole\Process $process) - { - Coroutine::create(function (\Swoole\Process $process) { - try { - $content = $process->read(); - - $_content = json_decode($content, true); - if (is_null($_content)) { - $this->jobDelivery($content); - } else { - $this->otherAction($_content); - } - } catch (\Throwable $exception) { - $this->application->error($exception->getMessage()); - } finally { - $this->onHandler($process); - } - }, $process); - } - - - /** - * @param $content - */ - private function otherAction($content) - { - call_user_func(match ($content['action']) { - 'clear' => function ($content) { - $this->clear($content['name']); - }, - 'clearAll' => function () { - $this->names = []; - Timer::clearAll(); - }, - default => function () { - $this->application->error('unknown action'); - } - }, $content); - } - - /** * @param string $name */ @@ -135,43 +95,21 @@ class CrontabProcess extends Process } - /** - * @param $content - */ - private function jobDelivery($content) - { - /** @var Crontab $content */ - $content = unserialize($content); - - $ticker = intval($content->getTickTime() * 1000) + time(); - - $this->addTask($content, $ticker); - } - - /** * @param Crontab $content * @param $ticker */ - private function addTask(Crontab $content, $ticker) + private function addTask(Crontab $crontab) { - $name = $content->getName(); - if (isset($this->names[$name])) { - unset($this->names[$content->getName()]); + $redis = Snowflake::app()->getRedis(); - $search = array_search($content->getName(), $this->scores); - unset($this->scores[$search]); - } + $name = md5($crontab->getName()); - if (!isset($this->scores[$ticker])) { - $this->scores[$ticker] = []; - } + $redis->set('crontab:' . $name, serialize($crontab)); - $this->timers[$content->getName()] = $ticker; - $this->scores[$ticker][] = $content->getName(); - $this->names[$content->getName()] = $content; + $tickTime = time() + $crontab->getTickTime(); - ksort($this->scores, SORT_NUMERIC); + $redis->zAdd(ACrontab::CRONTAB_KEY, $tickTime, $crontab->getName()); }