This commit is contained in:
2021-11-03 17:39:08 +08:00
parent 98cecce76a
commit 6a1a0503e2
4 changed files with 70 additions and 82 deletions
+55 -38
View File
@@ -4,7 +4,6 @@ namespace Server\Abstracts;
use JetBrains\PhpStorm\Pure;
use Kiri\Kiri;
use Server\SInterface\OnProcessInterface;
use Swoole\Coroutine;
use Swoole\Process;
@@ -15,13 +14,63 @@ use Swoole\Process;
abstract class BaseProcess implements OnProcessInterface
{
/** @var bool */
protected bool $enableSwooleCoroutine = true;
protected bool $isStop = false;
protected mixed $redirect_stdin_and_stdout = null;
protected int $pipe_type = SOCK_DGRAM;
protected bool $enable_coroutine = true;
protected string $name = 'swoole process.';
/**
* @return string
*/
public function getName(): string
{
return $this->name;
}
/**
* @return bool
*/
public function isStop(): bool
{
return $this->isStop;
}
/**
* @return mixed
*/
public function getRedirectStdinAndStdout(): mixed
{
return $this->redirect_stdin_and_stdout;
}
/**
* @return int
*/
public function getPipeType(): int
{
return $this->pipe_type;
}
/**
* @return bool
*/
public function isEnableCoroutine(): bool
{
return $this->enable_coroutine;
}
/**
*
*/
@@ -40,43 +89,11 @@ abstract class BaseProcess implements OnProcessInterface
}
/**
* @param Process $process
*/
public function signListen(Process $process): void
{
// if (Coroutine::getCid() === -1) {
// Process::signal(SIGTERM | SIGKILL, function ($signo) use ($process) {
// if ($signo) {
// $lists = Kiri::app()->getProcess();
// foreach ($lists as $process) {
// $process->exit(0);
// }
// }
// });
// } else {
// Coroutine::create(function () use ($process) {
// /** @var Coroutine\Socket $message */
// $message = $process->exportSocket();
// if ($message->recv() == 0x03455343213212) {
// $this->waiteExit($process);
// }
// });
// Coroutine::create(function () use ($process) {
// $data = Coroutine::waitSignal(SIGTERM | SIGKILL, -1);
// if ($data) {
// $lists = Kiri::app()->getProcess();
// foreach ($lists as $name => $process) {
// foreach ($process as $item) {
// /** @var Coroutine\Socket $export */
// $export = $item->exportSocket();
// $export->send(0x03455343213212);
// }
// }
// }
// });
// }
}
@@ -116,7 +133,7 @@ abstract class BaseProcess implements OnProcessInterface
*/
private function sleep(): void
{
if ($this->enableSwooleCoroutine) {
if ($this->enable_coroutine) {
Coroutine::sleep(0.1);
} else {
usleep(100);
-1
View File
@@ -18,7 +18,6 @@ class OnTaskerStart extends WorkerStart implements EventDispatcherInterface
/**
* @throws ConfigException
* @throws ReflectionException
*/
public function dispatch(object $event)
{
+1 -1
View File
@@ -60,7 +60,7 @@ class WorkerStart
* @param $event
* @param $isWorker
* @param $time
* @throws \Kiri\Exception\ConfigException
* @throws ConfigException
*/
protected function mixed($event, $isWorker, $time)
{
+14 -42
View File
@@ -10,14 +10,13 @@ use Kiri\Di\ContainerInterface;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use ReflectionException;
use Server\Abstracts\BaseProcess;
use Server\Handler\OnPipeMessage;
use Server\Handler\OnServer;
use Server\Handler\OnServerManager;
use Server\Handler\OnServerReload;
use Server\Handler\OnServerWorker;
use Server\Handler\OnServerTask;
use Server\Handler\OnServerWorker;
use Server\SInterface\OnCloseInterface;
use Server\SInterface\OnConnectInterface;
use Server\SInterface\OnDisconnectInterface;
@@ -155,56 +154,29 @@ class ServerManager
/**
* @param string|OnProcessInterface $customProcess
* @param null $redirect_stdin_and_stdout
* @param int|null $pipe_type
* @param bool $enable_coroutine
* @param string|OnProcessInterface|BaseProcess $customProcess
* @throws Exception
*/
public function addProcess(string|OnProcessInterface $customProcess, $redirect_stdin_and_stdout = null, ?int $pipe_type = SOCK_DGRAM, bool $enable_coroutine = true)
public function addProcess(string|OnProcessInterface|BaseProcess $customProcess)
{
$process = $this->initProcess($customProcess, $redirect_stdin_and_stdout, $pipe_type, $enable_coroutine);
$this->server->addProcess($process);
if ($customProcess instanceof OnProcessInterface) {
Kiri::app()->addProcess($customProcess::class, $process);
} else {
Kiri::app()->addProcess($customProcess, $process);
if (is_string($customProcess)) {
$customProcess = Kiri::getDi()->get($customProcess);
}
}
/**
* @param $customProcess
* @param $redirect_stdin_and_stdout
* @param $pipe_type
* @param $enable_coroutine
* @return Process
*/
private function initProcess($customProcess, $redirect_stdin_and_stdout, $pipe_type, $enable_coroutine): Process
{
$server = $this->server;
return new Process(function (Process $soloProcess) use ($customProcess, $server) {
$time = microtime(true);
if (is_string($customProcess)) {
$customProcess = Kiri::createObject($customProcess, [$server]);
}
$name = $customProcess->getProcessName($soloProcess);
$process = new Process(function (Process $soloProcess) use ($customProcess) {
$system = sprintf('%s.process[%d]', Config::get('id', 'system-service'), $soloProcess->pid);
if (Kiri::getPlatform()->isLinux()) {
$soloProcess->name($system . '.' . $name . ' start.');
$soloProcess->name($system . '.' . $customProcess->getName() . ' start.');
}
$name = Config::get('id', 'system-service');
echo sprintf("\033[36m[" . date('Y-m-d H:i:s') . "]\033[0m [%s]Builder %s[%d].%d use time %s.", $name, 'Process ' . $name,
$server->master_pid, $soloProcess->pid, round(microtime(true) - $time, 6) . 's') . PHP_EOL;
echo $system . '.' . $customProcess->getName() . ' start.';
$customProcess->signListen($soloProcess);
$customProcess->onHandler($soloProcess);
},
$redirect_stdin_and_stdout,
$pipe_type,
$enable_coroutine
$customProcess->getRedirectStdinAndStdout(),
$customProcess->getPipeType(),
$customProcess->isEnableCoroutine()
);
$this->container->setBindings($customProcess::class, $process);
$this->server->addProcess($process);
}