Files
kiri-core/kiri-actor/ActorProcess.php
T
2022-10-11 18:28:33 +08:00

83 lines
1.6 KiB
PHP

<?php
namespace Kiri\Actor;
use Exception;
use Kiri\Di\ContainerInterface;
use Kiri\Server\Abstracts\BaseProcess;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Process;
class ActorProcess extends BaseProcess
{
/**
* @var bool
*/
protected bool $enable_coroutine = true;
/**
* @var string
*/
public string $name = 'actor-process';
/**
* @param ContainerInterface $container
*/
public function __construct(public ContainerInterface $container)
{
}
/**
* @param Process $process
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*/
public function process(Process $process): void
{
// TODO: Implement process() method.
$actorManager = $this->container->get(ActorManager::class);
while (!$this->isStop()) {
$read = json_decode($process->read(), true);
if (is_null($read) || !isset($read['category'])) {
continue;
}
$message = new ActorMessage($read['userId'], $read['event'], $read['body']);
switch ($read['category']) {
case ActorState::MESSAGE:
$actorManager->write($read['name'], $message);
break;
case ActorState::CREATE:
/** @var ActorInterface $actor */
$actor = $this->container->create($read['class'], $read['constrict'], $read['config']);
$actorManager->addActor($actor);
break;
case ActorState::SHUTDOWN:
$actorManager->closeActor($read['name']);
break;
}
}
}
/**
* @return $this
*/
public function onSigterm(): static
{
pcntl_signal(SIGTERM, function () {
$this->onProcessStop();
});
return $this;
}
}