Files
kiri-core/kiri-actor/Actor.php
T
2023-04-18 23:47:31 +08:00

239 lines
3.2 KiB
PHP

<?php
declare(strict_types=1);
namespace Kiri\Actor;
use Exception;
use JsonSerializable;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
/**
* @Actor
*/
abstract class Actor implements ActorInterface, JsonSerializable
{
/**
* @var Channel
*/
private Channel $channel;
/**
* @var bool
*/
private bool $isShutdown = false;
/**
* @var int
*/
private int $messageId = -1;
/**
* @var int
*/
private int $coroutineId = -1;
/**
* @var ActorState
*/
private ActorState $state;
/**
* @var float
*/
private float $startTime = 0;
/**
* @var int
*/
private int $refreshInterval = 0;
/**
* @return ActorState
*/
public function getState(): ActorState
{
return $this->state;
}
/**
* @param ActorState $state
*/
public function setState(ActorState $state): void
{
$this->state = $state;
}
/**
* @return float
*/
public function getRunTime(): float
{
return microtime(true) - $this->startTime;
}
/**
* @param string $uniqueId
*/
private function __construct(readonly public string $uniqueId)
{
$this->channel = new Channel(99);
$this->startTime = microtime(true);
}
/**
* @return void
*/
public function init(): void
{
}
/**
* @return bool
*/
public function isShutdown(): bool
{
return $this->isShutdown;
}
/**
* @param $id
* @return static
*/
public static function newActor($id): static
{
$actor = new static($id);
$actor->listen();
return $actor;
}
/**
* @return void
*/
private function listen(): void
{
Coroutine::create(function (Actor $actor) {
$actor->coroutineId = Coroutine::getCid();
$this->run();
}, $this);
}
/**
* @return string
*/
public function getName(): string
{
return $this->uniqueId;
}
/**
* @param mixed $response
* @return bool
*/
public function write(mixed $response): bool
{
return $this->channel->push($response);
}
/**
* @return void
*/
public function shutdown(): void
{
$this->isShutdown = true;
Coroutine::cancel($this->coroutineId);
if ($this->messageId > -1) {
Coroutine::cancel($this->messageId);
}
$this->channel->close();
}
/**
* @return void
*/
public function onUpdate(): void
{
}
/**
* @return void
* @throws Exception
*/
public function run(): void
{
if ($this->refreshInterval < 1) {
throw new Exception('Refresh interval must be greater than 1');
}
$this->setState(ActorState::BUSY);
$this->init();
$this->messageId = Coroutine::create(fn() => $this->loop());
$this->interval();
$this->setState(ActorState::IDLE);
}
/**
* @return void
*/
private function interval(): void
{
if ($this->isShutdown()) {
return;
}
try {
$this->onUpdate();
} catch (\Throwable $exception) {
error($exception);
}
Coroutine::sleep($this->refreshInterval / 1000);
$this->interval();
}
/**
* @return bool
*/
private function loop(): bool
{
if ($this->messageId == -1) {
$this->messageId = Coroutine::getCid();
}
if ($this->channel->errCode == SWOOLE_CHANNEL_CLOSED) {
$this->channel = new Channel(99);
}
$message = $this->channel->pop();
$this->process($message);
if ($this->isShutdown()) {
return true;
}
return $this->loop();
}
}