get('crontab');
        $crontab->clearAll();
        if (Snowflake::getPlatform()->isLinux()) {
            // Project helper; presumably labels the process identified by $this->pid — TODO confirm.
            name($this->pid, 'Crontab zookeeper.');
        }
        // Periodically collect the due crontab entries and hand each one to a worker.
        // NOTE(review): if dispatch() throws, $redis->release() below is never reached;
        // consider wrapping the loop in try/finally.
        Timer::tick(1000, function () {
            [$range, $redis] = $this->loadCarobTask();
            $server = Snowflake::app()->getSwoole();
            $setting = $server->setting['worker_num'];
            foreach ($range as $value) {
                $this->dispatch($server, $redis, $setting, $value);
            }
            $redis->release();
        });
    }

    /**
     * Send one due crontab entry to a randomly chosen worker, then delete its payload.
     *
     * The payload is read from the key 'crontab:' . $value. When that key holds
     * nothing (the read returns false/null) the entry is skipped, so workers never
     * receive a message whose handler is an unserialized "false".
     *
     * @param $server object exposing sendMessage(); obtained by the caller from getSwoole()
     * @param Redis|\Redis $redis connection used to read and delete the payload
     * @param int $setting worker count; the target worker id is drawn from 0..$setting-1
     * @param $value suffix of the 'crontab:' key identifying the entry
     * @throws Exception
     */
    private function dispatch($server, Redis|\Redis $redis, int $setting, $value)
    {
        $key = 'crontab:' . $value;

        $payload = $redis->get($key);
        if ($payload === false || $payload === null) {
            // Nothing stored under this key: there is no handler to run.
            return;
        }

        $server->sendMessage(swoole_serialize([
            'action'  => 'crontab',
            'handler' => swoole_unserialize($payload)
        ]), random_int(0, $setting - 1));

        // Remove the payload only after it has been handed to a worker.
        $redis->del($key);
    }

    /**
     * Fetch the members of the Producer::CRONTAB_KEY sorted set whose score lies
     * between 0 and the current Unix timestamp, and remove that score range.
     *
     * The Redis connection is returned together with the members; the caller is
     * expected to release it (the tick callback does so after dispatching).
     *
     * NOTE(review): the read and the removal are two separate commands, so a member
     * added in between with a score inside the range would be removed without being
     * returned — confirm whether producers can schedule entries for "now".
     *
     * @return array [list of due members, Redis connection]
     * @throws Exception
     */
    private function loadCarobTask(): array
    {
        $redis = Snowflake::app()->getRedis();

        // Use one upper bound for both commands so they address the same range.
        $upperBound = (string)time();

        $range = $redis->zRangeByScore(Producer::CRONTAB_KEY, '0', $upperBound);
        $redis->zRemRangeByScore(Producer::CRONTAB_KEY, '0', $upperBound);

        return [$range, $redis];
    }

    /**
     * Forget the entry registered under $name.
     *
     * Removes $name from the $this->scores bucket that $this->timers[$name] points
     * to (when both exist), then drops the $this->timers and $this->names entries.
     * Unknown names are ignored.
     *
     * @param string $name
     */
    public function clear(string $name)
    {
        if (!isset($this->names[$name])) {
            return;
        }

        // Tolerate out-of-sync bookkeeping: either lookup may be missing.
        $bucket = $this->timers[$name] ?? null;
        if ($bucket !== null && isset($this->scores[$bucket])) {
            // Strict search so numeric-looking names ('10' vs '1e1') never match each other.
            $index = array_search($name, $this->scores[$bucket], true);
            if ($index !== false) {
                unset($this->scores[$bucket][$index]);
            }
        }

        unset($this->timers[$name], $this->names[$name]);
    }
}