This commit is contained in:
2022-10-11 14:33:33 +08:00
parent bc2275300d
commit 3dd5311430
6 changed files with 344 additions and 0 deletions
+1
View File
@@ -33,6 +33,7 @@
"autoload": {
"psr-4": {
"Kiri\\": "kiri-engine/",
"Kiri\\Actor\\": "kiri-actor/",
"Kiri\\Annotation\\": "kiri-annotation/",
"Kiri\\Task\\": "kiri-task/"
},
+136
View File
@@ -0,0 +1,136 @@
<?php
namespace Kiri\Actor;
use Swoole\Coroutine\Channel;
abstract class Actor implements ActorInterface
{
/**
* @var Channel
*/
private Channel $channel;
/**
* @var bool
*/
private bool $isShutdown = false;
/**
* @var ActorState
*/
private ActorState $state;
/**
* @var float
*/
private float $startTime = 0;
/**
* @return ActorState
*/
public function getState(): ActorState
{
return $this->state;
}
/**
* @param ActorState $state
*/
public function setState(ActorState $state): void
{
$this->state = $state;
}
/**
* @return float
*/
public function getRunTime(): float
{
return microtime(true) - $this->startTime;
}
/**
* @param string $uniqueId
*/
private function __construct(readonly public string $uniqueId)
{
$this->channel = new Channel(1000);
$this->startTime = microtime(true);
}
/**
* @param $id
* @return static
*/
public static function newActor($id): static
{
return new static($id);
}
/**
* @return string
*/
public function getName(): string
{
return $this->uniqueId;
}
/**
* @param mixed $response
* @return bool
*/
public function write(mixed $response): bool
{
return $this->channel->push($response);
}
/**
* @return void
*/
public function shutdown(): void
{
$this->isShutdown = true;
$this->channel->close();
}
/**
* @return void
*/
public function run(): void
{
if ($this->channel->errCode == SWOOLE_CHANNEL_CLOSED) {
if ($this->isShutdown) {
return;
}
$this->channel = new Channel(1000);
}
$this->setState(ActorState::BUSY);
while (!$this->isShutdown) {
$message = $this->channel->pop();
$this->process($message);
unset($message);
}
$this->setState(ActorState::IDLE);
if ($this->isShutdown) {
$this->channel->close();
return;
}
$this->run();
}
}
+14
View File
@@ -0,0 +1,14 @@
<?php
namespace Kiri\Actor;
interface ActorInterface
{
/**
* @param mixed $message
* @return void
*/
public function process(mixed $message): void;
}
+97
View File
@@ -0,0 +1,97 @@
<?php
namespace Kiri\Actor;
use Swoole\Coroutine;
class ActorManager
{
/** @var array<string, ActorInterface> */
private array $nodes = [];
/**
* @param Actor $actor
* @return void
*/
public function addActor(ActorInterface $actor): void
{
$this->nodes[$actor->uniqueId] = $actor;
Coroutine::create(function (Actor $actor) {
$actor->run();
}, $actor);
}
/**
* @param $name
* @return void
*/
public function closeActor($name): void
{
$node = $this->nodes[$name] ?? null;
if (is_null($node)) {
return;
}
foreach ($node as $actor) {
$actor->shutdown();
}
}
/**
* @param $name
* @param $message
* @return bool
*/
public function write($name, $message): bool
{
$actor = $this->nodes[$name] ?? null;
if (is_null($actor)) {
return false;
}
return $actor->write($message);
}
/**
* @param $name
* @return array
*/
public function lists($name): array
{
$array = [];
foreach ($this->nodes[$name] as $actor) {
$array[] = [
'id' => $actor->getName(),
'state' => $actor->getState()->name,
'runTime' => $actor->getRunTime()
];
}
return $array;
}
/**
* @param string $uniqueId
* @return bool
*/
public function hasActor(string $uniqueId): bool
{
return isset($this->nodes[$uniqueId]) && $this->nodes[$uniqueId] instanceof ActorInterface;
}
/**
* @return void
*/
public function clean(): void
{
foreach ($this->nodes as $actor) {
$actor->shutdown();
}
$this->nodes = [];
}
}
+81
View File
@@ -0,0 +1,81 @@
<?php
namespace Kiri\Actor;
use Exception;
use Kiri\Di\ContainerInterface;
use Kiri\Server\Abstracts\BaseProcess;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Process;
class ActorProcess extends BaseProcess
{
/**
* @var bool
*/
protected bool $enable_coroutine = true;
/**
* @var string
*/
public string $name = 'actor-process';
/**
* @param ContainerInterface $container
*/
public function __construct(public ContainerInterface $container)
{
}
/**
* @param Process $process
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
public function process(Process $process): void
{
// TODO: Implement process() method.
$actorManager = $this->container->get(ActorManager::class);
while (!$this->isStop()) {
$read = json_decode($process->read(), true);
if (is_null($read) || !isset($read['event'])) {
continue;
}
switch ($read['event']) {
case ActorState::MESSAGE:
$actorManager->write($read['name'], $read['message']);
break;
case ActorState::CREATE:
/** @var ActorInterface $actor */
$actor = $this->container->create($read['class']);
$actorManager->addActor($actor);
break;
case ActorState::SHUTDOWN:
$actorManager->closeActor($read['name']);
break;
}
}
}
/**
* @return $this
*/
public function onSigterm(): static
{
pcntl_signal(SIGTERM, function () {
$this->onProcessStop();
});
return $this;
}
}
+15
View File
@@ -0,0 +1,15 @@
<?php
namespace Kiri\Actor;
enum ActorState
{
case IDLE;
case BUSY;
case CREATE;
case MESSAGE;
case SHUTDOWN;
}