Files
kiri-core/System/Crontab/Zookeeper.php
T

122 lines
2.2 KiB
PHP
Raw Normal View History

2021-03-26 01:01:21 +08:00
<?php
2021-08-11 01:04:57 +08:00
namespace Kiri\Crontab;
2021-03-26 01:01:21 +08:00
use Exception;
2021-07-23 17:38:09 +08:00
use Server\ServerManager;
2021-07-20 11:28:19 +08:00
use Server\SInterface\CustomProcess;
2021-08-11 01:04:57 +08:00
use Kiri\Abstracts\Config;
use Kiri\Cache\Redis;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
2021-07-20 11:28:19 +08:00
use Swoole\Process;
2021-04-28 12:08:10 +08:00
use Swoole\Timer;
2021-04-16 01:17:37 +08:00
use Throwable;
2021-03-26 01:01:21 +08:00
/**
 * Class Zookeeper
 *
 * Custom process that periodically pulls due crontab entries out of a Redis
 * sorted set and forwards each stored handler to a randomly chosen worker
 * through the server's sendMessage().
 *
 * @package Kiri\Crontab
 */
class Zookeeper implements CustomProcess
{

    /** Interval handed to Timer::tick(), in milliseconds. */
    private const TICK_INTERVAL_MS = 300;


    /** Cached worker_num + task_worker_num; 0 means "not resolved yet". */
    private int $workerNum = 0;


    /**
     * Build the display name of this process: "<id>[<pid>]", followed by
     * ".Crontab zookeeper" when the configured id is not empty.
     *
     * @param Process $process
     * @return string
     * @throws ConfigException
     */
    public function getProcessName(Process $process): string
    {
        // The original tested an undefined $prefix, so the suffix could never
        // be appended; keep the configured id in $prefix and test that.
        $prefix = Config::get('id', 'system');

        $name = $prefix . '[' . $process->pid . ']';
        if (!empty($prefix)) {
            $name .= '.Crontab zookeeper';
        }
        return $name;
    }


    /**
     * Process entry point: schedule loop() on a repeating timer.
     *
     * @param Process $process
     * @throws Exception
     */
    public function onHandler(Process $process): void
    {
        Timer::tick(self::TICK_INTERVAL_MS, [$this, 'loop']);
    }


    /**
     * One polling round: fetch every due crontab member and dispatch it.
     *
     * Invoked as the Timer::tick() callback; the arguments the timer passes
     * are intentionally ignored.
     *
     * @throws ConfigException
     * @throws Exception
     */
    public function loop()
    {
        $redis = Kiri::app()->getRedis();
        // Hand the connection back once the current coroutine finishes.
        defer(fn() => $redis->release());

        foreach ($this->loadCarobTask($redis) as $value) {
            $this->dispatch($redis, $value);
        }
    }


    /**
     * Look up the handler stored under "crontab:<value>" and send it to a
     * worker. A missing/empty handler is skipped; any failure is logged
     * instead of being propagated, so one bad entry does not stop the round.
     *
     * @param Redis|\Redis $redis
     * @param mixed $value member returned by loadCarobTask()
     * @throws Exception
     */
    private function dispatch(Redis|\Redis $redis, $value): void
    {
        try {
            $handler = $redis->get('crontab:' . $value);
            if (empty($handler)) {
                return;
            }

            $server = ServerManager::getContext()->getServer();
            $server->sendMessage(swoole_unserialize($handler), $this->getWorker());
        } catch (Throwable $exception) {
            logger()->addError($exception);
        }
    }


    /**
     * Pick a random worker id in [0, worker_num + task_worker_num - 1].
     *
     * The total is read from the server settings on first use and cached.
     *
     * @return int
     * @throws Exception
     */
    private function getWorker(): int
    {
        if ($this->workerNum === 0) {
            $setting = ServerManager::getContext()->getServer()->setting;
            $this->workerNum = $setting['worker_num'] + ($setting['task_worker_num'] ?? 0);
        }
        return random_int(0, $this->workerNum - 1);
    }


    /**
     * Atomically fetch and remove every member of the crontab sorted set
     * whose score lies between 0 and the current unix timestamp.
     *
     * @param Redis|\Redis $redis
     * @return array the removed members; empty when nothing is due or when
     *               the script did not return a list
     */
    private function loadCarobTask(Redis|\Redis $redis): array
    {
        // Nowdoc: the Lua source must never be subject to PHP interpolation.
        // Removal uses ZREMRANGEBYSCORE over the same range instead of
        // ZREM + unpack(): the script runs atomically, so it deletes exactly
        // the members just read, without unpack()'s argument-count limit.
        $script = <<<'SCRIPT'
        local due = redis.call('ZRANGEBYSCORE', KEYS[1], '0', ARGV[1])
        if #due > 0 then
            redis.call('ZREMRANGEBYSCORE', KEYS[1], '0', ARGV[1])
        end
        return due
        SCRIPT;

        $due = $redis->eval($script, [Producer::CRONTAB_KEY, (string)time()], 1);

        // eval() may hand back a non-array (e.g. false on failure); returning
        // it as-is would violate the declared array return type.
        return is_array($due) ? $due : [];
    }

}