This commit is contained in:
xl
2024-09-12 09:06:02 +08:00
parent 97a3335217
commit fbd3799081
6 changed files with 0 additions and 506 deletions
-238
View File
@@ -1,238 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Actor;
use Exception;
use JsonSerializable;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
/**
* @Actor
*/
abstract class Actor implements ActorInterface, JsonSerializable
{
/**
* @var Channel
*/
private Channel $channel;
/**
* @var bool
*/
private bool $isShutdown = false;
/**
* @var int
*/
private int $messageId = -1;
/**
* @var int
*/
private int $coroutineId = -1;
/**
* @var ActorState
*/
private ActorState $state;
/**
* @var float
*/
private float $startTime = 0;
/**
* @var int
*/
private int $refreshInterval = 0;
/**
* @return ActorState
*/
public function getState(): ActorState
{
return $this->state;
}
/**
* @param ActorState $state
*/
public function setState(ActorState $state): void
{
$this->state = $state;
}
/**
* @return float
*/
public function getRunTime(): float
{
return microtime(true) - $this->startTime;
}
/**
* @param string $uniqueId
*/
private function __construct(readonly public string $uniqueId)
{
$this->channel = new Channel(99);
$this->startTime = microtime(true);
}
/**
* @return void
*/
public function init(): void
{
}
/**
* @return bool
*/
public function isShutdown(): bool
{
return $this->isShutdown;
}
/**
* @param $id
* @return static
*/
public static function newActor($id): static
{
$actor = new static($id);
$actor->listen();
return $actor;
}
/**
* @return void
*/
private function listen(): void
{
Coroutine::create(function (Actor $actor) {
$actor->coroutineId = Coroutine::getCid();
$this->run();
}, $this);
}
/**
* @return string
*/
public function getName(): string
{
return $this->uniqueId;
}
/**
* @param mixed $response
* @return bool
*/
public function write(mixed $response): bool
{
return $this->channel->push($response);
}
/**
* @return void
*/
public function shutdown(): void
{
$this->isShutdown = true;
Coroutine::cancel($this->coroutineId);
if ($this->messageId > -1) {
Coroutine::cancel($this->messageId);
}
$this->channel->close();
}
/**
* @return void
*/
public function onUpdate(): void
{
}
/**
* @return void
* @throws Exception
*/
public function run(): void
{
if ($this->refreshInterval < 1) {
throw new Exception('Refresh interval must be greater than 1');
}
$this->setState(ActorState::BUSY);
$this->init();
$this->messageId = Coroutine::create(fn() => $this->loop());
$this->interval();
$this->setState(ActorState::IDLE);
}
/**
* @return void
*/
private function interval(): void
{
if ($this->isShutdown()) {
return;
}
try {
$this->onUpdate();
} catch (\Throwable $exception) {
error($exception);
}
Coroutine::sleep($this->refreshInterval / 1000);
$this->interval();
}
/**
* @return bool
*/
private function loop(): bool
{
if ($this->messageId == -1) {
$this->messageId = Coroutine::getCid();
}
if ($this->channel->errCode == SWOOLE_CHANNEL_CLOSED) {
$this->channel = new Channel(99);
}
$message = $this->channel->pop();
$this->process($message);
if ($this->isShutdown()) {
return true;
}
return $this->loop();
}
}
-15
View File
@@ -1,15 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Actor;
interface ActorInterface
{
/**
* @param ActorMessage $message
* @return void
*/
public function process(ActorMessage $message): void;
}
-110
View File
@@ -1,110 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Actor;
use Swoole\Coroutine;
class ActorManager
{
/** @var array<string, ActorInterface> */
private array $nodes = [];
/**
* @param Actor $actor
* @return void
*/
public function addActor(ActorInterface $actor): void
{
$this->nodes[$actor->uniqueId] = $actor;
Coroutine::create(function (Actor $actor) {
$actor->run();
}, $actor);
}
/**
* @param $name
* @return void
*/
public function closeActor($name): void
{
$node = $this->nodes[$name] ?? null;
if (is_null($node)) {
return;
}
foreach ($node as $actor) {
$actor->shutdown();
}
}
/**
* @param $name
* @param $message
* @return bool
*/
public function write($name, $message): bool
{
$actor = $this->nodes[$name] ?? null;
if (is_null($actor)) {
return false;
}
return $actor->write($message);
}
/**
* @param $name
* @return array
*/
public function lists($name): array
{
$array = [];
foreach ($this->nodes[$name] as $actor) {
$array[] = [
'id' => $actor->getName(),
'state' => $actor->getState()->name,
'runTime' => $actor->getRunTime()
];
}
return $array;
}
/**
* @param string $uniqueId
* @return bool
*/
public function hasActor(string $uniqueId): bool
{
return isset($this->nodes[$uniqueId]) && $this->nodes[$uniqueId] instanceof ActorInterface;
}
/**
* @param array|null $data
* @return void
*/
public static function exec(?array $data): void
{
if (is_null($data)) {
return;
}
}
/**
* @return void
*/
public function clean(): void
{
foreach ($this->nodes as $actor) {
$actor->shutdown();
}
$this->nodes = [];
}
}
-78
View File
@@ -1,78 +0,0 @@
<?php
declare(strict_types=1);
namespace Kiri\Actor;
use JetBrains\PhpStorm\ArrayShape;
class ActorMessage implements \JsonSerializable
{
/**
* @var int
*/
private int $userId;
/**
* @var string
*/
private string $event;
/**
* @var array
*/
private array $body;
/**
* @param int $userId
* @param string $event
* @param array $body
*/
public function __construct(int $userId, string $event, array $body)
{
$this->userId = $userId;
$this->event = $event;
$this->body = $body;
}
/**
* @return int
*/
public function getUserId(): int
{
return $this->userId;
}
/**
* @return string
*/
public function getEvent(): string
{
return $this->event;
}
/**
* @return array
*/
public function getBody(): array
{
return $this->body;
}
/**
* @return array
*/
#[ArrayShape(['userId' => "int", 'event' => "string", 'body' => "array"])]
public function jsonSerialize(): array
{
return [
'userId' => $this->userId,
'event' => $this->event,
'body' => $this->body
];
}
}
-47
View File
@@ -1,47 +0,0 @@
<?php
namespace Kiri\Actor;
use Kiri\Server\Processes\AbstractProcess;
use Swoole\Coroutine;
use Swoole\Process;
class ActorProcess extends AbstractProcess
{
/**
* @var bool
*/
protected bool $enable_coroutine = true;
/**
* @return string
*/
public function getName(): string
{
// TODO: Change the autogenerated stub
return 'Actor Manager';
}
public function process(?Process $process): void
{
// TODO: Implement process() method.
while ($this->isStop() === false) {
$read = $process->read();
ActorManager::exec(json_decode($read, true));
Coroutine::sleep(1000 / 120);
}
}
/**
* @return void
*/
public function onSigterm(): void
{
}
}
-18
View File
@@ -1,18 +0,0 @@
<?php
namespace Kiri\Actor;
enum ActorState
{
case IDLE;
case BUSY;
/**
*
*/
case CREATE;
case MESSAGE;
case SHUTDOWN;
}