This commit is contained in:
2021-08-19 16:01:07 +08:00
parent 6fa51c2fe3
commit fb453f674d
5 changed files with 76 additions and 9 deletions
+8 -5
View File
@@ -4,19 +4,18 @@ declare(strict_types=1);
namespace Kafka;
use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use RdKafka\Conf;
use RdKafka\Consumer;
use RdKafka\ConsumerTopic;
use RdKafka\Exception;
use RdKafka\KafkaConsumer;
use RdKafka\TopicConf;
use Kiri\Abstracts\Config;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Swoole\Coroutine\Channel;
use Server\Abstracts\CustomProcess;
use Swoole\Process;
use Throwable;
use Server\Abstracts\CustomProcess;
/**
* Class Queue
@@ -74,6 +73,10 @@ class Kafka extends CustomProcess
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
do {
if ($this->isExit()) {
$this->exit();
break;
}
$this->resolve($topic, $conf['interval'] ?? 1000);
} while (true);
} catch (Throwable $exception) {
+44 -1
View File
@@ -3,6 +3,7 @@
namespace Server\Abstracts;
use JetBrains\PhpStorm\Pure;
use Swoole\Coroutine;
use Swoole\Process;
@@ -16,20 +17,31 @@ abstract class CustomProcess implements \Server\SInterface\CustomProcess
protected bool $enableSwooleCoroutine = true;
/** @var Coroutine\Channel */
protected Coroutine\Channel $channel;
/**
* @param Process $process
*/
public function signListen(Process $process): void
{
$this->channel = new Coroutine\Channel(1);
$this->channel->push(1);
if (!$this->enableSwooleCoroutine) {
Process::signal(SIGTERM | SIGKILL, function ($signo)
use ($process) {
putenv('processStatus=exit');
$this->waiteExit($process);
});
} else {
go(function () use ($process) {
$data = Coroutine::waitSignal(SIGTERM | SIGKILL, -1);
if ($data) {
putenv('processStatus=exit');
$this->waiteExit($process);
}
});
@@ -37,9 +49,40 @@ abstract class CustomProcess implements \Server\SInterface\CustomProcess
}
/**
* @return string
*/
#[Pure] protected function getStatus(): string
{
return env('processStatus', 'working');
}
/**
* @return bool
*/
#[Pure] protected function isExit(): bool
{
return $this->getStatus() == 'exit';
}
/**
*
*/
protected function exit()
{
$this->channel->pop();
$this->channel->close();
}
/**
* @return bool
*/
public function isWorking(): bool
{
return false;
return $this->channel->isEmpty();
}
+11 -3
View File
@@ -5,12 +5,12 @@ namespace Kiri\Crontab;
use Exception;
use Server\ServerManager;
use Server\Abstracts\CustomProcess;
use Kiri\Abstracts\Config;
use Kiri\Cache\Redis;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Server\Abstracts\CustomProcess;
use Server\ServerManager;
use Swoole\Process;
use Swoole\Timer;
use Throwable;
@@ -26,6 +26,9 @@ class Zookeeper extends CustomProcess
private int $workerNum = 0;
private int $_timer = -1;
/**
* @param Process $process
* @return string
@@ -47,7 +50,7 @@ class Zookeeper extends CustomProcess
*/
public function onHandler(Process $process): void
{
Timer::tick(300, [$this, 'loop']);
$this->_timer = Timer::tick(300, [$this, 'loop']);
}
@@ -57,6 +60,11 @@ class Zookeeper extends CustomProcess
*/
public function loop()
{
if ($this->isExit()) {
Timer::clear($this->_timer);
$this->exit();
return;
}
$redis = Kiri::app()->getRedis();
defer(fn() => $redis->release());
$range = $this->loadCarobTask($redis);
+4
View File
@@ -49,6 +49,10 @@ class LoggerProcess extends CustomProcess
*/
public function message(Process $process)
{
if ($this->isExit()) {
$this->exit();
return;
}
$message = Json::decode($process->read());
if (!empty($message)) {
Kiri::writeFile($this->getDirName($message), $message[0], FILE_APPEND);
@@ -54,6 +54,15 @@ class FileChangeCustomProcess extends CustomProcess
}
/**
* @param Process $process
*/
public function signListen(Process $process): void
{
}
/**
* @param $code
* @param $message