name('Crontab consumer'); $this->channel = new Coroutine\Channel(2000); go(function () { $this->popChannel(); }); $this->tick($process); } /** * @throws Exception */ public function popChannel() { /** @var Crontab $crontab */ $crontab = $this->channel->pop(-1); go(function () use ($crontab) { try { $crontab->increment()->execute(); if ($crontab->getExecuteNumber() < $crontab->getMaxExecuteNumber()) { Consumer::addTask($crontab); } else if ($crontab->isLoop()) { Consumer::addTask($crontab); } } catch (\Throwable $throwable) { $this->application->addError($throwable->getMessage()); } finally { fire(Event::SYSTEM_RESOURCE_RELEASES); } }); $this->popChannel(); } /** * @param \Swoole\Process $process * @throws \ReflectionException * @throws \Snowflake\Exception\ComponentException * @throws \Snowflake\Exception\ConfigException * @throws \Snowflake\Exception\NotFindClassException */ public function tick(\Swoole\Process $process) { [$value, $startTime] = swoole_unserialize($process->read()); $redis = Snowflake::app()->getRedis(); $crontab = swoole_unserialize($redis->get($value)); $redis->del($value); if (is_object($crontab)) { $this->channel->push($crontab); } $redis->release(); $this->tick($process); } /** * @param Crontab $crontab * @throws Exception */ private static function addTask(Crontab $crontab) { $redis = Snowflake::app()->getRedis(); $name = md5($crontab->getName()); $redis->set('crontab:' . $name, swoole_serialize($crontab)); $tickTime = time() + $crontab->getTickTime(); $redis->zAdd(Producer::CRONTAB_KEY, $tickTime, $crontab->getName()); } }