Revert "改名"

This reverts commit fdf58326
This commit is contained in:
2022-01-12 11:20:33 +08:00
parent b38704374a
commit 33a01eda28
5 changed files with 198 additions and 89 deletions
+9 -2
View File
@@ -48,9 +48,9 @@ abstract class BaseProcess implements OnProcessInterface
/**
* @return bool
*/
*/
public function getRedirectStdinAndStdout(): bool
{
{
return $this->redirect_stdin_and_stdout;
}
@@ -80,6 +80,9 @@ abstract class BaseProcess implements OnProcessInterface
}
abstract public function onBroadcast($message);
/**
*
*/
@@ -105,6 +108,10 @@ abstract class BaseProcess implements OnProcessInterface
protected function onShutdown($data): void
{
$this->isStop = true;
$value = Context::getContext('waite:process:message');
if (Coroutine::exists($value)) {
Coroutine::cancel($value);
}
}
+29
View File
@@ -0,0 +1,29 @@
<?php
namespace Kiri\Server;
use Kiri\Kiri;
class Broadcast
{
/**
* @param $message
* @return void
*/
public function broadcast($message)
{
$di = Kiri::getDi();
$di->get(ProcessManager::class)->push($message);
$server = $di->get(SwooleServerInterface::class);
$total = $server->setting['worker_num'] + $server->setting['task_worker_num'];
for ($i = 0; $i < $total; $i++) {
$server->sendMessage($message, $i);
}
}
}
+101
View File
@@ -0,0 +1,101 @@
<?php
namespace Kiri\Server;
use Kiri\Abstracts\Config;
use Kiri\Context;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri\Server\Abstracts\BaseProcess;
use Kiri\Server\Contract\OnProcessInterface;
use Swoole\Coroutine;
use Swoole\Process;
class ProcessManager
{
/** @var array<string, Process> */
private array $_process = [];
/**
* @param string|OnProcessInterface|BaseProcess $customProcess
* @return void
* @throws ConfigException
*/
public function add(string|OnProcessInterface|BaseProcess $customProcess)
{
$server = Kiri::getDi()->get(SwooleServerInterface::class);
if (is_string($customProcess)) {
$customProcess = Kiri::getDi()->get($customProcess);
}
$system = sprintf('[%s].process', Config::get('id', 'system-service'));
$server->logger->debug($system . ' ' . $customProcess->getName() . ' start.');
$server->addProcess($process = $this->parse($customProcess, $system));
$this->_process[$customProcess->getName()] = $process;
}
/**
* @param $customProcess
* @param $system
* @return Process
*/
private function parse($customProcess, $system): Process
{
return new Process(function (Process $process) use ($customProcess, $system) {
if (Kiri::getPlatform()->isLinux()) {
$process->name($system . '(' . $customProcess->getName() . ')');
}
$channel = Coroutine::create(function () use ($process, $customProcess) {
while (!$customProcess->isStop()) {
$message = $process->read();
if (!empty($message)) {
$customProcess->onBroadcast($message);
}
}
});
Context::setContext('waite:process:message', $channel);
$customProcess->onSigterm()->process($process);
},
$customProcess->getRedirectStdinAndStdout(),
$customProcess->getPipeType(),
$customProcess->isEnableCoroutine()
);
}
/**
* @param array $processes
* @return void
* @throws ConfigException
*/
public function batch(array $processes)
{
foreach ($processes as $process) {
$this->add($process);
}
}
/**
* @param string $message
* @param string $name
* @return void
*/
public function push(string $message, string $name = '')
{
$processes = $this->_process;
if (!empty($this->_process[$name])) {
$processes = [$this->_process[$name]];
}
foreach ($processes as $process) {
$process->write($message);
}
}
}
+39 -41
View File
@@ -4,16 +4,16 @@
namespace Kiri\Server;
use Exception;
use Kiri\Message\Handler\Abstracts\HttpService;
use Kiri\Message\Handler\Router;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri\Annotation\Inject;
use Kiri\Message\Handler\Abstracts\HttpService;
use Kiri\Message\Handler\Router;
use Kiri\Server\Events\OnShutdown;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Kiri\Server\Events\OnShutdown;
use Swoole\Coroutine;
@@ -76,53 +76,51 @@ class Server extends HttpService
}
$processes = array_merge($this->process, Config::get('processes', []));
foreach ($processes as $process) {
$this->manager()->addProcess($process);
}
$this->container->get(ProcessManager::class)->batch($processes);
return $this->manager()->getServer()->start();
}
/**
* @throws ConfigException
*/
private function configure_set()
{
$enable_coroutine = Config::get('servers.settings.enable_coroutine', false);
Config::set('servers.settings.enable_coroutine', true);
if ($enable_coroutine != true) {
return;
}
Coroutine::set([
'hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_BLOCKING_FUNCTION,
'enable_deadlock_check' => FALSE,
'exit_condition' => function () {
return Coroutine::stats()['coroutine_num'] === 0;
}
]);
}
/**
* @throws ConfigException
*/
private function configure_set()
{
$enable_coroutine = Config::get('servers.settings.enable_coroutine', false);
Config::set('servers.settings.enable_coroutine', true);
if ($enable_coroutine != true) {
return;
}
Coroutine::set([
'hook_flags' => SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_BLOCKING_FUNCTION,
'enable_deadlock_check' => FALSE,
'exit_condition' => function () {
return Coroutine::stats()['coroutine_num'] === 0;
}
]);
}
/**
* @return void
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
* @throws \Exception
*/
public function runtime_start(): void
{
$this->configure_set();
/**
* @return void
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
* @throws \Exception
*/
public function runtime_start(): void
{
$this->configure_set();
$this->container->get(Router::class)->read_files();
$this->container->get(Router::class)->read_files();
$this->start();
}
$this->start();
}
/**
/**
* @return void
* @throws ConfigException
* @throws ContainerExceptionInterface
+20 -46
View File
@@ -5,14 +5,12 @@ namespace Kiri\Server;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Context;
use Kiri\Error\Logger;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri\Kiri;
use Kiri\Annotation\Inject;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Kiri\Server\Abstracts\BaseProcess;
use Kiri\Server\Contract\OnCloseInterface;
use Kiri\Server\Contract\OnConnectInterface;
@@ -29,15 +27,15 @@ use Kiri\Server\Handler\OnServerManager;
use Kiri\Server\Handler\OnServerReload;
use Kiri\Server\Handler\OnServerWorker;
use Kiri\Server\Tasker\OnServerTask;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Swoole\Coroutine;
use Swoole\Http\Server as HServer;
use Swoole\Process;
use Swoole\Server;
use Swoole\Server\Port;
use Swoole\WebSocket\Server as WServer;
use Symfony\Component\Console\Helper\Table;
use Symfony\Component\Console\Helper\TableSeparator;
use Symfony\Component\Console\Output\ConsoleOutput;
use Symfony\Component\Console\Output\OutputInterface;
/**
@@ -151,30 +149,6 @@ class ServerManager extends Component
}
/**
* @param string|OnProcessInterface|BaseProcess $customProcess
* @throws Exception
*/
public function addProcess(string|OnProcessInterface|BaseProcess $customProcess)
{
if (is_string($customProcess)) {
$customProcess = Kiri::getDi()->get($customProcess);
}
$system = sprintf('[%s].process', Config::get('id', 'system-service'));
$this->logger->debug($system . ' ' . $customProcess->getName() . ' start.');
$this->server->addProcess(new Process(function (Process $process) use ($customProcess, $system) {
if (Kiri::getPlatform()->isLinux()) {
$process->name($system . '(' . $customProcess->getName() . ')');
}
$customProcess->onSigterm()->process($process);
},
$customProcess->getRedirectStdinAndStdout(),
$customProcess->getPipeType(),
$customProcess->isEnableCoroutine()
));
}
/**
* @return array<string,Process>
*/
@@ -282,20 +256,20 @@ class ServerManager extends Component
*
*
*
$data = new Table($this->container->get(OutputInterface::class));
$data->setHeaders(['key', 'value']);
$array = [];
foreach ($this->server->setting as $key => $value) {
$array[] = [$key, $value];
$array[] = new TableSeparator();
}
array_pop($array);
$data->setStyle('box-double');
$data->setRows($array);
$data->render();
* $data = new Table($this->container->get(OutputInterface::class));
* $data->setHeaders(['key', 'value']);
*
* $array = [];
* foreach ($this->server->setting as $key => $value) {
* $array[] = [$key, $value];
* $array[] = new TableSeparator();
* }
*
* array_pop($array);
*
* $data->setStyle('box-double');
* $data->setRows($array);
* $data->render();
*/
private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = [])
{